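[VOL-5536] - VGC recent fixes

Thread a skipFlowPushToVoltha flag from PortUpInd through the HSIA flow
helpers down to the controller's AddFlows call, add GetAllFlowsForSvc to
list the flow cookies associated with a service, and stop taking the
global ServiceDataMutex before adding a service to its VNET.
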
Change-Id: Id6f0e647b37baac827230afbb45d132df8a62b68
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index cc5e331..9836cea 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -416,7 +416,7 @@
for _, vpv := range vnets.([]*VoltPortVnet) {
vpv.VpvLock.Lock()
- vpv.PortUpInd(cntx, d, port, "")
+ vpv.PortUpInd(cntx, d, port, "", false)
vpv.VpvLock.Unlock()
}
return true
@@ -1451,7 +1451,7 @@
// Port UP indication is passed to all services associated with the port
// so that the services can configure flows applicable when the port goes
// up from down state
-func (va *VoltApplication) PortUpInd(cntx context.Context, device string, port string) {
+func (va *VoltApplication) PortUpInd(cntx context.Context, device string, port string, skipFlowPushToVoltha bool) {
logger.Infow(ctx, "Received Southbound Port Ind: UP", log.Fields{"Device": device, "Port": port})
d := va.GetDevice(device)
@@ -1505,7 +1505,7 @@
// part of service delete (during the lock wait duration)
// In that case, the services associated wil be zero
if vpv.servicesCount.Load() != 0 {
- vpv.PortUpInd(cntx, d, port, nniPort)
+ vpv.PortUpInd(cntx, d, port, nniPort, skipFlowPushToVoltha)
}
} else {
// Service not activated, still attach device to service
@@ -1838,6 +1838,33 @@
}
}
+// GetAllFlowsForSvc returns the cookies of all flows recorded against the
+// service that owns the given flow.
+//
+// The UNI port is derived from the flow using the device configuration stored
+// for devSerialNum; the service is then looked up by flow cookie, port name,
+// devID and the flow's table metadata. Only entries of svc.AssociatedFlows
+// whose value is true and whose key parses as a base-10 uint64 are returned.
+//
+// It returns nil when the device configuration, the port name or the service
+// cannot be resolved, and an empty non-nil slice when the service exists but
+// has no usable flow cookies.
+func (va *VoltApplication) GetAllFlowsForSvc(cntx context.Context, flow *of.VoltSubFlow, devID string, devSerialNum string) []uint64 {
+	devConfig := va.GetDeviceConfig(devSerialNum)
+	if devConfig == nil {
+		// NOTE(review): assumes GetDeviceConfig returns a nil pointer when no
+		// config is stored for the serial number - confirm. Without this guard
+		// the field accesses below would panic.
+		logger.Warnw(ctx, "Device config not found", log.Fields{"DeviceSerial": devSerialNum})
+		return nil
+	}
+
+	portNo := util.GetUniPortFromFlow(devConfig.UplinkPort, devConfig.NniPorts, flow)
+	portName, err := va.GetPortName(portNo)
+	if err != nil {
+		logger.Warnw(ctx, "Error getting port name", log.Fields{"Reason": err.Error(), "PortID": flow.Match.InPort})
+		return nil
+	}
+	if portName == "" {
+		logger.Warnw(ctx, "Port does not exist", log.Fields{"PortID": flow.Match.InPort})
+		return nil
+	}
+
+	svc := va.GetServiceNameFromCookie(flow.Cookie, portName, uint8(of.PbitMatchNone), devID, flow.Match.TableMetadata)
+	if svc == nil {
+		return nil
+	}
+
+	svcFlows := make([]uint64, 0, len(svc.AssociatedFlows))
+	for cookie, associated := range svc.AssociatedFlows {
+		if !associated {
+			continue
+		}
+		// Cookies are stored as strings; silently skip any that are not
+		// valid unsigned integers, as the caller only wants numeric cookies.
+		if val, parseErr := strconv.ParseUint(cookie, 10, 64); parseErr == nil {
+			svcFlows = append(svcFlows, val)
+		}
+	}
+	return svcFlows
+}
+
// CheckAndDeactivateService - check if the attempts for flow delete has reached threshold or not
func (va *VoltApplication) CheckAndDeactivateService(ctx context.Context, flow *of.VoltSubFlow, devSerialNum string, devID string) {
logger.Debugw(ctx, "Check and Deactivate service", log.Fields{"Cookie": flow.Cookie, "FlowCount": flow.FlowCount, "DeviceSerial": devSerialNum})
diff --git a/internal/pkg/application/application_test.go b/internal/pkg/application/application_test.go
index 5eef192..7a3b314 100644
--- a/internal/pkg/application/application_test.go
+++ b/internal/pkg/application/application_test.go
@@ -2611,7 +2611,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutVpv(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
- va.PortUpInd(tt.args.cntx, tt.args.device, tt.args.port)
+ va.PortUpInd(tt.args.cntx, tt.args.device, tt.args.port, false)
})
}
}
diff --git a/internal/pkg/application/igmpgroupchannel.go b/internal/pkg/application/igmpgroupchannel.go
index 56f189c..6bb4452 100644
--- a/internal/pkg/application/igmpgroupchannel.go
+++ b/internal/pkg/application/igmpgroupchannel.go
@@ -581,7 +581,7 @@
return
}
port, _ := GetApplication().GetNniPort(igc.Device)
- _ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow)
+ _ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow, false)
}
// DelMcFlow deletes flow from the device when the last receiver leaves
diff --git a/internal/pkg/application/igmpprofiles.go b/internal/pkg/application/igmpprofiles.go
index 673dacc..10c7491 100644
--- a/internal/pkg/application/igmpprofiles.go
+++ b/internal/pkg/application/igmpprofiles.go
@@ -497,7 +497,7 @@
if err1 != nil {
logger.Errorw(ctx, "Error getting NNI port", log.Fields{"Error": err1})
}
- err = cntlr.GetController().AddFlows(cntx, nniPort, device, flows)
+ err = cntlr.GetController().AddFlows(cntx, nniPort, device, flows, false)
if err != nil {
logger.Warnw(ctx, "Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
return err
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index bb2966f..ec10fb3 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -300,12 +300,12 @@
// AddHsiaFlows - Adds US & DS HSIA Flows for the service
func (vs *VoltService) AddHsiaFlows(cntx context.Context) {
logger.Debugw(ctx, "Add US & DS HSIA Flows for the service", log.Fields{"ServiceName": vs.Name})
- if err := vs.AddUsHsiaFlows(cntx); err != nil {
+ if err := vs.AddUsHsiaFlows(cntx, false); err != nil {
logger.Errorw(ctx, "Error adding US HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
}
- if err := vs.AddDsHsiaFlows(cntx); err != nil {
+ if err := vs.AddDsHsiaFlows(cntx, false); err != nil {
logger.Errorw(ctx, "Error adding DS HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -348,7 +348,7 @@
}
// AddUsHsiaFlows - Add US HSIA Flows for the service
-func (vs *VoltService) AddUsHsiaFlows(cntx context.Context) error {
+func (vs *VoltService) AddUsHsiaFlows(cntx context.Context, skipFlowPushToVoltha bool) error {
logger.Infow(ctx, "Configuring US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
if vs.DeleteInProgress || vs.UpdateInProgress {
logger.Warnw(ctx, "Ignoring US HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
@@ -387,7 +387,7 @@
continue
}
usflows.MigrateCookie = vgcRebooted
- if err := vs.AddFlows(cntx, device, usflows); err != nil {
+ if err := vs.AddFlows(cntx, device, usflows, skipFlowPushToVoltha); err != nil {
logger.Errorw(ctx, "Error adding HSIA US flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err.Error()})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -401,7 +401,7 @@
}
// AddDsHsiaFlows - Add DS HSIA Flows for the service
-func (vs *VoltService) AddDsHsiaFlows(cntx context.Context) error {
+func (vs *VoltService) AddDsHsiaFlows(cntx context.Context, skipFlowPushToVoltha bool) error {
logger.Infow(ctx, "Configuring DS HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
if vs.DeleteInProgress {
logger.Warnw(ctx, "Ignoring DS HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
@@ -428,7 +428,7 @@
return fmt.Errorf("Error Building HSIA DS flows for Service %s and Port %s : %w", vs.Name, vs.Port, err)
}
dsflows.MigrateCookie = vgcRebooted
- if err = vs.AddFlows(cntx, device, dsflows); err != nil {
+ if err = vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
logger.Errorw(ctx, "Failed to add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -442,7 +442,7 @@
}
logger.Debug(ctx, "Add-one-match-all-pbit-flow")
dsflows.MigrateCookie = vgcRebooted
- if err := vs.AddFlows(cntx, device, dsflows); err != nil {
+ if err := vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
logger.Errorw(ctx, "Failed to add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -457,7 +457,7 @@
continue
}
dsflows.MigrateCookie = vgcRebooted
- if err := vs.AddFlows(cntx, device, dsflows); err != nil {
+ if err := vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
logger.Errorw(ctx, "Failed to Add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -1132,8 +1132,10 @@
}
//}
- AppMutex.ServiceDataMutex.Lock()
- defer AppMutex.ServiceDataMutex.Unlock()
+ // The global ServiceDataMutex lock was redundant here, so it is no longer
+ // taken; this avoids unnecessary contention and improves performance.
+ // AppMutex.ServiceDataMutex.Lock()
+ // defer AppMutex.ServiceDataMutex.Unlock()
// Add the service to the VNET
vnet := va.GetVnet(cfg.SVlan, cfg.CVlan, cfg.UniVlan)
@@ -1315,7 +1317,7 @@
// AddFlows - Adds the flow to the service
// Triggers flow addition after registering for flow indication event
-func (vs *VoltService) AddFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow) error {
+func (vs *VoltService) AddFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow, skipFlowPushToVoltha bool) error {
// Using locks instead of concurrent map for PendingFlows to avoid
// race condition during flow response indication processing
vs.ServiceLock.Lock()
@@ -1333,7 +1335,7 @@
device.RegisterFlowAddEvent(cookie, fe)
vs.PendingFlows[cookie] = true
}
- return controller.GetController().AddFlows(cntx, vs.Port, device.Name, flow)
+ return controller.GetController().AddFlows(cntx, vs.Port, device.Name, flow, skipFlowPushToVoltha)
}
// FlowInstallSuccess - Called when corresponding service flow installation is success
@@ -2227,7 +2229,7 @@
// VGC once service is activated remembers and pushes the flows again
// if there was a restart in VGC during the execution of the go routine.
// Making it as a go routine will not impact anything
- go vpv.PortUpInd(cntx, device, portNo, vs.NniPort)
+ go vpv.PortUpInd(cntx, device, portNo, vs.NniPort, false)
} else {
logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": deviceID, "port": portNo, "SvcName": vs.Name})
}
diff --git a/internal/pkg/application/service_test.go b/internal/pkg/application/service_test.go
index b105d7f..f54feff 100644
--- a/internal/pkg/application/service_test.go
+++ b/internal/pkg/application/service_test.go
@@ -686,7 +686,7 @@
DeleteInProgress: true,
},
}
- err := vs.AddUsHsiaFlows(tt.args.cntx)
+ err := vs.AddUsHsiaFlows(tt.args.cntx, false)
assert.Nil(t, err)
case "GetDeviceFromPort_error":
vs := &VoltService{
@@ -694,7 +694,7 @@
DeleteInProgress: false,
},
}
- err := vs.AddUsHsiaFlows(tt.args.cntx)
+ err := vs.AddUsHsiaFlows(tt.args.cntx, false)
assert.NotNil(t, err)
case "DeviceState_down":
vs := &VoltService{
@@ -708,7 +708,7 @@
ga := GetApplication()
ga.PortsDisc.Store("test_port", voltPort)
ga.DevicesDisc.Store(test_device, voltDevice1)
- err := vs.AddUsHsiaFlows(tt.args.cntx)
+ err := vs.AddUsHsiaFlows(tt.args.cntx, false)
assert.Nil(t, err)
}
})
diff --git a/internal/pkg/application/vnets.go b/internal/pkg/application/vnets.go
index 3f1cbae..7b70315 100644
--- a/internal/pkg/application/vnets.go
+++ b/internal/pkg/application/vnets.go
@@ -672,9 +672,9 @@
}
// RangeOnServices to call a function on all services on the vpv
-func (vpv *VoltPortVnet) RangeOnServices(cntx context.Context, callback func(cntx context.Context, key, value interface{}, flag bool) bool, delFlowsInDevice bool) {
+func (vpv *VoltPortVnet) RangeOnServices(cntx context.Context, callback func(cntx context.Context, key, value interface{}, flag bool) bool, flag bool) {
vpv.services.Range(func(key, value interface{}) bool {
- return callback(cntx, key, value, delFlowsInDevice)
+ return callback(cntx, key, value, flag)
})
}
@@ -747,7 +747,7 @@
// vpv.DsFlowsApplied = false
// vpv.UsFlowsApplied = false
vpv.VpvLock.Lock()
- vpv.PortUpInd(cntx, d, vpv.Port, "")
+ vpv.PortUpInd(cntx, d, vpv.Port, "", false)
vpv.VpvLock.Unlock()
}
@@ -756,7 +756,7 @@
// again here to apply the latest configuration if the configuration
// changed. Thus, a reboot of ONT forces the new configuration to get
// applied.
-func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string, nniPort string) {
+func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string, nniPort string, skipFlowPushToVoltha bool) {
logger.Infow(ctx, "Port UP Ind, pushing flows for the port", log.Fields{"Device": device, "Port": port, "VnetDhcp": vpv.DhcpRelay, "McastService": vpv.McastService})
if vpv.DeleteInProgress {
logger.Warnw(ctx, "Ignoring VPV Port UP Ind, VPV deletion In-Progress", log.Fields{"Device": device, "Port": port, "Vnet": vpv.VnetName})
@@ -789,23 +789,23 @@
logger.Infow(ctx, "Port Up - Trap Flows", log.Fields{"Device": device.Name, "Port": port})
// no HSIA flows for multicast service and DPU_MGMT Service
if !vpv.McastService && vpv.VnetType != DpuMgmtTraffic {
- vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
}
if vpv.VnetType == DpuMgmtTraffic {
- vpv.RangeOnServices(cntx, AddMeterToDevice, false)
+ vpv.RangeOnServices(cntx, AddMeterToDevice, skipFlowPushToVoltha)
}
vpv.AddTrapFlows(cntx)
if vpv.MacLearning == MacLearningNone || NonZeroMacAddress(vpv.MacAddr) {
logger.Infow(ctx, "Port Up - DS Flows", log.Fields{"Device": device.Name, "Port": port})
/*In case of DPU_MGMT_TRAFFIC, need to install both US and DS traffic */
if vpv.VnetType == DpuMgmtTraffic {
- vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
}
// US & DS DHCP, US HSIA flows are already installed
// install only DS HSIA flow here.
// no HSIA flows for multicast service
if !vpv.McastService {
- vpv.RangeOnServices(cntx, AddDsHsiaFlows, false)
+ vpv.RangeOnServices(cntx, AddDsHsiaFlows, skipFlowPushToVoltha)
}
}
} else {
@@ -816,25 +816,25 @@
// however is not seen as a real use case.
logger.Debugw(ctx, "Port Up - Service Flows", log.Fields{"Device": device.Name, "Port": port})
if !vpv.McastService {
- vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+ vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
}
vpv.AddTrapFlows(cntx)
if !vpv.McastService {
- vpv.RangeOnServices(cntx, AddDsHsiaFlows, false)
+ vpv.RangeOnServices(cntx, AddDsHsiaFlows, skipFlowPushToVoltha)
}
}
// Process IGMP proxy - install IGMP trap rules before DHCP trap rules
if vpv.IgmpEnabled {
logger.Debugw(ctx, "Port Up - IGMP Flows", log.Fields{"Device": device.Name, "Port": port})
- vpv.RangeOnServices(cntx, AddSvcUsMeterToDevice, false)
+ vpv.RangeOnServices(cntx, AddSvcUsMeterToDevice, skipFlowPushToVoltha)
if err := vpv.AddIgmpFlows(cntx); err != nil {
statusCode, statusMessage := errorCodes.GetErrorInfo(err)
vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
}
if vpv.McastService {
- vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd, false)
+ vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd, skipFlowPushToVoltha)
}
}
@@ -1133,7 +1133,7 @@
if NonZeroMacAddress(vpv.MacAddr) || svc.MacLearning == MacLearningNone {
svc.AddHsiaFlows(cntx)
} else {
- if err := svc.AddUsHsiaFlows(cntx); err != nil {
+ if err := svc.AddUsHsiaFlows(cntx, false); err != nil {
logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
}
@@ -1194,7 +1194,7 @@
func AddUsHsiaFlows(cntx context.Context, key, value interface{}, flag bool) bool {
svc := value.(*VoltService)
logger.Debugw(ctx, "Add US Hsia Flows", log.Fields{"ServiceName": svc.Name})
- if err := svc.AddUsHsiaFlows(cntx); err != nil {
+ if err := svc.AddUsHsiaFlows(cntx, flag); err != nil {
logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
@@ -1204,7 +1204,7 @@
func AddDsHsiaFlows(cntx context.Context, key, value interface{}, flag bool) bool {
svc := value.(*VoltService)
logger.Debugw(ctx, "Add DS Hsia Flows", log.Fields{"ServiceName": svc.Name})
- if err := svc.AddDsHsiaFlows(cntx); err != nil {
+ if err := svc.AddDsHsiaFlows(cntx, flag); err != nil {
logger.Warnw(ctx, "Add DS hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
}
return true
@@ -1816,7 +1816,7 @@
vd.RegisterFlowAddEvent(cookie, fe)
}
}
- if err1 := controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flows); err1 != nil {
+ if err1 := controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flows, false); err1 != nil {
return err1
}
} else {
@@ -2469,7 +2469,7 @@
if p != nil {
logger.Debugw(ctx, "Checking UNI port state", log.Fields{"State": p.State})
if d.State == controller.DeviceStateUP && p.State == PortStateUp {
- vpv.PortUpInd(cntx, d, port, vs.NniPort)
+ vpv.PortUpInd(cntx, d, port, vs.NniPort, false)
}
}
}
@@ -2729,7 +2729,7 @@
// Pushing ICMPv6 Flow
flow := BuildICMPv6Flow(portID, vnet)
- err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+ err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
if err != nil {
logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2738,7 +2738,7 @@
// Pushing ARP Flow
flow = BuildDSArpFlow(portID, vnet)
- err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+ err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
if err != nil {
logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2756,7 +2756,7 @@
func (va *VoltApplication) PushTrapFlows(cntx context.Context, device *VoltDevice, nniPort string, flow *of.VoltFlow) error {
logger.Debugw(ctx, "Push NNI DHCP Trap Flows", log.Fields{"DeviceName": device.Name, "Flow port": flow.PortID})
- return controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+ return controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
}
// PushDevFlowForDevice to push icmpv6 flows for device
@@ -2790,7 +2790,7 @@
return true
}
flow := BuildICMPv6Flow(nniPortID, vnet)
- err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+ err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
if err != nil {
logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -2798,7 +2798,7 @@
logger.Infow(ctx, "ICMP Flow Added to Queue", log.Fields{"flow": flow})
flow = BuildDSArpFlow(nniPortID, vnet)
- err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+ err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
if err != nil {
logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
return true
@@ -3128,7 +3128,7 @@
}
device.RegisterFlowAddEvent(cookie, fe)
}
- return controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flow)
+ return controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flow, false)
}
// FlowInstallFailure - Process flow failure indication and triggers HSIA failure for all associated services