[VOL-5536] - VGC fixes: plumb skipFlowPushToVoltha through flow-add paths, handle multiple NNI ports during device audit, and reconcile table0/table1 flows with VOLTHA during flow audit

Change-Id: Id6f0e647b37baac827230afbb45d132df8a62b68
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index cc5e331..9836cea 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -416,7 +416,7 @@
 
 		for _, vpv := range vnets.([]*VoltPortVnet) {
 			vpv.VpvLock.Lock()
-			vpv.PortUpInd(cntx, d, port, "")
+			vpv.PortUpInd(cntx, d, port, "", false)
 			vpv.VpvLock.Unlock()
 		}
 		return true
@@ -1451,7 +1451,7 @@
 // Port UP indication is passed to all services associated with the port
 // so that the services can configure flows applicable when the port goes
 // up from down state
-func (va *VoltApplication) PortUpInd(cntx context.Context, device string, port string) {
+func (va *VoltApplication) PortUpInd(cntx context.Context, device string, port string, skipFlowPushToVoltha bool) {
 	logger.Infow(ctx, "Received Southbound Port Ind: UP", log.Fields{"Device": device, "Port": port})
 	d := va.GetDevice(device)
 
@@ -1505,7 +1505,7 @@
 			// part of service delete (during the lock wait duration)
 			// In that case, the services associated wil be zero
 			if vpv.servicesCount.Load() != 0 {
-				vpv.PortUpInd(cntx, d, port, nniPort)
+				vpv.PortUpInd(cntx, d, port, nniPort, skipFlowPushToVoltha)
 			}
 		} else {
 			// Service not activated, still attach device to service
@@ -1838,6 +1838,33 @@
 	}
 }
 
+func (va *VoltApplication) GetAllFlowsForSvc(cntx context.Context, flow *of.VoltSubFlow, devID string, devSerialNum string) []uint64 {
+	devConfig := va.GetDeviceConfig(devSerialNum)
+	portNo := util.GetUniPortFromFlow(devConfig.UplinkPort, devConfig.NniPorts, flow)
+	portName, err := va.GetPortName(portNo)
+
+	if err != nil {
+		logger.Warnw(ctx, "Error getting port name", log.Fields{"Reason": err.Error(), "PortID": flow.Match.InPort})
+		return nil
+	} else if portName == "" {
+		logger.Warnw(ctx, "Port does not exist", log.Fields{"PortID": flow.Match.InPort})
+		return nil
+	}
+	svc := va.GetServiceNameFromCookie(flow.Cookie, portName, uint8(of.PbitMatchNone), devID, flow.Match.TableMetadata)
+	if svc != nil {
+		dsFlows := make([]uint64, 0)
+		for cookie, ok := range svc.AssociatedFlows {
+			if ok {
+				if val, err := strconv.ParseUint(cookie, 10, 64); err == nil {
+					dsFlows = append(dsFlows, val)
+				}
+			}
+		}
+		return dsFlows
+	}
+	return nil
+}
+
 // CheckAndDeactivateService - check if the attempts for flow delete has reached threshold or not
 func (va *VoltApplication) CheckAndDeactivateService(ctx context.Context, flow *of.VoltSubFlow, devSerialNum string, devID string) {
 	logger.Debugw(ctx, "Check and Deactivate service", log.Fields{"Cookie": flow.Cookie, "FlowCount": flow.FlowCount, "DeviceSerial": devSerialNum})
diff --git a/internal/pkg/application/application_test.go b/internal/pkg/application/application_test.go
index 5eef192..7a3b314 100644
--- a/internal/pkg/application/application_test.go
+++ b/internal/pkg/application/application_test.go
@@ -2611,7 +2611,7 @@
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
 			dbintf.EXPECT().PutVpv(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-			va.PortUpInd(tt.args.cntx, tt.args.device, tt.args.port)
+			va.PortUpInd(tt.args.cntx, tt.args.device, tt.args.port, false)
 		})
 	}
 }
diff --git a/internal/pkg/application/igmpgroupchannel.go b/internal/pkg/application/igmpgroupchannel.go
index 56f189c..6bb4452 100644
--- a/internal/pkg/application/igmpgroupchannel.go
+++ b/internal/pkg/application/igmpgroupchannel.go
@@ -581,7 +581,7 @@
 		return
 	}
 	port, _ := GetApplication().GetNniPort(igc.Device)
-	_ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow)
+	_ = cntlr.GetController().AddFlows(cntx, port, igc.Device, flow, false)
 }
 
 // DelMcFlow deletes flow from the device when the last receiver leaves
diff --git a/internal/pkg/application/igmpprofiles.go b/internal/pkg/application/igmpprofiles.go
index 673dacc..10c7491 100644
--- a/internal/pkg/application/igmpprofiles.go
+++ b/internal/pkg/application/igmpprofiles.go
@@ -497,7 +497,7 @@
 			if err1 != nil {
 				logger.Errorw(ctx, "Error getting NNI port", log.Fields{"Error": err1})
 			}
