blob: 21dddec2d776f29289e061b7c76fcea24a412e80 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040022import arrow
Zsolt Haraszti66862032016-11-28 14:28:39 -080023import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080024from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080025from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080026from twisted.internet.defer import inlineCallbacks, returnValue
27from zope.interface import implementer
28
Zsolt Haraszti89a27302016-12-08 16:53:06 -080029from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080030from common.frameio.frameio import hexify
khenaidooa8588f22017-06-16 12:13:34 -040031from common.utils.id_generation import create_cluster_logical_device_ids
Zsolt Haraszti66862032016-11-28 14:28:39 -080032from voltha.adapters.interface import IAdapterAgent
33from voltha.protos import third_party
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040034from voltha.core.flow_decomposer import OUTPUT
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050035from voltha.protos.device_pb2 import Device, Port, PmConfigs
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040036from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, \
37 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050038from voltha.protos.events_pb2 import KpiEvent
Zsolt Haraszti66862032016-11-28 14:28:39 -080039from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040040 LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
Zsolt Haraszti66862032016-11-28 14:28:39 -080041from voltha.registry import registry
khenaidoo08d48d22017-06-29 19:42:49 -040042from common.utils.id_generation import create_cluster_device_id
khenaidoo507d9222017-10-10 16:23:49 -040043import re
44
45
class MacAddressError(BaseException):
    """
    Raised when a supplied datapath id cannot be used as a MAC address.

    The reason string is available both as ``error`` and through the
    standard exception ``args`` / ``str()``.
    """

    # NOTE(review): this derives from BaseException, so ``except Exception``
    # handlers do not catch it. Kept unchanged so that existing callers see
    # the same propagation behaviour - confirm whether that is intended.
    def __init__(self, error):
        # Python 2 only populates ``args`` in BaseException.__init__, so
        # without this call str(exc) would be empty.
        super(MacAddressError, self).__init__(error)
        self.error = error
Zsolt Haraszti66862032016-11-28 14:28:39 -080049
Zsolt Haraszti66862032016-11-28 14:28:39 -080050@implementer(IAdapterAgent)
51class AdapterAgent(object):
52 """
53 Gate-keeper between CORE and device adapters.
54
55 On one side it interacts with Core's internal model and update/dispatch
56 mechanisms.
57
58 On the other side, it interacts with the adapters standard interface as
59 defined in
60 """
61
def __init__(self, adapter_name, adapter_cls):
    """
    Remember which adapter this agent fronts and obtain the core handles.
    :param adapter_name: name of the adapter; also bound to the logger
    :param adapter_cls: adapter class; it is instantiated later by start()
    """
    # Adapter identity; the adapter instance itself is created in start()
    self.adapter_name = adapter_name
    self.adapter_cls = adapter_cls
    self.adapter = None
    self.adapter_node_proxy = None

    # Handles into the core and the root of its data model
    core = registry('core')
    self.core = core
    self.root_proxy = core.get_proxy('/')

    # Event bus client plus bookkeeping for the subscriptions made on it
    self._rx_event_subscriptions = {}
    self._tx_event_subscriptions = {}
    self._onu_detect_event_subscriptions = {}
    self.packet_out_subscription = None
    self.event_bus = EventBusClient()

    self.log = structlog.get_logger(adapter_name=adapter_name)
Zsolt Haraszti66862032016-11-28 14:28:39 -080075
@inlineCallbacks
def start(self):
    """
    Instantiate and start the adapter, then register the adapter node and
    its device types. Exceptions raised during that sequence are logged
    and not propagated.
    :return: (via returnValue) this agent
    """
    log = self.log
    log.debug('starting')
    adapter_config = self._get_adapter_config()  # this may be None
    try:
        instance = self.adapter_cls(self, adapter_config)
        yield instance.start()
        # Only publish the adapter once it has started successfully
        self.adapter = instance
        self.adapter_node_proxy = self._update_adapter_node()
        self._update_device_types()
    except Exception as e:
        log.exception(e)
    log.info('started')
    returnValue(self)
90
@inlineCallbacks
def stop(self):
    """Stop the adapter, if one is set, and drop the reference to it."""
    self.log.debug('stopping')
    if self.adapter is None:
        pass
    else:
        yield self.adapter.stop()
        self.adapter = None
    self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080098
def _get_adapter_config(self):
    """
    Look up the persisted configuration of this adapter.
    :return: the node found at '/adapters/<adapter_name>', or None when
    the lookup raises KeyError
    """
    root = self.core.get_proxy('/')
    config_path = '/adapters/' + self.adapter_name
    try:
        return root.get(config_path)
    except KeyError:
        return None
110
def _update_adapter_node(self):
    """
    Store the adapter's self description under '/adapters', creating or
    updating the node as needed.
    :return: proxy obtained from the core for the adapter node path
    """
    descriptor = self.adapter.adapter_descriptor()
    # The adapter must describe itself under the name it was registered with
    assert descriptor.id == self.adapter_name
    node_path = self._make_up_to_date(
        '/adapters', self.adapter_name, descriptor)
    return self.core.get_proxy(node_path)
122
def _update_device_types(self):
    """
    Register every device type reported by the adapter under
    '/device_types', keyed by the device type id.
    """
    reported = self.adapter.device_types()
    for dev_type in reported.items:
        self._make_up_to_date('/device_types', dev_type.id, dev_type)
131
def _make_up_to_date(self, container_path, key, data):
    """
    Update the node '<container_path>/<key>' with data, or add data to
    the container when the node lookup raises KeyError.
    :param container_path: path of the container holding the node
    :param key: key of the node inside the container
    :param data: node content to store
    :return: full path of the node
    """
    node_path = '/'.join((container_path, str(key)))
    proxy = self.core.get_proxy('/')
    try:
        # get() serves as an existence probe; its result is not used
        proxy.get(node_path)
        proxy.update(node_path, data)
    except KeyError:
        proxy.add(container_path, data)
    return node_path
