This commit changes the way messages are consumed over Kafka,
mostly around which initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index 6dcb10f..d596334 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -136,6 +136,25 @@
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+
+    # @inlineCallbacks
+    # def wait_until_topic_is_ready(self, topic):
+    #     #  Assumes "auto.create.topics.enable" is set in the broker
+    #     #  configuration
+    #     e = True
+    #     while e:
+    #         yield self.kclient.load_metadata_for_topics(topic)
+    #         e = self.kclient.metadata_error_for_topic(topic)
+    #         if e:
+    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        # Assumes broker-side topic auto-creation. NOTE(review): wait_until_topic_is_ready is commented out above - confirm it exists.
+        yield self.wait_until_topic_is_ready(topic)
+
+
     @inlineCallbacks
     def send_message(self, topic, msg):
         assert topic is not None