[VOL-5471]: Flow add/delete channel fix
Change-Id: I44f8f1a49db9138c64df4cef76bf5deba6b668bf
Signed-off-by: balaji.nagarajan <balaji.nagarajan@radisys.com>
diff --git a/internal/pkg/avcfg/omci_ani_config.go b/internal/pkg/avcfg/omci_ani_config.go
index 758c220..ac489da 100755
--- a/internal/pkg/avcfg/omci_ani_config.go
+++ b/internal/pkg/avcfg/omci_ani_config.go
@@ -151,7 +151,7 @@
macBPCD0ID uint16
tcont0ID uint16
alloc0ID uint16
- uniTpKey uniTP
+ uniTpKey UniTP
techProfileID uint8
isCanceled bool
isAwaitingResponse bool
@@ -180,7 +180,7 @@
chanSet: false,
tcontSetBefore: false,
}
- instFsm.uniTpKey = uniTP{uniID: apUniPort.UniID, tpID: aTechProfileID}
+ instFsm.uniTpKey = UniTP{UniID: apUniPort.UniID, TpID: aTechProfileID}
instFsm.waitFlowDeleteChannel = make(chan bool)
instFsm.PAdaptFsm = cmn.NewAdapterFsm(aName, instFsm.deviceID, aCommChannel)
diff --git a/internal/pkg/avcfg/onu_uni_tp.go b/internal/pkg/avcfg/onu_uni_tp.go
index a905192..f27969b 100755
--- a/internal/pkg/avcfg/onu_uni_tp.go
+++ b/internal/pkg/avcfg/onu_uni_tp.go
@@ -93,9 +93,9 @@
}
// refers a unique combination of uniID and tpID for a given ONU.
-type uniTP struct {
- uniID uint8
- tpID uint8
+type UniTP struct {
+ UniID uint8
+ TpID uint8
}
// OnuUniTechProf structure holds information about the TechProfiles attached to Uni Ports of the ONU
@@ -103,21 +103,21 @@
baseDeviceHandler cmn.IdeviceHandler
onuDevice cmn.IonuDeviceEntry
chTpConfigProcessingStep chan uint8
- mapUniTpIndication map[uniTP]*tTechProfileIndication //use pointer values to ease assignments to the map
- mapPonAniConfig map[uniTP]*tcontGemList //per UNI: use pointer values to ease assignments to the map
- PAniConfigFsm map[uniTP]*UniPonAniConfigFsm
- procResult map[uniTP]error //error indication of processing
- tpProfileExists map[uniTP]bool
- tpProfileResetting map[uniTP]bool
- mapRemoveGemEntry map[uniTP]*gemPortParamStruct //per UNI: pointer to GemEntry to be removed
+ mapUniTpIndication map[UniTP]*tTechProfileIndication //use pointer values to ease assignments to the map
+ mapPonAniConfig map[UniTP]*tcontGemList //per UNI: use pointer values to ease assignments to the map
+ PAniConfigFsm map[UniTP]*UniPonAniConfigFsm
+ procResult map[UniTP]error //error indication of processing
+ tpProfileExists map[UniTP]bool
+ tpProfileResetting map[UniTP]bool
+ mapRemoveGemEntry map[UniTP]*gemPortParamStruct //per UNI: pointer to GemEntry to be removed
deviceID string
tpProcMutex sync.RWMutex
mutexTPState sync.RWMutex
}
-func (onuTP *OnuUniTechProf) multicastConfiguredForOtherUniTps(ctx context.Context, uniTpKey uniTP) bool {
+func (onuTP *OnuUniTechProf) multicastConfiguredForOtherUniTps(ctx context.Context, uniTpKey UniTP) bool {
for _, aniFsm := range onuTP.PAniConfigFsm {
- if aniFsm.uniTpKey.uniID == uniTpKey.uniID && aniFsm.uniTpKey.tpID == uniTpKey.tpID {
+ if aniFsm.uniTpKey.UniID == uniTpKey.UniID && aniFsm.uniTpKey.TpID == uniTpKey.TpID {
continue
}
if aniFsm.hasMulticastGem(ctx) {
@@ -137,12 +137,12 @@
onuTP.baseDeviceHandler = aDeviceHandler
onuTP.onuDevice = aOnuDev
onuTP.chTpConfigProcessingStep = make(chan uint8)
- onuTP.mapUniTpIndication = make(map[uniTP]*tTechProfileIndication)
- onuTP.mapPonAniConfig = make(map[uniTP]*tcontGemList)
- onuTP.procResult = make(map[uniTP]error)
- onuTP.tpProfileExists = make(map[uniTP]bool)
- onuTP.tpProfileResetting = make(map[uniTP]bool)
- onuTP.mapRemoveGemEntry = make(map[uniTP]*gemPortParamStruct)
+ onuTP.mapUniTpIndication = make(map[UniTP]*tTechProfileIndication)
+ onuTP.mapPonAniConfig = make(map[UniTP]*tcontGemList)
+ onuTP.procResult = make(map[UniTP]error)
+ onuTP.tpProfileExists = make(map[UniTP]bool)
+ onuTP.tpProfileResetting = make(map[UniTP]bool)
+ onuTP.mapRemoveGemEntry = make(map[UniTP]*gemPortParamStruct)
return &onuTP
}
@@ -162,14 +162,14 @@
func (onuTP *OnuUniTechProf) ResetTpProcessingErrorIndication(aUniID uint8, aTpID uint8) {
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
- onuTP.procResult[uniTP{uniID: aUniID, tpID: aTpID}] = nil
+ onuTP.procResult[UniTP{UniID: aUniID, TpID: aTpID}] = nil
}
// GetTpProcessingErrorIndication - TODO: add comment
func (onuTP *OnuUniTechProf) GetTpProcessingErrorIndication(aUniID uint8, aTpID uint8) error {
onuTP.mutexTPState.RLock()
defer onuTP.mutexTPState.RUnlock()
- return onuTP.procResult[uniTP{uniID: aUniID, tpID: aTpID}]
+ return onuTP.procResult[UniTP{UniID: aUniID, TpID: aTpID}]
}
// ConfigureUniTp checks existing tp resources to configure and starts the corresponding OMCI configuation of the UNI port
@@ -183,7 +183,7 @@
logger.Info(ctx, "configure the Uni according to TpPath", log.Fields{
"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
tpID, err := cmn.GetTpIDFromTpPath(aPathString)
- uniTpKey := uniTP{uniID: aUniID, tpID: tpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: tpID}
if err != nil {
logger.Errorw(ctx, "error-extracting-tp-id-from-tp-path", log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString})
return
@@ -200,7 +200,7 @@
}
if pCurrentUniPort == nil {
logger.Errorw(ctx, "TechProfile configuration aborted: requested uniID not found in PortDB",
- log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+ log.Fields{"device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
onuTP.procResult[uniTpKey] = fmt.Errorf("techProfile config aborted: requested uniID not found %d on %s",
@@ -210,7 +210,7 @@
if onuTP.getProfileResetting(uniTpKey) {
logger.Debugw(ctx, "aborting TP configuration, reset requested in parallel", log.Fields{
- "device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+ "device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
onuTP.procResult[uniTpKey] = fmt.Errorf(
@@ -259,7 +259,7 @@
}
if onuTP.getProfileResetting(uniTpKey) {
logger.Debugw(ctx, "aborting TP configuration, reset requested in parallel", log.Fields{
- "device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.tpID})
+ "device-id": onuTP.deviceID, "uni-id": aUniID, "tp-id": uniTpKey.TpID})
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
onuTP.procResult[uniTpKey] = fmt.Errorf(
@@ -342,8 +342,8 @@
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
- uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
- onuTP.tpProfileExists[uniTP{uniID: aUniID, tpID: aTpID}] = false
+ uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
+ onuTP.tpProfileExists[UniTP{UniID: aUniID, TpID: aTpID}] = false
//at this point it is assumed that a new TechProfile is assigned to the UNI
//expectation is that no TPIndication entry exists here, if exists and with the same TPId
@@ -547,7 +547,7 @@
//OMCI transfer of ANI data acc. to mapPonAniConfig
// also the FSM's are running in background,
// hence we have to make sure they indicate 'success' on chTpConfigProcessingStep with aProcessingStep
- uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
if onuTP.PAniConfigFsm == nil {
return onuTP.createAniConfigFsm(ctx, aUniID, aTpID, apCurrentUniPort, cmn.OmciAniConfigDone, aProcessingStep)
} else if _, ok := onuTP.PAniConfigFsm[uniTPKey]; !ok {
@@ -565,7 +565,7 @@
defer wg.Done()
logger.Debugw(ctx, "will remove TP resources from ONU's UNI", log.Fields{
"device-id": onuTP.deviceID, "uni-id": aUniID, "path": aPathString, "Resource": aResource})
- uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
if CResourceGemPort == aResource {
logger.Debugw(ctx, "remove GemPort from the list of existing ones of the TP", log.Fields{
@@ -771,7 +771,7 @@
// IsTechProfileConfigCleared - TODO: add comment
func (onuTP *OnuUniTechProf) IsTechProfileConfigCleared(ctx context.Context, uniID uint8, tpID uint8) bool {
- uniTPKey := uniTP{uniID: uniID, tpID: tpID}
+ uniTPKey := UniTP{UniID: uniID, TpID: tpID}
logger.Debugw(ctx, "IsTechProfileConfigCleared", log.Fields{"device-id": onuTP.deviceID})
onuTP.mutexTPState.RLock()
if onuTP.mapPonAniConfig[uniTPKey] != nil {
@@ -822,7 +822,7 @@
apCurrentUniPort *cmn.OnuUniPort, devEvent cmn.OnuDeviceEvent, aProcessingStep uint8) error {
logger.Info(ctx, "createAniConfigFsm", log.Fields{"device-id": onuTP.deviceID})
chAniConfigFsm := make(chan cmn.Message, 2)
- uniTPKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTPKey := UniTP{UniID: aUniID, TpID: aTpID}
if onuTP.onuDevice == nil {
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": onuTP.deviceID})
return fmt.Errorf("no valid OnuDevice: %s", onuTP.deviceID)
@@ -835,21 +835,21 @@
return fmt.Errorf("could not create AniConfigFSM: %s", onuTP.deviceID)
}
if onuTP.PAniConfigFsm == nil {
- onuTP.PAniConfigFsm = make(map[uniTP]*UniPonAniConfigFsm)
+ onuTP.PAniConfigFsm = make(map[UniTP]*UniPonAniConfigFsm)
}
onuTP.PAniConfigFsm[uniTPKey] = pAniCfgFsm
return onuTP.runAniConfigFsm(ctx, aniEvStart, aProcessingStep, aUniID, aTpID)
}
// deleteGemPortParams removes GemPort from config DB
-func (onuTP *OnuUniTechProf) deleteGemPortParams(ctx context.Context, uniTPKey uniTP) {
+func (onuTP *OnuUniTechProf) deleteGemPortParams(ctx context.Context, uniTPKey UniTP) {
//ensure write protection for access to mapPonAniConfig
onuTP.mutexTPState.Lock()
if _, ok := onuTP.mapPonAniConfig[uniTPKey]; ok {
delete(onuTP.mapPonAniConfig[uniTPKey].mapGemPortParams, onuTP.mapRemoveGemEntry[uniTPKey].removeGemID)
} else {
logger.Warnw(ctx, "GemPort removal - GemPort not found in mapPonAniConfig",
- log.Fields{"device-id": onuTP.deviceID, "uni-id": uniTPKey.uniID, "tp-id": uniTPKey.tpID})
+ log.Fields{"device-id": onuTP.deviceID, "uni-id": uniTPKey.UniID, "tp-id": uniTPKey.TpID})
}
// remove from the removGemeEntry
delete(onuTP.mapRemoveGemEntry, uniTPKey)
@@ -863,7 +863,7 @@
*/
logger.Info(ctx, "Run AniConfigFSM with", log.Fields{
"ProcessingStep": aProcessingStep, "device-id": onuTP.deviceID, "UniId": aUniID, "TpID": aTpID, "event": aEvent})
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
pACStatemachine := onuTP.PAniConfigFsm[uniTpKey].PAdaptFsm.PFsm
if pACStatemachine != nil {
@@ -899,7 +899,7 @@
func (onuTP *OnuUniTechProf) clearAniSideConfig(ctx context.Context, aUniID uint8, aTpID uint8) {
logger.Debugw(ctx, "removing TpIndication and PonAniConfig data", log.Fields{
"device-id": onuTP.deviceID, "uni-id": aUniID})
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
@@ -912,7 +912,7 @@
// setConfigDone sets the requested techProfile config state (if possible)
func (onuTP *OnuUniTechProf) setConfigDone(aUniID uint8, aTpID uint8, aState bool) {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -922,7 +922,7 @@
// getTechProfileDone checks if the Techprofile processing with the requested TechProfile ID was done
func (onuTP *OnuUniTechProf) getTechProfileDone(ctx context.Context, aUniID uint8, aTpID uint8) bool {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.RLock()
defer onuTP.mutexTPState.RUnlock()
if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -941,7 +941,7 @@
// SetProfileToDelete sets the requested techProfile toDelete state (if possible)
func (onuTP *OnuUniTechProf) SetProfileToDelete(aUniID uint8, aTpID uint8, aState bool) {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
if _, existTP := onuTP.mapUniTpIndication[uniTpKey]; existTP {
@@ -950,7 +950,7 @@
}
func (onuTP *OnuUniTechProf) getMulticastGemPorts(ctx context.Context, aUniID uint8, aTpID uint8) []uint16 {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.RLock()
defer onuTP.mutexTPState.RUnlock()
gemPortIds := make([]uint16, 0)
@@ -968,7 +968,7 @@
}
func (onuTP *OnuUniTechProf) getBidirectionalGemPortIDsForTP(ctx context.Context, aUniID uint8, aTpID uint8) []uint16 {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.RLock()
defer onuTP.mutexTPState.RUnlock()
gemPortIds := make([]uint16, 0)
@@ -1032,14 +1032,14 @@
//
//nolint:unparam
func (onuTP *OnuUniTechProf) setProfileResetting(ctx context.Context, aUniID uint8, aTpID uint8, aState bool) {
- uniTpKey := uniTP{uniID: aUniID, tpID: aTpID}
+ uniTpKey := UniTP{UniID: aUniID, TpID: aTpID}
onuTP.mutexTPState.Lock()
defer onuTP.mutexTPState.Unlock()
onuTP.tpProfileResetting[uniTpKey] = aState
}
// getProfileResetting returns true, if the the according indication for started reset procedure is set
-func (onuTP *OnuUniTechProf) getProfileResetting(aUniTpKey uniTP) bool {
+func (onuTP *OnuUniTechProf) getProfileResetting(aUniTpKey UniTP) bool {
onuTP.mutexTPState.RLock()
defer onuTP.mutexTPState.RUnlock()
if isResetting, exist := onuTP.tpProfileResetting[aUniTpKey]; exist {
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index a3cdb6c..aeeef16 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -4868,9 +4868,58 @@
go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
}
// Block on response and tunnel it back to the caller
- *flowCb.respChan <- <-respChan
- logger.Info(flowCb.ctx, "serial-flow-processor--end",
- log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+ select {
+
+ case msg := <-respChan:
+ *flowCb.respChan <- msg
+ // response sent successfully
+ logger.Info(flowCb.ctx, "serial-flow-processor--end",
+ log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+ case <-flowCb.ctx.Done():
+ logger.Info(flowCb.ctx, "flow handler context cancelled or timed out", log.Fields{"device-id": dh.DeviceID})
+ // Context cancelled or timed out: reset the UNI VLAN config FSM, then cancel the ANI config FSM for this UNI/TP
+ if dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID] != nil && dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID].PAdaptFsm != nil {
+ pVlanFilterStatemachine := dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID].PAdaptFsm.PFsm
+ if pVlanFilterStatemachine != nil {
+
+ if err := pVlanFilterStatemachine.Event(avcfg.VlanEvReset); err != nil {
+ logger.Warnw(flowCb.ctx, "UniVlanConfigFsm: can't reset",
+ log.Fields{"device-id": dh.DeviceID, "err": err})
+
+ }
+
+ }
+ }
+
+ ctx2 := context.Background()
+ metadata := flow.GetMetadataFromWriteMetadataAction(ctx2, flowCb.flowItem)
+ if metadata == 0 {
+ logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch metadata flow: %v",
+ log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+ continue
+ }
+ TpID := flow.GetTechProfileIDFromWriteMetaData(ctx2, metadata)
+
+ if TpID == uint16(0) {
+ logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch techprofileid flow: %v",
+ log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+ continue
+ }
+ if dh.pOnuTP != nil {
+ // should always be the case here
+ // FSM stop maybe encapsulated as OnuTP method - perhaps later in context of module splitting
+ if dh.pOnuTP.PAniConfigFsm != nil {
+ uniTP := avcfg.UniTP{
+ UniID: flowCb.uniPort.UniID,
+ TpID: uint8(TpID),
+ }
+ if dh.pOnuTP.PAniConfigFsm[uniTP] != nil {
+ dh.pOnuTP.PAniConfigFsm[uniTP].CancelProcessing(context.Background())
+ }
+ }
+ }
+
+ }
case <-dh.stopFlowMonitoringRoutine[uniID]:
logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
dh.setFlowMonitoringIsRunning(uniID, false)
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index 176e15e..3135a94 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -419,7 +419,7 @@
}
if handler, err := oo.getDeviceHandler(ctx, incrFlows.Device.Id, false); handler != nil {
- if flowUpdateErr := handler.FlowUpdateIncremental(log.WithSpanFromContext(context.Background(), ctx), incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
+ if flowUpdateErr := handler.FlowUpdateIncremental(ctx, incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
return nil, flowUpdateErr
}
return &empty.Empty{}, nil