-			err = cntlr.GetController().AddFlows(cntx, nniPort, device, flows)
+			err = cntlr.GetController().AddFlows(cntx, nniPort, device, flows, false)
 			if err != nil {
 				logger.Warnw(ctx, "Configuring IGMP Flow for device failed ", log.Fields{"Device": device, "err": err})
 				return err
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index bb2966f..ec10fb3 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -300,12 +300,12 @@
 // AddHsiaFlows - Adds US & DS HSIA Flows for the service
 func (vs *VoltService) AddHsiaFlows(cntx context.Context) {
 	logger.Debugw(ctx, "Add US & DS HSIA Flows for the service", log.Fields{"ServiceName": vs.Name})
-	if err := vs.AddUsHsiaFlows(cntx); err != nil {
+	if err := vs.AddUsHsiaFlows(cntx, false); err != nil {
 		logger.Errorw(ctx, "Error adding US HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
 		statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 		vs.triggerServiceFailureInd(statusCode, statusMessage)
 	}
-	if err := vs.AddDsHsiaFlows(cntx); err != nil {
+	if err := vs.AddDsHsiaFlows(cntx, false); err != nil {
 		logger.Errorw(ctx, "Error adding DS HSIA Flows", log.Fields{"Service": vs.Name, "Port": vs.Port, "Reason": err.Error()})
 		statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 		vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -348,7 +348,7 @@
 }
 
 // AddUsHsiaFlows - Add US HSIA Flows for the service
-func (vs *VoltService) AddUsHsiaFlows(cntx context.Context) error {
+func (vs *VoltService) AddUsHsiaFlows(cntx context.Context, skipFlowPushToVoltha bool) error {
 	logger.Infow(ctx, "Configuring US HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
 	if vs.DeleteInProgress || vs.UpdateInProgress {
 		logger.Warnw(ctx, "Ignoring US HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
@@ -387,7 +387,7 @@
 				continue
 			}
 			usflows.MigrateCookie = vgcRebooted
-			if err := vs.AddFlows(cntx, device, usflows); err != nil {
+			if err := vs.AddFlows(cntx, device, usflows, skipFlowPushToVoltha); err != nil {
 				logger.Errorw(ctx, "Error adding HSIA US flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err.Error()})
 				statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 				vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -401,7 +401,7 @@
 }
 
 // AddDsHsiaFlows - Add DS HSIA Flows for the service
-func (vs *VoltService) AddDsHsiaFlows(cntx context.Context) error {
+func (vs *VoltService) AddDsHsiaFlows(cntx context.Context, skipFlowPushToVoltha bool) error {
 	logger.Infow(ctx, "Configuring DS HSIA Service Flows", log.Fields{"Device": vs.Device, "ServiceName": vs.Name, "Port": vs.Port})
 	if vs.DeleteInProgress {
 		logger.Warnw(ctx, "Ignoring DS HSIA Flow Push, Service deleteion In-Progress", log.Fields{"Device": vs.Device, "Service": vs.Name})
@@ -428,7 +428,7 @@
 				return fmt.Errorf("Error Building HSIA DS flows for Service %s and Port %s  : %w", vs.Name, vs.Port, err)
 			}
 			dsflows.MigrateCookie = vgcRebooted
-			if err = vs.AddFlows(cntx, device, dsflows); err != nil {
+			if err = vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
 				logger.Errorw(ctx, "Failed to add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
 				statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 				vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -442,7 +442,7 @@
 				}
 				logger.Debug(ctx, "Add-one-match-all-pbit-flow")
 				dsflows.MigrateCookie = vgcRebooted
-				if err := vs.AddFlows(cntx, device, dsflows); err != nil {
+				if err := vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
 					logger.Errorw(ctx, "Failed to add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
 					statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 					vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -457,7 +457,7 @@
 						continue
 					}
 					dsflows.MigrateCookie = vgcRebooted
-					if err := vs.AddFlows(cntx, device, dsflows); err != nil {
+					if err := vs.AddFlows(cntx, device, dsflows, skipFlowPushToVoltha); err != nil {
 						logger.Errorw(ctx, "Failed to Add HSIA DS flows", log.Fields{"Device": vs.Device, "Service": vs.Name, "Reason": err})
 						statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 						vs.triggerServiceFailureInd(statusCode, statusMessage)
@@ -1132,8 +1132,10 @@
 	}
 	//}
 
-	AppMutex.ServiceDataMutex.Lock()
-	defer AppMutex.ServiceDataMutex.Unlock()
+	// NOTE: The global ServiceDataMutex lock is redundant at this point,
+	// so it has been removed to avoid unnecessary lock contention.
+	// AppMutex.ServiceDataMutex.Lock()
+	// defer AppMutex.ServiceDataMutex.Unlock()
 
 	// Add the service to the VNET
 	vnet := va.GetVnet(cfg.SVlan, cfg.CVlan, cfg.UniVlan)
@@ -1315,7 +1317,7 @@
 
 // AddFlows - Adds the flow to the service
 // Triggers flow addition after registering for flow indication event
-func (vs *VoltService) AddFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow) error {
+func (vs *VoltService) AddFlows(cntx context.Context, device *VoltDevice, flow *of.VoltFlow, skipFlowPushToVoltha bool) error {
 	// Using locks instead of concurrent map for PendingFlows to avoid
 	// race condition during flow response indication processing
 	vs.ServiceLock.Lock()
@@ -1333,7 +1335,7 @@
 		device.RegisterFlowAddEvent(cookie, fe)
 		vs.PendingFlows[cookie] = true
 	}
-	return controller.GetController().AddFlows(cntx, vs.Port, device.Name, flow)
+	return controller.GetController().AddFlows(cntx, vs.Port, device.Name, flow, skipFlowPushToVoltha)
 }
 
 // FlowInstallSuccess - Called when corresponding service flow installation is success
@@ -2227,7 +2229,7 @@
 						// VGC once service is activated remembers and pushes the flows again
 						// if there was a restart in VGC during the execution of the go routine.
 						// Making it as a go routine will not impact anything
-						go vpv.PortUpInd(cntx, device, portNo, vs.NniPort)
+						go vpv.PortUpInd(cntx, device, portNo, vs.NniPort, false)
 					} else {
 						logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": deviceID, "port": portNo, "SvcName": vs.Name})
 					}
diff --git a/internal/pkg/application/service_test.go b/internal/pkg/application/service_test.go
index b105d7f..f54feff 100644
--- a/internal/pkg/application/service_test.go
+++ b/internal/pkg/application/service_test.go
@@ -686,7 +686,7 @@
 						DeleteInProgress: true,
 					},
 				}
-				err := vs.AddUsHsiaFlows(tt.args.cntx)
+				err := vs.AddUsHsiaFlows(tt.args.cntx, false)
 				assert.Nil(t, err)
 			case "GetDeviceFromPort_error":
 				vs := &VoltService{
@@ -694,7 +694,7 @@
 						DeleteInProgress: false,
 					},
 				}
-				err := vs.AddUsHsiaFlows(tt.args.cntx)
+				err := vs.AddUsHsiaFlows(tt.args.cntx, false)
 				assert.NotNil(t, err)
 			case "DeviceState_down":
 				vs := &VoltService{
@@ -708,7 +708,7 @@
 				ga := GetApplication()
 				ga.PortsDisc.Store("test_port", voltPort)
 				ga.DevicesDisc.Store(test_device, voltDevice1)
-				err := vs.AddUsHsiaFlows(tt.args.cntx)
+				err := vs.AddUsHsiaFlows(tt.args.cntx, false)
 				assert.Nil(t, err)
 			}
 		})
diff --git a/internal/pkg/application/vnets.go b/internal/pkg/application/vnets.go
index 3f1cbae..7b70315 100644
--- a/internal/pkg/application/vnets.go
+++ b/internal/pkg/application/vnets.go
@@ -672,9 +672,9 @@
 }
 
 // RangeOnServices to call a function on all services on the vpv
-func (vpv *VoltPortVnet) RangeOnServices(cntx context.Context, callback func(cntx context.Context, key, value interface{}, flag bool) bool, delFlowsInDevice bool) {
+func (vpv *VoltPortVnet) RangeOnServices(cntx context.Context, callback func(cntx context.Context, key, value interface{}, flag bool) bool, flag bool) {
 	vpv.services.Range(func(key, value interface{}) bool {
-		return callback(cntx, key, value, delFlowsInDevice)
+		return callback(cntx, key, value, flag)
 	})
 }
 
@@ -747,7 +747,7 @@
 	// vpv.DsFlowsApplied = false
 	// vpv.UsFlowsApplied = false
 	vpv.VpvLock.Lock()
-	vpv.PortUpInd(cntx, d, vpv.Port, "")
+	vpv.PortUpInd(cntx, d, vpv.Port, "", false)
 	vpv.VpvLock.Unlock()
 }
 
@@ -756,7 +756,7 @@
 // again here to apply the latest configuration if the configuration
 // changed. Thus, a reboot of ONT forces the new configuration to get
 // applied.
-func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string, nniPort string) {
+func (vpv *VoltPortVnet) PortUpInd(cntx context.Context, device *VoltDevice, port string, nniPort string, skipFlowPushToVoltha bool) {
 	logger.Infow(ctx, "Port UP Ind, pushing flows for the port", log.Fields{"Device": device, "Port": port, "VnetDhcp": vpv.DhcpRelay, "McastService": vpv.McastService})
 	if vpv.DeleteInProgress {
 		logger.Warnw(ctx, "Ignoring VPV Port UP Ind, VPV deletion In-Progress", log.Fields{"Device": device, "Port": port, "Vnet": vpv.VnetName})
@@ -789,23 +789,23 @@
 		logger.Infow(ctx, "Port Up - Trap Flows", log.Fields{"Device": device.Name, "Port": port})
 		// no HSIA flows for multicast service and DPU_MGMT Service
 		if !vpv.McastService && vpv.VnetType != DpuMgmtTraffic {
-			vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+			vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
 		}
 		if vpv.VnetType == DpuMgmtTraffic {
-			vpv.RangeOnServices(cntx, AddMeterToDevice, false)
+			vpv.RangeOnServices(cntx, AddMeterToDevice, skipFlowPushToVoltha)
 		}
 		vpv.AddTrapFlows(cntx)
 		if vpv.MacLearning == MacLearningNone || NonZeroMacAddress(vpv.MacAddr) {
 			logger.Infow(ctx, "Port Up - DS Flows", log.Fields{"Device": device.Name, "Port": port})
 			/*In case of DPU_MGMT_TRAFFIC, need to install both US and DS traffic */
 			if vpv.VnetType == DpuMgmtTraffic {
-				vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+				vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
 			}
 			// US & DS DHCP, US HSIA flows are already installed
 			// install only DS HSIA flow here.
 			// no HSIA flows for multicast service
 			if !vpv.McastService {
-				vpv.RangeOnServices(cntx, AddDsHsiaFlows, false)
+				vpv.RangeOnServices(cntx, AddDsHsiaFlows, skipFlowPushToVoltha)
 			}
 		}
 	} else {
@@ -816,25 +816,25 @@
 		// however is not seen as a real use case.
 		logger.Debugw(ctx, "Port Up - Service Flows", log.Fields{"Device": device.Name, "Port": port})
 		if !vpv.McastService {
-			vpv.RangeOnServices(cntx, AddUsHsiaFlows, false)
+			vpv.RangeOnServices(cntx, AddUsHsiaFlows, skipFlowPushToVoltha)
 		}
 		vpv.AddTrapFlows(cntx)
 		if !vpv.McastService {
-			vpv.RangeOnServices(cntx, AddDsHsiaFlows, false)
+			vpv.RangeOnServices(cntx, AddDsHsiaFlows, skipFlowPushToVoltha)
 		}
 	}
 
 	// Process IGMP proxy - install IGMP trap rules before DHCP trap rules
 	if vpv.IgmpEnabled {
 		logger.Debugw(ctx, "Port Up - IGMP Flows", log.Fields{"Device": device.Name, "Port": port})
-		vpv.RangeOnServices(cntx, AddSvcUsMeterToDevice, false)
+		vpv.RangeOnServices(cntx, AddSvcUsMeterToDevice, skipFlowPushToVoltha)
 		if err := vpv.AddIgmpFlows(cntx); err != nil {
 			statusCode, statusMessage := errorCodes.GetErrorInfo(err)
 			vpv.FlowInstallFailure("VGC processing failure", statusCode, statusMessage)
 		}
 
 		if vpv.McastService {
-			vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd, false)
+			vpv.RangeOnServices(cntx, PostAccessConfigSuccessInd, skipFlowPushToVoltha)
 		}
 	}
 
@@ -1133,7 +1133,7 @@
 		if NonZeroMacAddress(vpv.MacAddr) || svc.MacLearning == MacLearningNone {
 			svc.AddHsiaFlows(cntx)
 		} else {
-			if err := svc.AddUsHsiaFlows(cntx); err != nil {
+			if err := svc.AddUsHsiaFlows(cntx, false); err != nil {
 				logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
 			}
 		}
@@ -1194,7 +1194,7 @@
 func AddUsHsiaFlows(cntx context.Context, key, value interface{}, flag bool) bool {
 	svc := value.(*VoltService)
 	logger.Debugw(ctx, "Add US Hsia Flows", log.Fields{"ServiceName": svc.Name})
-	if err := svc.AddUsHsiaFlows(cntx); err != nil {
+	if err := svc.AddUsHsiaFlows(cntx, flag); err != nil {
 		logger.Warnw(ctx, "Add US hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
 	}
 	return true
@@ -1204,7 +1204,7 @@
 func AddDsHsiaFlows(cntx context.Context, key, value interface{}, flag bool) bool {
 	svc := value.(*VoltService)
 	logger.Debugw(ctx, "Add DS Hsia Flows", log.Fields{"ServiceName": svc.Name})
-	if err := svc.AddDsHsiaFlows(cntx); err != nil {
+	if err := svc.AddDsHsiaFlows(cntx, flag); err != nil {
 		logger.Warnw(ctx, "Add DS hsia flow failed", log.Fields{"service": svc.Name, "Error": err})
 	}
 	return true
@@ -1816,7 +1816,7 @@
 					vd.RegisterFlowAddEvent(cookie, fe)
 				}
 			}
-			if err1 := controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flows); err1 != nil {
+			if err1 := controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flows, false); err1 != nil {
 				return err1
 			}
 		} else {
@@ -2469,7 +2469,7 @@
 		if p != nil {
 			logger.Debugw(ctx, "Checking UNI port state", log.Fields{"State": p.State})
 			if d.State == controller.DeviceStateUP && p.State == PortStateUp {
-				vpv.PortUpInd(cntx, d, port, vs.NniPort)
+				vpv.PortUpInd(cntx, d, port, vs.NniPort, false)
 			}
 		}
 	}
@@ -2729,7 +2729,7 @@
 
 			// Pushing ICMPv6 Flow
 			flow := BuildICMPv6Flow(portID, vnet)
-			err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+			err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
 			if err != nil {
 				logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
 				return true
@@ -2738,7 +2738,7 @@
 
 			// Pushing ARP Flow
 			flow = BuildDSArpFlow(portID, vnet)
-			err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+			err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
 			if err != nil {
 				logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
 				return true
@@ -2756,7 +2756,7 @@
 
 func (va *VoltApplication) PushTrapFlows(cntx context.Context, device *VoltDevice, nniPort string, flow *of.VoltFlow) error {
 	logger.Debugw(ctx, "Push NNI DHCP Trap Flows", log.Fields{"DeviceName": device.Name, "Flow port": flow.PortID})
-	return controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+	return controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
 }
 
 // PushDevFlowForDevice to push icmpv6 flows for device
@@ -2790,7 +2790,7 @@
 			return true
 		}
 		flow := BuildICMPv6Flow(nniPortID, vnet)
-		err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+		err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
 		if err != nil {
 			logger.Warnw(ctx, "Configuring ICMPv6 Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
 			return true
@@ -2798,7 +2798,7 @@
 		logger.Infow(ctx, "ICMP Flow Added to Queue", log.Fields{"flow": flow})
 
 		flow = BuildDSArpFlow(nniPortID, vnet)
-		err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow)
+		err = controller.GetController().AddFlows(cntx, nniPort, device.Name, flow, false)
 		if err != nil {
 			logger.Warnw(ctx, "Configuring ARP Flow for device failed ", log.Fields{"Device": device.Name, "err": err})
 			return true
@@ -3128,7 +3128,7 @@
 		}
 		device.RegisterFlowAddEvent(cookie, fe)
 	}
-	return controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flow)
+	return controller.GetController().AddFlows(cntx, vpv.Port, device.Name, flow, false)
 }
 
 // FlowInstallFailure - Process flow failure indication and triggers HSIA failure for all associated services
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index a4f8a8d..83ead45 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -78,27 +78,10 @@
 	aft.taskID = taskID
 	aft.ctx = ctx
 	flowsToProcess := make(map[uint64]*of.VoltSubFlow)
-	flowsPresent := 0
 	// First add/delete the flows first locally before passing them to actual device
 	for _, flow := range aft.flow.SubFlows {
 		logger.Debugw(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
 		if aft.flow.Command == of.CommandAdd {
-			flow.State = of.FlowAddPending
-			if err = aft.device.AddFlow(ctx, flow); err != nil {
-				logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
-
-				// If flow already exists in cache, check for flow state
-				// If Success: Trigger success FLow Indication
-				// if Failure: Continue process, so add-retry happens
-				if err.Error() == ErrDuplicateFlow {
-					dbFlow, _ := aft.device.GetFlow(flow.Cookie)
-					if dbFlow.State == of.FlowAddSuccess {
-						aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
-						flowsPresent++
-						continue
-					}
-				}
-			}
 			flowsToProcess[flow.Cookie] = flow
 		} else {
 			dbFlow, ok := aft.device.GetFlow(flow.Cookie)
@@ -114,11 +97,6 @@
 		}
 	}
 
-	if flowsPresent == len(aft.flow.SubFlows) {
-		logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
-		return nil
-	}
-
 	// PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
 	// PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
 	if aft.flow.PortName != "" && aft.flow.PortID != 0 {
diff --git a/internal/pkg/controller/auditdevice.go b/internal/pkg/controller/auditdevice.go
index ecee452..37a9837 100644
--- a/internal/pkg/controller/auditdevice.go
+++ b/internal/pkg/controller/auditdevice.go
@@ -20,6 +20,7 @@
 	"time"
 
 	"voltha-go-controller/internal/pkg/tasks"
+	"voltha-go-controller/internal/pkg/util"
 	"voltha-go-controller/log"
 
 	"github.com/opencord/voltha-protos/v5/go/common"
@@ -43,12 +44,13 @@
 
 // AuditDevice structure
 type AuditDevice struct {
-	ctx       context.Context
-	device    *Device
-	timestamp string
-	event     AuditEventType
-	taskID    uint8
-	stop      bool
+	ctx               context.Context
+	device            *Device
+	timestamp         string
+	event             AuditEventType
+	taskID            uint8
+	stop              bool
+	skipFlowOnRestart bool
 }
 
 // NewAuditDevice is constructor for AuditDevice
@@ -107,11 +109,11 @@
 	missingPorts := make(map[uint32]*ofp.OfpPort)
 	for _, ofpp := range ofpps.Items {
 		missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
-		logger.Infow(ctx, "Missing Ports", log.Fields{"Ports": ofpp.OfpPort, "missingPorts": missingPorts})
 	}
 
 	excessPorts := make(map[uint32]*DevicePort)
 	GetController().SetAuditFlags(ad.device)
+	defer GetController().ResetAuditFlags(ad.device)
 
 	processPortState := func(id uint32, vgcPort *DevicePort) {
 		logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
@@ -121,7 +123,7 @@
 				// This port exists in the received list and the map at
 				// VGC. This is common so delete it
 				logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
-				ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
+				ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name, ad.skipFlowOnRestart)
 			} else {
 				//To ensure the flows are in sync with port status and no mismatch due to reboot,
 				// repush/delete flows based on current port status
@@ -138,13 +140,15 @@
 	}
 
 	// 1st process the NNI port before all other ports so that the device state can be updated.
-	if vgcPort, ok := ad.device.PortsByID[NNIPortID]; ok {
-		logger.Debugw(ctx, "Processing NNI port state", log.Fields{"PortNo": vgcPort.ID, "PortName": vgcPort.Name, "PortState": vgcPort.State})
-		processPortState(NNIPortID, vgcPort)
+	for id, vgcPort := range ad.device.PortsByID {
+		if util.IsNniPort(id) {
+			logger.Debugw(ctx, "Processing NNI port state", log.Fields{"PortNo": vgcPort.ID, "PortName": vgcPort.Name, "PortState": vgcPort.State})
+			processPortState(id, vgcPort)
+		}
 	}
 
 	for id, vgcPort := range ad.device.PortsByID {
-		if id == NNIPortID {
+		if util.IsNniPort(id) {
 			//NNI port already processed
 			continue
 		}
@@ -153,7 +157,6 @@
 		}
 		processPortState(id, vgcPort)
 	}
-	GetController().ResetAuditFlags(ad.device)
 
 	if ad.stop {
 		logger.Errorw(ctx, "Audit Device Task Canceled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
@@ -168,30 +171,30 @@
 
 // AddMissingPorts to add the missing ports
 func (ad *AuditDevice) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
-	logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps), "Ports": mps})
+	logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps), "Ports": mps})
 
 	addMissingPort := func(mp *ofp.OfpPort) {
 		logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
 
-		// Error is ignored as it only drops duplicate ports
-		logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
 		if err := ad.device.AddPort(cntx, mp); err != nil {
 			logger.Warnw(ctx, "AddPort Failed", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name, "Reason": err})
 		}
 		if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
-			ad.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
+			ad.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name, ad.skipFlowOnRestart)
 		}
 		logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
 	}
 
 	// 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
-	if mp, ok := mps[NNIPortID]; ok {
-		logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
-		addMissingPort(mp)
+	for portNo, mp := range mps {
+		if util.IsNniPort(portNo) {
+			logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
+			addMissingPort(mp)
+		}
 	}
 
 	for portNo, mp := range mps {
-		if portNo != NNIPortID {
+		if !util.IsNniPort(portNo) {
 			addMissingPort(mp)
 		}
 	}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index d98888c..9c4c712 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -17,10 +17,9 @@
 
 import (
 	"context"
-	"strconv"
+	"errors"
 	"time"
 
-	"voltha-go-controller/internal/pkg/intf"
 	"voltha-go-controller/internal/pkg/of"
 	"voltha-go-controller/internal/pkg/tasks"
 	"voltha-go-controller/internal/pkg/util"
@@ -235,19 +234,21 @@
 		return err
 	}
 
-	defaultSuccessFlowStatus := intf.FlowStatus{
-		Device:      att.device.ID,
-		FlowModType: of.CommandAdd,
-		Status:      0,
-		Reason:      "",
-	}
+	// defaultSuccessFlowStatus := intf.FlowStatus{
+	// 	Device:      att.device.ID,
+	// 	FlowModType: of.CommandAdd,
+	// 	Status:      0,
+	// 	Reason:      "",
+	// }
 
 	// Build the map for easy and faster processing
 	rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
+	volthaFlows := make(map[uint64]*ofp.OfpFlowStats)
 	flowsToAdd := &of.VoltFlow{}
 	flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
 	for _, flow := range f.Items {
 		rcvdFlows[flow.Cookie] = flow
+		volthaFlows[flow.Cookie] = flow
 	}
 
 	att.device.flowLock.Lock()
@@ -269,10 +270,68 @@
 				// Update flow delete count since we are retrying the flow delete due to failure
 				att.device.UpdateFlowCount(cntx, flow.Cookie)
 			}
-			defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
+			// defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
 		} else {
+			// Do not add the flow to device whose state was marked as delete failure
+			// Remove the flow from DB as it is no longer reported by voltha
+			if flow.State == of.FlowDelFailure {
+				delete(att.device.flows, flow.Cookie)
+				att.device.DelFlowFromDb(cntx, flow.Cookie)
+				logger.Warnw(ctx, "Found flow with state DelFailure while adding to device, will remove from DB", log.Fields{"Cookie": flow.Cookie})
+				continue
+			}
 			// The flow exists at the controller but not at the device
 			// Push the flow to the device
+
+			// If UST0 flow is missing in voltha but the UST1 flow is present in voltha,
+			// then delete the UST1 flow from voltha and add both US flows to voltha
+			if att.device.IsUSTable0Flow(ctx, flow) {
+				logger.Debugw(ctx, "UST0 flow found, checking for UST1 flow", log.Fields{"Cookie": flow.Cookie})
+				ust1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, false)
+				if ust1Flow != nil {
+					flowToDelete, ok := volthaFlows[ust1Flow.Cookie]
+					if ok {
+						// Sometimes the audit would happen even before all flows are installed for a service.
+						// If the UST0 flow is still missing in voltha on second retry attempt, then remove the UST1 flow from voltha.
+						if flow.FlowCount == 0 {
+							att.device.UpdateFlowCount(cntx, flow.Cookie)
+							continue
+						}
+						logger.Infow(ctx, "UST1 flow already present in Voltha, delete and install US flows", log.Fields{"UST1Flow": ust1Flow.Cookie, "UST0Flow": flow.Cookie})
+						err = att.DeleteDeviceFlow(ctx, flowToDelete)
+						if err == nil {
+							flowsToAdd.SubFlows[ust1Flow.Cookie] = ust1Flow
+						} else {
+							logger.Warnw(ctx, "UST1 flow delete failed", log.Fields{"Cookie": ust1Flow.Cookie, "Reason": err.Error()})
+						}
+					}
+				}
+			}
+
+			// If DST0 flow is missing in voltha but the DST1 flow is present in voltha,
+			// then delete the DST1 flow from voltha and add both DS flows to voltha
+			if att.device.IsDSTable0Flow(ctx, flow) {
+				logger.Debugw(ctx, "DST0 flow found, checking for DST1 flow", log.Fields{"Cookie": flow.Cookie})
+				dst1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, true)
+				if dst1Flow != nil {
+					flowToDelete, ok := volthaFlows[dst1Flow.Cookie]
+					if ok {
+						// The audit may run before all flows for a service have been installed.
+						// If the DST0 flow is still missing in voltha on the second attempt, then remove the DST1 flow from voltha.
+						if flow.FlowCount == 0 {
+							att.device.UpdateFlowCount(cntx, flow.Cookie)
+							continue
+						}
+						logger.Infow(ctx, "DST1 flow already present in Voltha, delete and install DS flows", log.Fields{"DST1Flow": dst1Flow.Cookie, "DST0Flow": flow.Cookie})
+						err = att.DeleteDeviceFlow(ctx, flowToDelete)
+						if err == nil {
+							flowsToAdd.SubFlows[dst1Flow.Cookie] = dst1Flow
+						} else {
+							logger.Warnw(ctx, "DST1 flow delete failed", log.Fields{"Cookie": dst1Flow.Cookie, "Reason": err.Error()})
+						}
+					}
+				}
+			}
 			logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
 			if !att.device.IsFlowAddThresholdReached(flow.FlowCount, flow.Cookie) {
 				flowsToAdd.SubFlows[flow.Cookie] = flow
@@ -380,6 +439,44 @@
 	}
 }
 
+// DeleteDeviceFlow removes a single flow from VOLTHA by issuing an
+// OFPFC_DELETE_STRICT flow-mod against this device's logical flow table.
+// Used by the audit task to clear a stale T1 flow before the corresponding
+// T0/T1 pair is re-installed. Returns an error when the VOLTHA client is
+// unavailable or the RPC fails.
+func (att *AuditTablesTask) DeleteDeviceFlow(cntx context.Context, flow *ofp.OfpFlowStats) error {
+	logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
+
+	// Create the flowMod structure and fill it out
+	// Match/priority/table are copied verbatim from the stats entry so the
+	// STRICT delete matches exactly the flow VOLTHA reported.
+	flowMod := &ofp.OfpFlowMod{}
+	flowMod.Cookie = flow.Cookie
+	flowMod.TableId = flow.TableId
+	flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
+	flowMod.IdleTimeout = flow.IdleTimeout
+	flowMod.HardTimeout = flow.HardTimeout
+	flowMod.Priority = flow.Priority
+	flowMod.BufferId = of.DefaultBufferID
+	flowMod.OutPort = of.DefaultOutPort
+	flowMod.OutGroup = of.DefaultOutGroup
+	flowMod.Flags = flow.Flags
+	flowMod.Match = flow.Match
+	flowMod.Instructions = flow.Instructions
+
+	// Create FlowTableUpdate
+	flowUpdate := &ofp.FlowTableUpdate{
+		Id:      att.device.ID,
+		FlowMod: flowMod,
+	}
+
+	var err error
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Delete flow failed: Voltha Client Unavailable")
+		return errors.New("voltha client unavailable")
+	}
+
+	// NOTE(review): the RPC deliberately(?) uses the task's context att.ctx,
+	// leaving the cntx parameter unused — confirm whether cancellation should
+	// follow cntx instead, or drop the parameter.
+	if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
+		logger.Errorw(ctx, "Flow delete failed", log.Fields{"Reason": err.Error()})
+		return err
+	}
+	return nil
+}
+
 // AuditGroups audit the groups which includes fetching the existing groups at the
 // voltha and identifying the delta between the ones held here and the
 // ones held at VOLTHA. The delta must be cleaned up to keep both the
