[VOL-5514] Add timeout for OLT gRPC requests
Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I56bab809e9b67effc6f6d4c6ef32889a7a2c128d
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index c64b4bc..ca1b904 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -519,13 +519,18 @@
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues,
TechProfileId: TrafficSched[0].TechProfileId}
- if _, err = f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+ subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ if _, err = f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+ cancel1()
if len(queues.TrafficQueues) > 1 {
logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
- _, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+ subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ _, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+ cancel2()
}
return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
}
+ cancel1()
return err
}
@@ -549,12 +554,15 @@
"nniIntfID": sq.nniIntfID,
"onuID": sq.onuID,
"uniID": sq.uniID})
- if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
+ cancel()
return olterrors.NewErrAdapter("failed-to-create-traffic-schedulers-in-device", log.Fields{"TrafficScheds": TrafficSched}, err)
}
+ cancel()
logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
"direction": sq.direction,
"traffic-queues": trafficQueues,
@@ -572,14 +580,19 @@
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues,
TechProfileId: TrafficSched[0].TechProfileId}
- if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+ subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ if _, err := f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+ cancel1()
if len(queues.TrafficQueues) > 1 {
logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
- _, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+ subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ _, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+ cancel2()
}
f.revertScheduler(ctx, sq, TrafficSched)
return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
}
+ cancel1()
logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
"direction": sq.direction,
"traffic-queues": trafficQueues,
@@ -633,7 +646,9 @@
"device-id": f.deviceHandler.device.Id}, err)
}
- if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ defer cancel()
+ if _, err = f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
NetworkIntfId: sq.nniIntfID,
UniId: sq.uniID, PortNo: sq.uniPort,
@@ -677,10 +692,12 @@
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
+ cancel()
return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
log.Fields{
"intf-id": sq.intfID,
@@ -689,7 +706,7 @@
"uni-id": sq.uniID,
"uni-port": sq.uniPort}, err)
}
-
+ cancel()
logger.Infow(ctx, "removed-traffic-schedulers-successfully",
log.Fields{"device-id": f.deviceHandler.device.Id,
"intf-id": sq.intfID,
@@ -757,7 +774,8 @@
"device-id": f.deviceHandler.device.Id,
"err": err})
} else {
- if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ if _, err := f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: TrafficQueues,
@@ -780,10 +798,12 @@
"uni-port": sq.uniPort,
"tp-id": sq.tpID})
}
+ cancel()
}
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
// Remove traffic schedulers. Ignore any errors, just log them.
- if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+ if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
@@ -805,6 +825,7 @@
"uni-port": sq.uniPort,
"tp-id": sq.tpID})
}
+ cancel()
}
// This function allocates tconts and GEM ports for an ONU
@@ -1624,7 +1645,9 @@
"flow": *deviceFlow,
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID})
- _, err := f.deviceHandler.Client.FlowAdd(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ _, err := f.deviceHandler.Client.FlowAdd(subCtx, deviceFlow)
+ cancel()
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
@@ -1662,7 +1685,9 @@
log.Fields{
"flow": *deviceFlow,
"device-id": f.deviceHandler.device.Id})
- _, err := f.deviceHandler.Client.FlowRemove(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ _, err := f.deviceHandler.Client.FlowRemove(subCtx, deviceFlow)
+ cancel()
if err != nil {
if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
logger.Warnw(ctx, "can-not-remove-flow-from-device--unreachable",
@@ -3459,10 +3484,12 @@
func (f *OpenOltFlowMgr) revertScheduler(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) {
// revert scheduler
logger.Warnw(ctx, "reverting-scheduler-for-onu", log.Fields{"intf-id": sq.intfID, "onu-id": sq.onuID, "uni-id": sq.uniID, "tp-id": sq.tpID})
- _, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+ defer cancel() // deferred so the timeout context is released on every exit path, including a panic in the RPC call
+ _, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched})
}
// validateMeter validates if there is a meter mismatch for the given direction. It also clears the stale meter if the reference count is zero