blob: 9fdc563ead116346c471d8a639c60743511f26e9 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18Agent to play gateway between CORE and an individual adapter.
19"""
20from uuid import uuid4
21
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040022import arrow
Zsolt Haraszti66862032016-11-28 14:28:39 -080023import structlog
Zsolt Harasztief05ad22017-01-07 22:08:06 -080024from google.protobuf.json_format import MessageToJson
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -080025from scapy.packet import Packet
Zsolt Haraszti66862032016-11-28 14:28:39 -080026from twisted.internet.defer import inlineCallbacks, returnValue
27from zope.interface import implementer
28
Zsolt Haraszti89a27302016-12-08 16:53:06 -080029from common.event_bus import EventBusClient
Zsolt Harasztief05ad22017-01-07 22:08:06 -080030from common.frameio.frameio import hexify
khenaidooa8588f22017-06-16 12:13:34 -040031from common.utils.id_generation import create_cluster_logical_device_ids
Zsolt Haraszti66862032016-11-28 14:28:39 -080032from voltha.adapters.interface import IAdapterAgent
33from voltha.protos import third_party
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040034from voltha.core.flow_decomposer import OUTPUT
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050035from voltha.protos.device_pb2 import Device, Port, PmConfigs
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040036from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, \
37 AlarmEventSeverity, AlarmEventState, AlarmEventCategory
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -050038from voltha.protos.events_pb2 import KpiEvent
Zsolt Haraszti66862032016-11-28 14:28:39 -080039from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
Stephane Barbarie4db8ca22017-04-24 10:30:20 -040040 LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
Zsolt Haraszti66862032016-11-28 14:28:39 -080041from voltha.registry import registry
khenaidoo08d48d22017-06-29 19:42:49 -040042from common.utils.id_generation import create_cluster_device_id
Zsolt Haraszti66862032016-11-28 14:28:39 -080043
Zsolt Haraszti66862032016-11-28 14:28:39 -080044@implementer(IAdapterAgent)
45class AdapterAgent(object):
46 """
47 Gate-keeper between CORE and device adapters.
48
49 On one side it interacts with Core's internal model and update/dispatch
50 mechanisms.
51
    On the other side, it interacts with the adapter through the adapters'
    standard interface (see voltha.adapters.interface, from which
    IAdapterAgent is imported).
