This update consists of the following changes:
1) Add GroupConsumer to the Go sarama_client and modify the Core
code to use a groupConsumer instead of a partition consumer. This
change will ensure that multiple consumers (with different group Ids)
can consume kafka messages from the same topic.
2) Remove afkak kafka client and replace it with confluent kafka,
a change done in voltha 1.x. Modify the code accordingly.
3) Add a Group Consumer to the Python kafka client such that
several instances of an Adapter can consume the same messages from
the same topic.
4) Set the datapath_id for the logical device in the Core.
Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index fbb0834..5cad2e8 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -18,8 +18,6 @@
from uuid import uuid4
import structlog
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
DeferredQueue, gatherResults
@@ -34,6 +32,9 @@
log = structlog.get_logger()
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
class KafkaMessagingError(BaseException):
def __init__(self, error):
@@ -48,6 +49,7 @@
kafka_host_port,
kv_store,
default_topic,
+ group_id_prefix,
target_cls):
"""
Initialize the kafka proxy. This is a singleton (may change to
@@ -67,15 +69,15 @@
self.kafka_host_port = kafka_host_port
self.kv_store = kv_store
self.default_topic = default_topic
+ self.default_group_id = "_".join((group_id_prefix, default_topic))
self.target_cls = target_cls
self.topic_target_cls_map = {}
- self.topic_consumer_map = {}
self.topic_callback_map = {}
self.subscribers = {}
- self.kafka_client = None
self.kafka_proxy = None
self.transaction_id_deferred_map = {}
self.received_msg_queue = DeferredQueue()
+ self.stopped = False
self.init_time = 0
self.init_received_time = 0
@@ -91,11 +93,7 @@
def start(self):
try:
- # Create the kafka client
- # assert self.kafka_host is not None
- # assert self.kafka_port is not None
- # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
- self.kafka_client = KafkaClient(self.kafka_host_port)
+ log.debug("KafkaProxy-starting")
# Get the kafka proxy instance. If it does not exist then
# create it
@@ -110,12 +108,17 @@
# Start the queue to handle incoming messages
reactor.callLater(0, self._received_message_processing_loop)
- # Start listening for incoming messages
- reactor.callLater(0, self.subscribe, self.default_topic,
- target_cls=self.target_cls)
+ # Subscribe using the default topic and default group id. Whenever
+ # a message is received on that topic then the target_cls will be
+ # invoked.
+ reactor.callLater(0, self.subscribe,
+ topic=self.default_topic,
+ target_cls=self.target_cls,
+ group_id=self.default_group_id)
# Setup the singleton instance
IKafkaMessagingProxy._kafka_messaging_instance = self
+ log.debug("KafkaProxy-started")
except Exception as e:
log.exception("Failed-to-start-proxy", e=e)
@@ -126,37 +129,14 @@
"""
log.debug("Stopping-messaging-proxy ...")
try:
- # Stop all the consumers
- deferred_list = []
- for key, values in self.topic_consumer_map.iteritems():
- deferred_list.extend([c.stop() for c in values])
-
- if not deferred_list:
- d = gatherResults(deferred_list)
- d.addCallback(lambda result: self.kafka_client.close())
+ # Stop the kafka proxy. This will stop all the consumers
+ # and producers
+ self.stopped = True
+ self.kafka_proxy.stop()
log.debug("Messaging-proxy-stopped.")
except Exception as e:
log.exception("Exception-when-stopping-messaging-proxy:", e=e)
-
- @inlineCallbacks
- def create_topic(self, topic):
- yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-
- @inlineCallbacks
- def _wait_until_topic_is_ready(self, client, topic):
- e = True
- while e:
- yield client.load_metadata_for_topics(topic)
- e = client.metadata_error_for_topic(topic)
- if e:
- log.debug("Topic-not-ready-retrying...", topic=topic)
-
- def _clear_backoff(self):
- if self.retries:
- log.info('reconnected-to-consul', after_retries=self.retries)
- self.retries = 0
-
def get_target_cls(self):
return self.target_cls
@@ -164,24 +144,13 @@
return self.default_topic
@inlineCallbacks
- def _subscribe(self, topic, offset, callback=None, target_cls=None):
+ def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+ target_cls=None):
try:
log.debug("subscribing-to-topic-start", topic=topic)
- yield self._wait_until_topic_is_ready(self.kafka_client, topic)
- partitions = self.kafka_client.topic_partitions[topic]
- consumers = []
-
- # First setup the generic callback - all received messages will
- # go through that queue
- if topic not in self.topic_consumer_map:
- log.debug("topic-not-in-consumer-map", topic=topic)
- consumers = [Consumer(self.kafka_client, topic, partition,
- self._enqueue_received_message)
- for partition in partitions]
- self.topic_consumer_map[topic] = consumers
-
- log.debug("_subscribe", topic=topic,
- consumermap=self.topic_consumer_map)
+ yield self.kafka_proxy.subscribe(topic,
+ self._enqueue_received_group_message,
+ group_id, offset)
if target_cls is not None and callback is None:
# Scenario #1
@@ -198,25 +167,6 @@
else:
log.warn("invalid-parameters")
- def cb_closed(result):
- """
- Called when a consumer cleanly stops.
- """
- log.debug("Consumers-cleanly-stopped")
-
- def eb_failed(failure):
- """
- Called when a consumer fails due to an uncaught exception in the
- processing callback or a network error on shutdown. In this case we
- simply log the error.
- """
- log.warn("Consumers-failed", failure=failure)
-
- for c in consumers:
- c.start(offset).addCallbacks(cb_closed, eb_failed)
-
- log.debug("subscribed-to-topic", topic=topic)
-
returnValue(True)
except Exception as e:
log.exception("Exception-during-subscription", e=e)
@@ -224,7 +174,7 @@
@inlineCallbacks
def subscribe(self, topic, callback=None, target_cls=None,
- max_retry=3, offset=OFFSET_LATEST):
+ max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
"""
Scenario 1: invoked to subscribe to a specific topic with a
target_cls to invoke when a message is received on that topic. This
@@ -245,6 +195,8 @@
:param max_retry: the number of retries before reporting failure
to subscribe. This caters for scenario where the kafka topic is not
ready.
+ :param group_id: The ID of the group the consumer is subscribing to
+ :param offset: The topic offset on the kafka bus from where message consumption will start
:return: True on success, False on failure
"""
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
@@ -253,13 +205,20 @@
wait_time = RETRY_BACKOFF[min(retries,
len(RETRY_BACKOFF) - 1)]
log.info(msg, retry_in=wait_time)
- return asleep(wait_time)
+ return asleep.asleep(wait_time)
+
+ log.debug("subscribing", topic=topic, group_id=group_id,
+ callback=callback, target=target_cls)
retry = 0
subscribed = False
+ if group_id is None:
+ group_id = self.default_group_id
while not subscribed:
- subscribed = yield self._subscribe(topic, callback=callback,
- target_cls=target_cls, offset=offset)
+ subscribed = yield self._subscribe_group_consumer(group_id, topic,
+ callback=callback,
+ target_cls=target_cls,
+ offset=offset)
if subscribed:
returnValue(True)
elif retry > max_retry:
@@ -268,51 +227,56 @@
_backoff("subscription-not-complete", retry)
retry += 1
- # while not self._subscribe(topic, callback=callback,
- # target_cls=target_cls):
- # if retry > max_retry:
- # return False
- # else:
- # _backoff("subscription-not-complete", retry)
- # retry += 1
- # return True
-
- def unsubscribe(self, topic):
+ def unsubscribe(self, topic, callback=None, target_cls=None):
"""
Invoked when unsubscribing to a topic
- :param topic: topic to unsubscibe from
+ :param topic: topic to unsubscribe from
+ :param callback: the callback used when subscribing to the topic, if any
+ :param target_cls: the target class used when subscribing to the topic, if any
:return: None on success or Exception on failure
"""
log.debug("Unsubscribing-to-topic", topic=topic)
- def remove_topic(topic):
- if topic in self.topic_consumer_map:
- del self.topic_consumer_map[topic]
-
try:
- if topic in self.topic_consumer_map:
- consumers = self.topic_consumer_map[topic]
- d = gatherResults([c.stop() for c in consumers])
- d.addCallback(remove_topic, topic)
- log.debug("Unsubscribed-to-topic.", topic=topic)
- else:
- log.debug("Topic-does-not-exist.", topic=topic)
+ self.kafka_proxy.unsubscribe(topic,
+ self._enqueue_received_group_message)
+
+ if callback is None and target_cls is None:
+ log.error("both-call-and-target-cls-cannot-be-none",
+ topic=topic)
+ raise KafkaMessagingError(
+ error="both-call-and-target-cls-cannot-be-none")
+
+ if target_cls is not None and topic in self.topic_target_cls_map:
+ del self.topic_target_cls_map[topic]
+
+ if callback is not None and topic in self.topic_callback_map:
+ index = 0
+ for cb in self.topic_callback_map[topic]:
+ if cb == callback:
+ break
+ index += 1
+ if index < len(self.topic_callback_map[topic]):
+ self.topic_callback_map[topic].pop(index)
+
+ if len(self.topic_callback_map[topic]) == 0:
+ del self.topic_callback_map[topic]
except Exception as e:
- log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+ log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+ e=e)
+ return e
@inlineCallbacks
- def _enqueue_received_message(self, reactor, message_list):
+ def _enqueue_received_group_message(self, msg):
"""
Internal method to continuously queue all received messaged
irrespective of topic
- :param reactor: A requirement by the Twisted Python kafka library
- :param message_list: Received list of messages
+ :param msg: Received message
:return: None on success, Exception on failure
"""
try:
- for m in message_list:
- log.debug("received-msg", msg=m)
- yield self.received_msg_queue.put(m)
+ log.debug("received-msg", msg=msg)
+ yield self.received_msg_queue.put(msg)
except Exception as e:
log.exception("Failed-enqueueing-received-message", e=e)
@@ -327,6 +291,8 @@
try:
message = yield self.received_msg_queue.get()
yield self._process_message(message)
+ if self.stopped:
+ break
except Exception as e:
log.exception("Failed-dequeueing-received-message", e=e)
@@ -457,16 +423,18 @@
current_time = int(round(time.time() * 1000))
# log.debug("Got Message", message=m)
try:
- val = m.message.value
+ val = m.value()
+ # val = m.message.value
# print m.topic
# Go over customized callbacks first
- if m.topic in self.topic_callback_map:
- for c in self.topic_callback_map[m.topic]:
+ m_topic = m.topic()
+ if m_topic in self.topic_callback_map:
+ for c in self.topic_callback_map[m_topic]:
yield c(val)
# Check whether we need to process request/response scenario
- if m.topic not in self.topic_target_cls_map:
+ if m_topic not in self.topic_target_cls_map:
return
# Process request/response scenario
@@ -506,7 +474,8 @@
response.header.to_topic)
self._send_kafka_message(res_topic, response)
- log.debug("Response-sent", response=response.body, to_topic=res_topic)
+ log.debug("Response-sent", response=response.body,
+ to_topic=res_topic)
elif message.header.type == MessageType.Value("RESPONSE"):
trns_id = self._to_string(message.header.id)
if trns_id in self.transaction_id_deferred_map: