VOL-1397: Adtran-OLT - Initial containerization commit
- Need to move VERSION to base directory
Change-Id: I9d62d0607a011ce642e379fd92b35ec48b300070
diff --git a/adapters/adtran_common/flow/evc_map.py b/adapters/adtran_common/flow/evc_map.py
new file mode 100644
index 0000000..688124a
--- /dev/null
+++ b/adapters/adtran_common/flow/evc_map.py
@@ -0,0 +1,1015 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import xmltodict
+import re
+import structlog
+from enum import Enum
+from acl import ACL
+from twisted.internet import defer, reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+from ncclient.operations.rpc import RPCError
+
+
+log = structlog.get_logger()
+
+# NOTE: For the EVC Map name, the ingress-port number is the VOLTHA port number (not the
+# pon-id), since it covers NNI ports as well in order to handle the NNI-NNI case. For
+# flows that cover an entire PON, the name will have the ONU ID and GEM ID appended to it
+# upon installation, with a period as the separator.
+
+# Template for the base name of every EVC Map created by this module
+EVC_MAP_NAME_FORMAT = 'VOLTHA-{}-{}' # format(logical-ingress-port-number, flow-id)
+# NOTE(review): remove_all() compiles this with re.compile() and applies it with match(),
+# so the trailing '-*' means "zero or more dashes" (regex), not a shell-style wildcard
+EVC_MAP_NAME_REGEX_ALL = 'VOLTHA-*'
+
+
+class EVCMap(object):
+ """
+ Class to wrap EVC-MAP functionality
+ """
+    class EvcConnection(Enum):
+        """
+        How an EVC-MAP is connected when it is not attached to a named EVC.
+
+        DEFAULT is an alias of NO_EVC_CONNECTION (both have the value 0).
+        """
+        NO_EVC_CONNECTION = 0
+        EVC = 1
+        DISCARD = 2
+        DEFAULT = NO_EVC_CONNECTION
+
+        @staticmethod
+        def xml(value):
+            """
+            Convert an EvcConnection value into its XML element
+
+            :param value: (EvcConnection) value to convert, None selects DEFAULT
+            :return: (str) XML element
+            :raises ValueError: for 'EVC' (it has no XML form) or any unknown value
+            """
+            # Note we do not have XML for 'EVC' enumeration.
+            if value is None:
+                value = EVCMap.EvcConnection.DEFAULT
+            if value == EVCMap.EvcConnection.NO_EVC_CONNECTION:
+                return '<no-evc-connection/>'
+            elif value == EVCMap.EvcConnection.DISCARD:
+                return '<discard/>'
+            raise ValueError('Invalid EvcConnection enumeration')
+
+    class PriorityOption(Enum):
+        """
+        Selects inherited versus explicitly configured priority.
+
+        DEFAULT is an alias of INHERIT_PRIORITY (both have the value 0).
+        """
+        INHERIT_PRIORITY = 0
+        EXPLICIT_PRIORITY = 1
+        DEFAULT = INHERIT_PRIORITY
+
+        @staticmethod
+        def xml(value):
+            """
+            Convert a PriorityOption value into its XML element
+
+            :param value: (PriorityOption) value to convert, None selects DEFAULT
+            :return: (str) XML element
+            :raises ValueError: if the value is not a known PriorityOption
+            """
+            option = EVCMap.PriorityOption.DEFAULT if value is None else value
+
+            if option == EVCMap.PriorityOption.INHERIT_PRIORITY:
+                return '<inherit-pri/>'
+
+            if option == EVCMap.PriorityOption.EXPLICIT_PRIORITY:
+                return '<explicit-pri/>'
+
+            raise ValueError('Invalid PriorityOption enumeration')
+
+ def __init__(self, flow, evc, is_ingress_map):
+ """
+ Create an EVC-MAP for a single flow and decode that flow into map settings
+
+ :param flow: (FlowEntry) first flow attached to this map; also supplies the handler
+ :param evc: (EVC) EVC handed to _decode(); the map is not attached to it here
+ :param is_ingress_map: (bool) True for an ingress map, False for an egress map
+ """
+ self._handler = flow.handler # Same for all Flows attached to this EVC MAP
+ self._flows = {flow.flow_id: flow}
+ self._evc = None
+ self._new_acls = dict() # ACL Name -> ACL Object (To be installed into h/w)
+ self._existing_acls = dict() # ACL Name -> ACL Object (Already in H/w)
+ self._is_ingress_map = is_ingress_map
+ self._pon_id = None
+ self._onu_id = None # Remains None if associated with a multicast flow
+ self._installed = False
+ self._needs_update = False
+ self._status_message = None
+ self._deferred = None
+ self._name = None
+ self._enabled = True
+ self._uni_port = None
+ self._evc_connection = EVCMap.EvcConnection.DEFAULT
+ self._men_priority = EVCMap.PriorityOption.DEFAULT
+ self._men_pri = 0 # If Explicit Priority
+
+ self._c_tag = None
+ self._men_ctag_priority = EVCMap.PriorityOption.DEFAULT
+ self._men_ctag_pri = 0 # If Explicit Priority
+ self._match_ce_vlan_id = None
+ self._match_untagged = False
+ self._match_destination_mac_address = None
+ self._match_l2cp = False
+ self._match_broadcast = False
+ self._match_multicast = False
+ self._match_unicast = False
+ self._match_igmp = False
+
+ # Imported here rather than at module level
+ from common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+ self._tech_profile_id = DEFAULT_TECH_PROFILE_TABLE_ID
+ # Stays None until _setup_tech_profiles() assigns GEM ports
+ self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
+ self._upstream_bandwidth = None
+ self._shaper_name = None
+
+ # ACL logic
+ self._eth_type = None
+ self._ip_protocol = None
+ self._ipv4_dst = None
+ self._udp_dst = None
+ self._udp_src = None
+
+ # Any exception raised while decoding is logged and leaves the map marked invalid
+ try:
+ self._valid = self._decode(evc)
+
+ except Exception as e:
+ log.exception('decode', e=e)
+ self._valid = False
+
+    def __str__(self):
+        """Short description: map name, UNI port and whether ACL support is needed"""
+        template = "EVCMap-{}: UNI: {}, hasACL: {}"
+        return template.format(self._name, self._uni_port, self._needs_acl_support)
+
+    @staticmethod
+    def create_ingress_map(flow, evc, dry_run=False):
+        """
+        Build an ingress EVC-MAP for a flow
+
+        :param flow: (FlowEntry) flow the map is built from
+        :param evc: (EVC) EVC the new map is attached to
+        :param dry_run: (bool) if True, the map is built but never attached to the EVC
+        :return: (EVCMap) the new map; check its 'valid' property before use
+        """
+        ingress_map = EVCMap(flow, evc, True)
+        attach = ingress_map._valid and not dry_run
+
+        if attach:
+            evc.add_evc_map(ingress_map)
+            ingress_map._evc = evc
+
+        return ingress_map
+
+    @staticmethod
+    def create_egress_map(flow, evc, dry_run=False):
+        """
+        Build an egress EVC-MAP for a flow
+
+        :param flow: (FlowEntry) flow the map is built from
+        :param evc: (EVC) EVC the new map is attached to
+        :param dry_run: (bool) if True, the map is built but never attached to the EVC
+        :return: (EVCMap) the new map; check its 'valid' property before use
+        """
+        egress_map = EVCMap(flow, evc, False)
+        attach = egress_map._valid and not dry_run
+
+        if attach:
+            evc.add_evc_map(egress_map)
+            egress_map._evc = evc
+
+        return egress_map
+
+ @property
+ def valid(self):
+ """ Result of decoding the flow in the initializer; set to False by delete() of all flows"""
+ return self._valid
+
+ @property
+ def installed(self):
+ """ True once the map is considered to be present in the hardware"""
+ return self._installed
+
+ @property
+ def needs_update(self):
+ """ True if an parameter/ACL/... needs update or map needs to be reflowed after a failure"""
+ return self._needs_update
+
+ @needs_update.setter
+ def needs_update(self, value):
+ # Callers may only clear the flag; any truthy value trips the assertion
+ assert not value, 'needs update can only be reset' # Can only reset
+ self._needs_update = False
+
+ @property
+ def name(self):
+ """ Base EVC-MAP name (see create_evc_map_name)"""
+ return self._name
+
+ @property
+ def status(self):
+ """ Last status message recorded for this map (may be None)"""
+ return self._status_message
+
+ @status.setter
+ def status(self, value):
+ self._status_message = value
+
+ @property
+ def evc(self):
+ """ EVC this map is attached to, or None"""
+ return self._evc
+
+ @property
+ def _needs_acl_support(self):
+ """ True if any decoded match field (EtherType, IP protocol, UDP port) requires an ACL"""
+ if self._ipv4_dst is not None: # In case MCAST downstream has ACL on it
+ return False
+
+ return self._eth_type is not None or self._ip_protocol is not None or\
+ self._udp_dst is not None or self._udp_src is not None
+
+ @property
+ def pon_id(self):
+ return self._pon_id # May be None
+
+ @property
+ def onu_id(self):
+ return self._onu_id # May be None if associated with a multicast flow
+
+ # @property
+ # def onu_ids(self):
+ # return self._gem_ids_and_vid.keys()
+
+    @property
+    def gem_ids_and_vid(self):
+        """
+        Shallow copy of the { onu-id -> (GEM Port IDs, vid) } dictionary
+
+        :return: (dict) copy of the mapping, empty if no GEM ports have been assigned yet
+        """
+        # The initializer leaves this as None until the tech profile has been set up
+        if self._gem_ids_and_vid is None:
+            return dict()
+
+        return self._gem_ids_and_vid.copy()
+
+    @staticmethod
+    def _xml_header(operation=None):
+        """
+        Opening tags for a single evc-map request
+
+        :param operation: (str) optional value for the xc:operation attribute
+        :return: (str) '<evc-maps ...><evc-map>' opening XML
+        """
+        if operation is None:
+            attribute = ''
+        else:
+            attribute = ' xc:operation="{}"'.format(operation)
+
+        return '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"{}><evc-map>'.\
+            format(attribute)
+
+    @staticmethod
+    def _xml_trailer():
+        """Closing tags that match the elements opened by _xml_header()"""
+        closing_tags = ('</evc-map>', '</evc-maps>')
+        return ''.join(closing_tags)
+
+    def get_evcmap_name(self, onu_id, gem_id):
+        """
+        Full per-GEM-port map name: '<name>.<pon-id>.<onu-id>.<gem-id>'
+
+        :param onu_id: ONU ID component of the name
+        :param gem_id: GEM Port ID component of the name
+        :return: (str) period separated name
+        """
+        fields = (self.name, self.pon_id, onu_id, gem_id)
+        return '{}.{}.{}.{}'.format(*fields)
+
+    def _common_install_xml(self):
+        """
+        XML elements shared by the ingress and egress install requests
+
+        :return: (str) enabled, uni, evc (or evc-connection) and match-untagged elements
+        """
+        def as_xml_bool(flag):
+            return 'true' if flag else 'false'
+
+        parts = ['<enabled>{}</enabled>'.format(as_xml_bool(self._enabled)),
+                 '<uni>{}</uni>'.format(self._uni_port)]
+
+        evc_name = None if self._evc is None else self._evc.name
+
+        if evc_name is None:
+            # No named EVC, fall back to the connection enumeration
+            parts.append(EVCMap.EvcConnection.xml(self._evc_connection))
+        else:
+            parts.append('<evc>{}</evc>'.format(evc_name))
+
+        parts.append('<match-untagged>{}</match-untagged>'.format(as_xml_bool(self._match_untagged)))
+
+        # TODO: The following is not yet supported (and in some cases, not decoded)
+        # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+        # self._men_pri = 0 # If Explicit Priority
+        #
+        # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+        # self._men_ctag_pri = 0 # If Explicit Priority
+        #
+        # self._match_ce_vlan_id = None
+        # self._match_untagged = True
+        # self._match_destination_mac_address = None
+        return ''.join(parts)
+
+ def _ingress_install_xml(self, onu_s_gem_ids_and_vid, acl_list, create):
+ """
+ Build the edit-config XML that installs one evc-map entry per GEM port
+
+ :param onu_s_gem_ids_and_vid: (dict) key -> tuple(GEM Port IDs, vid)
+ :param acl_list: (list) ACLs whose evc_map_ingress_xml() is added to each entry
+ :param create: (bool) if True, each evc-map element is tagged xc:operation="create"
+ :return: (str) XML
+ """
+ from ..onu import Onu
+
+ # The ACL namespace prefix is only declared when there are ACLs to reference
+ if len(acl_list):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' +\
+ ' xmlns:adtn-evc-map-acl="http://www.adtran.com/ns/yang/adtran-evc-map-access-control-list">'
+ else:
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
+
+ for onu_or_vlan_id, gem_ids_and_vid in onu_s_gem_ids_and_vid.iteritems():
+ first_gem_id = True
+ gem_ids = gem_ids_and_vid[0]
+ vid = gem_ids_and_vid[1]
+ # Without a vid the entry name is qualified by the pon-id as well as the key
+ ident = '{}.{}'.format(self._pon_id, onu_or_vlan_id) if vid is None \
+ else onu_or_vlan_id
+
+ for gem_id in gem_ids:
+ xml += '<evc-map{}>'.format('' if not create else ' xc:operation="create"')
+ xml += '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id)
+ xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
+
+ # GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
+ if first_gem_id and (self._c_tag is not None or vid is not None):
+ first_gem_id = False
+ # The vid from the dictionary takes precedence over the decoded C-Tag
+ vlan = vid or self._c_tag
+ xml += '<network-ingress-filter>'
+ xml += '<men-ctag>{}</men-ctag>'.format(vlan) # Added in August 2017 model
+ xml += '</network-ingress-filter>'
+
+ if len(acl_list):
+ xml += '<adtn-evc-map-acl:access-lists>'
+ for acl in acl_list:
+ xml += ' <adtn-evc-map-acl:ingress-acl>'
+ xml += acl.evc_map_ingress_xml()
+ xml += ' </adtn-evc-map-acl:ingress-acl>'
+ xml += '</adtn-evc-map-acl:access-lists>'
+ xml += self._common_install_xml()
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+    def _egress_install_xml(self):
+        """
+        Build the edit-config XML that installs this map as a single egress evc-map
+
+        :return: (str) XML
+        """
+        return ''.join([EVCMap._xml_header(),
+                        '<name>{}</name>'.format(self.name),
+                        self._common_install_xml(),
+                        EVCMap._xml_trailer()])
+
+    def _ingress_remove_acl_xml(self, onu_s_gem_ids_and_vid, acl):
+        """
+        Build the edit-config XML that deletes one ACL reference from every evc-map entry
+
+        :param onu_s_gem_ids_and_vid: (dict) key -> tuple(GEM Port IDs, vid)
+        :param acl: (ACL) ACL whose evc_map_ingress_xml() identifies the reference
+        :return: (str) XML
+        """
+        pieces = ['<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"'
+                  ' xmlns:adtn-evc-map-acl="http://www.adtran.com/ns/yang/adtran-evc-map-access-control-list">']
+
+        for key, gems_and_vid in onu_s_gem_ids_and_vid.iteritems():
+            # Without a vid the entry name is qualified by the pon-id as well as the key
+            if gems_and_vid[1] is None:
+                ident = '{}.{}'.format(self._pon_id, key)
+            else:
+                ident = key
+
+            for gem_id in gems_and_vid[0]:
+                pieces.extend(['<evc-map>',
+                               '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id),
+                               '<adtn-evc-map-acl:access-lists>',
+                               ' <adtn-evc-map-acl:ingress-acl xc:operation="delete">',
+                               acl.evc_map_ingress_xml(),
+                               ' </adtn-evc-map-acl:ingress-acl>',
+                               '</adtn-evc-map-acl:access-lists>',
+                               '</evc-map>'])
+
+        pieces.append('</evc-maps>')
+        return ''.join(pieces)
+
+ @inlineCallbacks
+ def install(self):
+ """
+ Install (or update) this EVC-MAP in the hardware
+
+ Pending ACLs are installed first, then the upstream bandwidth is updated, the
+ map XML is sent with edit-config and finally the downstream shaper is updated.
+
+ :return: (deferred) fires with (self._installed and self._valid)
+ """
+ def gem_ports():
+ ports = []
+ for gems_and_vids in self._gem_ids_and_vid.itervalues():
+ ports.extend(gems_and_vids[0])
+ return ports
+
+ # NOTE(review): gem_ports() dereferences self._gem_ids_and_vid, which the initializer
+ # sets to None -- confirm it is always populated before install() is called
+ log.debug('install-evc-map', valid=self._valid, gem_ports=gem_ports())
+
+ if self._valid and len(gem_ports()) > 0:
+ # Install ACLs first (if not yet installed)
+ work_acls = self._new_acls.copy()
+ self._new_acls = dict()
+
+ log.debug('install-evc-map-acls', install_acls=len(work_acls))
+ for acl in work_acls.itervalues():
+ try:
+ yield acl.install()
+
+ except Exception as e:
+ log.exception('acl-install-failed', name=self.name, e=e)
+ # Put the whole batch back so that it is retried on the next install
+ self._new_acls.update(work_acls)
+ raise
+
+ # Any user-data flows attached to this map ?
+ c_tag = None
+ for flow_id, flow in self._flows.items():
+ c_tag = flow.inner_vid or flow.vlan_id or c_tag
+
+ self._c_tag = c_tag
+
+ # Now EVC-MAP
+ if not self._installed or self._needs_update:
+ log.debug('needs-install-or-update', installed=self._installed, update=self._needs_update)
+ # Remember the previous state so that it can be restored on failure
+ is_installed = self._installed
+ self._installed = True
+ try:
+ self._cancel_deferred()
+
+ log.info('upstream-bandwidth')
+ try:
+ yield self.update_upstream_flow_bandwidth()
+
+ except Exception as e:
+ log.exception('upstream-bandwidth-failed', name=self.name, e=e)
+ raise
+
+ # 'create' is only requested if the map was not installed before this call
+ map_xml = self._ingress_install_xml(self._gem_ids_and_vid, work_acls.values(),
+ not is_installed) \
+ if self._is_ingress_map else self._egress_install_xml()
+
+ log.debug('install', xml=map_xml, name=self.name)
+ results = yield self._handler.netconf_client.edit_config(map_xml)
+ self._installed = results.ok
+ # NOTE(review): this leaves needs_update True after a successful edit-config, which
+ # looks inverted relative to the needs_update property description -- confirm
+ self._needs_update = results.ok
+ self._status_message = '' if results.ok else results.error
+
+ if results.ok:
+ self._existing_acls.update(work_acls)
+ else:
+ self._new_acls.update(work_acls)
+
+ # NOTE(review): an RPCError with any other tag is also dropped here, and
+ # self._installed keeps the optimistic True set above -- confirm this is intended
+ except RPCError as rpc_err:
+ if rpc_err.tag == 'data-exists': # Known race due to bulk-flow operation
+ pass
+
+ except Exception as e:
+ log.exception('evc-map-install-failed', name=self.name, e=e)
+ self._installed = is_installed
+ self._new_acls.update(work_acls)
+ raise
+
+ # Install any needed shapers
+ if self._installed:
+ try:
+ yield self.update_downstream_flow_bandwidth()
+
+ except Exception as e:
+ log.exception('shaper-install-failed', name=self.name, e=e)
+ raise
+
+ returnValue(self._installed and self._valid)
+
+    def _ingress_remove_xml(self, onus_gem_ids_and_vid):
+        """
+        Build the edit-config XML that deletes every evc-map entry of this map
+
+        :param onus_gem_ids_and_vid: (dict) key -> tuple(GEM Port IDs, vid)
+        :return: (str) XML
+        """
+        xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+              ' xc:operation="delete">'
+
+        for onu_or_vlan_id, gem_ids_and_vid in onus_gem_ids_and_vid.iteritems():
+            # Entry names must be built exactly as _ingress_install_xml() builds them,
+            # otherwise the delete targets names that were never installed
+            vid = gem_ids_and_vid[1]
+            ident = '{}.{}'.format(self._pon_id, onu_or_vlan_id) if vid is None \
+                else onu_or_vlan_id
+
+            for gem_id in gem_ids_and_vid[0]:
+                xml += '<evc-map>'
+                xml += '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id)
+                xml += '</evc-map>'
+        xml += '</evc-maps>'
+        return xml
+
+    def _egress_remove_xml(self):
+        """
+        Build the edit-config XML that deletes this map's single egress evc-map
+
+        :return: (str) XML
+        """
+        name_xml = '<name>{}</name>'.format(self.name)
+        return ''.join((EVCMap._xml_header('delete'), name_xml, EVCMap._xml_trailer()))
+
+    def _remove(self):
+        """
+        Remove this EVC-MAP from the hardware, followed by its ACLs and any shaper
+
+        The map is marked as not installed whether or not the delete request succeeds.
+
+        :return: (deferred) fires once the map, ACL and shaper removals have been attempted
+        """
+        if not self.installed:
+            # This is not an @inlineCallbacks generator, so returnValue() must not be
+            # used here; hand back an already-fired deferred instead
+            return succeed('Not installed')
+
+        log.info('removing', evc_map=self)
+
+        def _success(rpc_reply):
+            log.debug('remove-success', rpc_reply=rpc_reply)
+            self._installed = False
+
+        def _failure(failure):
+            log.error('remove-failed', failure=failure)
+            self._installed = False
+
+        def _remove_acls(_):
+            acls, self._new_acls = self._new_acls, dict()
+            existing, self._existing_acls = self._existing_acls, dict()
+            acls.update(existing)
+
+            dl = [acl.remove() for acl in acls.itervalues()]
+
+            if len(dl) > 0:
+                # Returned so the callback chain waits for (and owns) the ACL removals
+                return defer.gatherResults(dl, consumeErrors=True)
+
+        def _remove_shaper(_):
+            if self._shaper_name is not None:
+                # Returned so the callback chain waits for the shaper removal
+                return self.update_downstream_flow_bandwidth(remove=True)
+
+        map_xml = self._ingress_remove_xml(self._gem_ids_and_vid) if self._is_ingress_map \
+            else self._egress_remove_xml()
+
+        d = self._handler.netconf_client.edit_config(map_xml)
+        d.addCallbacks(_success, _failure)
+        d.addBoth(_remove_acls)
+        d.addBoth(_remove_shaper)
+        return d
+
+    @inlineCallbacks
+    def delete(self, flow):
+        """
+        Remove from hardware and delete/clean-up EVC-MAP Object
+
+        :param flow: (FlowEntry) Specific flow to remove from the MAP or None if all
+                     flows should be removed
+        :return: (deferred) fires with 'Done' once the removal has been attempted
+        """
+        flows = [flow] if flow is not None else list(self._flows.values())
+        removing_all = len(flows) == len(self._flows)
+
+        log.debug('delete', removing_all=removing_all)
+        if not removing_all:
+            for f in flows:
+                # Wait for each removal so that 'Done' is not reported early.
+                # _remove_flow() logs and absorbs its own exceptions.
+                yield self._remove_flow(f)
+
+        else:
+            if self._evc is not None:
+                self._evc.remove_evc_map(self)
+                self._evc = None
+
+            self._valid = False
+            self._cancel_deferred()
+            try:
+                yield self._remove()
+
+            except Exception as e:
+                log.exception('removal', e=e)
+
+        returnValue('Done')
+
+    def reflow_needed(self):
+        """
+        Report whether this map has to be (re)installed
+
+        :return: True if the map is not installed or is flagged as needing an update
+        """
+        log.debug('reflow-needed', installed=self.installed, needs_update=self.needs_update)
+
+        # TODO: implement retrieve & compare of EVC Map parameters when no reflow is flagged
+        return not self.installed or self.needs_update
+
+    @staticmethod
+    def find_matching_ingress_flow(flow, upstream_flow_table):
+        """
+        Look for an existing EVC-MAP that may match this flow. Called when the upstream
+        signature for a flow does not match. This can happen if an ACL flow is added and
+        only a User Data flow exists, or if only an ACL flow exists.
+
+        :param flow: (FlowEntry) flow to add
+        :param upstream_flow_table: (dict of FlowEntry) Existing upstream flows for this device,
+                                    including the flow we are looking to add
+        :return: (EVCMap) if appropriate one is found, else None
+        """
+        # A User Data flow will have:
+        #     signature: <dev>.1.5.2.242
+        #     down-sig: <dev>.1.*.2.*
+        #     logical-port: 66
+        #     is-acl-flow: False
+        #
+        # An ACL flow will have:
+        #     signature: <dev>.1.5.[4092 or 4094].None (untagged VLAN == utility VLAN case)
+        #     down-sig: <dev>.1.*.[4092 or 4094].*
+        #     logical-port: 66
+        #     is-acl-flow: True
+        log.debug('find-matching-ingress-flow', logical_port=flow.logical_port, flow=flow.output)
+
+        def is_candidate(other):
+            # Same ingress, logical port and output. Requiring a map weeds out this flow.
+            return (other.in_port == flow.in_port and
+                    other.logical_port == flow.logical_port and
+                    other.output == flow.output and
+                    other.evc_map is not None)
+
+        candidate_flows = [f for f in upstream_flow_table.itervalues() if is_candidate(f)]
+        log.debug('find-matching-ingress-flow', candidate_flows=candidate_flows)
+
+        if len(candidate_flows) == 0:
+            return None
+
+        return candidate_flows[0].evc_map
+
+ def add_flow(self, flow, evc):
+ """
+ Add a new flow to an existing EVC-MAP. This can be called to add:
+ o an ACL flow to an existing utility EVC, or
+ o an ACL flow to an existing User Data Flow, or
+ o a User Data Flow to an existing ACL flow (and this needs the EVC updated
+ as well).
+
+ Note that the Downstream EVC provided is the one that matches this flow. If
+ this is adding an ACL to an existing User data flow, we DO NOT want to
+ change the EVC Map's EVC
+
+ :param flow: (FlowEntry) New flow
+ :param evc: (EVC) Matching EVC for downstream flow
+ :return: (EVCMap) this map, or None if the flow could not be decoded into a valid map
+ """
+ from flow_entry import FlowEntry
+ # Create temporary EVC-MAP
+ assert flow.flow_direction in FlowEntry.upstream_flow_types, \
+ 'Only Upstream flows additions are supported at this time'
+
+ log.debug('add-flow-to-evc', flow=flow, evc=evc)
+
+ # A dry run decodes the flow (and builds its ACL) without attaching anything to the EVC
+ tmp_map = EVCMap.create_ingress_map(flow, evc, dry_run=True) \
+ if flow.flow_direction in FlowEntry.upstream_flow_types \
+ else EVCMap.create_egress_map(flow, evc, dry_run=True)
+
+ if tmp_map is None or not tmp_map.valid:
+ return None
+
+ self._flows[flow.flow_id] = flow
+ self._needs_update = True
+
+ # Are there ACLs to add to any existing (or empty) ACLs
+ if len(tmp_map._new_acls) > 0:
+ self._new_acls.update(tmp_map._new_acls) # New ACL flow
+ log.debug('add-acl-flows', map=str(self), new=tmp_map._new_acls)
+
+ # Look up existing EVC for this flow. If it is a service EVC for
+ # Packet In/Out, and this is a regular flow, migrate to the newer EVC
+ # NOTE(review): self._evc is dereferenced without a None check -- confirm that
+ # add_flow() is only ever called on a map that is attached to an EVC
+ if self._evc.service_evc and not evc.service_evc:
+ log.info('new-evc-for-map', old=self._evc.name, new=evc.name)
+ self._evc.remove_evc_map(self)
+ evc.add_evc_map(self)
+ self._evc = evc
+
+ return self
+
+    @inlineCallbacks
+    def _remove_flow(self, flow):
+        """
+        Remove a specific flow from an EVC_MAP. This includes removing any
+        ACL entries associated with the flow and could result in moving the
+        EVC-MAP over to another EVC.
+
+        All exceptions are logged and absorbed, so the returned deferred does not fail
+        on them.
+
+        :param flow: (FlowEntry) Flow to remove
+        """
+        try:
+            del self._flows[flow.flow_id]
+
+            # The logger object itself is not callable, use an explicit level
+            log.debug('remove-flow-to-evc', flow=flow)
+
+            # Remove any ACLs
+            acl_name = ACL.flow_to_name(flow)
+            acl = None
+
+            # if not yet installed just remove it from list
+            if acl_name in self._new_acls:
+                del self._new_acls[acl_name]
+            else:
+                # A flow without an installed ACL simply has nothing more to remove
+                acl = self._existing_acls.get(acl_name)
+
+            if acl is not None:
+                # Remove ACL from EVC-MAP entry
+                try:
+                    map_xml = self._ingress_remove_acl_xml(self._gem_ids_and_vid, acl)
+                    log.debug('remove', xml=map_xml, name=acl.name)
+                    results = yield self._handler.netconf_client.edit_config(map_xml)
+                    if results.ok:
+                        del self._existing_acls[acl.name]
+
+                    # Scan EVC to see if it needs to move back to the Utility
+                    # or Untagged EVC from a user data EVC
+                    if self._evc and not self._evc.service_evc and \
+                            len(self._flows) > 0 and \
+                            all(f.is_acl_flow for f in self._flows.itervalues()):
+
+                        self._evc.remove_evc_map(self)
+                        first_flow = self._flows.itervalues().next()
+                        self._evc = first_flow.get_utility_evc(True)
+                        self._evc.add_evc_map(self)
+                        log.debug('moved-acl-flows-to-utility-evc', newevcname=self._evc.name)
+
+                    self._needs_update = True
+                    self._evc.schedule_install()
+
+                except Exception as e:
+                    log.exception('acl-remove-from-evc', e=e)
+
+                # Remove ACL itself
+                try:
+                    yield acl.remove()
+
+                except Exception as e:
+                    log.exception('acl-remove', e=e)
+
+        except Exception as e:
+            log.exception('remove-failed', e=e)
+
+    @staticmethod
+    def create_evc_map_name(flow):
+        """
+        Base EVC-MAP name for a flow, built from its logical port and flow ID
+
+        :param flow: (FlowEntry) flow the name is derived from
+        :return: (str) name
+        """
+        # Note: When actually installed into the OLT, the .onu_id.gem_port is
+        #       appended to the name
+        ingress_port, flow_id = flow.logical_port, flow.flow_id
+        return EVC_MAP_NAME_FORMAT.format(ingress_port, flow_id)
+
+    @staticmethod
+    def decode_evc_map_name(name):
+        """
+        Reverse engineer EVC-MAP name parameters. Helpful in quick packet-in
+        processing
+
+        :param name: (str) EVC Map Name
+        :return: (dict) Logical Ingress Port, OpenFlow Flow-ID (both as strings). Empty
+                 if the name is None or has fewer than three dash separated fields
+        """
+        if name is None:
+            return dict()
+
+        fields = name.split('-')
+        if len(fields) <= 2:
+            return dict()
+
+        # Note: When actually installed into the OLT, the .onu_id.gem_port is
+        #       appended to the name
+        flow_id = fields[2].split('.')[0]
+        return {'ingress-port': fields[1],
+                'flow-id': flow_id}
+
+ @inlineCallbacks
+ def update_upstream_flow_bandwidth(self):
+ """
+ Upstream flow bandwidth comes from the flow_entry related to this EVC-MAP
+ and if no bandwidth property is found, allow full bandwidth
+
+ NOTE(review): tconts and traffic_descriptors are hard-coded to None below, so for
+ an ingress map on a PON port this currently always ends at 'no TDs on PON' (or
+ 'no PON') and the traffic descriptor update further down is never reached.
+
+ :return: (deferred)
+ """
+ # all flows should should be on the same PON
+ flow = self._flows.itervalues().next()
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+
+ if self._is_ingress_map and is_pon:
+ pon_port = flow.handler.get_southbound_port(flow.in_port)
+ if pon_port is None:
+ returnValue('no PON')
+
+ session = self._handler.rest_client
+ # TODO: Refactor with tech profiles
+ tconts = None # pon_port.tconts
+ traffic_descriptors = None # pon_port.traffic_descriptors
+
+ if traffic_descriptors is None or tconts is None:
+ returnValue('no TDs on PON')
+
+ # Falls back to 10000000000 if the flow did not supply a bandwidth
+ bandwidth = self._upstream_bandwidth or 10000000000
+
+ if self.pon_id is not None and self.onu_id is not None:
+ name = 'tcont-{}-{}-data'.format(self.pon_id, self.onu_id)
+ td = traffic_descriptors.get(name)
+ tcont = tconts.get(name)
+
+ if td is not None and tcont is not None:
+ alloc_id = tcont.alloc_id
+ td.maximum_bandwidth = bandwidth
+ try:
+ results = yield td.add_to_hardware(session)
+ log.debug('td-modify-results', results=results)
+
+ # Failures to update the traffic descriptor are silently ignored
+ except Exception as _e:
+ pass
+
+    @inlineCallbacks
+    def update_downstream_flow_bandwidth(self, remove=False):
+        """
+        Downstream flow bandwidth is extracted from the related EVC flow_entry
+        bandwidth property. It is written to this EVC-MAP only if it is found
+
+        :param remove: (bool) if True, remove the shaper previously installed (if any)
+                       instead of installing one
+        :return: (deferred) fires with the edit-config results, or None if no request
+                 was needed or the request ended in an RPC error
+        """
+        xml = None
+        results = None
+
+        if remove:
+            name, self._shaper_name = self._shaper_name, None
+            if name is not None:
+                xml = self._shaper_remove_xml(name)
+        else:
+            if self._evc is not None and self._evc.flow_entry is not None \
+                    and self._evc.flow_entry.bandwidth is not None:
+                self._shaper_name = self._name
+                xml = self._shaper_install_xml(self._shaper_name,
+                                               self._evc.flow_entry.bandwidth * 1000)  # kbps
+        if xml is not None:
+            try:
+                log.info('downstream-bandwidth', xml=xml, name=self.name, remove=remove)
+                results = yield self._handler.netconf_client.edit_config(xml)
+
+            except RPCError as rpc_err:
+                # 'data-exists' is tolerated silently. Other RPC errors are still not
+                # propagated (as before), but they are no longer invisible.
+                if rpc_err.tag != 'data-exists':
+                    log.error('downstream-bandwidth-rpc-error', name=self.name,
+                              remove=remove, tag=rpc_err.tag)
+
+            except Exception as e:
+                log.exception('downstream-bandwidth', name=self.name, remove=remove, e=e)
+                raise
+
+        returnValue(results)
+
+    def _shaper_install_xml(self, name, bandwidth):
+        """
+        Build the edit-config XML that merges one shaper per GEM port
+
+        :param name: (str) base shaper name, '.<key>.<gem-id>' is appended per shaper
+        :param bandwidth: rate value written into each shaper
+        :return: (str) XML
+        """
+        pieces = ['<adtn-shaper:shapers xmlns:adtn-shaper="http://www.adtran.com/ns/yang/adtran-traffic-shapers" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="merge">']
+
+        for onu_id, gems_and_vid in self._gem_ids_and_vid.iteritems():
+            for gem_id in gems_and_vid[0]:
+                pieces.append(' <adtn-shaper:shaper>')
+                pieces.append(' <adtn-shaper:name>{}.{}.{}</adtn-shaper:name>'.format(name, onu_id, gem_id))
+                pieces.append(' <adtn-shaper:enabled>true</adtn-shaper:enabled>')
+                pieces.append(' <adtn-shaper:rate>{}</adtn-shaper:rate>'.format(bandwidth))
+                pieces.append(' <adtn-shaper-evc-map:evc-map xmlns:adtn-shaper-evc-map="http://www.adtran.com/ns/yang/adtran-traffic-shaper-evc-maps">{}.{}.{}</adtn-shaper-evc-map:evc-map>'.format(self.name, onu_id, gem_id))
+                pieces.append(' </adtn-shaper:shaper>')
+
+        pieces.append('</adtn-shaper:shapers>')
+        return ''.join(pieces)
+
+    def _shaper_remove_xml(self, name):
+        """
+        Build the edit-config XML that deletes the shaper of every GEM port
+
+        :param name: (str) base shaper name, '.<key>.<gem-id>' is appended per shaper
+        :return: (str) XML
+        """
+        pieces = ['<adtn-shaper:shapers xmlns:adtn-shaper="http://www.adtran.com/ns/yang/adtran-traffic-shapers" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">']
+
+        for onu_id, gems_and_vid in self._gem_ids_and_vid.iteritems():
+            for gem_id in gems_and_vid[0]:
+                pieces.append(' <adtn-shaper:shaper >')
+                pieces.append(' <adtn-shaper:name>{}.{}.{}</adtn-shaper:name>'.format(name, onu_id, gem_id))
+                pieces.append(' </adtn-shaper:shaper>')
+
+        pieces.append('</adtn-shaper:shapers>')
+        return ''.join(pieces)
+
+    def _setup_tech_profiles(self):
+        """
+        Identify (or allocate) the TCONT and GEM Ports used by this map
+
+        Only acts for an ingress map whose flow enters on a PON port and whose logical
+        port matches a known ONU. On success, self._pon_id, self._onu_id and
+        self._gem_ids_and_vid are set.
+        """
+        # Set up the TCONT / GEM Ports for this connection (Downstream only of course)
+        # all flows should have same GEM port setup
+        flow = self._flows.itervalues().next()
+        is_pon = flow.handler.is_pon_port(flow.in_port)
+
+        if self._is_ingress_map and is_pon:
+            pon_port = flow.handler.get_southbound_port(flow.in_port)
+
+            if pon_port is None:
+                return
+
+            onu = next((onu for onu in pon_port.onus if onu.logical_port == flow.logical_port), None)
+
+            if onu is None:  # TODO: Add multicast support later (self.onu_id == None)
+                return
+
+            self._pon_id = pon_port.pon_id
+            self._onu_id = onu.onu_id
+
+            # Identify or allocate TCONT and GEM Ports. If the ONU has been informed of the
+            # GEM PORTs that belong to it, the tech profiles were already set up by a previous
+            # flows
+            onu_gems = onu.gem_ids(self._tech_profile_id)
+
+            if len(onu_gems) > 0:
+                # The initializer leaves the dictionary as None, so create it on first
+                # use instead of failing on the item assignment
+                if self._gem_ids_and_vid is None:
+                    self._gem_ids_and_vid = dict()
+
+                self._gem_ids_and_vid[onu.onu_id] = (onu_gems, flow.vlan_id)
+                return
+
+            uni_id = self._handler.platform.uni_id_from_uni_port(flow.logical_port)
+            pon_profile = self._handler.tech_profiles[self.pon_id]
+            alloc_id = None
+
+            try:
+                (ofp_port_name, ofp_port_no) = self._handler.get_ofp_port_name(self.pon_id,
+                                                                               self.onu_id,
+                                                                               flow.logical_port)
+                if ofp_port_name is None:
+                    log.error("port-name-not-found")
+                    return
+
+                # Check tech profile instance already exists for derived port name
+                tech_profile = pon_profile.get_tech_profile_instance(self._tech_profile_id,
+                                                                     ofp_port_name)
+                log.debug('Get-tech-profile-instance-status',
+                          tech_profile_instance=tech_profile)
+
+                if tech_profile is None:
+                    # create tech profile instance
+                    tech_profile = pon_profile.create_tech_profile_instance(self._tech_profile_id,
+                                                                            ofp_port_name,
+                                                                            self.pon_id)
+                    if tech_profile is None:
+                        raise Exception('Tech-profile-instance-creation-failed')
+                else:
+                    log.debug('Tech-profile-instance-already-exist-for-given port-name',
+                              ofp_port_name=ofp_port_name)
+
+                # upstream scheduler
+                us_scheduler = pon_profile.get_us_scheduler(tech_profile)
+
+                # downstream scheduler
+                ds_scheduler = pon_profile.get_ds_scheduler(tech_profile)
+
+                # create Tcont protobuf
+                pb_tconts = pon_profile.get_tconts(tech_profile, us_scheduler, ds_scheduler)
+
+                # create TCONTs & GEM Ports locally
+                for pb_tcont in pb_tconts:
+                    from ..xpon.olt_tcont import OltTCont
+                    tcont = OltTCont.create(pb_tcont,
+                                            self.pon_id,
+                                            self.onu_id,
+                                            self._tech_profile_id,
+                                            uni_id,
+                                            ofp_port_no)
+                    if tcont is not None:
+                        onu.add_tcont(tcont)
+
+                # Fetch alloc id and gemports from tech profile instance
+                alloc_id = tech_profile.us_scheduler.alloc_id
+
+                onu_gems = [gem.gemport_id for gem in tech_profile.upstream_gem_port_attribute_list]
+
+                for gem in tech_profile.upstream_gem_port_attribute_list:
+                    from ..xpon.olt_gem_port import OltGemPort
+                    gem_port = OltGemPort.create(self._handler,
+                                                 gem,
+                                                 tech_profile.us_scheduler.alloc_id,
+                                                 self._tech_profile_id,
+                                                 self.pon_id,
+                                                 self.onu_id,
+                                                 uni_id,
+                                                 ofp_port_no)
+                    if gem_port is not None:
+                        onu.add_gem_port(gem_port)
+
+                self._gem_ids_and_vid = {onu.onu_id: (onu_gems, flow.vlan_id)}
+
+                # Send technology profile information to ONU
+                reactor.callLater(0, self._handler.setup_onu_tech_profile, self._pon_id,
+                                  self.onu_id, flow.logical_port)
+
+            except BaseException as e:
+                log.exception(exception=e)
+
+            # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store
+            # NOTE(review): this also runs after a failure above, in which case alloc_id
+            # may still be None -- confirm that is acceptable to the resource manager
+            pon_intf_onu_id = (self.pon_id, self.onu_id, uni_id)
+            resource_manager = self._handler.resource_mgr.resource_managers[self.pon_id]
+
+            resource_manager.update_alloc_ids_for_onu(pon_intf_onu_id, list([alloc_id]))
+            resource_manager.update_gemport_ids_for_onu(pon_intf_onu_id, onu_gems)
+
+            self._handler.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(onu_gems,
+                                                                                      self.pon_id,
+                                                                                      self.onu_id,
+                                                                                      uni_id)
+
+ def _decode(self, evc):
+ """
+ Decode the (single) flow of this map into EVC-MAP settings
+
+ Besides setting attributes of this map, this also writes the CE VLAN preservation,
+ MEN-to-UNI tag manipulation and switching method of the supplied EVC.
+
+ :param evc: (EVC) EVC this map is being created for
+ :return: (bool) True if the flow was decoded successfully
+ """
+ from evc import EVC
+ from flow_entry import FlowEntry
+
+ # Only called from initializer, so first flow is only flow
+ flow = self._flows.itervalues().next()
+
+ self._name = EVCMap.create_evc_map_name(flow)
+
+ if evc:
+ self._evc_connection = EVCMap.EvcConnection.EVC
+ else:
+ self._status_message = 'Can only create EVC-MAP if EVC supplied'
+ return False
+
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+ is_uni = flow.handler.is_uni_port(flow.in_port)
+
+ # NOTE(review): the factor suggests flow.bandwidth is in Mbps -- confirm
+ if flow.bandwidth is not None:
+ self._upstream_bandwidth = flow.bandwidth * 1000000
+
+ if is_pon or is_uni:
+ # Preserve CE VLAN tag only if utility VLAN/EVC
+ self._uni_port = flow.handler.get_port_name(flow.in_port)
+ evc.ce_vlan_preservation = evc.ce_vlan_preservation or False
+ else:
+ self._status_message = 'EVC-MAPS without UNI or PON ports are not supported'
+ return False # UNI Ports handled in the EVC Maps
+
+ # ACL logic
+ self._eth_type = flow.eth_type
+
+ if self._eth_type == FlowEntry.EtherType.IPv4:
+ self._ip_protocol = flow.ip_protocol
+ self._ipv4_dst = flow.ipv4_dst
+
+ if self._ip_protocol == FlowEntry.IpProtocol.UDP:
+ self._udp_dst = flow.udp_dst
+ self._udp_src = flow.udp_src
+
+ # If no match of VLAN this may be for untagged traffic or upstream and needs to
+ # match the gem-port vid
+
+ self._setup_tech_profiles()
+
+ # self._match_untagged = flow.vlan_id is None and flow.inner_vid is None
+ self._c_tag = flow.inner_vid or flow.vlan_id
+
+ # If a push of a single VLAN is present with a POP of the VLAN in the EVC's
+ # flow, then this is a traditional EVC flow
+
+ evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED \
+ if self._c_tag is not None else EVC.SwitchingMethod.SINGLE_TAGGED
+
+ # The ACL is only queued here; it is installed later along with the map
+ try:
+ acl = ACL.create(flow)
+ if acl.name not in self._new_acls:
+ self._new_acls[acl.name] = acl
+
+ except Exception as e:
+ log.exception('ACL-decoding', e=e)
+ return False
+
+ return True
+
+ # Bulk operations
+
+    @staticmethod
+    def remove_all(client, regex_=EVC_MAP_NAME_REGEX_ALL):
+        """
+        Remove all matching EVC Maps from hardware
+
+        :param client: (ncclient) NETCONF Client to use
+        :param regex_: (String) Regular expression for name matching. It is applied with
+                       match(), so it is anchored at the start of the name only
+        :return: (deferred)
+        """
+        # Do a 'get' on the evc-map config and you should get the names
+        get_xml = """
+ <filter>
+ <evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">
+ <evc-map>
+ <name/>
+ </evc-map>
+ </evc-maps>
+ </filter>
+ """
+        log.debug('query', xml=get_xml, regex=regex_)
+
+        def request_failed(results, operation):
+            log.error('{}-failed'.format(operation), results=results)
+            # No further actions. Periodic poll later on will scrub any old EVC-Maps if needed
+
+        def delete_complete(results):
+            log.debug('delete-complete', results=results)
+
+        def do_delete(rpc_reply, regexpr):
+            log.debug('query-complete', rpc_reply=rpc_reply)
+
+            if rpc_reply.ok:
+                result_dict = xmltodict.parse(rpc_reply.data_xml)
+                # An empty <data/> element is parsed as None
+                data = result_dict.get('data') or {}
+                entries = data.get('evc-maps') or {}
+
+                if 'evc-map' in entries:
+                    p = re.compile(regexpr)
+
+                    # A single evc-map is parsed as one dictionary rather than a list.
+                    # Normalize so that the name filter is applied in both cases.
+                    evc_maps = entries['evc-map']
+                    if not isinstance(evc_maps, list):
+                        evc_maps = [evc_maps]
+
+                    names = {entry['name'] for entry in evc_maps
+                             if 'name' in entry and p.match(entry['name'])}
+
+                    if len(names) > 0:
+                        del_xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+                                  ' xc:operation = "delete">'
+                        for name in names:
+                            del_xml += '<evc-map>'
+                            del_xml += '<name>{}</name>'.format(name)
+                            del_xml += '</evc-map>'
+                        del_xml += '</evc-maps>'
+                        log.debug('removing', xml=del_xml)
+
+                        return client.edit_config(del_xml)
+
+            return succeed('no entries')
+
+        d = client.get(get_xml)
+        d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
+        d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
+        return d
+
+    def _cancel_deferred(self):
+        """
+        Cancel (and forget) any deferred held in self._deferred
+
+        Cancellation is best effort; a failure to cancel is logged and otherwise ignored.
+        """
+        d, self._deferred = self._deferred, None
+        try:
+            if d is not None and not d.called:
+                d.cancel()
+
+        except Exception as e:
+            # Narrowed from a bare 'except' so that KeyboardInterrupt/SystemExit are
+            # not swallowed here
+            log.debug('cancel-deferred-failed', e=e)