VOL-1397: Adtran-OLT - Initial containerization commit
- Need to move VERSION to base directory
Change-Id: I9d62d0607a011ce642e379fd92b35ec48b300070
diff --git a/adapters/adtran_olt/pon_port.py b/adapters/adtran_olt/pon_port.py
new file mode 100644
index 0000000..70ec564
--- /dev/null
+++ b/adapters/adtran_olt/pon_port.py
@@ -0,0 +1,886 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+import arrow
+
+import structlog
+from adapters.adtran_common.port import AdtnPort
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adtran_olt_handler import AdtranOltHandler
+from adapters.adtran_common.net.adtran_rest import RestInvalidResponseCode
+from codec.olt_config import OltConfig
+from onu import Onu
+from pyvoltha.adapters.extensions.alarms.onu.onu_los_alarm import OnuLosAlarm
+from pyvoltha.adapters.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
+from pyvoltha.protos.common_pb2 import AdminState
+from pyvoltha.protos.device_pb2 import Port
+import resources.adtranolt_platform as platform
+
+
+class PonPort(AdtnPort):
+ """
+ GPON Port
+ """
+ MAX_ONUS_SUPPORTED = 128
+ MAX_DEPLOYMENT_RANGE = 25000 # Meters (OLT-PB maximum)
+
+ _MCAST_ONU_ID = 253
+ _MCAST_ALLOC_BASE = 0x500
+
+ # AutoActivate should be used if xPON configuration is not supported
+ _SUPPORTED_ACTIVATION_METHODS = ['autodiscovery', 'autoactivate']
+ _SUPPORTED_AUTHENTICATION_METHODS = ['serial-number']
+
+ def __init__(self, parent, **kwargs):
+ super(PonPort, self).__init__(parent, **kwargs)
+ assert 'pon-id' in kwargs, 'PON ID not found'
+
+ self._parent = parent
+ self._pon_id = kwargs['pon-id']
+ self.log = structlog.get_logger(device_id=parent.device_id, pon_id=self._pon_id)
+ self._port_no = kwargs['port_no']
+ self._physical_port_name = 'xpon 0/{}'.format(self._pon_id+1)
+ self._label = 'pon-{}'.format(self._pon_id)
+
+ self._in_sync = False
+ self._expedite_sync = False
+ self._expedite_count = 0
+
+ self._discovery_tick = 20.0
+ self._no_onu_discover_tick = self._discovery_tick / 2
+ self._discovered_onus = [] # List of serial numbers
+ self._discovery_deferred = None # Specifically for ONU discovery
+
+ self._onus = {} # serial_number-base64 -> ONU
+ self._onu_by_id = {} # onu-id -> ONU
+ self._mcast_gem_ports = {} # VLAN -> GemPort
+
+ self._active_los_alarms = set() # ONU-ID
+
+ # xPON configuration
+ self._activation_method = 'autoactivate'
+
+ self._downstream_fec_enable = True
+ self._upstream_fec_enable = True
+ self._deployment_range = 25000
+ self._authentication_method = 'serial-number'
+ self._mcast_aes = False
+
+ # Statistics
+ self.tx_bip_errors = 0
+
+ def __str__(self):
+ return "PonPort-{}: Admin: {}, Oper: {}, OLT: {}".format(self._label,
+ self._admin_state,
+ self._oper_status,
+ self.olt)
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ if self._port is None:
+ self._port = Port(port_no=self._port_no,
+ label=self._label,
+ type=Port.PON_OLT,
+ admin_state=self._admin_state,
+ oper_status=self._oper_status)
+
+ return self._port
+
+ @property
+ def pon_id(self):
+ return self._pon_id
+
+ @property
+ def onus(self):
+ """
+ Get a set of all ONUs. While the set is immutable, do not use this method
+ to get a collection that you will iterate through that my yield the CPU
+ such as inline callback. ONUs may be deleted at any time and they will
+ set some references to other objects to NULL during the 'delete' call.
+ Instead, get a list of ONU-IDs and iterate on these and call the 'onu'
+ method below (which will return 'None' if the ONU has been deleted.
+
+ :return: (frozenset) collection of ONU objects on this PON
+ """
+ return frozenset(self._onus.values())
+
+ @property
+ def onu_ids(self):
+ return frozenset(self._onu_by_id.keys())
+
+ def onu(self, onu_id):
+ return self._onu_by_id.get(onu_id)
+
+ @property
+ def in_service_onus(self):
+ return len({onu.onu_id for onu in self.onus
+ if onu.onu_id not in self._active_los_alarms})
+
+ @property
+ def closest_onu_distance(self):
+ distance = -1
+ for onu in self.onus:
+ if onu.fiber_length < distance or distance == -1:
+ distance = onu.fiber_length
+ return distance
+
+ @property
+ def downstream_fec_enable(self):
+ return self._downstream_fec_enable
+
+ @downstream_fec_enable.setter
+ def downstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'downstream FEC enabled is a boolean'
+
+ if self._downstream_fec_enable != value:
+ self._downstream_fec_enable = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("downstream-fec-enable", value)
+
+ @property
+ def upstream_fec_enable(self):
+ return self._upstream_fec_enable
+
+ @upstream_fec_enable.setter
+ def upstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
+ if self._upstream_fec_enable != value:
+ self._upstream_fec_enable = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("upstream-fec-enable", value)
+
+ @property
+ def any_upstream_fec_enabled(self):
+ for onu in self.onus:
+ if onu.upstream_fec_enable and onu.enabled:
+ return True
+ return False
+
+ @property
+ def mcast_aes(self):
+ return self._mcast_aes
+
+ @mcast_aes.setter
+ def mcast_aes(self, value):
+ assert isinstance(value, bool), 'MCAST AES is a boolean'
+ if self._mcast_aes != value:
+ self._mcast_aes = value
+ if self.state == AdtnPort.State.RUNNING:
+ pass # TODO
+
+ @property
+ def deployment_range(self):
+ """Maximum deployment range (in meters)"""
+ return self._deployment_range
+
+ @deployment_range.setter
+ def deployment_range(self, value):
+ """Maximum deployment range (in meters)"""
+ if not 0 <= value <= PonPort.MAX_DEPLOYMENT_RANGE:
+ raise ValueError('Deployment range should be 0..{} meters'.
+ format(PonPort.MAX_DEPLOYMENT_RANGE))
+ if self._deployment_range != value:
+ self._deployment_range = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("deployment-range", value)
+
+ @property
+ def discovery_tick(self):
+ return self._discovery_tick * 10
+
+ @discovery_tick.setter
+ def discovery_tick(self, value):
+ if value < 0:
+ raise ValueError("Polling interval must be >= 0")
+
+ if self.discovery_tick != value:
+ self._discovery_tick = value / 10
+
+ try:
+ if self._discovery_deferred is not None and \
+ not self._discovery_deferred.called:
+ self._discovery_deferred.cancel()
+ except:
+ pass
+ self._discovery_deferred = None
+
+ if self._discovery_tick > 0:
+ self._discovery_deferred = reactor.callLater(self._discovery_tick,
+ self._discover_onus)
+
+ @property
+ def activation_method(self):
+ return self._activation_method
+
+ @activation_method.setter
+ def activation_method(self, value):
+ value = value.lower()
+ if value not in PonPort._SUPPORTED_ACTIVATION_METHODS:
+ raise ValueError('Invalid ONU activation method')
+
+ self._activation_method = value
+
+ @property
+ def authentication_method(self):
+ return self._authentication_method
+
+ @authentication_method.setter
+ def authentication_method(self, value):
+ value = value.lower()
+ if value not in PonPort._SUPPORTED_AUTHENTICATION_METHODS:
+ raise ValueError('Invalid ONU authentication method')
+ self._authentication_method = value
+
+ def cancel_deferred(self):
+ super(PonPort, self).cancel_deferred()
+
+ d, self._discovery_deferred = self._discovery_deferred, None
+
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception as e:
+ pass
+
+ def _update_adapter_agent(self):
+ """
+ Update the port status and state in the core
+ """
+ self.log.debug('update-adapter-agent', admin_state=self._admin_state,
+ oper_status=self._oper_status)
+
+ # because the core does not provide methods for updating admin
+ # and oper status per port, we need to copy any existing port
+ # info so that we don't wipe out the peers
+ if self._port is not None:
+ agent_ports = self.adapter_agent.get_ports(self.olt.device_id, Port.PON_OLT)
+
+ agent_port = next((ap for ap in agent_ports if ap.port_no == self._port_no), None)
+
+ # copy current Port info
+ if agent_port is not None:
+ self._port = agent_port
+
+ # set new states
+ self._port.admin_state = self._admin_state
+ self._port.oper_status = self._oper_status
+
+ # adapter_agent add_port also does an update of existing port
+ self.adapter_agent.add_port(self.olt.device_id, self.get_port())
+
+ @inlineCallbacks
+ def finish_startup(self):
+ """
+ Do all startup offline since REST may fail
+ """
+ if self.state != AdtnPort.State.INITIAL:
+ returnValue('Done')
+
+ self.log.debug('final-startup')
+ results = None
+
+ try:
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
+
+ except Exception as e:
+ self.log.exception('initial-GET', e=e)
+ self.deferred = reactor.callLater(5, self.finish_startup)
+ returnValue(self.deferred)
+
+ # Load config from hardware
+
+ enabled = results.get('enabled', False)
+ downstream_fec_enable = results.get('downstream-fec-enable', False)
+ upstream_fec_enable = results.get('upstream-fec-enable', False)
+ deployment_range = results.get('deployment-range', 25000)
+ self._in_sync = True
+
+ if enabled != self._enabled:
+ try:
+ self.deferred = self._set_pon_config("enabled", True)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.exception('final-startup-enable', e=e)
+ self.deferred = reactor.callLater(3, self.finish_startup)
+ returnValue(self.deferred)
+
+ if downstream_fec_enable != self._downstream_fec_enable:
+ try:
+ self.deferred = self._set_pon_config("downstream-fec-enable",
+ self._downstream_fec_enable)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-downstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
+
+ if upstream_fec_enable != self._upstream_fec_enable:
+ try:
+ self.deferred = self._set_pon_config("upstream-fec-enable",
+ self._upstream_fec_enable)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-upstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
+
+ if deployment_range != self._deployment_range:
+ try:
+ self.deferred = self._set_pon_config("deployment-range",
+ self._deployment_range)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-deployment-range', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
+
+ if len(self._onus) > 0:
+ dl = []
+ for onu_id in self.onu_ids:
+ onu = self.onu(onu_id)
+ if onu is not None:
+ dl.append(onu.restart())
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ # Begin to ONU discovery and hardware sync
+
+ self._discovery_deferred = reactor.callLater(5, self._discover_onus)
+
+ # If here, initial settings were successfully written to hardware
+
+ super(PonPort, self).finish_startup()
+ returnValue('Enabled')
+
+ @inlineCallbacks
+ def finish_stop(self):
+ # Remove all existing ONUs. They will need to be re-discovered
+ dl = []
+ onu_ids = frozenset(self._onu_by_id.keys())
+ for onu_id in onu_ids:
+ try:
+ dl.append(self.delete_onu(onu_id))
+
+ except Exception as e:
+ self.log.exception('onu-cleanup', onu_id=onu_id, e=e)
+
+ dl.append(self._set_pon_config("enabled", False))
+ results = yield defer.gatherResults(dl, consumeErrors=True)
+ returnValue(results)
+
+ @inlineCallbacks
+ def reset(self):
+ """
+ Set the PON Port to a known good state on initial port startup. Actual
+ PON 'Start' is done elsewhere
+ """
+ initial_port_state = AdminState.ENABLED
+ self.log.info('reset', initial_state=initial_port_state)
+
+ try:
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
+ enabled = results.get('enabled', False)
+
+ except Exception as e:
+ self.log.exception('get-config', e=e)
+ enabled = False
+
+ enable = initial_port_state == AdminState.ENABLED
+
+ if enable != enabled:
+ try:
+ self.deferred = yield self._set_pon_config("enabled", enable)
+ except Exception as e:
+ self.log.exception('reset-enabled', e=e, enabled=enabled)
+
+ # TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
+ self._admin_state = AdminState.ENABLED if enable else AdminState.DISABLED
+
+ try:
+ # Walk the provisioned ONU list and disable any existing ONUs
+ results = yield self._get_onu_config()
+
+ if isinstance(results, list) and len(results) > 0:
+ onu_configs = OltConfig.Pon.Onu.decode(results)
+ dl = []
+ for onu_id in onu_configs.iterkeys():
+ dl.append(self.delete_onu(onu_id))
+
+ try:
+ if len(dl) > 0:
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ self.log.exception('rest-ONU-delete', e=e)
+ pass # Non-fatal
+
+ except Exception as e:
+ self.log.exception('onu-delete', e=e)
+
+ returnValue('Reset complete')
+
+ def gem_ids(self, logical_port, flow_vlan, multicast_gems=False):
+ """
+ Get all GEM Port IDs used on a given PON
+
+ :param logical_port: (int) Logical port number of ONU. None if for all ONUs
+ on PON, if Multicast, VID for Multicast, or None for all
+ Multicast GEMPorts
+ :param flow_vlan: (int) If not None, this is the ingress tag (c-tag)
+ :param multicast_gems: (boolean) Select from available Multicast GEM Ports
+ :return: (dict) data_gem -> key -> onu-id, value -> tuple(sorted list of GEM Port IDs, onu_vid)
+ mcast_gem-> key -> mcast-vid, value -> GEM Port IDs
+ """
+ gem_ids = {}
+
+ if multicast_gems:
+ # Multicast GEMs belong to the PON, but we may need to register them on
+ # all ONUs. TODO: Rework when BBF MCAST is addressed in VOLTHA v2.O+
+ for vlan, gem_port in self._mcast_gem_ports.iteritems():
+ if logical_port is None or (logical_port == vlan and logical_port in self.olt.multicast_vlans):
+ gem_ids[vlan] = ([gem_port.gem_id], None)
+ else:
+ raise NotImplemented('TODO: This is deprecated')
+ # for onu_id, onu in self._onu_by_id.iteritems():
+ # if logical_port is None or logical_port == onu.logical_port:
+ # gem_ids[onu_id] = (onu.gem_ids(), flow_vlan)
+ return gem_ids
+
+ def _get_pon_config(self):
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-get-config-{}'.format(self._pon_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
+
+ def _get_onu_config(self, onu_id=None):
+ if onu_id is None:
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
+ else:
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, onu_id)
+
+ name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, onu_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
+
+ def _set_pon_config(self, leaf, value):
+ data = json.dumps({leaf: value})
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-set-config-{}-{}-{}'.format(self._pon_id, leaf, str(value))
+ # If no optics on PON, then PON config fails with status 400, suppress this
+ suppress_error = len(self.onu_ids) == 0
+ return self._parent.rest_client.request('PATCH', uri, data=data, name=name,
+ suppress_error=suppress_error)
+
+ def _discover_onus(self):
+ self.log.debug('discovery', state=self._admin_state, in_sync=self._in_sync)
+ if self._admin_state == AdminState.ENABLED:
+ if self._in_sync:
+ data = json.dumps({'pon-id': self._pon_id})
+ uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
+ name = 'pon-discover-onu-{}'.format(self._pon_id)
+
+ self._discovery_deferred = self._parent.rest_client.request('POST', uri, data, name=name)
+ self._discovery_deferred.addBoth(self._onu_discovery_init_complete)
+ else:
+ self.discovery_deferred = reactor.callLater(0,
+ self._onu_discovery_init_complete,
+ None)
+
+ def _onu_discovery_init_complete(self, _result):
+ """
+ This method is called after the REST POST to request ONU discovery is
+ completed. The results (body) of the post is always empty / 204 NO CONTENT
+ """
+ delay = self._no_onu_discover_tick if len(self._onus) == 0 else self._discovery_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._discovery_deferred = reactor.callLater(delay, self._discover_onus)
+
+ def sync_hardware(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+ config = OltConfig.Pon.decode([results])
+ assert self.pon_id in config, 'sync-pon-not-found-{}'.format(self.pon_id)
+ config = config[self.pon_id]
+ self._in_sync = True
+
+ dl = []
+
+ if self.enabled != config.enabled:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("enabled", self.enabled))
+
+ elif self.state == AdtnPort.State.RUNNING:
+ if self.deployment_range != config.deployment_range:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("deployment-range",
+ self.deployment_range))
+
+ # A little side note: FEC enable/disable cannot be changed and
+ # will remain in the previous status until an optical module
+ # is plugged in.
+ if self.downstream_fec_enable != config.downstream_fec_enable:
+ self._in_sync = False
+ dl.append(self._set_pon_config("downstream-fec-enable",
+ self.downstream_fec_enable))
+
+ if self.upstream_fec_enable != config.upstream_fec_enable:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("upstream-fec-enable",
+ self.upstream_fec_enable))
+ defer.gatherResults(dl, consumeErrors=True)
+ return config.onus
+
+ def sync_onus(hw_onus):
+ if self.state == AdtnPort.State.RUNNING:
+ self.log.debug('sync-pon-onu-results', config=hw_onus)
+
+ # ONU's have their own sync task, extra (should be deleted) are
+ # handled here.
+ hw_onu_ids = frozenset(hw_onus.keys())
+ my_onu_ids = frozenset(self._onu_by_id.keys())
+
+ extra_onus = hw_onu_ids - my_onu_ids
+ dl = [self.delete_onu(onu_id, hw_only=True) for onu_id in extra_onus]
+
+ if self.activation_method == "autoactivate":
+ # Autoactivation of ONUs requires missing ONU detection. If
+ # not found, create them here but let the TCont/GEM-Port restore
+ # be handle by ONU H/w sync logic.
+ for onu in [self._onu_by_id[onu_id] for onu_id in my_onu_ids - hw_onu_ids
+ if self._onu_by_id.get(onu_id) is not None]:
+ dl.append(onu.create(reflow=True))
+
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ def failure(reason, what):
+ self.log.error('hardware-sync-{}-failed'.format(what), reason=reason)
+ self._in_sync = False
+ self._expedite_sync = False
+
+ def reschedule(_):
+ # Speed up sequential resync a limited number of times if out of sync.
+
+ delay = self.sync_tick
+
+ if self._expedite_sync:
+ self._expedite_count += 1
+ if self._expedite_count < 5:
+ delay = 1
+ else:
+ self._expedite_count = 0
+
+ delay += random.uniform(-delay / 10, delay / 10)
+ self.sync_deferred = reactor.callLater(delay, self.sync_hardware)
+
+ self.sync_deferred = self._get_pon_config()
+ self.sync_deferred.addCallbacks(read_config, failure, errbackArgs=['get-config'])
+ self.sync_deferred.addCallbacks(sync_onus, failure, errbackArgs=['pon-sync'])
+ self.sync_deferred.addBoth(reschedule)
+
+ def process_status_poll(self, status):
+ """
+ Process PON status poll request
+
+ :param status: (OltState.Pon object) results from RESTCONF GET
+ """
+ self.log.debug('process-status-poll', status=status)
+
+ if self._admin_state != AdminState.ENABLED:
+ return
+
+ # Process LOS list
+ self._process_los_alarms(frozenset(status.ont_los))
+
+ # Get new/missing from the discovered ONU leaf. Stale ONUs from previous
+ # configs are now cleaned up during h/w re-sync/reflow.
+ new, rediscovered_onus = self._process_status_onu_discovered_list(status.discovered_onu)
+
+ # Process newly discovered ONU list and rediscovered ONUs
+ for serial_number in new | rediscovered_onus:
+ reactor.callLater(0, self.add_onu, serial_number, status)
+
+ # PON Statistics
+ timestamp = arrow.utcnow().float_timestamp
+ self._process_statistics(status, timestamp)
+
+ # Process ONU info. Note that newly added ONUs will not be processed
+ # until the next pass
+ self._update_onu_status(status.onus, timestamp)
+
+ # Process GEM Port information
+ self._update_gem_status(status.gems, timestamp)
+
+ def _process_statistics(self, status, timestamp):
+ self.timestamp = timestamp
+ self.rx_packets = status.rx_packets
+ self.rx_bytes = status.rx_bytes
+ self.tx_packets = status.tx_packets
+ self.tx_bytes = status.tx_bytes
+ self.tx_bip_errors = status.tx_bip_errors
+
+ def _update_onu_status(self, onus, timestamp):
+ """
+ Process ONU status for this PON
+ :param onus: (dict) onu_id: ONU State
+ """
+ for onu_id, onu_status in onus.iteritems():
+ if onu_id in self._onu_by_id:
+ onu = self._onu_by_id[onu_id]
+ onu.timestamp = timestamp
+ onu.rssi = onu_status.rssi
+ onu.equalization_delay = onu_status.equalization_delay
+ onu.equalization_delay = onu_status.equalization_delay
+ onu.fiber_length = onu_status.fiber_length
+ onu.password = onu_status.reported_password
+
+ def _update_gem_status(self, gems, timestamp):
+ for gem_id, gem_status in gems.iteritems():
+ onu = self._onu_by_id.get(gem_status.onu_id)
+ if onu is not None:
+ gem_port = onu.gem_port(gem_status.gem_id)
+ if gem_port is not None:
+ gem_port.timestamp = timestamp
+ gem_port.rx_packets = gem_status.rx_packets
+ gem_port.rx_bytes = gem_status.rx_bytes
+ gem_port.tx_packets = gem_status.tx_packets
+ gem_port.tx_bytes = gem_status.tx_bytes
+
+ def _process_los_alarms(self, ont_los):
+ """
+ Walk current LOS and set/clear LOS as appropriate
+ :param ont_los: (frozenset) ONU IDs of ONUs in LOS alarm state
+ """
+ cleared_alarms = self._active_los_alarms - ont_los
+ new_alarms = ont_los - self._active_los_alarms
+
+ if len(cleared_alarms) > 0 or len(new_alarms) > 0:
+ self.log.info('onu-los', cleared=cleared_alarms, new=new_alarms)
+
+ for onu_id in cleared_alarms:
+ self._active_los_alarms.remove(onu_id)
+ OnuLosAlarm(self.olt.alarms, onu_id, self.port_no).clear_alarm()
+
+ for onu_id in new_alarms:
+ self._active_los_alarms.add(onu_id)
+ OnuLosAlarm(self.olt.alarms, onu_id, self.port_no).raise_alarm()
+ reactor.callLater(0, self.delete_onu, onu_id)
+
+ def _process_status_onu_discovered_list(self, discovered_onus):
+ """
+ Look for new ONUs
+
+ :param discovered_onus: (frozenset) Set of ONUs currently discovered
+ """
+ self.log.debug('discovered-ONUs', list=discovered_onus)
+
+ # Only request discovery if activation is auto-discovery or auto-activate
+ continue_discovery = ['autodiscovery', 'autoactivate']
+
+ if self._activation_method not in continue_discovery:
+ return set(), set()
+
+ my_onus = frozenset(self._onus.keys())
+
+ new_onus = discovered_onus - my_onus
+ rediscovered_onus = my_onus & discovered_onus
+
+ return new_onus, rediscovered_onus
+
+ def _get_onu_info(self, serial_number):
+ """
+ Parse through available xPON information for ONU configuration settings
+
+ :param serial_number: (string) Decoded (not base64) serial number string
+ :return: (dict) onu config data or None on lookup failure
+ """
+ try:
+ if self.activation_method == "autodiscovery":
+ # if self.authentication_method == 'serial-number':
+ raise NotImplemented('autodiscovery: Not supported at this time')
+
+ elif self.activation_method == "autoactivate":
+ onu_id = self.get_next_onu_id
+ enabled = True
+ upstream_fec_enabled = True
+
+ else:
+ self.log.error('unsupported-activation-method', method=self.activation_method)
+ return None
+
+ onu_info = {
+ 'device-id': self.olt.device_id,
+ 'serial-number': serial_number,
+ 'pon': self,
+ 'onu-id': onu_id,
+ 'enabled': enabled,
+ 'upstream-fec': upstream_fec_enabled,
+ 'password': Onu.DEFAULT_PASSWORD,
+ }
+ pon_id = self.olt.pon_id_to_port_number(self._pon_id)
+
+ # TODO: Currently only one UNI port and it is hardcoded to port 0
+ onu_info['uni-ports'] = [platform.mk_uni_port_num(pon_id, onu_id)]
+
+ # return onu_info
+ return onu_info
+
+ except Exception as e:
+ self.log.exception('get-onu-info-tech-profiles', e=e)
+ return None
+
+ @inlineCallbacks
+ def add_onu(self, serial_number_64, status):
+ """
+ Add an ONU to the PON
+
+ :param serial_number_64: (str) base-64 encoded serial number
+ :param status: (dict) OLT PON status. Used to detect if ONU is already provisioned
+ """
+ serial_number = Onu.serial_number_to_string(serial_number_64)
+ self.log.info('add-onu', serial_number=serial_number,
+ serial_number_64=serial_number_64, status=status)
+
+ # It takes a little while for a new ONU to be removed from the discovery
+ # list. Return early here so extra ONU IDs are not allocated
+ if serial_number_64 in self._onus:
+ returnValue('wait-for-fpga')
+
+ if serial_number_64 in status.onus:
+ # Handles fast entry into this task before FPGA can clear results of ONU delete
+ returnValue('sticky-onu')
+
+ # At our limit? TODO: Retrieve from device resource manager if available
+ if len(self._onus) >= self.MAX_ONUS_SUPPORTED:
+ self.log.warning('max-onus-provisioned', count=len(self._onus))
+ returnValue('max-onus-reached')
+
+ onu_info = self._get_onu_info(serial_number)
+ onu_id = onu_info['onu-id']
+
+ if onu_id is None:
+ self.log.warning('no-onu-ids-available', serial_number=serial_number,
+ serial_number_64=serial_number_64)
+ returnValue('no-ids-available')
+
+ # TODO: Is the best before or after creation in parent device?
+ alarm = OnuDiscoveryAlarm(self.olt.alarms, self.pon_id, serial_number)
+ reactor.callLater(0, alarm.raise_alarm)
+
+ # Have the core create the ONU device
+ self._parent.add_onu_device(self.pon_id, onu_id, serial_number)
+
+ try:
+ onu = Onu(onu_info)
+ self._onus[serial_number_64] = onu
+ self._onu_by_id[onu.onu_id] = onu
+
+ # Add Multicast to PON on a per-ONU basis
+ #
+ # for id_or_vid, gem_port in gem_ports.iteritems():
+ # try:
+ # if gem_port.multicast:
+ # self.log.debug('id-or-vid', id_or_vid=id_or_vid)
+ # vid = self.olt.multicast_vlans[0] if len(self.olt.multicast_vlans) else None
+ # if vid is not None:
+ # self.add_mcast_gem_port(gem_port, vid)
+ #
+ # except Exception as e:
+ # self.log.exception('id-or-vid', e=e)
+
+ _results = yield onu.create()
+
+ except Exception as e:
+ self.log.warning('add-onu', serial_number=serial_number_64, e=e)
+ # allowable exception. H/w re-sync will recover/fix any issues
+
+ @property
+ def get_next_onu_id(self):
+ return self._parent.resource_mgr.get_onu_id(self._pon_id)
+
+ def release_onu_id(self, onu_id):
+ self._parent.resource_mgr.free_onu_id(self._pon_id, onu_id)
+
+ @inlineCallbacks
+ def _remove_from_hardware(self, onu_id):
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, onu_id)
+ name = 'pon-delete-onu-{}-{}'.format(self._pon_id, onu_id)
+
+ try:
+ yield self._parent.rest_client.request('DELETE', uri, name=name)
+
+ except RestInvalidResponseCode as e:
+ if e.code != 404:
+ self.log.exception('onu-delete', e=e)
+
+ except Exception as e:
+ self.log.exception('onu-hw-delete', onu_id=onu_id, e=e)
+
+ @inlineCallbacks
+ def delete_onu(self, onu_id, hw_only=False):
+ onu = self._onu_by_id.get(onu_id)
+
+ # Remove from any local dictionary
+ if onu_id in self._onu_by_id:
+ del self._onu_by_id[onu_id]
+
+ if onu is not None:
+ if onu.serial_number_64 in self._onus:
+ del self._onus[onu.serial_number_64]
+ try:
+ proxy_address = onu.proxy_address
+ onu.delete() # Remove from hardware
+
+ # And removal from VOLTHA adapter agent
+ if not hw_only:
+ self._parent.delete_child_device(proxy_address)
+
+ except Exception as e:
+ self.log.exception('onu-delete', serial_number=onu.serial_number, e=e)
+ else:
+ try:
+ yield self._remove_from_hardware(onu_id)
+
+ except Exception as e:
+ self.log.debug('onu-remove', serial_number=onu.serial_number, e=e)
+
+ # Remove from LOS list if needed TODO: Should a 'clear' alarm be sent as well ?
+ if onu is not None and onu.id in self._active_los_alarms:
+ self._active_los_alarms.remove(onu.id)
+
+ def add_mcast_gem_port(self, mcast_gem, vlan):
+ """
+ Add any new Multicast GEM Ports to the PON
+ :param mcast_gem: (GemPort)
+ """
+ if vlan in self._mcast_gem_ports:
+ return
+
+ assert len(self._mcast_gem_ports) == 0, 'Only 1 MCAST GEMPort until BBF Support'
+ assert 1 <= vlan <= 4095, 'Invalid Multicast VLAN ID'
+ assert len(self.olt.multicast_vlans) == 1, 'Only support 1 MCAST VLAN until BBF Support'
+
+ self._mcast_gem_ports[vlan] = mcast_gem