VOL-3419: Replicate voltha flows in openolt agent
- The scale-tester-app will adhere to new openolt.proto interface (version 4.0.2)
and will pass necessary information for openolt-agent to replicate the flows.
- upgrade to voltha-lib-go version 4.0.0

Change-Id: I9d862929ae8ac4468d4e93096f8cd8e16f26ec93
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index a223c02..818b3ec 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -22,10 +22,9 @@
 	"time"
 
 	"github.com/opencord/openolt-scale-tester/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -102,14 +101,14 @@
 
 	if direction == tp_pb.Direction_DOWNSTREAM {
 		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-			GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+			GetDsScheduler(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]])
 	} else {
 		SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-			GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+			GetUsScheduler(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]])
 	}
 
 	if err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
 		return nil
 	}
 
@@ -132,13 +131,13 @@
 func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
 
 	trafficQueues, err := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
-		GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
+		GetTrafficQueues(nil, subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
 
 	if err == nil {
 		return trafficQueues
 	}
 
-	log.Errorw("Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
+	logger.Errorw(nil, "Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
 	return nil
 }
 
@@ -178,21 +177,21 @@
 			actionInfo.Cmd = &actionCmd
 			actionInfo.OVid = subs.Stag
 		default:
-			log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	} else if direction == Downstream {
 		switch flowType {
 		case EapolFlow:
-			log.Errorw("Downstream EAP flows are not required instead controller "+
+			logger.Errorw(nil, "Downstream EAP flows are not required instead controller "+
 				"packet outs EAP response directly to onu in downstream", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case DhcpFlowIPV4:
-			log.Errorw("Downstream DHCPIPV4 flows are not required instead we have "+
+			logger.Errorw(nil, "Downstream DHCPIPV4 flows are not required instead we have "+
 				"NNI trap flows already installed", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case DhcpFlowIPV6:
-			log.Errorw("Downstream DHCPIPV6 flows are not required instead we have "+
+			logger.Errorw(nil, "Downstream DHCPIPV6 flows are not required instead we have "+
 				"NNI trap flows already installed", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		case HsiaFlow:
@@ -203,51 +202,54 @@
 			actionInfo.Cmd = &actionCmd
 			actionInfo.OVid = subs.Stag
 		default:
-			log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+			logger.Errorw(nil, "Unsupported flow type", log.Fields{"flowtype": flowType,
 				"direction": direction})
 		}
 	}
 	return flowClassifier, actionInfo
 }
 
-func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
-	allocID uint32, gemID uint32, pcp uint32) error {
-	log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint64,
+	allocID uint32, gemID uint32, pcp uint32, replicateFlow bool, symmetricFlowID uint64,
+	pbitToGem map[uint32]uint32) error {
+	logger.Infow(nil, "add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
 		"direction": direction, "flowID": flowID})
 	var err error
 
 	flowClassifier, actionInfo := FormatClassfierAction(flowType, direction, subs)
-	// Update the o_pbit for which this flow has to be classified
-	flowClassifier.OPbits = pcp
+	// Update the o_pbit (if valid) for which this flow has to be classified
+	if pcp != 0xff {
+		flowClassifier.OPbits = pcp
+	}
 	flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
 		UniId: int32(subs.UniID), FlowId: flowID,
 		FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
 		Classifier: &flowClassifier, Action: &actionInfo,
-		Priority: 1000, PortNo: subs.UniPortNo}
+		Priority: 1000, PortNo: subs.UniPortNo, SymmetricFlowId: symmetricFlowID,
+		ReplicateFlow: replicateFlow, PbitToGemport: pbitToGem}
 
 	_, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Errorw(nil, "Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
 	}
-	log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddLldpFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID []uint32
+	var flowID uint64
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -255,7 +257,7 @@
 	actionCmd := &oop.ActionCmd{TrapToHost: true}
 	actionInfo := &oop.Action{Cmd: actionCmd}
 
-	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
 		FlowType: "downstream", AllocId: -1, GemportId: -1,
 		Classifier: flowClassifier, Action: actionInfo,
 		Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -264,17 +266,15 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
+		logger.Errorw(nil, "Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
-	log.Debugw("LLDP flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "LLDP flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
@@ -312,11 +312,11 @@
 	var res *oop.Empty
 
 	if res, err = oop.OpenoltClient.PerformGroupOperation(oo, context.Background(), groupCfg); err != nil {
-		log.Errorw("Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
+		logger.Errorw(nil, "Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
 		return nil, err
 	}
 
-	log.Info("Successfully called - PerformGroupOperation()")
+	logger.Info(nil, "Successfully called - PerformGroupOperation()")
 
 	return res, nil
 }
@@ -324,7 +324,7 @@
 func CreateGroup(grp *GroupData) (*oop.Empty, error) {
 	var groupCfg oop.Group
 
-	log.Infow("creating group", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "creating group", log.Fields{"GroupID": grp.GroupID})
 
 	groupCfg.Command = oop.Group_SET_MEMBERS
 	groupCfg.GroupId = grp.GroupID
@@ -333,7 +333,7 @@
 }
 
 func OpMulticastTrafficQueue(grp *GroupData, isCreating bool) (*oop.Empty, error) {
-	log.Infow("operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
+	logger.Infow(nil, "operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
 
 	oo := grp.Subs.OpenOltClient
 
@@ -360,35 +360,34 @@
 
 	if isCreating {
 		if res, err = oop.OpenoltClient.CreateTrafficQueues(oo, context.Background(), &request); err != nil {
-			log.Errorw("Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
+			logger.Errorw(nil, "Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
 			return nil, err
 		}
 
-		log.Info("Successfully called - CreateTrafficQueues()")
+		logger.Info(nil, "Successfully called - CreateTrafficQueues()")
 	} else {
 		if res, err = oop.OpenoltClient.RemoveTrafficQueues(oo, context.Background(), &request); err != nil {
-			log.Errorw("Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
+			logger.Errorw(nil, "Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
 			return nil, err
 		}
 
-		log.Info("Successfully called - RemoveTrafficQueues()")
+		logger.Info(nil, "Successfully called - RemoveTrafficQueues()")
 	}
 
 	return res, nil
 }
 
 func AddMulticastFlow(grp *GroupData) error {
-	log.Infow("add multicast flow", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "add multicast flow", log.Fields{"GroupID": grp.GroupID})
 
 	oo := grp.Subs.OpenOltClient
 	config := grp.Subs.TestConfig
 	rsrMgr := grp.Subs.RsrMgr
 
-	var flowID []uint32
+	var flowID uint64
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -400,7 +399,7 @@
 		DstMac:     GenerateMulticastMac(grp.Subs.OnuID, grp.GroupID),
 		PktTagType: DoubleTag}
 
-	flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID,
 		FlowType: "multicast", AllocId: int32(grp.AllocID), GemportId: int32(grp.GemPortID),
 		Classifier: flowClassifier, Priority: int32(grp.Priority), PortNo: uint32(grp.Subs.UniPortNo), GroupId: uint32(grp.GroupID)}
 
@@ -408,24 +407,22 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 
 	if err != nil {
-		log.Errorw("Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(grp.Subs.PonIntf)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
+		logger.Errorw(nil, "Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
 		return err
 	}
 
-	log.Debugw("Multicast flow added to device successfully ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Multicast flow added to device successfully ", log.Fields{"flow": flow})
 
 	return nil
 }
 
 func AddMulticastSched(grp *GroupData) error {
-	log.Infow("creating multicast sched", log.Fields{"GroupID": grp.GroupID})
+	logger.Infow(nil, "creating multicast sched", log.Fields{"GroupID": grp.GroupID})
 
 	SchedCfg := &tp_pb.SchedulerConfig{
 		Direction:    tp_pb.Direction_DOWNSTREAM,
@@ -448,18 +445,18 @@
 		GetTrafficScheduler(grp.Subs.TpInstance[grp.Subs.TestConfig.TpIDList[0]], SchedCfg, TfShInfo)}
 
 	if TrafficSched == nil {
-		log.Error("Create scheduler for multicast traffic failed")
+		logger.Error(nil, "Create scheduler for multicast traffic failed")
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
-	log.Debugw("Sending Traffic scheduler create to device",
+	logger.Debugw(nil, "Sending Traffic scheduler create to device",
 		log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": TrafficSched})
 
 	if _, err := grp.Subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
 		IntfId: grp.Subs.PonIntf, OnuId: grp.Subs.OnuID,
 		UniId: grp.Subs.UniID, PortNo: grp.Subs.UniPortNo,
 		TrafficScheds: TrafficSched}); err != nil {
-		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+		logger.Errorw(nil, "Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
 
@@ -467,7 +464,7 @@
 }
 
 func OpMemberToGroup(grp *GroupData, isAdding bool) (*oop.Empty, error) {
-	log.Infow("operating on group", log.Fields{"Adding": isAdding})
+	logger.Infow(nil, "operating on group", log.Fields{"Adding": isAdding})
 
 	var groupCfg oop.Group
 
@@ -498,40 +495,40 @@
 func AddMulticastQueueFlow(grp *GroupData) error {
 	var err error
 
-	log.Debugw("Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
+	logger.Debugw(nil, "Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
 		"AddFlow": grp.AddFlow, "AddSched": grp.AddSched, "AddQueue": grp.AddQueue, "AddMember": grp.AddMember})
 
 	if grp.AddGroup {
 		if _, err = CreateGroup(grp); err != nil {
-			log.Error("Failed to add group to device")
+			logger.Error(nil, "Failed to add group to device")
 			return err
 		}
 	}
 
 	if grp.AddFlow {
 		if err = AddMulticastFlow(grp); err != nil {
-			log.Error("Failed to add multicast flow to device")
+			logger.Error(nil, "Failed to add multicast flow to device")
 			return err
 		}
 	}
 
 	if grp.AddSched {
 		if err = AddMulticastSched(grp); err != nil {
-			log.Error("Failed to add multicast sched to device")
+			logger.Error(nil, "Failed to add multicast sched to device")
 			return err
 		}
 	}
 
 	if grp.AddQueue {
 		if _, err = OpMulticastTrafficQueue(grp, true); err != nil {
-			log.Error("Failed to add multicast queue to device")
+			logger.Error(nil, "Failed to add multicast queue to device")
 			return err
 		}
 	}
 
 	if grp.AddMember {
 		if _, err = OpMemberToGroup(grp, true); err != nil {
-			log.Error("Failed to add member to group")
+			logger.Error(nil, "Failed to add member to group")
 			return err
 		}
 	}
@@ -542,18 +539,18 @@
 func CreateTrafficSchedWithRetry(OpenOltClient oop.OpenoltClient, sched *oop.TrafficSchedulers) error {
 	maxRetry := 20
 	if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err == nil {
-		log.Info("succeeded in first attempt")
+		logger.Info(nil, "succeeded in first attempt")
 		return nil
 	} else {
-		log.Info("going for a retry")
+		logger.Info(nil, "going for a retry")
 	}
 	for i := 0; i < maxRetry; i++ {
 		if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err != nil {
-			log.Error("retying after delay")
+			logger.Error(nil, "retying after delay")
 			time.Sleep(50 * time.Millisecond)
 			continue
 		} else {
-			log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+			logger.Infow(nil, "succeeded in retry iteration=%d!!", log.Fields{"i": i})
 			return nil
 		}
 	}
@@ -564,7 +561,7 @@
 func CreateTrafficQueuesWithRetry(OpenOltClient oop.OpenoltClient, queue *oop.TrafficQueues) error {
 	maxRetry := 20
 	if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err == nil {
-		log.Info("succeeded in first attempt")
+		logger.Info(nil, "succeeded in first attempt")
 		return nil
 	}
 	for i := 0; i < maxRetry; i++ {
@@ -572,7 +569,7 @@
 			time.Sleep(50 * time.Millisecond)
 			continue
 		} else {
-			log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+			logger.Infow(nil, "succeeded in retry iteration=%d!!", log.Fields{"i": i})
 			return nil
 		}
 	}
@@ -589,7 +586,7 @@
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
-		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		logger.Debugw(nil, "Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
 		return nil
 	}
 	if st.Code() == codes.ResourceExhausted {
@@ -597,17 +594,17 @@
 			_, err = OpenOltClient.FlowAdd(context.Background(), flow)
 			st, _ := status.FromError(err)
 			if st.Code() == codes.ResourceExhausted {
-				log.Error("flow-install-failed--retrying")
+				logger.Error(nil, "flow-install-failed--retrying")
 				continue
 			} else if st.Code() == codes.OK {
-				log.Infow("flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
+				logger.Infow(nil, "flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
 				return nil
 			}
 		}
 
 	}
 
-	log.Debugw("Flow install failed on all retries ", log.Fields{"flow": flow})
+	logger.Debugw(nil, "Flow install failed on all retries ", log.Fields{"flow": flow})
 
 	return err
 }