seba-631-632: exporter enhancements for logger and topics
Change-Id: If10e56a7ccfce758712ea02df9656d4f413dbf84
diff --git a/main.go b/main.go
index 0eba8ba..cda8c19 100644
--- a/main.go
+++ b/main.go
@@ -15,66 +15,63 @@
package main
import (
- "flag"
- "fmt"
+ "strconv"
+ "gopkg.in/yaml.v2"
+ "io/ioutil"
+ "log"
"net/http"
+ "strings"
"sync"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
)
-var (
- broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
- volthaTopic = "voltha.kpis"
- onosTopic = "onos.kpis"
-
- volthaTopicPointer = &volthaTopic
- onosTopicPointer = &onosTopic
-)
-
-var brokers []string
-
-func kafkaInit(brokers []string) {
+func kafkaInit(broker BrokerInfo) { // kafkaInit opens a sarama consumer on broker.Host and starts one topicListener goroutine per entry in broker.Topics
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
var wg sync.WaitGroup
- wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
-
- master, err := sarama.NewConsumer(brokers, config)
+ master, err := sarama.NewConsumer([]string{broker.Host}, config) // the broker list holds only the single configured host
if err != nil {
- fmt.Println("kafkaInit panic")
+ logger.Panic("kafkaInit panic") // NOTE(review): panic(err) follows; confirm logger.Panic does not abort on its own, otherwise err is never reported
panic(err)
}
defer func() {
- fmt.Println("kafkaInit close connection")
+ logger.Debug("kafkaInit close connection")
if err := master.Close(); err != nil {
panic(err)
}
}()
- go VOLTHAListener(volthaTopicPointer, master, wg)
- go ONOSListener(onosTopicPointer, master, wg)
+
+ // topics to consume are taken from the broker configuration
+ topics := broker.Topics
+
+ // one goroutine is started per topic; wg.Wait blocks until all of them
+ // are done, and only then does the deferred close of the kafka connection run
+ wg.Add(len(topics))
+
+ for _, topic := range topics {
+ t := topic // per-iteration copy, so each goroutine gets the address of its own variable
+ go topicListener(&t, master, wg) // NOTE(review): wg is passed by value; if topicListener calls Done on its copy, wg.Wait never returns - confirm the signature
+ }
wg.Wait()
}
-func runServer() {
- fmt.Println("Starting Server")
+func runServer(target TargetInfo) { // runServer registers the Prometheus handler on /metrics and serves HTTP on target.Port
+ if target.Port == 0 { // a Port of 0 is treated as "not configured"
+ logger.Warn("Prometheus target port not configured, using default 8080")
+ target.Port = 8080
+ }
+ logger.Debug("Starting HTTP Server on %d port", target.Port)
http.Handle("/metrics", prometheus.Handler())
- http.ListenAndServe(":8080", nil)
+ http.ListenAndServe(":"+strconv.Itoa(target.Port), nil) // NOTE(review): the returned error is discarded, so a failed bind goes unreported
}
func init() {
- // read config from cli flags
- flag.Parse()
- brokers = make([]string, 0)
- brokers = []string{*broker}
- fmt.Println("Connecting to broker: ", brokers)
- fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
- fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
-
// register metrics within Prometheus
prometheus.MustRegister(volthaTxBytesTotal)
prometheus.MustRegister(volthaRxBytesTotal)
@@ -91,7 +88,28 @@
prometheus.MustRegister(onosRxDropPacketsTotal)
}
-func main() {
- go kafkaInit(brokers)
- runServer()
+func loadConfigFile() Config { // loadConfigFile reads /etc/config/conf.yaml and unmarshals its YAML content into a Config
+ m := Config{}
+ // this path is where the configmap is mounted, as set in the pod yaml
+ yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
+ if err != nil {
+ log.Printf("yamlFile.Get err: %v ", err) // NOTE(review): a read failure is only logged; execution continues into Unmarshal with whatever ReadFile returned
+ }
+ err = yaml.Unmarshal(yamlFile, &m)
+ if err != nil {
+ log.Fatalf("Unmarshal: %v", err) // log.Fatalf exits the process, so the return is reached only when Unmarshal succeeded
+ }
+ return m
}
+
+func main() { // main loads the config, sets up the logger, starts the kafka consumer in a goroutine and then serves metrics
+ // load configuration
+ conf := loadConfigFile()
+
+ // logger setup
+ logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel)) // the configured log level is upper-cased before being passed to the logger
+ logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
+
+ go kafkaInit(conf.Broker) // started in its own goroutine; main does not wait for it
+ runServer(conf.Target) // called synchronously on the main goroutine
+}
\ No newline at end of file