[VOL-4663] create voltha event topic (voltha.events) with configurable number of partitions and replication factor
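
Topics listed in the broker config are now created explicitly through a
sarama ClusterAdmin client (broker-side auto-creation is disabled), using
the partition count and replication factor from the config; both default
to 1 when unset. The per-topic consumers are replaced by a single
consumer-group listener.
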
Change-Id: Ibaf8681ccdbffcc8a3c68612c49d7822a20e1b14
diff --git a/main.go b/main.go
index 3cc0136..6fc6c77 100644
--- a/main.go
+++ b/main.go
@@ -15,6 +15,7 @@
package main
import (
+ "context"
"io/ioutil"
"log"
"net/http"
@@ -23,19 +24,28 @@
"sync"
"gerrit.opencord.org/kafka-topic-exporter/common/logger"
+ "gerrit.opencord.org/kafka-topic-exporter/utils"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
- "gerrit.opencord.org/kafka-topic-exporter/utils"
"gopkg.in/yaml.v2"
)
+const (
+ volthaEventsTopic = "voltha.events"
+ consumerGroup = "kte_grp"
+ cDefaultPartitions = 1
+ cDefaultReplicas = 1
+)
+
func kafkaInit(broker BrokerInfo) {
config := sarama.NewConfig()
+
config.Consumer.Return.Errors = true
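+ // disable broker-side auto-creation; the configured topics are created
+ // explicitly below with the requested number of partitions and replicas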
+ config.Metadata.AllowAutoTopicCreation = false
var wg sync.WaitGroup
- master, err := sarama.NewConsumer([]string{broker.Host}, config)
+ consumer, err := sarama.NewConsumerGroup([]string{broker.Host}, consumerGroup, config)
if err != nil {
logger.Panic("kafkaInit panic")
@@ -43,24 +53,72 @@
}
defer func() {
logger.Debug("kafkaInit close connection")
- if err := master.Close(); err != nil {
+ if err := consumer.Close(); err != nil {
panic(err)
}
}()
- // read topics from config
- topics := broker.Topics
-
- // we are spinning threads for each topic, we need to wait for
- // them to exit before stopping the kafka connection
- wg.Add(len(topics))
-
- for _, topic := range topics {
- t := topic
- go topicListener(&t, master, wg)
+ clusterAdmin, err := sarama.NewClusterAdmin([]string{broker.Host}, config)
+ if err != nil {
+ logger.Panic("Failed to create cluster admin")
+ panic(err)
}
+ // close the admin connection when kafkaInit returns
+ defer clusterAdmin.Close()
+ // read topics from config
+ topics := broker.Topics
+ logger.Info("conusmer topics are %s", topics)
+
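+ // fall back to a single partition and a single replica when the broker
+ // config does not set them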
+ if broker.Partitions == 0 {
+ broker.Partitions = cDefaultPartitions
+ }
+
+ if broker.Replicas == 0 {
+ broker.Replicas = cDefaultReplicas
+ }
+
+ // create topics
+ for _, topic := range topics {
+ if topic == volthaEventsTopic {
+ continue
+ }
+ logger.Info("creating topic [%s] with [%d] partitions and [%d] replicas ", topic, broker.Partitions, broker.Replicas)
+ err := createTopic(clusterAdmin, topic, broker.Partitions, broker.Replicas)
+ if err != nil {
+ logger.Panic("Fail to create topic %s", err)
+ }
+
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ wg.Add(1)
+
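+ // a single consumer-group listener handles all configured topics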
+ go topicListener(ctx, topics, consumer, wg)
+
wg.Wait()
+ cancel()
+}
+
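+ // createTopic creates the given topic with the requested number of partitions
+ // and replication factor; an already existing topic is not treated as an error.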
+func createTopic(clusterAdmin sarama.ClusterAdmin, topic string, numPartitions int, replFactor int) error {
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: int32(numPartitions),
+ ReplicationFactor: int16(replFactor),
+ ConfigEntries: make(map[string]*string),
+ }
+
+ err := clusterAdmin.CreateTopic(topic, topicDetail, false)
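+ // treat "topic already exists" as success so restarts do not fail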
+ switch typedErr := err.(type) {
+ case *sarama.TopicError:
+ if typedErr.Err == sarama.ErrTopicAlreadyExists {
+ err = nil
+ }
+ }
+ return err
}
func runServer(target TargetInfo) {
@@ -231,7 +289,6 @@
return m
}
-
func main() {
// load configuration
conf := loadConfigFile()
@@ -243,7 +300,6 @@
logger.Info("The utils.OnuSNhex : [%t]", utils.OnuSNhex)
logger.Info("The conf.Conv.Onusnformat is : [%t]", conf.Conv.Onusnhex)
-
go kafkaInit(conf.Broker)
runServer(conf.Target)
}