@@ -582,7 +679,7 @@
 				// This port exists in the received list and the map at
 				// VGC. This is common so delete it
 				logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
-				att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
+				att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name, false)
 			}
 			delete(missingPorts, id)
 		} else {
@@ -593,13 +690,15 @@
 		logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
 	}
 	// 1st process the NNI port before all other ports so that the device state can be updated.
-	if vgcPort, ok := att.device.PortsByID[NNIPortID]; ok {
-		logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
-		processPortState(NNIPortID, vgcPort)
+	for id, vgcPort := range att.device.PortsByID {
+		if util.IsNniPort(id) {
+			logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
+			processPortState(id, vgcPort)
+		}
 	}
 
 	for id, vgcPort := range att.device.PortsByID {
-		if id == NNIPortID {
+		if util.IsNniPort(id) {
 			// NNI port already processed
 			continue
 		}
@@ -629,19 +728,21 @@
 			logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
 		}
 		if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
-			att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
+			att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name, false)
 		}
 		logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
 	}
 
 	// 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
-	if mp, ok := mps[NNIPortID]; ok {
-		logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo})
-		addMissingPort(mp)
+	for portNo, mp := range mps {
+		if util.IsNniPort(portNo) {
+			logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
+			addMissingPort(mp)
+		}
 	}
 
 	for portNo, mp := range mps {
-		if portNo != NNIPortID {
+		if !util.IsNniPort(portNo) {
 			addMissingPort(mp)
 		}
 	}
diff --git a/internal/pkg/controller/changeevent.go b/internal/pkg/controller/changeevent.go
index 54b6d22..0144297 100644
--- a/internal/pkg/controller/changeevent.go
+++ b/internal/pkg/controller/changeevent.go
@@ -77,7 +77,7 @@
 		case ofp.OfpPortReason_OFPPR_ADD:
 			_ = cet.device.AddPort(ctx, status.PortStatus.Desc)
 			if state == uint32(ofp.OfpPortState_OFPPS_LIVE) {
-				cet.device.ProcessPortState(ctx, portNo, state, portName)
+				cet.device.ProcessPortState(ctx, portNo, state, portName, false)
 			}
 		case ofp.OfpPortReason_OFPPR_DELETE:
 			cet.device.CheckAndDeletePort(ctx, portNo, portName)
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index 1c02738..ba33d78 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -131,8 +131,6 @@
 
 	d.RestoreMetersFromDb(cntx)
 	d.RestoreGroupsFromDb(cntx)
-	d.RestoreFlowsFromDb(cntx)
-	d.RestorePortsFromDb(cntx)
 	d.ConnectInd(context.TODO(), intf.DeviceDisc)
 	d.packetOutChannel = config.PacketOutChannel
 
@@ -280,6 +278,10 @@
 	v.app.CheckAndDeactivateService(ctx, flow, devSerialNum, devID)
 }
 
