[VOL-5536] - VGC recent fixes
Change-Id: Id6f0e647b37baac827230afbb45d132df8a62b68
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index a4f8a8d..83ead45 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -78,27 +78,10 @@
aft.taskID = taskID
aft.ctx = ctx
flowsToProcess := make(map[uint64]*of.VoltSubFlow)
- flowsPresent := 0
// First add/delete the flows first locally before passing them to actual device
for _, flow := range aft.flow.SubFlows {
logger.Debugw(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
if aft.flow.Command == of.CommandAdd {
- flow.State = of.FlowAddPending
- if err = aft.device.AddFlow(ctx, flow); err != nil {
- logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
-
- // If flow already exists in cache, check for flow state
- // If Success: Trigger success FLow Indication
- // if Failure: Continue process, so add-retry happens
- if err.Error() == ErrDuplicateFlow {
- dbFlow, _ := aft.device.GetFlow(flow.Cookie)
- if dbFlow.State == of.FlowAddSuccess {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
- flowsPresent++
- continue
- }
- }
- }
flowsToProcess[flow.Cookie] = flow
} else {
dbFlow, ok := aft.device.GetFlow(flow.Cookie)
@@ -114,11 +97,6 @@
}
}
- if flowsPresent == len(aft.flow.SubFlows) {
- logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
- return nil
- }
-
// PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
// PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
if aft.flow.PortName != "" && aft.flow.PortID != 0 {
diff --git a/internal/pkg/controller/auditdevice.go b/internal/pkg/controller/auditdevice.go
index ecee452..37a9837 100644
--- a/internal/pkg/controller/auditdevice.go
+++ b/internal/pkg/controller/auditdevice.go
@@ -20,6 +20,7 @@
"time"
"voltha-go-controller/internal/pkg/tasks"
+ "voltha-go-controller/internal/pkg/util"
"voltha-go-controller/log"
"github.com/opencord/voltha-protos/v5/go/common"
@@ -43,12 +44,13 @@
// AuditDevice structure
type AuditDevice struct {
- ctx context.Context
- device *Device
- timestamp string
- event AuditEventType
- taskID uint8
- stop bool
+ ctx context.Context
+ device *Device
+ timestamp string
+ event AuditEventType
+ taskID uint8
+ stop bool
+ skipFlowOnRestart bool
}
// NewAuditDevice is constructor for AuditDevice
@@ -107,11 +109,11 @@
missingPorts := make(map[uint32]*ofp.OfpPort)
for _, ofpp := range ofpps.Items {
missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
- logger.Infow(ctx, "Missing Ports", log.Fields{"Ports": ofpp.OfpPort, "missingPorts": missingPorts})
}
excessPorts := make(map[uint32]*DevicePort)
GetController().SetAuditFlags(ad.device)
+ defer GetController().ResetAuditFlags(ad.device)
processPortState := func(id uint32, vgcPort *DevicePort) {
logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
@@ -121,7 +123,7 @@
// This port exists in the received list and the map at
// VGC. This is common so delete it
logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
- ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
+ ad.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name, ad.skipFlowOnRestart)
} else {
//To ensure the flows are in sync with port status and no mismatch due to reboot,
// repush/delete flows based on current port status
@@ -138,13 +140,15 @@
}
// 1st process the NNI port before all other ports so that the device state can be updated.
- if vgcPort, ok := ad.device.PortsByID[NNIPortID]; ok {
- logger.Debugw(ctx, "Processing NNI port state", log.Fields{"PortNo": vgcPort.ID, "PortName": vgcPort.Name, "PortState": vgcPort.State})
- processPortState(NNIPortID, vgcPort)
+ for id, vgcPort := range ad.device.PortsByID {
+ if util.IsNniPort(id) {
+ logger.Debugw(ctx, "Processing NNI port state", log.Fields{"PortNo": vgcPort.ID, "PortName": vgcPort.Name, "PortState": vgcPort.State})
+ processPortState(id, vgcPort)
+ }
}
for id, vgcPort := range ad.device.PortsByID {
- if id == NNIPortID {
+ if util.IsNniPort(id) {
//NNI port already processed
continue
}
@@ -153,7 +157,6 @@
}
processPortState(id, vgcPort)
}
- GetController().ResetAuditFlags(ad.device)
if ad.stop {
logger.Errorw(ctx, "Audit Device Task Canceled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
@@ -168,30 +171,30 @@
// AddMissingPorts to add the missing ports
func (ad *AuditDevice) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
- logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps), "Ports": mps})
+ logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps), "Ports": mps})
addMissingPort := func(mp *ofp.OfpPort) {
logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
- // Error is ignored as it only drops duplicate ports
- logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
if err := ad.device.AddPort(cntx, mp); err != nil {
logger.Warnw(ctx, "AddPort Failed", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name, "Reason": err})
}
if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- ad.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
+ ad.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name, ad.skipFlowOnRestart)
}
logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
}
// 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
- if mp, ok := mps[NNIPortID]; ok {
- logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
- addMissingPort(mp)
+ for portNo, mp := range mps {
+ if util.IsNniPort(portNo) {
+ logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
+ addMissingPort(mp)
+ }
}
for portNo, mp := range mps {
- if portNo != NNIPortID {
+ if !util.IsNniPort(portNo) {
addMissingPort(mp)
}
}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index d98888c..9c4c712 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -17,10 +17,9 @@
import (
"context"
- "strconv"
+ "errors"
"time"
- "voltha-go-controller/internal/pkg/intf"
"voltha-go-controller/internal/pkg/of"
"voltha-go-controller/internal/pkg/tasks"
"voltha-go-controller/internal/pkg/util"
@@ -235,19 +234,21 @@
return err
}
- defaultSuccessFlowStatus := intf.FlowStatus{
- Device: att.device.ID,
- FlowModType: of.CommandAdd,
- Status: 0,
- Reason: "",
- }
+ // defaultSuccessFlowStatus := intf.FlowStatus{
+ // Device: att.device.ID,
+ // FlowModType: of.CommandAdd,
+ // Status: 0,
+ // Reason: "",
+ // }
// Build the map for easy and faster processing
rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
+ volthaFlows := make(map[uint64]*ofp.OfpFlowStats)
flowsToAdd := &of.VoltFlow{}
flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
for _, flow := range f.Items {
rcvdFlows[flow.Cookie] = flow
+ volthaFlows[flow.Cookie] = flow
}
att.device.flowLock.Lock()
@@ -269,10 +270,68 @@
// Update flow delete count since we are retrying the flow delete due to failure
att.device.UpdateFlowCount(cntx, flow.Cookie)
}
- defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
+ // defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
} else {
+ // Do not re-add to the device a flow whose state was marked as delete failure
+ // Remove the flow from DB as it is no longer reported by voltha
+ if flow.State == of.FlowDelFailure {
+ delete(att.device.flows, flow.Cookie)
+ att.device.DelFlowFromDb(cntx, flow.Cookie)
+ logger.Warnw(ctx, "Found flow with state DelFailure while adding to device, will remove from DB", log.Fields{"Cookie": flow.Cookie})
+ continue
+ }
// The flow exists at the controller but not at the device
// Push the flow to the device
+
+ // If UST0 flow is missing in voltha but the UST1 flow is present in voltha,
+ // then delete the UST1 flow from voltha and add both US flows to voltha
+ if att.device.IsUSTable0Flow(ctx, flow) {
+ logger.Debugw(ctx, "UST0 flow found, checking for UST1 flow", log.Fields{"Cookie": flow.Cookie})
+ ust1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, false)
+ if ust1Flow != nil {
+ flowToDelete, ok := volthaFlows[ust1Flow.Cookie]
+ if ok {
+ // Sometimes the audit would happen even before all flows are installed for a service.
+ // If the UST0 flow is still missing in voltha on second retry attempt, then remove the UST1 flow from voltha.
+ if flow.FlowCount == 0 {
+ att.device.UpdateFlowCount(cntx, flow.Cookie)
+ continue
+ }
+ logger.Infow(ctx, "UST1 flow already present in Voltha, delete and install US flows", log.Fields{"UST1Flow": ust1Flow.Cookie, "UST0Flow": flow.Cookie})
+ err = att.DeleteDeviceFlow(ctx, flowToDelete)
+ if err == nil {
+ flowsToAdd.SubFlows[ust1Flow.Cookie] = ust1Flow
+ } else {
+ logger.Warnw(ctx, "UST1 flow delete failed", log.Fields{"Cookie": ust1Flow.Cookie, "Reason": err.Error()})
+ }
+ }
+ }
+ }
+
+ // If DST0 flow is missing in voltha but the DST1 flow is present in voltha,
+ // then delete the DST1 flow from voltha and add both DS flows to voltha
+ if att.device.IsDSTable0Flow(ctx, flow) {
+ logger.Debugw(ctx, "DST0 flow found, checking for DST1 flow", log.Fields{"Cookie": flow.Cookie})
+ dst1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, true)
+ if dst1Flow != nil {
+ flowToDelete, ok := volthaFlows[dst1Flow.Cookie]
+ if ok {
+ // Sometimes the audit would happen even before all flows are installed for a service.
+ // If the DST0 flow is still missing in voltha on second retry attempt, then remove the DST1 flow from voltha.
+ if flow.FlowCount == 0 {
+ att.device.UpdateFlowCount(cntx, flow.Cookie)
+ continue
+ }
+ logger.Infow(ctx, "DST1 flow already present in Voltha, delete and install DS flows", log.Fields{"DST1Flow": dst1Flow.Cookie, "DST0Flow": flow.Cookie})
+ err = att.DeleteDeviceFlow(ctx, flowToDelete)
+ if err == nil {
+ flowsToAdd.SubFlows[dst1Flow.Cookie] = dst1Flow
+ } else {
+ logger.Warnw(ctx, "DST1 flow delete failed", log.Fields{"Cookie": dst1Flow.Cookie, "Reason": err.Error()})
+ }
+ }
+ }
+ }
logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
if !att.device.IsFlowAddThresholdReached(flow.FlowCount, flow.Cookie) {
flowsToAdd.SubFlows[flow.Cookie] = flow
@@ -380,6 +439,44 @@
}
}
+func (att *AuditTablesTask) DeleteDeviceFlow(cntx context.Context, flow *ofp.OfpFlowStats) error {
+ logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
+
+ // Create the flowMod structure and fill it out
+ flowMod := &ofp.OfpFlowMod{}
+ flowMod.Cookie = flow.Cookie
+ flowMod.TableId = flow.TableId
+ flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
+ flowMod.IdleTimeout = flow.IdleTimeout
+ flowMod.HardTimeout = flow.HardTimeout
+ flowMod.Priority = flow.Priority
+ flowMod.BufferId = of.DefaultBufferID
+ flowMod.OutPort = of.DefaultOutPort
+ flowMod.OutGroup = of.DefaultOutGroup
+ flowMod.Flags = flow.Flags
+ flowMod.Match = flow.Match
+ flowMod.Instructions = flow.Instructions
+
+ // Create FlowTableUpdate
+ flowUpdate := &ofp.FlowTableUpdate{
+ Id: att.device.ID,
+ FlowMod: flowMod,
+ }
+
+ var err error
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Delete flow failed: Voltha Client Unavailable")
+ return errors.New("voltha client unavailable")
+ }
+
+ if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
+ logger.Errorw(ctx, "Flow delete failed", log.Fields{"Reason": err.Error()})
+ return err
+ }
+ return nil
+}
+
// AuditGroups audit the groups which includes fetching the existing groups at the
// voltha and identifying the delta between the ones held here and the
// ones held at VOLTHA. The delta must be cleaned up to keep both the
@@ -582,7 +679,7 @@
// This port exists in the received list and the map at
// VGC. This is common so delete it
logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
- att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
+ att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name, false)
}
delete(missingPorts, id)
} else {
@@ -593,13 +690,15 @@
logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
}
// 1st process the NNI port before all other ports so that the device state can be updated.
- if vgcPort, ok := att.device.PortsByID[NNIPortID]; ok {
- logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
- processPortState(NNIPortID, vgcPort)
+ for id, vgcPort := range att.device.PortsByID {
+ if util.IsNniPort(id) {
+ logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
+ processPortState(id, vgcPort)
+ }
}
for id, vgcPort := range att.device.PortsByID {
- if id == NNIPortID {
+ if util.IsNniPort(id) {
// NNI port already processed
continue
}
@@ -629,19 +728,21 @@
logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
}
if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
+ att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name, false)
}
logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
}
// 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
- if mp, ok := mps[NNIPortID]; ok {
- logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo})
- addMissingPort(mp)
+ for portNo, mp := range mps {
+ if util.IsNniPort(portNo) {
+ logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
+ addMissingPort(mp)
+ }
}
for portNo, mp := range mps {
- if portNo != NNIPortID {
+ if !util.IsNniPort(portNo) {
addMissingPort(mp)
}
}
diff --git a/internal/pkg/controller/changeevent.go b/internal/pkg/controller/changeevent.go
index 54b6d22..0144297 100644
--- a/internal/pkg/controller/changeevent.go
+++ b/internal/pkg/controller/changeevent.go
@@ -77,7 +77,7 @@
case ofp.OfpPortReason_OFPPR_ADD:
_ = cet.device.AddPort(ctx, status.PortStatus.Desc)
if state == uint32(ofp.OfpPortState_OFPPS_LIVE) {
- cet.device.ProcessPortState(ctx, portNo, state, portName)
+ cet.device.ProcessPortState(ctx, portNo, state, portName, false)
}
case ofp.OfpPortReason_OFPPR_DELETE:
cet.device.CheckAndDeletePort(ctx, portNo, portName)
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index 1c02738..ba33d78 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -131,8 +131,6 @@
d.RestoreMetersFromDb(cntx)
d.RestoreGroupsFromDb(cntx)
- d.RestoreFlowsFromDb(cntx)
- d.RestorePortsFromDb(cntx)
d.ConnectInd(context.TODO(), intf.DeviceDisc)
d.packetOutChannel = config.PacketOutChannel
@@ -280,6 +278,10 @@
v.app.CheckAndDeactivateService(ctx, flow, devSerialNum, devID)
}
+func (v *VoltController) GetAllFlowsForSvc(ctx context.Context, flow *of.VoltSubFlow, devID string, devSerialNum string) []uint64 {
+ return v.app.GetAllFlowsForSvc(ctx, flow, devID, devSerialNum)
+}
+
// AddVPAgent to add the vpagent
func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
v.vagent[vep] = vpa
@@ -306,7 +308,7 @@
}
// AddFlows to add flows
-func (v *VoltController) AddFlows(cntx context.Context, port string, device string, flow *of.VoltFlow) error {
+func (v *VoltController) AddFlows(cntx context.Context, port string, device string, flow *of.VoltFlow, skipFlowPushToVoltha bool) error {
d, err := v.GetDevice(device)
if err != nil {
logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
@@ -347,10 +349,28 @@
}
}
} else {
- flow.Command = of.CommandAdd
- d.UpdateFlows(flow, devPort)
- for cookie := range flow.SubFlows {
- logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ flowsToVoltha := &of.VoltFlow{}
+ flowsToVoltha.SubFlows = make(map[uint64]*of.VoltSubFlow)
+ // During VGC restart, build and add flows only to cache.
+ // No need to push flows to voltha now, as they will be reconciled during the later audit
+ for _, subFlow := range flow.SubFlows {
+ logger.Debugw(ctx, "Adding flows to device cache", log.Fields{"Cookie": subFlow.Cookie})
+ if !skipFlowPushToVoltha {
+ subFlow.State = of.FlowAddPending
+ }
+ if err := d.AddFlow(cntx, subFlow); err != nil {
+ logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": subFlow.Cookie, "Reason": err.Error()})
+ } else {
+ flowsToVoltha.SubFlows[subFlow.Cookie] = subFlow
+ }
+ }
+
+ if !skipFlowPushToVoltha {
+ flowsToVoltha.Command = of.CommandAdd
+ d.UpdateFlows(flowsToVoltha, devPort)
+ for cookie := range flowsToVoltha.SubFlows {
+ logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ }
}
}
return nil
@@ -471,8 +491,8 @@
}
// PortUpInd for port up indication
-func (v *VoltController) PortUpInd(cntx context.Context, device string, port string) {
- v.app.PortUpInd(cntx, device, port)
+func (v *VoltController) PortUpInd(cntx context.Context, device string, port string, flag bool) {
+ v.app.PortUpInd(cntx, device, port, flag)
}
// PortDownInd for port down indication
diff --git a/internal/pkg/controller/controller_test.go b/internal/pkg/controller/controller_test.go
index 18b16bc..e47f727 100644
--- a/internal/pkg/controller/controller_test.go
+++ b/internal/pkg/controller/controller_test.go
@@ -173,7 +173,7 @@
v.Devices.Store(key, value)
return true
})
- if err := v.AddFlows(tt.args.cntx, tt.args.port, tt.args.device, tt.args.flow); (err != nil) != tt.wantErr {
+ if err := v.AddFlows(tt.args.cntx, tt.args.port, tt.args.device, tt.args.flow, false); (err != nil) != tt.wantErr {
t.Errorf("VoltController.AddFlows() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -1219,9 +1219,11 @@
State: 1,
SetVlan: of.VlanAny,
}
+
+ // Setup database mock for async operations
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
- dbintf.EXPECT().PutGroup(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
+ dbintf.EXPECT().PutGroup(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
tests := []struct {
name string
args args
@@ -1263,7 +1265,18 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.name {
- case "GroupUpdate", "DeviceNOtFound_Error", "PortNOtFound_Error":
+ case "GroupUpdate":
+ v := &VoltController{
+ Devices: sync.Map{},
+ }
+ dev.Range(func(key, value interface{}) bool {
+ v.Devices.Store(key, value)
+ return true
+ })
+ if err := v.GroupUpdate(tt.args.port, tt.args.device, tt.args.group); (err != nil) != tt.wantErr {
+ t.Errorf("VoltController.GroupUpdate() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ case "DeviceNOtFound_Error", "PortNOtFound_Error":
v := &VoltController{
Devices: sync.Map{},
}
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 0748fa7..93fae14 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -242,7 +242,6 @@
}
}
d.flows[flow.Cookie] = flow
- d.AddFlowToDb(cntx, flow)
return nil
}
@@ -504,7 +503,6 @@
p := NewDevicePort(mp)
d.PortsByID[id] = p
d.PortsByName[name] = p
- d.WritePortToDb(cntx, p)
d.portLock.Unlock()
GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
@@ -563,7 +561,6 @@
delete(d.PortsByID, p.ID)
p.ID = port
d.PortsByID[port] = p
- d.WritePortToDb(cntx, p)
GetController().PortUpdateInd(d.ID, p.Name, p.ID)
logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
}
@@ -698,6 +695,12 @@
logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
t := NewAuditDevice(d, AuditEventDeviceDisc)
+ // During VGC restart or when a device is added, skip pushing flows to voltha during the audit device task.
+ // When a device is added, any required flows will be pushed during the next audit table task, which runs soon after this audit device task.
+ if discType == intf.DeviceDisc {
+ t.skipFlowOnRestart = true
+ }
+
d.Tasks.AddTask(t)
t1 := NewAuditTablesTask(d)
@@ -788,7 +791,6 @@
logger.Debugw(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, port.Name)
port.State = PortStateDown
- d.WritePortToDb(cntx, port)
}
}
}
@@ -818,7 +820,7 @@
return
//Do not process port update received from change event, as we will only handle port updates during polling
}
- d.ProcessPortState(cntx, port, state, portName)
+ d.ProcessPortState(cntx, port, state, portName, false)
}
}
@@ -838,7 +840,7 @@
// ProcessPortState deals with the change in port status and taking action
// based on the new state and the old state
-func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string) {
+func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string, skipFlowPushToVoltha bool) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
@@ -857,7 +859,7 @@
if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
// Transition from DOWN to UP
logger.Debugw(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortUpInd(cntx, d.ID, p.Name)
+ GetController().PortUpInd(cntx, d.ID, p.Name, skipFlowPushToVoltha)
p.State = PortStateUp
d.WritePortToDb(cntx, p)
} else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
@@ -865,7 +867,6 @@
logger.Debugw(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, p.Name)
p.State = PortStateDown
- d.WritePortToDb(cntx, p)
} else {
logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
}
@@ -884,7 +885,7 @@
switch p.State {
case PortStateUp:
logger.Debugw(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortUpInd(cntx, d.ID, p.Name)
+ GetController().PortUpInd(cntx, d.ID, p.Name, false)
case PortStateDown:
logger.Debugw(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
GetController().PortDownInd(cntx, d.ID, p.Name)
@@ -1084,6 +1085,44 @@
return flowCount >= uint32(GetController().GetMaxFlowRetryAttempt())
}
+// IsUSTable0Flow - check if the flow is for US Table 0
+func (d *Device) IsUSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
+ if flow.TableID == 0 && !util.IsNniPort(flow.Match.InPort) {
+ return true
+ }
+ return false
+}
+
+// IsDSTable0Flow - check if the flow is for DS Table 0
+func (d *Device) IsDSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
+ if flow.TableID == 0 && util.IsNniPort(flow.Match.InPort) {
+ return true
+ }
+ return false
+}
+
+// GetDeviceFlow - get the DS or US Table 1 flow based on isDsFlow flag
+func (d *Device) GetDeviceFlow(cntx context.Context, flow *of.VoltSubFlow, deviceSerialNum string, devID string, isDsFlow bool) *of.VoltSubFlow {
+ cookies := GetController().GetAllFlowsForSvc(cntx, flow, devID, deviceSerialNum)
+ for _, cookie := range cookies {
+ if dbFlow, ok := d.flows[cookie]; ok {
+ logger.Debugw(ctx, "Found flow in device", log.Fields{"Cookie": cookie, "Flow": dbFlow})
+ if isDsFlow {
+ // return DS Table1 flow
+ if dbFlow.TableID == 1 && util.IsNniPort(dbFlow.Match.InPort) {
+ return dbFlow
+ }
+ } else {
+ // return US Table1 flow
+ if dbFlow.TableID == 1 && !util.IsNniPort(dbFlow.Match.InPort) {
+ return dbFlow
+ }
+ }
+ }
+ }
+ return nil
+}
+
// IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
@@ -1093,7 +1132,6 @@
func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
if dbFlow, ok := d.flows[cookie]; ok {
dbFlow.FlowCount++
- d.AddFlowToDb(cntx, dbFlow)
}
}
@@ -1116,10 +1154,6 @@
if dbFlow, ok := d.flows[cookie]; ok {
dbFlow.State = uint8(state)
dbFlow.ErrorReason = reason
- if state == of.FlowAddSuccess {
- dbFlow.FlowCount = 0
- }
- d.AddFlowToDb(cntx, dbFlow)
}
}
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..ab832b7 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -168,7 +168,6 @@
_ = NewController(context.Background(), appMock)
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
- dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
})