This commit adds a complete partition consumer as well as a
group consumer to the sarama client library. It also upgrades
the kafka running version.
Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/kafka/client.go b/kafka/client.go
index cb33a35..ad8f01a 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -17,29 +17,41 @@
import (
ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "time"
)
const (
- DefaultKafkaHost = "127.0.0.1"
- DefaultKafkaPort = 9092
- DefaultGroupName = "rw_core"
- DefaultSleepOnError = 1
- DefaultFlushFrequency = 1
- DefaultFlushMessages = 1
- DefaultFlushMaxmessages = 1
- DefaultReturnSuccess = false
- DefaultReturnErrors = true
- DefaultConsumerMaxwait = 10
- DefaultMaxProcessingTime = 100
+ PartitionConsumer = iota
+ GroupCustomer = iota
+)
+
+const (
+ DefaultKafkaHost = "127.0.0.1"
+ DefaultKafkaPort = 9092
+ DefaultGroupName = "rw_core"
+ DefaultSleepOnError = 1
+ DefaultProducerFlushFrequency = 5
+ DefaultProducerFlushMessages = 1
+ DefaultProducerFlushMaxmessages = 5
+ DefaultProducerReturnSuccess = false
+ DefaultProducerReturnErrors = true
+ DefaultProducerRetryMax = 3
+ DefaultProducerRetryBackoff = time.Millisecond * 100
+ DefaultConsumerMaxwait = 10
+ DefaultMaxProcessingTime = 100
+ DefaultConsumerType = PartitionConsumer
+ DefaultNumberPartitions = 3
+ DefaultNumberReplicas = 1
+ DefaultAutoCreateTopic = false
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
type Client interface {
- Start(retries int) error
+ Start() error
Stop()
- CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error
+ CreateTopic(topic *Topic, numPartition int, repFactor int) error
DeleteTopic(topic *Topic) error
- Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error)
+ Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
- Send(msg interface{}, topic *Topic, keys ...string)
+ Send(msg interface{}, topic *Topic, keys ...string) error
}