54 """
55
def __init__(self, adapter_name, adapter_cls):
    """
    Prepare an agent for one adapter; the adapter itself is not created
    here (see start()).

    :param adapter_name: name of the adapter; also bound to the logger
    :param adapter_cls: class called as adapter_cls(agent, config) in
    start()
    """
    # Identity of the adapter this agent fronts
    self.adapter_name = adapter_name
    self.adapter_cls = adapter_cls

    # Handles into the core and its data model
    core = registry('core')
    self.core = core
    self.adapter = None  # set once start() succeeds
    self.adapter_node_proxy = None
    self.root_proxy = core.get_proxy('/')

    # Event bus client plus bookkeeping of the subscriptions we create
    self._rx_event_subscriptions = dict()
    self._tx_event_subscriptions = dict()
    self.event_bus = EventBusClient()
    self.packet_out_subscription = None

    self.log = structlog.get_logger(adapter_name=adapter_name)
    self._onu_detect_event_subscriptions = dict()
Zsolt Haraszti66862032016-11-28 14:28:39 -080069
@inlineCallbacks
def start(self):
    """
    Instantiate and start the adapter, then register it with the core.

    The adapter is built as adapter_cls(self, config), where config is
    the persisted adapter configuration (possibly None). After the
    adapter has started, its node and its device types are created or
    refreshed in the data model.

    Any exception raised along the way is logged and swallowed; in that
    case self.adapter stays unset but 'started' is still logged.

    :return: (via returnValue) this agent
    """
    self.log.debug('starting')
    config = self._get_adapter_config()  # this may be None
    try:
        adapter = self.adapter_cls(self, config)
        yield adapter.start()
        # Only publish the adapter once it has started successfully
        self.adapter = adapter
        self.adapter_node_proxy = self._update_adapter_node()
        self._update_device_types()
    except Exception as e:
        self.log.exception(e)
    self.log.info('started')
    returnValue(self)
84
@inlineCallbacks
def stop(self):
    """
    Stop the adapter, if one is running, and forget it.

    Does nothing besides logging when no adapter is set.
    """
    self.log.debug('stopping')
    running = self.adapter
    if running is not None:
        yield running.stop()
        self.adapter = None
    self.log.info('stopped')
Zsolt Haraszti66862032016-11-28 14:28:39 -080092
def _get_adapter_config(self):
    """
    Look up the persisted configuration stored under
    '/adapters/<adapter_name>'.

    :return: the stored configuration, or None when the lookup raises
    KeyError (nothing has been stored yet)
    """
    root = self.core.get_proxy('/')
    try:
        return root.get('/adapters/' + self.adapter_name)
    except KeyError:
        return None
104
def _update_adapter_node(self):
    """
    Create or refresh this adapter's node under '/adapters' from the
    descriptor the adapter reports about itself.

    :return: a proxy (from core.get_proxy) for the adapter node's path
    """
    descriptor = self.adapter.adapter_descriptor()
    # The adapter must describe itself under the name we registered it by
    assert descriptor.id == self.adapter_name
    node_path = self._make_up_to_date(
        '/adapters', self.adapter_name, descriptor)
    return self.core.get_proxy(node_path)
116
def _update_device_types(self):
    """
    Create or refresh a '/device_types/<id>' node for every device type
    the adapter reports.
    """
    for entry in self.adapter.device_types().items:
        self._make_up_to_date('/device_types', entry.id, entry)
125
def _make_up_to_date(self, container_path, key, data):
    """
    Store `data` at '<container_path>/<key>', updating the node when it
    exists and adding it to the container otherwise.

    :param container_path: path of the container node
    :param key: key of the entry inside the container (stringified)
    :param data: object to store
    :return: the full path of the entry
    """
    node_path = '/'.join((container_path, str(key)))
    proxy = self.core.get_proxy('/')
    try:
        # The get() is only a probe: KeyError means the node is missing
        proxy.get(node_path)
        proxy.update(node_path, data)
    except KeyError:
        proxy.add(container_path, data)
    return node_path
135
def _remove_node(self, container_path, key):
    """
    Delete the node '<container_path>/<key>' from the data model; a
    missing node (KeyError) is silently ignored.

    :param container_path: path of the container node
    :param key: key of the entry inside the container (stringified)
    :return: None
    """
    node_path = '/'.join((container_path, str(key)))
    proxy = self.core.get_proxy('/')
    try:
        proxy.get(node_path)
        proxy.remove(node_path)
    except KeyError:
        # Nothing to remove
        return
151
Zsolt Haraszti66862032016-11-28 14:28:39 -0800152 # ~~~~~~~~~~~~~~~~~~~~~ Core-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
153
def adopt_device(self, device):
    """
    Relay the adopt request for `device` to the adapter.

    :return: whatever the adapter's adopt_device() returns
    """
    adapter = self.adapter
    return adapter.adopt_device(device)
156
def reconcile_device(self, device):
    """
    Relay the reconcile request for `device` to the adapter.

    :return: whatever the adapter's reconcile_device() returns
    """
    adapter = self.adapter
    return adapter.reconcile_device(device)
159
def abandon_device(self, device):
    """
    Relay the abandon request for `device` to the adapter.

    :return: whatever the adapter's abandon_device() returns
    """
    adapter = self.adapter
    return adapter.abandon_device(device)
162
def disable_device(self, device):
    """
    Relay the disable request for `device` to the adapter.

    :return: whatever the adapter's disable_device() returns
    """
    adapter = self.adapter
    return adapter.disable_device(device)
165
def reenable_device(self, device):
    """
    Relay the re-enable request for `device` to the adapter.

    :return: whatever the adapter's reenable_device() returns
    """
    adapter = self.adapter
    return adapter.reenable_device(device)
168
def reboot_device(self, device):
    """
    Relay the reboot request for `device` to the adapter.

    :return: whatever the adapter's reboot_device() returns
    """
    adapter = self.adapter
    return adapter.reboot_device(device)
171
def download_image(self, device, request):
    """
    Relay an image download `request` for `device` to the adapter.

    :return: whatever the adapter's download_image() returns
    """
    adapter = self.adapter
    return adapter.download_image(device, request)
174
def get_image_download_status(self, device, request):
    """
    Relay an image download status `request` for `device` to the adapter.

    :return: whatever the adapter's get_image_download_status() returns
    """
    adapter = self.adapter
    return adapter.get_image_download_status(device, request)
177
def cancel_image_download(self, device, request):
    """
    Relay an image download cancel `request` for `device` to the adapter.

    :return: whatever the adapter's cancel_image_download() returns
    """
    adapter = self.adapter
    return adapter.cancel_image_download(device, request)
180
def activate_image_update(self, device, request):
    """
    Relay an image activation `request` for `device` to the adapter.

    :return: whatever the adapter's activate_image_update() returns
    """
    adapter = self.adapter
    return adapter.activate_image_update(device, request)
183
def revert_image_update(self, device, request):
    """
    Relay an image revert `request` for `device` to the adapter.

    :return: whatever the adapter's revert_image_update() returns
    """
    adapter = self.adapter
    return adapter.revert_image_update(device, request)
186
def self_test(self, device):
    """
    Relay a self-test request for `device` to the adapter; note the
    adapter-side method is named self_test_device().

    :return: whatever the adapter's self_test_device() returns
    """
    adapter = self.adapter
    return adapter.self_test_device(device)
189
def delete_device(self, device):
    """
    Relay the delete request for `device` to the adapter.

    :return: whatever the adapter's delete_device() returns
    """
    adapter = self.adapter
    return adapter.delete_device(device)
192
def get_device_details(self, device):
    """
    Relay a device-details request for `device` to the adapter.

    :return: whatever the adapter's get_device_details() returns
    """
    adapter = self.adapter
    return adapter.get_device_details(device)
Zsolt Haraszti66862032016-11-28 14:28:39 -0800195
def update_flows_bulk(self, device, flows, groups):
    """
    Relay a bulk flow update (`flows` and `groups`) for `device` to the
    adapter.

    :return: whatever the adapter's update_flows_bulk() returns
    """
    adapter = self.adapter
    return adapter.update_flows_bulk(device, flows, groups)
198
def update_flows_incrementally(self, device, flow_changes, group_changes):
    """
    Relay an incremental flow update for `device` to the adapter.

    :param device: device the changes apply to
    :param flow_changes: flow changes, passed through unchanged
    :param group_changes: group changes, passed through unchanged
    :return: whatever the adapter's update_flows_incrementally() returns
    """
    # Delegate to the adapter; calling self.update_flows_incrementally
    # here would recurse until the stack overflows.
    return self.adapter.update_flows_incrementally(
        device, flow_changes, group_changes)
202
def suppress_alarm(self, filter):
    """
    Relay an alarm suppression request for `filter` to the adapter.

    :return: whatever the adapter's suppress_alarm() returns
    """
    adapter = self.adapter
    return adapter.suppress_alarm(filter)
205
def unsuppress_alarm(self, filter):
    """
    Relay an alarm un-suppression request for `filter` to the adapter.

    :return: whatever the adapter's unsuppress_alarm() returns
    """
    adapter = self.adapter
    return adapter.unsuppress_alarm(filter)
208
Stephane Barbarie4db8ca22017-04-24 10:30:20 -0400209 # def update_pm_collection(self, device, pm_collection_config):
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -0500210 # return self.adapter.update_pm_collection(device, pm_collection_config)
211
def create_interface(self, device, data):
    """
    Relay an interface creation request (`data`) for `device` to the
    adapter.

    :return: whatever the adapter's create_interface() returns
    """
    adapter = self.adapter
    return adapter.create_interface(device, data)
214
def update_interface(self, device, data):
    """
    Relay an interface update request (`data`) for `device` to the
    adapter.

    :return: whatever the adapter's update_interface() returns
    """
    adapter = self.adapter
    return adapter.update_interface(device, data)
217
def remove_interface(self, device, data):
    """
    Relay an interface removal request (`data`) for `device` to the
    adapter.

    :return: whatever the adapter's remove_interface() returns
    """
    adapter = self.adapter
    return adapter.remove_interface(device, data)
220
Sergio Slobodriana2eb52b2017-03-07 12:24:46 -0500221
Zsolt Haraszti66862032016-11-28 14:28:39 -0800222 # ~~~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
223
def get_device(self, device_id):
    """
    Fetch the node stored at '/devices/<device_id>'.

    :param device_id: id of the device
    :return: whatever the root proxy returns for that path
    """
    device_path = '/devices/{}'.format(device_id)
    return self.root_proxy.get(device_path)
226
def get_child_device(self, parent_device_id, **kwargs):
    """
    Find a child device of the given parent that satisfies the supplied
    match criteria; the first matching child encountered is returned.

    Only 'onu_id' (compared with device.proxy_address.onu_id) and
    'serial_number' (compared with device.serial_number) are honoured.
    When both are supplied both must match; when neither is supplied
    nothing is looked up and None is returned.

    :param parent_device_id: parent's device id
    :param **kwargs: match criteria ('onu_id' and/or 'serial_number')
    :return: Child Device Object or None
    """
    wanted_onu_id = kwargs.get('onu_id')
    wanted_serial = kwargs.get('serial_number')
    if wanted_onu_id is None and wanted_serial is None:
        return None

    # Narrow all known devices down to the ids of this parent's children
    all_devices = self.root_proxy.get('/devices')
    child_ids = set(
        dev.id for dev in all_devices if dev.parent_id == parent_device_id)

    for child_id in child_ids:
        candidate = self.get_device(child_id)

        # A criterion that was not supplied never disqualifies a child
        onu_ok = wanted_onu_id is None \
            or candidate.proxy_address.onu_id == wanted_onu_id
        serial_ok = wanted_serial is None \
            or candidate.serial_number == wanted_serial

        if onu_ok and serial_ok:
            return candidate

    return None
278
def add_device(self, device):
    """
    Create or refresh the '/devices/<id>' node for `device`, and make
    sure the device group with id '1' exists.

    :param device: Device object to store
    """
    assert isinstance(device, Device)
    self._make_up_to_date('/devices', device.id, device)

    # Ultimately, devices are to be assigned to device groups; for now
    # only a single fixed group is kept up to date.
    # see https://jira.opencord.org/browse/CORD-838
    default_group = DeviceGroup(id='1')
    self._make_up_to_date(
        '/device_groups', default_group.id, default_group)

    # Adding the device to the device group is still to be done,
    # see https://jira.opencord.org/browse/CORD-838
Zsolt Haraszti66862032016-11-28 14:28:39 -0800291
def update_device(self, device):
    """
    Apply an updated Device object through its device agent.

    :param device: Device object carrying the new state
    """
    assert isinstance(device, Device)

    # Going through the device agent keeps the change from looping back
    # to the adapter unnecessarily
    agent = self.core.get_device_agent(device.id)
    agent.update_device(device)
299
def update_device_pm_config(self, device_pm_config, init=False):
    """
    Apply a PmConfigs object through the device agent of the device
    whose id it carries.

    :param device_pm_config: PmConfigs object; its id selects the agent
    :param init: passed through unchanged to the device agent
    """
    assert isinstance(device_pm_config, PmConfigs)

    # Going through the device agent keeps the change from looping back
    # to the adapter unnecessarily
    agent = self.core.get_device_agent(device_pm_config.id)
    agent.update_device_pm_config(device_pm_config, init)
Sergio Slobodrian2db4c102017-03-09 22:29:23 -0500307
def update_adapter_pm_config(self, device_id, device_pm_config):
    """
    Hand a PM configuration to the adapter, together with the device
    object looked up from `device_id`.

    :param device_id: id of the device the configuration belongs to
    :param device_pm_config: configuration passed through to the adapter
    """
    target = self.get_device(device_id)
    self.adapter.update_pm_config(target, device_pm_config)
311
def update_image_download(self, img_dnld):
    """
    Push an image download update through the device agent of the device
    identified by img_dnld.id. Any exception is logged and swallowed.

    :param img_dnld: image download object
    """
    self.log.info('update-image-download', img_dnld=img_dnld)
    try:
        # Going through the device agent keeps the change from looping
        # back to the adapter unnecessarily
        agent = self.core.get_device_agent(img_dnld.id)
        agent.update_device_image_download(img_dnld)
    except Exception as err:
        self.log.exception(err.message)
321
def delete_image_download(self, img_dnld):
    """
    Remove the node '/devices/<id>/image_downloads/<name>' and
    unregister the download with the device agent. Any exception is
    logged and swallowed.

    :param img_dnld: image download object (its id and name are used)
    """
    self.log.info('delete-image-download', img_dnld=img_dnld)
    try:
        proxy = self.core.get_proxy('/')
        node_path = '/devices/{}/image_downloads/{}'.format(
            img_dnld.id, img_dnld.name)
        # The get() raises when the node is absent, which skips the rest
        proxy.get(node_path)
        proxy.remove(node_path)
        agent = self.core.get_device_agent(img_dnld.id)
        agent.unregister_device_image_download(img_dnld.name)
    except Exception as err:
        self.log.exception(err.message)
334
def _add_peer_reference(self, device_id, port):
    """
    Stamp `port` with `device_id` and make sure every port listed in
    port.peers carries a back-reference (device_id, port.port_no).

    :param device_id: id of the device that owns `port`
    :param port: Port whose peers are to be augmented
    :return: None
    """
    # for referential integrity, add/augment references
    port.device_id = device_id
    me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
    for peer in port.peers:
        peer_port_path = '/devices/{}/ports/{}'.format(
            peer.device_id, peer.port_no)
        peer_port = self.root_proxy.get(peer_port_path)
        # only append the back-reference when it is not already present
        if me_as_peer not in peer_port.peers:
            new = peer_port.peers.add()
            new.CopyFrom(me_as_peer)
        # NOTE(review): the peer port is written back on every iteration,
        # whether or not it changed - confirm this nesting is intended
        self.root_proxy.update(peer_port_path, peer_port)
347
def _del_peer_reference(self, device_id, port):
    """
    Remove the back-reference (device_id, port.port_no) from every port
    listed in port.peers.

    :param device_id: id of the device that owns `port`
    :param port: Port whose peers are to be cleaned up
    :return: None
    """
    me_as_peer = Port.PeerPort(device_id=device_id, port_no=port.port_no)
    for peer in port.peers:
        peer_port_path = '/devices/{}/ports/{}'.format(
            peer.device_id, peer.port_no)
        peer_port = self.root_proxy.get(peer_port_path)
        # only remove the back-reference when it is actually present
        if me_as_peer in peer_port.peers:
            peer_port.peers.remove(me_as_peer)
        # NOTE(review): the peer port is written back on every iteration,
        # whether or not it changed - confirm this nesting is intended
        self.root_proxy.update(peer_port_path, peer_port)
357
def add_port(self, device_id, port):
    """
    Add (or refresh) `port` under '/devices/<device_id>/ports', after
    first recording it as a peer on the ports it references.

    :param device_id: id of the device that owns the port
    :param port: Port object to store
    """
    assert isinstance(port, Port)

    # Referential integrity first: add/augment the peer references
    self._add_peer_reference(device_id, port)

    # Then store the port itself
    ports_path = '/devices/{}/ports'.format(device_id)
    self._make_up_to_date(ports_path, port.port_no, port)
367
def get_ports(self, device_id, port_type):
    """
    List the ports of a device whose `type` equals `port_type`.

    :param device_id: id of the device
    :param port_type: value compared against each port's type
    :return: list of matching ports (possibly empty)
    """
    # assert Port.PortType.DESCRIPTOR.values_by_name[port_type]
    ports_path = '/devices/{}/ports'.format(device_id)
    matching = []
    for candidate in self.root_proxy.get(ports_path):
        if candidate.type == port_type:
            matching.append(candidate)
    return matching
372
373
def disable_all_ports(self, device_id):
    """
    Mark every port of the device as administratively DISABLED with an
    operational status of UNKNOWN, and store each port back.

    :param device_id: device id
    :return: None
    """
    ports_path = '/devices/{}/ports'.format(device_id)
    for entry in self.root_proxy.get(ports_path):
        entry.admin_state = AdminState.DISABLED
        entry.oper_status = OperStatus.UNKNOWN
        self._make_up_to_date(ports_path, entry.port_no, entry)
389
def enable_all_ports(self, device_id):
    """
    Mark every port of the device as administratively ENABLED with an
    operational status of ACTIVE, and store each port back.

    :param device_id: device id
    :return: None
    """
    ports_path = '/devices/{}/ports'.format(device_id)
    for entry in self.root_proxy.get(ports_path):
        entry.admin_state = AdminState.ENABLED
        entry.oper_status = OperStatus.ACTIVE
        self._make_up_to_date(ports_path, entry.port_no, entry)
405
def delete_all_peer_references(self, device_id):
    """
    Remove all peer port references from every port of that device and
    write each port back to the data model.

    :param device_id: device_id of device
    :return: None
    """
    ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
    for port in ports:
        port_path = '/devices/{}/ports/{}'.format(device_id, port.port_no)
        # Clear in one step: calling remove() while iterating over the
        # same container skips every other element and leaves peers
        # behind.
        del port.peers[:]
        self.root_proxy.update(port_path, port)
418
def delete_port_reference_from_parent(self, device_id, port):
    """
    Drop the references to `port` held by the ports it lists as peers.

    :param device_id: id of device containing the port
    :param port: port whose references are removed
    :return: None
    """
    assert isinstance(port, Port)
    self.log.info('delete-port-reference', port=port, device_id=device_id)
    self._del_peer_reference(device_id, port)
Khen Nursimulud068d812017-03-06 11:44:18 -0500429
def add_port_reference_to_parent(self, device_id, port):
    """
    Record `port` as a peer on the ports it lists as peers.

    :param device_id: id of device containing the port
    :param port: port whose references are added
    :return: None
    """
    assert isinstance(port, Port)
    self.log.info('add-port-reference', port=port, device_id=device_id)
    self._add_peer_reference(device_id, port)
Khen Nursimulud068d812017-03-06 11:44:18 -0500440
def _find_first_available_id(self):
    """
    Pick the first (logical device id, datapath id) pair that is not
    used by any existing logical device.

    Candidate pairs come from create_cluster_logical_device_ids(core_id,
    i) for i = 1, 2, 3, ...

    :return: tuple (logical_device_id, datapath_id)
    """
    logical_devices = self.root_proxy.get('/logical_devices')
    taken_ids = {ld.id for ld in logical_devices}
    taken_datapath_ids = {ld.datapath_id for ld in logical_devices}
    core_id = registry('core').core_store_id

    candidate = 1
    while True:
        ld_id, dp_id = create_cluster_logical_device_ids(core_id, candidate)
        # Both halves of the pair must be free
        if not (dp_id in taken_datapath_ids or ld_id in taken_ids):
            return ld_id, dp_id
        candidate += 1
452
def get_logical_device(self, logical_device_id):
    """
    Fetch the node stored at '/logical_devices/<logical_device_id>'.

    :param logical_device_id: id of the logical device
    :return: whatever the root proxy returns for that path
    """
    node_path = '/logical_devices/{}'.format(logical_device_id)
    return self.root_proxy.get(node_path)
456
def get_logical_port(self, logical_device_id, port_id):
    """
    Fetch the node stored at
    '/logical_devices/<logical_device_id>/ports/<port_id>'.

    :param logical_device_id: id of the logical device
    :param port_id: id of the logical port
    :return: whatever the root proxy returns for that path
    """
    node_path = '/logical_devices/{}/ports/{}'.format(
        logical_device_id, port_id)
    return self.root_proxy.get(node_path)
460
def create_logical_device(self, logical_device):
    """
    Store a logical device and subscribe to its packet-out topic.

    When the logical device has no id yet, an unused id and datapath id
    are assigned to it first.

    :param logical_device: LogicalDevice to create (modified in place
    when ids are assigned)
    :return: the same logical_device object
    """
    assert isinstance(logical_device, LogicalDevice)

    if not logical_device.id:
        logical_device.id, logical_device.datapath_id = \
            self._find_first_available_id()

    self._make_up_to_date(
        '/logical_devices', logical_device.id, logical_device)

    def on_packet_out(_, packet_out):
        # The id is read when the callback fires, not when it is created
        return self.receive_packet_out(logical_device.id, packet_out)

    # The subscription is kept because it is needed again on removal
    self.packet_out_subscription = self.event_bus.subscribe(
        topic='packet-out:{}'.format(logical_device.id),
        callback=on_packet_out
    )

    return logical_device
480
def reconcile_logical_device(self, logical_device_id):
    """
    Called by the adapter to reconcile the physical device with the
    logical device. For now this only (re)creates the packet-out
    subscription for the logical device.

    :param logical_device_id: id of the logical device
    :return: None
    """
    def on_packet_out(_, packet_out):
        return self.receive_packet_out(logical_device_id, packet_out)

    # The subscription is kept because it is needed again on removal
    self.packet_out_subscription = self.event_bus.subscribe(
        topic='packet-out:{}'.format(logical_device_id),
        callback=on_packet_out
    )
494
495
def delete_logical_device(self, logical_device):
    """
    Drop the stored packet-out subscription and remove the logical
    device node from the data model.

    :param logical_device: The logical device to remove
    :return: None
    """
    assert isinstance(logical_device, LogicalDevice)

    # Stop listening for packet-out events first
    subscription = self.packet_out_subscription
    self.event_bus.unsubscribe(subscription)

    # Removing the node from the data model triggers the logical device
    # 'remove callbacks' as well as the logical ports 'remove callbacks'
    # if present
    self._remove_node('/logical_devices', logical_device.id)
512
def receive_packet_out(self, logical_device_id, ofp_packet_out):
    """
    Hand a packet-out message to the adapter.

    The egress port is the output port of the first action whose type
    is OUTPUT (None when there is no such action); the frame is the
    message's data field.

    :param logical_device_id: id of the logical device
    :param ofp_packet_out: packet-out message with `actions` and `data`
    """
    egress_port = next(
        (act.output.port
         for act in ofp_packet_out.actions if act.type == OUTPUT),
        None)
    payload = ofp_packet_out.data
    self.adapter.receive_packet_out(logical_device_id, egress_port, payload)
523
def add_logical_port(self, logical_device_id, port):
    """
    Create or refresh `port` under
    '/logical_devices/<logical_device_id>/ports'.

    :param logical_device_id: id of the owning logical device
    :param port: LogicalPort to store
    """
    assert isinstance(port, LogicalPort)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._make_up_to_date(ports_path, port.id, port)
529
def delete_logical_port(self, logical_device_id, port):
    """
    Remove `port` from '/logical_devices/<logical_device_id>/ports'.

    :param logical_device_id: id of the owning logical device
    :param port: LogicalPort to remove (matched by its id)
    """
    assert isinstance(port, LogicalPort)
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._remove_node(ports_path, port.id)
534
def update_logical_port(self, logical_device_id, port):
    """
    Log and then create or refresh `port` under
    '/logical_devices/<logical_device_id>/ports'.

    :param logical_device_id: id of the owning logical device
    :param port: LogicalPort carrying the new state
    """
    assert isinstance(port, LogicalPort)
    self.log.debug('update-logical-port',
                   port=port,
                   logical_device_id=logical_device_id)

    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    self._make_up_to_date(ports_path, port.id, port)
544
def get_child_devices(self, parent_device_id):
    """
    List all devices whose parent_id equals `parent_device_id`.

    :param parent_device_id: id of the parent device
    :return: list of child devices (possibly empty), or None when the
    lookup raised; the exception is logged, not propagated
    """
    try:
        devices = self.root_proxy.get('/devices')
        return [d for d in devices if d.parent_id == parent_device_id]
    except Exception as e:
        self.log.exception('failure', e=e)
        # Best effort: callers get None rather than the exception
        return None
552
def subscribe_to_proxy_child_messages(self, proxy_address):
    """
    Subscribe to the tx topic of `proxy_address` so that messages
    published there are passed to _send_proxied_message(); the
    subscription is remembered under the topic name.

    :param proxy_address: proxy address of the child device
    """
    def forward(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    subscription = self.event_bus.subscribe(tx_topic, forward)
    self._tx_event_subscriptions[tx_topic] = subscription
557
def reconcile_child_devices(self, parent_device_id):
    """
    Re-subscribe to proxied messages for every child of the parent
    device and ask each child's device agent to reconcile it.

    :param parent_device_id: id of the parent device
    :return: None
    """
    # get_child_devices() returns None when its lookup fails (it logs
    # the error itself); treat that as "no children" instead of failing
    # on iteration.
    children = self.get_child_devices(parent_device_id) or []
    for child in children:
        # First subscribe to proxy messages from a child device
        self.subscribe_to_proxy_child_messages(child.proxy_address)

        # Then trigger the reconciliation of the existing child device
        device_agent = self.core.get_device_agent(child.id)
        device_agent.reconcile_existing_device(child)
567
568
# Obsolete API - discouraged; to be decommissioned once the
# adapters are aligned with the new APIs
def child_device_detected(self,
                          parent_device_id,
                          parent_port_no,
                          child_device_type,
                          proxy_address,
                          admin_state,
                          **kw):
    """
    Create a new child (ONU) device object, store it under '/devices'
    and subscribe to the tx topic of its proxy address.

    :param parent_device_id: id of the parent device
    :param parent_port_no: port number on the parent
    :param child_device_type: stored as the new device's type
    :param proxy_address: proxy address of the child
    :param admin_state: initial admin state of the child
    :param kw: extra keyword arguments passed straight to Device()
    """
    # id=uuid4().hex[:12],
    new_id = create_cluster_device_id(self.core.core_store_id)
    child = Device(
        id=new_id,
        type=child_device_type,
        parent_id=parent_device_id,
        parent_port_no=parent_port_no,
        proxy_address=proxy_address,
        admin_state=admin_state,
        **kw
    )
    self._make_up_to_date('/devices', child.id, child)

    def forward(_topic, msg):
        return self._send_proxied_message(proxy_address, msg)

    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    self._tx_event_subscriptions[tx_topic] = self.event_bus.subscribe(
        tx_topic, forward)
595
def add_onu_device(self,
                   parent_device_id,
                   parent_port_no,
                   vendor_id,
                   proxy_address,
                   admin_state,
                   **kw):
    """
    Create a new ONU device object for the given vendor, store it under
    '/devices' and subscribe to the tx topic of its proxy address.

    The device type (and through it the adapter) is the first entry of
    '/device_types' whose vendor_id equals `vendor_id`.

    :param parent_device_id: id of the parent device
    :param parent_port_no: port number on the parent
    :param vendor_id: vendor id used to select the device type
    :param proxy_address: proxy address of the ONU
    :param admin_state: initial admin state of the ONU
    :param kw: extra keyword arguments passed straight to Device()
    :raises ValueError: when no registered device type has this vendor_id
    """
    device_type = next((dt for dt in self.root_proxy.get('/device_types')
                        if dt.vendor_id == vendor_id), None)
    if device_type is None:
        # Fail with a clear message instead of an AttributeError on None
        self.log.error('unknown-onu-vendor-id',
                       vendor_id=vendor_id,
                       parent_device_id=parent_device_id)
        raise ValueError(
            'no device type registered for vendor_id {!r}'.format(
                vendor_id))

    # we create new ONU device objects and insert them into the config
    device = Device(
        id=create_cluster_device_id(self.core.core_store_id),
        # id=uuid4().hex[:12],
        type=device_type.id,
        vendor_id=vendor_id,
        parent_id=parent_device_id,
        parent_port_no=parent_port_no,
        proxy_address=proxy_address,
        admin_state=admin_state,
        adapter=device_type.adapter,
        **kw
    )
    self._make_up_to_date('/devices', device.id, device)

    topic = self._gen_tx_proxy_address_topic(proxy_address)
    self._tx_event_subscriptions[topic] = self.event_bus.subscribe(
        topic, lambda t, m: self._send_proxied_message(proxy_address, m))
623
def get_child_device_with_proxy_address(self, proxy_address):
    """
    Find the child device whose proxy_address equals `proxy_address`.

    Only devices whose parent_id equals proxy_address.device_id are
    considered.

    :param proxy_address: proxy address to look for
    :return: the matching device, or None when there is none
    """
    # Proxy address is defined as {parent id, channel_id}
    parent_id = proxy_address.device_id
    all_devices = self.root_proxy.get('/devices')
    child_ids = set(
        dev.id for dev in all_devices if dev.parent_id == parent_id)
    for child_id in child_ids:
        candidate = self.get_device(child_id)
        if candidate.proxy_address == proxy_address:
            return candidate
    return None
633
def remove_all_logical_ports(self, logical_device_id):
    """
    Remove all logical ports from a given logical device.

    :param logical_device_id: id of the logical device
    :return: None
    """
    # The '{}' placeholder must be filled in with the logical device id;
    # used verbatim it never addresses a real ports container.
    ports_path = '/logical_devices/{}/ports'.format(logical_device_id)
    ports = self.root_proxy.get(ports_path)
    for port in ports:
        self._remove_node(ports_path, port.id)
639
def delete_all_child_devices(self, parent_device_id):
    """
    Remove every device whose parent_id equals `parent_device_id`
    (all ONUs of a given OLT) from '/devices'.

    :param parent_device_id: id of the parent device
    """
    all_devices = self.root_proxy.get('/devices')
    children_ids = set(
        dev.id for dev in all_devices if dev.parent_id == parent_device_id)
    self.log.debug('devices-to-delete',
                   children_ids=children_ids,
                   parent_id=parent_device_id)
    for doomed_id in children_ids:
        self._remove_node('/devices', doomed_id)
649
def update_child_devices_state(self,
                               parent_device_id,
                               oper_status=None,
                               connect_status=None,
                               admin_state=None):
    """
    Update status fields of all child devices of a parent device.

    A field is only written when the corresponding argument is not
    None, so a falsy value such as 0 is applied rather than ignored.

    :param parent_device_id: id of the parent device
    :param oper_status: new operational status, or None to leave as is
    :param connect_status: new connect status, or None to leave as is
    :param admin_state: new admin state, or None to leave as is
    :return: None
    """
    devices = self.root_proxy.get('/devices')
    children_ids = set(
        d.id for d in devices if d.parent_id == parent_device_id)
    self.log.debug('update-devices',
                   parent_id=parent_device_id,
                   children_ids=children_ids,
                   oper_status=oper_status,
                   connect_status=connect_status,
                   admin_state=admin_state)

    for child_id in children_ids:
        device = self.get_device(child_id)
        # Compare with None explicitly: a truthiness test would drop
        # falsy values such as 0
        if oper_status is not None:
            device.oper_status = oper_status
        if connect_status is not None:
            device.connect_status = connect_status
        if admin_state is not None:
            device.admin_state = admin_state
        self._make_up_to_date(
            '/devices', device.id, device)
675
def delete_child_device(self, parent_device_id, child_device_id):
    """
    Remove a child device from '/devices', but only when it exists and
    its parent_id equals `parent_device_id`.

    :param parent_device_id: id the child's parent_id must equal
    :param child_device_id: id of the device to remove
    """
    child = self.root_proxy.get('/devices/{}'.format(child_device_id))
    if child is None or child.parent_id != parent_device_id:
        return
    self.log.debug('deleting-child-device',
                   parent_device_id=parent_device_id,
                   child_device_id=child_device_id)
    self._remove_node('/devices', child_device_id)
682
def _gen_rx_proxy_address_topic(self, proxy_address):
    """
    Build the rx topic name for a proxy address: 'rx:' followed by the
    JSON rendering of the address.
    """
    return 'rx:' + MessageToJson(proxy_address)
687
def _gen_tx_proxy_address_topic(self, proxy_address):
    """
    Build the tx topic name for a proxy address: 'tx:' followed by the
    JSON rendering of the address.
    """
    return 'tx:' + MessageToJson(proxy_address)
692
def register_for_proxied_messages(self, proxy_address):
    """
    Subscribe to the rx topic of `proxy_address` so that messages
    published there are passed to _receive_proxied_message(); the
    subscription is remembered under the topic name.

    :param proxy_address: proxy address to listen on
    """
    def deliver(_topic, msg):
        return self._receive_proxied_message(proxy_address, msg)

    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    subscription = self.event_bus.subscribe(rx_topic, deliver)
    self._rx_event_subscriptions[rx_topic] = subscription
Zsolt Haraszti89a27302016-12-08 16:53:06 -0800698
def unregister_for_proxied_messages(self, proxy_address):
    """
    Cancel the rx subscription remembered for `proxy_address` and forget
    it. Raises KeyError when no such subscription was registered.

    :param proxy_address: proxy address to stop listening on
    """
    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    subscription = self._rx_event_subscriptions[rx_topic]
    self.event_bus.unsubscribe(subscription)
    del self._rx_event_subscriptions[rx_topic]
703
def _receive_proxied_message(self, proxy_address, msg):
    """
    Pass a message received for `proxy_address` on to the adapter's
    receive_proxied_message().
    """
    adapter = self.adapter
    adapter.receive_proxied_message(proxy_address, msg)
706
def send_proxied_message(self, proxy_address, msg):
    """
    Publish `msg` on the tx topic of `proxy_address`.

    :param proxy_address: proxy address the message is meant for
    :param msg: message to publish
    """
    tx_topic = self._gen_tx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(tx_topic, msg)
710
def _send_proxied_message(self, proxy_address, msg):
    """
    Pass a message to be sent to `proxy_address` on to the adapter's
    send_proxied_message().
    """
    adapter = self.adapter
    adapter.send_proxied_message(proxy_address, msg)
713
def receive_proxied_message(self, proxy_address, msg):
    """
    Publish `msg` on the rx topic of `proxy_address`.

    :param proxy_address: proxy address the message came from
    :param msg: message to publish
    """
    rx_topic = self._gen_rx_proxy_address_topic(proxy_address)
    bus = self.event_bus
    bus.publish(rx_topic, msg)
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800717
def register_for_inter_adapter_messages(self):
    """Subscribe the adapter to inter-adapter messages.

    Messages published on the topic named after self.adapter_name are
    passed to self.adapter.receive_inter_adapter_message. The subscription
    handle returned by the event bus is kept on the instance so that
    unregister_for_inter_adapter_messages can cancel it; calling this
    method again while a subscription is active is a no-op.
    """
    # getattr: the attribute may not have been created by __init__.
    if getattr(self, '_inter_adapter_subscription', None) is not None:
        return

    self._inter_adapter_subscription = self.event_bus.subscribe(
        self.adapter_name,
        lambda t, m: self.adapter.receive_inter_adapter_message(m))

def unregister_for_inter_adapter_messages(self):
    """Cancel the inter-adapter subscription, if one is active.

    The event bus is given the subscription handle obtained at
    registration time (not the topic name), consistent with how the other
    subscriptions held by this object are cancelled. Does nothing when no
    subscription is recorded.
    """
    subscription = getattr(self, '_inter_adapter_subscription', None)
    if subscription is None:
        return

    self.event_bus.unsubscribe(subscription)
    self._inter_adapter_subscription = None
724
def publish_inter_adapter_message(self, device_id, msg):
    """Publish msg on the topic named after the type of a device.

    The device is looked up with self.get_device and its `type` attribute
    is used as the event-bus topic, i.e. the same kind of topic that
    register_for_inter_adapter_messages subscribes to with an adapter name.

    :param device_id: id of the device whose type selects the topic
    :param msg: message passed unchanged to the event bus
    :raises AssertionError: if get_device returns None
    """
    target_device = self.get_device(device_id)
    assert target_device is not None

    # The device type doubles as the topic of the adapter managing it.
    self.event_bus.publish(target_device.type, msg)
733
Zsolt Haraszti8925d1f2016-12-21 00:45:19 -0800734 # ~~~~~~~~~~~~~~~~~~ Handling packet-in and packet-out ~~~~~~~~~~~~~~~~~~~~
735
def send_packet_in(self, logical_device_id, logical_port_no, packet):
    """Publish a packet-in event for a logical device.

    The packet is logged (via hexify) in the form it was received. If it
    is a scapy Packet it is converted with str() before publishing; any
    other value is published as given. The event is a
    (logical_port_no, packet) tuple on topic
    'packet-in:' + logical_device_id.

    :param logical_device_id: id appended to the 'packet-in:' topic prefix
    :param logical_port_no: first element of the published tuple
    :param packet: scapy Packet or an already serialized packet
    """
    self.log.debug('send-packet-in', logical_device_id=logical_device_id,
                   logical_port_no=logical_port_no, packet=hexify(packet))

    payload = str(packet) if isinstance(packet, Packet) else packet

    self.event_bus.publish('packet-in:' + logical_device_id,
                           (logical_port_no, payload))
Zsolt Haraszti749b0952017-01-18 09:02:35 -0800745
746 # ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~
Zsolt Harasztic5f740b2017-01-18 09:53:17 -0800747
def submit_kpis(self, kpi_event_msg):
    """Publish a KPI event on the 'kpis' topic.

    Any exception raised while checking or publishing the message
    (including the type assertion) is logged with self.log.exception and
    not propagated to the caller.

    :param kpi_event_msg: expected to be a KpiEvent instance
    """
    try:
        assert isinstance(kpi_event_msg, KpiEvent)
        self.event_bus.publish('kpis', kpi_event_msg)
    except Exception:
        # Deliberately best-effort: report the offending type and carry on.
        self.log.exception('failed-kpi-submission',
                           type=type(kpi_event_msg))
Stephane Barbarie52198b92017-03-02 13:44:46 -0500755
756 # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
757
def create_alarm(self, id=None, resource_id=None, description=None,
                 raised_ts=0, changed_ts=0,
                 type=AlarmEventType.EQUIPMENT,
                 category=AlarmEventCategory.PON,
                 severity=AlarmEventSeverity.MINOR,
                 state=AlarmEventState.RAISED,
                 context=None):
    """Build an AlarmEvent from the given attributes.

    :param id: alarm id; when None it defaults to
        'voltha.<adapter_name>.<resource_id>'
    :param resource_id: id of the resource the alarm refers to
    :param description: description stored in the event
    :param raised_ts: value stored as raised_ts (default 0)
    :param changed_ts: value stored as changed_ts (default 0)
    :param type: AlarmEventType value (default EQUIPMENT)
    :param category: AlarmEventCategory value (default PON)
    :param severity: AlarmEventSeverity value (default MINOR)
    :param state: AlarmEventState value (default RAISED)
    :param context: context stored in the event
    :return: the new AlarmEvent; reported_ts is taken from
        arrow.utcnow().timestamp at call time
    """
    # Identity test: only a missing id triggers the generated default.
    if id is None:
        id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)

    return AlarmEvent(
        id=id,
        resource_id=resource_id,
        type=type,
        category=category,
        severity=severity,
        state=state,
        description=description,
        reported_ts=arrow.utcnow().timestamp,
        raised_ts=raised_ts,
        changed_ts=changed_ts,
        context=context
    )
783
def filter_alarm(self, device_id, alarm_event):
    """Tell whether an alarm event is suppressed by a configured filter.

    The filters are read from '/alarm_filters' through self.root_proxy.
    A filter that has rules matches when every one of its rules equals,
    ignoring case, the corresponding attribute of the alarm; filters
    without rules are skipped.

    :param device_id: value compared against rules keyed on 'device_id'
    :param alarm_event: alarm whose id, type, category, severity and
        resource_id are compared against the rules
    :return: True as soon as one filter matches all of its rules,
        False when no filter does
    """
    alarm_filters = self.root_proxy.get('/alarm_filters')

    # Alarm attributes indexed by the rule key name they are matched on.
    rule_values = {
        'id': alarm_event.id,
        'type': AlarmEventType.AlarmEventType.Name(alarm_event.type),
        'category': AlarmEventCategory.AlarmEventCategory.Name(
            alarm_event.category),
        'severity': AlarmEventSeverity.AlarmEventSeverity.Name(
            alarm_event.severity),
        'resource_id': alarm_event.resource_id,
        'device_id': device_id
    }

    for alarm_filter in alarm_filters:
        if not alarm_filter.rules:
            continue

        for rule in alarm_filter.rules:
            key_name = AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key)
            actual = rule_values[key_name].lower()
            expected = rule.value.lower()
            self.log.debug("compare-alarm-event",
                           key=key_name,
                           actual=actual,
                           expected=expected)
            if actual != expected:
                # One mismatching rule is enough to rule this filter out.
                break
        else:
            # Loop ran to completion: every rule of this filter matched.
            self.log.info("filtered-alarm-event", alarm=alarm_event)
            return True

    return False
815
def submit_alarm(self, device_id, alarm_event_msg):
    """Publish an alarm event on the 'alarms' topic unless it is filtered.

    The event is published only when self.filter_alarm returns a false
    value for it. Any exception raised along the way (including the type
    assertion) is logged with self.log.exception and not propagated.

    :param device_id: id passed to filter_alarm together with the event
    :param alarm_event_msg: expected to be an AlarmEvent instance
    """
    try:
        assert isinstance(alarm_event_msg, AlarmEvent)
        suppressed = self.filter_alarm(device_id, alarm_event_msg)
        if not suppressed:
            self.event_bus.publish('alarms', alarm_event_msg)
    except Exception:
        # Deliberately best-effort: report the offending type and carry on.
        self.log.exception('failed-alarm-submission',
                           type=type(alarm_event_msg))
Nikolay Titov89004ec2017-06-19 18:22:42 -0400825
826 # ~~~~~~~~~~~~~~~~~~~ Handle ONU detect ~~~~~~~~~~~~~~~~~~~~~
827
828 def _gen_onu_detect_proxy_address_topic(self, device_id):
829 """Generate unique topic name specific to this device id for onu detect"""
830 topic = str('onu_detect:{}'.format(device_id))
831 return topic
832
def register_for_onu_detect_state(self, device_id):
    """Subscribe to the ONU detect topic of device_id on the event bus.

    The subscription returned by the event bus is stored in
    self._onu_detect_event_subscriptions under the topic name so that
    unregister_for_onu_detect_state can look it up later.

    :param device_id: id selecting the topic; it is also handed to
        _forward_onu_detect_state with every message
    """
    detect_topic = self._gen_onu_detect_proxy_address_topic(device_id)

    def _deliver(_topic, state):
        # The topic argument supplied by the bus is not needed: the
        # device id is captured from the enclosing call.
        return self._forward_onu_detect_state(device_id, state)

    subscription = self.event_bus.subscribe(detect_topic, _deliver)
    self._onu_detect_event_subscriptions[detect_topic] = subscription
838
def unregister_for_onu_detect_state(self, device_id):
    """Cancel the ONU detect subscription recorded for device_id.

    Looks up the subscription stored under the topic name in
    self._onu_detect_event_subscriptions, unsubscribes it from the event
    bus and then drops the entry.

    :param device_id: id whose ONU detect subscription is removed
    :raises KeyError: if no subscription is recorded for the device
    """
    detect_topic = self._gen_onu_detect_proxy_address_topic(device_id)
    subscription = self._onu_detect_event_subscriptions[detect_topic]
    self.event_bus.unsubscribe(subscription)
    # Only forget the entry once the bus has accepted the unsubscribe.
    del self._onu_detect_event_subscriptions[detect_topic]
843
844 def _forward_onu_detect_state(self, device_id, state):
845 self.adapter.receive_onu_detect_state(device_id, state)
846
def forward_onu_detect_state(self, device_id, state):
    """Publish an ONU detect state on the topic of device_id.

    :param device_id: id selecting the ONU detect topic
    :param state: state passed unchanged to the event bus
    """
    self.event_bus.publish(
        self._gen_onu_detect_proxy_address_topic(device_id), state)