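[VOL-5514] Add timeout for OLT gRPC requests

Run the OLT gRPC calls touched in device_handler.go with a context
created by
context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx),
dh.cfg.RPCTimeout), and release it with the returned cancel function.
Calls that previously passed the caller's ctx directly (for example
DisableOlt, ActivateOnu, Reboot, the packet-out RPCs and the statistics
RPCs) now use this derived context as well.

Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I56bab809e9b67effc6f6d4c6ef32889a7a2c128d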
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 30bcbf9..7ef9639 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1294,8 +1294,10 @@
return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
// Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
- _, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
+ _, err = dh.Client.DisableOlt(subCtx, new(oop.Empty))
+ cancel()
if err != nil {
return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
@@ -1392,7 +1394,9 @@
var deviceInfo *oop.DeviceInfo
var genmac string
- deviceInfo, err = dh.Client.GetDeviceInfo(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty))
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ deviceInfo, err = dh.Client.GetDeviceInfo(subCtx, new(oop.Empty))
+ cancel()
if err != nil {
return nil, olterrors.NewErrPersistence("get", "device", 0, nil, err)
@@ -1674,7 +1678,9 @@
logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
- _, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ _, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+ cancel()
if err != nil {
return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
"intf-id": intfID,
@@ -1751,7 +1757,9 @@
logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
- _, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ _, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+ cancel()
if err != nil {
return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
"intf-id": intfID,
@@ -1768,7 +1776,9 @@
}
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
- if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ defer cancel()
+ if _, err := dh.Client.ActivateOnu(subCtx, &Onu); err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
logger.Debugw(ctx, "onu-activation-in-progress", log.Fields{"SerialNumber": serialNumber, "onu-id": onuID, "device-id": dh.device.Id})
@@ -2420,11 +2430,14 @@
/* On device disable ,admin state update has to be done prior sending request to agent since
the indication thread may processes invalid indications of ONU and OLT*/
if dh.Client != nil {
- if _, err := dh.Client.DisableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ if _, err := dh.Client.DisableOlt(subCtx, new(oop.Empty)); err != nil {
if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+ cancel()
return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": device.Id}, err)
}
}
+ cancel()
}
logger.Debugw(ctx, "olt-disabled", log.Fields{"device-id": device.Id})
/* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
@@ -2488,11 +2501,14 @@
// Device Oper-State: ACTIVE
func (dh *DeviceHandler) ReenableDevice(ctx context.Context, device *voltha.Device) error {
if dh.Client != nil {
- if _, err := dh.Client.ReenableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ if _, err := dh.Client.ReenableOlt(subCtx, new(oop.Empty)); err != nil {
if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+ cancel()
return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, err)
}
}
+ cancel()
} else {
return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, errors.New("nil device client"))
}
@@ -2622,7 +2638,8 @@
dh.removeOnuIndicationChannels(ctx)
// Reset the state
if dh.Client != nil {
- if _, err = dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ if _, err = dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
go func() {
failureReason := fmt.Sprintf("Failed to reboot during device delete request with error: %s", err.Error())
if err1 := dh.eventMgr.oltRebootFailedEvent(ctx, dh.device.Id, failureReason, time.Now().Unix()); err1 != nil {
@@ -2631,6 +2648,7 @@
}()
logger.Errorw(ctx, "olt-reboot-failed", log.Fields{"device-id": dh.device.Id, "err": err})
}
+ cancel()
}
// There is no need to update the core about operation status and connection status of the OLT.
// The OLT is getting deleted anyway and the core might have already cleared the OLT device from its DB.
@@ -2721,7 +2739,9 @@
// RebootDevice reboots the given device
func (dh *DeviceHandler) RebootDevice(ctx context.Context, device *voltha.Device) error {
if dh.Client != nil {
- if _, err := dh.Client.Reboot(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ defer cancel()
+ if _, err := dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err)
}
} else {
@@ -2796,7 +2816,9 @@
})
}
- if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ defer cancel()
+ if _, err := dh.Client.UplinkPacketOut(subCtx, &uplinkPkt); err != nil {
return olterrors.NewErrCommunication("packet-out-to-nni", log.Fields{
"packet": hex.EncodeToString(packet.Data),
"device-id": dh.device.Id,
@@ -2867,7 +2889,9 @@
})
}
- if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ defer cancel()
+ if _, err := dh.Client.OnuPacketOut(subCtx, &onuPkt); err != nil {
return olterrors.NewErrCommunication("packet-out-send", log.Fields{
"source": "adapter",
"destination": "onu",
@@ -3300,10 +3324,11 @@
ponID := plt.PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
ponIntf := &oop.Interface{IntfId: ponID}
var operStatus voltha.OperStatus_Types
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
if enablePort {
operStatus = voltha.OperStatus_ACTIVE
- out, err := dh.Client.EnablePonIf(ctx, ponIntf)
-
+ out, err := dh.Client.EnablePonIf(subCtx, ponIntf)
+ cancel()
if err != nil {
return olterrors.NewErrAdapter("pon-port-enable-failed", log.Fields{
"device-id": dh.device.Id,
@@ -3314,7 +3339,8 @@
logger.Infow(ctx, "enabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
} else {
operStatus = voltha.OperStatus_UNKNOWN
- out, err := dh.Client.DisablePonIf(ctx, ponIntf)
+ out, err := dh.Client.DisablePonIf(subCtx, ponIntf)
+ cancel()
if err != nil {
return olterrors.NewErrAdapter("pon-port-disable-failed", log.Fields{
"device-id": dh.device.Id,
@@ -3457,13 +3483,15 @@
dh.onus.Delete(onuKey)
dh.discOnus.Delete(onuSn)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
// Now clear the ONU on the OLT
- if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
+ if _, err := dh.Client.DeleteOnu(subCtx, onu); err != nil {
+ cancel()
return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
"device-id": dh.device.Id,
"onu-id": onuID}, err).Log()
}
-
+ cancel()
return nil
}
func (dh *DeviceHandler) removeFlowFromDevice(ctx context.Context, flowID uint64, intfID uint32) {
@@ -3829,10 +3857,12 @@
// request openOlt agent to send the the port statistics indication
go func() {
- _, err := dh.Client.CollectStatistics(ctx, new(oop.Empty))
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ _, err := dh.Client.CollectStatistics(subCtx, new(oop.Empty))
if err != nil {
logger.Errorw(ctx, "getOltPortCounters CollectStatistics failed ", log.Fields{"err": err})
}
+ cancel()
}()
select {
case <-statIndChn:
@@ -3982,7 +4012,9 @@
case *tp_pb.TechProfileInstance:
allocId := tpInst.UsScheduler.AllocId
onuAllocPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, AllocId: allocId}
- allocStats, err = dh.Client.GetAllocIdStatistics(ctx, &onuAllocPkt)
+ allocCtx, allocCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ allocStats, err = dh.Client.GetAllocIdStatistics(allocCtx, &onuAllocPkt)
+ allocCtxcancel()
if err != nil {
logger.Errorw(ctx, "Error received from openolt for GetAllocIdStatistics", log.Fields{"onuID": onuID, "AllodId": allocId, "Error": err})
return err
@@ -3996,7 +4028,9 @@
gemPorts := tpInst.UpstreamGemPortAttributeList
for _, gem := range gemPorts {
onuGemPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, GemportId: gem.GemportId}
- onuGemStats, err = dh.Client.GetGemPortStatistics(ctx, &onuGemPkt)
+ gemCtx, gemCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ onuGemStats, err = dh.Client.GetGemPortStatistics(gemCtx, &onuGemPkt)
+ gemCtxcancel()
if err != nil {
logger.Errorw(ctx, "Error received from openolt for GetGemPortStatistics", log.Fields{"onuID": onuID, "GemPortId": gem.GemportId, "Error": err})
return err
@@ -4072,7 +4106,9 @@
func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
- OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ OnuInfo, err := dh.Client.GetOnuInfo(subCtx, &Onu)
+ cancel()
if err != nil {
return nil, err
}
@@ -4081,7 +4117,9 @@
func (dh *DeviceHandler) getIntfInfo(ctx context.Context, intfID uint32) (*oop.PonIntfInfo, error) {
Intf := oop.Interface{IntfId: intfID}
- IntfInfo, err := dh.Client.GetPonInterfaceInfo(ctx, &Intf)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ IntfInfo, err := dh.Client.GetPonInterfaceInfo(subCtx, &Intf)
+ cancel()
if err != nil {
return nil, err
}
@@ -4090,7 +4128,9 @@
func (dh *DeviceHandler) getRxPower(ctx context.Context, rxPowerRequest *extension.GetRxPowerRequest) *extension.SingleGetValueResponse {
Onu := oop.Onu{IntfId: rxPowerRequest.IntfId, OnuId: rxPowerRequest.OnuId}
- rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+ cancel()
if err != nil {
logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
return generateSingleGetValueErrorResponse(err)
@@ -4150,8 +4190,10 @@
if serialNumber != "" {
onuDev := dh.getChildDevice(ctx, serialNumber, (uint32)(portNumber))
if onuDev != nil {
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
Onu := oop.Onu{IntfId: uint32(portNumber), OnuId: onuDev.onuID}
- rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+ rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+ cancel()
if err != nil {
logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
return generateSingleGetValueErrorResponse(err)
@@ -4172,7 +4214,9 @@
dh.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
if onuInCache.(*OnuDevice).intfID == (uint32)(portNumber) {
Onu := oop.Onu{IntfId: (uint32)(portNumber), OnuId: onuInCache.(*OnuDevice).onuID}
- rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+ cancel()
if err != nil {
logger.Errorw(ctx, "error-while-getting-rx-power, however considering to proceed further with other ONUs on PON", log.Fields{"Onu": Onu, "err": err})
} else {
@@ -4230,7 +4274,9 @@
}
Interface := oop.Interface{IntfId: uint32(portNumber)}
- ponStats, err := dh.Client.GetPonPortStatistics(ctx, &Interface)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ ponStats, err := dh.Client.GetPonPortStatistics(subCtx, &Interface)
+ cancel()
if err != nil {
logger.Errorw(ctx, "error-while-getting-pon-port-stats", log.Fields{"IntfId": portNumber, "err": err})
return generateSingleGetValueErrorResponse(err)
@@ -4281,7 +4327,9 @@
}
Interface := oop.Interface{IntfId: uint32(portNumber)}
- nniStats, err := dh.Client.GetNniPortStatistics(ctx, &Interface)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+ nniStats, err := dh.Client.GetNniPortStatistics(subCtx, &Interface)
+ cancel()
if err != nil {
logger.Errorw(ctx, "error-while-getting-nni-port-stats", log.Fields{"PortNo": portNumber, "err": err})
return generateSingleGetValueErrorResponse(err)