[VOL-5486] Upgrade library versions

Change-Id: I8b4e88699e03f44ee13e467867f45ae3f0a63c4b
Signed-off-by: Abhay Kumar <abhay.kumar@radisys.com>
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index d1a1a3a..75025f2 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -24,8 +24,7 @@
 	"sync"
 	"time"
 
-	"github.com/Shopify/sarama"
-	scc "github.com/bsm/sarama-cluster"
+	"github.com/IBM/sarama"
 	"github.com/eapache/go-resiliency/breaker"
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
@@ -49,7 +48,7 @@
 	KafkaAddress                  string
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
-	groupConsumers                map[string]*scc.Consumer
+	groupConsumers                map[string]sarama.ConsumerGroup
 	lockOfGroupConsumers          sync.RWMutex
 	consumerGroupPrefix           string
 	consumerType                  int
@@ -218,7 +217,7 @@
 		option(client)
 	}
 
-	client.groupConsumers = make(map[string]*scc.Consumer)
+	client.groupConsumers = make(map[string]sarama.ConsumerGroup)
 
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
 	client.topicLockMap = make(map[string]*sync.RWMutex)
@@ -776,7 +775,7 @@
 					err = errTemp
 				}
 			}
-		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+		} else if groupConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
 			if errTemp := groupConsumer.Close(); errTemp != nil {
 				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
 					// This can occur on race condition
@@ -879,33 +878,28 @@
 }
 
 // createGroupConsumer creates a consumers group
