blob: f4c57f49dda30bacc36e9e149c178e72285bf506 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001#!/usr/bin/env python
2#
3# Copyright 2018 the original author or authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Ponsim ONU Adapter main entry point"""
19
20import argparse
21import arrow
22import os
23import time
24
25import yaml
26from simplejson import dumps
27from twisted.internet.defer import inlineCallbacks, returnValue
28from twisted.internet.task import LoopingCall
29from zope.interface import implementer
30from adapters.protos import third_party
31from adapters.common.structlog_setup import setup_logging, update_logging
32from adapters.common.utils.dockerhelpers import get_my_containers_name
33from adapters.common.utils.nethelpers import get_my_primary_local_ipv4, \
34 get_my_primary_interface
35from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
36from adapters.common.utils.registry import registry, IComponent
37from packaging.version import Version
38from adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, get_messaging_proxy
39from adapters.ponsim_onu.ponsim_onu import PonSimOnuAdapter
40from adapters.protos.adapter_pb2 import AdapterConfig, Adapter
41from adapters.kafka.adapter_request_facade import AdapterRequestFacade
42from adapters.kafka.core_proxy import CoreProxy
khenaidoo6fdf0ba2018-11-02 14:38:33 -040043
44from adapters.kafka.adapter_proxy import AdapterProxy
45
khenaidoob9203542018-09-17 22:56:37 -040046from adapters.common.utils.deferred_utils import TimeOutError
47from adapters.common.utils.asleep import asleep
48
# Reference the otherwise-unused ``third_party`` import (presumably kept for
# its import-time side effects -- TODO confirm).
_ = third_party

