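[VOL-5514] Add timeout for OLT gRPC requests

Bound the OLT agent gRPC calls made by DeviceHandler in device_handler.go
with a per-call context created by
context.WithTimeout(..., dh.cfg.RPCTimeout), so that an unresponsive
agent can no longer block the adapter indefinitely. The returned cancel
function is invoked on every return path (explicitly or via defer) to
release the context's timer.

Each per-call context is rooted at context.Background(), with the
caller's trace span attached via log.WithSpanFromContext, rather than
at the caller's ctx. Call sites that previously passed ctx directly are
therefore bounded by RPCTimeout instead of by the caller's cancellation
or deadline. These are DisableOlt on reconnect, ActivateOnu, Reboot on
device delete, Uplink/OnuPacketOut, Enable/DisablePonIf, and the
statistics and info queries.
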
Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I56bab809e9b67effc6f6d4c6ef32889a7a2c128d
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 30bcbf9..7ef9639 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1294,8 +1294,10 @@
 			return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
 
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 		// Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
-		_, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
+		_, err = dh.Client.DisableOlt(subCtx, new(oop.Empty))
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
@@ -1392,7 +1394,9 @@
 	var deviceInfo *oop.DeviceInfo
 	var genmac string
 
-	deviceInfo, err = dh.Client.GetDeviceInfo(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty))
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	deviceInfo, err = dh.Client.GetDeviceInfo(subCtx, new(oop.Empty))
+	cancel()
 
 	if err != nil {
 		return nil, olterrors.NewErrPersistence("get", "device", 0, nil, err)
@@ -1674,7 +1678,9 @@
 		logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
 			"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
 
-		_, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		_, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
 				"intf-id": intfID,
@@ -1751,7 +1757,9 @@
 	logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
 		"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
 
-	_, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	_, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+	cancel()
 	if err != nil {
 		return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
 			"intf-id": intfID,
@@ -1768,7 +1776,9 @@
 	}
 	var pir uint32 = 1000000
 	Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
-	if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.ActivateOnu(subCtx, &Onu); err != nil {
 		st, _ := status.FromError(err)
 		if st.Code() == codes.AlreadyExists {
 			logger.Debugw(ctx, "onu-activation-in-progress", log.Fields{"SerialNumber": serialNumber, "onu-id": onuID, "device-id": dh.device.Id})
@@ -2420,11 +2430,14 @@
 	/* On device disable ,admin state update has to be done prior sending request to agent since
 	   the indication thread may processes invalid  indications of ONU and OLT*/
 	if dh.Client != nil {
-		if _, err := dh.Client.DisableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err := dh.Client.DisableOlt(subCtx, new(oop.Empty)); err != nil {
 			if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+				cancel()
 				return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": device.Id}, err)
 			}
 		}
+		cancel()
 	}
 	logger.Debugw(ctx, "olt-disabled", log.Fields{"device-id": device.Id})
 	/* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
@@ -2488,11 +2501,14 @@
 // Device Oper-State: ACTIVE
 func (dh *DeviceHandler) ReenableDevice(ctx context.Context, device *voltha.Device) error {
 	if dh.Client != nil {
-		if _, err := dh.Client.ReenableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err := dh.Client.ReenableOlt(subCtx, new(oop.Empty)); err != nil {
 			if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+				cancel()
 				return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, err)
 			}
 		}
+		cancel()
 	} else {
 		return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, errors.New("nil device client"))
 	}
@@ -2622,7 +2638,8 @@
 	dh.removeOnuIndicationChannels(ctx)
 	// Reset the state
 	if dh.Client != nil {
-		if _, err = dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err = dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
 			go func() {
 				failureReason := fmt.Sprintf("Failed to reboot during device delete request with error: %s", err.Error())
 				if err1 := dh.eventMgr.oltRebootFailedEvent(ctx, dh.device.Id, failureReason, time.Now().Unix()); err1 != nil {
@@ -2631,6 +2648,7 @@
 			}()
 			logger.Errorw(ctx, "olt-reboot-failed", log.Fields{"device-id": dh.device.Id, "err": err})
 		}
+		cancel()
 	}
 	// There is no need to update the core about operation status and connection status of the OLT.
 	// The OLT is getting deleted anyway and the core might have already cleared the OLT device from its DB.
@@ -2721,7 +2739,9 @@
 // RebootDevice reboots the given device
 func (dh *DeviceHandler) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	if dh.Client != nil {
-		if _, err := dh.Client.Reboot(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		defer cancel()
+		if _, err := dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
 			return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err)
 		}
 	} else {
@@ -2796,7 +2816,9 @@
 		})
 	}
 
-	if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.UplinkPacketOut(subCtx, &uplinkPkt); err != nil {
 		return olterrors.NewErrCommunication("packet-out-to-nni", log.Fields{
 			"packet":    hex.EncodeToString(packet.Data),
 			"device-id": dh.device.Id,
@@ -2867,7 +2889,9 @@
 		})
 	}
 
-	if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.OnuPacketOut(subCtx, &onuPkt); err != nil {
 		return olterrors.NewErrCommunication("packet-out-send", log.Fields{
 			"source":             "adapter",
 			"destination":        "onu",
@@ -3300,10 +3324,11 @@
 	ponID := plt.PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
 	ponIntf := &oop.Interface{IntfId: ponID}
 	var operStatus voltha.OperStatus_Types
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 	if enablePort {
 		operStatus = voltha.OperStatus_ACTIVE
-		out, err := dh.Client.EnablePonIf(ctx, ponIntf)
-
+		out, err := dh.Client.EnablePonIf(subCtx, ponIntf)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("pon-port-enable-failed", log.Fields{
 				"device-id": dh.device.Id,
@@ -3314,7 +3339,8 @@
 		logger.Infow(ctx, "enabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
 	} else {
 		operStatus = voltha.OperStatus_UNKNOWN
-		out, err := dh.Client.DisablePonIf(ctx, ponIntf)
+		out, err := dh.Client.DisablePonIf(subCtx, ponIntf)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("pon-port-disable-failed", log.Fields{
 				"device-id": dh.device.Id,
@@ -3457,13 +3483,15 @@
 	dh.onus.Delete(onuKey)
 	dh.discOnus.Delete(onuSn)
 
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 	// Now clear the ONU on the OLT
-	if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
+	if _, err := dh.Client.DeleteOnu(subCtx, onu); err != nil {
+		cancel()
 		return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
 			"device-id": dh.device.Id,
 			"onu-id":    onuID}, err).Log()
 	}
-
+	cancel()
 	return nil
 }
 func (dh *DeviceHandler) removeFlowFromDevice(ctx context.Context, flowID uint64, intfID uint32) {
@@ -3829,10 +3857,12 @@
 	// request openOlt agent to send the the port statistics indication
 
 	go func() {
-		_, err := dh.Client.CollectStatistics(ctx, new(oop.Empty))
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		_, err := dh.Client.CollectStatistics(subCtx, new(oop.Empty))
 		if err != nil {
 			logger.Errorw(ctx, "getOltPortCounters CollectStatistics failed ", log.Fields{"err": err})
 		}
+		cancel()
 	}()
 	select {
 	case <-statIndChn:
@@ -3982,7 +4012,9 @@
 				case *tp_pb.TechProfileInstance:
 					allocId := tpInst.UsScheduler.AllocId
 					onuAllocPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, AllocId: allocId}
-					allocStats, err = dh.Client.GetAllocIdStatistics(ctx, &onuAllocPkt)
+					allocCtx, allocCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+					allocStats, err = dh.Client.GetAllocIdStatistics(allocCtx, &onuAllocPkt)
+					allocCtxcancel()
 					if err != nil {
 						logger.Errorw(ctx, "Error received from openolt for GetAllocIdStatistics", log.Fields{"onuID": onuID, "AllodId": allocId, "Error": err})
 						return err
@@ -3996,7 +4028,9 @@
 					gemPorts := tpInst.UpstreamGemPortAttributeList
 					for _, gem := range gemPorts {
 						onuGemPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, GemportId: gem.GemportId}
-						onuGemStats, err = dh.Client.GetGemPortStatistics(ctx, &onuGemPkt)
+						gemCtx, gemCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+						onuGemStats, err = dh.Client.GetGemPortStatistics(gemCtx, &onuGemPkt)
+						gemCtxcancel()
 						if err != nil {
 							logger.Errorw(ctx, "Error received from openolt for GetGemPortStatistics", log.Fields{"onuID": onuID, "GemPortId": gem.GemportId, "Error": err})
 							return err
@@ -4072,7 +4106,9 @@
 
 func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
 	Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
-	OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	OnuInfo, err := dh.Client.GetOnuInfo(subCtx, &Onu)
+	cancel()
 	if err != nil {
 		return nil, err
 	}
@@ -4081,7 +4117,9 @@
 
 func (dh *DeviceHandler) getIntfInfo(ctx context.Context, intfID uint32) (*oop.PonIntfInfo, error) {
 	Intf := oop.Interface{IntfId: intfID}
-	IntfInfo, err := dh.Client.GetPonInterfaceInfo(ctx, &Intf)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	IntfInfo, err := dh.Client.GetPonInterfaceInfo(subCtx, &Intf)
+	cancel()
 	if err != nil {
 		return nil, err
 	}
@@ -4090,7 +4128,9 @@
 
 func (dh *DeviceHandler) getRxPower(ctx context.Context, rxPowerRequest *extension.GetRxPowerRequest) *extension.SingleGetValueResponse {
 	Onu := oop.Onu{IntfId: rxPowerRequest.IntfId, OnuId: rxPowerRequest.OnuId}
-	rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
 		return generateSingleGetValueErrorResponse(err)
@@ -4150,8 +4190,10 @@
 	if serialNumber != "" {
 		onuDev := dh.getChildDevice(ctx, serialNumber, (uint32)(portNumber))
 		if onuDev != nil {
+			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 			Onu := oop.Onu{IntfId: uint32(portNumber), OnuId: onuDev.onuID}
-			rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+			rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+			cancel()
 			if err != nil {
 				logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
 				return generateSingleGetValueErrorResponse(err)
@@ -4172,7 +4214,9 @@
 		dh.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
 			if onuInCache.(*OnuDevice).intfID == (uint32)(portNumber) {
 				Onu := oop.Onu{IntfId: (uint32)(portNumber), OnuId: onuInCache.(*OnuDevice).onuID}
-				rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+				subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+				rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+				cancel()
 				if err != nil {
 					logger.Errorw(ctx, "error-while-getting-rx-power, however considering to proceed further with other ONUs on PON", log.Fields{"Onu": Onu, "err": err})
 				} else {
@@ -4230,7 +4274,9 @@
 	}
 
 	Interface := oop.Interface{IntfId: uint32(portNumber)}
-	ponStats, err := dh.Client.GetPonPortStatistics(ctx, &Interface)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	ponStats, err := dh.Client.GetPonPortStatistics(subCtx, &Interface)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-pon-port-stats", log.Fields{"IntfId": portNumber, "err": err})
 		return generateSingleGetValueErrorResponse(err)
@@ -4281,7 +4327,9 @@
 	}
 
 	Interface := oop.Interface{IntfId: uint32(portNumber)}
-	nniStats, err := dh.Client.GetNniPortStatistics(ctx, &Interface)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	nniStats, err := dh.Client.GetNniPortStatistics(subCtx, &Interface)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-nni-port-stats", log.Fields{"PortNo": portNumber, "err": err})
 		return generateSingleGetValueErrorResponse(err)