141
def _remove_node(self, container_path, key):
    """
    Remove the node '<container_path>/<key>' from the data model. A
    KeyError raised while doing so (node does not exist) is ignored.
    :param container_path: path of the container holding the node
    :param key: key of the node inside the container
    :return: None
    """
    node_path = '/'.join((container_path, str(key)))
    proxy = self.core.get_proxy('/')
    try:
        proxy.get(node_path)
        proxy.remove(node_path)
    except KeyError:
        # Nothing to remove
        return
157
Zsolt Haraszti66862032016-11-28 14:28:39 -0800158 # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
159
def adopt_device(self, device):
    """Forward the adopt request for device to the adapter."""
    handler = self.adapter.adopt_device
    return handler(device)
162
def reconcile_device(self, device):
    """Forward the reconcile request for device to the adapter."""
    handler = self.adapter.reconcile_device
    return handler(device)
165
def abandon_device(self, device):
    """Forward the abandon request for device to the adapter."""
    handler = self.adapter.abandon_device
    return handler(device)
168
def disable_device(self, device):
    """Forward the disable request for device to the adapter."""
    handler = self.adapter.disable_device
    return handler(device)
171
def reenable_device(self, device):
    """Forward the re-enable request for device to the adapter."""
    handler = self.adapter.reenable_device
    return handler(device)
174
def reboot_device(self, device):
    """Forward the reboot request for device to the adapter."""
    handler = self.adapter.reboot_device
    return handler(device)
177
def download_image(self, device, request):
    """Forward the image download request for device to the adapter."""
    handler = self.adapter.download_image
    return handler(device, request)
180
def get_image_download_status(self, device, request):
    """Forward the image download status query to the adapter."""
    handler = self.adapter.get_image_download_status
    return handler(device, request)
183
def cancel_image_download(self, device, request):
    """Forward the image download cancellation to the adapter."""
    handler = self.adapter.cancel_image_download
    return handler(device, request)
186
def activate_image_update(self, device, request):
    """Forward the image update activation to the adapter."""
    handler = self.adapter.activate_image_update
    return handler(device, request)
189
def revert_image_update(self, device, request):
    """Forward the image update revert to the adapter."""
    handler = self.adapter.revert_image_update
    return handler(device, request)
192
def self_test(self, device):
    """Forward the self test request to the adapter's self_test_device."""
    handler = self.adapter.self_test_device
    return handler(device)
195
def delete_device(self, device):
    """Forward the delete request for device to the adapter."""
    handler = self.adapter.delete_device
    return handler(device)
198
def get_device_details(self, device):
    """Forward the device details query to the adapter."""
    handler = self.adapter.get_device_details
    return handler(device)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800201
def update_flows_bulk(self, device, flows, groups):
    """Forward the bulk flow and group update to the adapter."""
    handler = self.adapter.update_flows_bulk
    return handler(device, flows, groups)
204
def update_flows_incrementally(self, device, flow_changes, group_changes):
    """
    Forward the incremental flow and group changes to the adapter.
    :param device: device the changes apply to
    :param flow_changes: flow changes, passed through unchanged
    :param group_changes: group changes, passed through unchanged
    :return: whatever the adapter returns
    """
    # Delegate to the adapter; calling self here would recurse forever
    return self.adapter.update_flows_incrementally(
        device, flow_changes, group_changes)
208
def suppress_alarm(self, filter):
    """Forward the alarm suppression filter to the adapter."""
    handler = self.adapter.suppress_alarm
    return handler(filter)
211
def unsuppress_alarm(self, filter):
    """Forward the alarm un-suppression filter to the adapter."""
    handler = self.adapter.unsuppress_alarm
    return handler(filter)
214
Stephane Barbarie4db8ca22017-04-24 10:30:20 -0400215 # def update_pm_collection(self, device, pm_collection_config):
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -0500216 # return self.adapter.update_pm_collection(device, pm_collection_config)
217
def create_interface(self, device, data):
    """Forward the interface creation to the adapter."""
    handler = self.adapter.create_interface
    return handler(device, data)
Nikolay Titov89004ec2017-06-19 18:22:42 -0400220
def update_interface(self, device, data):
    """Forward the interface update to the adapter."""
    handler = self.adapter.update_interface
    return handler(device, data)
Nikolay Titov89004ec2017-06-19 18:22:42 -0400223
def remove_interface(self, device, data):
    """Forward the interface removal to the adapter."""
    handler = self.adapter.remove_interface
    return handler(device, data)
226
def create_tcont(self, device, tcont_data, traffic_descriptor_data):
    """Forward the tcont creation and its traffic descriptor to the adapter."""
    handler = self.adapter.create_tcont
    return handler(device, tcont_data, traffic_descriptor_data)
230
def update_tcont(self, device, tcont_data, traffic_descriptor_data):
    """Forward the tcont update and its traffic descriptor to the adapter."""
    handler = self.adapter.update_tcont
    return handler(device, tcont_data, traffic_descriptor_data)
234
def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
    """Forward the tcont removal and its traffic descriptor to the adapter."""
    handler = self.adapter.remove_tcont
    return handler(device, tcont_data, traffic_descriptor_data)
238
def create_gemport(self, device, data):
    """Forward the gemport creation to the adapter."""
    handler = self.adapter.create_gemport
    return handler(device, data)
241
def update_gemport(self, device, data):
    """Forward the gemport update to the adapter."""
    handler = self.adapter.update_gemport
    return handler(device, data)
244
def remove_gemport(self, device, data):
    """Forward the gemport removal to the adapter."""
    handler = self.adapter.remove_gemport
    return handler(device, data)
247
def create_multicast_gemport(self, device, data):
    """Forward the multicast gemport creation to the adapter."""
    handler = self.adapter.create_multicast_gemport
    return handler(device, data)
250
def update_multicast_gemport(self, device, data):
    """Forward the multicast gemport update to the adapter."""
    handler = self.adapter.update_multicast_gemport
    return handler(device, data)
253
def remove_multicast_gemport(self, device, data):
    """Forward the multicast gemport removal to the adapter."""
    handler = self.adapter.remove_multicast_gemport
    return handler(device, data)
