VOL-1451 Initial checkin of openonu build

Produced docker container capable of building and running
openonu/brcm_openonci_onu.  Copied over current onu code
and resolved all imports by copying into the local source tree.

Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/extensions/omci/state_machines/performance_intervals.py b/python/extensions/omci/state_machines/performance_intervals.py
new file mode 100644
index 0000000..73e145b
--- /dev/null
+++ b/python/extensions/omci/state_machines/performance_intervals.py
@@ -0,0 +1,901 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+import arrow
+from transitions import Machine
+from datetime import datetime, timedelta
+from random import uniform, shuffle
+from twisted.internet import reactor
+from common.utils.indexpool import IndexPool
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
+    RX_RESPONSE_KEY
+from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
+from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
+from voltha.extensions.omci.omci_entities import MacBridgePortConfigurationData
+from voltha.extensions.omci.omci_entities import EthernetPMMonitoringHistoryData, \
+    FecPerformanceMonitoringHistoryData, \
+    XgPonTcPerformanceMonitoringHistoryData, \
+    XgPonDownstreamPerformanceMonitoringHistoryData, \
+    XgPonUpstreamPerformanceMonitoringHistoryData, \
+    EthernetFrameUpstreamPerformanceMonitoringHistoryData, \
+    EthernetFrameDownstreamPerformanceMonitoringHistoryData, \
+    EthernetFrameExtendedPerformanceMonitoring, \
+    EthernetFrameExtendedPerformanceMonitoring64Bit, AniG
+
+
+RxEvent = OmciCCRxEvents
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class PerformanceIntervals(object):
+    """
+    OpenOMCI ONU Performance Monitoring Intervals State machine
+
+    This state machine focuses on L2 Internet Data Service and Classical
+    PM (for the v2.0 release).
+    """
+    DEFAULT_STATES = ['disabled', 'starting', 'synchronize_time', 'idle', 'create_pm_me',
+                      'collect_data', 'threshold_exceeded']
+
+    DEFAULT_TRANSITIONS = [
+        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
+        {'trigger': 'tick', 'source': 'starting', 'dest': 'synchronize_time'},
+
+        {'trigger': 'success', 'source': 'synchronize_time', 'dest': 'idle'},
+        {'trigger': 'failure', 'source': 'synchronize_time', 'dest': 'synchronize_time'},
+
+        {'trigger': 'tick', 'source': 'idle', 'dest': 'collect_data'},
+        {'trigger': 'add_me', 'source': 'idle', 'dest': 'create_pm_me'},
+        {'trigger': 'delete_me', 'source': 'idle', 'dest': 'delete_pm_me'},
+
+        {'trigger': 'success', 'source': 'create_pm_me', 'dest': 'idle'},
+        {'trigger': 'failure', 'source': 'create_pm_me', 'dest': 'idle'},
+
+        {'trigger': 'success', 'source': 'delete_pm_me', 'dest': 'idle'},
+        {'trigger': 'failure', 'source': 'delete_pm_me', 'dest': 'idle'},
+
+        {'trigger': 'success', 'source': 'collect_data', 'dest': 'idle'},
+        {'trigger': 'failure', 'source': 'collect_data', 'dest': 'idle'},
+
+        # TODO: Add rebooted event transitions to disabled or synchronize_time
+        # TODO: Need to capture Threshold Crossing Alarms appropriately
+
+        # Do wildcard 'stop' trigger last so it covers all previous states
+        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+        {'trigger': 'reboot', 'source': '*', 'dest': 'rebooted'},
+    ]
+    DEFAULT_RETRY = 10               # Seconds to delay after task failure/timeout/poll
+    DEFAULT_TICK_DELAY = 15          # Seconds between checks for collection tick
+    DEFAULT_INTERVAL_SKEW = 10 * 60  # Seconds to skew past interval boundary
+    DEFAULT_COLLECT_ATTEMPTS = 3     # Maximum number of collection fetch attempts
+    DEFAULT_CREATE_ATTEMPTS = 15     # Maximum number of attempts to create a PM Managed Entities
+
+    def __init__(self, agent, device_id, tasks,
+                 advertise_events=False,
+                 states=DEFAULT_STATES,
+                 transitions=DEFAULT_TRANSITIONS,
+                 initial_state='disabled',
+                 timeout_delay=DEFAULT_RETRY,
+                 tick_delay=DEFAULT_TICK_DELAY,
+                 interval_skew=DEFAULT_INTERVAL_SKEW,
+                 collect_attempts=DEFAULT_COLLECT_ATTEMPTS,
+                 create_attempts=DEFAULT_CREATE_ATTEMPTS):
+        """
+        Class initialization
+
+        :param agent: (OpenOmciAgent) Agent
+        :param device_id: (str) ONU Device ID
+        :param tasks: (dict) Tasks to run
+        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
+        :param states: (list) List of valid states
+        :param transitions: (dict) Dictionary of triggers and state changes
+        :param initial_state: (str) Initial state machine state
+        :param timeout_delay: (int/float) Number of seconds after a timeout to pause
+        :param tick_delay: (int/float) Collection poll check delay while idle
+        :param interval_skew: (int/float) Seconds to randomly skew the next interval
+                              collection to spread out requests for PM intervals
+        :param collect_attempts: (int) Max requests for a single PM interval before fail
+        :param create_attempts: (int) Max attempts to create PM Managed entities before stopping state machine
+        """
+        self.log = structlog.get_logger(device_id=device_id)
+
+        self._agent = agent
+        self._device_id = device_id
+        self._device = None
+        self._pm_config = None
+        self._timeout_delay = timeout_delay
+        self._tick_delay = tick_delay
+        self._interval_skew = interval_skew
+        self._collect_attempts = collect_attempts
+        self._create_attempts = create_attempts
+
+        self._sync_time_task = tasks['sync-time']
+        self._get_interval_task = tasks['collect-data']
+        self._create_pm_task = tasks['create-pm']
+        self._delete_pm_task = tasks['delete-pm']
+        self._advertise_events = advertise_events
+
+        self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
+            RxEvent.MIB_Reset: None,
+            RxEvent.Create: None,
+            RxEvent.Delete: None
+        }
+        self._omci_cc_sub_mapping = {
+            RxEvent.MIB_Reset: self.on_mib_reset_response,
+            RxEvent.Create: self.on_create_response,
+            RxEvent.Delete: self.on_delete_response,
+        }
+        self._me_watch_list = {
+            MacBridgePortConfigurationData.class_id: {
+                'create-delete': self.add_remove_enet_frame_pm,
+                'instances': dict()  # BP entity_id -> (PM class_id, PM entity_id)
+            }
+        }
+        self._deferred = None
+        self._task_deferred = None
+        self._current_task = None
+        self._add_me_deferred = None
+        self._delete_me_deferred = None
+        self._next_interval = None
+        self._enet_entity_id = IndexPool(1024, 1)
+        self._add_pm_me_retry = 0
+
+        # (Class ID, Instance ID) -> Collect attempts remaining
+        self._pm_me_collect_retries = dict()
+        self._pm_me_extended_info = dict()
+        self._add_pm_me = dict()        # (pm cid, pm eid) -> (me cid, me eid, upstream)
+        self._del_pm_me = set()
+
+        # Pollable PM items
+        # Note that some items the KPI extracts are not listed below. These are the
+        # administrative states, operational states, and sensed ethernet type. The values
+        # in the MIB database should be accurate for these items.
+
+        self._ani_g_items = ["optical_signal_level", "transmit_optical_level"]
+        self._next_poll_time = datetime.utcnow()
+        self._poll_interval = 60                    # TODO: Fixed at once a minute
+
+        # Statistics and attributes
+        # TODO: add any others if it will support problem diagnosis
+
+        # Set up state machine to manage states
+        self.machine = Machine(model=self, states=states,
+                               transitions=transitions,
+                               initial=initial_state,
+                               queued=True,
+                               ignore_invalid_triggers=True,
+                               name='{}-{}'.format(self.__class__.__name__,
+                                                   device_id))
+        try:
+            import logging
+            logging.getLogger('transitions').setLevel(logging.WARNING)
+        except Exception as e:
+            self.log.exception('log-level-failed', e=e)
+
+
+    def _cancel_deferred(self):
+        d1, self._deferred = self._deferred, None
+        d2, self._task_deferred = self._task_deferred, None
+        d3, self._add_me_deferred = self._add_me_deferred, None
+        d4, self._delete_me_deferred = self._delete_me_deferred, None
+
+        for d in [d1, d2, d3, d4]:
+            try:
+                if d is not None and not d.called:
+                    d.cancel()
+            except:
+                pass
+
+    def _cancel_tasks(self):
+        task, self._current_task = self._current_task, None
+        if task is not None:
+            task.stop()
+
+    def __str__(self):
+        return 'PerformanceIntervals: Device ID: {}, State:{}'.format(self._device_id,
+                                                                      self.state)
+
+    def delete(self):
+        """
+        Cleanup any state information
+        """
+        self.stop()
+
+    @property
+    def device_id(self):
+        return self._device_id
+
+    @property
+    def advertise_events(self):
+        return self._advertise_events
+
+    @advertise_events.setter
+    def advertise_events(self, value):
+        if not isinstance(value, bool):
+            raise TypeError('Advertise event is a boolean')
+        self._advertise_events = value
+
+    def advertise(self, event, info):
+        """Advertise an event on the OpenOMCI event bus"""
+        if self._advertise_events:
+            self._agent.advertise(event,
+                                  {
+                                      'state-machine': self.machine.name,
+                                      'info': info,
+                                      'time': str(datetime.utcnow()),
+                                      'next': str(self._next_interval)
+                                  })
+
+    def set_pm_config(self, pm_config):
+        """
+        Set PM interval configuration
+
+        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+        :return:
+        """
+        self._pm_config = pm_config
+
+    def _me_is_supported(self, class_id):
+        """
+        Check to see if ONU supports this ME
+        :param class_id: (int) ME Class ID
+        :return: (bool) If ME is supported
+        """
+        #
+        supported = self._device.omci_capabilities.supported_managed_entities
+        return class_id in supported if supported is not None else False
+
+    def add_pm_me(self, pm_class_id, pm_entity_id, cid=0, eid=0, upstream=False):
+        """
+        Add a new Performance Monitoring ME.
+
+        The ME ID will be added to an internal list and will be added the next
+        time the idle state is reached. An 'add_pm_me' trigger will be raised in
+        case already in the Idle state.
+
+        :param pm_class_id: (int) ME Class ID (1..0xFFFE)
+        :param pm_entity_id: (int) Instance ID (1..0xFFFE)
+        :param cid: (int) Class ID of entity monitored, may be None
+        :param eid: (int) Instance ID of entity monitored, may be None
+        :param upstream: (bool): Flag indicating if PM is for upstream traffic
+        """
+        if not isinstance(pm_class_id, int):
+            raise TypeError('PM ME Instance ID is an integer')
+        if not 0 < pm_class_id < 0xFFFF:
+            raise ValueError('PM ME Instance ID must be 1..65534')
+
+        # Check to see if ONU supports this ME
+        if not self._me_is_supported(pm_class_id):
+            self.log.warn('unsupported-PM-me', class_id=pm_class_id)
+            return
+
+        key = (pm_class_id, pm_entity_id)
+        entry = (cid, eid, upstream)
+
+        if key not in self._pm_me_collect_retries and key not in self._add_pm_me:
+            self._add_pm_me[key] = entry
+
+            if self._add_me_deferred is None:
+                self._add_me_deferred = reactor.callLater(0, self.add_me)
+
+        if (pm_class_id, pm_entity_id) in self._del_pm_me:
+            self._del_pm_me.remove((pm_class_id, pm_entity_id))
+
+    def delete_pm_me(self, class_id, entity_id):
+        """
+        Remove a new Performance Monitoring ME.
+
+        The ME ID will be added to an internal list and will be removed the next
+        time the idle state is reached. An 'delete_pm_me' trigger will be raised in
+        case already in the Idle state.
+
+        :param class_id: (int) ME Class ID (1..0xFFFE)
+        :param entity_id: (int) Instance ID (1..0xFFFE)
+        """
+        if not isinstance(class_id, int):
+            raise TypeError('PM ME Class ID is an integer')
+        if not 0 < class_id < 0xFFFF:
+            raise ValueError('PM ME Class ID must be 1..65534')
+
+        # Check to see if ONU supports this ME
+        if not self._me_is_supported(class_id):
+            self.log.warn('unsupported-PM-me', class_id=class_id)
+            return
+
+        key = (class_id, entity_id)
+
+        if key in self._pm_me_collect_retries and key not in self._del_pm_me:
+            self._del_pm_me.add(key)
+
+            if self._delete_me_deferred is None:
+                self._delete_me_deferred = reactor.callLater(0, self.delete_me)
+
+        if key in self._add_pm_me:
+            self._add_pm_me.pop(key)
+
+    def on_enter_disabled(self):
+        """
+        State machine is being stopped
+        """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+        self._cancel_tasks()
+        self._next_interval = None
+
+        # Drop OMCI ME Response subscriptions
+        for event, sub in self._omci_cc_subscriptions.iteritems():
+            if sub is not None:
+                self._omci_cc_subscriptions[event] = None
+                self._device.omci_cc.event_bus.unsubscribe(sub)
+
+        # Manually remove ani ANI/PON and UNI PM interval MEs
+        config = self._device.configuration
+        anis = config.ani_g_entities
+        unis = config.uni_g_entities
+
+        if anis is not None:
+            for entity_id in anis.iterkeys():
+                self.delete_pm_me(FecPerformanceMonitoringHistoryData.class_id, entity_id)
+                self.delete_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id, entity_id)
+                self.delete_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id, entity_id)
+                self.delete_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id, entity_id)
+
+        if unis is not None:
+            for entity_id in config.uni_g_entities.iterkeys():
+                self.delete_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)
+
+    def on_enter_starting(self):
+        """ Add the PON/ANI and UNI PM intervals"""
+        self.advertise(OpenOmciEventType.state_change, self.state)
+
+        self._device = self._agent.get_device(self._device_id)
+        self._cancel_deferred()
+
+        # Set up OMCI ME Response subscriptions
+        try:
+            for event, sub in self._omci_cc_sub_mapping.iteritems():
+                if self._omci_cc_subscriptions[event] is None:
+                    self._omci_cc_subscriptions[event] = \
+                        self._device.omci_cc.event_bus.subscribe(
+                            topic=OMCI_CC.event_bus_topic(self._device_id, event),
+                            callback=sub)
+
+        except Exception as e:
+            self.log.exception('omci-cc-subscription-setup', e=e)
+
+        try:
+            # Manually start some ANI/PON and UNI PM interval MEs
+            config = self._device.configuration
+            anis = config.ani_g_entities
+            unis = config.uni_g_entities
+
+            if anis is not None:
+                for entity_id in anis.iterkeys():
+                    self.add_pm_me(FecPerformanceMonitoringHistoryData.class_id,
+                                   entity_id)
+                    self.add_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id,
+                                   entity_id)
+                    self.add_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id,
+                                   entity_id)
+                    self.add_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id,
+                                   entity_id)
+
+            if unis is not None:
+                for entity_id in config.uni_g_entities.iterkeys():
+                    self.add_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)
+
+            # Look for existing instances of dynamically created ME's that have PM
+            # associated with them and add them now
+            for class_id in self._me_watch_list.iterkeys():
+                instances = {k: v for k, v in
+                             self._device.query_mib(class_id=class_id).items()
+                             if isinstance(k, int)}
+
+                for entity_id, data in instances.items():
+                    method = self._me_watch_list[class_id]['create-delete']
+                    cid, eid = method(None, class_id, entity_id,
+                                      add=True, attributes=data[ATTRIBUTES_KEY])
+                    if cid > 0:
+                        # BP entity_id -> (PM class_id, PM entity_id)
+                        instances = self._me_watch_list[class_id]['instances']
+                        instances[entity_id] = (cid, eid)
+
+        except Exception as e:
+            self.log.exception('pm-me-setup', class_id=class_id, e=e)
+
+        # Got to synchronize_time state
+        self._deferred = reactor.callLater(0, self.tick)
+
+    def on_enter_synchronize_time(self):
+        """
+        State machine has just transitioned to the synchronize_time state
+        """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+
+        def success(_results):
+            self.log.debug('sync-time-success')
+            self._current_task = None
+            self._deferred = reactor.callLater(0, self.success)
+            # Calculate next interval time
+            self._next_interval = self.get_next_interval
+
+        def failure(reason):
+            self.log.info('sync-time-failure', reason=reason)
+            self._current_task = None
+            self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+        # Schedule a task to set the ONU time
+        self._current_task = self._sync_time_task(self._agent, self._device_id)
+        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+        self._task_deferred.addCallbacks(success, failure)
+
+    def on_enter_idle(self):
+        """
+        State machine has just transitioned to the idle state
+
+        In this state, any added PM MEs that need to be created will be.
+        TODO: some non-interval PM stats (if there are any) are collected here
+        """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+
+        if len(self._del_pm_me) and self._delete_me_deferred is None:
+            self._delete_me_deferred = reactor.callLater(0, self.delete_me)
+
+        elif len(self._add_pm_me) and self._add_me_deferred is None:
+            self._add_me_deferred = reactor.callLater(0, self.add_me)
+
+        elif datetime.utcnow() >= self._next_poll_time:
+            def success(results):
+                self._device.timestamp = arrow.utcnow().float_timestamp
+                self._device.mib_synchronizer.mib_set(results.me_class.class_id,
+                                                      results.entity_id,
+                                                      results.attributes)
+                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)
+
+            def failure(reason):
+                self.log.info('poll-failure', reason=reason)
+                self._device.timestamp = None
+                return None
+
+            # Scan all ANI-G ports
+            ani_g_entities = self._device.configuration.ani_g_entities
+            ani_g_entities_ids = ani_g_entities.keys() if ani_g_entities is not None else None
+
+            if ani_g_entities_ids is not None and len(ani_g_entities_ids):
+                for entity_id in ani_g_entities_ids:
+                    task = OmciGetRequest(self._agent, self.device_id,
+                                          AniG, entity_id,
+                                          self._ani_g_items, allow_failure=True)
+                    self._task_deferred = self._device.task_runner.queue_task(task)
+                    self._task_deferred.addCallbacks(success, failure)
+            else:
+                self.log.warn('poll-pm-no-anis')
+                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)
+
+        # TODO: Compute a better mechanism than just polling here, perhaps based on
+        #       the next time to fetch data for 'any' interval
+        self._deferred = reactor.callLater(self._tick_delay, self.tick)
+
+    def on_enter_create_pm_me(self):
+        """
+        State machine has just transitioned to the create_pm_me state
+        """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+        self._cancel_tasks()
+        mes, self._add_pm_me = self._add_pm_me, dict()
+
+        def success(results):
+            self.log.debug('create-me-success', results=results)
+
+            # Check if already here. The create request could have received
+            # an already-exists status code which we consider successful
+            for pm, me in mes.items():
+                self._pm_me_collect_retries[pm] = self.pm_collected(pm)
+                self._pm_me_extended_info[pm] = me
+
+            self._current_task = None
+            self._deferred = reactor.callLater(0, self.success)
+
+        def failure(reason):
+            self.log.info('create-me-failure', reason=reason, retries=self._add_pm_me_retry)
+            self._current_task = None
+            if self._add_pm_me_retry <= self._create_attempts:
+              for pm, me in mes.items():
+                  self._add_pm_me[pm] = me
+              self._add_pm_me_retry += 1
+              self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+            else:
+              # we cant seem to create any collection me, no point in doing anything
+              self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason, device_id=self._device_id)
+              self._deferred = reactor.callLater(self._timeout_delay, self.stop)
+
+        self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
+        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+        self._task_deferred.addCallbacks(success, failure)
+
+    def on_enter_delete_pm_me(self):
+        """
+        State machine has just transitioned to the delete_pm_me state
+        """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+        self._cancel_tasks()
+
+        mes, self._del_pm_me = self._del_pm_me, set()
+
+        def success(results):
+            self.log.debug('delete-me-success', results=results)
+            self._current_task = None
+            for me in mes:
+                self._pm_me_collect_retries.pop(me)
+
+            self._deferred = reactor.callLater(0, self.success)
+
+        def failure(reason):
+            self.log.info('delete-me-failure', reason=reason)
+            self._current_task = None
+            for me in mes:
+                self._del_pm_me.add(me)
+
+            self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+        self._current_task = self._delete_pm_task(self._agent, self._device_id, mes)
+        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+        self._task_deferred.addCallbacks(success, failure)
+
+    def on_enter_collect_data(self):
+        """
+        State machine has just transitioned to the collect_data state
+        """
+
+        if self._next_interval is not None and self._next_interval > datetime.utcnow():
+            self.log.debug('wait-next-interval')
+            # Not ready for next interval, transition back to idle and we should get
+            # called again after a short delay
+            reactor.callLater(0, self.success)
+            return
+
+        self.advertise(OpenOmciEventType.state_change, self.state)
+        self._cancel_deferred()
+        self._cancel_tasks()
+        keys = self._pm_me_collect_retries.keys()
+        shuffle(keys)
+
+        for key in keys:
+            class_id = key[0]
+            entity_id = key[1]
+
+            self.log.debug("in-enter-collect-data", data_key=key,
+                           retries=self._pm_me_collect_retries[key])
+
+            # Collect the data ?
+            if self._pm_me_collect_retries[key] > 0:
+                def success(results):
+                    self.log.debug('collect-success', results=results,
+                                   class_id=results.get('class_id'),
+                                   entity_id=results.get('entity_id'))
+                    self._current_task = None
+                    self._pm_me_collect_retries[key] = 0
+                    self._deferred = reactor.callLater(0, self.success)
+                    return results
+
+                def failure(reason):
+                    self.log.info('collect-failure', reason=reason)
+                    self._current_task = None
+                    self._pm_me_collect_retries[key] -= 1
+                    self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+                    return reason   # Halt callback processing
+
+                # start the task
+                if key in self._pm_me_extended_info:
+                    self.log.debug('collect-extended-info-found', data_key=key,
+                                   extended_info=self._pm_me_extended_info[key])
+                    parent_class_id = self._pm_me_extended_info[key][0]
+                    parent_entity_id = self._pm_me_extended_info[key][1]
+                    upstream = self._pm_me_extended_info[key][2]
+                else:
+                    self.log.debug('collect-extended-info-not-found', data_key=key)
+                    parent_class_id = None
+                    parent_entity_id = None
+                    upstream = None
+
+                self._current_task = self._get_interval_task(self._agent, self._device_id,
+                                                             class_id, entity_id,
+                                                             parent_class_id=parent_class_id,
+                                                             parent_entity_id=parent_entity_id,
+                                                             upstream=upstream)
+                self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+                self._task_deferred.addCallbacks(success, failure)
+                self._task_deferred.addCallback(self.publish_data)
+                return
+
+        # Here if all intervals have been collected (we are up to date)
+        self._next_interval = self.get_next_interval
+        self.log.debug('collect-calculate-next', next=self._next_interval)
+
+        self._pm_me_collect_retries = dict.fromkeys(self._pm_me_collect_retries, self._collect_attempts)
+        reactor.callLater(0, self.success)
+
+    def on_enter_threshold_exceeded(self):
+        """
+        State machine has just transitioned to the threshold_exceeded state
+        """
+        pass  # TODO: Not sure if we want this state. Need to get alarm synchronizer working first
+
+    @property
+    def get_next_interval(self):
+        """
+        Determine the time for the next interval collection for all of this
+        ONUs PM Intervals. Earliest fetch time is at least 1 minute into the
+        next interval.
+
+        :return: (datetime) UTC time to get the next interval
+        """
+        now = datetime.utcnow()
+
+        # Get delta seconds to at least 1 minute into next interval
+        next_delta_secs = (16 - (now.minute % 15)) * 60
+        next_interval = now + timedelta(seconds=next_delta_secs)
+
+        # NOTE: For debugging, uncomment next section to perform collection
+        #       right after initial code startup/mib-sync
+        if self._next_interval is None:
+            return now     # Do it now  (just for debugging purposes)
+
+        # Skew the next time up to the maximum specified
+        # TODO: May want to skew in a shorter range and select the minute
+        #       based off some device property value to make collection a
+        #       little more predictable on a per-ONU basis.
+        return next_interval + timedelta(seconds=uniform(0, self._interval_skew))
+
+    def pm_collected(self, key):
+        """
+        Query database and determine if PM data needs to be collected for this ME
+        """
+        class_id = key[0]
+        entity_id = key[1]
+
+        return self._collect_attempts        # TODO: Implement persistent storage
+
+    def publish_data(self, results):
+        """
+        Publish the PM interval results on the appropriate bus.  The results are
+        a dictionary with the following format.
+
+            'class-id':          (int) ME Class ID,
+            'entity-id':         (int) ME Entity ID,
+            'me-name':           (str) ME Class name,   # Mostly for debugging...
+            'interval-end-time': None,
+            'interval-utc-time': (DateTime) UTC time when retrieved from ONU,
+
+            Counters added here as they are retrieved with the format of
+            'counter-attribute-name': value (int)
+
+        :param results: (dict) PM results
+        """
+        self.log.debug('collect-publish', results=results)
+
+        if self._pm_config is not None:
+            self._pm_config.publish_metrics(results)
+
+        pass  # TODO: Save off last time interval fetched to persistent storage?
+
+    def on_mib_reset_response(self, _topic, msg):
+        """
+        Called upon receipt of a MIB Reset Response for this ONU
+
+        :param _topic: (str) OMCI-RX topic
+        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+        """
+        self.log.debug('on-mib-reset-response', state=self.state)
+        try:
+            response = msg[RX_RESPONSE_KEY]
+            omci_msg = response.fields['omci_message'].fields
+            status = omci_msg['success_code']
+
+            if status == RC.Success:
+                for class_id in self._me_watch_list.iterkeys():
+                    # BP entity_id -> (PM class_id, PM entity_id)
+                    instances = self._me_watch_list[class_id]['instances']
+                    for _, me_pair in instances.items():
+                        self._me_watch_list[class_id]['create-delete'](None, me_pair[0],
+                                                                       me_pair[1], add=False)
+                    self._me_watch_list[class_id]['instances'] = dict()
+
+        except KeyError:
+            pass            # NOP
+
+    def on_create_response(self, _topic, msg):
+        """
+        Called upon receipt of a Create Response for this ONU.
+
+        :param _topic: (str) OMCI-RX topic
+        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+        """
+        self.log.debug('on-create-response', state=self.state)
+
+        def valid_request(stat, c_id, e_id):
+            return self._omci_cc_subscriptions[RxEvent.Delete] is not None\
+                and stat in (RC.Success, RC.InstanceExists) \
+                and c_id in self._me_watch_list.keys() \
+                and e_id not in self._me_watch_list[c_id]['instances']
+
+        response = msg[RX_RESPONSE_KEY]
+        omci = response.fields['omci_message'].fields
+        class_id = omci['entity_class']
+        entity_id = omci['entity_id']
+        status = omci['success_code']
+
+        if valid_request(status, class_id, entity_id):
+            request = msg[TX_REQUEST_KEY]
+            method = self._me_watch_list[class_id]['create-delete']
+            cid, eid = method(request, class_id, entity_id, add=True)
+
+            if cid > 0:
+                # BP entity_id -> (PM class_id, PM entity_id)
+                instances = self._me_watch_list[class_id]['instances']
+                instances[entity_id] = (cid, eid)
+
+    def on_delete_response(self, _topic, msg):
+        """
+        Called upon receipt of a Delete Response for this ONU
+
+        :param _topic: (str) OMCI-RX topic
+        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+        """
+        self.log.debug('on-delete-response', state=self.state)
+
+        def valid_request(stat, cid, eid):
+            return self._omci_cc_subscriptions[RxEvent.Delete] is not None\
+                and stat in (RC.Success, RC.UnknownInstance) \
+                and cid in self._me_watch_list.keys() \
+                and eid in self._me_watch_list[cid]['instances']
+
+        response = msg[RX_RESPONSE_KEY]
+        omci = response.fields['omci_message'].fields
+        class_id = omci['entity_class']
+        entity_id = omci['entity_id']
+        status = omci['success_code']
+
+        if valid_request(status, class_id, entity_id):
+            request = msg[TX_REQUEST_KEY]
+            method = self._me_watch_list[class_id]['create-delete']
+
+            method(request, class_id, entity_id, add=False)
+            # BP entity_id -> (PM class_id, PM entity_id)
+            instances = self._me_watch_list[class_id]['instances']
+            del instances[entity_id]
+
+    def get_pm_entity_id_for_add(self, pm_cid, eid):
+        """
+        Select the Entity ID to use for a specific PM Class ID.  For extended
+        PM ME's, an entity id (>0) is allocated
+
+        :param pm_cid: (int) PM ME Class ID to create/get entry ID for
+        :param eid: (int) Reference class's entity ID. Used as PM entity ID for non-
+                    extended PM history PMs
+        :return: (int) Entity ID to use
+        """
+        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
+                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
+            return self._enet_entity_id.get_next()
+        return eid
+
+    def release_pm_entity_id(self, pm_cid, eid):
+        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
+                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
+            try:
+                self._enet_entity_id.release(eid)
+            except:
+                pass
+
+    def add_remove_enet_frame_pm(self, request, class_id, entity_id,
+                                 add=True,
+                                 attributes=None):
+        """
+        Add/remove PM for the dynamic MAC Port configuration data.
+
+        This can be called in a variety of ways:
+
+           o If from an Response event from OMCI_CC, the request will contain
+             the original create/delete request. The class_id and entity_id will
+             be the MAC Data Configuration Data class and instance ID.
+             add = True if create, False if delete
+
+           o If starting up (and the associated ME is already created), the MAC
+             Data Configuration Data class and instance ID, and attributes are
+             provided. request = None and add = True
+
+           o If cleaning up (stopping), the PM ME class_id, entity_id are provided.
+             request = None and add = False
+
+        :return: (int, int) PM ME class_id and entity_id for add/remove was performed.
+                            class and entity IDs are non-zero on success
+        """
+        pm_entity_id = 0
+        cid = 0
+        eid = 0
+        upstream = False
+
+        def tp_type_to_pm(tp):
+            # TODO: Support 64-bit extended Monitoring MEs.
+            # This will result in the need to maintain entity IDs of PMs differently
+            upstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
+                              EthernetFrameExtendedPerformanceMonitoring.class_id,
+                              EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id], True
+            downstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
+                                EthernetFrameExtendedPerformanceMonitoring.class_id,
+                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id], False
+            return {
+                1: downstream_types,
+                3: upstream_types,
+                5: downstream_types,
+                6: downstream_types,
+            }.get(tp, None)
+
+        if request is not None:
+            assert class_id == MacBridgePortConfigurationData.class_id
+
+            # Is this associated with the ANI or the UNI side of the bridge?
+            # For VOLTHA v2.0, only high-speed internet data service is
+            attributes = request.fields['omci_message'].fields['data']
+            pm_class_ids, upstream = tp_type_to_pm(attributes['tp_type'])
+            cid = request.fields['omci_message'].fields['entity_class']
+            eid = request.fields['omci_message'].fields['entity_id']
+            if not add:
+                instances = self._me_watch_list[cid]['instances']
+                _, pm_entity_id = instances.get(eid, (None, None))
+
+        elif add:
+            assert class_id == MacBridgePortConfigurationData.class_id
+            assert isinstance(attributes, dict)
+
+            # Is this associated with the ANI or the UNI side of the bridge?
+            pm_class_ids, upstream = tp_type_to_pm(attributes.get('tp_type'))
+            cid = class_id
+            eid = entity_id
+
+        else:
+            assert class_id in (EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id,
+                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id,
+                                EthernetFrameExtendedPerformanceMonitoring.class_id,
+                                EthernetFrameExtendedPerformanceMonitoring64Bit.class_id)
+            pm_class_ids = [class_id]
+
+        if pm_class_ids is None:
+            return False     # Unable to select a supported ME for this ONU
+
+        if add:
+            for pm_class_id in pm_class_ids:
+                if self._me_is_supported(pm_class_id):
+                    pm_entity_id = self.get_pm_entity_id_for_add(pm_class_id, eid)
+                    self.add_pm_me(pm_class_id, pm_entity_id, cid=cid, eid=eid,
+                                   upstream=upstream)
+                    return pm_class_id, pm_entity_id
+        else:
+            for pm_class_id in pm_class_ids:
+                if self._me_is_supported(pm_class_id):
+                    self.delete_pm_me(pm_class_id, pm_entity_id)
+                    self.release_pm_entity_id(pm_class_id, pm_entity_id)
+                    return pm_class_id, pm_entity_id
+
+        return 0, 0