blob: f00b06e75ec74da4c9a8c533bd8b7b81dd6b2256 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(Empty); all of its
semantics are derived from the recovered schema.
"""
Zsolt Harasztie7b60762016-10-05 17:49:27 -070022
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070023import os
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070024import sys
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070025from random import randint
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070026from zlib import decompress
27
Scott Baker6ee1a292018-02-22 17:13:16 -080028import functools
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070029import grpc
30from consul import Consul
Zsolt Harasztie7b60762016-10-05 17:49:27 -070031from grpc._channel import _Rendezvous
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070032from structlog import get_logger
Zsolt Harasztie7b60762016-10-05 17:49:27 -070033from twisted.internet import reactor
Zsolt Haraszti2aac6232016-11-23 11:18:23 -080034from twisted.internet.defer import inlineCallbacks, returnValue
Zsolt Harasztie7b60762016-10-05 17:49:27 -070035from werkzeug.exceptions import ServiceUnavailable
Zsolt Haraszti3d55ffc2016-10-03 22:26:41 -070036
Scott Baker80a36b72018-03-12 18:55:53 -070037from chameleon.protos.schema_pb2_grpc import SchemaServiceStub
Zsolt Haraszticba96de2016-11-06 14:04:55 -080038from google.protobuf.empty_pb2 import Empty
39
alshabib6ca05622017-01-31 14:08:36 -080040from chameleon.utils.asleep import asleep
Zsolt Harasztia9a12dc2016-09-27 13:48:35 -070041
# Module-level structlog logger used by everything in this file.
log = get_logger()
43
44
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.
    """

    # Seconds to wait before successive reconnect attempts. Once the number
    # of retries exceeds the length of the list, the last entry is reused.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
                 reconnect_callback=None, credentials=None,
                 restart_on_disconnect=False):
        """
        Record the connection settings; no network activity happens here.

        :param consul_endpoint: 'host:port' of the consul agent, used only
            when ``endpoint`` starts with '@'
        :param work_dir: directory where the downloaded *.proto / *.desc
            files are written and compiled
        :param endpoint: either a 'host:port' gRPC endpoint, or
            '@<service-name>' to resolve the endpoint through consul
        :param reconnect_callback: optional callable scheduled on the
            reactor after each successful (re-)connect
        :param credentials: if truthy, passed to grpc.secure_channel;
            otherwise an insecure channel is opened
        :param restart_on_disconnect: if true, subscribe to channel
            connectivity changes and re-exec the process when a previously
            ready channel is lost
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.credentials = credentials
        self.restart_on_disconnect = restart_on_disconnect

        # Include path handed to protoc via '-I'
        self.google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos'))
        # Location of the gw_gen.py / swagger_gen.py protoc plugins
        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None  # not assigned anywhere else in this class
        self.retries = 0
        self.shutting_down = False
        self.connected = False
        # Set once the channel has reported READY at least once; only used
        # by connectivity_callback.
        self.was_connected = False

    def start(self):
        """
        Schedule the first connection attempt on the reactor (unless already
        connected).
        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as shutting down so that connect() becomes a no-op.
        Calling it more than once is harmless.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the callable invoked after each successful (re-)connect.
        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    def connectivity_callback(self, client, connectivity):
        """
        Channel connectivity subscriber, registered by connect() when
        restart_on_disconnect is set.
        :param client: the GrpcClient owning the channel (connect() binds
            this to self)
        :param connectivity: the grpc.ChannelConnectivity value reported by
            the channel
        """
        # Refer to the enum class explicitly rather than reaching sibling
        # members through the value that was passed in.
        states = grpc.ChannelConnectivity

        if self.was_connected and connectivity in (
                states.TRANSIENT_FAILURE, states.SHUTDOWN):
            log.info("connectivity lost -- restarting")
            # Replaces the current process image; does not return on success
            os.execv(sys.executable, ['python'] + sys.argv)

        if connectivity == states.READY:
            self.was_connected = True

        # Sometimes gRPC transitions from READY to IDLE, skipping
        # TRANSIENT_FAILURE even though a socket is disconnected. So on idle,
        # force a connectivity check.
        if connectivity == states.IDLE and self.was_connected:
            # The result will probably show IDLE, but passing in True has the
            # side effect of reconnecting if the connection has been lost,
            # which will trigger the TRANSIENT_FAILURE we were looking for.
            client.channel._channel.check_connectivity_state(True)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point.

        On success the schema is downloaded and compiled, the back-off
        counter is reset and the reconnect callback (if any) is scheduled.
        On failure the method waits for the current back-off interval and
        then re-schedules itself on the reactor.
        """

        if self.shutting_down or self.connected:
            return

        # Bound before the try block so that the generic error handler below
        # can always log it, even if the consul lookup itself raises.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = yield self._get_endpoint_from_consul(
                    self.endpoint[1:])

            if self.credentials:
                log.info('securely connecting', endpoint=_endpoint)
                self.channel = grpc.secure_channel(_endpoint, self.credentials)
            else:
                log.info('insecurely connecting', endpoint=_endpoint)
                self.channel = grpc.insecure_channel(_endpoint)

            if self.restart_on_disconnect:
                connectivity_callback = functools.partial(
                    self.connectivity_callback, self)
                self.channel.subscribe(connectivity_callback)

            swagger_from = self._retrieve_schema()
            self._compile_proto_files(swagger_from)
            self._clear_backoff()

            self.connected = True
            if self.reconnect_callback is not None:
                reactor.callLater(0, self.reconnect_callback)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception('rendezvous error', e=e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # Only reached after a failed attempt: try again
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log the failure and return the result of asleep() for the current
        back-off interval; also bumps the retry counter.
        :param msg: event name to log
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging how many retries were needed.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service-name.
        Polls once per second until consul reports at least one instance.
        :param service_name: name of the service in the consul catalog
        :return: (via returnValue) the endpoint as a 'host:port' string
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        while True:
            log.debug('consul-lookup', host=host, port=port)
            consul = Consul(host=host, port=port)
            _, services = consul.catalog.service(service_name)
            log.debug('consul-response', services=services)
            if services:
                break
            log.warning('no-service', consul_host=host, consul_port=port,
                        service_name=service_name)
            yield asleep(1.0)

        # The original note here referred to picking local addresses when
        # resolving a service via consul, see CORD-815
        # (https://jira.opencord.org/browse/CORD-815). As written, one of the
        # returned instances is chosen at random.
        service = services[randint(0, len(services) - 1)]
        endpoint = '{}:{}'.format(service['ServiceAddress'],
                                  service['ServicePort'])
        returnValue(endpoint)

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory, together with their decompressed descriptors
        (*.desc).
        :return: the ``swagger_from`` field of the schema response
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # Errors raised by the call are not handled here; they propagate to
        # the caller (connect() handles _Rendezvous and generic exceptions).
        schemas = stub.GetSchema(Empty(), timeout=120)

        # NOTE(review): both commands interpolate work_dir into a shell
        # string unquoted, and the cleanup always targets a path under /tmp
        # (which differs from work_dir when work_dir is not below /tmp).
        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            proto_content = proto_file.proto
            log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
                      length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            # The descriptor arrives zlib-compressed
            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
                      length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.swagger_from

    def _compile_proto_files(self, swagger_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py.
        :param swagger_from: name of the proto file for which swagger
            output is also generated
        :return: None
        """

        chameleon_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('.proto')]:

            need_swagger = fname == swagger_from
            log.debug('compiling', file=fname, need_swagger=need_swagger)
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. -I %s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-gw=%s/gw_gen.py '
                '--gw_out=. '
                '--plugin=protoc-gen-swagger=%s/swagger_gen.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    ':'.join([os.environ['PATH'], self.plugin_dir]),
                    chameleon_base_dir,
                    self.google_api_dir,
                    self.plugin_dir,
                    self.plugin_dir,
                    '--swagger_out=. ' if need_swagger else '',
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            exit_status = os.system(cmd)
            if exit_status != 0:
                # Do not claim success when the compiler reported a failure;
                # processing continues with the remaining files as before.
                log.error('compile-failed', file=fname,
                          exit_status=exit_status)
            else:
                log.info('compiled', file=fname)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('_pb2.py')]:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    @inlineCallbacks
    def invoke(self, stub, method_name, request, metadata, retry=1):
        """
        Invoke a gRPC call to the remote server and return the response.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :param metadata: [(str, str), (str, str), ...]
        :param retry: number of additional attempts made after a successful
            reconnect when the server was reported as unavailable
        :return: The response protobuf message and returned trailing metadata
        :raises ServiceUnavailable: when not connected, or when the server is
            reported as unavailable and the retry did not produce a response
        """

        if not self.connected:
            raise ServiceUnavailable()

        try:
            method = getattr(stub(self.channel), method_name)
            response, rendezvous = method.with_call(request, metadata=metadata)
            returnValue((response, rendezvous.trailing_metadata()))

        except grpc._channel._Rendezvous as e:
            code = e.code()
            if code == grpc.StatusCode.UNAVAILABLE:
                # Callers see ServiceUnavailable instead of the gRPC error
                e = ServiceUnavailable()

                if self.connected:
                    self.connected = False
                    yield self.connect()
                    if retry > 0:
                        # The nested call already yields the
                        # (response, trailing metadata) pair.
                        response = yield self.invoke(stub, method_name,
                                                     request, metadata,
                                                     retry=retry - 1)
                        returnValue(response)

            elif code in (
                    grpc.StatusCode.NOT_FOUND,
                    grpc.StatusCode.INVALID_ARGUMENT,
                    grpc.StatusCode.ALREADY_EXISTS,
                    grpc.StatusCode.UNAUTHENTICATED,
                    grpc.StatusCode.PERMISSION_DENIED):

                pass  # don't log error, these occur naturally

            else:
                log.exception(e)

            raise e
Zsolt Harasztie7b60762016-10-05 17:49:27 -0700330 raise e