Bug fixes
Change-Id: I67349475a93d523795bbeeb28e45a0d3e416028f
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index 7ed354d..3f405d8 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -20,7 +20,7 @@
class ConsumerExample(object):
- def __init__(self, consul_endpoint, topic='voltha-heartbeat', runtime=60):
+ def __init__(self, consul_endpoint, topic="heartbeat.voltha", runtime=60):
self.topic = topic
self.runtime = runtime
self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
@@ -55,6 +55,7 @@
c = Consumer(self._client, self.topic, partition,
self.msg_processor)
self._consumer_list.append(c)
+ # log.info('consumer-started', topic=self.topic, partition=partition)
d = c.start(OFFSET_LATEST)
d.addBoth(_note_consumer_stopped, c)
self._consumer_d_list.append(d)
@@ -93,7 +94,8 @@
def msg_processor(self, consumer, msglist):
for msg in msglist:
- log.info('proc', msg=msg)
+ log.info(msg)
+
def parse_options():
parser = ArgumentParser("Consume kafka messages")
@@ -103,7 +105,7 @@
parser.add_argument("-t", "--topic",
help="topic to listen from",
- default='voltha-heartbeat')
+ default="heartbeat.voltha")
parser.add_argument("-r", "--runtime",
help="total runtime",
@@ -117,6 +119,7 @@
'%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
+
args = parse_options()
consumer_example = ConsumerExample(args.consul, args.topic,