This commit changes how messages are consumed over Kafka,
mostly around the choice of the initial offset to use.
Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index 6dcb10f..d596334 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -136,6 +136,25 @@
log.exception('failed-get-kafka-producer', e=e)
return
+
+ # @inlineCallbacks
+ # def wait_until_topic_is_ready(self, topic):
+ # # Assumes "auto.create.topics.enable" is set in the broker
+ # # configuration
+ # e = True
+ # while e:
+ # yield self.kclient.load_metadata_for_topics(topic)
+ # e = self.kclient.metadata_error_for_topic(topic)
+ # if e:
+ # log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+ @inlineCallbacks
+ def create_topic(self, topic):
+ # Relies on broker-side topic auto-creation (assumed enabled); this only waits for the topic. NOTE(review): wait_until_topic_is_ready appears only as commented-out code in this hunk - confirm a live definition exists.
+ yield self.wait_until_topic_is_ready(topic)
+
+
@inlineCallbacks
def send_message(self, topic, msg):
assert topic is not None