256
def create_multicast_distribution_set(self, device, data):
    """Forward the multicast distribution set creation to the adapter."""
    handler = self.adapter.create_multicast_distribution_set
    return handler(device, data)
259
def update_multicast_distribution_set(self, device, data):
    """Forward the multicast distribution set update to the adapter."""
    handler = self.adapter.update_multicast_distribution_set
    return handler(device, data)
262
def remove_multicast_distribution_set(self, device, data):
    """Forward the multicast distribution set removal to the adapter."""
    handler = self.adapter.remove_multicast_distribution_set
    return handler(device, data)
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -0500265
Zsolt Haraszti66862032016-11-28 14:28:39 -0800266 # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
267
def get_device(self, device_id):
    """Fetch the node stored at '/devices/<device_id>' from the root proxy."""
    device_path = '/devices/{}'.format(device_id)
    return self.root_proxy.get(device_path)
270
def get_child_device(self, parent_device_id, **kwargs):
    """
    Find a child device of the given parent that matches the supplied
    criteria. The first match encountered is returned.

    Only 'onu_id' and 'serial_number' are used as criteria. When both
    are supplied a child must match both; when one is supplied a child
    must match that one; when neither is supplied nothing is searched.
    :param parent_device_id: parent's device id
    :param **kwargs: match criteria (onu_id and/or serial_number)
    :return: Child Device Object or None
    """
    wanted_onu_id = kwargs.pop('onu_id', None)
    wanted_serial = kwargs.pop('serial_number', None)
    if wanted_onu_id is None and wanted_serial is None:
        return None

    # Ids of all devices whose parent is the given device
    all_devices = self.root_proxy.get('/devices')
    child_ids = set(
        d.id for d in all_devices if d.parent_id == parent_device_id)

    for child_id in child_ids:
        child = self.get_device(child_id)

        # A criterion that was not supplied never counts as a match
        onu_matches = (wanted_onu_id is not None and
                       child.proxy_address.onu_id == wanted_onu_id)
        serial_matches = (wanted_serial is not None and
                          child.serial_number == wanted_serial)

        if wanted_onu_id is not None and wanted_serial is not None:
            matched = onu_matches and serial_matches
        else:
            matched = onu_matches or serial_matches

        if matched:
            return child

    return None
322
def add_device(self, device):
    """
    Store device under '/devices' and make sure the device group with
    id '1' exists under '/device_groups'.
    :param device: Device to add
    """
    assert isinstance(device, Device)
    self._make_up_to_date('/devices', device.id, device)

    # Ultimately, assign devices to device groups.
    # see https://jira.opencord.org/browse/CORD-838
    default_group = DeviceGroup(id='1')
    self._make_up_to_date(
        '/device_groups', default_group.id, default_group)

    # Adding the device to the device group is not done here yet,
    # see https://jira.opencord.org/browse/CORD-838
Zsolt Haraszti66862032016-11-28 14:28:39 -0800335
def update_device(self, device):
    """
    Apply an updated Device through its device agent.
    :param device: Device carrying the new state
    """
    assert isinstance(device, Device)

    # Going through the device_agent keeps the change from looping back
    # to the adapter unnecessarily
    agent_for_device = self.core.get_device_agent(device.id)
    agent_for_device.update_device(device)
343
def update_device_pm_config(self, device_pm_config, init=False):
    """
    Apply a PmConfigs object through the device agent whose id equals
    device_pm_config.id.
    :param device_pm_config: PmConfigs to apply
    :param init: passed through to the device agent unchanged
    """
    assert isinstance(device_pm_config, PmConfigs)

    # Going through the device_agent keeps the change from looping back
    # to the adapter unnecessarily
    agent_for_device = self.core.get_device_agent(device_pm_config.id)
    agent_for_device.update_device_pm_config(device_pm_config, init)
Sergio Slobodrian2db4c102017-03-09 22:29:23 -0500351
def update_adapter_pm_config(self, device_id, device_pm_config):
    """
    Look up the device by id and hand it, together with the pm config,
    to the adapter's update_pm_config.
    """
    target = self.get_device(device_id)
    self.adapter.update_pm_config(target, device_pm_config)
355
def update_image_download(self, img_dnld):
    """
    Pass an image download update to the device agent whose id equals
    img_dnld.id. Exceptions are logged and not propagated.
    """
    self.log.info('update-image-download', img_dnld=img_dnld)
    try:
        # Going through the device_agent keeps the change from looping
        # back to the adapter unnecessarily
        agent_for_device = self.core.get_device_agent(img_dnld.id)
        agent_for_device.update_device_image_download(img_dnld)
    except Exception as e:
        self.log.exception(e.message)
365
def delete_image_download(self, img_dnld):
    """
    Remove the image download node
    '/devices/<img_dnld.id>/image_downloads/<img_dnld.name>' and
    unregister the download with the device agent. Exceptions are logged
    and not propagated.
    """
    self.log.info('delete-image-download', img_dnld=img_dnld)
    try:
        proxy = self.core.get_proxy('/')
        node_path = '/devices/{}/image_downloads/{}'.format(
            img_dnld.id, img_dnld.name)
        # get() is only a probe; a missing node ends up in the handler below
        proxy.get(node_path)
        proxy.remove(node_path)
        agent_for_device = self.core.get_device_agent(img_dnld.id)
        agent_for_device.unregister_device_image_download(img_dnld.name)
    except Exception as e:
        self.log.exception(e.message)
378
def _add_peer_reference(self, device_id, port):
    """
    For referential integrity, make every peer port of port refer back
    to (device_id, port.port_no). Also sets port.device_id.
    :param device_id: id of the device owning port
    :param port: port whose peers are to be updated
    """
    port.device_id = device_id
    back_ref = Port.PeerPort(device_id=device_id, port_no=port.port_no)
    proxy = self.root_proxy
    for peer in port.peers:
        remote_path = '/devices/{}/ports/{}'.format(
            peer.device_id, peer.port_no)
        remote_port = proxy.get(remote_path)
        if back_ref not in remote_port.peers:
            entry = remote_port.peers.add()
            entry.CopyFrom(back_ref)
        proxy.update(remote_path, remote_port)
