blob: 17ec6204278d66509d9523ddf55e86874d350990 [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A device agent is instantiated for each Device and plays an important role
19between the Device object and its adapter.
20"""
21import structlog
22from twisted.internet import reactor
23from twisted.internet.defer import inlineCallbacks, returnValue
24
25from voltha.core.config.config_proxy import CallbackType
26from voltha.protos.common_pb2 import AdminState, OperStatus
27from voltha.registry import registry
28
29log = structlog.get_logger()
30
31
32class InvalidStateTransition(Exception): pass
33
34
35class DeviceAgent(object):
36
37 def __init__(self, core, initial_data):
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080038
Zsolt Haraszti66862032016-11-28 14:28:39 -080039 self.core = core
40 self._tmp_initial_data = initial_data
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080041 self.last_data = None
42
Zsolt Haraszti66862032016-11-28 14:28:39 -080043 self.proxy = core.get_proxy('/devices/{}'.format(initial_data.id))
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080044 self.flows_proxy = core.get_proxy(
45 '/devices/{}/flows'.format(initial_data.id))
46 self.groups_proxy = core.get_proxy(
47 '/devices/{}/flow_groups'.format(initial_data.id))
48
Zsolt Haraszti66862032016-11-28 14:28:39 -080049 self.proxy.register_callback(
50 CallbackType.PRE_UPDATE, self._validate_update)
51 self.proxy.register_callback(
52 CallbackType.POST_UPDATE, self._process_update)
Zsolt Harasztic5c5d102016-12-07 21:12:27 -080053
54 self.flows_proxy.register_callback(
55 CallbackType.POST_UPDATE, self._flow_table_updated)
56 self.groups_proxy.register_callback(
57 CallbackType.POST_UPDATE, self._group_table_updated)
58
59 # to know device capabilities
60 self.device_type = core.get_proxy(
61 '/device_types/{}'.format(initial_data.type)).get()
62
Zsolt Haraszti66862032016-11-28 14:28:39 -080063 self.adapter_agent = None
64
65 @inlineCallbacks
66 def start(self):
67 log.debug('starting')
68 self._set_adapter_agent()
69 yield self._process_update(self._tmp_initial_data)
70 del self._tmp_initial_data
71 log.info('started')
72 returnValue(self)
73
74 def stop(self):
75 log.debug('stopping')
76 self.proxy.unregister_callback(
77 CallbackType.PRE_UPDATE, self._validate_update)
78 self.proxy.unregister_callback(
79 CallbackType.POST_UPDATE, self._process_update)
80 log.info('stopped')
81
82 def _set_adapter_agent(self):
83 adapter_name = self._tmp_initial_data.adapter
84 if adapter_name == '':
85 proxy = self.core.get_proxy('/')
86 known_device_types = dict(
87 (dt.id, dt) for dt in proxy.get('/device_types'))
88 device_type = known_device_types[self._tmp_initial_data.type]
89 adapter_name = device_type.adapter
90 assert adapter_name != ''
91 self.adapter_agent = registry('adapter_loader').get_agent(adapter_name)
92
93 @inlineCallbacks
94 def _validate_update(self, device):
95 """
96 Called before each update, it allows the blocking of the update
97 (by raising an exception), or even the augmentation of the incoming
98 data.
99 """
100 log.debug('device-pre-update', device=device)
101 yield self._process_state_transitions(device, dry_run=True)
102 returnValue(device)
103
104 @inlineCallbacks
105 def _process_update(self, device):
106 """
107 Called after the device object was updated (individually or part of
108 a transaction), and it is used to propagate the change down to the
109 adapter
110 """
111 log.debug('device-post-update', device=device)
112
113 # first, process any potential state transition
114 yield self._process_state_transitions(device)
115
116 # finally, store this data as last data so we can see what changed
117 self.last_data = device
118
119 @inlineCallbacks
120 def _process_state_transitions(self, device, dry_run=False):
121
122 old_admin_state = getattr(self.last_data, 'admin_state',
123 AdminState.UNKNOWN)
124 new_admin_state = device.admin_state
125 transition_handler = self.admin_state_fsm.get(
126 (old_admin_state, new_admin_state), None)
127 if transition_handler is None:
128 pass # no-op
129 elif transition_handler is False:
130 raise InvalidStateTransition('{} -> {}'.format(
131 old_admin_state, new_admin_state))
132 else:
133 assert callable(transition_handler)
134 yield transition_handler(self, device, dry_run)
135
136 @inlineCallbacks
137 def _activate_device(self, device, dry_run=False):
138 log.info('activate-device', device=device, dry_run=dry_run)
139 if not dry_run:
140 device = yield self.adapter_agent.adopt_device(device)
141 device.oper_status = OperStatus.ACTIVATING
142 # successful return from this may also populated the device
143 # data, so we need to write it back
144 reactor.callLater(0, self.update_device, device)
145
146 def update_device(self, device):
147 self.last_data = device # so that we don't propagate back
148 self.proxy.update('/', device)
149
150 def remove_device(self, device_id):
151 raise NotImplementedError()
152
153 def _propagate_change(self, device, dry_run=False):
154 log.info('propagate-change', device=device, dry_run=dry_run)
155 if device != self.last_data:
156 raise NotImplementedError()
157 else:
158 log.debug('no-op')
159
160 def _abandon_device(self, device, dry_run=False):
161 log.info('abandon-device', device=device, dry_run=dry_run)
162 raise NotImplementedError()
163
164 def _disable_device(self, device, dry_run=False):
165 log.info('disable-device', device=device, dry_run=dry_run)
166 raise NotImplementedError()
167
168 def _reenable_device(self, device, dry_run=False):
169 log.info('reenable-device', device=device, dry_run=dry_run)
170 raise NotImplementedError()
171
172 admin_state_fsm = {
173
174 # Missing entries yield no-op
175 # False means invalid state change
176
177 (AdminState.UNKNOWN, AdminState.ENABLED): _activate_device,
178
179 (AdminState.PREPROVISIONED, AdminState.UNKNOWN): False,
180 (AdminState.PREPROVISIONED, AdminState.ENABLED): _activate_device,
181
182 (AdminState.ENABLED, AdminState.UNKNOWN): False,
183 (AdminState.ENABLED, AdminState.ENABLED): _propagate_change,
184 (AdminState.ENABLED, AdminState.DISABLED): _disable_device,
185 (AdminState.ENABLED, AdminState.PREPROVISIONED): _abandon_device,
186
187 (AdminState.DISABLED, AdminState.UNKNOWN): False,
188 (AdminState.DISABLED, AdminState.PREPROVISIONED): _abandon_device,
189 (AdminState.DISABLED, AdminState.ENABLED): _reenable_device
190
191 }
Zsolt Harasztic5c5d102016-12-07 21:12:27 -0800192
193 ## <======================= FLOW TABLE UPDATE HANDLING ====================
194
195 @inlineCallbacks
196 def _flow_table_updated(self, flows):
197 log.debug('flow-table-updated',
198 logical_device_id=self.last_data.id, flows=flows)
199
200 # if device accepts bulk flow update, lets just call that
201 if self.device_type.accepts_bulk_flow_update:
202 groups = self.groups_proxy.get('/') # gather flow groups
203 yield self.adapter_agent.update_flows_bulk(
204 device=self.last_data,
205 flows=flows,
206 groups=groups)
207 # TODO place to feed back completion
208
209 elif self.accepts_add_remove_flow_updates:
210 raise NotImplementedError()
211
212 else:
213 raise NotImplementedError()
214
215 ## <======================= GROUP TABLE UPDATE HANDLING ===================
216
217 @inlineCallbacks
218 def _group_table_updated(self, groups):
219 log.debug('group-table-updated',
220 logical_device_id=self.last_data.id,
221 flow_groups=groups)
222
223 # if device accepts bulk flow update, lets just call that
224 if self.device_type.accepts_bulk_flow_update:
225 flows = self.flows_proxy.get('/') # gather flows
226 yield self.adapter_agent.update_flows_bulk(
227 device=self.last_data,
228 flows=flows,
229 groups=groups)
230 # TODO place to feed back completion
231
232 elif self.accepts_add_remove_flow_updates:
233 raise NotImplementedError()
234
235 else:
236 raise NotImplementedError()
237