blob: 3b7a2173985c446e6c676734b4cb8209d65af1ff [file] [log] [blame]
Zsolt Haraszti66862032016-11-28 14:28:39 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti66862032016-11-28 14:28:39 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
Dispatcher is responsible for dispatching incoming "global" gRPC requests
to the respective Voltha instance (leader, peer instance, local). Local
calls are forwarded to the LocalHandler.
21"""
22import structlog
khenaidoo08d48d22017-06-29 19:42:49 -040023from twisted.internet.defer import inlineCallbacks, returnValue
Zsolt Haraszti66862032016-11-28 14:28:39 -080024from voltha.protos.voltha_pb2 import VolthaLocalServiceStub
khenaidoo08d48d22017-06-29 19:42:49 -040025from voltha.registry import registry
26from twisted.internet import reactor
27import grpc
28from grpc import StatusCode
29from grpc._channel import _Rendezvous
30from common.utils.id_generation import get_core_id_from_device_id, \
31 is_broadcast_core_id
Zsolt Haraszti66862032016-11-28 14:28:39 -080032
# Module-level structured logger shared by every class in this module.
log = structlog.get_logger()
34
35
class DispatchError(object):
    """
    Error value produced when a request could not be dispatched.

    Within this module it is returned (via ``returnValue``), never raised,
    so callers distinguish it from a real response with ``isinstance``.
    """

    def __init__(self, error_code):
        # In this module always a grpc ``StatusCode`` member.
        self.error_code = error_code

    def __repr__(self):
        # Readable form for structured log output.
        return '{}(error_code={!r})'.format(type(self).__name__,
                                            self.error_code)
Zsolt Haraszti66862032016-11-28 14:28:39 -080039
khenaidoo08d48d22017-06-29 19:42:49 -040040
class Dispatcher(object):
    """
    Route "global" gRPC requests to the local Voltha instance or to peer
    instances, and keep one gRPC channel open per reachable peer.
    """

    def __init__(self, core, instance_id, core_store_id, grpc_port):
        """
        :param core: the local Voltha core; provides the local handler
        :param instance_id: name of the local Voltha instance
        :param core_store_id: core id of the local Voltha instance
        :param grpc_port: gRPC port used to reach peer instances
        """
        self.core = core
        self.instance_id = instance_id
        self.core_store_id = core_store_id
        self.grpc_port = grpc_port
        self.local_handler = None
        # core_id -> peer instance dict (read for its 'id' and 'host' keys),
        # or a falsy value (presumably when that peer is unavailable).
        self.peers_map = dict()
        # core_id -> gRPC channel to that peer, or None if connecting failed.
        self.grpc_conn_map = {}

    def start(self):
        """Bind the local handler and schedule peer tracking."""
        log.debug('starting')
        self.local_handler = self.core.get_local_handler()
        reactor.callLater(0, self._start_tracking_peers)
        log.info('started')
        return self

    def stop(self):
        log.debug('stopping')
        log.info('stopped')

    @inlineCallbacks
    def dispatch(self,
                 method_name,
                 request,
                 context,
                 core_id=None,
                 id=None,
                 broadcast=False):
        """
        Called whenever a global request is received from the NBI. The
        request is dispatched as follows:
        1) broadcast=True: to all Voltha instances (including the local
           one); peer responses are merged into the local response
        2) core_id specified: to that specific Voltha instance - locally
           when it is the local core id, otherwise to that peer
        3) no id: to the local Voltha instance; used where any instance
           returns the same output, e.g. getAdapters()
        4) an id whose embedded core id is the local core id (or the
           broadcast core id): to the local Voltha instance
        5) an id whose embedded core id belongs to a peer: to that peer
        :param method_name: rpc name
        :param request: the input parameters
        :param context: grpc context
        :param core_id: optional core id of the instance to target
        :param id: the id in the request, if present
        :param broadcast: send the request to every instance
        :return: Deferred firing with the response, or with a DispatchError
        """
        log.debug('start',
                  _method_name=method_name,
                  id=id,
                  request=request)

        core_id_from_request_id = None
        if id:
            try:
                core_id_from_request_id = get_core_id_from_device_id(id)
            except Exception:
                log.warning('invalid-id', request=request, id=id)
                returnValue(DispatchError(StatusCode.NOT_FOUND))

        try:
            # Broadcast to all instances (including locally)
            if broadcast:
                res = yield self._broadcast_request(method_name,
                                                    request,
                                                    context)
                returnValue(res)

            # Explicitly targeted instance
            if core_id:
                if core_id == self.core_store_id:
                    returnValue(self._local_dispatch(self.core_store_id,
                                                     method_name,
                                                     request,
                                                     context))
                res = yield self._dispatch_to_peer(core_id,
                                                   method_name,
                                                   request,
                                                   context)
                returnValue(res)

            # Local dispatch
            if (not id) or (core_id_from_request_id and (
                    core_id_from_request_id == self.core_store_id or
                    is_broadcast_core_id(id))):
                returnValue(self._local_dispatch(self.core_store_id,
                                                 method_name,
                                                 request,
                                                 context))

            # Peer dispatch
            if core_id_from_request_id:
                res = yield self._dispatch_to_peer(core_id_from_request_id,
                                                   method_name,
                                                   request,
                                                   context)
                returnValue(res)

            log.warning('invalid-request', request=request, id=id,
                        core_id=core_id, broadcast=broadcast)
            returnValue(DispatchError(StatusCode.INVALID_ARGUMENT))

        except Exception as e:
            # returnValue raises a BaseException subclass, so it is not
            # swallowed here; only genuine failures reach this handler.
            log.exception('remote-dispatch-exception', e=e)
            returnValue(DispatchError(StatusCode.UNKNOWN))

    def get_core_id_from_instance_id(self, instance_id):
        """
        :param instance_id: instance name
        :return: core id of that instance, or None if it is unknown
        """
        if instance_id == self.instance_id:
            return self.core_store_id
        for core_id, instance in self.peers_map.items():
            # Skip falsy entries (peers without instance info)
            if instance and instance['id'] == instance_id:
                return core_id
        return None

    def get_cluster_instances(self):
        """
        :return: the local instance id followed by the ids of all peers
                 that currently have instance info
        """
        result = [self.instance_id]
        result.extend(instance['id']
                      for instance in self.peers_map.values() if instance)
        return result

    def instance_id_by_logical_device_id(self, logical_device_id):
        log.warning('temp-mapping-logical-device-id')
        # TODO no true dispatching yet, we blindly map everything to self
        return self.instance_id

    def instance_id_by_device_id(self, device_id):
        log.warning('temp-mapping-device-id')
        # TODO no true dispatching yet, we blindly map everything to self
        return self.instance_id

    @inlineCallbacks
    def _broadcast_request(self, method_name, request, context):
        """
        Run the request locally, then on every connected peer, merging each
        distinct peer response into the local result (protobuf MergeFrom).
        Peers that return a DispatchError are logged and skipped.
        """
        result = self._local_dispatch(self.core_store_id,
                                      method_name,
                                      request,
                                      context)
        current_responses = [result]
        # Snapshot: self.peers_map may be replaced while we yield below.
        for core_id, instance in list(self.peers_map.items()):
            if core_id == self.core_store_id:
                continue  # already processed
            if not instance or not self.grpc_conn_map.get(core_id):
                continue

            res = yield self._dispatch_to_peer(core_id,
                                               method_name,
                                               request,
                                               context)
            if isinstance(res, DispatchError):
                log.warning('ignoring-peer',
                            core_id=core_id,
                            error_code=res.error_code)
            elif res not in current_responses:
                result.MergeFrom(res)
                current_responses.append(res)
        returnValue(result)

    def _local_dispatch(self, core_id, method_name, request, context):
        """Invoke ``method_name`` on the local handler and return its result."""
        log.debug('local-dispatch', core_id=core_id)
        method = getattr(self.local_handler, method_name)
        res = method(request, context=context)
        log.debug('local-dispatch-result', res=res, context=context)
        return res

    @inlineCallbacks
    def _start_tracking_peers(self):
        """
        Loop forever on the coordinator's peers map, refreshing the peer
        gRPC channels each time a new map is received. The loop ends (after
        logging) if any step raises.
        """
        try:
            while True:
                peers_map = yield registry('coordinator').recv_peers_map()
                log.info('peers-map-changed', peers_map=peers_map)
                yield self.update_grpc_client_map(peers_map)
                self.peers_map = peers_map
        except Exception as e:
            log.exception('exception', e=e)

    @inlineCallbacks
    def update_grpc_client_map(self, peers_map):
        """
        Diff ``peers_map`` against the current map and open/close peer
        channels accordingly. ``self.peers_map`` itself is not updated here.
        """
        try:
            # 1. Work out which connections to open and which to close
            to_open = dict()
            to_close = set()
            for peer_id, instance in peers_map.items():
                old_instance = self.peers_map.get(peer_id)
                if peer_id in self.peers_map and old_instance == instance:
                    continue  # no change

                if instance:
                    # New or changed peer: (re)open. Any existing channel is
                    # dropped by _refresh_grpc_connections before reopening,
                    # so the id must NOT also be scheduled for closing.
                    to_open[peer_id] = instance['host']
                elif old_instance:
                    to_close.add(peer_id)

            # Close connections that are no longer referenced
            for peer_id in set(self.peers_map) - set(peers_map):
                if self.peers_map[peer_id]:
                    to_close.add(peer_id)

            # 2. Refresh the grpc connections
            yield self._refresh_grpc_connections(to_open, to_close)
        except Exception as e:
            log.exception('exception', e=e)

    @inlineCallbacks
    def _refresh_grpc_connections(self, to_open, to_close):
        """
        :param to_open: dict of peer id -> host to (re)connect to
        :param to_close: set of peer ids whose channels should be dropped
        """
        try:
            log.info('grpc-channel-refresh', to_open=to_open,
                     to_close=to_close)
            # First open the connections, dropping any stale channel
            for peer_id, host in to_open.items():
                if self.grpc_conn_map.get(peer_id):
                    self._disconnect_from_peer(peer_id)
                if host:
                    self.grpc_conn_map[peer_id] = \
                        yield self._connect_to_peer(host, self.grpc_port)

            # Close the unused connections
            for peer_id in to_close:
                if peer_id in to_open:
                    continue  # a fresh channel was just opened; keep it
                if self.grpc_conn_map.get(peer_id):
                    self._disconnect_from_peer(peer_id)
        except Exception as e:
            log.exception('exception', e=e)

    @inlineCallbacks
    def _connect_to_peer(self, host, port):
        """
        :return: Deferred firing with an insecure gRPC channel to
                 host:port, or with None if creating it failed
        """
        try:
            channel = yield grpc.insecure_channel('{}:{}'.format(host, port))
            log.info('grpc-channel-created-with-peer', peer=host)
            returnValue(channel)
        except Exception as e:
            log.exception('exception', e=e)
        returnValue(None)

    def _disconnect_from_peer(self, peer_id):
        """Forget the channel to ``peer_id``; a no-op for unknown ids."""
        # The channel is not closed explicitly (the original note says no
        # close API exists); dropping the reference leaves it to garbage
        # collection.
        channel = self.grpc_conn_map.pop(peer_id, None)
        if channel:
            log.info('grpc-channel-closed-with-peer', peer_id=peer_id)

    @inlineCallbacks
    def _reconnect_to_peer(self, peer_id):
        """
        Drop and re-create the channel to ``peer_id``.
        :return: Deferred firing with True only if a new channel was created
        """
        try:
            # First disconnect
            self._disconnect_from_peer(peer_id)
            # Then reconnect
            peer_instance = self.peers_map.get(peer_id)
            if peer_instance:
                channel = yield self._connect_to_peer(peer_instance['host'],
                                                      self.grpc_port)
                self.grpc_conn_map[peer_id] = channel
                if channel:
                    log.info('reconnected-to-peer', peer_id=peer_id)
                    returnValue(True)
            else:
                log.info('peer-unavailable', peer_id=peer_id)
        except Exception as e:
            log.exception('exception', e=e)
        returnValue(False)

    @inlineCallbacks
    def _dispatch_to_peer(self,
                          core_id,
                          method_name,
                          request,
                          context,
                          retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param core_id: The voltha instance where this request needs to be sent
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param context: grpc context; its invocation metadata is forwarded
        :param retry: number of reconnect-and-retry attempts left when the
                      peer reports UNAVAILABLE
        :return: Deferred firing with the response protobuf message, or with
                 a DispatchError
        """
        log.debug('peer-dispatch',
                  core_id=core_id,
                  _method_name=method_name,
                  request=request)

        if not self.peers_map.get(core_id):
            log.warning('non-existent-core-id', core_id=core_id,
                        peers_map=self.peers_map)
            returnValue(DispatchError(StatusCode.NOT_FOUND))

        channel = self.grpc_conn_map.get(core_id)
        if not channel:
            # No usable channel (never opened, or the last connect failed)
            connected = yield self._reconnect_to_peer(core_id)
            channel = self.grpc_conn_map.get(core_id) if connected else None
            if not channel:
                log.warning('no-channel-to-peer', core_id=core_id)
                returnValue(DispatchError(StatusCode.UNAVAILABLE))

        try:
            # Always request from the local service when making request to peer
            method = getattr(VolthaLocalServiceStub(channel), method_name)
            # NOTE(review): with_call is a synchronous gRPC call, so it blocks
            # the reactor thread until the peer answers.
            response, rendezvous = yield method.with_call(
                request, metadata=context.invocation_metadata())
            log.debug('peer-response',
                      core_id=core_id,
                      response=response,
                      rendezvous_metadata=rendezvous.trailing_metadata())
            # TODO: Should we return the metadata as well
            returnValue(response)
        except _Rendezvous as e:
            code = e.code()
            if code == StatusCode.UNAVAILABLE:
                # Try to reconnect; retry only if a new channel was created
                status = yield self._reconnect_to_peer(core_id)
                if status and retry > 0:
                    response = yield self._dispatch_to_peer(core_id,
                                                            method_name,
                                                            request,
                                                            context,
                                                            retry=retry - 1)
                    returnValue(response)
            elif code in (
                    StatusCode.NOT_FOUND,
                    StatusCode.INVALID_ARGUMENT,
                    StatusCode.ALREADY_EXISTS,
                    StatusCode.UNAUTHENTICATED,
                    StatusCode.PERMISSION_DENIED):
                pass  # don't log error, these occur naturally
            else:
                log.exception('error-invoke', e=e)

            log.warning('error-from-peer', code=code)
            returnValue(DispatchError(code))
371 returnValue(DispatchError(code))