[VOL-5514] Add timeout for OLT gRPC requests

Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I56bab809e9b67effc6f6d4c6ef32889a7a2c128d
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index c64b4bc..ca1b904 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -519,13 +519,18 @@
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
-	if _, err = f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+	subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err = f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+		cancel1()
 		if len(queues.TrafficQueues) > 1 {
 			logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
-			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+			subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+			cancel2()
 		}
 		return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
 	}
+	cancel1()
 	return err
 }
 
@@ -549,12 +554,15 @@
 				"nniIntfID":     sq.nniIntfID,
 				"onuID":         sq.onuID,
 				"uniID":         sq.uniID})
-		if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+		if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 			IntfId: sq.intfID, OnuId: sq.onuID,
 			UniId: sq.uniID, PortNo: sq.uniPort,
 			TrafficScheds: TrafficSched}); err != nil {
+			cancel()
 			return olterrors.NewErrAdapter("failed-to-create-traffic-schedulers-in-device", log.Fields{"TrafficScheds": TrafficSched}, err)
 		}
+		cancel()
 		logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
 			"direction":      sq.direction,
 			"traffic-queues": trafficQueues,
@@ -572,14 +580,19 @@
 		UniId:         sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
-	if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+	subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err := f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+		cancel1()
 		if len(queues.TrafficQueues) > 1 {
 			logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
-			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+			subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+			cancel2()
 		}
 		f.revertScheduler(ctx, sq, TrafficSched)
 		return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
 	}
+	cancel1()
 	logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
 		"direction":      sq.direction,
 		"traffic-queues": trafficQueues,
@@ -633,7 +646,9 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	defer cancel()
+	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
 		&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
 			NetworkIntfId: sq.nniIntfID,
 			UniId:         sq.uniID, PortNo: sq.uniPort,
@@ -677,10 +692,12 @@
 	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
 
-	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched}); err != nil {
+		cancel()
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
 			log.Fields{
 				"intf-id":            sq.intfID,
@@ -689,7 +706,7 @@
 				"uni-id":             sq.uniID,
 				"uni-port":           sq.uniPort}, err)
 	}
-
+	cancel()
 	logger.Infow(ctx, "removed-traffic-schedulers-successfully",
 		log.Fields{"device-id": f.deviceHandler.device.Id,
 			"intf-id":  sq.intfID,
@@ -757,7 +774,8 @@
 			"device-id": f.deviceHandler.device.Id,
 			"err":       err})
 	} else {
-		if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+		if _, err := f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
 			&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
 				UniId: sq.uniID, PortNo: sq.uniPort,
 				TrafficQueues: TrafficQueues,
@@ -780,10 +798,12 @@
 				"uni-port":  sq.uniPort,
 				"tp-id":     sq.tpID})
 		}
+		cancel()
 	}
 
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
 	// Remove traffic schedulers. Ignore any errors, just log them.
-	if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched}); err != nil {
@@ -805,6 +825,7 @@
 			"uni-port":  sq.uniPort,
 			"tp-id":     sq.tpID})
 	}
+	cancel()
 }
 
 // This function allocates tconts and GEM ports for an ONU
@@ -1624,7 +1645,9 @@
 		"flow":      *deviceFlow,
 		"device-id": f.deviceHandler.device.Id,
 		"intf-id":   intfID})
-	_, err := f.deviceHandler.Client.FlowAdd(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	_, err := f.deviceHandler.Client.FlowAdd(subCtx, deviceFlow)
+	cancel()
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
@@ -1662,7 +1685,9 @@
 		log.Fields{
 			"flow":      *deviceFlow,
 			"device-id": f.deviceHandler.device.Id})
-	_, err := f.deviceHandler.Client.FlowRemove(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	_, err := f.deviceHandler.Client.FlowRemove(subCtx, deviceFlow)
+	cancel()
 	if err != nil {
 		if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
 			logger.Warnw(ctx, "can-not-remove-flow-from-device--unreachable",
@@ -3459,10 +3484,12 @@
 func (f *OpenOltFlowMgr) revertScheduler(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) {
 	// revert scheduler
 	logger.Warnw(ctx, "reverting-scheduler-for-onu", log.Fields{"intf-id": sq.intfID, "onu-id": sq.onuID, "uni-id": sq.uniID, "tp-id": sq.tpID})
-	_, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	_, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched})
+	cancel()
 }
 
 // validateMeter validates if there is a meter mismatch for the given direction. It also clears the stale meter if the reference count is zero