+// GetAllFlowsForSvc to get all flow cookies of the service owning the given flow
+func (v *VoltController) GetAllFlowsForSvc(ctx context.Context, flow *of.VoltSubFlow, devID string, devSerialNum string) []uint64 {
+	return v.app.GetAllFlowsForSvc(ctx, flow, devID, devSerialNum)
+}
+
 // AddVPAgent to add the vpagent
 func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
 	v.vagent[vep] = vpa
@@ -306,7 +308,7 @@
 }
 
 // AddFlows to add flows
-func (v *VoltController) AddFlows(cntx context.Context, port string, device string, flow *of.VoltFlow) error {
+func (v *VoltController) AddFlows(cntx context.Context, port string, device string, flow *of.VoltFlow, skipFlowPushToVoltha bool) error {
 	d, err := v.GetDevice(device)
 	if err != nil {
 		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
@@ -347,10 +349,28 @@
 			}
 		}
 	} else {
-		flow.Command = of.CommandAdd
-		d.UpdateFlows(flow, devPort)
-		for cookie := range flow.SubFlows {
-			logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+		flowsToVoltha := &of.VoltFlow{}
+		flowsToVoltha.SubFlows = make(map[uint64]*of.VoltSubFlow)
+		// During VGC restart, build and add flows only to cache.
+		// No need to push flows to voltha now as it will be audited later
+		for _, subFlow := range flow.SubFlows {
+			logger.Debugw(ctx, "Adding flows to device cache", log.Fields{"Cookie": subFlow.Cookie})
+			if !skipFlowPushToVoltha {
+				subFlow.State = of.FlowAddPending
+			}
+			if err := d.AddFlow(cntx, subFlow); err != nil {
+				logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": subFlow.Cookie, "Reason": err.Error()})
+			} else {
+				flowsToVoltha.SubFlows[subFlow.Cookie] = subFlow
+			}
+		}
+
+		if !skipFlowPushToVoltha {
+			flowsToVoltha.Command = of.CommandAdd
+			d.UpdateFlows(flowsToVoltha, devPort)
+			for cookie := range flowsToVoltha.SubFlows {
+				logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+			}
 		}
 	}
 	return nil
@@ -471,8 +491,8 @@
 }
 
 // PortUpInd for port up indication
