| # |
| # Copyright 2016 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| A device agent is instantiated for each Device and plays an important role |
| between the Device object and its adapter. |
| """ |
| import structlog |
| from twisted.internet import reactor |
| from twisted.internet.defer import inlineCallbacks, returnValue |
| |
| from voltha.core.config.config_proxy import CallbackType |
| from voltha.protos.common_pb2 import AdminState, OperStatus |
| from voltha.registry import registry |
| |
| log = structlog.get_logger() |
| |
| |
class InvalidStateTransition(Exception):
    """Raised when a device is asked to make a forbidden admin state change."""
| |
| |
class DeviceAgent(object):
    """
    Per-device go-between for a Device config node and its adapter.

    The agent hooks the PRE_UPDATE and POST_UPDATE callbacks of the
    '/devices/<id>' proxy and maps admin state changes onto handler
    methods through the admin_state_fsm table at the bottom of the class.
    """

    def __init__(self, core, initial_data):
        """
        Bind the agent to the proxy of one device and hook its callbacks.

        :param core: object providing get_proxy(path)
        :param initial_data: initial device record; its id selects the
            proxy path and it is kept until start() has processed it
        """
        self.core = core
        self._tmp_initial_data = initial_data
        device_path = '/devices/{}'.format(initial_data.id)
        self.proxy = core.get_proxy(device_path)
        for callback_type, handler in self._callback_bindings():
            self.proxy.register_callback(callback_type, handler)
        self.last_data = None
        self.adapter_agent = None

    def _callback_bindings(self):
        """Return the (callback type, handler) pairs hooked on the proxy."""
        return (
            (CallbackType.PRE_UPDATE, self._validate_update),
            (CallbackType.POST_UPDATE, self._process_update),
        )

    @inlineCallbacks
    def start(self):
        """
        Resolve the adapter agent, push the initial device data through the
        post-update path, discard the temporary copy and fire with self.
        """
        log.debug('starting')
        self._set_adapter_agent()
        yield self._process_update(self._tmp_initial_data)
        # the initial record is only needed until it has been processed once
        del self._tmp_initial_data
        log.info('started')
        returnValue(self)

    def stop(self):
        """Unhook both update callbacks from the device proxy."""
        log.debug('stopping')
        for callback_type, handler in self._callback_bindings():
            self.proxy.unregister_callback(callback_type, handler)
        log.info('stopped')

    def _set_adapter_agent(self):
        """
        Fetch the adapter agent for this device by adapter name, deriving
        the name from the device type when the initial data leaves it empty.
        """
        adapter_name = self._tmp_initial_data.adapter
        if adapter_name == '':
            adapter_name = self._adapter_name_from_device_type()
        loader = registry('adapter_loader')
        self.adapter_agent = loader.get_agent(adapter_name)

    def _adapter_name_from_device_type(self):
        """
        Return the adapter name declared by the device type of the initial
        data. Raises KeyError when the type id is not among '/device_types'.
        """
        root_proxy = self.core.get_proxy('/')
        types_by_id = {
            device_type.id: device_type
            for device_type in root_proxy.get('/device_types')
        }
        adapter_name = types_by_id[self._tmp_initial_data.type].adapter
        assert adapter_name != ''
        return adapter_name

    @inlineCallbacks
    def _validate_update(self, device):
        """
        Pre-update hook: runs the state transition logic in dry-run mode so
        that an unacceptable change can be rejected by raising, then hands
        the (possibly augmented) device data back.
        """
        log.debug('device-pre-update', device=device)
        yield self._process_state_transitions(device, dry_run=True)
        returnValue(device)

    @inlineCallbacks
    def _process_update(self, device):
        """
        Post-update hook, invoked once the device object has been updated
        (on its own or as part of a transaction), used to push the change
        down towards the adapter.
        """
        log.debug('device-post-update', device=device)

        # act on whatever admin state transition this update implies
        yield self._process_state_transitions(device)

        # remember this version so the next update can be compared to it
        self.last_data = device

    @inlineCallbacks
    def _process_state_transitions(self, device, dry_run=False):
        """
        Look up the handler for the (previous, new) admin state pair and
        run it.

        A pair missing from admin_state_fsm is a no-op; a pair mapped to
        False raises InvalidStateTransition. The previous state is taken
        from last_data, falling back to AdminState.UNKNOWN.
        """
        previous_state = getattr(
            self.last_data, 'admin_state', AdminState.UNKNOWN)
        requested_state = device.admin_state
        handler = self.admin_state_fsm.get((previous_state, requested_state))

        if handler is False:
            raise InvalidStateTransition(
                '{} -> {}'.format(previous_state, requested_state))

        if handler is not None:
            assert callable(handler)
            # table entries are plain functions, so self is passed explicitly
            yield handler(self, device, dry_run)

    @inlineCallbacks
    def _activate_device(self, device, dry_run=False):
        """
        Have the adapter adopt the device and mark it as ACTIVATING; a dry
        run only logs.
        """
        log.info('activate-device', device=device, dry_run=dry_run)
        if dry_run:
            return
        device = yield self.adapter_agent.adopt_device(device)
        device.oper_status = OperStatus.ACTIVATING
        # adoption may also have populated the device data, so the result
        # has to be written back; scheduled through the reactor rather than
        # done inline
        reactor.callLater(0, self.update_device, device)

    def update_device(self, device):
        """Write the given device data to the proxy root."""
        # recorded as last seen first, so that we don't propagate back
        self.last_data = device
        self.proxy.update('/', device)

    def remove_device(self, device_id):
        """Not implemented yet."""
        raise NotImplementedError()

    def _propagate_change(self, device, dry_run=False):
        """
        Handler for an update that keeps the device ENABLED: a no-op when
        the data equals last_data, not implemented otherwise.
        """
        log.info('propagate-change', device=device, dry_run=dry_run)
        if device != self.last_data:
            raise NotImplementedError()
        log.debug('no-op')

    def _abandon_device(self, device, dry_run=False):
        """Handler for returning to PREPROVISIONED; not implemented yet."""
        log.info('abandon-device', device=device, dry_run=dry_run)
        raise NotImplementedError()

    def _disable_device(self, device, dry_run=False):
        """Handler for ENABLED -> DISABLED; not implemented yet."""
        log.info('disable-device', device=device, dry_run=dry_run)
        raise NotImplementedError()

    def _reenable_device(self, device, dry_run=False):
        """Handler for DISABLED -> ENABLED; not implemented yet."""
        log.info('reenable-device', device=device, dry_run=dry_run)
        raise NotImplementedError()

    # Admin state transition table, keyed by (old state, new state).
    #   - a pair that is absent is treated as a no-op
    #   - a pair mapped to False is an invalid state change
    #   - otherwise the value is the handler, called as
    #     handler(self, device, dry_run)
    admin_state_fsm = {

        # from UNKNOWN
        (AdminState.UNKNOWN, AdminState.ENABLED): _activate_device,

        # from PREPROVISIONED
        (AdminState.PREPROVISIONED, AdminState.UNKNOWN): False,
        (AdminState.PREPROVISIONED, AdminState.ENABLED): _activate_device,

        # from ENABLED
        (AdminState.ENABLED, AdminState.UNKNOWN): False,
        (AdminState.ENABLED, AdminState.ENABLED): _propagate_change,
        (AdminState.ENABLED, AdminState.DISABLED): _disable_device,
        (AdminState.ENABLED, AdminState.PREPROVISIONED): _abandon_device,

        # from DISABLED
        (AdminState.DISABLED, AdminState.UNKNOWN): False,
        (AdminState.DISABLED, AdminState.PREPROVISIONED): _abandon_device,
        (AdminState.DISABLED, AdminState.ENABLED): _reenable_device,

    }