blob: 73e145b4eb1dd1a45a9d7e76b00b5bec1fb4e73f [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17import arrow
18from transitions import Machine
19from datetime import datetime, timedelta
20from random import uniform, shuffle
21from twisted.internet import reactor
22from common.utils.indexpool import IndexPool
23from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
24from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
25from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
26 RX_RESPONSE_KEY
27from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
28from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
29from voltha.extensions.omci.omci_entities import MacBridgePortConfigurationData
30from voltha.extensions.omci.omci_entities import EthernetPMMonitoringHistoryData, \
31 FecPerformanceMonitoringHistoryData, \
32 XgPonTcPerformanceMonitoringHistoryData, \
33 XgPonDownstreamPerformanceMonitoringHistoryData, \
34 XgPonUpstreamPerformanceMonitoringHistoryData, \
35 EthernetFrameUpstreamPerformanceMonitoringHistoryData, \
36 EthernetFrameDownstreamPerformanceMonitoringHistoryData, \
37 EthernetFrameExtendedPerformanceMonitoring, \
38 EthernetFrameExtendedPerformanceMonitoring64Bit, AniG
39
40
# Short module-level aliases for enumerations imported above
RxEvent = OmciCCRxEvents     # OMCI_CC receive-event enumeration (event bus topics)
OP = EntityOperations        # Managed Entity operations
RC = ReasonCodes             # OMCI response result/reason codes
44
45
class PerformanceIntervals(object):
    """
    OpenOMCI ONU Performance Monitoring Intervals State machine

    This state machine focuses on L2 Internet Data Service and Classical
    PM (for the v2.0 release).

    Once started, the machine synchronizes the ONU time and then sits in the
    'idle' state. From 'idle' it periodically ticks into 'collect_data' to
    fetch PM interval data, and moves to 'create_pm_me' / 'delete_pm_me'
    whenever PM Managed Entities have been queued for creation or removal.
    """
    # Every state that is the destination of a transition below must be listed
    # here, otherwise the transition fails at runtime with an unregistered-state
    # error.
    DEFAULT_STATES = ['disabled', 'starting', 'synchronize_time', 'idle', 'create_pm_me',
                      'delete_pm_me', 'collect_data', 'threshold_exceeded']

    DEFAULT_TRANSITIONS = [
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
        {'trigger': 'tick', 'source': 'starting', 'dest': 'synchronize_time'},

        {'trigger': 'success', 'source': 'synchronize_time', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'synchronize_time', 'dest': 'synchronize_time'},

        {'trigger': 'tick', 'source': 'idle', 'dest': 'collect_data'},
        {'trigger': 'add_me', 'source': 'idle', 'dest': 'create_pm_me'},
        {'trigger': 'delete_me', 'source': 'idle', 'dest': 'delete_pm_me'},

        {'trigger': 'success', 'source': 'create_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'create_pm_me', 'dest': 'idle'},

        {'trigger': 'success', 'source': 'delete_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'delete_pm_me', 'dest': 'idle'},

        {'trigger': 'success', 'source': 'collect_data', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'collect_data', 'dest': 'idle'},

        # TODO: Add rebooted event transitions to disabled or synchronize_time
        # TODO: Need to capture Threshold Crossing Alarms appropriately

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
        # NOTE(review): 'rebooted' is not a registered state, so firing 'reboot'
        # will fail until the TODO above is resolved -- confirm intended design.
        {'trigger': 'reboot', 'source': '*', 'dest': 'rebooted'},
    ]
    DEFAULT_RETRY = 10                # Seconds to delay after task failure/timeout/poll
    DEFAULT_TICK_DELAY = 15           # Seconds between checks for collection tick
    DEFAULT_INTERVAL_SKEW = 10 * 60   # Seconds to skew past interval boundary
    DEFAULT_COLLECT_ATTEMPTS = 3      # Maximum number of collection fetch attempts
    DEFAULT_CREATE_ATTEMPTS = 15      # Maximum number of attempts to create a PM Managed Entities

    def __init__(self, agent, device_id, tasks,
                 advertise_events=False,
                 states=DEFAULT_STATES,
                 transitions=DEFAULT_TRANSITIONS,
                 initial_state='disabled',
                 timeout_delay=DEFAULT_RETRY,
                 tick_delay=DEFAULT_TICK_DELAY,
                 interval_skew=DEFAULT_INTERVAL_SKEW,
                 collect_attempts=DEFAULT_COLLECT_ATTEMPTS,
                 create_attempts=DEFAULT_CREATE_ATTEMPTS):
        """
        Class initialization

        :param agent: (OpenOmciAgent) Agent
        :param device_id: (str) ONU Device ID
        :param tasks: (dict) Tasks to run. Must provide the keys 'sync-time',
                             'collect-data', 'create-pm' and 'delete-pm'
        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
        :param states: (list) List of valid states
        :param transitions: (list) List of dictionaries of triggers and state changes
        :param initial_state: (str) Initial state machine state
        :param timeout_delay: (int/float) Number of seconds after a timeout to pause
        :param tick_delay: (int/float) Collection poll check delay while idle
        :param interval_skew: (int/float) Seconds to randomly skew the next interval
                              collection to spread out requests for PM intervals
        :param collect_attempts: (int) Max requests for a single PM interval before fail
        :param create_attempts: (int) Max attempts to create PM Managed entities before
                                stopping state machine
        """
        self.log = structlog.get_logger(device_id=device_id)

        self._agent = agent
        self._device_id = device_id
        self._device = None             # Resolved from the agent when 'starting' is entered
        self._pm_config = None
        self._timeout_delay = timeout_delay
        self._tick_delay = tick_delay
        self._interval_skew = interval_skew
        self._collect_attempts = collect_attempts
        self._create_attempts = create_attempts

        self._sync_time_task = tasks['sync-time']
        self._get_interval_task = tasks['collect-data']
        self._create_pm_task = tasks['create-pm']
        self._delete_pm_task = tasks['delete-pm']
        self._advertise_events = advertise_events

        self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
            RxEvent.MIB_Reset: None,
            RxEvent.Create: None,
            RxEvent.Delete: None
        }
        self._omci_cc_sub_mapping = {                 # RxEvent.enum -> callback
            RxEvent.MIB_Reset: self.on_mib_reset_response,
            RxEvent.Create: self.on_create_response,
            RxEvent.Delete: self.on_delete_response,
        }
        self._me_watch_list = {
            MacBridgePortConfigurationData.class_id: {
                'create-delete': self.add_remove_enet_frame_pm,
                'instances': dict()  # BP entity_id -> (PM class_id, PM entity_id)
            }
        }
        self._deferred = None
        self._task_deferred = None
        self._current_task = None
        self._add_me_deferred = None
        self._delete_me_deferred = None
        self._next_interval = None
        self._enet_entity_id = IndexPool(1024, 1)
        self._add_pm_me_retry = 0

        # (Class ID, Instance ID) -> Collect attempts remaining
        self._pm_me_collect_retries = dict()
        self._pm_me_extended_info = dict()
        self._add_pm_me = dict()    # (pm cid, pm eid) -> (me cid, me eid, upstream)
        self._del_pm_me = set()

        # Pollable PM items
        # Note that some items the KPI extracts are not listed below. These are the
        # administrative states, operational states, and sensed ethernet type. The values
        # in the MIB database should be accurate for these items.

        self._ani_g_items = ["optical_signal_level", "transmit_optical_level"]
        self._next_poll_time = datetime.utcnow()
        self._poll_interval = 60                      # TODO: Fixed at once a minute

        # Statistics and attributes
        # TODO: add any others if it will support problem diagnosis

        # Set up state machine to manage states
        self.machine = Machine(model=self, states=states,
                               transitions=transitions,
                               initial=initial_state,
                               queued=True,
                               ignore_invalid_triggers=True,
                               name='{}-{}'.format(self.__class__.__name__,
                                                   device_id))
        try:
            import logging
            logging.getLogger('transitions').setLevel(logging.WARNING)
        except Exception as e:
            self.log.exception('log-level-failed', e=e)

    def _cancel_deferred(self):
        """
        Cancel any outstanding deferred / delayed call and clear the reference
        to it. Cancellation is best effort; a failure to cancel is only logged.
        """
        d1, self._deferred = self._deferred, None
        d2, self._task_deferred = self._task_deferred, None
        d3, self._add_me_deferred = self._add_me_deferred, None
        d4, self._delete_me_deferred = self._delete_me_deferred, None

        for d in (d1, d2, d3, d4):
            try:
                if d is not None and not d.called:
                    d.cancel()
            except Exception as e:
                # Best effort only, never let a cancel failure break a state change
                self.log.debug('deferred-cancel-failed', e=e)

    def _cancel_tasks(self):
        """Stop the task currently in progress (if any) and forget it"""
        task, self._current_task = self._current_task, None
        if task is not None:
            task.stop()

    def __str__(self):
        return 'PerformanceIntervals: Device ID: {}, State:{}'.format(self._device_id,
                                                                      self.state)

    def delete(self):
        """
        Cleanup any state information
        """
        self.stop()

    @property
    def device_id(self):
        """(str) ONU Device ID this state machine is servicing"""
        return self._device_id

    @property
    def advertise_events(self):
        """(bool) True if events are advertised on the OpenOMCI event bus"""
        return self._advertise_events

    @advertise_events.setter
    def advertise_events(self, value):
        if not isinstance(value, bool):
            raise TypeError('Advertise event is a boolean')
        self._advertise_events = value

    def advertise(self, event, info):
        """Advertise an event on the OpenOMCI event bus"""
        if self._advertise_events:
            self._agent.advertise(event,
                                  {
                                      'state-machine': self.machine.name,
                                      'info': info,
                                      'time': str(datetime.utcnow()),
                                      'next': str(self._next_interval)
                                  })

    def set_pm_config(self, pm_config):
        """
        Set PM interval configuration

        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
        :return:
        """
        self._pm_config = pm_config

    def _me_is_supported(self, class_id):
        """
        Check to see if ONU supports this ME

        :param class_id: (int) ME Class ID
        :return: (bool) If ME is supported
        """
        supported = self._device.omci_capabilities.supported_managed_entities
        return class_id in supported if supported is not None else False

    def add_pm_me(self, pm_class_id, pm_entity_id, cid=0, eid=0, upstream=False):
        """
        Add a new Performance Monitoring ME.

        The ME ID will be added to an internal list and will be added the next
        time the idle state is reached. An 'add_me' trigger will be raised in
        case already in the Idle state.

        :param pm_class_id: (int) ME Class ID (1..0xFFFE)
        :param pm_entity_id: (int) Instance ID (1..0xFFFE)
        :param cid: (int) Class ID of entity monitored, 0 if not applicable
        :param eid: (int) Instance ID of entity monitored, 0 if not applicable
        :param upstream: (bool): Flag indicating if PM is for upstream traffic

        :raises TypeError: if the PM class ID is not an integer
        :raises ValueError: if the PM class ID is outside of 1..65534
        """
        if not isinstance(pm_class_id, int):
            raise TypeError('PM ME Class ID is an integer')
        if not 0 < pm_class_id < 0xFFFF:
            raise ValueError('PM ME Class ID must be 1..65534')

        # Check to see if ONU supports this ME
        if not self._me_is_supported(pm_class_id):
            self.log.warn('unsupported-PM-me', class_id=pm_class_id)
            return

        key = (pm_class_id, pm_entity_id)
        entry = (cid, eid, upstream)

        if key not in self._pm_me_collect_retries and key not in self._add_pm_me:
            self._add_pm_me[key] = entry

            if self._add_me_deferred is None:
                self._add_me_deferred = reactor.callLater(0, self.add_me)

        # An add supersedes any pending delete of the same ME
        if key in self._del_pm_me:
            self._del_pm_me.remove(key)

    def delete_pm_me(self, class_id, entity_id):
        """
        Remove a Performance Monitoring ME.

        The ME ID will be added to an internal list and will be removed the next
        time the idle state is reached. A 'delete_me' trigger will be raised in
        case already in the Idle state.

        :param class_id: (int) ME Class ID (1..0xFFFE)
        :param entity_id: (int) Instance ID (1..0xFFFE)

        :raises TypeError: if the class ID is not an integer
        :raises ValueError: if the class ID is outside of 1..65534
        """
        if not isinstance(class_id, int):
            raise TypeError('PM ME Class ID is an integer')
        if not 0 < class_id < 0xFFFF:
            raise ValueError('PM ME Class ID must be 1..65534')

        # Check to see if ONU supports this ME
        if not self._me_is_supported(class_id):
            self.log.warn('unsupported-PM-me', class_id=class_id)
            return

        key = (class_id, entity_id)

        if key in self._pm_me_collect_retries and key not in self._del_pm_me:
            self._del_pm_me.add(key)

            if self._delete_me_deferred is None:
                self._delete_me_deferred = reactor.callLater(0, self.delete_me)

        # A delete supersedes any pending add of the same ME
        if key in self._add_pm_me:
            self._add_pm_me.pop(key)

    def on_enter_disabled(self):
        """
        State machine is being stopped
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()
        self._next_interval = None

        # Drop OMCI ME Response subscriptions
        for event, sub in list(self._omci_cc_subscriptions.items()):
            if sub is not None:
                self._omci_cc_subscriptions[event] = None
                self._device.omci_cc.event_bus.unsubscribe(sub)

        if self._device is None:
            # stop()/delete() was called before the machine was ever started,
            # so no device was looked up and there are no PM MEs to remove
            return

        # Manually remove ani ANI/PON and UNI PM interval MEs
        config = self._device.configuration
        anis = config.ani_g_entities
        unis = config.uni_g_entities

        if anis is not None:
            for entity_id in anis:
                self.delete_pm_me(FecPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id, entity_id)

        if unis is not None:
            for entity_id in unis:
                self.delete_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)

    def on_enter_starting(self):
        """ Add the PON/ANI and UNI PM intervals"""
        self.advertise(OpenOmciEventType.state_change, self.state)

        self._device = self._agent.get_device(self._device_id)
        self._cancel_deferred()

        # Set up OMCI ME Response subscriptions
        try:
            for event, sub in self._omci_cc_sub_mapping.items():
                if self._omci_cc_subscriptions[event] is None:
                    self._omci_cc_subscriptions[event] = \
                        self._device.omci_cc.event_bus.subscribe(
                            topic=OMCI_CC.event_bus_topic(self._device_id, event),
                            callback=sub)

        except Exception as e:
            self.log.exception('omci-cc-subscription-setup', e=e)

        # Pre-bind so the exception handler below can always report it, even if
        # the failure occurs before the watch list loop is reached
        class_id = None
        try:
            # Manually start some ANI/PON and UNI PM interval MEs
            config = self._device.configuration
            anis = config.ani_g_entities
            unis = config.uni_g_entities

            if anis is not None:
                for entity_id in anis:
                    self.add_pm_me(FecPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id,
                                   entity_id)

            if unis is not None:
                for entity_id in unis:
                    self.add_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)

            # Look for existing instances of dynamically created ME's that have PM
            # associated with them and add them now
            for class_id in self._me_watch_list:
                watched = self._me_watch_list[class_id]
                # Only integer keys are ME instances
                existing = {k: v for k, v in
                            self._device.query_mib(class_id=class_id).items()
                            if isinstance(k, int)}

                for entity_id, data in existing.items():
                    method = watched['create-delete']
                    cid, eid = method(None, class_id, entity_id,
                                      add=True, attributes=data[ATTRIBUTES_KEY])
                    if cid > 0:
                        # BP entity_id -> (PM class_id, PM entity_id)
                        watched['instances'][entity_id] = (cid, eid)

        except Exception as e:
            self.log.exception('pm-me-setup', class_id=class_id, e=e)

        # Got to synchronize_time state
        self._deferred = reactor.callLater(0, self.tick)

    def on_enter_synchronize_time(self):
        """
        State machine has just transitioned to the synchronize_time state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()

        def success(_results):
            self.log.debug('sync-time-success')
            self._current_task = None
            self._deferred = reactor.callLater(0, self.success)
            # Calculate next interval time
            self._next_interval = self.get_next_interval

        def failure(reason):
            self.log.info('sync-time-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        # Schedule a task to set the ONU time
        self._current_task = self._sync_time_task(self._agent, self._device_id)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_idle(self):
        """
        State machine has just transitioned to the idle state

        In this state, any added PM MEs that need to be created will be.
        TODO: some non-interval PM stats (if there are any) are collected here
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()

        if len(self._del_pm_me) and self._delete_me_deferred is None:
            self._delete_me_deferred = reactor.callLater(0, self.delete_me)

        elif len(self._add_pm_me) and self._add_me_deferred is None:
            self._add_me_deferred = reactor.callLater(0, self.add_me)

        elif datetime.utcnow() >= self._next_poll_time:
            def success(results):
                self._device.timestamp = arrow.utcnow().float_timestamp
                self._device.mib_synchronizer.mib_set(results.me_class.class_id,
                                                      results.entity_id,
                                                      results.attributes)
                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)

            def failure(reason):
                self.log.info('poll-failure', reason=reason)
                self._device.timestamp = None
                return None

            # Scan all ANI-G ports
            ani_g_entities = self._device.configuration.ani_g_entities
            ani_g_entities_ids = ani_g_entities.keys() if ani_g_entities is not None else None

            if ani_g_entities_ids is not None and len(ani_g_entities_ids):
                for entity_id in ani_g_entities_ids:
                    task = OmciGetRequest(self._agent, self.device_id,
                                          AniG, entity_id,
                                          self._ani_g_items, allow_failure=True)
                    self._task_deferred = self._device.task_runner.queue_task(task)
                    self._task_deferred.addCallbacks(success, failure)
            else:
                self.log.warn('poll-pm-no-anis')
                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)

        # TODO: Compute a better mechanism than just polling here, perhaps based on
        #       the next time to fetch data for 'any' interval
        self._deferred = reactor.callLater(self._tick_delay, self.tick)

    def on_enter_create_pm_me(self):
        """
        State machine has just transitioned to the create_pm_me state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()
        # Take ownership of the pending adds; new requests queue up separately
        mes, self._add_pm_me = self._add_pm_me, dict()

        def success(results):
            self.log.debug('create-me-success', results=results)

            # Check if already here. The create request could have received
            # an already-exists status code which we consider successful
            for pm, me in mes.items():
                self._pm_me_collect_retries[pm] = self.pm_collected(pm)
                self._pm_me_extended_info[pm] = me

            self._current_task = None
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('create-me-failure', reason=reason, retries=self._add_pm_me_retry)
            self._current_task = None
            if self._add_pm_me_retry <= self._create_attempts:
                # Put the MEs back on the pending list so they are retried
                for pm, me in mes.items():
                    self._add_pm_me[pm] = me
                self._add_pm_me_retry += 1
                self._deferred = reactor.callLater(self._timeout_delay, self.failure)
            else:
                # we cant seem to create any collection me, no point in doing anything
                self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason, device_id=self._device_id)
                self._deferred = reactor.callLater(self._timeout_delay, self.stop)

        self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_delete_pm_me(self):
        """
        State machine has just transitioned to the delete_pm_me state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()

        # Take ownership of the pending deletes; new requests queue up separately
        mes, self._del_pm_me = self._del_pm_me, set()

        def success(results):
            self.log.debug('delete-me-success', results=results)
            self._current_task = None
            for me in mes:
                # Tolerate entries that are already gone so that the 'success'
                # trigger below is always scheduled
                self._pm_me_collect_retries.pop(me, None)
                self._pm_me_extended_info.pop(me, None)

            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('delete-me-failure', reason=reason)
            self._current_task = None
            # Put the MEs back on the pending list so they are retried
            for me in mes:
                self._del_pm_me.add(me)

            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        self._current_task = self._delete_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_collect_data(self):
        """
        State machine has just transitioned to the collect_data state

        At most one PM ME is collected per visit to this state. Once no ME has
        collection attempts remaining, the next interval time is computed and
        the attempt counters are reset.
        """

        if self._next_interval is not None and self._next_interval > datetime.utcnow():
            self.log.debug('wait-next-interval')
            # Not ready for next interval, transition back to idle and we should get
            # called again after a short delay
            reactor.callLater(0, self.success)
            return

        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()
        # Copy the keys into a list so they can be shuffled in place
        keys = list(self._pm_me_collect_retries)
        shuffle(keys)

        for key in keys:
            class_id = key[0]
            entity_id = key[1]

            self.log.debug("in-enter-collect-data", data_key=key,
                           retries=self._pm_me_collect_retries[key])

            # Collect the data ?
            if self._pm_me_collect_retries[key] > 0:
                def success(results):
                    self.log.debug('collect-success', results=results,
                                   class_id=results.get('class_id'),
                                   entity_id=results.get('entity_id'))
                    self._current_task = None
                    self._pm_me_collect_retries[key] = 0
                    self._deferred = reactor.callLater(0, self.success)
                    return results

                def failure(reason):
                    self.log.info('collect-failure', reason=reason)
                    self._current_task = None
                    self._pm_me_collect_retries[key] -= 1
                    self._deferred = reactor.callLater(self._timeout_delay, self.failure)
                    return reason   # Halt callback processing

                # start the task
                if key in self._pm_me_extended_info:
                    self.log.debug('collect-extended-info-found', data_key=key,
                                   extended_info=self._pm_me_extended_info[key])
                    parent_class_id = self._pm_me_extended_info[key][0]
                    parent_entity_id = self._pm_me_extended_info[key][1]
                    upstream = self._pm_me_extended_info[key][2]
                else:
                    self.log.debug('collect-extended-info-not-found', data_key=key)
                    parent_class_id = None
                    parent_entity_id = None
                    upstream = None

                self._current_task = self._get_interval_task(self._agent, self._device_id,
                                                             class_id, entity_id,
                                                             parent_class_id=parent_class_id,
                                                             parent_entity_id=parent_entity_id,
                                                             upstream=upstream)
                self._task_deferred = self._device.task_runner.queue_task(self._current_task)
                self._task_deferred.addCallbacks(success, failure)
                self._task_deferred.addCallback(self.publish_data)
                # Only one collection is started per state entry
                return

        # Here if all intervals have been collected (we are up to date)
        self._next_interval = self.get_next_interval
        self.log.debug('collect-calculate-next', next=self._next_interval)

        self._pm_me_collect_retries = dict.fromkeys(self._pm_me_collect_retries, self._collect_attempts)
        reactor.callLater(0, self.success)

    def on_enter_threshold_exceeded(self):
        """
        State machine has just transitioned to the threshold_exceeded state
        """
        pass  # TODO: Not sure if we want this state. Need to get alarm synchronizer working first

    @property
    def get_next_interval(self):
        """
        Determine the time for the next interval collection for all of this
        ONUs PM Intervals. Earliest fetch time is at least 1 minute into the
        next interval.

        :return: (datetime) UTC time to get the next interval
        """
        now = datetime.utcnow()

        # Get delta seconds to at least 1 minute into next interval
        next_delta_secs = (16 - (now.minute % 15)) * 60
        next_interval = now + timedelta(seconds=next_delta_secs)

        # NOTE: The section below is active; it performs the first collection
        #       right after initial code startup/mib-sync
        if self._next_interval is None:
            return now     # Do it now  (just for debugging purposes)

        # Skew the next time up to the maximum specified
        # TODO: May want to skew in a shorter range and select the minute
        #       based off some device property value to make collection a
        #       little more predictable on a per-ONU basis.
        return next_interval + timedelta(seconds=uniform(0, self._interval_skew))

    def pm_collected(self, key):
        """
        Query database and determine if PM data needs to be collected for this ME

        :param key: (tuple) (PM class ID, PM entity ID), currently unused
        :return: (int) Number of collection attempts to allow for this ME
        """
        return self._collect_attempts      # TODO: Implement persistent storage

    def publish_data(self, results):
        """
        Publish the PM interval results on the appropriate bus.  The results are
        a dictionary with the following format.

            'class-id':          (int) ME Class ID,
            'entity-id':         (int) ME Entity ID,
            'me-name':           (str) ME Class name,   # Mostly for debugging...
            'interval-end-time': None,
            'interval-utc-time': (DateTime) UTC time when retrieved from ONU,

            Counters added here as they are retrieved with the format of
            'counter-attribute-name': value (int)

        :param results: (dict) PM results
        """
        self.log.debug('collect-publish', results=results)

        if self._pm_config is not None:
            self._pm_config.publish_metrics(results)

        pass  # TODO: Save off last time interval fetched to persistent storage?

    def on_mib_reset_response(self, _topic, msg):
        """
        Called upon receipt of a MIB Reset Response for this ONU

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-mib-reset-response', state=self.state)
        try:
            response = msg[RX_RESPONSE_KEY]
            omci_msg = response.fields['omci_message'].fields
            status = omci_msg['success_code']

            if status == RC.Success:
                for class_id in self._me_watch_list:
                    # BP entity_id -> (PM class_id, PM entity_id)
                    instances = self._me_watch_list[class_id]['instances']
                    for _, me_pair in instances.items():
                        self._me_watch_list[class_id]['create-delete'](None, me_pair[0],
                                                                       me_pair[1], add=False)
                    self._me_watch_list[class_id]['instances'] = dict()

        except KeyError:
            pass            # NOP

    def on_create_response(self, _topic, msg):
        """
        Called upon receipt of a Create Response for this ONU.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-create-response', state=self.state)

        def valid_request(stat, c_id, e_id):
            # Only act while subscribed, on a successful create of a watched
            # ME class that is not already being tracked
            return self._omci_cc_subscriptions[RxEvent.Create] is not None\
                and stat in (RC.Success, RC.InstanceExists) \
                and c_id in self._me_watch_list \
                and e_id not in self._me_watch_list[c_id]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']
            cid, eid = method(request, class_id, entity_id, add=True)

            if cid > 0:
                # BP entity_id -> (PM class_id, PM entity_id)
                instances = self._me_watch_list[class_id]['instances']
                instances[entity_id] = (cid, eid)

    def on_delete_response(self, _topic, msg):
        """
        Called upon receipt of a Delete Response for this ONU

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-delete-response', state=self.state)

        def valid_request(stat, cid, eid):
            # Only act while subscribed, on a successful delete of a watched
            # ME instance that is currently being tracked
            return self._omci_cc_subscriptions[RxEvent.Delete] is not None\
                and stat in (RC.Success, RC.UnknownInstance) \
                and cid in self._me_watch_list \
                and eid in self._me_watch_list[cid]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']

            method(request, class_id, entity_id, add=False)
            # BP entity_id -> (PM class_id, PM entity_id)
            instances = self._me_watch_list[class_id]['instances']
            del instances[entity_id]

    def get_pm_entity_id_for_add(self, pm_cid, eid):
        """
        Select the Entity ID to use for a specific PM Class ID.  For extended
        PM ME's, an entity id (>0) is allocated

        :param pm_cid: (int) PM ME Class ID to create/get entry ID for
        :param eid: (int) Reference class's entity ID. Used as PM entity ID for non-
                    extended PM history PMs
        :return: (int) Entity ID to use
        """
        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
            return self._enet_entity_id.get_next()
        return eid

    def release_pm_entity_id(self, pm_cid, eid):
        """
        Return an entity ID allocated for an extended PM ME to the pool. Entity
        IDs of other PM classes are not pool-allocated and are ignored.

        :param pm_cid: (int) PM ME Class ID
        :param eid: (int) PM ME Entity ID to release
        """
        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
            try:
                self._enet_entity_id.release(eid)
            except Exception as e:
                # Best effort only, the ID may not have come from the pool
                self.log.debug('pm-entity-id-release-failed', entity_id=eid, e=e)

    def add_remove_enet_frame_pm(self, request, class_id, entity_id,
                                 add=True,
                                 attributes=None):
        """
        Add/remove PM for the dynamic MAC Port configuration data.

        This can be called in a variety of ways:

           o If from an Response event from OMCI_CC, the request will contain
             the original create/delete request. The class_id and entity_id will
             be the MAC Data Configuration Data class and instance ID.
             add = True if create, False if delete

           o If starting up (and the associated ME is already created), the MAC
             Data Configuration Data class and instance ID, and attributes are
             provided. request = None and add = True

           o If cleaning up (stopping), the PM ME class_id, entity_id are provided.
             request = None and add = False

        :return: (int, int) PM ME class_id and entity_id for add/remove was performed.
                            class and entity IDs are non-zero on success
        """
        pm_entity_id = 0
        cid = 0
        eid = 0
        upstream = False

        def tp_type_to_pm(tp):
            # Map a TP type to ([candidate PM class IDs], upstream flag). An
            # unknown TP type yields (None, False) so callers can always unpack.
            # TODO: Support 64-bit extended Monitoring MEs.
            # This will result in the need to maintain entity IDs of PMs differently
            upstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
                              EthernetFrameExtendedPerformanceMonitoring.class_id,
                              EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id], True
            downstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
                                EthernetFrameExtendedPerformanceMonitoring.class_id,
                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id], False
            return {
                1: downstream_types,
                3: upstream_types,
                5: downstream_types,
                6: downstream_types,
            }.get(tp, (None, False))

        if request is not None:
            assert class_id == MacBridgePortConfigurationData.class_id

            # Is this associated with the ANI or the UNI side of the bridge?
            # For VOLTHA v2.0, only high-speed internet data service is
            attributes = request.fields['omci_message'].fields['data']
            pm_class_ids, upstream = tp_type_to_pm(attributes['tp_type'])
            cid = request.fields['omci_message'].fields['entity_class']
            eid = request.fields['omci_message'].fields['entity_id']
            if not add:
                instances = self._me_watch_list[cid]['instances']
                _, pm_entity_id = instances.get(eid, (None, None))

        elif add:
            assert class_id == MacBridgePortConfigurationData.class_id
            assert isinstance(attributes, dict)

            # Is this associated with the ANI or the UNI side of the bridge?
            pm_class_ids, upstream = tp_type_to_pm(attributes.get('tp_type'))
            cid = class_id
            eid = entity_id

        else:
            assert class_id in (EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id,
                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id,
                                EthernetFrameExtendedPerformanceMonitoring.class_id,
                                EthernetFrameExtendedPerformanceMonitoring64Bit.class_id)
            pm_class_ids = [class_id]
            # Cleaning up: the caller supplied the PM ME's own entity ID
            pm_entity_id = entity_id

        if pm_class_ids is None:
            return 0, 0     # Unable to select a supported ME for this ONU

        if add:
            for pm_class_id in pm_class_ids:
                if self._me_is_supported(pm_class_id):
                    pm_entity_id = self.get_pm_entity_id_for_add(pm_class_id, eid)
                    self.add_pm_me(pm_class_id, pm_entity_id, cid=cid, eid=eid,
                                   upstream=upstream)
                    return pm_class_id, pm_entity_id
        else:
            for pm_class_id in pm_class_ids:
                if self._me_is_supported(pm_class_id):
                    self.delete_pm_me(pm_class_id, pm_entity_id)
                    self.release_pm_entity_id(pm_class_id, pm_entity_id)
                    return pm_class_id, pm_entity_id

        return 0, 0