-func (v *VoltController) PortUpInd(cntx context.Context, device string, port string) {
-	v.app.PortUpInd(cntx, device, port)
+// skipFlowPushToVoltha suppresses the flow push to VOLTHA so flows are only
+// rebuilt in cache (set during VGC-restart audits); named to match
+// VoltApplication.PortUpInd and Device.ProcessPortState in this change.
+func (v *VoltController) PortUpInd(cntx context.Context, device string, port string, skipFlowPushToVoltha bool) {
+	v.app.PortUpInd(cntx, device, port, skipFlowPushToVoltha)
 }
 
 // PortDownInd for port down indication
diff --git a/internal/pkg/controller/controller_test.go b/internal/pkg/controller/controller_test.go
index 18b16bc..e47f727 100644
--- a/internal/pkg/controller/controller_test.go
+++ b/internal/pkg/controller/controller_test.go
@@ -173,7 +173,7 @@
 				v.Devices.Store(key, value)
 				return true
 			})
-			if err := v.AddFlows(tt.args.cntx, tt.args.port, tt.args.device, tt.args.flow); (err != nil) != tt.wantErr {
+			if err := v.AddFlows(tt.args.cntx, tt.args.port, tt.args.device, tt.args.flow, false); (err != nil) != tt.wantErr {
 				t.Errorf("VoltController.AddFlows() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -1219,9 +1219,11 @@
 		State:   1,
 		SetVlan: of.VlanAny,
 	}
+
+	// Setup database mock for async operations
 	dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 	db = dbintf
-	dbintf.EXPECT().PutGroup(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
+	dbintf.EXPECT().PutGroup(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
 	tests := []struct {
 		name    string
 		args    args
@@ -1263,7 +1265,18 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			switch tt.name {
-			case "GroupUpdate", "DeviceNOtFound_Error", "PortNOtFound_Error":
+			case "GroupUpdate":
+				v := &VoltController{
+					Devices: sync.Map{},
+				}
+				dev.Range(func(key, value interface{}) bool {
+					v.Devices.Store(key, value)
+					return true
+				})
+				if err := v.GroupUpdate(tt.args.port, tt.args.device, tt.args.group); (err != nil) != tt.wantErr {
+					t.Errorf("VoltController.GroupUpdate() error = %v, wantErr %v", err, tt.wantErr)
+				}
+			case "DeviceNOtFound_Error", "PortNOtFound_Error":
 				v := &VoltController{
 					Devices: sync.Map{},
 				}
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 0748fa7..93fae14 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -242,7 +242,6 @@
 		}
 	}
 	d.flows[flow.Cookie] = flow
-	d.AddFlowToDb(cntx, flow)
 	return nil
 }
 
@@ -504,7 +503,6 @@
 	p := NewDevicePort(mp)
 	d.PortsByID[id] = p
 	d.PortsByName[name] = p
-	d.WritePortToDb(cntx, p)
 	d.portLock.Unlock()
 	GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
 	logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
@@ -563,7 +561,6 @@
 	delete(d.PortsByID, p.ID)
 	p.ID = port
 	d.PortsByID[port] = p
-	d.WritePortToDb(cntx, p)
 	GetController().PortUpdateInd(d.ID, p.Name, p.ID)
 	logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
 }
@@ -698,6 +695,12 @@
 
 	logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
 	t := NewAuditDevice(d, AuditEventDeviceDisc)
+	// During VGC restart or when a device is added. skip pushing flows to voltha during audit device task
+	// When device is added, if required the flows will get pushed during the next audit table task called soon after this audit device task
+	if discType == intf.DeviceDisc {
+		t.skipFlowOnRestart = true
+	}
+
 	d.Tasks.AddTask(t)
 
 	t1 := NewAuditTablesTask(d)
@@ -788,7 +791,6 @@
 			logger.Debugw(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
 			GetController().PortDownInd(cntx, d.ID, port.Name)
 			port.State = PortStateDown
-			d.WritePortToDb(cntx, port)
 		}
 	}
 }
