[VOL-4663] create voltha event topic (voltha.events) with configurable number of partitions and replication factor

Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
diff --git a/main.go b/main.go
index 3cc0136..6fc6c77 100644
--- a/main.go
+++ b/main.go
@@ -15,6 +15,7 @@
 package main
 
 import (
+	"context"
 	"io/ioutil"
 	"log"
 	"net/http"
@@ -23,19 +24,28 @@
 	"sync"
 
 	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
+	"gerrit.opencord.org/kafka-topic-exporter/utils"
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
-	 "gerrit.opencord.org/kafka-topic-exporter/utils"
 	"gopkg.in/yaml.v2"
 )
 
+const (
+	volthaEventsTopic  = "voltha.events"
+	consumerGroup      = "kte_grp"
+	cDefaultPartitions = 1
+	cDefaultReplicas   = 1
+)
+
 func kafkaInit(broker BrokerInfo) {
 	config := sarama.NewConfig()
+
 	config.Consumer.Return.Errors = true
+	config.Metadata.AllowAutoTopicCreation = false
 	var wg sync.WaitGroup
 
-	master, err := sarama.NewConsumer([]string{broker.Host}, config)
+	consumer, err := sarama.NewConsumerGroup([]string{broker.Host}, consumerGroup, config)
 
 	if err != nil {
 		logger.Panic("kafkaInit panic")
@@ -43,24 +53,72 @@
 	}
 	defer func() {
 		logger.Debug("kafkaInit close connection")
-		if err := master.Close(); err != nil {
+		if err := consumer.Close(); err != nil {
 			panic(err)
 		}
 	}()
 
-	// read topics from config
-	topics := broker.Topics
-
-	// we are spinning threads for each topic, we need to wait for
-	// them to exit before stopping the kafka connection
-	wg.Add(len(topics))
-
-	for _, topic := range topics {
-		t := topic
-		go topicListener(&t, master, wg)
+	clusterAdmin, err := sarama.NewClusterAdmin([]string{broker.Host}, config)
+	if err != nil {
+		logger.Panic("Fail to create cluster admin")
+		panic(err)
 	}
 
+	// read topics from config
+	topics := broker.Topics
+	//consumerTopics := strings.Join(topics, ",")
+	logger.Info("conusmer topics are %s", topics)
+
+	if broker.Partitions == 0 {
+		broker.Partitions = cDefaultPartitions
+	}
+
+	if broker.Replicas == 0 {
+		broker.Replicas = cDefaultReplicas
+	}
+
+	// create topics
+	for _, topic := range topics {
+		if topic == volthaEventsTopic {
+			continue
+		}
+		logger.Info("creating topic [%s] with [%d] partitions  and [%d] replicas ", topic, broker.Partitions, broker.Replicas)
+		err := createTopic(clusterAdmin, topic, broker.Partitions, broker.Replicas)
+		if err != nil {
+			logger.Panic("Fail to create topic %s", err)
+		}
+
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	wg.Add(1)
+
+	go topicListener(ctx, topics, consumer, wg)
+
 	wg.Wait()
+	cancel()
+}
+
+func createTopic(clusterAdmin sarama.ClusterAdmin, topic string, numPartitions int, replFactor int) error {
+	topicDetail := &sarama.TopicDetail{}
+
+	topicDetail.NumPartitions = int32(numPartitions)
+	topicDetail.ReplicationFactor = int16(replFactor)
+
+	topicDetail.ConfigEntries = make(map[string]*string)
+
+	err := clusterAdmin.CreateTopic(topic, topicDetail, false)
+	switch typedErr := err.(type) {
+	case *sarama.TopicError:
+		if typedErr.Err == sarama.ErrTopicAlreadyExists {
+			err = nil
+		}
+	}
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func runServer(target TargetInfo) {
@@ -231,7 +289,6 @@
 	return m
 }
 
-
 func main() {
 	// load configuration
 	conf := loadConfigFile()
@@ -243,7 +300,6 @@
 	logger.Info("The utils.OnuSNhex : [%t]", utils.OnuSNhex)
 	logger.Info("The conf.Conv.Onusnformat is : [%t]", conf.Conv.Onusnhex)
 
-
 	go kafkaInit(conf.Broker)
 	runServer(conf.Target)
 }