# Default settings.  Most entries can be overridden through the environment
# variable named in the corresponding ``os.environ.get`` call; parse_args()
# uses many of them as command-line defaults.
defs = dict(
    version_file='./VERSION',
    config=os.environ.get('CONFIG', './ponsim_onu.yml'),
    # Raw string: the pattern contains ``\.`` which is not a valid escape
    # sequence in a normal string literal.
    container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR',
                                        r'^.*\.([0-9]+)\..*$'),
    consul=os.environ.get('CONSUL', 'localhost:8500'),
    name=os.environ.get('NAME', 'ponsim_onu'),
    vendor=os.environ.get('VENDOR', 'Voltha Project'),
    device_type=os.environ.get('DEVICE_TYPE', 'ponsim_onu'),
    # NOTE(review): when set through the environment these two are strings
    # (e.g. 'False'), not booleans -- confirm how consumers interpret them.
    accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
    accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
    etcd=os.environ.get('ETCD', 'localhost:2379'),
    core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
    interface=os.environ.get('INTERFACE', get_my_primary_interface()),
    instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
    backend=os.environ.get('BACKEND', 'none'),
    # Environment values are strings, but this value is handed to asleep()
    # as the delay between registration retries, so make it numeric.
    retry_interval=float(os.environ.get('RETRY_INTERVAL', 2)),
    heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
)
72
73
def parse_args():
    """Declare the adapter's command-line options and parse the command line.

    Defaults for value-carrying options come from the module-level ``defs``
    dictionary.  When ``--instance-id-is-container-name`` is given, the parsed
    ``instance_id`` is replaced by the result of ``get_my_containers_name()``.

    :return: the ``argparse`` namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser()

    def add_store(flags, dest, help_text):
        # Option carrying one value, defaulting to defs[dest].
        parser.add_argument(*flags,
                            dest=dest,
                            action='store',
                            default=defs[dest],
                            help=help_text)

    def add_switch(flags, dest, help_text):
        # Boolean flag, off unless present on the command line.
        parser.add_argument(*flags,
                            dest=dest,
                            action='store_true',
                            default=False,
                            help=help_text)

    def add_counter(flags, dest, help_text):
        # Repeatable flag; the value is the number of occurrences (or None).
        parser.add_argument(*flags,
                            dest=dest,
                            action='count',
                            help=help_text)

    add_store(('-c', '--config'), 'config',
              'Path to ponsim_onu.yml config file (default: %s). '
              'If relative, it is relative to main.py of ponsim adapter.'
              % defs['config'])

    add_store(('-X', '--container-number-extractor'), 'container_name_regex',
              'Regular expression for extracting conatiner number from '
              'container name (default: %s)' % defs['container_name_regex'])

    add_store(('-C', '--consul'), 'consul',
              '<hostname>:<port> to consul agent (default: %s)'
              % defs['consul'])

    add_store(('-na', '--name'), 'name',
              'name of this adapter (default: %s)' % defs['name'])

    add_store(('-ven', '--vendor'), 'vendor',
              'vendor of this adapter (default: %s)' % defs['vendor'])

    add_store(('-dt', '--device_type'), 'device_type',
              'supported device type of this adapter (default: %s)'
              % defs['device_type'])

    add_store(('-abf', '--accept_bulk_flow'), 'accept_bulk_flow',
              'specifies whether the device type accepts bulk flow updates '
              'adapter (default: %s)' % defs['accept_bulk_flow'])

    add_store(('-aaf', '--accept_atomic_flow'), 'accept_atomic_flow',
              'specifies whether the device type accepts add/remove flow '
              '(default: %s)' % defs['accept_atomic_flow'])

    add_store(('-e', '--etcd'), 'etcd',
              '<hostname>:<port> to etcd server (default: %s)' % defs['etcd'])

    add_store(('-i', '--instance-id'), 'instance_id',
              'unique string id of this container instance (default: %s)'
              % defs['instance_id'])

    add_store(('-I', '--interface'), 'interface',
              'ETH interface to recieve (default: %s)' % defs['interface'])

    add_switch(('-n', '--no-banner'), 'no_banner',
               'omit startup banner log lines')

    add_switch(('-N', '--no-heartbeat'), 'no_heartbeat',
               'do not emit periodic heartbeat log messages')

    add_counter(('-q', '--quiet'), 'quiet', "suppress debug and info logs")

    add_counter(('-v', '--verbose'), 'verbose', 'enable verbose logging')

    add_switch(('--instance-id-is-container-name',),
               'instance_id_is_container_name',
               'use docker container name as conatiner instance id'
               ' (overrides -i/--instance-id option)')

    add_store(('-KA', '--kafka_adapter'), 'kafka_adapter',
              '<hostname>:<port> of the kafka adapter broker (default: %s). '
              '(If not specified (None), '
              'the address from the config file is used'
              % defs['kafka_adapter'])

    add_store(('-KC', '--kafka_cluster'), 'kafka_cluster',
              '<hostname>:<port> of the kafka cluster broker (default: %s). '
              '(If not specified (None), '
              'the address from the config file is used'
              % defs['kafka_cluster'])

    # The backend option is restricted to a fixed set of choices.
    parser.add_argument('-b', '--backend',
                        default=defs['backend'],
                        choices=['none', 'consul', 'etcd'],
                        help='backend to use for config persitence')

    add_store(('-ct', '--core_topic'), 'core_topic',
              'topic of core on the kafka bus')

    parsed = parser.parse_args()

    # Post-processing: the container name takes precedence over -i.
    if parsed.instance_id_is_container_name:
        parsed.instance_id = get_my_containers_name()

    return parsed
236
237
def load_config(args):
    """Read the adapter's YAML configuration file.

    :param args: parsed command-line namespace; only ``args.config`` is used.
    :return: the deserialized content of the YAML document.
    :raises IOError/OSError: if the file cannot be opened.
    """
    path = args.config
    if path.startswith('.'):
        # Paths beginning with '.' are resolved against the directory that
        # contains this file rather than the current working directory.
        base_dir = os.path.dirname(os.path.abspath(__file__))
        path = os.path.join(base_dir, path)
    path = os.path.abspath(path)
    with open(path) as fd:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated (and rejected by recent PyYAML releases), and
        # the full loader can construct arbitrary Python objects.
        config = yaml.safe_load(fd)
    return config
247
248
def print_banner(log):
    """Emit the start-up ASCII-art banner, one ``log.info`` call per line.

    The art lines are raw strings because they contain backslashes that are
    not valid escape sequences in ordinary string literals.

    :param log: logger-like object exposing ``info(message)``.
    """
    log.info(r" ____ _ ___ _ _ _ _ ")
    log.info(r"| _ \ ___ _ __ ___(_)_ __ ___ / _ \| \ | | | | | ")
    log.info(r"| |_) / _ \| '_ \/ __| | '_ ` _ \ | | | | \| | | | | ")
    log.info(r"| __/ (_) | | | \__ \ | | | | | | | |_| | |\ | |_| | ")
    log.info(r"|_| \___/|_| |_|___/_|_| |_| |_| \___/|_| \_|\___/ ")
    log.info(r" _ _ _ ")
    log.info(r" / \ __| | __ _ _ __ | |_ ___ _ __ ")
    log.info(r" / _ \ / _` |/ _` | '_ \| __/ _ \ '__| ")
    log.info(r" / ___ \ (_| | (_| | |_) | || __/ | ")
    log.info(r"/_/ \_\__,_|\__,_| .__/ \__\___|_| ")
    log.info(r" |_| ")
    log.info('(to stop: press Ctrl-C)')
262
263
@implementer(IComponent)
class Main(object):
    """Top-level component that wires up and runs the Ponsim ONU adapter.

    Constructing an instance parses the command line, loads the YAML
    configuration, sets up logging, triggers the asynchronous start-up of the
    Kafka proxies and the adapter and, unless disabled with ``--no-heartbeat``,
    starts the periodic heartbeats.  Call :meth:`start` afterwards to run the
    twisted reactor.
    """

    def __init__(self):
        self.args = args = parse_args()
        self.config = load_config(args)

        # Each -v raises and each -q lowers the verbosity adjustment by one.
        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
        self.log = setup_logging(self.config.get('logging', {}),
                                 args.instance_id,
                                 verbosity_adjust=verbosity_adjust)
        self.log.info('container-number-extractor',
                      regex=args.container_name_regex)

        # The attribute name says "olt" although this is the ONU adapter; it
        # is kept unchanged so that any external reader keeps working.
        self.ponsim_olt_adapter_version = self.get_version()
        self.log.info('Ponsim-ONU-Adapter-Version',
                      version=self.ponsim_olt_adapter_version)

        if not args.no_banner:
            print_banner(self.log)

        # Create a unique instance id using the passed-in instance id and
        # UTC timestamp
        current_time = self._utc_timestamp()
        self.instance_id = self.args.instance_id + '_' + str(current_time)

        self.core_topic = args.core_topic
        self.listening_topic = args.name
        self.startup_components()

        if not args.no_heartbeat:
            self.start_heartbeat()
            self.start_kafka_cluster_heartbeat(self.instance_id)

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as integer epoch seconds.

        ``Arrow.timestamp`` is a property in arrow < 1.0 and a method from
        1.0 onwards; both forms are handled so the result is always a plain
        number that can be formatted and JSON-serialized.
        """
        ts = arrow.utcnow().timestamp
        return int(ts() if callable(ts) else ts)

    def get_version(self):
        """Read and validate the adapter version from the VERSION file.

        A non-absolute ``defs['version_file']`` is resolved against the
        directory containing this file.

        :return: the content of the version file, unmodified.
        :raises packaging.version.InvalidVersion: if the content is not a
            valid version string.
        """
        path = defs['version_file']
        if not path.startswith('/'):
            base_dir = os.path.dirname(os.path.abspath(__file__))
            path = os.path.join(base_dir, path)

        path = os.path.abspath(path)
        # ``with`` guarantees the file is closed even if validation fails.
        with open(path, 'r') as version_file:
            v = version_file.read()

        # Use Version to validate the version string - exception will be raised
        # if the version is invalid
        Version(v)
        return v

    def start(self):
        """Run the twisted reactor; blocks until the reactor stops."""
        self.start_reactor()  # will not return except Keyboard interrupt

    def stop(self):
        """Component stop hook; nothing to do for the main component."""
        pass

    def get_args(self):
        """Allow access to command line args"""
        return self.args

    def get_config(self):
        """Allow access to content of config file"""
        return self.config

    def _get_adapter_config(self):
        """Return a default-constructed ``AdapterConfig``."""
        cfg = AdapterConfig()
        return cfg

    @inlineCallbacks
    def startup_components(self):
        """Register and start the adapter's internal components.

        Registers this object, the Kafka cluster proxy and the Kafka
        messaging proxy with the registry, builds the core/adapter proxies
        and the ONU adapter, then registers with the core (retrying forever
        on timeout).  Any exception is logged and not propagated.
        """
        try:
            self.log.info('starting-internal-components',
                          consul=self.args.consul,
                          etcd=self.args.etcd)

            registry.register('main', self)

            # Update the logger to output the vcore id.
            self.log = update_logging(instance_id=self.instance_id,
                                      vcore_id=None)

            yield registry.register(
                'kafka_cluster_proxy',
                KafkaProxy(
                    self.args.consul,
                    self.args.kafka_cluster,
                    config=self.config.get('kafka-cluster-proxy', {})
                )
            ).start()

            config = self._get_adapter_config()

            # The proxies are created without a kafka proxy; it is assigned
            # below once the messaging proxy has been started.
            self.core_proxy = CoreProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            self.adapter_proxy = AdapterProxy(
                kafka_proxy=None,
                core_topic=self.core_topic,
                my_listening_topic=self.listening_topic)

            ponsim_onu_adapter = PonSimOnuAdapter(
                core_proxy=self.core_proxy,
                adapter_proxy=self.adapter_proxy,
                config=config)
            ponsim_request_handler = AdapterRequestFacade(
                adapter=ponsim_onu_adapter)

            yield registry.register(
                'kafka_adapter_proxy',
                IKafkaMessagingProxy(
                    kafka_host_port=self.args.kafka_adapter,
                    # TODO: Add KV Store object reference
                    kv_store=self.args.backend,
                    default_topic=self.args.name,
                    target_cls=ponsim_request_handler
                )
            ).start()

            self.core_proxy.kafka_proxy = get_messaging_proxy()
            self.adapter_proxy.kafka_proxy = get_messaging_proxy()

            # retry for ever
            yield self._register_with_core(-1)

            self.log.info('started-internal-services')

        except Exception as e:
            self.log.exception('Failure-to-start-all-components', e=e)

    @inlineCallbacks
    def shutdown_components(self):
        """Execute before the reactor is shut down"""
        self.log.info('exiting-on-keyboard-interrupt')
        for component in reversed(registry.iterate()):
            yield component.stop()

        import threading
        self.log.info('THREADS:')
        main_thread = threading.current_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # NOTE(review): non-daemon threads are skipped and daemon threads
            # are joined; this looks inverted -- confirm the intent.
            if not t.daemon:
                continue
            self.log.info('joining thread {} {}'.format(
                t.name, "daemon" if t.daemon else "not-daemon"))
            t.join()

    def start_reactor(self):
        """Install start/shutdown hooks and run the twisted reactor."""
        from twisted.internet import reactor
        reactor.callWhenRunning(
            lambda: self.log.info('twisted-reactor-started'))
        reactor.addSystemEventTrigger('before', 'shutdown',
                                      self.shutdown_components)
        reactor.run()

    @inlineCallbacks
    def _register_with_core(self, retries):
        """Register this adapter with the core, retrying on timeout.

        :param retries: number of additional attempts allowed after a
            timeout; a negative value retries forever, ``0`` gives up on the
            first timeout.
        :return: (via ``returnValue``) the core's registration response.
        :raises TimeOutError: when a timeout occurs and no retries are left.
        :raises Exception: any other registration failure is logged and
            re-raised.
        """
        # Send registration to Core with adapter specs
        adapter = Adapter()
        adapter.id = self.args.name
        # Use the configured vendor (-ven/--vendor), not the adapter name.
        adapter.vendor = self.args.vendor
        adapter.version = self.ponsim_olt_adapter_version
        while 1:
            try:
                resp = yield self.core_proxy.register(adapter)
                self.log.info('registration-response', response=resp)
                returnValue(resp)
            except TimeOutError as e:
                self.log.warn("timeout-when-registering-with-core", e=e)
                if retries == 0:
                    self.log.exception("no-more-retries", e=e)
                    raise
                else:
                    # Negative counts mean "retry forever": leave untouched.
                    retries = retries if retries < 0 else retries - 1
                    yield asleep(defs['retry_interval'])
            except Exception as e:
                self.log.exception("failed-registration", e=e)
                raise

    def start_heartbeat(self):
        """Start a debug-level log heartbeat, emitted every 10 seconds."""
        t0 = time.time()
        t0s = time.ctime(t0)

        def heartbeat():
            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)

        lc = LoopingCall(heartbeat)
        lc.start(10)

    # Temporary function to send a heartbeat message to the external kafka
    # broker
    def start_kafka_cluster_heartbeat(self, instance_id):
        """Publish a heartbeat to the Kafka cluster every 10 seconds.

        The message is a JSON-serialized dict sent to the topic configured
        in ``defs['heartbeat_topic']``.  Failures are logged, never raised.

        :param instance_id: value reported in the message's ``instance``
            field.
        """
        message = dict(
            type='heartbeat',
            adapter=self.args.name,
            instance=instance_id,
            ip=get_my_primary_local_ipv4()
        )
        topic = defs['heartbeat_topic']

        def send_msg(start_time):
            try:
                kafka_cluster_proxy = get_kafka_proxy()
                if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
                    # Refresh the time-dependent fields on every send.
                    message['ts'] = self._utc_timestamp()
                    message['uptime'] = time.time() - start_time
                    kafka_cluster_proxy.send_message(topic, dumps(message))
                else:
                    self.log.error('kafka-proxy-unavailable')
            except Exception as e:
                self.log.exception('failed-sending-message-heartbeat', e=e)

        try:
            t0 = time.time()
            lc = LoopingCall(send_msg, t0)
            lc.start(10)
        except Exception as e:
            self.log.exception('failed-kafka-heartbeat', e=e)
491
492
if __name__ == '__main__':
    # Script entry point: build the adapter's main component, then hand
    # control to its start() method.
    adapter_main = Main()
    adapter_main.start()