[VOL-5471]: Flow add/delete channel fix
Change-Id: I44f8f1a49db9138c64df4cef76bf5deba6b668bf
Signed-off-by: balaji.nagarajan <balaji.nagarajan@radisys.com>
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index a3cdb6c..aeeef16 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -4868,9 +4868,58 @@
go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
}
// Block on response and tunnel it back to the caller
- *flowCb.respChan <- <-respChan
- logger.Info(flowCb.ctx, "serial-flow-processor--end",
- log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+ select {
+
+ case msg := <-respChan:
+ *flowCb.respChan <- msg
+ // response sent successfully
+ logger.Info(flowCb.ctx, "serial-flow-processor--end",
+ log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+ case <-flowCb.ctx.Done():
+ logger.Info(flowCb.ctx, "flow handler context cancelled or timed out", log.Fields{"device-id": dh.DeviceID})
+ // Context is done: reset the UNI VLAN config FSM and cancel ANI config FSM processing for this flow's tech profile
+ if dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID] != nil && dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID].PAdaptFsm != nil {
+ pVlanFilterStatemachine := dh.UniVlanConfigFsmMap[flowCb.uniPort.UniID].PAdaptFsm.PFsm
+ if pVlanFilterStatemachine != nil {
+
+ if err := pVlanFilterStatemachine.Event(avcfg.VlanEvReset); err != nil {
+ logger.Warnw(flowCb.ctx, "UniVlanConfigFsm: can't reset",
+ log.Fields{"device-id": dh.DeviceID, "err": err})
+
+ }
+
+ }
+ }
+
+ ctx2 := context.Background()
+ metadata := flow.GetMetadataFromWriteMetadataAction(ctx2, flowCb.flowItem)
+ if metadata == 0 {
+ logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch metadata flow: %v",
+ log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+ continue
+ }
+ TpID := flow.GetTechProfileIDFromWriteMetaData(ctx2, metadata)
+
+ if TpID == uint16(0) {
+ logger.Warnw(flowCb.ctx, "AniConfigFsm: can't reset,failed to fetch techprofileid flow: %v",
+ log.Fields{"device-id": dh.DeviceID, "flows": flowCb.flowItem})
+ continue
+ }
+ if dh.pOnuTP != nil {
+ // should always be the case here
+ // FSM stop may be encapsulated as an OnuTP method - perhaps later in the context of module splitting
+ if dh.pOnuTP.PAniConfigFsm != nil {
+ uniTP := avcfg.UniTP{
+ UniID: flowCb.uniPort.UniID,
+ TpID: uint8(TpID),
+ }
+ if dh.pOnuTP.PAniConfigFsm[uniTP] != nil {
+ dh.pOnuTP.PAniConfigFsm[uniTP].CancelProcessing(context.Background())
+ }
+ }
+ }
+
+ }
case <-dh.stopFlowMonitoringRoutine[uniID]:
logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
dh.setFlowMonitoringIsRunning(uniID, false)
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index 176e15e..3135a94 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -419,7 +419,7 @@
}
if handler, err := oo.getDeviceHandler(ctx, incrFlows.Device.Id, false); handler != nil {
- if flowUpdateErr := handler.FlowUpdateIncremental(log.WithSpanFromContext(context.Background(), ctx), incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
+ if flowUpdateErr := handler.FlowUpdateIncremental(ctx, incrFlows.Flows, incrFlows.Groups, incrFlows.FlowMetadata); flowUpdateErr != nil {
return nil, flowUpdateErr
}
return &empty.Empty{}, nil