+// for groupId on the broker at sc.KafkaAddress and registers it under topic.Name via
+// addToGroupConsumers. It returns the error from sarama.NewConsumerGroup on failure.
-func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
-	config := scc.NewConfig()
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (sarama.ConsumerGroup, error) {
+	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
 	config.ClientID = uuid.New().String()
-	config.Group.Mode = scc.ConsumerModeMultiplex
-	config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
+	config.Consumer.Group.Heartbeat.Interval = time.Second
 	config.Consumer.Return.Errors = true
-	//config.Group.Return.Notifications = false
-	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = initialOffset
-	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
+
 	brokers := []string{sc.KafkaAddress}
-
-	topics := []string{topic.Name}
-	var consumer *scc.Consumer
+	var consumerGroup sarama.ConsumerGroup
 	var err error
-
-	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
-		logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+	if consumerGroup, err = sarama.NewConsumerGroup(brokers, groupId, config); err != nil {
+		logger.Errorw(ctx, "create-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 
-	//sc.groupConsumers[topic.Name] = consumer
-	sc.addToGroupConsumers(topic.Name, consumer)
-	return consumer, nil
+	logger.Debugw(ctx, "create-group-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	sc.addToGroupConsumers(topic.Name, consumerGroup)
+	return consumerGroup, nil
 }
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
@@ -964,48 +958,106 @@
 	sc.setUnhealthy(ctx)
 }
 
-func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+// consumeGroupMessages starts the consumption loop for topic on consumerGroup and returns
+// immediately. The loop runs until sc.doneCh is signalled, ctx is cancelled or the consumer
+// group is closed; on exit it logs "group-consumer-stopped" and calls sc.setUnhealthy.
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumerGroup sarama.ConsumerGroup, consumerChnls *consumerChannels) {
 	logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
-startloop:
-	for {
-		select {
-		case err, ok := <-consumer.Errors():
-			if ok {
-				if sc.isLivenessError(ctx, err) {
-					sc.updateLiveness(ctx, false)
-				}
-				logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
-			} else {
-				logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
-				// channel is closed
-				break startloop
-			}
-		case msg, ok := <-consumer.Messages():
-			if !ok {
-				logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
-				// Channel closed
-				break startloop
-			}
-			sc.updateLiveness(ctx, true)
-			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
-			msgBody := msg.Value
-			var protoMsg proto.Message
-			if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
-				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
-				continue
-			}
-			go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
-			consumer.MarkOffset(msg, "")
-		case ntf := <-consumer.Notifications():
-			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
-		case <-sc.doneCh:
-			logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-	logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
-	sc.setUnhealthy(ctx)
+	// Consumer.Return.Errors is enabled for group consumers, so sarama hands session
+	// errors to Errors() instead of logging them; read them so they are not dropped.
+	go func() {
+		for err := range consumerGroup.Errors() {
+			logger.Warnw(ctx, "group-consumer-session-error", log.Fields{"topic": topic.Name, "error": err})
+			if sc.isLivenessError(ctx, err) {
+				sc.updateLiveness(ctx, false)
+			}
+		}
+	}()
+
+	go func() {
+		defer func() {
+			logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
+			sc.setUnhealthy(ctx)
+		}()
+		handler := &groupConsumerHandler{consumerChnls: consumerChnls, sc: sc, topic: topic}
+		for {
+			// Consume returns after every rebalance and must be called again to rejoin.
+			err := consumerGroup.Consume(ctx, []string{topic.Name}, handler)
+			select {
+			case <-sc.doneCh:
+				logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+				return
+			default:
+			}
+			if err == sarama.ErrClosedConsumerGroup || ctx.Err() != nil {
+				// Retrying on a closed group or a cancelled context would spin forever.
+				logger.Warnw(ctx, "group-consumer-closed", log.Fields{"topic": topic.Name, "error": err})
+				return
+			}
+			if err == nil {
+				continue
+			}
+			logger.Warnw(ctx, "group-consumer-error", log.Fields{"topic": topic.Name, "error": err})
+			if sc.isLivenessError(ctx, err) {
+				sc.updateLiveness(ctx, false)
+			}
+			// Back off so a persistent failure does not turn into a busy loop.
+			select {
+			case <-sc.doneCh:
+				logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+				return
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Second):
+			}
+		}
+	}()
+}
+
+// groupConsumerHandler is the handler passed to ConsumerGroup.Consume; it forwards the
+// messages of each claim to the subscriber channels held in consumerChnls.
+type groupConsumerHandler struct {
+	consumerChnls *consumerChannels
+	sc            *SaramaClient
+	topic         *Topic
+}
+
+// Setup is part of the consumer group handler contract; there is nothing to prepare.
+func (h *groupConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// Cleanup is part of the consumer group handler contract; there is nothing to release.
+func (h *groupConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+// ConsumeClaim dispatches every message of claim to the subscribers and marks it on the
+// session. It returns nil when the claim's message channel is closed or the session ends.
+func (h *groupConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for {
+		select {
+		case <-session.Context().Done():
+			// The session is over; return promptly instead of waiting for more messages.
+			return nil
+		case msg, ok := <-claim.Messages():
+			if !ok {
+				return nil
+			}
+			h.sc.updateLiveness(context.Background(), true)
+			logger.Debugw(context.Background(), "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			// NOTE(review): protoMsg is still a nil interface when handed to Unmarshal; a
+			// concrete message type is probably required here - confirm with the subscribers.
+			var protoMsg proto.Message
+			if err := proto.Unmarshal(msg.Value, protoMsg); err != nil {
+				logger.Warnw(context.Background(), "invalid-message", log.Fields{"error": err})
+				continue
+			}
+			go h.sc.dispatchToConsumers(h.consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
+			session.MarkMessage(msg, "")
+		}
+	}
 }
 
 func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
@@ -1019,7 +1024,7 @@
 	for _, consumer := range consumerCh.consumers {
 		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
 			go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
-		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+		} else if gConsumer, ok := consumer.(sarama.ConsumerGroup); ok {
 			go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
 		} else {
 			logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
@@ -1071,18 +1076,16 @@
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
 func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
-	// TODO:  Replace this development partition consumers with a group consumers
-	var pConsumer *scc.Consumer
+	var consumerGroup sarama.ConsumerGroup
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
-		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+	if consumerGroup, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset); err != nil {
+		logger.Errorw(ctx, "creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
-	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
-	// unbuffered to verify race conditions.
+
 	consumerListeningChannel := make(chan proto.Message)
 	cc := &consumerChannels{
-		consumers: []interface{}{pConsumer},
+		consumers: []interface{}{consumerGroup},
 		channels:  []chan proto.Message{consumerListeningChannel},
 	}
 
@@ -1135,22 +1138,21 @@
 	return channels
 }
 
-func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumerGroup sarama.ConsumerGroup) {
 	sc.lockOfGroupConsumers.Lock()
 	defer sc.lockOfGroupConsumers.Unlock()
 	if _, exist := sc.groupConsumers[topic]; !exist {
-		sc.groupConsumers[topic] = consumer
+		sc.groupConsumers[topic] = consumerGroup
 	}
 }
 
 func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
 	sc.lockOfGroupConsumers.Lock()
 	defer sc.lockOfGroupConsumers.Unlock()
-	if _, exist := sc.groupConsumers[topic]; exist {
-		consumer := sc.groupConsumers[topic]
+	if consumerGroup, exist := sc.groupConsumers[topic]; exist {
 		delete(sc.groupConsumers, topic)
-		if err := consumer.Close(); err != nil {
-			logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+		if err := consumerGroup.Close(); err != nil {
+			logger.Errorw(ctx, "failure-closing-consumer-group", log.Fields{"error": err})
 			return err
 		}
 	}