[VOL-5471]: Fix flow add/delete response channel handling on context cancellation

Change-Id: I44f8f1a49db9138c64df4cef76bf5deba6b668bf
Signed-off-by: balaji.nagarajan <balaji.nagarajan@radisys.com>
diff --git a/internal/pkg/avcfg/omci_ani_config.go b/internal/pkg/avcfg/omci_ani_config.go
index 758c220..ac489da 100755
--- a/internal/pkg/avcfg/omci_ani_config.go
+++ b/internal/pkg/avcfg/omci_ani_config.go
@@ -151,7 +151,7 @@
 	macBPCD0ID               uint16
 	tcont0ID                 uint16
 	alloc0ID                 uint16
-	uniTpKey                 uniTP
+	uniTpKey                 UniTP
 	techProfileID            uint8
 	isCanceled               bool
 	isAwaitingResponse       bool
@@ -180,7 +180,7 @@
 		chanSet:         false,
 		tcontSetBefore:  false,
 	}
-	instFsm.uniTpKey = uniTP{uniID: apUniPort.UniID, tpID: aTechProfileID}
+	instFsm.uniTpKey = UniTP{UniID: apUniPort.UniID, TpID: aTechProfileID}
 	instFsm.waitFlowDeleteChannel = make(chan bool)
 
 	instFsm.PAdaptFsm = cmn.NewAdapterFsm(aName, instFsm.deviceID, aCommChannel)
diff --git a/internal/pkg/avcfg/onu_uni_tp.go b/internal/pkg/avcfg/onu_uni_tp.go
index a905192..f27969b 100755
--- a/internal/pkg/avcfg/onu_uni_tp.go
+++ b/internal/pkg/avcfg/onu_uni_tp.go
@@ -93,9 +93,9 @@
 }
 
 // refers a unique combination of uniID and tpID for a given ONU.
-type uniTP struct {
-	uniID uint8
-	tpID  uint8
+type UniTP struct {
+	UniID uint8
+	TpID  uint8
 }
 
 // OnuUniTechProf structure holds information about the TechProfiles attached to Uni Ports of the ONU
@@ -103,21 +103,21 @@
 	baseDeviceHandler        cmn.IdeviceHandler
 	onuDevice                cmn.IonuDeviceEntry
 	chTpConfigProcessingStep chan uint8
-	mapUniTpIndication       map[uniTP]*tTechProfileIndication //use pointer values to ease assignments to the map
-	mapPonAniConfig          map[uniTP]*tcontGemList           //per UNI: use pointer values to ease assignments to the map
-	PAniConfigFsm            map[uniTP]*UniPonAniConfigFsm
-	procResult               map[uniTP]error //error indication of processing
-	tpProfileExists          map[uniTP]bool
-	tpProfileResetting       map[uniTP]bool
-	mapRemoveGemEntry        map[uniTP]*gemPortParamStruct //per UNI: pointer to GemEntry to be removed
+	mapUniTpIndication       map[UniTP]*tTechProfileIndication //use pointer values to ease assignments to the map
+	mapPonAniConfig          map[UniTP]*tcontGemList           //per UNI: use pointer values to ease assignments to the map
+	PAniConfigFsm            map[UniTP]*UniPonAniConfigFsm
+	procResult               map[UniTP]error //error indication of processing
+	tpProfileExists          map[UniTP]bool
+	tpProfileResetting       map[UniTP]bool
+	mapRemoveGemEntry        map[UniTP]*gemPortParamStruct //per UNI: pointer to GemEntry to be removed
 	deviceID                 string
 	tpProcMutex              sync.RWMutex
 	mutexTPState             sync.RWMutex
 }
 
