blob: 4b56d02cfcb79d1f42ae683750cac4491bc05a6c [file] [log] [blame]
cuilin20187b2a8c32019-03-26 19:52:28 -07001/*
Joey Armstrong11f5a572024-01-12 19:11:32 -05002* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
cuilin20187b2a8c32019-03-26 19:52:28 -07003
cbabu116b73f2019-12-10 17:56:32 +05304* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
cuilin20187b2a8c32019-03-26 19:52:28 -07007
cbabu116b73f2019-12-10 17:56:32 +05308* http://www.apache.org/licenses/LICENSE-2.0
cuilin20187b2a8c32019-03-26 19:52:28 -07009
cbabu116b73f2019-12-10 17:56:32 +053010* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
cuilin20187b2a8c32019-03-26 19:52:28 -070015 */
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070016
Joey Armstrong87b55f72023-06-27 12:12:53 -040017// Package main invokes the application
cuilin20187b2a8c32019-03-26 19:52:28 -070018package main
19
20import (
21 "context"
22 "errors"
23 "fmt"
Abhay Kumard3f18512025-12-09 07:51:12 +000024 "net/http"
kdarapu381c6902019-07-31 18:23:16 +053025 "os"
26 "os/signal"
kdarapu381c6902019-07-31 18:23:16 +053027 "syscall"
28 "time"
29
Abhay Kumar9bcfeb22024-07-12 09:14:25 +053030 grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
Abhay Kumard3f18512025-12-09 07:51:12 +000031 "github.com/prometheus/client_golang/prometheus/promhttp"
Abhay Kumar9bcfeb22024-07-12 09:14:25 +053032 codes "google.golang.org/grpc/codes"
33
khenaidoo106c61a2021-08-11 18:05:46 -040034 conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
35 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v7/pkg/events"
37 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
38 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
39 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
40 "github.com/opencord/voltha-lib-go/v7/pkg/log"
41 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
42 "github.com/opencord/voltha-lib-go/v7/pkg/version"
Scott Bakerdbd960e2020-02-28 08:57:51 -080043 "github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
44 ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
khenaidoodc2116e2021-10-19 17:33:19 -040045 "github.com/opencord/voltha-protos/v5/go/adapter_service"
46 ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
47 "github.com/opencord/voltha-protos/v5/go/core_service"
khenaidoodc2116e2021-10-19 17:33:19 -040048 "github.com/opencord/voltha-protos/v5/go/olt_inter_adapter_service"
khenaidoo106c61a2021-08-11 18:05:46 -040049 "github.com/opencord/voltha-protos/v5/go/voltha"
50 "google.golang.org/grpc"
51)
52
// Service names registered with the probe in (*adapter).start, plus the
// KV store type string that newKVClient maps to the etcd client.
const (
	// clusterMessagingService is the probe name used for the Kafka connection.
	clusterMessagingService = "cluster-message-service"
	// oltAdapterService is the probe name used for this adapter's gRPC server.
	oltAdapterService = "olt-adapter-service"
	// kvService is the probe name used for the KV store connection.
	kvService = "kv-service"
	// coreService is the probe name updated once registration with the Core succeeds.
	coreService = "core-service"
	// etcdStoreName is the KVStoreType value that selects the etcd client.
	etcdStoreName = "etcd"
)
60
61type adapter struct {
Akash Kankanala041a2122024-10-16 15:49:22 +053062 kafkaClient kafka.Client
63 kvClient kvstore.Client
64 eventProxy eventif.EventProxy
khenaidooefff76e2021-12-15 16:51:30 -050065 config *config.AdapterFlags
66 grpcServer *vgrpc.GrpcServer
67 oltAdapter *ac.OpenOLT
68 oltInterAdapter *ac.OpenOLTInterAdapter
khenaidooefff76e2021-12-15 16:51:30 -050069 coreClient *vgrpc.Client
khenaidooefff76e2021-12-15 16:51:30 -050070 exitChannel chan int
Akash Kankanala041a2122024-10-16 15:49:22 +053071 instanceID string
72 halted bool
cuilin20187b2a8c32019-03-26 19:52:28 -070073}
74
cuilin20187b2a8c32019-03-26 19:52:28 -070075func newAdapter(cf *config.AdapterFlags) *adapter {
76 var a adapter
Girish Gowdru6a80bbd2019-07-02 07:36:09 -070077 a.instanceID = cf.InstanceID
cuilin20187b2a8c32019-03-26 19:52:28 -070078 a.config = cf
79 a.halted = false
80 a.exitChannel = make(chan int, 1)
cuilin20187b2a8c32019-03-26 19:52:28 -070081 return &a
82}
83
84func (a *adapter) start(ctx context.Context) {
Neha Sharma96b7bf22020-06-15 10:37:32 +000085 logger.Info(ctx, "Starting Core Adapter components")
cuilin20187b2a8c32019-03-26 19:52:28 -070086 var err error
87
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000088 var p *probe.Probe
89 if value := ctx.Value(probe.ProbeContextKey); value != nil {
90 if _, ok := value.(*probe.Probe); ok {
91 p = value.(*probe.Probe)
92 p.RegisterService(
Neha Sharma96b7bf22020-06-15 10:37:32 +000093 ctx,
khenaidoo106c61a2021-08-11 18:05:46 -040094 clusterMessagingService,
95 kvService,
96 oltAdapterService,
97 coreService,
Rohan Agrawal828bf4e2019-10-22 10:13:19 +000098 )
99 }
100 }
101
cuilin20187b2a8c32019-03-26 19:52:28 -0700102 // Setup KV Client
Neha Sharma96b7bf22020-06-15 10:37:32 +0000103 logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
104 if err = a.setKVClient(ctx); err != nil {
105 logger.Fatalw(ctx, "error-setting-kv-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700106 }
107
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000108 if p != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400109 p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000110 }
111
divyadesaia37f78b2020-02-07 12:41:22 +0000112 // Setup Log Config
Neha Sharma96b7bf22020-06-15 10:37:32 +0000113 cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800114
divyadesaid26f6b12020-03-19 06:30:28 +0000115 go conf.StartLogLevelConfigProcessing(cm, ctx)
Girish Kumar935f7af2020-08-18 11:59:42 +0000116 go conf.StartLogFeaturesConfigProcessing(cm, ctx)
divyadesaia37f78b2020-02-07 12:41:22 +0000117
cuilin20187b2a8c32019-03-26 19:52:28 -0700118 // Setup Kafka Client
akashreddyk02b2bfe2025-09-05 10:37:40 +0530119 if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000120 logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700121 }
122
khenaidoo106c61a2021-08-11 18:05:46 -0400123 // Start kafka communication with the broker
Akash Kankanala041a2122024-10-16 15:49:22 +0530124 if err = kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400125 logger.Fatal(ctx, "unable-to-connect-to-kafka")
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000126 }
127
Devmalya Paulfb990a52019-07-09 10:01:49 -0400128 // Create the event proxy to post events to KAFKA
Himani Chawlacd407802020-12-10 12:08:59 +0530129 a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
khenaidoo106c61a2021-08-11 18:05:46 -0400130 go func() {
Akash Kankanala041a2122024-10-16 15:49:22 +0530131 if err = a.eventProxy.Start(); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400132 logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
133 }
134 }()
135
136 // Create the Core client to handle requests to the Core. Note that the coreClient is an interface and needs to be
137 // cast to the appropriate grpc client by invoking GetCoreGrpcClient on the a.coreClient
khenaidoo27e7ac92021-12-08 14:43:09 -0500138 if a.coreClient, err = vgrpc.NewClient(
139 a.config.AdapterEndpoint,
140 a.config.CoreEndpoint,
khenaidooefff76e2021-12-15 16:51:30 -0500141 "core_service.CoreService",
khenaidoo27e7ac92021-12-08 14:43:09 -0500142 a.coreRestarted); err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400143 logger.Fatal(ctx, "grpc-client-not-created")
144 }
145 // Start the core grpc client
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530146 retryCodes := []codes.Code{
147 codes.Unavailable, // server is currently unavailable
148 codes.DeadlineExceeded, // deadline for the operation was exceeded
149 }
nikesh.krishnan97e74d22023-06-28 13:54:01 +0530150 // the backoff function sets the wait time bw each grpc retries, if not set it will take the deafault value of 50ms which is too low, the jitter sets the rpc retry wait time to be in a range of[PerRPCRetryTimeout-0.2, PerRPCRetryTimeout+0.2]
151 backoffCtxOption := grpc_retry.WithBackoff(grpc_retry.BackoffLinearWithJitter(a.config.PerRPCRetryTimeout, 0.2))
152 grpcRetryOptions := grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(a.config.MaxRetries), grpc_retry.WithPerRetryTimeout(a.config.PerRPCRetryTimeout), grpc_retry.WithCodes(retryCodes...), backoffCtxOption)
nikesh.krishnan6dd882b2023-03-14 10:02:41 +0530153 logger.Debug(ctx, "Configuration values", log.Fields{"RETRY": a.config.MaxRetries, "TIMEOUT": a.config.PerRPCRetryTimeout})
154 go a.coreClient.Start(ctx, getCoreServiceClientHandler, grpcRetryOptions)
Devmalya Paulfb990a52019-07-09 10:01:49 -0400155
cuilin20187b2a8c32019-03-26 19:52:28 -0700156 // Create the open OLT adapter
khenaidoo106c61a2021-08-11 18:05:46 -0400157 if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000158 logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700159 }
160
khenaidooefff76e2021-12-15 16:51:30 -0500161 // Create the open OLT Inter adapter adapter
162 if a.oltInterAdapter, err = a.startOpenOLTInterAdapter(ctx, a.oltAdapter); err != nil {
163 logger.Fatalw(ctx, "error-starting-openolt-inter-adapter", log.Fields{"error": err})
164 }
165
khenaidoo106c61a2021-08-11 18:05:46 -0400166 // Create and start the grpc server
167 a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
168
Akash Kankanala041a2122024-10-16 15:49:22 +0530169 // Register the adapter service
khenaidoo106c61a2021-08-11 18:05:46 -0400170 a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
171
Akash Kankanala041a2122024-10-16 15:49:22 +0530172 // Register the olt inter-adapter service
khenaidooefff76e2021-12-15 16:51:30 -0500173 a.addOltInterAdapterService(ctx, a.grpcServer, a.oltInterAdapter)
khenaidoo106c61a2021-08-11 18:05:46 -0400174
175 // Start the grpc server
176 go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
cuilin20187b2a8c32019-03-26 19:52:28 -0700177
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700178 // Register this adapter to the Core - retries indefinitely
khenaidoo106c61a2021-08-11 18:05:46 -0400179 if err = a.registerWithCore(ctx, coreService, -1); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000180 logger.Fatal(ctx, "error-registering-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700181 }
cbabu95f21522019-11-13 14:25:18 +0100182
cbabu116b73f2019-12-10 17:56:32 +0530183 // check the readiness and liveliness and update the probe status
184 a.checkServicesReadiness(ctx)
cbabu95f21522019-11-13 14:25:18 +0100185}
186
khenaidoo106c61a2021-08-11 18:05:46 -0400187// TODO: Any action the adapter needs to do following a Core restart?
188func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
189 logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
190 return nil
191}
192
khenaidooefff76e2021-12-15 16:51:30 -0500193// getCoreServiceClientHandler is used to test whether the remote gRPC service is up
194func getCoreServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
195 if conn == nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400196 return nil
197 }
khenaidooefff76e2021-12-15 16:51:30 -0500198 return core_service.NewCoreServiceClient(conn)
khenaidoo106c61a2021-08-11 18:05:46 -0400199}
200
Joey Armstrong87b55f72023-06-27 12:12:53 -0400201/*
202*
cbabu95f21522019-11-13 14:25:18 +0100203This function checks the liveliness and readiness of the kakfa and kv-client services
204and update the status in the probe.
205*/
cbabu116b73f2019-12-10 17:56:32 +0530206func (a *adapter) checkServicesReadiness(ctx context.Context) {
207 // checks the kafka readiness
khenaidoo106c61a2021-08-11 18:05:46 -0400208 go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
cbabu116b73f2019-12-10 17:56:32 +0530209
210 // checks the kv-store readiness
211 go a.checkKvStoreReadiness(ctx)
212}
213
Joey Armstrong87b55f72023-06-27 12:12:53 -0400214/*
215*
cbabu116b73f2019-12-10 17:56:32 +0530216This function checks the liveliness and readiness of the kv-store service
217and update the status in the probe.
218*/
219func (a *adapter) checkKvStoreReadiness(ctx context.Context) {
220 // dividing the live probe interval by 2 to get updated status every 30s
221 timeout := a.config.LiveProbeInterval / 2
222 kvStoreChannel := make(chan bool, 1)
223
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700224 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
225 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
226 cancelFunc()
227
cbabu95f21522019-11-13 14:25:18 +0100228 for {
cbabu116b73f2019-12-10 17:56:32 +0530229 timeoutTimer := time.NewTimer(timeout)
230 select {
231 case liveliness := <-kvStoreChannel:
232 if !liveliness {
233 // kv-store not reachable or down, updating the status to not ready state
khenaidoo106c61a2021-08-11 18:05:46 -0400234 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
cbabu116b73f2019-12-10 17:56:32 +0530235 timeout = a.config.NotLiveProbeInterval
236 } else {
237 // kv-store is reachable , updating the status to running state
khenaidoo106c61a2021-08-11 18:05:46 -0400238 probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
cbabu116b73f2019-12-10 17:56:32 +0530239 timeout = a.config.LiveProbeInterval / 2
240 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700241
cbabu116b73f2019-12-10 17:56:32 +0530242 // Check if the timer has expired or not
243 if !timeoutTimer.Stop() {
244 <-timeoutTimer.C
245 }
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700246
cbabu116b73f2019-12-10 17:56:32 +0530247 case <-timeoutTimer.C:
Girish Kumarbeadc112020-02-26 18:41:02 +0000248 // Check the status of the kv-store. Use timeout of 2 seconds to avoid forever blocking
Neha Sharma96b7bf22020-06-15 10:37:32 +0000249 logger.Info(ctx, "kv-store liveliness-recheck")
Girish Kumarbeadc112020-02-26 18:41:02 +0000250 timeoutCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
251
252 kvStoreChannel <- a.kvClient.IsConnectionUp(timeoutCtx)
253 // Cleanup cancel func resources
254 cancelFunc()
cbabu95f21522019-11-13 14:25:18 +0100255 }
cbabu116b73f2019-12-10 17:56:32 +0530256 }
257}
258
npujarec5762e2020-01-01 14:08:48 +0530259func (a *adapter) stop(ctx context.Context) {
cuilin20187b2a8c32019-03-26 19:52:28 -0700260 // Stop leadership tracking
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700261 a.halted = true
cuilin20187b2a8c32019-03-26 19:52:28 -0700262
263 // send exit signal
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700264 a.exitChannel <- 0
cuilin20187b2a8c32019-03-26 19:52:28 -0700265
khenaidooefff76e2021-12-15 16:51:30 -0500266 // Stop all grpc processing
267 if err := a.oltAdapter.Stop(ctx); err != nil {
268 logger.Errorw(ctx, "failure-stopping-olt-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
269 }
270 if err := a.oltInterAdapter.Stop(ctx); err != nil {
271 logger.Errorw(ctx, "failure-stopping-olt-inter-adapter-service", log.Fields{"error": err, "adapter": a.config.AdapterName})
272 }
273
cuilin20187b2a8c32019-03-26 19:52:28 -0700274 // Cleanup - applies only if we had a kvClient
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700275 if a.kvClient != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700276 // Release all reservations
npujarec5762e2020-01-01 14:08:48 +0530277 if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000278 logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
cuilin20187b2a8c32019-03-26 19:52:28 -0700279 }
280 // Close the DB connection
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700281 go a.kvClient.Close(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700282 }
283
khenaidoo106c61a2021-08-11 18:05:46 -0400284 if a.eventProxy != nil {
285 a.eventProxy.Stop()
Scott Bakere701b862020-02-20 16:19:16 -0800286 }
287
khenaidoo106c61a2021-08-11 18:05:46 -0400288 if a.kafkaClient != nil {
289 a.kafkaClient.Stop(ctx)
290 }
291
292 // Stop core client
293 if a.coreClient != nil {
294 a.coreClient.Stop(ctx)
295 }
296
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700297 logger.Info(ctx, "main-stop-processing-complete")
298
khenaidoo106c61a2021-08-11 18:05:46 -0400299 // TODO: Stop child devices connections
300
cuilin20187b2a8c32019-03-26 19:52:28 -0700301 // TODO: More cleanup
302}
303
Neha Sharma96b7bf22020-06-15 10:37:32 +0000304func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000305 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700306 switch storeType {
Akash Kankanala041a2122024-10-16 15:49:22 +0530307 case etcdStoreName:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000308 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
Abhay Kumar9bcfeb22024-07-12 09:14:25 +0530309 case "redis":
310 return kvstore.NewRedisClient(address, timeout, false)
311 case "redis-sentinel":
312 return kvstore.NewRedisClient(address, timeout, true)
cuilin20187b2a8c32019-03-26 19:52:28 -0700313 }
314 return nil, errors.New("unsupported-kv-store")
315}
316
akashreddyk02b2bfe2025-09-05 10:37:40 +0530317func newKafkaClient(ctx context.Context, clientType string, config *config.AdapterFlags) (kafka.Client, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000318 logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
cuilin20187b2a8c32019-03-26 19:52:28 -0700319 switch clientType {
320 case "sarama":
321 return kafka.NewSaramaClient(
akashreddyk02b2bfe2025-09-05 10:37:40 +0530322 kafka.Address(config.KafkaClusterAddress),
cuilin20187b2a8c32019-03-26 19:52:28 -0700323 kafka.ProducerReturnOnErrors(true),
324 kafka.ProducerReturnOnSuccess(true),
akashreddyk02b2bfe2025-09-05 10:37:40 +0530325 kafka.ProducerMaxRetries(config.ProducerRetryMax),
Abhilash S.L3b494632019-07-16 15:51:09 +0530326 kafka.ProducerRetryBackoff(time.Millisecond*30),
akashreddyk02b2bfe2025-09-05 10:37:40 +0530327 kafka.MetadatMaxRetries(config.MetadataRetryMax)), nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700328 }
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700329
cuilin20187b2a8c32019-03-26 19:52:28 -0700330 return nil, errors.New("unsupported-client-type")
331}
332
Neha Sharma96b7bf22020-06-15 10:37:32 +0000333func (a *adapter) setKVClient(ctx context.Context) error {
334 client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
cuilin20187b2a8c32019-03-26 19:52:28 -0700335 if err != nil {
336 a.kvClient = nil
cuilin20187b2a8c32019-03-26 19:52:28 -0700337 return err
338 }
339 a.kvClient = client
divyadesaia37f78b2020-02-07 12:41:22 +0000340
cuilin20187b2a8c32019-03-26 19:52:28 -0700341 return nil
342}
343
khenaidoo106c61a2021-08-11 18:05:46 -0400344// startGRPCService creates the grpc service handlers, registers it to the grpc server and starts the server
345func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
346 logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
347
348 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
349 logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
350
351 server.Start(ctx)
352 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
cuilin20187b2a8c32019-03-26 19:52:28 -0700353}
354
khenaidoodc2116e2021-10-19 17:33:19 -0400355func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_service.AdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400356 logger.Info(ctx, "adding-adapter-service")
357
358 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400359 adapter_service.RegisterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400360 })
361}
362
khenaidoodc2116e2021-10-19 17:33:19 -0400363func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler olt_inter_adapter_service.OltInterAdapterServiceServer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400364 logger.Info(ctx, "adding-olt-inter-adapter-service")
365
366 server.AddService(func(gs *grpc.Server) {
khenaidoodc2116e2021-10-19 17:33:19 -0400367 olt_inter_adapter_service.RegisterOltInterAdapterServiceServer(gs, handler)
khenaidoo106c61a2021-08-11 18:05:46 -0400368 })
369}
370
371func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
Matteo Scandolodfa7a972020-11-06 13:03:40 -0800372 cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000373 logger.Info(ctx, "starting-open-olt")
cuilin20187b2a8c32019-03-26 19:52:28 -0700374 var err error
khenaidoo106c61a2021-08-11 18:05:46 -0400375 sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
cuilin20187b2a8c32019-03-26 19:52:28 -0700376
377 if err = sOLT.Start(ctx); err != nil {
cuilin20187b2a8c32019-03-26 19:52:28 -0700378 return nil, err
379 }
380
Neha Sharma96b7bf22020-06-15 10:37:32 +0000381 logger.Info(ctx, "open-olt-started")
cuilin20187b2a8c32019-03-26 19:52:28 -0700382 return sOLT, nil
383}
384
khenaidooefff76e2021-12-15 16:51:30 -0500385func (a *adapter) startOpenOLTInterAdapter(ctx context.Context, oo *ac.OpenOLT) (*ac.OpenOLTInterAdapter, error) {
386 logger.Info(ctx, "starting-open-olt-inter-adapter")
387 var err error
388 sOLTInterAdapter := ac.NewOpenOLTInterAdapter(oo)
389
390 if err = sOLTInterAdapter.Start(ctx); err != nil {
391 return nil, err
392 }
393
394 logger.Info(ctx, "open-olt-inter-adapter-started")
395 return sOLTInterAdapter, nil
396}
397
khenaidoo106c61a2021-08-11 18:05:46 -0400398func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700399 adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000400 logger.Infow(ctx, "registering-with-core", log.Fields{
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700401 "adapterID": adapterID,
402 "currentReplica": a.config.CurrentReplica,
403 "totalReplicas": a.config.TotalReplicas,
404 })
405 adapterDescription := &voltha.Adapter{
406 Id: adapterID, // Unique name for the device type
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400407 Vendor: "VOLTHA OpenOLT",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700408 Version: version.VersionInfo.Version,
khenaidoo106c61a2021-08-11 18:05:46 -0400409 // The Endpoint refers to the address this service is listening on.
410 Endpoint: a.config.AdapterEndpoint,
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700411 Type: "openolt",
412 CurrentReplica: int32(a.config.CurrentReplica),
413 TotalReplicas: int32(a.config.TotalReplicas),
414 }
415 types := []*voltha.DeviceType{{
416 Id: "openolt",
khenaidoo106c61a2021-08-11 18:05:46 -0400417 AdapterType: "openolt", // Type of the adapter that handles device type
418 Adapter: "openolt", // Deprecated attribute
Girish Gowdru0c588b22019-04-23 23:24:56 -0400419 AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
420 AcceptsAddRemoveFlowUpdates: true}}
cuilin20187b2a8c32019-03-26 19:52:28 -0700421 deviceTypes := &voltha.DeviceTypes{Items: types}
422 count := 0
423 for {
khenaidoo106c61a2021-08-11 18:05:46 -0400424 gClient, err := a.coreClient.GetCoreServiceClient()
425 if gClient != nil {
khenaidoodc2116e2021-10-19 17:33:19 -0400426 if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ca.AdapterRegistration{
khenaidoo106c61a2021-08-11 18:05:46 -0400427 Adapter: adapterDescription,
428 DTypes: deviceTypes}); err == nil {
429 break
cuilin20187b2a8c32019-03-26 19:52:28 -0700430 }
cuilin20187b2a8c32019-03-26 19:52:28 -0700431 }
khenaidoo106c61a2021-08-11 18:05:46 -0400432 logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
433 if retries == count {
434 return err
435 }
436 count++
437 // Take a nap before retrying
438 time.Sleep(2 * time.Second)
cuilin20187b2a8c32019-03-26 19:52:28 -0700439 }
khenaidoo106c61a2021-08-11 18:05:46 -0400440 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000441 logger.Info(ctx, "registered-with-core")
cuilin20187b2a8c32019-03-26 19:52:28 -0700442 return nil
443}
444
Neha Sharma96b7bf22020-06-15 10:37:32 +0000445func waitForExit(ctx context.Context) int {
cuilin20187b2a8c32019-03-26 19:52:28 -0700446 signalChannel := make(chan os.Signal, 1)
447 signal.Notify(signalChannel,
448 syscall.SIGHUP,
449 syscall.SIGINT,
450 syscall.SIGTERM,
451 syscall.SIGQUIT)
452
453 exitChannel := make(chan int)
454
455 go func() {
456 s := <-signalChannel
457 switch s {
458 case syscall.SIGHUP,
459 syscall.SIGINT,
460 syscall.SIGTERM,
461 syscall.SIGQUIT:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000462 logger.Infow(ctx, "closing-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700463 exitChannel <- 0
464 default:
Neha Sharma96b7bf22020-06-15 10:37:32 +0000465 logger.Infow(ctx, "unexpected-signal-received", log.Fields{"signal": s})
cuilin20187b2a8c32019-03-26 19:52:28 -0700466 exitChannel <- 1
467 }
468 }()
469
470 code := <-exitChannel
471 return code
472}
473
// printBanner writes the ASCII-art banner to standard output, one row per
// line.
func printBanner() {
	rows := [...]string{
		` ____ ____ _ _______ `,
		` / _ \ / __ \| | |__ __|`,
		` | | | |_ __ ___ _ __ | | | | | | | `,
		` | | | | '_ \ / _ \ '_ \ | | | | | | | `,
		` | |__| | |_) | __/ | | || |__| | |____| | `,
		` \____/| .__/ \___|_| |_| \____/|______|_| `,
		` | | `,
		` |_| `,
		` `,
	}
	for _, row := range rows {
		fmt.Println(row)
	}
}
485
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400486func printVersion() {
487 fmt.Println("VOLTHA OpenOLT Adapter")
488 fmt.Println(version.VersionInfo.String(" "))
489}
490
cuilin20187b2a8c32019-03-26 19:52:28 -0700491func main() {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000492 ctx := context.Background()
cuilin20187b2a8c32019-03-26 19:52:28 -0700493 start := time.Now()
494
495 cf := config.NewAdapterFlags()
496 cf.ParseCommandArguments()
497
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700498 // Setup logging
cuilin20187b2a8c32019-03-26 19:52:28 -0700499
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000500 logLevel, err := log.StringToLogLevel(cf.LogLevel)
501 if err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000502 logger.Fatalf(ctx, "Cannot setup logging, %s", err)
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000503 }
Rohan Agrawal2488f192020-01-31 09:26:55 +0000504
Girish Gowdru6a80bbd2019-07-02 07:36:09 -0700505 // Setup default logger - applies for packages that do not have specific logger set
Akash Kankanala041a2122024-10-16 15:49:22 +0530506 if _, err = log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000507 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700508 }
509
510 // Update all loggers (provisionned via init) with a common field
Akash Kankanala041a2122024-10-16 15:49:22 +0530511 if err = log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
Girish Kumara1ea2aa2020-08-19 18:14:22 +0000512 logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
cuilin20187b2a8c32019-03-26 19:52:28 -0700513 }
514
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000515 log.SetAllLogLevel(logLevel)
Rohan Agrawal93bced32020-02-11 10:16:01 +0000516
Matteo Scandolo8f2b9572020-02-28 15:35:23 -0800517 realMain()
518
Kent Hagermane6ff1012020-07-14 15:07:53 -0400519 defer func() {
Akash Kankanala041a2122024-10-16 15:49:22 +0530520 err = log.CleanUp()
Kent Hagermane6ff1012020-07-14 15:07:53 -0400521 if err != nil {
522 logger.Errorw(context.Background(), "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
523 }
524 }()
cuilin20187b2a8c32019-03-26 19:52:28 -0700525
Matt Jeanneretf880eb62019-07-16 20:08:03 -0400526 // Print version / build information and exit
527 if cf.DisplayVersionOnly {
528 printVersion()
529 return
530 }
531
cuilin20187b2a8c32019-03-26 19:52:28 -0700532 // Print banner if specified
533 if cf.Banner {
534 printBanner()
535 }
536
Neha Sharma96b7bf22020-06-15 10:37:32 +0000537 logger.Infow(ctx, "config", log.Fields{"config": *cf})
cuilin20187b2a8c32019-03-26 19:52:28 -0700538
539 ctx, cancel := context.WithCancel(context.Background())
540 defer cancel()
541
542 ad := newAdapter(cf)
Abhay Kumard3f18512025-12-09 07:51:12 +0000543 http.Handle("/metrics", promhttp.Handler())
544 go func() {
545 logger.Infof(ctx, "Metrics available at %s/metrics", ad.config.PrometheusAddress)
546 // Create HTTP server with explicit timeouts to prevent slowloris attacks and resource exhaustion.
547 // Using http.ListenAndServe() directly doesn't allow setting timeouts, which is a security risk.
548 // The server uses http.DefaultServeMux (nil handler) which includes the /metrics endpoint registered above.
549 metricsServer := &http.Server{
550 Addr: ad.config.PrometheusAddress,
551 Handler: nil,
552 ReadHeaderTimeout: 10 * time.Second,
553 ReadTimeout: 30 * time.Second,
554 WriteTimeout: 30 * time.Second,
555 IdleTimeout: 120 * time.Second,
556 }
557 if err := metricsServer.ListenAndServe(); err != nil {
558 logger.Errorw(ctx, "failed to start metrics HTTP server: ", log.Fields{"error": err})
559 }
560 }()
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000561
562 p := &probe.Probe{}
Neha Sharma96b7bf22020-06-15 10:37:32 +0000563 go p.ListenAndServe(ctx, ad.config.ProbeAddress)
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000564
565 probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
566
Girish Kumar935f7af2020-08-18 11:59:42 +0000567 closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
Girish Kumar11e15972020-06-15 14:51:10 +0000568 if err != nil {
569 logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
570 } else {
571 defer log.TerminateTracing(closer)
572 }
573
Rohan Agrawal828bf4e2019-10-22 10:13:19 +0000574 go ad.start(probeCtx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700575
Neha Sharma96b7bf22020-06-15 10:37:32 +0000576 code := waitForExit(ctx)
577 logger.Infow(ctx, "received-a-closing-signal", log.Fields{"code": code})
cuilin20187b2a8c32019-03-26 19:52:28 -0700578
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700579 // Use context with cancel as etcd-client stop could take more time sometimes to stop slowing down container shutdown.
580 ctxWithCancel, cancelFunc := context.WithCancel(ctx)
cuilin20187b2a8c32019-03-26 19:52:28 -0700581 // Cleanup before leaving
Girish Gowdra4b48fa42022-06-01 18:10:08 -0700582 ad.stop(ctxWithCancel)
583 // Will halt any long-running stop routine gracefully
584 cancelFunc()
cuilin20187b2a8c32019-03-26 19:52:28 -0700585
586 elapsed := time.Since(start)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000587 logger.Infow(ctx, "run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
cuilin20187b2a8c32019-03-26 19:52:28 -0700588}
Joey Armstrong87b55f72023-06-27 12:12:53 -0400589
590// [EOF]