@@ -818,7 +820,7 @@
 			return
 			//Do not process port update received from change event, as we will only handle port updates during polling
 		}
-		d.ProcessPortState(cntx, port, state, portName)
+		d.ProcessPortState(cntx, port, state, portName, false)
 	}
 }
 
@@ -838,7 +840,7 @@
 
 // ProcessPortState deals with the change in port status and taking action
 // based on the new state and the old state
-func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string) {
+func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string, skipFlowPushToVoltha bool) {
 	if d.State != DeviceStateUP && !util.IsNniPort(port) {
 		logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
 		return
@@ -857,7 +859,7 @@
 		if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
 			// Transition from DOWN to UP
 			logger.Debugw(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
-			GetController().PortUpInd(cntx, d.ID, p.Name)
+			GetController().PortUpInd(cntx, d.ID, p.Name, skipFlowPushToVoltha)
 			p.State = PortStateUp
 			d.WritePortToDb(cntx, p)
 		} else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
@@ -865,7 +867,6 @@
 			logger.Debugw(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
 			GetController().PortDownInd(cntx, d.ID, p.Name)
 			p.State = PortStateDown
-			d.WritePortToDb(cntx, p)
 		} else {
 			logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
 		}
@@ -884,7 +885,7 @@
 		switch p.State {
 		case PortStateUp:
 			logger.Debugw(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
-			GetController().PortUpInd(cntx, d.ID, p.Name)
+			GetController().PortUpInd(cntx, d.ID, p.Name, false)
 		case PortStateDown:
 			logger.Debugw(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
 			GetController().PortDownInd(cntx, d.ID, p.Name)
@@ -1084,6 +1085,44 @@
 	return flowCount >= uint32(GetController().GetMaxFlowRetryAttempt())
 }
 
+// IsUSTable0Flow - check if the flow is for US Table 0,
+// i.e. a table-0 flow whose ingress port is a UNI (non-NNI) port.
+func (d *Device) IsUSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
+	// Simplified per staticcheck S1008: return the condition directly.
+	return flow.TableID == 0 && !util.IsNniPort(flow.Match.InPort)
+}
+
+// IsDSTable0Flow - check if the flow is for DS Table 0,
+// i.e. a table-0 flow whose ingress port is an NNI port.
+func (d *Device) IsDSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
+	// Simplified per staticcheck S1008: return the condition directly.
+	return flow.TableID == 0 && util.IsNniPort(flow.Match.InPort)
+}
+
+// GetDeviceFlow - get the DS or US Table 1 flow based on isDsFlow flag.
+// It resolves all flow cookies belonging to the same service as the given
+// flow (via GetAllFlowsForSvc) and returns the first cached flow that is a
+// Table 1 entry on an NNI port (isDsFlow=true) or a UNI/non-NNI port
+// (isDsFlow=false). Returns nil when no matching Table 1 flow is cached.
+// NOTE(review): d.flows is read here without a visible lock — confirm the
+// audit-task callers serialize access to the device flow map.
+func (d *Device) GetDeviceFlow(cntx context.Context, flow *of.VoltSubFlow, deviceSerialNum string, devID string, isDsFlow bool) *of.VoltSubFlow {
+	cookies := GetController().GetAllFlowsForSvc(cntx, flow, devID, deviceSerialNum)
+	for _, cookie := range cookies {
+		if dbFlow, ok := d.flows[cookie]; ok {
+			logger.Debugw(ctx, "Found flow in device", log.Fields{"Cookie": cookie, "Flow": dbFlow})
+			if isDsFlow {
+				// return DS Table1 flow
+				if dbFlow.TableID == 1 && util.IsNniPort(dbFlow.Match.InPort) {
+					return dbFlow
+				}
+			} else {
+				// return US Table1 flow
+				if dbFlow.TableID == 1 && !util.IsNniPort(dbFlow.Match.InPort) {
+					return dbFlow
+				}
+			}
+		}
+	}
+	return nil
+}
+
 // IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
 func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
 	logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
@@ -1093,7 +1132,6 @@
func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
 	if dbFlow, ok := d.flows[cookie]; ok {
 		dbFlow.FlowCount++
-		d.AddFlowToDb(cntx, dbFlow)
+		// NOTE(review): flow-count updates are now cache-only; with the DB
+		// write removed, cntx is no longer used in this function — consider
+		// dropping the parameter once callers are updated.
 	}
 }
 
@@ -1116,10 +1154,6 @@
 		if dbFlow, ok := d.flows[cookie]; ok {
 			dbFlow.State = uint8(state)
 			dbFlow.ErrorReason = reason
-			if state == of.FlowAddSuccess {
-				dbFlow.FlowCount = 0
-			}
-			d.AddFlowToDb(cntx, dbFlow)
 		}
 	}
 
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..ab832b7 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -168,7 +168,6 @@
 			_ = NewController(context.Background(), appMock)
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
-			dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
 			appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
 			d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
 		})
