[VOL-5479]Handle IP change for OLT in olt-adapter
Change-Id: I83a50533e64f8f0392369fe7a8e1f36f583de05d
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index ee582a6..429178b 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -138,6 +138,7 @@
isDeviceDeletionInProgress bool
prevOperStatus common.OperStatus_Types
collectorWaitGroup sync.WaitGroup
+ transitionHandlerCancel context.CancelFunc
}
// OnuDevice represents ONU related info
@@ -1205,7 +1206,9 @@
// doStateInit dial the grpc before going to init state
func (dh *DeviceHandler) doStateInit(ctx context.Context) error {
var err error
-
+ dh.lockDevice.RLock()
+ hostAndPort := dh.device.GetHostAndPort() //store the IP of OLT , so that when we are blocked on grpc dail with old IP,and IP is updated it wont be updated here.
+ dh.lockDevice.RUnlock()
// if the connection is already available, close the previous connection (olt reboot case)
if dh.clientCon != nil {
if err = dh.clientCon.Close(); err != nil {
@@ -1215,10 +1218,10 @@
}
}
- logger.Debugw(ctx, "Dialing grpc", log.Fields{"device-id": dh.device.Id})
+ logger.Debugw(ctx, "Dialing grpc", log.Fields{"device-id": dh.device.Id, "host-and-port": hostAndPort})
grpc_prometheus.EnableClientHandlingTimeHistogram()
// Use Interceptors to automatically inject and publish Open Tracing Spans by this GRPC client
- dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(),
+ dh.clientCon, err = grpc.DialContext(ctx, hostAndPort,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
@@ -1233,18 +1236,11 @@
if err != nil {
return olterrors.NewErrCommunication("dial-failure", log.Fields{
"device-id": dh.device.Id,
- "host-and-port": dh.device.GetHostAndPort()}, err)
- }
- //Setting oper and connection state to RECONCILING and conn state to reachable
- cgClient, err := dh.coreClient.GetCoreServiceClient()
- if err != nil {
- return err
+ "host-and-port": hostAndPort}, err)
}
if dh.device.OperStatus == voltha.OperStatus_RECONCILING {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.openOLT.rpcTimeout)
- defer cancel()
- if _, err := cgClient.DeviceStateUpdate(subCtx, &ca.DeviceStateFilter{
+ if err = dh.updateDeviceStateInCore(ctx, &ca.DeviceStateFilter{
DeviceId: dh.device.Id,
OperStatus: voltha.OperStatus_RECONCILING,
ConnStatus: voltha.ConnectStatus_REACHABLE,
@@ -1507,9 +1503,11 @@
// AdoptDevice adopts the OLT device
func (dh *DeviceHandler) AdoptDevice(ctx context.Context, device *voltha.Device) {
+ var dhCtx context.Context
dh.transitionMap = NewTransitionMap(dh)
logger.Infow(ctx, "adopt-device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
- dh.transitionMap.Handle(ctx, DeviceInit)
+ dhCtx, dh.transitionHandlerCancel = context.WithCancel(context.Background())
+ dh.transitionMap.Handle(dhCtx, DeviceInit)
// Now, set the initial PM configuration for that device
cgClient, err := dh.coreClient.GetCoreServiceClient()
@@ -4754,3 +4752,56 @@
}
return signature
}
+
+func (dh *DeviceHandler) UpdateDevice(ctx context.Context, updateDevice *voltha.UpdateDevice) {
+ var dhCtx context.Context
+ logger.Debug(ctx, "UpdateDevice called", log.Fields{"deviceConfig": updateDevice})
+ dh.lockDevice.Lock()
+ if dh.transitionHandlerCancel != nil {
+ dh.transitionHandlerCancel() //if the previous device transition was grpc blocked on old IP,this make that handler return/exit.
+ dh.transitionHandlerCancel = nil
+ }
+ err := updateDeviceAddress(dh.device, updateDevice) //update the device handler with the new device which would have newIP
+ if err != nil {
+ dh.lockDevice.Unlock()
+ logger.Errorw(ctx, "failed-to-update-device-address", log.Fields{"error": err})
+ return
+ }
+ connectStatus := dh.device.ConnectStatus
+ dhCtx, dh.transitionHandlerCancel = context.WithCancel(context.Background())
+ dh.lockDevice.Unlock()
+ if connectStatus == voltha.ConnectStatus_REACHABLE {
+ dh.updateStateUnreachable(dhCtx)
+ } else {
+ dh.transitionMap.Handle(dhCtx, DeviceInit)
+ }
+}
+
+func updateDeviceAddress(device *voltha.Device, deviceConfig *voltha.UpdateDevice) error {
+ switch addr := deviceConfig.Address.(type) {
+ case *voltha.UpdateDevice_Ipv4Address:
+ if current, ok := device.Address.(*voltha.Device_Ipv4Address); ok {
+ if current.Ipv4Address == addr.Ipv4Address {
+ return fmt.Errorf("no-change-in-IPV4-device-address")
+ }
+ device.Address = &voltha.Device_Ipv4Address{Ipv4Address: addr.Ipv4Address}
+ }
+ case *voltha.UpdateDevice_Ipv6Address:
+ if current, ok := device.Address.(*voltha.Device_Ipv6Address); ok {
+ if current.Ipv6Address == addr.Ipv6Address {
+ return fmt.Errorf("no-change-in-IPV6-device-address")
+ }
+ device.Address = &voltha.Device_Ipv6Address{Ipv6Address: addr.Ipv6Address}
+ }
+ case *voltha.UpdateDevice_HostAndPort:
+ if current, ok := device.Address.(*voltha.Device_HostAndPort); ok {
+ if current.HostAndPort == addr.HostAndPort {
+ return fmt.Errorf("no-change-in-HostAndPort-device-address")
+ }
+ device.Address = &voltha.Device_HostAndPort{HostAndPort: addr.HostAndPort}
+ }
+ default:
+ return fmt.Errorf("invalid-device-config-address-type")
+ }
+ return nil
+}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index cc11caa..ddd331f 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -175,6 +175,7 @@
}
logger.Infow(ctx, "reconcile-device", log.Fields{"device-id": device.Id})
var handler *DeviceHandler
+ var dhCtx context.Context
if handler = oo.getDeviceHandler(device.Id); handler == nil {
//Setting state to RECONCILING
// Fetch previous state
@@ -224,7 +225,8 @@
oo.addDeviceHandlerToMap(handler)
handler.transitionMap = NewTransitionMap(handler)
- go handler.transitionMap.Handle(log.WithSpanFromContext(context.Background(), ctx), DeviceInit)
+ dhCtx, handler.transitionHandlerCancel = context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
+ go handler.transitionMap.Handle(dhCtx, DeviceInit)
} else {
logger.Warnf(ctx, "device-already-reconciled-or-active", log.Fields{"device-id": device.Id})
return &empty.Empty{}, status.Errorf(codes.AlreadyExists, "handler exists: %s", device.Id)
@@ -681,6 +683,23 @@
return err
}
+// UpdateDevice updates the address of the OLT for now
+func (oo *OpenOLT) UpdateDevice(ctx context.Context, updateDeviceReq *voltha.UpdateDevice) (*empty.Empty, error) {
+ logger.Infow(ctx, "update-device", log.Fields{"device": updateDeviceReq})
+ if updateDeviceReq == nil {
+ return nil, fmt.Errorf("nil-device-config")
+ }
+ if updateDeviceReq.Address == nil {
+ return nil, fmt.Errorf("device-address-not-found")
+ }
+ if handler := oo.getDeviceHandler(updateDeviceReq.Id); handler != nil {
+ go handler.UpdateDevice(context.Background(), updateDeviceReq)
+ return &empty.Empty{}, nil
+ }
+
+ return nil, olterrors.NewErrNotFound("device-handler", log.Fields{"device-id": updateDeviceReq.Id}, nil).Log()
+}
+
/*
*
* Unimplemented APIs