391
def _del_peer_reference(self, device_id, port):
    """
    Remove the reference to (device_id, port.port_no) from every peer
    port of port.
    :param device_id: id of the device owning port
    :param port: port whose peers are to be updated
    """
    back_ref = Port.PeerPort(device_id=device_id, port_no=port.port_no)
    proxy = self.root_proxy
    for peer in port.peers:
        remote_path = '/devices/{}/ports/{}'.format(
            peer.device_id, peer.port_no)
        remote_port = proxy.get(remote_path)
        if back_ref in remote_port.peers:
            remote_port.peers.remove(back_ref)
        proxy.update(remote_path, remote_port)
401
def add_port(self, device_id, port):
    """
    Add (or update) a port of the given device after adding the
    back-references on its peer ports.
    :param device_id: id of the device owning the port
    :param port: Port to store
    """
    assert isinstance(port, Port)

    # for referential integrity, add/augment references
    self._add_peer_reference(device_id, port)

    ports_path = '/devices/{}/ports'.format(device_id)
    self._make_up_to_date(ports_path, port.port_no, port)
411
def get_ports(self, device_id, port_type):
    """
    List the ports of a device that have the given type.
    :param device_id: id of the device
    :param port_type: value compared against each port's type
    :return: list of matching ports
    """
    matching = []
    for candidate in self.root_proxy.get(
            '/devices/{}/ports'.format(device_id)):
        if candidate.type == port_type:
            matching.append(candidate)
    return matching
416
def delete_port(self, device_id, port):
    """
    Delete a port of the given device after removing the back-references
    on its peer ports.
    :param device_id: id of the device owning the port
    :param port: Port to delete
    """
    assert isinstance(port, Port)
    # for referential integrity, drop the references held by the peers
    self._del_peer_reference(device_id, port)
    ports_path = '/devices/{}/ports'.format(device_id)
    self._remove_node(ports_path, port.port_no)
khenaidoo032d3302017-06-09 14:50:04 -0400423
def disable_all_ports(self, device_id):
    """
    Set every port of the device to admin state DISABLED and
    operational status UNKNOWN.
    :param device_id: device id
    :return: None
    """
    ports_path = '/devices/{}/ports'.format(device_id)
    for device_port in self.root_proxy.get(ports_path):
        device_port.admin_state = AdminState.DISABLED
        device_port.oper_status = OperStatus.UNKNOWN
        self._make_up_to_date(ports_path, device_port.port_no, device_port)
439
def enable_all_ports(self, device_id):
    """
    Set every port of the device to admin state ENABLED and operational
    status ACTIVE.
    :param device_id: device id
    :return: None
    """
    ports_path = '/devices/{}/ports'.format(device_id)
    for device_port in self.root_proxy.get(ports_path):
        device_port.admin_state = AdminState.ENABLED
        device_port.oper_status = OperStatus.ACTIVE
        self._make_up_to_date(ports_path, device_port.port_no, device_port)
455
def update_operstatus_all_ports(self, device_id, oper_status):
    """
    Set the operational status of every port of the device.
    :param device_id: device id
    :param oper_status: status assigned to each port
    :return: None
    """
    ports_path = '/devices/{}/ports'.format(device_id)
    for device_port in self.root_proxy.get(ports_path):
        device_port.oper_status = oper_status
        self._make_up_to_date(ports_path, device_port.port_no, device_port)
462
def delete_all_peer_references(self, device_id):
    """
    Remove all peer port references for that device
    :param device_id: device_id of device
    :return: None
    """
    ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
    for port in ports:
        port_path = '/devices/{}/ports/{}'.format(device_id, port.port_no)
        # Clear the container in one step; removing entries one by one
        # while iterating it skips every other entry
        del port.peers[:]
        self.root_proxy.update(port_path, port)
475
def delete_port_reference_from_parent(self, device_id, port):
    """
    Remove the references to this port held by its peer ports, then
    store the port itself again under the device.
    :param device_id: id of device containing the port
    :param port: port to remove
    :return: None
    """
    assert isinstance(port, Port)
    self.log.info('delete-port-reference', device_id=device_id, port=port)
    self._del_peer_reference(device_id, port)

    # update child port details
    ports_path = '/devices/{}/ports'.format(device_id)
    self._make_up_to_date(ports_path, port.port_no, port)
490
def add_port_reference_to_parent(self, device_id, port):
    """
    Add references to this port on its peer ports, then store the port
    itself again under the device.
    :param device_id: id of device containing the port
    :param port: port to add
    :return: None
    """
    assert isinstance(port, Port)
    self.log.info('add-port-reference', device_id=device_id, port=port)
    self._add_peer_reference(device_id, port)

    # update child port details
    ports_path = '/devices/{}/ports'.format(device_id)
    self._make_up_to_date(ports_path, port.port_no, port)
Khen Nursimulud068d812017-03-06 11:44:18 -0500504
def _find_first_available_id(self, dpid=None):
    """
    Pick a (logical device id, datapath id) pair that is not in use yet.

    Without a dpid, switch ids are tried from 1 upwards until the
    generated ids collide with no existing logical device. With a dpid,
    the MAC address is converted to the switch id and used as is.
    :param dpid: optional MAC address, six hex pairs separated by ':'
    or '-' (case insensitive)
    :return: tuple (logical device id, datapath id)
    :raises MacAddressError: when dpid is not a valid MAC address or the
    ids derived from it are already registered
    """

    def _is_valid_mac_address(dpid):
        return re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$",
                        dpid)

    # If a dpid is provided then validate whether it is a MAC address
    switch_id = 1
    if dpid:
        dpid = dpid.lower()
        if not _is_valid_mac_address(dpid):
            raise MacAddressError('Invalid-mac-address-format')
        # The pattern accepts ':' as well as '-' separators, so both
        # have to be stripped before the hex conversion
        switch_id = int(dpid.replace(':', '').replace('-', ''), 16)

    logical_devices = self.root_proxy.get('/logical_devices')
    existing_ids = set(ld.id for ld in logical_devices)
    existing_datapath_ids = set(ld.datapath_id for ld in logical_devices)
    core_id = registry('core').core_store_id

    while True:
        ld_id, dp_id = create_cluster_logical_device_ids(core_id, switch_id)
        if dp_id not in existing_datapath_ids and ld_id not in existing_ids:
            return ld_id, dp_id
        if dpid:
            # A caller-supplied MAC cannot be replaced by another id
            raise MacAddressError('Already-registered-mac-address')
        switch_id += 1
