blob: 5f5b59b2403430232cb24409e69ff233b7df1168 [file] [log] [blame]
Chip Boling8e042f62019-02-12 16:14:34 -06001# Copyright 2017-present Adtran, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from adapters.adtran_common.xpon.traffic_descriptor import TrafficDescriptor
16from adapters.adtran_common.xpon.best_effort import BestEffort
17from twisted.internet.defer import inlineCallbacks, returnValue, succeed
18
19
20class OnuTrafficDescriptor(TrafficDescriptor):
21 """
22 Adtran ONU specific implementation
23 """
24 def __init__(self, fixed, assured, maximum,
25 additional=TrafficDescriptor.AdditionalBwEligibility.DEFAULT,
26 best_effort=None):
27 super(OnuTrafficDescriptor, self).__init__(fixed, assured, maximum,
28 additional=additional,
29 best_effort=best_effort)
30
31 @staticmethod
32 def create(traffic_disc):
33 assert isinstance(traffic_disc, dict), 'Traffic Descriptor should be a dictionary'
34
35 additional = TrafficDescriptor.AdditionalBwEligibility.from_value(
36 traffic_disc['additional-bw-eligibility-indicator'])
37
38 if additional == TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
39 best_effort = BestEffort(traffic_disc['maximum-bandwidth'],
40 traffic_disc['priority'],
41 traffic_disc['weight'])
42 else:
43 best_effort = None
44
45 return OnuTrafficDescriptor(traffic_disc['fixed-bandwidth'],
46 traffic_disc['assured-bandwidth'],
47 traffic_disc['maximum-bandwidth'],
48 best_effort=best_effort,
49 additional=additional)
50
51 @inlineCallbacks
52 def add_to_hardware(self, omci):
53
54 results = succeed('TODO: Implement me')
55 # from ..adtran_olt_handler import AdtranOltHandler
56 #
57 # uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, onu_id, alloc_id)
58 # data = json.dumps({'traffic-descriptor': self.to_dict()})
59 # name = 'tcont-td-{}-{}: {}'.format(pon_id, onu_id, alloc_id)
60 # try:
61 # results = yield session.request('PATCH', uri, data=data, name=name)
62 #
63 # except Exception as e:
64 # log.exception('traffic-descriptor', td=self, e=e)
65 # raise
66 #
67 # if self.additional_bandwidth_eligibility == \
68 # TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
69 # if self.best_effort is None:
70 # raise ValueError('TCONT is best-effort but does not define best effort sharing')
71 #
72 # try:
73 # results = yield self.best_effort.add_to_hardware(session, pon_id, onu_id, alloc_id)
74 #
75 # except Exception as e:
76 # log.exception('best-effort', best_effort=self.best_effort, e=e)
77 # raise
78 returnValue(results)