[VOL-5514] Add timeout for OLT gRPC requests

Signed-off-by: bseeniva <balaji.seenivasan@radisys.com>
Change-Id: I56bab809e9b67effc6f6d4c6ef32889a7a2c128d
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 30bcbf9..7ef9639 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1294,8 +1294,10 @@
 			return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
 
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 		// Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
-		_, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
+		_, err = dh.Client.DisableOlt(subCtx, new(oop.Empty))
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
@@ -1392,7 +1394,9 @@
 	var deviceInfo *oop.DeviceInfo
 	var genmac string
 
-	deviceInfo, err = dh.Client.GetDeviceInfo(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty))
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	deviceInfo, err = dh.Client.GetDeviceInfo(subCtx, new(oop.Empty))
+	cancel()
 
 	if err != nil {
 		return nil, olterrors.NewErrPersistence("get", "device", 0, nil, err)
@@ -1674,7 +1678,9 @@
 		logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
 			"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
 
-		_, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		_, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
 				"intf-id": intfID,
@@ -1751,7 +1757,9 @@
 	logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
 		"omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
 
-	_, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	_, err := dh.Client.OmciMsgOut(subCtx, omciMessage)
+	cancel()
 	if err != nil {
 		return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
 			"intf-id": intfID,
@@ -1768,7 +1776,9 @@
 	}
 	var pir uint32 = 1000000
 	Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
-	if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.ActivateOnu(subCtx, &Onu); err != nil {
 		st, _ := status.FromError(err)
 		if st.Code() == codes.AlreadyExists {
 			logger.Debugw(ctx, "onu-activation-in-progress", log.Fields{"SerialNumber": serialNumber, "onu-id": onuID, "device-id": dh.device.Id})
@@ -2420,11 +2430,14 @@
 	/* On device disable ,admin state update has to be done prior sending request to agent since
 	   the indication thread may processes invalid  indications of ONU and OLT*/
 	if dh.Client != nil {
-		if _, err := dh.Client.DisableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err := dh.Client.DisableOlt(subCtx, new(oop.Empty)); err != nil {
 			if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+				cancel()
 				return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": device.Id}, err)
 			}
 		}
+		cancel()
 	}
 	logger.Debugw(ctx, "olt-disabled", log.Fields{"device-id": device.Id})
 	/* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
@@ -2488,11 +2501,14 @@
 // Device Oper-State: ACTIVE
 func (dh *DeviceHandler) ReenableDevice(ctx context.Context, device *voltha.Device) error {
 	if dh.Client != nil {
-		if _, err := dh.Client.ReenableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err := dh.Client.ReenableOlt(subCtx, new(oop.Empty)); err != nil {
 			if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
+				cancel()
 				return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, err)
 			}
 		}
+		cancel()
 	} else {
 		return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, errors.New("nil device client"))
 	}
@@ -2622,7 +2638,8 @@
 	dh.removeOnuIndicationChannels(ctx)
 	// Reset the state
 	if dh.Client != nil {
-		if _, err = dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		if _, err = dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
 			go func() {
 				failureReason := fmt.Sprintf("Failed to reboot during device delete request with error: %s", err.Error())
 				if err1 := dh.eventMgr.oltRebootFailedEvent(ctx, dh.device.Id, failureReason, time.Now().Unix()); err1 != nil {
@@ -2631,6 +2648,7 @@
 			}()
 			logger.Errorw(ctx, "olt-reboot-failed", log.Fields{"device-id": dh.device.Id, "err": err})
 		}
+		cancel()
 	}
 	// There is no need to update the core about operation status and connection status of the OLT.
 	// The OLT is getting deleted anyway and the core might have already cleared the OLT device from its DB.
@@ -2721,7 +2739,9 @@
 // RebootDevice reboots the given device
 func (dh *DeviceHandler) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	if dh.Client != nil {
-		if _, err := dh.Client.Reboot(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		defer cancel()
+		if _, err := dh.Client.Reboot(subCtx, new(oop.Empty)); err != nil {
 			return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err)
 		}
 	} else {
@@ -2796,7 +2816,9 @@
 		})
 	}
 
-	if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.UplinkPacketOut(subCtx, &uplinkPkt); err != nil {
 		return olterrors.NewErrCommunication("packet-out-to-nni", log.Fields{
 			"packet":    hex.EncodeToString(packet.Data),
 			"device-id": dh.device.Id,
@@ -2867,7 +2889,9 @@
 		})
 	}
 
-	if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	defer cancel()
+	if _, err := dh.Client.OnuPacketOut(subCtx, &onuPkt); err != nil {
 		return olterrors.NewErrCommunication("packet-out-send", log.Fields{
 			"source":             "adapter",
 			"destination":        "onu",
@@ -3300,10 +3324,11 @@
 	ponID := plt.PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
 	ponIntf := &oop.Interface{IntfId: ponID}
 	var operStatus voltha.OperStatus_Types
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 	if enablePort {
 		operStatus = voltha.OperStatus_ACTIVE
-		out, err := dh.Client.EnablePonIf(ctx, ponIntf)
-
+		out, err := dh.Client.EnablePonIf(subCtx, ponIntf)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("pon-port-enable-failed", log.Fields{
 				"device-id": dh.device.Id,
@@ -3314,7 +3339,8 @@
 		logger.Infow(ctx, "enabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
 	} else {
 		operStatus = voltha.OperStatus_UNKNOWN
-		out, err := dh.Client.DisablePonIf(ctx, ponIntf)
+		out, err := dh.Client.DisablePonIf(subCtx, ponIntf)
+		cancel()
 		if err != nil {
 			return olterrors.NewErrAdapter("pon-port-disable-failed", log.Fields{
 				"device-id": dh.device.Id,
@@ -3457,13 +3483,15 @@
 	dh.onus.Delete(onuKey)
 	dh.discOnus.Delete(onuSn)
 
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 	// Now clear the ONU on the OLT
-	if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
+	if _, err := dh.Client.DeleteOnu(subCtx, onu); err != nil {
+		cancel()
 		return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
 			"device-id": dh.device.Id,
 			"onu-id":    onuID}, err).Log()
 	}
-
+	cancel()
 	return nil
 }
 func (dh *DeviceHandler) removeFlowFromDevice(ctx context.Context, flowID uint64, intfID uint32) {
@@ -3829,10 +3857,12 @@
 	// request openOlt agent to send the the port statistics indication
 
 	go func() {
-		_, err := dh.Client.CollectStatistics(ctx, new(oop.Empty))
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+		_, err := dh.Client.CollectStatistics(subCtx, new(oop.Empty))
 		if err != nil {
 			logger.Errorw(ctx, "getOltPortCounters CollectStatistics failed ", log.Fields{"err": err})
 		}
+		cancel()
 	}()
 	select {
 	case <-statIndChn:
@@ -3982,7 +4012,9 @@
 				case *tp_pb.TechProfileInstance:
 					allocId := tpInst.UsScheduler.AllocId
 					onuAllocPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, AllocId: allocId}
-					allocStats, err = dh.Client.GetAllocIdStatistics(ctx, &onuAllocPkt)
+					allocCtx, allocCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+					allocStats, err = dh.Client.GetAllocIdStatistics(allocCtx, &onuAllocPkt)
+					allocCtxcancel()
 					if err != nil {
 						logger.Errorw(ctx, "Error received from openolt for GetAllocIdStatistics", log.Fields{"onuID": onuID, "AllodId": allocId, "Error": err})
 						return err
@@ -3996,7 +4028,9 @@
 					gemPorts := tpInst.UpstreamGemPortAttributeList
 					for _, gem := range gemPorts {
 						onuGemPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, GemportId: gem.GemportId}
-						onuGemStats, err = dh.Client.GetGemPortStatistics(ctx, &onuGemPkt)
+						gemCtx, gemCtxcancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+						onuGemStats, err = dh.Client.GetGemPortStatistics(gemCtx, &onuGemPkt)
+						gemCtxcancel()
 						if err != nil {
 							logger.Errorw(ctx, "Error received from openolt for GetGemPortStatistics", log.Fields{"onuID": onuID, "GemPortId": gem.GemportId, "Error": err})
 							return err
@@ -4072,7 +4106,9 @@
 
 func (dh *DeviceHandler) getOnuInfo(ctx context.Context, intfID uint32, onuID *uint32) (*oop.OnuInfo, error) {
 	Onu := oop.Onu{IntfId: intfID, OnuId: *onuID}
-	OnuInfo, err := dh.Client.GetOnuInfo(ctx, &Onu)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	OnuInfo, err := dh.Client.GetOnuInfo(subCtx, &Onu)
+	cancel()
 	if err != nil {
 		return nil, err
 	}
@@ -4081,7 +4117,9 @@
 
 func (dh *DeviceHandler) getIntfInfo(ctx context.Context, intfID uint32) (*oop.PonIntfInfo, error) {
 	Intf := oop.Interface{IntfId: intfID}
-	IntfInfo, err := dh.Client.GetPonInterfaceInfo(ctx, &Intf)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	IntfInfo, err := dh.Client.GetPonInterfaceInfo(subCtx, &Intf)
+	cancel()
 	if err != nil {
 		return nil, err
 	}
@@ -4090,7 +4128,9 @@
 
 func (dh *DeviceHandler) getRxPower(ctx context.Context, rxPowerRequest *extension.GetRxPowerRequest) *extension.SingleGetValueResponse {
 	Onu := oop.Onu{IntfId: rxPowerRequest.IntfId, OnuId: rxPowerRequest.OnuId}
-	rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
 		return generateSingleGetValueErrorResponse(err)
@@ -4150,8 +4190,10 @@
 	if serialNumber != "" {
 		onuDev := dh.getChildDevice(ctx, serialNumber, (uint32)(portNumber))
 		if onuDev != nil {
+			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
 			Onu := oop.Onu{IntfId: uint32(portNumber), OnuId: onuDev.onuID}
-			rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+			rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+			cancel()
 			if err != nil {
 				logger.Errorw(ctx, "error-while-getting-rx-power", log.Fields{"Onu": Onu, "err": err})
 				return generateSingleGetValueErrorResponse(err)
@@ -4172,7 +4214,9 @@
 		dh.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
 			if onuInCache.(*OnuDevice).intfID == (uint32)(portNumber) {
 				Onu := oop.Onu{IntfId: (uint32)(portNumber), OnuId: onuInCache.(*OnuDevice).onuID}
-				rxPower, err := dh.Client.GetPonRxPower(ctx, &Onu)
+				subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+				rxPower, err := dh.Client.GetPonRxPower(subCtx, &Onu)
+				cancel()
 				if err != nil {
 					logger.Errorw(ctx, "error-while-getting-rx-power, however considering to proceed further with other ONUs on PON", log.Fields{"Onu": Onu, "err": err})
 				} else {
@@ -4230,7 +4274,9 @@
 	}
 
 	Interface := oop.Interface{IntfId: uint32(portNumber)}
-	ponStats, err := dh.Client.GetPonPortStatistics(ctx, &Interface)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	ponStats, err := dh.Client.GetPonPortStatistics(subCtx, &Interface)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-pon-port-stats", log.Fields{"IntfId": portNumber, "err": err})
 		return generateSingleGetValueErrorResponse(err)
@@ -4281,7 +4327,9 @@
 	}
 
 	Interface := oop.Interface{IntfId: uint32(portNumber)}
-	nniStats, err := dh.Client.GetNniPortStatistics(ctx, &Interface)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.cfg.RPCTimeout)
+	nniStats, err := dh.Client.GetNniPortStatistics(subCtx, &Interface)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "error-while-getting-nni-port-stats", log.Fields{"PortNo": portNumber, "err": err})
 		return generateSingleGetValueErrorResponse(err)
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index c64b4bc..ca1b904 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -519,13 +519,18 @@
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
-	if _, err = f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+	subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err = f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+		cancel1()
 		if len(queues.TrafficQueues) > 1 {
 			logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
-			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+			subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+			cancel2()
 		}
 		return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
 	}
+	cancel1()
 	return err
 }
 
@@ -549,12 +554,15 @@
 				"nniIntfID":     sq.nniIntfID,
 				"onuID":         sq.onuID,
 				"uniID":         sq.uniID})
-		if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+		if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 			IntfId: sq.intfID, OnuId: sq.onuID,
 			UniId: sq.uniID, PortNo: sq.uniPort,
 			TrafficScheds: TrafficSched}); err != nil {
+			cancel()
 			return olterrors.NewErrAdapter("failed-to-create-traffic-schedulers-in-device", log.Fields{"TrafficScheds": TrafficSched}, err)
 		}
+		cancel()
 		logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
 			"direction":      sq.direction,
 			"traffic-queues": trafficQueues,
@@ -572,14 +580,19 @@
 		UniId:         sq.uniID, PortNo: sq.uniPort,
 		TrafficQueues: trafficQueues,
 		TechProfileId: TrafficSched[0].TechProfileId}
-	if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
+	subCtx1, cancel1 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err := f.deviceHandler.Client.CreateTrafficQueues(subCtx1, queues); err != nil {
+		cancel1()
 		if len(queues.TrafficQueues) > 1 {
 			logger.Debug(ctx, "removing-queues-for-1tcont-multi-gem", log.Fields{"intfID": sq.intfID, "onuID": sq.onuID, "dir": sq.direction})
-			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(ctx, queues)
+			subCtx2, cancel2 := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+			_, _ = f.deviceHandler.Client.RemoveTrafficQueues(subCtx2, queues)
+			cancel2()
 		}
 		f.revertScheduler(ctx, sq, TrafficSched)
 		return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
 	}
+	cancel1()
 	logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
 		"direction":      sq.direction,
 		"traffic-queues": trafficQueues,
@@ -633,7 +646,9 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	defer cancel()
+	if _, err = f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
 		&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
 			NetworkIntfId: sq.nniIntfID,
 			UniId:         sq.uniID, PortNo: sq.uniPort,
@@ -677,10 +692,12 @@
 	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
 
-	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched}); err != nil {
+		cancel()
 		return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
 			log.Fields{
 				"intf-id":            sq.intfID,
@@ -689,7 +706,7 @@
 				"uni-id":             sq.uniID,
 				"uni-port":           sq.uniPort}, err)
 	}
-
+	cancel()
 	logger.Infow(ctx, "removed-traffic-schedulers-successfully",
 		log.Fields{"device-id": f.deviceHandler.device.Id,
 			"intf-id":  sq.intfID,
@@ -757,7 +774,8 @@
 			"device-id": f.deviceHandler.device.Id,
 			"err":       err})
 	} else {
-		if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+		if _, err := f.deviceHandler.Client.RemoveTrafficQueues(subCtx,
 			&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
 				UniId: sq.uniID, PortNo: sq.uniPort,
 				TrafficQueues: TrafficQueues,
@@ -780,10 +798,12 @@
 				"uni-port":  sq.uniPort,
 				"tp-id":     sq.tpID})
 		}
+		cancel()
 	}
 
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
 	// Remove traffic schedulers. Ignore any errors, just log them.
-	if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	if _, err := f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched}); err != nil {
@@ -805,6 +825,7 @@
 			"uni-port":  sq.uniPort,
 			"tp-id":     sq.tpID})
 	}
+	cancel()
 }
 
 // This function allocates tconts and GEM ports for an ONU
@@ -1624,7 +1645,9 @@
 		"flow":      *deviceFlow,
 		"device-id": f.deviceHandler.device.Id,
 		"intf-id":   intfID})
-	_, err := f.deviceHandler.Client.FlowAdd(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	_, err := f.deviceHandler.Client.FlowAdd(subCtx, deviceFlow)
+	cancel()
 
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
@@ -1662,7 +1685,9 @@
 		log.Fields{
 			"flow":      *deviceFlow,
 			"device-id": f.deviceHandler.device.Id})
-	_, err := f.deviceHandler.Client.FlowRemove(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	_, err := f.deviceHandler.Client.FlowRemove(subCtx, deviceFlow)
+	cancel()
 	if err != nil {
 		if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
 			logger.Warnw(ctx, "can-not-remove-flow-from-device--unreachable",
@@ -3459,10 +3484,12 @@
 func (f *OpenOltFlowMgr) revertScheduler(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) {
 	// revert scheduler
 	logger.Warnw(ctx, "reverting-scheduler-for-onu", log.Fields{"intf-id": sq.intfID, "onu-id": sq.onuID, "uni-id": sq.uniID, "tp-id": sq.tpID})
-	_, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), f.deviceHandler.cfg.RPCTimeout)
+	defer cancel() // deferred so the timeout's timer is released on every exit path, including a panic in the RPC
+	_, _ = f.deviceHandler.Client.RemoveTrafficSchedulers(subCtx, &tp_pb.TrafficSchedulers{
 		IntfId: sq.intfID, OnuId: sq.onuID,
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched})
 }
 
 // validateMeter validates if there is a meter mismatch for the given direction. It also clears the stale meter if the reference count is zero
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
index 6da8475..f2456ac 100644
--- a/internal/pkg/core/openolt_groupmgr.go
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -71,7 +71,9 @@
 		Action:  g.buildGroupAction(),
 	}
 	logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), g.deviceHandler.cfg.RPCTimeout)
+	_, err := g.deviceHandler.Client.PerformGroupOperation(subCtx, &groupToOlt)
+	cancel()
 	if err != nil {
 		return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
 	}
@@ -94,7 +96,9 @@
 		GroupId: group.Desc.GroupId,
 	}
 	logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
-	_, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), g.deviceHandler.cfg.RPCTimeout)
+	_, err := g.deviceHandler.Client.DeleteGroup(subCtx, &groupToOlt)
+	cancel()
 	if err != nil {
 		logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
 		return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
@@ -297,7 +301,9 @@
 		log.Fields{
 			"groupToOlt": group,
 			"command":    group.Command})
-	_, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), g.deviceHandler.cfg.RPCTimeout)
+	_, err := g.deviceHandler.Client.PerformGroupOperation(subCtx, group)
+	cancel()
 	if err != nil {
 		return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
 	}
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 447704b..ec3487a 100644
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -529,7 +529,9 @@
 func (StatMgr *OpenOltStatisticsMgr) collectOnuStats(ctx context.Context, onuGemInfo rsrcMgr.OnuGemInfo) {
 	onu := &openolt.Onu{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID}
 	logger.Debugw(ctx, "pulling-onu-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID})
-	if stats, err := StatMgr.Device.Client.GetOnuStatistics(context.Background(), onu); err == nil {
+	subCtx, cancel := context.WithTimeout(context.Background(), StatMgr.Device.openOLT.rpcTimeout)
+	defer cancel()
+	if stats, err := StatMgr.Device.Client.GetOnuStatistics(subCtx, onu); err == nil {
 		onuStats <- stats
 	} else {
 		logger.Errorw(ctx, "error-while-getting-onu-stats-for-onu", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "err": err})
@@ -542,10 +544,13 @@
 	var stats *openolt.OnuStatistics
 	var err error
 	logger.Debugw(ctx, "pulling-onu-stats-on-demand", log.Fields{"IntfID": intfID, "OnuID": onuID})
-	if stats, err = StatMgr.Device.Client.GetOnuStatistics(context.Background(), onu); err == nil {
+	subCtx, cancel := context.WithTimeout(context.Background(), StatMgr.Device.openOLT.rpcTimeout)
+	if stats, err = StatMgr.Device.Client.GetOnuStatistics(subCtx, onu); err == nil {
+		cancel()
 		statValue := StatMgr.convertONUStats(stats)
 		return statValue
 	}
+	cancel()
 	logger.Errorw(ctx, "error-while-getting-onu-stats-for-onu", log.Fields{"IntfID": intfID, "OnuID": onuID, "err": err})
 	return nil
 }
@@ -571,12 +576,14 @@
 	for _, gem := range onuGemInfo.GemPorts {
 		logger.Debugw(ctx, "pulling-gem-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem})
 		onuPacket := &openolt.OnuPacket{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID, GemportId: gem}
-		if stats, err := StatMgr.Device.Client.GetGemPortStatistics(context.Background(), onuPacket); err == nil {
+		subCtx, cancel := context.WithTimeout(context.Background(), StatMgr.Device.openOLT.rpcTimeout)
+		if stats, err := StatMgr.Device.Client.GetGemPortStatistics(subCtx, onuPacket); err == nil {
 			gemStats <- stats
 		} else {
 			logger.Errorw(ctx, "error-while-getting-gem-stats-for-onu",
 				log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem, "err": err})
 		}
+		cancel()
 	}
 }