blob: e7a4912e77d8e649960fa9d8292bf6b88e958cf0 [file] [log] [blame]
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
Zsolt Haraszti3300f742017-01-09 01:14:20 -080015from Queue import Empty as QueueEmpty
Zsolt Haraszti66862032016-11-28 14:28:39 -080016from uuid import uuid4
17
18import structlog
Zsolt Haraszti3300f742017-01-09 01:14:20 -080019from google.protobuf.empty_pb2 import Empty
Zsolt Haraszti66862032016-11-28 14:28:39 -080020from grpc import StatusCode
21
22from common.utils.grpc_utils import twisted_async
23from voltha.core.config.config_root import ConfigRoot
Ryan Van Gilder5a0ab622017-03-01 16:39:25 -080024from voltha.core.config.config_backend import ConsulStore
Zsolt Haraszti217a12e2016-12-19 16:37:55 -080025from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
26 ofp_port_status
Zsolt Haraszti66862032016-11-28 14:28:39 -080027from voltha.protos.voltha_pb2 import \
28 add_VolthaLocalServiceServicer_to_server, VolthaLocalServiceServicer, \
29 VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
30 LogicalPorts, Devices, Device, DeviceType, \
Zsolt Haraszti217a12e2016-12-19 16:37:55 -080031 DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent
Zsolt Haraszti66862032016-11-28 14:28:39 -080032from voltha.registry import registry
33
34log = structlog.get_logger()
35
36
class LocalHandler(VolthaLocalServiceServicer):
    """Servicer implementing the local Voltha gRPC API.

    Read requests are answered from the configuration tree held in
    ``self.root``; flow-table, group-table, reboot and packet-out requests
    are forwarded to the agents handed out by ``core``.
    """

    def __init__(self, core, **init_kw):
        """Remember `core` and the keyword arguments for the root object.

        The config root itself is only created when start() is called.
        """
        self.core = core
        self.init_kw = init_kw
        self.root = None
        self.stopped = False

    def start(self, config_backend=None):
        """Create or load the config root and register with the gRPC server.

        :param config_backend: optional key/value store. When it already
            contains a 'root' entry the tree is loaded from it; otherwise a
            new tree is created on top of it. Without a backend the tree is
            created without a kv_store.
        :return: self
        """
        log.debug('starting')
        if config_backend:
            if 'root' in config_backend:
                # This is going to block the entire reactor until loading
                # is completed
                log.info('loading config from persisted backend')
                self.root = ConfigRoot.load(VolthaInstance,
                                            kv_store=config_backend)
            else:
                log.info('initializing new config')
                self.root = ConfigRoot(VolthaInstance(**self.init_kw),
                                       kv_store=config_backend)
        else:
            self.root = ConfigRoot(VolthaInstance(**self.init_kw))

        registry('grpc_server').register(
            add_VolthaLocalServiceServicer_to_server, self)
        log.info('started')
        return self

    def stop(self):
        """Flag the handler as stopped.

        The streaming generators (ReceivePacketsIn, ReceiveChangeEvents)
        terminate the next time they find their queue empty.
        """
        log.debug('stopping')
        self.stopped = True
        log.info('stopped')

    def get_proxy(self, path, exclusive=False):
        """Return the config root's proxy for `path`."""
        return self.root.get_proxy(path, exclusive)

    # Private helpers shared by the gRPC methods below.

    @staticmethod
    def _get_depth(context):
        """Return the 'get-depth' invocation metadata as an int (default 0).

        A non-numeric value makes int() raise ValueError, as before.
        """
        return int(dict(context.invocation_metadata()).get('get-depth', 0))

    @staticmethod
    def _fail(context, code, details):
        """Record an error status (details first, then code) on `context`."""
        context.set_details(details)
        context.set_code(code)

    def _malformed_id(self, context, kind, ident):
        """Flag INVALID_ARGUMENT and return True if `ident` contains a '/'.

        Ids are spliced into config tree paths, so a '/' would address a
        different node than the one intended.

        :param kind: lower-case object kind used in the message,
            e.g. 'logical device'
        """
        if '/' not in ident:
            return False
        self._fail(context, StatusCode.INVALID_ARGUMENT,
                   'Malformed {} id \'{}\''.format(kind, ident))
        return True

    def _not_found(self, context, label, ident):
        """Flag NOT_FOUND for the object `label` (e.g. 'Device') `ident`."""
        self._fail(context, StatusCode.NOT_FOUND,
                   '{} \'{}\' not found'.format(label, ident))

    @staticmethod
    def _new_device_error(device, known_device_types):
        """Return why `device` cannot be created, or None if it is valid."""
        if not isinstance(device, Device):
            return 'Request is not a Device message'
        if device.id != '':
            return 'Device to be created cannot have id yet'
        if device.type not in known_device_types:
            return 'Unknown device type \'{}\''.format(device.type)
        if device.admin_state not in (AdminState.UNKNOWN,
                                      AdminState.PREPROVISIONED):
            return 'Newly created device cannot be ' \
                   'in admin state \'{}\''.format(device.admin_state)
        return None

    # gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
    # the gRPC threadpool threads.
    # NOTE(review): twisted_async presumably hands the call over to the
    # Twisted reactor thread -- confirm in common.utils.grpc_utils.

    @twisted_async
    def GetVolthaInstance(self, request, context):
        """Return the root object, expanded to the requested 'get-depth'."""
        log.info('grpc-request', request=request)
        depth = self._get_depth(context)
        res = self.root.get('/', depth=depth)
        return res

    @twisted_async
    def GetHealth(self, request, context):
        """Return the '/health' node of the config tree."""
        log.info('grpc-request', request=request)
        return self.root.get('/health')

    @twisted_async
    def ListAdapters(self, request, context):
        """Return all entries under '/adapters'."""
        log.info('grpc-request', request=request)
        items = self.root.get('/adapters')
        return Adapters(items=items)

    @twisted_async
    def ListLogicalDevices(self, request, context):
        """Return all entries under '/logical_devices'."""
        log.info('grpc-request', request=request)
        items = self.root.get('/logical_devices')
        return LogicalDevices(items=items)

    @twisted_async
    def GetLogicalDevice(self, request, context):
        """Return one logical device, expanded to the requested 'get-depth'.

        Sets INVALID_ARGUMENT for an id containing '/', NOT_FOUND for an
        unknown id; an empty LogicalDevice is returned in both cases.
        """
        log.info('grpc-request', request=request)

        depth = self._get_depth(context)

        if self._malformed_id(context, 'logical device', request.id):
            return LogicalDevice()

        try:
            return self.root.get('/logical_devices/' + request.id,
                                 depth=depth)
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return LogicalDevice()

    @twisted_async
    def ListLogicalDevicePorts(self, request, context):
        """Return the ports of one logical device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'logical device', request.id):
            return LogicalPorts()

        try:
            items = self.root.get(
                '/logical_devices/{}/ports'.format(request.id))
            return LogicalPorts(items=items)
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return LogicalPorts()

    @twisted_async
    def ListLogicalDeviceFlows(self, request, context):
        """Return the flows node of one logical device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'logical device', request.id):
            return Flows()

        try:
            flows = self.root.get(
                '/logical_devices/{}/flows'.format(request.id))
            return flows
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return Flows()

    @twisted_async
    def UpdateLogicalDeviceFlowTable(self, request, context):
        """Pass request.flow_mod to the logical device agent."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'logical device', request.id):
            return Empty()

        try:
            agent = self.core.get_logical_device_agent(request.id)
            agent.update_flow_table(request.flow_mod)
            return Empty()
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return Empty()

    @twisted_async
    def ListLogicalDeviceFlowGroups(self, request, context):
        """Return the flow_groups node of one logical device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'logical device', request.id):
            return FlowGroups()

        try:
            groups = self.root.get(
                '/logical_devices/{}/flow_groups'.format(request.id))
            return groups
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return FlowGroups()

    @twisted_async
    def UpdateLogicalDeviceFlowGroupTable(self, request, context):
        """Pass request.group_mod to the logical device agent."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'logical device', request.id):
            return Empty()

        try:
            agent = self.core.get_logical_device_agent(request.id)
            agent.update_group_table(request.group_mod)
            return Empty()
        except KeyError:
            self._not_found(context, 'Logical device', request.id)
            return Empty()

    @twisted_async
    def ListDevices(self, request, context):
        """Return all entries under '/devices'."""
        log.info('grpc-request', request=request)
        items = self.root.get('/devices')
        return Devices(items=items)

    @twisted_async
    def GetDevice(self, request, context):
        """Return one device, expanded to the requested 'get-depth'.

        Sets INVALID_ARGUMENT for an id containing '/', NOT_FOUND for an
        unknown id; an empty Device is returned in both cases.
        """
        log.info('grpc-request', request=request)

        depth = self._get_depth(context)

        if self._malformed_id(context, 'device', request.id):
            return Device()

        try:
            return self.root.get('/devices/' + request.id, depth=depth)
        except KeyError:
            self._not_found(context, 'Device', request.id)
            return Device()

    @twisted_async
    def CreateDevice(self, request, context):
        """Validate the requested device, complete it and add it to the tree.

        The request must not carry an id, must name a known device type and
        must be in admin state UNKNOWN or PREPROVISIONED; otherwise
        INVALID_ARGUMENT is set and an empty Device is returned. On success
        the request object itself, with id and adapter filled in, is
        returned.
        """
        log.info('grpc-request', request=request)

        known_device_types = {
            dt.id: dt for dt in self.root.get('/device_types')}

        # Explicit checks instead of assert statements: asserts are removed
        # when Python runs with -O, which would disable the validation.
        device = request
        error = self._new_device_error(device, known_device_types)
        if error is not None:
            self._fail(context, StatusCode.INVALID_ARGUMENT, error)
            return Device()

        # fill additional data
        device.id = uuid4().hex[:12]
        device_type = known_device_types[device.type]
        device.adapter = device_type.adapter
        # NOTE(review): the source this was recovered from had lost its
        # indentation; both assignments are assumed to belong to the `if`.
        if device.admin_state != AdminState.PREPROVISIONED:
            device.admin_state = AdminState.PREPROVISIONED
            device.oper_status = OperStatus.UNKNOWN

        # add device to tree
        self.root.add('/devices', device)

        return request

    @twisted_async
    def EnableDevice(self, request, context):
        """Move a PREPROVISIONED or DISABLED device to ENABLED.

        Always returns Empty; failures are reported through the status code
        (INVALID_ARGUMENT for a bad id or state, NOT_FOUND for an unknown
        id).
        """
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Empty()

        try:
            path = '/devices/{}'.format(request.id)
            device = self.root.get(path)
            if device.admin_state not in (AdminState.PREPROVISIONED,
                                          AdminState.DISABLED):
                self._fail(context, StatusCode.INVALID_ARGUMENT,
                           'Device to enable cannot be '
                           'in admin state \'{}\''.format(
                               device.admin_state))
                return Empty()
            device.admin_state = AdminState.ENABLED
            self.root.update(path, device, strict=True)

        except AssertionError as e:
            # Assertions raised further down (e.g. by the tree update).
            self._fail(context, StatusCode.INVALID_ARGUMENT, str(e))

        except KeyError:
            self._not_found(context, 'Device', request.id)

        return Empty()

    @twisted_async
    def DisableDevice(self, request, context):
        """Move an ENABLED device to DISABLED.

        Always returns Empty; failures are reported through the status code
        (INVALID_ARGUMENT for a bad id or state, NOT_FOUND for an unknown
        id).
        """
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Empty()

        try:
            path = '/devices/{}'.format(request.id)
            device = self.root.get(path)
            if device.admin_state != AdminState.ENABLED:
                self._fail(context, StatusCode.INVALID_ARGUMENT,
                           'Device to disable cannot be '
                           'in admin state \'{}\''.format(
                               device.admin_state))
                return Empty()
            device.admin_state = AdminState.DISABLED
            self.root.update(path, device, strict=True)

        except AssertionError as e:
            # Assertions raised further down (e.g. by the tree update).
            self._fail(context, StatusCode.INVALID_ARGUMENT, str(e))

        except KeyError:
            self._not_found(context, 'Device', request.id)

        return Empty()

    @twisted_async
    def RebootDevice(self, request, context):
        """Ask the device's agent to reboot it.

        Always returns Empty; failures are reported through the status code
        (INVALID_ARGUMENT for a bad id or an assertion raised by the agent,
        NOT_FOUND for an unknown id).
        """
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Empty()

        try:
            path = '/devices/{}'.format(request.id)
            device = self.root.get(path)

            agent = self.core.get_device_agent(device.id)
            agent.reboot_device(device)

        except AssertionError as e:
            self._fail(context, StatusCode.INVALID_ARGUMENT, str(e))

        except KeyError:
            self._not_found(context, 'Device', request.id)

        return Empty()

    @twisted_async
    def DeleteDevice(self, request, context):
        """Remove a DISABLED device from the config tree.

        Always returns Empty; failures are reported through the status code
        (INVALID_ARGUMENT for a bad id or state, NOT_FOUND for an unknown
        id).
        """
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Empty()

        try:
            path = '/devices/{}'.format(request.id)
            device = self.root.get(path)
            if device.admin_state != AdminState.DISABLED:
                self._fail(context, StatusCode.INVALID_ARGUMENT,
                           'Device to delete cannot be '
                           'in admin state \'{}\''.format(
                               device.admin_state))
                return Empty()

            self.root.remove(path)

        except AssertionError as e:
            # Assertions raised further down (e.g. by the tree removal).
            self._fail(context, StatusCode.INVALID_ARGUMENT, str(e))

        except KeyError:
            self._not_found(context, 'Device', request.id)

        return Empty()

    @twisted_async
    def ListDevicePorts(self, request, context):
        """Return the ports of one device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Ports()

        try:
            items = self.root.get('/devices/{}/ports'.format(request.id))
            return Ports(items=items)
        except KeyError:
            self._not_found(context, 'Device', request.id)
            return Ports()

    @twisted_async
    def ListDeviceFlows(self, request, context):
        """Return the flows node of one device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return Flows()

        try:
            flows = self.root.get('/devices/{}/flows'.format(request.id))
            return flows
        except KeyError:
            self._not_found(context, 'Device', request.id)
            return Flows()

    @twisted_async
    def ListDeviceFlowGroups(self, request, context):
        """Return the flow_groups node of one device."""
        log.info('grpc-request', request=request)

        if self._malformed_id(context, 'device', request.id):
            return FlowGroups()

        try:
            groups = self.root.get(
                '/devices/{}/flow_groups'.format(request.id))
            return groups
        except KeyError:
            self._not_found(context, 'Device', request.id)
            return FlowGroups()

    @twisted_async
    def ListDeviceTypes(self, request, context):
        """Return all entries under '/device_types'."""
        log.info('grpc-request', request=request)
        items = self.root.get('/device_types')
        return DeviceTypes(items=items)

    @twisted_async
    def GetDeviceType(self, request, context):
        """Return one device type, expanded to the requested 'get-depth'.

        Sets INVALID_ARGUMENT for an id containing '/', NOT_FOUND for an
        unknown id; an empty DeviceType is returned in both cases.
        """
        log.info('grpc-request', request=request)

        depth = self._get_depth(context)

        if self._malformed_id(context, 'device type', request.id):
            return DeviceType()

        try:
            return self.root.get('/device_types/' + request.id, depth=depth)
        except KeyError:
            self._not_found(context, 'Device type', request.id)
            return DeviceType()

    @twisted_async
    def ListDeviceGroups(self, request, context):
        """Return all entries under '/device_groups'."""
        log.info('grpc-request', request=request)
        # TODO is this mapped to tree or taken from coordinator?
        items = self.root.get('/device_groups')
        return DeviceGroups(items=items)

    @twisted_async
    def GetDeviceGroup(self, request, context):
        """Return one device group, expanded to the requested 'get-depth'.

        Sets INVALID_ARGUMENT for an id containing '/', NOT_FOUND for an
        unknown id; an empty DeviceGroup is returned in both cases.
        """
        log.info('grpc-request', request=request)

        depth = self._get_depth(context)

        if self._malformed_id(context, 'device group', request.id):
            return DeviceGroup()

        # TODO is this mapped to tree or taken from coordinator?
        try:
            return self.root.get('/device_groups/' + request.id, depth=depth)
        except KeyError:
            self._not_found(context, 'Device group', request.id)
            return DeviceGroup()

    def StreamPacketsOut(self, request_iterator, context):
        """Forward every streamed packet-out to its logical device agent.

        The iterator is consumed on the calling thread; only the forwarding
        of each individual packet goes through twisted_async.
        """

        @twisted_async
        def forward_packet_out(packet_out):
            agent = self.core.get_logical_device_agent(packet_out.id)
            agent.packet_out(packet_out.packet_out)

        for request in request_iterator:
            forward_packet_out(packet_out=request)

        return Empty()

    def ReceivePacketsIn(self, request, context):
        """Yield packet-in messages taken from the core's packet_in_queue.

        The queue is polled with a one second timeout; the stopped flag is
        only examined when a poll comes back empty.
        """
        while 1:
            try:
                packet_in = self.core.packet_in_queue.get(timeout=1)
                yield packet_in
            except QueueEmpty:
                if self.stopped:
                    break

    def send_packet_in(self, device_id, ofp_packet_in):
        """Must be called on the twisted thread"""
        packet_in = PacketIn(id=device_id, packet_in=ofp_packet_in)
        self.core.packet_in_queue.put(packet_in)

    def ReceiveChangeEvents(self, request, context):
        """Yield change events taken from the core's change_event_queue.

        The queue is polled with a one second timeout; the stopped flag is
        only examined when a poll comes back empty.
        """
        while 1:
            try:
                event = self.core.change_event_queue.get(timeout=1)
                yield event
            except QueueEmpty:
                if self.stopped:
                    break

    def send_port_change_event(self, device_id, port_status):
        """Must be called on the twisted thread"""
        assert isinstance(port_status, ofp_port_status)
        event = ChangeEvent(id=device_id, port_status=port_status)
        self.core.change_event_queue.put(event)