VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openonci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/extensions/omci/state_machines/omci_onu_capabilities.py b/python/extensions/omci/state_machines/omci_onu_capabilities.py
new file mode 100644
index 0000000..c13739e
--- /dev/null
+++ b/python/extensions/omci/state_machines/omci_onu_capabilities.py
@@ -0,0 +1,262 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from transitions import Machine
+from twisted.internet import reactor
+from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
+
+
+class OnuOmciCapabilities(object):
+ """
+ OpenOMCI ONU OMCI Capabilities State machine
+ """
+ DEFAULT_STATES = ['disabled', 'out_of_sync', 'in_sync', 'idle']
+
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'out_of_sync'},
+ {'trigger': 'synchronized', 'source': 'out_of_sync', 'dest': 'in_sync'},
+
+ {'trigger': 'success', 'source': 'in_sync', 'dest': 'idle'},
+ {'trigger': 'failure', 'source': 'in_sync', 'dest': 'out_of_sync'},
+
+ {'trigger': 'not_synchronized', 'source': 'idle', 'dest': 'out_of_sync'},
+
+ # Do wildcard 'stop' trigger last so it covers all previous states
+ {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+ ]
+ DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll
+
+ def __init__(self, agent, device_id, tasks,
+ advertise_events=False,
+ states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_RETRY):
+ """
+ Class initialization
+
+ :param agent: (OpenOmciAgent) Agent
+ :param device_id: (str) ONU Device ID
+ :param tasks: (dict) Tasks to run
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
+ :param states: (list) List of valid states
+ :param transitions: (dict) Dictionary of triggers and state changes
+ :param initial_state: (str) Initial state machine state
+ :param timeout_delay: (int/float) Number of seconds after a timeout or poll
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._agent = agent
+ self._device_id = device_id
+ self._device = None
+ self._timeout_delay = timeout_delay
+
+ self._get_capabilities_task = tasks['get-capabilities']
+ self._advertise_events = advertise_events
+
+ self._deferred = None
+ self._current_task = None
+ self._task_deferred = None
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+
+ self._subscriptions = { # RxEvent.enum -> Subscription Object
+ OnuDeviceEvents.MibDatabaseSyncEvent: None
+ }
+ self._sub_mapping = {
+ OnuDeviceEvents.MibDatabaseSyncEvent: self.on_mib_sync_event
+ }
+ # Statistics and attributes
+ # TODO: add any others if it will support problem diagnosis
+
+ # Set up state machine to manage states
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}-{}'.format(self.__class__.__name__,
+ device_id))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _cancel_tasks(self):
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ def __str__(self):
+ return 'OnuOmciCapabilities: Device ID: {}, State:{}'.format(self._device_id, self.state)
+
+ def delete(self):
+ """
+ Cleanup any state information
+ """
+ self.stop()
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def supported_managed_entities(self):
+ """
+ Return a set of the Managed Entity class IDs supported on this ONU
+ None is returned if no MEs have been discovered
+
+ :return: (set of ints)
+ """
+ return self._supported_entities if len(self._supported_entities) else None
+
+ @property
+ def supported_message_types(self):
+ """
+ Return a set of the Message Types supported on this ONU
+ None is returned if no message types have been discovered
+
+ :return: (set of EntityOperations)
+ """
+ return self._supported_msg_types if len(self._supported_msg_types) else None
+
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ from datetime import datetime
+
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow())
+ })
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self.advertise(OpenOmciEventType.state_change, self.state)
+ self._cancel_deferred()
+ self._cancel_tasks()
+
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+
+ # Drop Response and Autonomous notification subscriptions
+ for event, sub in self._subscriptions.iteritems():
+ if sub is not None:
+ self._subscriptions[event] = None
+ self._device.event_bus.unsubscribe(sub)
+
+ def on_enter_out_of_sync(self):
+ """
+ State machine has just started or the MIB database has transitioned
+ to an out-of-synchronization state
+ """
+ self.advertise(OpenOmciEventType.state_change, self.state)
+ self._cancel_deferred()
+ self._device = self._agent.get_device(self._device_id)
+
+ # Subscribe to events of interest
+ try:
+ for event, sub in self._sub_mapping.iteritems():
+ if self._subscriptions[event] is None:
+ self._subscriptions[event] = \
+ self._device.event_bus.subscribe(
+ topic=OnuDeviceEntry.event_bus_topic(self._device_id,
+ event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('subscription-setup', e=e)
+
+ # Periodically check/poll for in-sync in case subscription was missed or
+ # already in sync
+ self._deferred = reactor.callLater(0, self.check_in_sync)
+
+ def check_in_sync(self):
+ if self._device.mib_db_in_sync:
+ self.synchronized()
+ else:
+ self._deferred = reactor.callLater(self._timeout_delay,
+ self.check_in_sync)
+
+ def on_enter_in_sync(self):
+ """
+ State machine has just transitioned to an in-synchronization state
+ """
+ self.advertise(OpenOmciEventType.state_change, self.state)
+ self._cancel_deferred()
+
+ def success(results):
+ self.log.debug('capabilities-success', results=results)
+ self._supported_entities = self._current_task.supported_managed_entities
+ self._supported_msg_types = self._current_task.supported_message_types
+ self._current_task = None
+ self._deferred = reactor.callLater(0, self.success)
+
+ def failure(reason):
+ self.log.info('capabilities-failure', reason=reason)
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+ # Schedule a task to read the ONU's OMCI capabilities
+ self._current_task = self._get_capabilities_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_idle(self):
+ """
+ Notify any subscribers for a capabilities event and wait until
+ stopped or ONU MIB database goes out of sync
+ """
+ self.advertise(OpenOmciEventType.state_change, self.state)
+ self._cancel_deferred()
+ self._device.publish_omci_capabilities_event()
+
+ def on_mib_sync_event(self, _topic, msg):
+ """
+ Handle In-Sync/Out-of-Sync for the MIB database
+ :param _topic: (str) Subscription topic
+ :param msg: (dict) In-Sync event data
+ """
+ if self._subscriptions.get(OnuDeviceEvents.MibDatabaseSyncEvent) is None:
+ return
+
+ if msg[IN_SYNC_KEY]:
+ self.synchronized()
+ else:
+ self.not_synchronized()