[VOL-5473] Crash fix in startCollector()

Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I4a2805b6666da9f0773070d06af6c0e0bd4659cf
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index bc971a6..c866695 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -136,6 +136,7 @@
 
 	isDeviceDeletionInProgress bool
 	prevOperStatus             common.OperStatus_Types
+	collectorWaitGroup         sync.WaitGroup
 }
 
 // OnuDevice represents ONU related info
@@ -937,6 +938,7 @@
 	//starting the stat collector
 	// Declare deviceStateFilter to be used later
 	var deviceStateFilter *ca.DeviceStateFilter
+	dh.collectorWaitGroup.Add(1)
 	go startCollector(ctx, dh)
 	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
 	if err != nil {
@@ -965,6 +967,9 @@
 		if dh.isReadIndicationRoutineActive {
 			dh.stopIndications <- true
 		}
+		if dh.isCollectorActive {
+			dh.stopCollector <- true
+		}
 		dh.lockDevice.RUnlock()
 		if err = dh.cleanupDeviceResources(ctx); err != nil {
 			logger.Error(ctx, "unable to clean up device resources", log.Fields{"error": err})
@@ -973,6 +978,8 @@
 		if err = dh.initializeDeviceHandlerModules(ctx); err != nil {
 			return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
+		dh.collectorWaitGroup.Add(1)
+		go startCollector(ctx, dh)
 		go startHeartbeatCheck(ctx, dh)
 		//dh.lockDevice.RUnlock()
 	} else if device.OperStatus == voltha.OperStatus_RECONCILING {
@@ -1245,6 +1252,14 @@
 	}
 	dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
 	dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
+	// If collector go routine is active, wait for it to stop
+	dh.lockDevice.RLock()
+	if dh.isCollectorActive {
+		dh.lockDevice.RUnlock()
+		dh.collectorWaitGroup.Wait()
+	} else {
+		dh.lockDevice.RUnlock()
+	}
 	// +1 is for NNI
 	dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts+1)
 	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
@@ -1326,12 +1341,13 @@
 }
 
 func startCollector(ctx context.Context, dh *DeviceHandler) {
-	logger.Debugw(ctx, "starting-collector", log.Fields{"device-id": dh.device.Id})
+	logger.Infow(ctx, "starting-collector", log.Fields{"device-id": dh.device.Id})
 
 	defer func() {
 		dh.lockDevice.Lock()
 		dh.isCollectorActive = false
 		dh.lockDevice.Unlock()
+		dh.collectorWaitGroup.Done()
 	}()
 
 	dh.lockDevice.Lock()
@@ -1341,7 +1357,7 @@
 	for {
 		select {
 		case <-dh.stopCollector:
-			logger.Debugw(ctx, "stopping-collector-for-olt", log.Fields{"device-id": dh.device.Id})
+			logger.Infow(ctx, "stopping-collector-for-olt", log.Fields{"device-id": dh.device.Id})
 			return
 		case <-time.After(time.Duration(dh.metrics.ToPmConfigs().DefaultFreq) * time.Second):