[VOL-5473] Fix crash in startCollector()
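
Add a collectorWaitGroup to DeviceHandler so the stats collector
goroutine can be tracked: Add(1) is called before every startCollector
launch and Done() when the goroutine returns. The cleanup path that
stops indications now also signals stopCollector, and device
initialization waits for a still-active collector to exit before the
per-PON resource and flow managers are rebuilt, so the collector cannot
race with the rebuild.

A minimal, self-contained sketch of the stop-channel plus WaitGroup
pattern this change relies on (illustrative only; interval and the
empty collection body are stand-ins, not the adapter code):

    package main

    import (
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        stop := make(chan bool)
        interval := 100 * time.Millisecond

        wg.Add(1) // account for the worker before it starts
        go func() {
            defer wg.Done() // signal that the worker has fully returned
            for {
                select {
                case <-stop:
                    return // stop requested
                case <-time.After(interval):
                    // collect stats here
                }
            }
        }()

        // Before tearing down or rebuilding the state the worker reads:
        stop <- true // ask the worker to stop
        wg.Wait()    // block until it has actually exited
    }
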
Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I4a2805b6666da9f0773070d06af6c0e0bd4659cf
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index bc971a6..c866695 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -136,6 +136,7 @@
isDeviceDeletionInProgress bool
prevOperStatus common.OperStatus_Types
+ collectorWaitGroup sync.WaitGroup // lets callers wait for the stats collector goroutine to exit
}
// OnuDevice represents ONU related info
@@ -937,6 +938,7 @@
//starting the stat collector
// Declare deviceStateFilter to be used later
var deviceStateFilter *ca.DeviceStateFilter
+ dh.collectorWaitGroup.Add(1)
go startCollector(ctx, dh)
device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
if err != nil {
@@ -965,6 +967,9 @@
if dh.isReadIndicationRoutineActive {
dh.stopIndications <- true
}
+ if dh.isCollectorActive {
+ dh.stopCollector <- true
+ }
dh.lockDevice.RUnlock()
if err = dh.cleanupDeviceResources(ctx); err != nil {
logger.Error(ctx, "unable to clean up device resources", log.Fields{"error": err})
@@ -973,6 +978,8 @@
if err = dh.initializeDeviceHandlerModules(ctx); err != nil {
return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
+ dh.collectorWaitGroup.Add(1)
+ go startCollector(ctx, dh)
go startHeartbeatCheck(ctx, dh)
//dh.lockDevice.RUnlock()
} else if device.OperStatus == voltha.OperStatus_RECONCILING {
@@ -1245,6 +1252,14 @@
}
dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
+ // If the collector goroutine is active, wait for it to stop
+ dh.lockDevice.RLock()
+ if dh.isCollectorActive {
+ dh.lockDevice.RUnlock()
+ dh.collectorWaitGroup.Wait()
+ } else {
+ dh.lockDevice.RUnlock()
+ }
// +1 is for NNI
dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts+1)
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
@@ -1326,12 +1341,13 @@
}
func startCollector(ctx context.Context, dh *DeviceHandler) {
- logger.Debugw(ctx, "starting-collector", log.Fields{"device-id": dh.device.Id})
+ logger.Infow(ctx, "starting-collector", log.Fields{"device-id": dh.device.Id})
defer func() {
dh.lockDevice.Lock()
dh.isCollectorActive = false
dh.lockDevice.Unlock()
+ dh.collectorWaitGroup.Done()
}()
dh.lockDevice.Lock()
@@ -1341,7 +1357,7 @@
for {
select {
case <-dh.stopCollector:
- logger.Debugw(ctx, "stopping-collector-for-olt", log.Fields{"device-id": dh.device.Id})
+ logger.Infow(ctx, "stopping-collector-for-olt", log.Fields{"device-id": dh.device.Id})
return
case <-time.After(time.Duration(dh.metrics.ToPmConfigs().DefaultFreq) * time.Second):
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index bca0287..7a7ec52 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -1356,6 +1356,7 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ tt.args.dh.collectorWaitGroup.Add(1)
go func() {
time.Sleep(1 * time.Second) // simulated wait time to stop startCollector
tt.args.dh.stopCollector <- true