[VOL-1386] This commit add "dep" as the package management tool
for voltha-go.
Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
new file mode 100644
index 0000000..5c42ece
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
@@ -0,0 +1,581 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "math"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+
+static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
+ return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
+}
+*/
+import "C"
+
+// RebalanceCb provides a per-Subscribe*() rebalance event callback.
+// The passed Event will be either AssignedPartitions or RevokedPartitions
+type RebalanceCb func(*Consumer, Event) error
+
+// Consumer implements a High-level Apache Kafka Consumer instance
+type Consumer struct {
+ events chan Event
+ handle handle
+ eventsChanEnable bool
+ readerTermChan chan bool
+ rebalanceCb RebalanceCb
+ appReassigned bool
+ appRebalanceEnable bool // config setting
+}
+
+// Strings returns a human readable name for a Consumer instance
+func (c *Consumer) String() string {
+ return c.handle.String()
+}
+
+// getHandle implements the Handle interface
+func (c *Consumer) gethandle() *handle {
+ return &c.handle
+}
+
+// Subscribe to a single topic
+// This replaces the current subscription
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
+ return c.SubscribeTopics([]string{topic}, rebalanceCb)
+}
+
+// SubscribeTopics subscribes to the provided list of topics.
+// This replaces the current subscription.
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
+ ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
+ defer C.rd_kafka_topic_partition_list_destroy(ctopics)
+
+ for _, topic := range topics {
+ ctopic := C.CString(topic)
+ defer C.free(unsafe.Pointer(ctopic))
+ C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
+ }
+
+ e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ c.rebalanceCb = rebalanceCb
+ c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable
+
+ return nil
+}
+
+// Unsubscribe from the current subscription, if any.
+func (c *Consumer) Unsubscribe() (err error) {
+ C.rd_kafka_unsubscribe(c.handle.rk)
+ return nil
+}
+
+// Assign an atomic set of partitions to consume.
+// This replaces the current assignment.
+func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
+ c.appReassigned = true
+
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+
+ e := C.rd_kafka_assign(c.handle.rk, cparts)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ return nil
+}
+
+// Unassign the current set of partitions to consume.
+func (c *Consumer) Unassign() (err error) {
+ c.appReassigned = true
+
+ e := C.rd_kafka_assign(c.handle.rk, nil)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ return nil
+}
+
+// commit offsets for specified offsets.
+// If offsets is nil the currently assigned partitions' offsets are committed.
+// This is a blocking call, caller will need to wrap in go-routine to
+// get async or throw-away behaviour.
+func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
+ var rkqu *C.rd_kafka_queue_t
+
+ rkqu = C.rd_kafka_queue_new(c.handle.rk)
+ defer C.rd_kafka_queue_destroy(rkqu)
+
+ var coffsets *C.rd_kafka_topic_partition_list_t
+ if offsets != nil {
+ coffsets = newCPartsFromTopicPartitions(offsets)
+ defer C.rd_kafka_topic_partition_list_destroy(coffsets)
+ }
+
+ cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+
+ rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1))
+ if rkev == nil {
+ // shouldn't happen
+ return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
+ panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
+ C.GoString(C.rd_kafka_event_name(rkev))))
+ }
+
+ cErr = C.rd_kafka_event_error(rkev)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+ }
+
+ cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
+ if cRetoffsets == nil {
+ // no offsets, no error
+ return nil, nil
+ }
+ committedOffsets = newTopicPartitionsFromCparts(cRetoffsets)
+
+ return committedOffsets, nil
+}
+
+// Commit offsets for currently assigned partitions
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) Commit() ([]TopicPartition, error) {
+ return c.commit(nil)
+}
+
+// CommitMessage commits offset based on the provided message.
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
+ if m.TopicPartition.Error != nil {
+ return nil, Error{ErrInvalidArg, "Can't commit errored message"}
+ }
+ offsets := []TopicPartition{m.TopicPartition}
+ offsets[0].Offset++
+ return c.commit(offsets)
+}
+
+// CommitOffsets commits the provided list of offsets
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
+ return c.commit(offsets)
+}
+
+// StoreOffsets stores the provided list of offsets that will be committed
+// to the offset store according to `auto.commit.interval.ms` or manual
+// offset-less Commit().
+//
+// Returns the stored offsets on success. If at least one offset couldn't be stored,
+// an error and a list of offsets is returned. Each offset can be checked for
+// specific errors via its `.Error` member.
+func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
+ coffsets := newCPartsFromTopicPartitions(offsets)
+ defer C.rd_kafka_topic_partition_list_destroy(coffsets)
+
+ cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets)
+
+ // coffsets might be annotated with an error
+ storedOffsets = newTopicPartitionsFromCparts(coffsets)
+
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return storedOffsets, newError(cErr)
+ }
+
+ return storedOffsets, nil
+}
+
+// Seek seeks the given topic partitions using the offset from the TopicPartition.
+//
+// If timeoutMs is not 0 the call will wait this long for the
+// seek to be performed. If the timeout is reached the internal state
+// will be unknown and this function returns ErrTimedOut.
+// If timeoutMs is 0 it will initiate the seek but return
+// immediately without any error reporting (e.g., async).
+//
+// Seek() may only be used for partitions already being consumed
+// (through Assign() or implicitly through a self-rebalanced Subscribe()).
+// To set the starting offset it is preferred to use Assign() and provide
+// a starting offset for each partition.
+//
+// Returns an error on failure or nil otherwise.
+func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
+ rkt := c.handle.getRkt(*partition.Topic)
+ cErr := C.rd_kafka_seek(rkt,
+ C.int32_t(partition.Partition),
+ C.int64_t(partition.Offset),
+ C.int(timeoutMs))
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cErr)
+ }
+ return nil
+}
+
+// Poll the consumer for messages or events.
+//
+// Will block for at most timeoutMs milliseconds
+//
+// The following callbacks may be triggered:
+// Subscribe()'s rebalanceCb
+//
+// Returns nil on timeout, else an Event
+func (c *Consumer) Poll(timeoutMs int) (event Event) {
+ ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
+ return ev
+}
+
+// Events returns the Events channel (if enabled)
+func (c *Consumer) Events() chan Event {
+ return c.events
+}
+
+// ReadMessage polls the consumer for a message.
+//
+// This is a conveniance API that wraps Poll() and only returns
+// messages or errors. All other event types are discarded.
+//
+// The call will block for at most `timeout` waiting for
+// a new message or error. `timeout` may be set to -1 for
+// indefinite wait.
+//
+// Timeout is returned as (nil, err) where err is `kafka.(Error).Code == Kafka.ErrTimedOut`.
+//
+// Messages are returned as (msg, nil),
+// while general errors are returned as (nil, err),
+// and partition-specific errors are returned as (msg, err) where
+// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
+//
+// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
+//
+func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
+
+ var absTimeout time.Time
+ var timeoutMs int
+
+ if timeout > 0 {
+ absTimeout = time.Now().Add(timeout)
+ timeoutMs = (int)(timeout.Seconds() * 1000.0)
+ } else {
+ timeoutMs = (int)(timeout)
+ }
+
+ for {
+ ev := c.Poll(timeoutMs)
+
+ switch e := ev.(type) {
+ case *Message:
+ if e.TopicPartition.Error != nil {
+ return e, e.TopicPartition.Error
+ }
+ return e, nil
+ case Error:
+ return nil, e
+ default:
+ // Ignore other event types
+ }
+
+ if timeout > 0 {
+ // Calculate remaining time
+ timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
+ }
+
+ if timeoutMs == 0 && ev == nil {
+ return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
+ }
+
+ }
+
+}
+
+// Close Consumer instance.
+// The object is no longer usable after this call.
+func (c *Consumer) Close() (err error) {
+
+ if c.eventsChanEnable {
+ // Wait for consumerReader() to terminate (by closing readerTermChan)
+ close(c.readerTermChan)
+ c.handle.waitTerminated(1)
+ close(c.events)
+ }
+
+ C.rd_kafka_queue_destroy(c.handle.rkq)
+ c.handle.rkq = nil
+
+ e := C.rd_kafka_consumer_close(c.handle.rk)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ c.handle.cleanup()
+
+ C.rd_kafka_destroy(c.handle.rk)
+
+ return nil
+}
+
+// NewConsumer creates a new high-level Consumer instance.
+//
+// Supported special configuration properties:
+// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
+// If set to true the app must handle the AssignedPartitions and
+// RevokedPartitions events and call Assign() and Unassign()
+// respectively.
+// go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
+// go.events.channel.size (int, 1000) - Events() channel size
+//
+// WARNING: Due to the buffering nature of channels (and queues in general) the
+// use of the events channel risks receiving outdated events and
+// messages. Minimizing go.events.channel.size reduces the risk
+// and number of outdated events and messages but does not eliminate
+// the factor completely. With a channel size of 1 at most one
+// event or message may be outdated.
+func NewConsumer(conf *ConfigMap) (*Consumer, error) {
+
+ err := versionCheck()
+ if err != nil {
+ return nil, err
+ }
+
+ // before we do anything with the configuration, create a copy such that
+ // the original is not mutated.
+ confCopy := conf.clone()
+
+ groupid, _ := confCopy.get("group.id", nil)
+ if groupid == nil {
+ // without a group.id the underlying cgrp subsystem in librdkafka wont get started
+ // and without it there is no way to consume assigned partitions.
+ // So for now require the group.id, this might change in the future.
+ return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
+ }
+
+ c := &Consumer{}
+
+ v, err := confCopy.extract("go.application.rebalance.enable", false)
+ if err != nil {
+ return nil, err
+ }
+ c.appRebalanceEnable = v.(bool)
+
+ v, err = confCopy.extract("go.events.channel.enable", false)
+ if err != nil {
+ return nil, err
+ }
+ c.eventsChanEnable = v.(bool)
+
+ v, err = confCopy.extract("go.events.channel.size", 1000)
+ if err != nil {
+ return nil, err
+ }
+ eventsChanSize := v.(int)
+
+ cConf, err := confCopy.convert()
+ if err != nil {
+ return nil, err
+ }
+ cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+ c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
+ if c.handle.rk == nil {
+ return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ C.rd_kafka_poll_set_consumer(c.handle.rk)
+
+ c.handle.c = c
+ c.handle.setup()
+ c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
+ if c.handle.rkq == nil {
+ // no cgrp (no group.id configured), revert to main queue.
+ c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
+ }
+
+ if c.eventsChanEnable {
+ c.events = make(chan Event, eventsChanSize)
+ c.readerTermChan = make(chan bool)
+
+ /* Start rdkafka consumer queue reader -> events writer goroutine */
+ go consumerReader(c, c.readerTermChan)
+ }
+
+ return c, nil
+}
+
+// rebalance calls the application's rebalance callback, if any.
+// Returns true if the underlying assignment was updated, else false.
+func (c *Consumer) rebalance(ev Event) bool {
+ c.appReassigned = false
+
+ if c.rebalanceCb != nil {
+ c.rebalanceCb(c, ev)
+ }
+
+ return c.appReassigned
+}
+
+// consumerReader reads messages and events from the librdkafka consumer queue
+// and posts them on the consumer channel.
+// Runs until termChan closes
+func consumerReader(c *Consumer, termChan chan bool) {
+
+out:
+ for true {
+ select {
+ case _ = <-termChan:
+ break out
+ default:
+ _, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
+ if term {
+ break out
+ }
+
+ }
+ }
+
+ c.handle.terminatedChan <- "consumerReader"
+ return
+
+}
+
+// GetMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ return getMetadata(c, topic, allTopics, timeoutMs)
+}
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition.
+func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+ return queryWatermarkOffsets(c, topic, partition, timeoutMs)
+}
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ return offsetsForTimes(c, times, timeoutMs)
+}
+
+// Subscription returns the current subscription as set by Subscribe()
+func (c *Consumer) Subscription() (topics []string, err error) {
+ var cTopics *C.rd_kafka_topic_partition_list_t
+
+ cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+ defer C.rd_kafka_topic_partition_list_destroy(cTopics)
+
+ topicCnt := int(cTopics.cnt)
+ topics = make([]string, topicCnt)
+ for i := 0; i < topicCnt; i++ {
+ crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
+ C.int(i))
+ topics[i] = C.GoString(crktpar.topic)
+ }
+
+ return topics, nil
+}
+
+// Assignment returns the current partition assignments
+func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
+ var cParts *C.rd_kafka_topic_partition_list_t
+
+ cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+ defer C.rd_kafka_topic_partition_list_destroy(cParts)
+
+ partitions = newTopicPartitionsFromCparts(cParts)
+
+ return partitions, nil
+}
+
+// Committed retrieves committed offsets for the given set of partitions
+func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs))
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cerr)
+ }
+
+ return newTopicPartitionsFromCparts(cparts), nil
+}
+
+// Pause consumption for the provided list of partitions
+//
+// Note that messages already enqueued on the consumer's Event channel
+// (if `go.events.channel.enable` has been set) will NOT be purged by
+// this call, set `go.events.channel.size` accordingly.
+func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts)
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cerr)
+ }
+ return nil
+}
+
+// Resume consumption for the provided list of partitions
+func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts)
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cerr)
+ }
+ return nil
+}