[VOL-5486] Upgrade library versions
Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index d1a1a3a..75025f2 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -24,8 +24,7 @@
"sync"
"time"
- "github.com/Shopify/sarama"
- scc "github.com/bsm/sarama-cluster"
+ "github.com/IBM/sarama"
"github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
@@ -49,7 +48,7 @@
KafkaAddress string
producer sarama.AsyncProducer
consumer sarama.Consumer
- groupConsumers map[string]*scc.Consumer
+ groupConsumers map[string]sarama.ConsumerGroup
lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
@@ -218,7 +217,7 @@
option(client)
}
- client.groupConsumers = make(map[string]*scc.Consumer)
+ client.groupConsumers = make(map[string]sarama.ConsumerGroup)
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
@@ -776,7 +775,7 @@
err = errTemp
}
}
- } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+ } else if groupConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
if errTemp := groupConsumer.Close(); errTemp != nil {
if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
// This can occur on race condition
@@ -879,33 +878,28 @@
}
// createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
- config := scc.NewConfig()
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (sarama.ConsumerGroup, error) {
+ config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.ClientID = uuid.New().String()
- config.Group.Mode = scc.ConsumerModeMultiplex
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
config.Consumer.Return.Errors = true
- //config.Group.Return.Notifications = false
- //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
- //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = initialOffset
- //config.Consumer.Offsets.Initial = sarama.OffsetOldest
+
brokers := []string{sc.KafkaAddress}
-
- topics := []string{topic.Name}
- var consumer *scc.Consumer
+ // topics := []string{topic.Name}
+ var consumerGroup sarama.ConsumerGroup
var err error
-
- if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
- logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumerGroup, err = sarama.NewConsumerGroup(brokers, groupId, config); err != nil {
+ logger.Errorw(ctx, "create-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
- //sc.groupConsumers[topic.Name] = consumer
- sc.addToGroupConsumers(topic.Name, consumer)
- return consumer, nil
+ logger.Debugw(ctx, "create-group-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ //sc.groupConsumers[topic.Name] = consumerGroup
+ sc.addToGroupConsumers(topic.Name, consumerGroup)
+ return consumerGroup, nil
}
// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
@@ -964,48 +958,59 @@
sc.setUnhealthy(ctx)
}
-func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumerGroup sarama.ConsumerGroup, consumerChnls *consumerChannels) {
logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
-startloop:
- for {
- select {
- case err, ok := <-consumer.Errors():
- if ok {
+ go func() {
+ for {
+ err := consumerGroup.Consume(ctx, []string{topic.Name}, &groupConsumerHandler{
+ consumerChnls: consumerChnls,
+ sc: sc,
+ topic: topic,
+ })
+ if err != nil {
+ logger.Warnw(ctx, "group-consumer-error", log.Fields{"topic": topic.Name, "error": err})
if sc.isLivenessError(ctx, err) {
sc.updateLiveness(ctx, false)
}
- logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
- } else {
- logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
- // channel is closed
- break startloop
}
- case msg, ok := <-consumer.Messages():
- if !ok {
- logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
- // Channel closed
- break startloop
+ select {
+ case <-sc.doneCh:
+ logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+ return
+ default:
}
- sc.updateLiveness(ctx, true)
- logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
- msgBody := msg.Value
- var protoMsg proto.Message
- if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
- logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
- continue
- }
- go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
- consumer.MarkOffset(msg, "")
- case ntf := <-consumer.Notifications():
- logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
- case <-sc.doneCh:
- logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
- break startloop
}
+ }()
+}
+
+type groupConsumerHandler struct {
+ consumerChnls *consumerChannels
+ sc *SaramaClient
+ topic *Topic
+}
+
+func (h *groupConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
+ return nil
+}
+
+func (h *groupConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
+ return nil
+}
+
+func (h *groupConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for msg := range claim.Messages() {
+ h.sc.updateLiveness(context.Background(), true)
+ logger.Debugw(context.Background(), "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msg.Value, protoMsg); err != nil {
+ logger.Warnw(context.Background(), "invalid-message", log.Fields{"error": err})
+ continue
+ }
+ go h.sc.dispatchToConsumers(h.consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
+ session.MarkMessage(msg, "")
}
- logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
- sc.setUnhealthy(ctx)
+ return nil
}
func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
@@ -1019,7 +1024,7 @@
for _, consumer := range consumerCh.consumers {
if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
- } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+ } else if gConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
} else {
logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
@@ -1071,18 +1076,16 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
- // TODO: Replace this development partition consumers with a group consumers
- var pConsumer *scc.Consumer
+ var consumerGroup sarama.ConsumerGroup
var err error
- if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
- logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ if consumerGroup, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset); err != nil {
+ logger.Errorw(ctx, "creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
- // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
- // unbuffered to verify race conditions.
+
consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
- consumers: []interface{}{pConsumer},
+ consumers: []interface{}{consumerGroup},
channels: []chan proto.Message{consumerListeningChannel},
}
@@ -1135,22 +1138,21 @@
return channels
}
-func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumerGroup sarama.ConsumerGroup) {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
if _, exist := sc.groupConsumers[topic]; !exist {
- sc.groupConsumers[topic] = consumer
+ sc.groupConsumers[topic] = consumerGroup
}
}
func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
- if _, exist := sc.groupConsumers[topic]; exist {
- consumer := sc.groupConsumers[topic]
+ if consumerGroup, exist := sc.groupConsumers[topic]; exist {
delete(sc.groupConsumers, topic)
- if err := consumer.Close(); err != nil {
- logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+ if err := consumerGroup.Close(); err != nil {
+ logger.Errorw(ctx, "failure-closing-consumer-group", log.Fields{"error": err})
return err
}
}