VOL-4471: Stale data in resource manager

Stop the mcast and per-PON flow handler routines, with a bounded wait,
before cleaning up device resources, and restart them once the OLT
comes back up. Skip flow/group updates while device deletion is in
progress, and guard against an uninitialized flow manager when flows
arrive before the device has fully reconciled.

Change-Id: I026774317ba577b1d5d0748c3d177b4b7bf2ac94
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 3f23fb4..f7d1405 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -102,7 +102,9 @@
 
 	// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
 	// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
-	incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+	incomingMcastFlowOrGroup  []chan McastFlowOrGroupControlBlock
+	stopMcastHandlerRoutine   []chan bool
+	mcastHandlerRoutineActive []bool
 
 	adapterPreviouslyConnected bool
 	agentPreviouslyConnected   bool
@@ -188,13 +190,17 @@
 	dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
 	// Create a slice of buffered channels for handling concurrent mcast flow/group.
 	dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+	dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
+	dh.mcastHandlerRoutineActive = make([]bool, MaxNumOfGroupHandlerChannels)
 	for i := range dh.incomingMcastFlowOrGroup {
 		dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+		dh.stopMcastHandlerRoutine[i] = make(chan bool, 1)
 		// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
 		// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
 		// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
 		// for incoming mcast flow/group to be processed serially.
-		go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+		dh.mcastHandlerRoutineActive[i] = true
+		go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
 	}
 	//TODO initialize the support classes.
 	return &dh
@@ -639,6 +645,20 @@
 	//starting the stat collector
 	go startCollector(ctx, dh)
 
+	// Restart the mcast handler routines if they are not already running.
+	for i := range dh.incomingMcastFlowOrGroup {
+		// We enter the "if" block below only after the OLT comes back from a reboot; otherwise the routines are
+		// already active, having been started when the DeviceHandler module was first instantiated (as part of the Adopt_device RPC invocation).
+		if !dh.mcastHandlerRoutineActive[i] {
+			// Spin up a go routine to handle incoming mcast flow/group (add/modify/remove).
+			// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
+			// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
+			// for incoming mcast flow/group to be processed serially.
+			dh.mcastHandlerRoutineActive[i] = true
+			go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
+		}
+	}
+
 	// Synchronous call to update device state - this method is run in its own go routine
 	if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
 		voltha.OperStatus_ACTIVE); err != nil {
@@ -1525,9 +1545,7 @@
 	}
 }
 
-//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+func (dh *DeviceHandler) handleFlows(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, flowMetadata *voltha.FlowMetadata) []error {
 	var err error
 	var errorsList []error
 
@@ -1564,7 +1582,13 @@
 			if flow_utils.HasGroup(flow) {
 				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
 			} else {
-				err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+				if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+					// The flow manager may still be uninitialized if the flow arrives before the device has fully reconciled
+					logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
+					err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
+				} else {
+					err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+				}
 			}
 			if err != nil {
 				errorsList = append(errorsList, err)
@@ -1572,6 +1596,19 @@
 		}
 	}
 
+	return errorsList
+}
+
+func (dh *DeviceHandler) handleGroups(ctx context.Context, groups *of.FlowGroupChanges) []error {
+	var err error
+	var errorsList []error
+
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
+
 	// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
 	if groups != nil {
 		for _, group := range groups.ToAdd.Items {
@@ -1596,6 +1633,24 @@
 			}
 		}
 	}
+
+	return errorsList
+}
+
+//UpdateFlowsIncrementally updates the device flow
+func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+
+	var errorsList []error
+
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+		return nil
+	}
+
+	logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+	errorsList = append(errorsList, dh.handleFlows(ctx, device, flows, flowMetadata)...)
+	errorsList = append(errorsList, dh.handleGroups(ctx, groups)...)
 	if len(errorsList) > 0 {
 		return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
 	}
@@ -1744,6 +1799,18 @@
 	*/
 
 	dh.setDeviceDeletionInProgressFlag(true)
+	var wg sync.WaitGroup
+	wg.Add(1) // for the mcast routine below to finish
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		wg.Add(1) // for the flow handler routine below to finish
+		go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+	}
 
 	dh.cleanupDeviceResources(ctx)
 	logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2054,6 +2121,18 @@
 		}
 		dh.lockDevice.RUnlock()
 
+		var wg sync.WaitGroup
+		wg.Add(1) // for the multicast handler routine
+		go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+		for _, flMgr := range dh.flowMgr {
+			wg.Add(1) // for the flow handler routine
+			go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+		}
+		if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+			logger.Warnw(ctx, "timed out waiting for flow and group handlers to stop", log.Fields{"deviceID": device.Id})
+		} else {
+			logger.Infow(ctx, "all flow and group handlers shut down gracefully", log.Fields{"deviceID": device.Id})
+		}
 		//reset adapter reconcile flag
 		dh.adapterPreviouslyConnected = false
 
@@ -2400,6 +2479,11 @@
 // RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
 // instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
 func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *voltha.OfpFlowStats, group *voltha.OfpGroupEntry, action string) error {
+	if dh.getDeviceDeletionInProgressFlag() {
+		// The device itself is going to be reset as part of deletion. So nothing to be done.
+		logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+		return nil
+	}
 	// Step1 : Fill McastFlowOrGroupControlBlock
 	// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
 	// Step3 : Wait on response channel for response
@@ -2422,64 +2506,89 @@
 	} else {
 		return errors.New("flow-and-group-both-nil")
 	}
-	// Derive the appropriate go routine to handle the request by a simple module operation.
-	// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
-	dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
-	// Wait for handler to return error value
-	err := <-errChan
-	logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
-	return err
+	mcastRoutineIdx := groupID % MaxNumOfGroupHandlerChannels
+	if dh.mcastHandlerRoutineActive[mcastRoutineIdx] {
+		// Derive the appropriate go routine to handle the request by a simple modulo operation.
+		// There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+		dh.incomingMcastFlowOrGroup[mcastRoutineIdx] <- mcastFlowOrGroupCb
+		// Wait for handler to return error value
+		err := <-errChan
+		logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
+		return err
+	}
+	logger.Errorw(ctx, "mcast handler routine not active for onu", log.Fields{"mcastRoutineIdx": mcastRoutineIdx})
+	return fmt.Errorf("mcast-handler-routine-not-active-for-index-%v", mcastRoutineIdx)
 }
 
 // mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
-func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(routineIndex int, mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock, stopHandler chan bool) {
 	for {
+		select {
 		// block on the channel to receive an incoming mcast flow/group
 		// process the flow completely before proceeding to handle the next flow
-		mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
-		if mcastFlowOrGroupCb.flow != nil {
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToAdd": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // flow remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
-					log.Fields{"device-id": dh.device.Id,
-						"flowToRemove": mcastFlowOrGroupCb.flow})
-				// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
-				err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
+		case mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel:
+			if mcastFlowOrGroupCb.flow != nil {
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToAdd": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // flow remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+						log.Fields{"device-id": dh.device.Id,
+							"flowToRemove": mcastFlowOrGroupCb.flow})
+					// The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+					err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
+			} else { // mcast group
+				if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToAdd": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToModify": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				} else { // group remove
+					logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+						log.Fields{"device-id": dh.device.Id,
+							"groupToRemove": mcastFlowOrGroupCb.group})
+					err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+					// Pass the return value over the return channel
+					*mcastFlowOrGroupCb.errChan <- err
+				}
 			}
-		} else { // mcast group
-			if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToAdd": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToModify": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
-			} else { // group remove
-				logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
-					log.Fields{"device-id": dh.device.Id,
-						"groupToRemove": mcastFlowOrGroupCb.group})
-				err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
-				// Pass the return value over the return channel
-				*mcastFlowOrGroupCb.errChan <- err
+		case <-stopHandler:
+			dh.mcastHandlerRoutineActive[routineIndex] = false
+			return
+		}
+	}
+}
+
+// StopAllMcastHandlerRoutines stops all mcast handler routines. Call this when the device is being rebooted or deleted.
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+	for i, v := range dh.stopMcastHandlerRoutine {
+		if dh.mcastHandlerRoutineActive[i] {
+			select {
+			case v <- true:
+			case <-time.After(time.Second * 5):
+				logger.Warnw(ctx, "timeout stopping mcast handler routine", log.Fields{"idx": i, "deviceID": dh.device.Id})
 			}
 		}
 	}
+	wg.Done()
+	logger.Debug(ctx, "stopped all mcast handler routines")
 }
 
 func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
@@ -2659,3 +2768,19 @@
 	defer dh.lockDevice.RUnlock()
 	return dh.isDeviceDeletionInProgress
 }
+
+// waitForTimeoutOrCompletion waits for the waitgroup for the specified max timeout.
+// Returns false if waiting timed out.
+func (dh *DeviceHandler) waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return true // completed normally
+	case <-time.After(timeout):
+		return false // timed out
+	}
+}
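
The following is a minimal, self-contained sketch (not part of the patch) of the shutdown pattern this change introduces: handler goroutines that select on a work channel and a per-routine stop channel, a stop helper that signals each routine with a per-send timeout, and a WaitGroup wait bounded by a timeout. Only waitForTimeoutOrCompletion mirrors the helper added above; the other names and the channel layout are illustrative.

package main

import (
	"fmt"
	"sync"
	"time"
)

// handlerRoutine processes work serially until it is told to stop,
// mirroring the select loop in mcastFlowOrGroupChannelHandlerRoutine.
func handlerRoutine(idx int, work <-chan string, stop <-chan bool) {
	for {
		select {
		case w := <-work:
			fmt.Printf("handler %d processed %s\n", idx, w)
		case <-stop:
			fmt.Printf("handler %d stopped\n", idx)
			return
		}
	}
}

// stopAllHandlers signals every handler to stop, giving up on any
// individual handler after a short timeout, as StopAllMcastHandlerRoutines does.
func stopAllHandlers(stopChans []chan bool, wg *sync.WaitGroup) {
	defer wg.Done()
	for i, stop := range stopChans {
		select {
		case stop <- true:
		case <-time.After(5 * time.Second):
			fmt.Printf("timed out signalling handler %d\n", i)
		}
	}
}

// waitForTimeoutOrCompletion bounds wg.Wait() with a timeout and reports
// whether the wait completed in time.
func waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	const numHandlers = 2
	work := make([]chan string, numHandlers)
	stop := make([]chan bool, numHandlers)
	for i := 0; i < numHandlers; i++ {
		work[i] = make(chan string, 1)
		stop[i] = make(chan bool, 1)
		go handlerRoutine(i, work[i], stop[i])
	}
	work[0] <- "mcast-flow-add"

	var wg sync.WaitGroup
	wg.Add(1)
	go stopAllHandlers(stop, &wg)
	if waitForTimeoutOrCompletion(&wg, 30*time.Second) {
		fmt.Println("all handlers shut down gracefully")
	} else {
		fmt.Println("timed out waiting for handlers to stop")
	}
}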