Zsolt Harasztid036b7e2016-12-23 15:36:01 -0800535
def get_logical_device(self, logical_device_id):
    """Fetch the node at '/logical_devices/<logical_device_id>'."""
    node_path = '/logical_devices/{}'.format(logical_device_id)
    return self.root_proxy.get(node_path)
539
def get_logical_port(self, logical_device_id, port_id):
    """Fetch the node at '/logical_devices/<logical_device_id>/ports/<port_id>'."""
    node_path = '/logical_devices/{}/ports/{}'.format(
        logical_device_id, port_id)
    return self.root_proxy.get(node_path)
543
def create_logical_device(self, logical_device, dpid=None):
    """
    Store a logical device and subscribe to its packet-out topic.

    When the logical device has no id yet, an id and a datapath id are
    assigned first. Adapters may provide their own datapath id through
    dpid; this must be the OLT MAC address.
    :param logical_device: LogicalDevice to create
    :param dpid: OLT MAC address
    :return: updated logical device
    """
    assert isinstance(logical_device, LogicalDevice)

    if not logical_device.id:
        new_id, new_datapath_id = self._find_first_available_id(dpid)
        logical_device.id = new_id
        logical_device.datapath_id = new_datapath_id

    self._make_up_to_date(
        '/logical_devices', logical_device.id, logical_device)

    # Keep a reference to the packet out subscription as it will be
    # referred during removal
    packet_out_topic = 'packet-out:{}'.format(logical_device.id)
    self.packet_out_subscription = self.event_bus.subscribe(
        topic=packet_out_topic,
        callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
    )

    return logical_device
570
def reconcile_logical_device(self, logical_device_id):
    """
    Called by the adapter to reconcile the physical device with the
    logical device. For now this only sets up the packet-out
    subscription for the logical device.
    :param logical_device_id: id of the logical device
    :return: None
    """
    def forward_packet_out(_, packet):
        return self.receive_packet_out(logical_device_id, packet)

    # Keep a reference to the packet out subscription as it will be
    # referred during removal
    packet_out_topic = 'packet-out:{}'.format(logical_device_id)
    self.packet_out_subscription = self.event_bus.subscribe(
        topic=packet_out_topic, callback=forward_packet_out)
584
585
def delete_logical_device(self, logical_device):
    """
    Drop the packet-out subscription and remove the logical device node
    from the data model.
    :param logical_device: The logical device to remove
    :return: None
    """
    assert isinstance(logical_device, LogicalDevice)

    # Remove packet out subscription
    subscription = self.packet_out_subscription
    self.event_bus.unsubscribe(subscription)

    # Removing the node from the data model will trigger the logical
    # device 'remove callbacks' as well as logical ports 'remove
    # callbacks' if present
    self._remove_node('/logical_devices', logical_device.id)
602
def receive_packet_out(self, logical_device_id, ofp_packet_out):
    """
    Hand a packet-out message to the adapter.

    The egress port is taken from the first action of type OUTPUT; it is
    None when the message carries no such action.
    :param logical_device_id: id of the logical device
    :param ofp_packet_out: message providing 'actions' and 'data'
    """
    egress_port = next(
        (action.output.port
         for action in ofp_packet_out.actions
         if action.type == OUTPUT),
        None)
    payload = ofp_packet_out.data
    self.adapter.receive_packet_out(logical_device_id, egress_port, payload)
613
def add_logical_port(self, logical_device_id, port):
    """
    Add (or update) a logical port of the given logical device.
    :param logical_device_id: id of the logical device
    :param port: LogicalPort to store
    """
    assert isinstance(port, LogicalPort)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._make_up_to_date(ports_path, port.id, port)
619
def delete_logical_port(self, logical_device_id, port):
    """
    Remove a logical port of the given logical device.
    :param logical_device_id: id of the logical device
    :param port: LogicalPort to remove
    """
    assert isinstance(port, LogicalPort)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._remove_node(ports_path, port.id)
624
def delete_logical_port_by_id(self, logical_device_id, port_id):
    """
    Remove the logical port with the given id from the logical device.
    :param logical_device_id: id of the logical device
    :param port_id: id of the logical port
    """
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._remove_node(ports_path, port_id)
628
def update_logical_port(self, logical_device_id, port):
    """
    Update (or add) a logical port of the given logical device.
    :param logical_device_id: id of the logical device
    :param port: LogicalPort carrying the new state
    """
    assert isinstance(port, LogicalPort)
    self.log.debug(
        'update-logical-port',
        logical_device_id=logical_device_id,
        port=port)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._make_up_to_date(ports_path, port.id, port)
637
def get_child_devices(self, parent_device_id):
    """
    List all devices whose parent_id equals parent_device_id.
    :param parent_device_id: parent's device id
    :return: list of child devices; None when an exception occurred
    (the exception is logged)
    """
    try:
        children = []
        for candidate in self.root_proxy.get('/devices'):
            if candidate.parent_id == parent_device_id:
                children.append(candidate)
        return children
    except Exception as e:
        self.log.exception('failure', e=e)