diff --git a/internal/pkg/intf/appif.go b/internal/pkg/intf/appif.go
index 91a9f8a..e03fd50 100644
--- a/internal/pkg/intf/appif.go
+++ b/internal/pkg/intf/appif.go
@@ -26,7 +26,7 @@
 	PortDelInd(context.Context, string, string)
 	PortUpdateInd(string, string, uint32)
 	PacketInInd(context.Context, string, string, []byte)
-	PortUpInd(context.Context, string, string)
+	PortUpInd(context.Context, string, string, bool)
 	PortDownInd(context.Context, string, string)
 	AddDevice(context.Context, string, string, string)
 	DeviceUpInd(string)
@@ -35,6 +35,7 @@
 	SetRebootFlag(bool)
 	ProcessFlowModResultIndication(context.Context, FlowStatus)
 	CheckAndDeactivateService(context.Context, *of.VoltSubFlow, string, string)
+	GetAllFlowsForSvc(context.Context, *of.VoltSubFlow, string, string) []uint64
 	DeviceRebootInd(context.Context, string, string, string)
 	DeviceDisableInd(context.Context, string)
 	UpdateMvlanProfilesForDevice(context.Context, string)
diff --git a/internal/pkg/vpagent/common.go b/internal/pkg/vpagent/common.go
index 5b62368..a8046f6 100644
--- a/internal/pkg/vpagent/common.go
+++ b/internal/pkg/vpagent/common.go
@@ -30,17 +30,24 @@
 	if err == nil {
 		return false
 	}