-func (onuTP *OnuUniTechProf) multicastConfiguredForOtherUniTps(ctx context.Context, uniTpKey uniTP) bool {
+func (onuTP *OnuUniTechProf) multicastConfiguredForOtherUniTps(ctx context.Context, uniTpKey UniTP) bool {
 	for _, aniFsm := range onuTP.PAniConfigFsm {
-		if aniFsm.uniTpKey.uniID == uniTpKey.uniID && aniFsm.uniTpKey.tpID == uniTpKey.tpID {
+		if aniFsm.uniTpKey.UniID == uniTpKey.UniID && aniFsm.uniTpKey.TpID == uniTpKey.TpID {
 			continue
 		}
 		if aniFsm.hasMulticastGem(ctx) {
@@ -137,12 +137,12 @@
 	onuTP.baseDeviceHandler = aDeviceHandler
 	onuTP.onuDevice = aOnuDev
 	onuTP.chTpConfigProcessingStep = make(chan uint8)
-	onuTP.mapUniTpIndication = make(map[uniTP]*tTechProfileIndication)
-	onuTP.mapPonAniConfig = make(map[uniTP]*tcontGemList)
-	onuTP.procResult = make(map[uniTP]error)
-	onuTP.tpProfileExists = make(map[uniTP]bool)
-	onuTP.tpProfileResetting = make(map[uniTP]bool)
-	onuTP.mapRemoveGemEntry = make(map[uniTP]*gemPortParamStruct)
+	onuTP.mapUniTpIndication = make(map[UniTP]*tTechProfileIndication)
+	onuTP.mapPonAniConfig = make(map[UniTP]*tcontGemList)
+	onuTP.procResult = make(map[UniTP]error)
+	onuTP.tpProfileExists = make(map[UniTP]bool)
+	onuTP.tpProfileResetting = make(map[UniTP]bool)
+	onuTP.mapRemoveGemEntry = make(map[UniTP]*gemPortParamStruct)
 
 	return &onuTP
 }
@@ -162,14 +162,14 @@
 func (onuTP *OnuUniTechProf) ResetTpProcessingErrorIndication(aUniID uint8, aTpID uint8) {
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
-	onuTP.procResult[uniTP{uniID: aUniID, tpID: aTpID}] = nil
+	onuTP.procResult[UniTP{UniID: aUniID, TpID: aTpID}] = nil
 }
 
 // GetTpProcessingErrorIndication - TODO: add comment
 func (onuTP *OnuUniTechProf) GetTpProcessingErrorIndication(aUniID uint8, aTpID uint8) error {
 	onuTP.mutexTPState.RLock()
 	defer onuTP.mutexTPState.RUnlock()
-	return onuTP.procResult[uniTP{uniID: aUniID, tpID: aTpID}]
+	return onuTP.procResult[UniTP{UniID: aUniID, TpID: aTpID}]
 }
 
 // ConfigureUniTp checks existing tp resources to configure and starts the corresponding OMCI configuation of the UNI port
@@ -183,7 +183,7 @@
 	logger.Info(ctx, "configure the Uni according to TpPath", log.Fields{
 		"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
 	tpID, err := cmn.GetTpIDFromTpPath(aPathString)
-	uniTpKey := uniTP{uniID: aUniID, tpID: tpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: tpID}
 	if err != nil {
 		logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
 		return
@@ -200,7 +200,7 @@
 	}
 	if pCurrentUniPort == nil {
 		logger.Errorw(ctx, "TechProfile configuration aborted: requested uniID not found in PortDB",
-			log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+			log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
 		onuTP.mutexTPState.Lock()
 		defer onuTP.mutexTPState.Unlock()
 		onuTP.procResult[uniTpKey] = fmt.Errorf("techProfile config aborted: requested uniID not found %d on %s",
@@ -210,7 +210,7 @@
 
 	if onuTP.getProfileResetting(uniTpKey) {
 		logger.Debugw(ctx, "aborting TP configuration, reset requested in parallel", log.Fields{
-			"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+			"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
 		onuTP.mutexTPState.Lock()
 		defer onuTP.mutexTPState.Unlock()
 		onuTP.procResult[uniTpKey] = fmt.Errorf(
@@ -259,7 +259,7 @@
 	}
 	if onuTP.getProfileResetting(uniTpKey) {
 		logger.Debugw(ctx, "aborting TP configuration, reset requested in parallel", log.Fields{
-			"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+			"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
 		onuTP.mutexTPState.Lock()
 		defer onuTP.mutexTPState.Unlock()
 		onuTP.procResult[uniTpKey] = fmt.Errorf(
@@ -342,8 +342,8 @@
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
 
-	uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
-	onuTP.tpProfileExists[uniTP{uniID: aUniID, tpID: aTpID}] = false
+	uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
+	onuTP.tpProfileExists[UniTP{UniID: aUniID, TpID: aTpID}] = false
 
 	//at this point it is assumed that a new TechProfile is assigned to the UNI
 	//expectation is that no TPIndication entry exists here, if exists and with the same TPId
@@ -547,7 +547,7 @@
 	//OMCI transfer of ANI data acc. to mapPonAniConfig
 	// also the FSM's are running in background,
 	//   hence we have to make sure they indicate 'success' on chTpConfigProcessingStep with aProcessingStep
-	uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
 	if onuTP.PAniConfigFsm == nil {
 		return onuTP.createAniConfigFsm(ctx, aUniID, aTpID, apCurrentUniPort, cmn.OmciAniConfigDone, aProcessingStep)
 	} else if _, ok := onuTP.PAniConfigFsm[uniTPKey]; !ok {
@@ -565,7 +565,7 @@
 	defer wg.Done()
 	logger.Debugw(ctx, "will remove TP resources from ONU's UNI", log.Fields{
 		"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString, "Resource": aResource})
-	uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
 
 	if CResourceGemPort == aResource {
 		logger.Debugw(ctx, "remove GemPort from the list of existing ones of the TP", log.Fields{
@@ -771,7 +771,7 @@
 
 // IsTechProfileConfigCleared - TODO: add comment
 func (onuTP *OnuUniTechProf) IsTechProfileConfigCleared(ctx context.Context, uniID uint8, tpID uint8) bool {
-	uniTPKey := uniTP{uniID: uniID, tpID: tpID}
+	uniTPKey := UniTP{UniID: uniID, TpID: tpID}
 	logger.Debugw(ctx, "IsTechProfileConfigCleared", log.Fields{"device-id": onuTP.deviceID})
 	onuTP.mutexTPState.RLock()
 	if onuTP.mapPonAniConfig[uniTPKey] != nil {
@@ -822,7 +822,7 @@
 	apCurrentUniPort *cmn.OnuUniPort, devEvent cmn.OnuDeviceEvent, aProcessingStep uint8) error {
 	logger.Info(ctx, "createAniConfigFsm", log.Fields{"device-id": onuTP.deviceID})
 	chAniConfigFsm := make(chan cmn.Message, 2)
-	uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
 	if onuTP.onuDevice == nil {
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": onuTP.deviceID})
 		return fmt.Errorf("no valid OnuDevice: %s", onuTP.deviceID)
@@ -835,21 +835,21 @@
 		return fmt.Errorf("could not create AniConfigFSM: %s", onuTP.deviceID)
 	}
 	if onuTP.PAniConfigFsm == nil {
-		onuTP.PAniConfigFsm = make(map[uniTP]*UniPonAniConfigFsm)
+		onuTP.PAniConfigFsm = make(map[UniTP]*UniPonAniConfigFsm)
 	}
 	onuTP.PAniConfigFsm[uniTPKey] = pAniCfgFsm
 	return onuTP.runAniConfigFsm(ctx, aniEvStart, aProcessingStep, aUniID, aTpID)
 }
 
 // deleteGemPortParams removes GemPort from config DB
-func (onuTP *OnuUniTechProf) deleteGemPortParams(ctx context.Context, uniTPKey uniTP) {
+func (onuTP *OnuUniTechProf) deleteGemPortParams(ctx context.Context, uniTPKey UniTP) {
 	//ensure write protection for access to mapPonAniConfig
 	onuTP.mutexTPState.Lock()
 	if _, ok := onuTP.mapPonAniConfig[uniTPKey]; ok {
 		delete(onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams, onuTP.mapRemoveGemEntry[uniTPKey].removeGemID)
 	} else {
 		logger.Warnw(ctx, "GemPort removal - GemPort not found in mapPonAniConfig",
-			log.Fields{"device-id": onuTP.deviceID, "uni-id": uniTPKey.uniID, "tp-id": uniTPKey.tpID})
+			log.Fields{"device-id": onuTP.deviceID, "uni-id": uniTPKey.UniID, "tp-id": uniTPKey.TpID})
 	}
 	// remove from the removGemeEntry
 	delete(onuTP.mapRemoveGemEntry, uniTPKey)
@@ -863,7 +863,7 @@
 	 */
 	logger.Info(ctx, "Run AniConfigFSM with", log.Fields{
 		"ProcessingStep": aProcessingStep, "device-id": onuTP.deviceID, "UniId": aUniID, "TpID": aTpID, "event": aEvent})
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 
 	pACStatemachine := onuTP.PAniConfigFsm[uniTpKey].PAdaptFsm.PFsm
 	if pACStatemachine != nil {
@@ -899,7 +899,7 @@
 func (onuTP *OnuUniTechProf) clearAniSideConfig(ctx context.Context, aUniID uint8, aTpID uint8) {
 	logger.Debugw(ctx, "removing TpIndication and PonAniConfig data", log.Fields{
 		"device-id": onuTP.deviceID, "uni-id": aUniID})
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
@@ -912,7 +912,7 @@
 
 // setConfigDone sets the requested techProfile config state (if possible)
 func (onuTP *OnuUniTechProf) setConfigDone(aUniID uint8, aTpID uint8, aState bool) {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
 	if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -922,7 +922,7 @@
 
 // getTechProfileDone checks if the Techprofile processing with the requested TechProfile ID was done
 func (onuTP *OnuUniTechProf) getTechProfileDone(ctx context.Context, aUniID uint8, aTpID uint8) bool {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.RLock()
 	defer onuTP.mutexTPState.RUnlock()
 	if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -941,7 +941,7 @@
 
 // SetProfileToDelete sets the requested techProfile toDelete state (if possible)
 func (onuTP *OnuUniTechProf) SetProfileToDelete(aUniID uint8, aTpID uint8, aState bool) {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
 	if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -950,7 +950,7 @@
 }
 
 func (onuTP *OnuUniTechProf) getMulticastGemPorts(ctx context.Context, aUniID uint8, aTpID uint8) []uint16 {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.RLock()
 	defer onuTP.mutexTPState.RUnlock()
 	gemPortIds := make([]uint16, 0)
@@ -968,7 +968,7 @@
 }
 
 func (onuTP *OnuUniTechProf) getBidirectionalGemPortIDsForTP(ctx context.Context, aUniID uint8, aTpID uint8) []uint16 {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.RLock()
 	defer onuTP.mutexTPState.RUnlock()
 	gemPortIds := make([]uint16, 0)
@@ -1032,14 +1032,14 @@
 //
 //nolint:unparam
 func (onuTP *OnuUniTechProf) setProfileResetting(ctx context.Context, aUniID uint8, aTpID uint8, aState bool) {
-	uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+	uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
 	onuTP.mutexTPState.Lock()
 	defer onuTP.mutexTPState.Unlock()
 	onuTP.tpProfileResetting[uniTpKey] = aState
 }
 
 // getProfileResetting returns true, if the the according indication for started reset procedure is set
-func (onuTP *OnuUniTechProf) getProfileResetting(aUniTpKey uniTP) bool {
+func (onuTP *OnuUniTechProf) getProfileResetting(aUniTpKey UniTP) bool {
 	onuTP.mutexTPState.RLock()
 	defer onuTP.mutexTPState.RUnlock()
 	if isResetting, exist := onuTP.tpProfileResetting[aUniTpKey]; exist {
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index a3cdb6c..aeeef16 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -4868,9 +4868,58 @@
 				go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
 			}
 			// Block on response and tunnel it back to the caller
-			*flowCb.respChan <- <-respChan
-			logger.Info(flowCb.ctx, "serial-flow-processor--end",
-				log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+			select {
+			case msg := <-respChan:
+				// Worker finished in time: tunnel its result back to the caller.
+				*flowCb.respChan <- msg
+				logger.Info(flowCb.ctx, "serial-flow-processor--end",
+					log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+
+			case <-flowCb.ctx.Done():
+				// The caller's context ended before the worker answered: stop waiting and
+				// abort the FSMs that may still be busy with this flow.
+				// NOTE(review): nothing receives from respChan and nothing is sent on
+				// *flowCb.respChan on this path - confirm that neither the worker goroutine
+				// nor the caller can block forever.
+				logger.Infow(flowCb.ctx, "flow handler context cancelled or timed out", log.Fields{"device-id": dh.DeviceID})
+				// Reset the VLAN config FSM of this UNI, if one exists.
+				if pVlanFsm := dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID]; pVlanFsm != nil && pVlanFsm.PAdaptFsm != nil {
+					if pVlanFilterStatemachine := pVlanFsm.PAdaptFsm.PFsm; pVlanFilterStatemachine != nil {
+						if err := pVlanFilterStatemachine.Event(avcfg.VlanEvReset); err != nil {
+							logger.Warnw(flowCb.ctx, "UniVlanConfigFsm: can't reset",
+								log.Fields{"device-id": dh.DeviceID, "err": err})
+						}
+					}
+				}
+
+				// The ANI config FSM map is keyed by (UniID, TpID); the TpID is derived from the
+				// flow's write-metadata. A fresh context is used, presumably because flowCb.ctx is done.
+				ctx2 := context.Background()
+				metadata := flow.GetMetadataFromWriteMetadataAction(ctx2, flowCb.flowItem)
+				if metadata == 0 {
+					logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch metadata flow",
+						log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+					continue
+				}
+				tpID := flow.GetTechProfileIDFromWriteMetaData(ctx2, metadata)
+				if tpID == 0 {
+					logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch techprofileid flow",
+						log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+					continue
+				}
+
+				// should always be the case here
+				if dh.pOnuTP != nil && dh.pOnuTP.PAniConfigFsm != nil {
+					// FSM stop may be encapsulated as OnuTP method - perhaps later in context of module splitting
+					uniTP := avcfg.UniTP{
+						UniID: flowCb.uniPort.UniID,
+						TpID:  uint8(tpID),
+					}
+					if pAniFsm := dh.pOnuTP.PAniConfigFsm[uniTP]; pAniFsm != nil {
+						pAniFsm.CancelProcessing(ctx2)
+					}
+				}
+			}
 		case <-dh.stopFlowMonitoringRoutine[uniID]:
 			logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
 			dh.setFlowMonitoringIsRunning(uniID, false)
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index 176e15e..3135a94 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -419,7 +419,7 @@
 	}
 
 	if handler, err := oo.getDeviceHandler(ctx, incrFlows.Device.Id, false); handler != nil {
-		if flowUpdateErr := handler.FlowUpdateIncremental(log.WithSpanFromContext(context.Background(), ctx), incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
+		if flowUpdateErr := handler.FlowUpdateIncremental(ctx, incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
 			return nil, flowUpdateErr
 		}
 		return &empty.Empty{}, nil