blob: 8df974bf9e5101fb55047eb3c4b1232ba67c93b4 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001/*
Joey Armstrong89c812c2024-01-12 19:00:20 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Holger Hildebrandtfa074992020-03-27 15:42:06 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Joey Armstrong89c812c2024-01-12 19:00:20 -050017// Package main -> this is the entry point of the OpenOnuAdapter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Abhay Kumarb5c1d7a2025-12-09 08:10:00 +000024 "net/http"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000025 "os"
26 "os/signal"
Andrea Campanella3d7c9312021-01-19 09:20:49 +010027 "strings"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028 "syscall"
29 "time"
30
nikesh.krishnanca4afa32023-06-28 03:42:16 +053031 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
khenaidoo7d3c5582021-08-11 18:09:44 -040032 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
33 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
34 "github.com/opencord/voltha-lib-go/v7/pkg/events"
35 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
36 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
37 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
38 "github.com/opencord/voltha-lib-go/v7/pkg/log"
39 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
40 "github.com/opencord/voltha-lib-go/v7/pkg/version"
khenaidoo42dcdfd2021-10-19 17:34:12 -040041 "github.com/opencord/voltha-protos/v5/go/adapter_service"
42 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoo42dcdfd2021-10-19 17:34:12 -040043 "github.com/opencord/voltha-protos/v5/go/onu_inter_adapter_service"
khenaidoo7d3c5582021-08-11 18:09:44 -040044 "github.com/opencord/voltha-protos/v5/go/voltha"
Abhay Kumarb5c1d7a2025-12-09 08:10:00 +000045 "github.com/prometheus/client_golang/prometheus/promhttp"
khenaidoo7d3c5582021-08-11 18:09:44 -040046 "google.golang.org/grpc"
nikesh.krishnanca4afa32023-06-28 03:42:16 +053047 codes "google.golang.org/grpc/codes"
khenaidoo7d3c5582021-08-11 18:09:44 -040048
khenaidoo42dcdfd2021-10-19 17:34:12 -040049 "github.com/opencord/voltha-protos/v5/go/core_adapter"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000050
Matteo Scandolo761f7512020-11-23 15:52:40 -080051 "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/config"
Holger Hildebrandt4b5e73f2021-08-19 06:51:21 +000052 ac "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/core"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000053)
54
// Names under which the adapter's dependencies are registered with, and
// reported to, the readiness/liveness probe.
const (
	// clusterMessagingService identifies the Kafka connection.
	clusterMessagingService = "cluster-message-service"
	// onuAdapterService identifies this adapter's own gRPC service.
	onuAdapterService = "onu-adapter-service"
	// kvService identifies the KV store connection.
	kvService = "kv-service"
	// coreService identifies the registration with the Core.
	coreService = "core-service"
)
61
Holger Hildebrandtfa074992020-03-27 15:42:06 +000062type adapter struct {
khenaidoof3333552021-12-15 16:52:31 -050063 kafkaClient kafka.Client
64 kvClient kvstore.Client
65 eventProxy eventif.EventProxy
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053066 config *config.AdapterFlags
khenaidoof3333552021-12-15 16:52:31 -050067 grpcServer *vgrpc.GrpcServer
68 onuAdapter *ac.OpenONUAC
69 onuInterAdapter *ac.OpenONUACInterAdapter
70 coreClient *vgrpc.Client
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +053071 //defaultAppName string
72 instanceID string
Holger Hildebrandtfa074992020-03-27 15:42:06 +000073}
74
Holger Hildebrandtfa074992020-03-27 15:42:06 +000075func newAdapter(cf *config.AdapterFlags) *adapter {
76 var a adapter
77 a.instanceID = cf.InstanceID
78 a.config = cf
Holger Hildebrandtfa074992020-03-27 15:42:06 +000079 return &a
80}
81
82func (a *adapter) start(ctx context.Context) error {
dbainbri4d3a0dc2020-12-02 00:33:42 +000083 logger.Info(ctx, "Starting Core Adapter components")
Holger Hildebrandtfa074992020-03-27 15:42:06 +000084 var err error
85
86 var p *probe.Probe
87 if value := ctx.Value(probe.ProbeContextKey); value != nil {
88 if _, ok := value.(*probe.Probe); ok {
89 p = value.(*probe.Probe)
90 p.RegisterService(
dbainbri4d3a0dc2020-12-02 00:33:42 +000091 ctx,
khenaidoo7d3c5582021-08-11 18:09:44 -040092 clusterMessagingService,
93 kvService,
94 onuAdapterService,
95 coreService,
Holger Hildebrandtfa074992020-03-27 15:42:06 +000096 )
97 }
98 }
99
100 // Setup KV Client
dbainbri4d3a0dc2020-12-02 00:33:42 +0000101 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
102 if err = a.setKVClient(ctx); err != nil {
103 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000104 }
105
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000106 // Setup Log Config
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800107 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000108 go conf.StartLogLevelConfigProcessing(cm, ctx)
109
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000110 // Setup Kafka Client
akashreddykff176ea2025-09-16 23:20:16 +0530111 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000112 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000113 }
114
khenaidoo7d3c5582021-08-11 18:09:44 -0400115 // Start kafka communication with the broker
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530116 if err = kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400117 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000118 }
119
khenaidoo7d3c5582021-08-11 18:09:44 -0400120 // Wait until connection to KV store is established
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530121 if err = WaitUntilKvStoreConnectionIsUp(ctx, a.kvClient, a.config.KVStoreTimeout, kvService); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400122 logger.Fatal(ctx, "unable-to-connect-to-kv-store")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000123 }
124
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000125 // Create the event proxy to post events to KAFKA
Himani Chawlac07fda02020-12-09 16:21:21 +0530126 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo7d3c5582021-08-11 18:09:44 -0400127 go func() {
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530128 if err = a.eventProxy.Start(); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400129 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
130 }
131 }()
132
133 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
134 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo55cebc62021-12-08 14:44:41 -0500135 if a.coreClient, err = vgrpc.NewClient(
136 a.config.AdapterEndpoint,
137 a.config.CoreEndpoint,
khenaidoof3333552021-12-15 16:52:31 -0500138 "core_service.CoreService",
khenaidoo55cebc62021-12-08 14:44:41 -0500139 a.coreRestarted); err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400140 logger.Fatal(ctx, "grpc-client-not-created")
141 }
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530142 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
143 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
144
145 retryCodes := []codes.Code{
146 codes.Unavailable, // server is currently unavailable
147 codes.DeadlineExceeded, // deadline for the operation was exceeded
148 }
149 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
150 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
khenaidoo7d3c5582021-08-11 18:09:44 -0400151 // Start the core grpc client
nikesh.krishnanca4afa32023-06-28 03:42:16 +0530152 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000153
154 // Create the open ONU interface adapter
khenaidoof3333552021-12-15 16:52:31 -0500155 if a.onuAdapter, err = a.startONUAdapter(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
156 logger.Fatalw(ctx, "error-starting-startONUAdapter", log.Fields{"error": err})
157 }
158
159 // Create the open ONU Inter adapter
160 if a.onuInterAdapter, err = a.startONUInterAdapter(ctx, a.onuAdapter); err != nil {
161 logger.Fatalw(ctx, "error-starting-startONUInterAdapter", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000162 }
163
khenaidoo7d3c5582021-08-11 18:09:44 -0400164 // Create and start the grpc server
165 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
166
167 //Register the adapter service
168 a.addAdapterService(ctx, a.grpcServer, a.onuAdapter)
169
170 //Register the onu inter adapter service
khenaidoof3333552021-12-15 16:52:31 -0500171 a.addOnuInterAdapterService(ctx, a.grpcServer, a.onuInterAdapter)
khenaidoo7d3c5582021-08-11 18:09:44 -0400172
173 go a.startGRPCService(ctx, a.grpcServer, onuAdapterService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000174
175 // Register this adapter to the Core - retries indefinitely
khenaidoo7d3c5582021-08-11 18:09:44 -0400176 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000177 logger.Fatalw(ctx, "error-registering-with-core", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000178 }
179
khenaidoo7d3c5582021-08-11 18:09:44 -0400180 // Start the readiness and liveliness check and update the probe status
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000181 a.checkServicesReadiness(ctx)
182 return err
183}
184
khenaidoo7d3c5582021-08-11 18:09:44 -0400185// TODO: Any action the adapter needs to do following a Core restart?
186func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
187 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
188 return nil
189}
190
khenaidoof3333552021-12-15 16:52:31 -0500191// getCoreServiceClientHandler is used to setup the remote gRPC service
192func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
193 if conn == nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400194 return nil
195 }
khenaidoof3333552021-12-15 16:52:31 -0500196 return core_service.NewCoreServiceClient(conn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400197}
198
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000199func (a *adapter) stop(ctx context.Context) {
khenaidoof3333552021-12-15 16:52:31 -0500200 // Cleanup the grpc services first
201 if err := a.onuAdapter.Stop(ctx); err != nil {
202 logger.Errorw(ctx, "failure-stopping-onu-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
203 }
204 if err := a.onuInterAdapter.Stop(ctx); err != nil {
205 logger.Errorw(ctx, "failure-stopping-onu-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterEndpoint})
206 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000207 // Cleanup - applies only if we had a kvClient
208 if a.kvClient != nil {
209 // Release all reservations
210 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000211 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000212 }
213 // Close the DB connection
Girish Gowdra55507832022-06-01 18:12:06 -0700214 go a.kvClient.Close(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000215 }
216
khenaidoo7d3c5582021-08-11 18:09:44 -0400217 if a.eventProxy != nil {
218 a.eventProxy.Stop()
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000219 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400220
221 if a.kafkaClient != nil {
222 a.kafkaClient.Stop(ctx)
223 }
224
225 // Stop core client
226 if a.coreClient != nil {
227 a.coreClient.Stop(ctx)
228 }
229
230 // TODO: More cleanup
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000231}
232
233// #############################################
234// Adapter Utility methods ##### begin #########
235
dbainbri4d3a0dc2020-12-02 00:33:42 +0000236func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
237 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000238 switch storeType {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000239 case "etcd":
dbainbri4d3a0dc2020-12-02 00:33:42 +0000240 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Abhay Kumar3282a142024-07-12 06:03:12 +0530241 case "redis":
242 return kvstore.NewRedisClient(address, timeout, false)
243 case "redis-sentinel":
244 return kvstore.NewRedisClient(address, timeout, true)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000245 }
246 return nil, errors.New("unsupported-kv-store")
247}
248
akashreddykff176ea2025-09-16 23:20:16 +0530249func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000250
dbainbri4d3a0dc2020-12-02 00:33:42 +0000251 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
mpagenkoaf801632020-07-03 10:00:42 +0000252
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000253 switch clientType {
254 case "sarama":
255 return kafka.NewSaramaClient(
akashreddykff176ea2025-09-16 23:20:16 +0530256 kafka.Address(config.KafkaClusterAddress),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000257 kafka.ProducerReturnOnErrors(true),
258 kafka.ProducerReturnOnSuccess(true),
akashreddykff176ea2025-09-16 23:20:16 +0530259 kafka.ProducerMaxRetries(config.ProducerRetryMax),
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000260 kafka.ProducerRetryBackoff(time.Millisecond*30),
akashreddykff176ea2025-09-16 23:20:16 +0530261 kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000262 }
263
264 return nil, errors.New("unsupported-client-type")
265}
266
dbainbri4d3a0dc2020-12-02 00:33:42 +0000267func (a *adapter) setKVClient(ctx context.Context) error {
Matteo Scandolo127c59d2021-01-28 11:31:18 -0800268 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000269 if err != nil {
270 a.kvClient = nil
dbainbri4d3a0dc2020-12-02 00:33:42 +0000271 logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000272 return err
273 }
274 a.kvClient = client
275 return nil
276}
277
khenaidoof3333552021-12-15 16:52:31 -0500278func (a *adapter) startONUAdapter(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolof1f39a72020-11-24 12:08:11 -0800279 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenONUAC, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000280 var err error
khenaidoo7d3c5582021-08-11 18:09:44 -0400281 sAcONU := ac.NewOpenONUAC(ctx, cc, ep, a.kvClient, cfg, cm)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000282
283 if err = sAcONU.Start(ctx); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000284 logger.Fatalw(ctx, "error-starting-OpenOnuAdapterCore", log.Fields{"error": err})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000285 return nil, err
286 }
287
dbainbri4d3a0dc2020-12-02 00:33:42 +0000288 logger.Info(ctx, "open-ont-OpenOnuAdapterCore-started")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000289 return sAcONU, nil
290}
291
khenaidoof3333552021-12-15 16:52:31 -0500292func (a *adapter) startONUInterAdapter(ctx context.Context, onuA *ac.OpenONUAC) (*ac.OpenONUACInterAdapter, error) {
293 var err error
294 sAcONUInterAdapter := ac.NewOpenONUACAdapter(ctx, onuA)
295
296 if err = sAcONUInterAdapter.Start(ctx); err != nil {
297 logger.Fatalw(ctx, "error-starting-OpenONUACInterAdapter", log.Fields{"error": err})
298 return nil, err
299 }
300
301 logger.Info(ctx, "OpenONUACInterAdapter-started")
302 return sAcONUInterAdapter, nil
303}
304
khenaidoo7d3c5582021-08-11 18:09:44 -0400305func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000306 adapterID := fmt.Sprintf("brcm_openomci_onu_%d", a.config.CurrentReplica)
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100307 vendorIdsList := strings.Split(a.config.OnuVendorIds, ",")
dbainbri4d3a0dc2020-12-02 00:33:42 +0000308 logger.Infow(ctx, "registering-with-core", log.Fields{
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000309 "adapterID": adapterID,
310 "currentReplica": a.config.CurrentReplica,
311 "totalReplicas": a.config.TotalReplicas,
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100312 "onuVendorIds": vendorIdsList,
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000313 })
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700314 adapterDescription := &voltha.Adapter{
Andrea Campanella961734c2021-01-18 11:44:47 +0100315 Id: adapterID, // Unique name for the device type ->exact type required for OLT comm????
316 Vendor: "VOLTHA OpenONUGo",
317 Version: version.VersionInfo.Version,
khenaidoo7d3c5582021-08-11 18:09:44 -0400318 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700319 Type: "brcm_openomci_onu",
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000320 CurrentReplica: int32(a.config.CurrentReplica),
321 TotalReplicas: int32(a.config.TotalReplicas),
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700322 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000323 types := []*voltha.DeviceType{{Id: "brcm_openomci_onu",
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100324 VendorIds: vendorIdsList,
khenaidoo7d3c5582021-08-11 18:09:44 -0400325 AdapterType: "brcm_openomci_onu", // Type of adapter that handles this device type
326 Adapter: "brcm_openomci_onu", // Deprecated attribute
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000327 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
328 AcceptsAddRemoveFlowUpdates: true}}
329 deviceTypes := &voltha.DeviceTypes{Items: types}
330 count := 0
331 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400332 gClient, err := a.coreClient.GetCoreServiceClient()
333 if gClient != nil {
334 if gClient != nil {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400335 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &core_adapter.AdapterRegistration{
khenaidoo7d3c5582021-08-11 18:09:44 -0400336 Adapter: adapterDescription,
337 DTypes: deviceTypes}); err == nil {
338 break
339 }
340 }
341 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000342 if retries == count {
343 return err
344 }
345 count++
khenaidoo7d3c5582021-08-11 18:09:44 -0400346 // Take a power nap before retrying
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000347 time.Sleep(2 * time.Second)
khenaidoo7d3c5582021-08-11 18:09:44 -0400348
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000349 }
350 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400351 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000352 logger.Info(ctx, "registered-with-core")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000353 return nil
354}
355
khenaidoo7d3c5582021-08-11 18:09:44 -0400356// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
357func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
358 logger.Infow(ctx, "service-created", log.Fields{"service": serviceName})
359
360 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
361 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
362
363 server.Start(ctx)
364 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
365}
366
khenaidoo42dcdfd2021-10-19 17:34:12 -0400367func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400368 logger.Info(ctx, "adding-adapter-service")
369
370 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400371 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400372 })
373}
374
khenaidoo42dcdfd2021-10-19 17:34:12 -0400375func (a *adapter) addOnuInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler onu_inter_adapter_service.OnuInterAdapterServiceServer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400376 logger.Info(ctx, "adding-onu-inter-adapter-service")
377
378 server.AddService(func(gs *grpc.Server) {
khenaidoo42dcdfd2021-10-19 17:34:12 -0400379 onu_inter_adapter_service.RegisterOnuInterAdapterServiceServer(gs, handler)
khenaidoo7d3c5582021-08-11 18:09:44 -0400380 })
381}
382
Joey Armstrong89c812c2024-01-12 19:00:20 -0500383/*
384*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000385This function checks the liveliness and readiness of the kakfa and kv-client services
386and update the status in the probe.
387*/
388func (a *adapter) checkServicesReadiness(ctx context.Context) {
389 // checks the kafka readiness
khenaidoo7d3c5582021-08-11 18:09:44 -0400390 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000391
392 // checks the kv-store readiness
393 go a.checkKvStoreReadiness(ctx)
394}
395
Joey Armstrong89c812c2024-01-12 19:00:20 -0500396/*
397*
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000398This function checks the liveliness and readiness of the kv-store service
399and update the status in the probe.
400*/
401func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
402 // dividing the live probe interval by 2 to get updated status every 30s
403 timeout := a.config.LiveProbeInterval / 2
404 kvStoreChannel := make(chan bool, 1)
405
khenaidoo7d3c5582021-08-11 18:09:44 -0400406 // Default true - we are here only after we already had a KV store connection
407 kvStoreChannel <- true
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000408 for {
409 timeoutTimer := time.NewTimer(timeout)
410 select {
411 case liveliness := <-kvStoreChannel:
412 if !liveliness {
413 // kv-store not reachable or down, updating the status to not ready state
khenaidoo7d3c5582021-08-11 18:09:44 -0400414 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000415 timeout = a.config.NotLiveProbeInterval
416 } else {
417 // kv-store is reachable , updating the status to running state
khenaidoo7d3c5582021-08-11 18:09:44 -0400418 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000419 timeout = a.config.LiveProbeInterval / 2
420 }
421 // Check if the timer has expired or not
422 if !timeoutTimer.Stop() {
423 <-timeoutTimer.C
424 }
425 case <-timeoutTimer.C:
426 // Check the status of the kv-store
Akash Soni840f8d62024-12-11 19:37:06 +0530427 logger.Debug(ctx, "kv-store liveliness-recheck")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000428 if a.kvClient.IsConnectionUp(ctx) {
429 kvStoreChannel <- true
430 } else {
431 kvStoreChannel <- false
432 }
433 }
434 }
435}
436
khenaidoo7d3c5582021-08-11 18:09:44 -0400437// WaitUntilKvStoreConnectionIsUp waits until the KV client can establish a connection to the KV server or until the
438// context times out.
439func WaitUntilKvStoreConnectionIsUp(ctx context.Context, kvClient kvstore.Client, connectionRetryInterval time.Duration, serviceName string) error {
440 if kvClient == nil {
441 return errors.New("kvclient-is-nil")
442 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000443 for {
khenaidoo7d3c5582021-08-11 18:09:44 -0400444 if !kvClient.IsConnectionUp(ctx) {
445 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
446 logger.Warnw(ctx, "kvconnection-down", log.Fields{"service-name": serviceName, "connect-retry-interval": connectionRetryInterval})
447 select {
448 case <-time.After(connectionRetryInterval):
449 continue
450 case <-ctx.Done():
451 return ctx.Err()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000452 }
453 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400454 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
455 logger.Info(ctx, "kv-connection-up")
456 break
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000457 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400458 return nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000459}
460
461// Adapter Utility methods ##### end #########
462// #############################################
463
dbainbri4d3a0dc2020-12-02 00:33:42 +0000464func getVerifiedCodeVersion(ctx context.Context) string {
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000465 if version.VersionInfo.Version == "unknown-version" {
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530466 content, err := os.ReadFile("VERSION")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000467 if err == nil {
Andrea Campanella3d7c9312021-01-19 09:20:49 +0100468 return string(content)
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000469 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000470 logger.Error(ctx, "'VERSION'-file not readable")
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000471 }
472 return version.VersionInfo.Version
473}
474
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000475func printVersion(appName string) {
476 fmt.Println(appName)
477 fmt.Println(version.VersionInfo.String(" "))
478}
479
// printBanner writes the ASCII-art start-up banner to standard output, one
// line at a time.
func printBanner() {
	banner := []string{
		" ____ ____ ___ ___ _ ",
		" / __ \\ / __ \\| \\ \\ | | | | |",
		" | | | |_ __ ___ _ __ | | | | |\\ \\ | | | | | ____ ____",
		" | | | | '_ \\ / _ \\ '_ \\ | | | | | \\ \\ | | | | | / '_ \\ / _' \\",
		" | |__| | |_) | __/| | | || |__| | | \\ \\| | \\__/ || (__) | (__) |",
		" \\___ /| .__/ \\___|_| |_| \\____/|_| \\___|______| \\.___ |\\___./",
		" | | __| |",
		" |_| |____/",
		" ",
	}
	for _, row := range banner {
		fmt.Println(row)
	}
}
491
492func waitForExit(ctx context.Context) int {
493 signalChannel := make(chan os.Signal, 1)
494 signal.Notify(signalChannel,
495 syscall.SIGHUP,
496 syscall.SIGINT,
497 syscall.SIGTERM,
498 syscall.SIGQUIT)
499
500 exitChannel := make(chan int)
501
502 go func() {
503 select {
504 case <-ctx.Done():
dbainbri4d3a0dc2020-12-02 00:33:42 +0000505 logger.Infow(ctx, "Adapter run aborted due to internal errors", log.Fields{"context": "done"})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000506 exitChannel <- 2
507 case s := <-signalChannel:
508 switch s {
509 case syscall.SIGHUP,
510 syscall.SIGINT,
511 syscall.SIGTERM,
512 syscall.SIGQUIT:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000513 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000514 exitChannel <- 0
515 default:
dbainbri4d3a0dc2020-12-02 00:33:42 +0000516 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000517 exitChannel <- 1
518 }
519 }
520 }()
521
522 code := <-exitChannel
523 return code
524}
525
526func main() {
527 start := time.Now()
dbainbri4d3a0dc2020-12-02 00:33:42 +0000528 ctx, cancel := context.WithCancel(context.Background())
529 defer cancel()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000530
khenaidoo7d3c5582021-08-11 18:09:44 -0400531 cf := &config.AdapterFlags{}
532 cf.ParseCommandArguments(os.Args[1:])
533
dbainbri4d3a0dc2020-12-02 00:33:42 +0000534 defaultAppName := cf.InstanceID + "_" + getVerifiedCodeVersion(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000535
536 // Setup logging
537
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000538 logLevel, err := log.StringToLogLevel(cf.LogLevel)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700539 if err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000540 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Matteo Scandolo2e6f1e32020-04-15 11:28:45 -0700541 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000542
543 // Setup default logger - applies for packages that do not have specific logger set
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530544 if _, err = log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000545 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000546 }
547
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000548 // Update all loggers (provisioned via init) with a common field
Akash Reddy Kankanala92dfdf82025-03-23 22:07:09 +0530549 if err = log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
dbainbri4d3a0dc2020-12-02 00:33:42 +0000550 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000551 }
552
Holger Hildebrandt0f9b88d2020-04-20 13:33:25 +0000553 log.SetAllLogLevel(logLevel)
554
dbainbri4d3a0dc2020-12-02 00:33:42 +0000555 realMain(ctx) //fatal on httpListen(0,6060) ...
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000556
Himani Chawla4d908332020-08-31 12:30:20 +0530557 defer func() {
558 _ = log.CleanUp()
559 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000560 // Print version / build information and exit
561 if cf.DisplayVersionOnly {
562 printVersion(defaultAppName)
563 return
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000564 }
dbainbri4d3a0dc2020-12-02 00:33:42 +0000565 logger.Infow(ctx, "config", log.Fields{"StartName": defaultAppName})
566 logger.Infow(ctx, "config", log.Fields{"BuildVersion": version.VersionInfo.String(" ")})
567 logger.Infow(ctx, "config", log.Fields{"Arguments": os.Args[1:]})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000568
569 // Print banner if specified
570 if cf.Banner {
571 printBanner()
572 }
573
dbainbri4d3a0dc2020-12-02 00:33:42 +0000574 logger.Infow(ctx, "config", log.Fields{"config": *cf})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000575
576 ad := newAdapter(cf)
Abhay Kumarb5c1d7a2025-12-09 08:10:00 +0000577 http.Handle("/metrics", promhttp.Handler())
578 go func() {
579 logger.Infof(ctx, "Metrics available at %s/metrics", ad.config.PrometheusAddress)
580 // Create HTTP server with explicit timeouts to prevent slowloris attacks and resource exhaustion.
581 // Using http.ListenAndServe() directly doesn't allow setting timeouts, which is a security risk.
582 // The server uses http.DefaultServeMux (nil handler) which includes the /metrics endpoint registered above.
583 metricsServer := &http.Server{
584 Addr: ad.config.PrometheusAddress,
585 Handler: nil,
586 ReadHeaderTimeout: 10 * time.Second,
587 ReadTimeout: 30 * time.Second,
588 WriteTimeout: 30 * time.Second,
589 IdleTimeout: 120 * time.Second,
590 }
591 if err := metricsServer.ListenAndServe(); err != nil {
592 logger.Errorw(ctx, "failed to start metrics HTTP server: ", log.Fields{"error": err})
593 }
594 }()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000595
596 p := &probe.Probe{}
dbainbri4d3a0dc2020-12-02 00:33:42 +0000597 logger.Infow(ctx, "resources", log.Fields{"Context": ctx, "Adapter": ad.instanceID, "ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000598
dbainbri4d3a0dc2020-12-02 00:33:42 +0000599 go p.ListenAndServe(ctx, fmt.Sprintf("%s:%d", ad.config.ProbeHost, ad.config.ProbePort))
600 logger.Infow(ctx, "probeState", log.Fields{"ProbeCoreState": p.GetStatus("register-with-core")})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000601
602 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
603
dbainbri4d3a0dc2020-12-02 00:33:42 +0000604 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
605 if err != nil {
606 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
607 } else {
608 defer log.TerminateTracing(closer)
609 }
610
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000611 go func() {
612 err := ad.start(probeCtx)
613 // If this operation returns an error
614 // cancel all operations using this context
615 if err != nil {
616 cancel()
617 }
618 }()
619
620 code := waitForExit(ctx)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000621 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000622
khenaidoo1fd58e02021-10-13 11:51:20 -0400623 // Set the ONU adapter GRPC service as not ready. This will prevent any request from coming to this adapter instance
624 probe.UpdateStatusFromContext(probeCtx, onuAdapterService, probe.ServiceStatusStopped)
625
Girish Gowdra55507832022-06-01 18:12:06 -0700626 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
627 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000628 // Cleanup before leaving
Girish Gowdra55507832022-06-01 18:12:06 -0700629 ad.stop(ctxWithCancel)
630 // Will halt any long-running stop routine gracefully
631 cancelFunc()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000632
633 elapsed := time.Since(start)
dbainbri4d3a0dc2020-12-02 00:33:42 +0000634 logger.Infow(ctx, "run-time", log.Fields{"Name": "openadapter", "time": elapsed / time.Microsecond})
635 //logger.Infow(ctx,"run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000636}