645
def subscribe_to_proxy_child_messages(self, proxy_address):
    """
    Subscribe to the tx topic generated for proxy_address; messages
    received on it are passed to _send_proxied_message. The subscription
    is stored in _tx_event_subscriptions under the topic.
    """
    def relay(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    self._tx_event_subscriptions[tx_topic] = self.event_bus.subscribe(
        tx_topic, relay)
650
def reconcile_child_devices(self, parent_device_id):
    """
    Re-subscribe to the proxy messages of every child device of the
    given parent and trigger the reconciliation of each child.
    :param parent_device_id: parent's device id
    :return: None
    """
    # get_child_devices() returns None after logging a lookup failure;
    # treat that as "no children" instead of failing on the iteration
    children = self.get_child_devices(parent_device_id) or []
    for child in children:
        # First subscribe to proxy messages from a child device
        self.subscribe_to_proxy_child_messages(child.proxy_address)

        # Then trigger the reconciliation of the existing child device
        device_agent = self.core.get_device_agent(child.id)
        device_agent.reconcile_existing_device(child)
660
661
# Obsolete API - discouraged; to be decommissioned once the
# adapters are aligned to the new APIs
def child_device_detected(self,
                          parent_device_id,
                          parent_port_no,
                          child_device_type,
                          proxy_address,
                          admin_state,
                          **kw):
    """
    Create a new child (ONU) device object, insert it into the config
    and subscribe to the tx topic generated for its proxy address.
    :param parent_device_id: id of the parent device
    :param parent_port_no: port number on the parent
    :param child_device_type: type assigned to the new device
    :param proxy_address: proxy address of the new device
    :param admin_state: admin state assigned to the new device
    :param **kw: further Device fields, passed through unchanged
    """
    child = Device(
        id=create_cluster_device_id(self.core.core_store_id),
        type=child_device_type,
        parent_id=parent_device_id,
        parent_port_no=parent_port_no,
        proxy_address=proxy_address,
        admin_state=admin_state,
        **kw
    )
    self._make_up_to_date('/devices', child.id, child)

    def relay(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    self._tx_event_subscriptions[tx_topic] = self.event_bus.subscribe(
        tx_topic, relay)
688
def add_onu_device(self,
                   parent_device_id,
                   parent_port_no,
                   vendor_id,
                   proxy_address,
                   admin_state,
                   **kw):
    """
    Create a new ONU device object for the given vendor, insert it into
    the config and subscribe to the tx topic generated for its proxy
    address.

    The device type is the first entry of '/device_types' whose
    vendor_id matches and whose id ends with "_onu".
    :param parent_device_id: id of the parent device
    :param parent_port_no: port number on the parent
    :param vendor_id: vendor id used to select the device type
    :param proxy_address: proxy address of the new device
    :param admin_state: admin state assigned to the new device
    :param **kw: further Device fields, passed through unchanged
    """
    onu_type = None
    for candidate in self.root_proxy.get('/device_types'):
        if candidate.vendor_id == vendor_id and \
                candidate.id.endswith("_onu"):
            onu_type = candidate
            break

    # NOTE(review): onu_type stays None when no device type matches,
    # in which case the attribute accesses below fail
    onu = Device(
        id=create_cluster_device_id(self.core.core_store_id),
        type=onu_type.id,
        vendor_id=vendor_id,
        parent_id=parent_device_id,
        parent_port_no=parent_port_no,
        proxy_address=proxy_address,
        admin_state=admin_state,
        adapter=onu_type.adapter,
        **kw
    )
    self._make_up_to_date('/devices', onu.id, onu)

    def relay(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    self._tx_event_subscriptions[tx_topic] = self.event_bus.subscribe(
        tx_topic, relay)
717
rshettyc26a3c32017-07-27 11:06:38 +0530718
def get_child_device_with_proxy_address(self, proxy_address):
    """
    Find the child device that has exactly the given proxy address.

    Only devices whose parent_id equals proxy_address.device_id are
    considered.
    :param proxy_address: proxy address to look for
    :return: the matching device, or None when there is none
    """
    parent_id = proxy_address.device_id
    candidate_ids = set(
        d.id for d in self.root_proxy.get('/devices')
        if d.parent_id == parent_id)
    for candidate_id in candidate_ids:
        candidate = self.get_device(candidate_id)
        if candidate.proxy_address == proxy_address:
            return candidate
728
def remove_all_logical_ports(self, logical_device_id):
    """
    Remove all logical ports from a given logical device.

    :param logical_device_id: id of the logical device whose ports are
        removed
    :return: None
    """
    # The path template must be filled in with the logical device id;
    # using it unformatted addresses a literal '{}' path.
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    for port in self.root_proxy.get(ports_path):
        self._remove_node(ports_path, port.id)
734
def delete_all_child_devices(self, parent_device_id):
    """
    Remove all ONUs from a given OLT.

    For every device under '/devices' whose parent_id equals
    `parent_device_id`, the tx proxy subscription (if one is recorded)
    is cancelled and the device node is removed.

    :param parent_device_id: id of the parent device
    :return: None
    """
    children_ids = set()
    for device in self.root_proxy.get('/devices'):
        if device.parent_id != parent_device_id:
            continue
        children_ids.add(device.id)
        topic = self._gen_tx_proxy_address_topic(device.proxy_address)
        # A child without a recorded subscription must not abort the
        # removal of the remaining children with a KeyError.
        subscription = self._tx_event_subscriptions.pop(topic, None)
        if subscription is not None:
            self.event_bus.unsubscribe(subscription)

    self.log.debug('devices-to-delete',
                   parent_id=parent_device_id,
                   children_ids=children_ids)
    for child_id in children_ids:
        self._remove_node('/devices', child_id)
751
def update_child_devices_state(self,
                               parent_device_id,
                               oper_status=None,
                               connect_status=None,
                               admin_state=None):
    """
    Update status of all child devices.

    Each of `oper_status`, `connect_status` and `admin_state` is applied
    to every child of `parent_device_id` unless it is None. The test is
    `is not None` for all three so that a value of 0 is applied rather
    than being skipped as falsy.

    :param parent_device_id: id of the parent device
    :param oper_status: new oper_status, or None to leave unchanged
    :param connect_status: new connect_status, or None to leave unchanged
    :param admin_state: new admin_state, or None to leave unchanged
    :return: None
    """
    devices = self.root_proxy.get('/devices')
    children_ids = set(
        d.id for d in devices if d.parent_id == parent_device_id)
    self.log.debug('update-devices',
                   parent_id=parent_device_id,
                   children_ids=children_ids,
                   oper_status=oper_status,
                   connect_status=connect_status,
                   admin_state=admin_state)

    for child_id in children_ids:
        device = self.get_device(child_id)
        if oper_status is not None:
            device.oper_status = oper_status
        if connect_status is not None:
            device.connect_status = connect_status
        if admin_state is not None:
            device.admin_state = admin_state
        self._make_up_to_date(
            '/devices', device.id, device)
777
def delete_child_device(self, parent_device_id, child_device_id):
    """
    Remove a single child device of the given parent.

    Nothing happens when the device lookup yields None or when the
    device's parent_id differs from `parent_device_id`. Otherwise the
    tx proxy subscription (if one is recorded) is cancelled and the
    device node is removed.

    :param parent_device_id: expected parent of the child device
    :param child_device_id: id of the device to remove
    :return: None
    """
    onu_device = self.root_proxy.get('/devices/{}'.format(child_device_id))
    if onu_device is None:
        return
    if onu_device.parent_id != parent_device_id:
        return

    self.log.debug('deleting-child-device',
                   parent_device_id=parent_device_id,
                   child_device_id=child_device_id)
    topic = self._gen_tx_proxy_address_topic(onu_device.proxy_address)
    # A missing subscription entry must not prevent the device from
    # being removed.
    subscription = self._tx_event_subscriptions.pop(topic, None)
    if subscription is not None:
        self.event_bus.unsubscribe(subscription)
    self._remove_node('/devices', child_device_id)
789
def _gen_rx_proxy_address_topic(self, proxy_address):
    """
    Build the rx topic name for a proxy address: the prefix 'rx:'
    followed by the MessageToJson rendering of the address.
    """
    return 'rx:' + MessageToJson(proxy_address)
794
def _gen_tx_proxy_address_topic(self, proxy_address):
    """
    Build the tx topic name for a proxy address: the prefix 'tx:'
    followed by the MessageToJson rendering of the address.
    """
    return 'tx:' + MessageToJson(proxy_address)
799
def register_for_proxied_messages(self, proxy_address):
    """
    Subscribe to the rx topic of `proxy_address` and remember the
    subscription under that topic in _rx_event_subscriptions.

    Messages arriving on the topic are passed, together with
    `proxy_address`, to _receive_proxied_message.
    """
    def on_rx(t, m):
        return self._receive_proxied_message(proxy_address, m)

    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    subscription = self.event_bus.subscribe(rx_topic, on_rx)
    self._rx_event_subscriptions[rx_topic] = subscription
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800805
def unregister_for_proxied_messages(self, proxy_address):
    """
    Cancel the rx subscription recorded for `proxy_address` and drop it
    from _rx_event_subscriptions.

    Raises KeyError when no subscription is recorded for the address.
    """
    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    subscription = self._rx_event_subscriptions[rx_topic]
    self.event_bus.unsubscribe(subscription)
    del self._rx_event_subscriptions[rx_topic]
810
def _receive_proxied_message(self, proxy_address, msg):
    """Hand `msg` for `proxy_address` to the adapter's
    receive_proxied_message."""
    adapter = self.adapter
    adapter.receive_proxied_message(proxy_address, msg)
813
def send_proxied_message(self, proxy_address, msg):
    """Publish `msg` on the tx topic derived from `proxy_address`."""
    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(tx_topic, msg)
817
def _send_proxied_message(self, proxy_address, msg):
    """Hand `msg` for `proxy_address` to the adapter's
    send_proxied_message."""
    adapter = self.adapter
    adapter.send_proxied_message(proxy_address, msg)
820
def receive_proxied_message(self, proxy_address, msg):
    """Publish `msg` on the rx topic derived from `proxy_address`."""
    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(rx_topic, msg)
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800824
def register_for_inter_adapter_messages(self):
    """
    Subscribe to the topic named after this adapter (self.adapter_name).

    Messages arriving on the topic are passed to the adapter's
    receive_inter_adapter_message. The value returned by subscribe() is
    kept so that unregister_for_inter_adapter_messages can cancel it.
    """
    self._inter_adapter_subscription = self.event_bus.subscribe(
        self.adapter_name,
        lambda t, m: self.adapter.receive_inter_adapter_message(m))

def unregister_for_inter_adapter_messages(self):
    """
    Cancel the subscription made by register_for_inter_adapter_messages.

    Does nothing when no subscription is currently recorded.
    """
    # NOTE(review): the other unsubscribe calls in this class pass the
    # object returned by subscribe(), not the topic name; this follows
    # the same convention - confirm against the event bus API.
    subscription = getattr(self, '_inter_adapter_subscription', None)
    if subscription is None:
        return
    self.event_bus.unsubscribe(subscription)
    self._inter_adapter_subscription = None
831
def publish_inter_adapter_message(self, device_id, msg):
    """
    Publish `msg` on the event bus topic named after the type of the
    device identified by `device_id`.

    The device is resolved with get_device() and asserted to be not
    None.
    """
    target = self.get_device(device_id)
    assert target is not None

    # The device's type is used as the topic, so the message reaches
    # whoever subscribed under that name.
    topic = target.type
    self.event_bus.publish(topic, msg)
840
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800841 # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
842
def send_packet_in(self, logical_device_id, logical_port_no, packet):
    """
    Publish a packet-in on the topic 'packet-in:<logical_device_id>'.

    The published message is the tuple (logical_port_no, packet); a
    scapy Packet instance is converted with str() first, any other
    value is published unchanged.
    """
    self.log.debug('send-packet-in',
                   logical_device_id=logical_device_id,
                   logical_port_no=logical_port_no,
                   packet=hexify(packet))

    payload = str(packet) if isinstance(packet, Packet) else packet
    self.event_bus.publish('packet-in:' + logical_device_id,
                           (logical_port_no, payload))
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800852
853 # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~
Zsolt Harasztic5f740b2017-01-18 09:53:17 -0800854
def submit_kpis(self, kpi_event_msg):
    """
    Publish a KpiEvent on the 'kpis' topic.

    Best effort: any exception, including the failed type assertion for
    a non-KpiEvent argument, is logged as 'failed-kpi-submission' and
    not propagated.
    """
    try:
        assert isinstance(kpi_event_msg, KpiEvent)
        self.event_bus.publish('kpis', kpi_event_msg)
    except Exception:
        self.log.exception('failed-kpi-submission',
                           type=type(kpi_event_msg))
Stephane Barbarie52198b92017-03-02 13:44:46 -0500862
863 # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
864
def create_alarm(self, id=None, resource_id=None, description=None,
                 raised_ts=0, changed_ts=0,
                 type=AlarmEventType.EQUIPMENT,
                 category=AlarmEventCategory.PON,
                 severity=AlarmEventSeverity.MINOR,
                 state=AlarmEventState.RAISED,
                 context=None):
    """
    Build an AlarmEvent from the given fields.

    :param id: alarm id; when None it is generated as
        'voltha.<adapter_name>.<resource_id>'
    :param resource_id: stored as resource_id
    :param description: stored as description
    :param raised_ts: stored as raised_ts
    :param changed_ts: stored as changed_ts
    :param type: stored as type
    :param category: stored as category
    :param severity: stored as severity
    :param state: stored as state
    :param context: stored as context
    :return: the new AlarmEvent; reported_ts is taken from
        arrow.utcnow().timestamp at call time
    """
    # Construct the ID if it is not provided (identity test, not ==)
    if id is None:
        id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)

    return AlarmEvent(
        id=id,
        resource_id=resource_id,
        type=type,
        category=category,
        severity=severity,
        state=state,
        description=description,
        reported_ts=arrow.utcnow().timestamp,
        raised_ts=raised_ts,
        changed_ts=changed_ts,
        context=context
    )
890
def filter_alarm(self, device_id, alarm_event):
    """
    Tell whether an alarm event is suppressed by a configured filter.

    Every entry of '/alarm_filters' that has rules is checked in turn.
    A rule matches when the alarm value selected by the rule's key
    equals the rule's value, both lower-cased. The alarm is filtered
    when all rules of one filter match.

    :param device_id: value compared against 'device_id' rules
    :param alarm_event: the alarm event to test
    :return: True if some filter matches on all of its rules, else False
    """
    alarm_filters = self.root_proxy.get('/alarm_filters')

    # Values a rule can be compared against, keyed by the enum name of
    # the rule key.
    candidates = {
        'id': alarm_event.id,
        'type': AlarmEventType.AlarmEventType.Name(alarm_event.type),
        'category': AlarmEventCategory.AlarmEventCategory.Name(
            alarm_event.category),
        'severity': AlarmEventSeverity.AlarmEventSeverity.Name(
            alarm_event.severity),
        'resource_id': alarm_event.resource_id,
        'device_id': device_id
    }

    for alarm_filter in alarm_filters:
        if not alarm_filter.rules:
            continue

        for rule in alarm_filter.rules:
            key_name = AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key)
            actual = candidates[key_name].lower()
            expected = rule.value.lower()
            self.log.debug("compare-alarm-event",
                           key=key_name,
                           actual=actual,
                           expected=expected)
            if actual != expected:
                # One mismatch rules this filter out.
                break
        else:
            # Loop finished without a mismatch: every rule matched.
            self.log.info("filtered-alarm-event", alarm=alarm_event)
            return True

    return False
922
def submit_alarm(self, device_id, alarm_event_msg):
    """
    Publish an AlarmEvent on the 'alarms' topic unless filter_alarm()
    reports it as filtered for `device_id`.

    Best effort: any exception, including the failed type assertion for
    a non-AlarmEvent argument, is logged as 'failed-alarm-submission'
    and not propagated.
    """
    try:
        assert isinstance(alarm_event_msg, AlarmEvent)
        filtered = self.filter_alarm(device_id, alarm_event_msg)
        if not filtered:
            self.event_bus.publish('alarms', alarm_event_msg)

    except Exception:
        self.log.exception('failed-alarm-submission',
                           type=type(alarm_event_msg))
Nikolay Titov89004ec2017-06-19 18:22:42 -0400932
933 # ~~~~~~~~~~~~~~~~~~~ Handle ONU detect ~~~~~~~~~~~~~~~~~~~~~
934
def _gen_onu_detect_proxy_address_topic(self, device_id):
    """Build the onu detect topic name for a device id:
    'onu_detect:<device_id>'."""
    return str('onu_detect:{}'.format(device_id))
939
def register_for_onu_detect_state(self, device_id):
    """
    Subscribe to the onu detect topic of `device_id` and remember the
    subscription under that topic in _onu_detect_event_subscriptions.

    States arriving on the topic are passed, together with `device_id`,
    to _forward_onu_detect_state.
    """
    def on_state(t, m):
        return self._forward_onu_detect_state(device_id, m)

    detect_topic = self._gen_onu_detect_proxy_address_topic(device_id)
    subscription = self.event_bus.subscribe(detect_topic, on_state)
    self._onu_detect_event_subscriptions[detect_topic] = subscription
945
def unregister_for_onu_detect_state(self, device_id):
    """
    Cancel the onu detect subscription recorded for `device_id` and
    drop it from _onu_detect_event_subscriptions.

    Raises KeyError when no subscription is recorded for the device.
    """
    detect_topic = self._gen_onu_detect_proxy_address_topic(device_id)
    subscription = self._onu_detect_event_subscriptions[detect_topic]
    self.event_bus.unsubscribe(subscription)
    del self._onu_detect_event_subscriptions[detect_topic]
950
def _forward_onu_detect_state(self, device_id, state):
    """Hand the onu detect `state` for `device_id` to the adapter's
    receive_onu_detect_state."""
    adapter = self.adapter
    adapter.receive_onu_detect_state(device_id, state)
953
def forward_onu_detect_state(self, device_id, state):
    """Publish `state` on the onu detect topic derived from
    `device_id`."""
    detect_topic = self._gen_onu_detect_proxy_address_topic(device_id)
    bus = self.event_bus
    bus.publish(detect_topic, state)