-	// >= gRPC v1.23.x
+
 	s, ok := status.FromError(err)
 	if ok {
-		// connection is canceled or server has already closed the connection
-		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
-	}
-
-	e, ok := status.FromError(err)
-	if ok {
-		// connection is canceled or server has already closed the connection
-		return e.Code() == codes.Canceled || e.Message() == "all SubConns are in TransientFailure"
+		switch {
+		case s.Code() == codes.Canceled || s.Message() == "transport is closing":
+			// >= gRPC v1.23.x
+			// connection is canceled or server has already closed the connection
+			return true
+		case s.Message() == "all SubConns are in TransientFailure":
+			return true
+		case s.Code() == codes.Unavailable || s.Message() == "error reading from server: EOF":
+			// >= gRPC v1.76.x
+			// connection is unavailable or server has already closed the connection
+			return true
+		case s.Code() == codes.Internal || s.Code() == codes.DeadlineExceeded:
+			// connection is interrupted
+			return true
+		}
 	}
 
 	// >= gRPC v1.10.x
diff --git a/internal/test/mocks/mock_appif.go b/internal/test/mocks/mock_appif.go
index b61576c..a987f3a 100644
--- a/internal/test/mocks/mock_appif.go
+++ b/internal/test/mocks/mock_appif.go
@@ -172,15 +172,15 @@
 }
 
 // PortUpInd mocks base method.
-func (m *MockApp) PortUpInd(arg0 context.Context, arg1, arg2 string) {
+// arg3 carries the bool flag added to intf.App.PortUpInd in this change.
+func (m *MockApp) PortUpInd(arg0 context.Context, arg1, arg2 string, arg3 bool) {
 	m.ctrl.T.Helper()
-	m.ctrl.Call(m, "PortUpInd", arg0, arg1, arg2)
+	m.ctrl.Call(m, "PortUpInd", arg0, arg1, arg2, arg3)
 }
 
 // PortUpInd indicates an expected call of PortUpInd.
-func (mr *MockAppMockRecorder) PortUpInd(arg0, arg1, arg2 interface{}) *gomock.Call {
+func (mr *MockAppMockRecorder) PortUpInd(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PortUpInd", reflect.TypeOf((*MockApp)(nil).PortUpInd), arg0, arg1, arg2)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PortUpInd", reflect.TypeOf((*MockApp)(nil).PortUpInd), arg0, arg1, arg2, arg3)
 }
 
 // PortUpdateInd mocks base method.
@@ -232,6 +232,20 @@
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckAndDeactivateService", reflect.TypeOf((*MockApp)(nil).CheckAndDeactivateService), arg0, arg1, arg2)
 }
 
+// GetAllFlowsForSvc mocks base method.
+func (m *MockApp) GetAllFlowsForSvc(arg0 context.Context, arg1 *of.VoltSubFlow, arg2 string, arg3 string) []uint64 {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetAllFlowsForSvc", arg0, arg1, arg2, arg3)
+	ret0, _ := ret[0].([]uint64)
+	return ret0
+}
+
+// GetAllFlowsForSvc indicates an expected call of GetAllFlowsForSvc.
+// The recorder must accept and record the same number of arguments as the
+// mocked method (4); a 3-argument recorder makes EXPECT().GetAllFlowsForSvc
+// fail with an argument-count mismatch against the method's Call above.
+func (mr *MockAppMockRecorder) GetAllFlowsForSvc(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllFlowsForSvc", reflect.TypeOf((*MockApp)(nil).GetAllFlowsForSvc), arg0, arg1, arg2, arg3)
+}
+
 // SetRebootFlag mocks base method.
 func (m *MockApp) SetRebootFlag(arg0 bool) {
 	m.ctrl.T.Helper()