Initial commit moving openolt adapter from voltha-go to the new repo.
This version works with ponsim rather than openolt, this is temporary.
It is currently being fixed to work with openolt.

Change-Id: I34a800c98f050140b367e2d474b7aa8b79f34b9a
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
new file mode 100644
index 0000000..cefc590
--- /dev/null
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -0,0 +1,338 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from confluent_kafka import Producer as _kafkaProducer
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
+from zope.interface import implementer
+
+from python.common.utils.consulhelpers import get_endpoint_from_consul
+from event_bus_publisher import EventBusPublisher
+from python.common.utils.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
+
+log = get_logger()
+
+
+@implementer(IComponent)
+class KafkaProxy(object):
+    """
+    This is a singleton proxy kafka class to hide the kafka client details. This
+    proxy uses confluent-kafka-python as the kafka client. Since that client is
+    not a Twisted client then requests to that client are wrapped with
+    twisted.internet.threads.deferToThread to avoid any potential blocking of
+    the Twisted loop.
+    """
+    _kafka_instance = None
+
+    def __init__(self,
+                 consul_endpoint='localhost:8500',
+                 kafka_endpoint='localhost:9092',
+                 ack_timeout=1000,
+                 max_req_attempts=10,
+                 consumer_poll_timeout=10,
+                 config={}):
+
+        # return an exception if the object already exist
+        if KafkaProxy._kafka_instance:
+            raise Exception('Singleton exist for :{}'.format(KafkaProxy))
+
+        log.debug('initializing', endpoint=kafka_endpoint)
+        self.ack_timeout = ack_timeout
+        self.max_req_attempts = max_req_attempts
+        self.consul_endpoint = consul_endpoint
+        self.kafka_endpoint = kafka_endpoint
+        self.config = config
+        self.kclient = None
+        self.kproducer = None
+        self.event_bus_publisher = None
+        self.stopping = False
+        self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
+        log.debug('initialized', endpoint=kafka_endpoint)
+
+    @inlineCallbacks
+    def start(self):
+        log.debug('starting')
+        self._get_kafka_producer()
+        KafkaProxy._kafka_instance = self
+        self.event_bus_publisher = yield EventBusPublisher(
+            self, self.config.get('event_bus_publisher', {})).start()
+        log.info('started')
+        KafkaProxy.faulty = False
+        self.stopping = False
+        returnValue(self)
+
+    @inlineCallbacks
+    def stop(self):
+        try:
+            log.debug('stopping-kafka-proxy')
+            self.stopping = True
+            try:
+                if self.kclient:
+                    yield self.kclient.close()
+                    self.kclient = None
+                    log.debug('stopped-kclient-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+
+            try:
+                if self.kproducer:
+                    yield self.kproducer.flush()
+                    self.kproducer = None
+                    log.debug('stopped-kproducer-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            # except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
+
+            log.debug('stopped-kafka-proxy')
+
+        except Exception, e:
+            self.kclient = None
+            self.kproducer = None
+            # self.event_bus_publisher = None
+            log.exception('failed-stopped-kafka-proxy', e=e)
+            pass
+
+    def _get_kafka_producer(self):
+
+        try:
+
+            if self.kafka_endpoint.startswith('@'):
+                try:
+                    _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                           self.kafka_endpoint[
+                                                           1:])
+                    log.debug('found-kafka-service', endpoint=_k_endpoint)
+
+                except Exception as e:
+                    log.exception('no-kafka-service-in-consul', e=e)
+
+                    self.kproducer = None
+                    self.kclient = None
+                    return
+            else:
+                _k_endpoint = self.kafka_endpoint
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers': _k_endpoint,
+                 }
+            )
+            pass
+        except Exception, e:
+            log.exception('failed-get-kafka-producer', e=e)
+            return
+
+    @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
+
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
+
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
+
+    @inlineCallbacks
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This API
+        always create a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is received
+        on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic then
+        we won't be able to create another groupId for the same topic.
+        :param offset:  the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
+
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            # c.subscribe([topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception, e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+
+    @inlineCallbacks
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe to a given topic.  Since there they be multiple callers
+        consuming from the same topic then to ensure only the relevant caller
+        gets unsubscribe then the callback is used as a differentiator.   The
+        kafka consumer will be closed when there are no callbacks required.
+        :param topic: topic to unsubscribe
+        :param callback: callback the caller used when subscribing to the topic.
+        If multiple callers have subscribed to a topic using the same callback
+        then the first callback on the list will be removed.
+        :return:None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                index = 0
+                for cb in self.topic_callbacks_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callbacks_map[topic]):
+                    self.topic_callbacks_map[topic].pop(index)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception, e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
+    def send_message(self, topic, msg, key=None):
+        assert topic is not None
+        assert msg is not None
+
+        # first check whether we have a kafka producer.  If there is none
+        # then try to get one - this happens only when we try to lookup the
+        # kafka service from consul
+        try:
+            if self.faulty is False:
+
+                if self.kproducer is None:
+                    self._get_kafka_producer()
+                    # Lets the next message request do the retry if still a failure
+                    if self.kproducer is None:
+                        log.error('no-kafka-producer',
+                                  endpoint=self.kafka_endpoint)
+                        return
+
+                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
+                msgs = [msg]
+
+                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
+                    yield d
+                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+                    # send a lightweight poll to avoid an exception after 100k messages.
+                    d1 = deferToThread(self.kproducer.poll, 0)
+                    yield d1
+                else:
+                    return
+
+        except Exception, e:
+            self.faulty = True
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
+
+            # set the kafka producer to None.  This is needed if the
+            # kafka docker went down and comes back up with a different
+            # port number.
+            if self.stopping is False:
+                log.debug('stopping-kafka-proxy')
+                try:
+                    self.stopping = True
+                    self.stop()
+                    self.stopping = False
+                    self.faulty = False
+                    log.debug('stopped-kafka-proxy')
+                except Exception, e:
+                    log.exception('failed-stopping-kafka-proxy', e=e)
+                    pass
+            else:
+                log.info('already-stopping-kafka-proxy')
+
+            return
+
+    def is_faulty(self):
+        return self.faulty
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+    return KafkaProxy._kafka_instance