SEBA-631, SEBA-632: exporter enhancements for logger and topics

Change-Id: If10e56a7ccfce758712ea02df9656d4f413dbf84
diff --git a/main.go b/main.go
index 0eba8ba..cda8c19 100644
--- a/main.go
+++ b/main.go
@@ -15,66 +15,63 @@
 package main
 
 import (
-	"flag"
-	"fmt"
+	"strconv"
+	"gopkg.in/yaml.v2"
+	"io/ioutil"
+	"log"
 	"net/http"
+	"strings"
 	"sync"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
+	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 )
 
-var (
-	broker      = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	volthaTopic = "voltha.kpis"
-	onosTopic   = "onos.kpis"
-
-	volthaTopicPointer = &volthaTopic
-	onosTopicPointer   = &onosTopic
-)
-
-var brokers []string
-
-func kafkaInit(brokers []string) {
+func kafkaInit(broker BrokerInfo) {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
 	var wg sync.WaitGroup
 
-	wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
-
-	master, err := sarama.NewConsumer(brokers, config)
+	master, err := sarama.NewConsumer([]string{broker.Host}, config)
 	if err != nil {
-		fmt.Println("kafkaInit panic")
+		logger.Panic("kafkaInit panic")
 		panic(err)
 	}
 	defer func() {
-		fmt.Println("kafkaInit close connection")
+		logger.Debug("kafkaInit close connection")
 		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
-	go VOLTHAListener(volthaTopicPointer, master, wg)
-	go ONOSListener(onosTopicPointer, master, wg)
+
+	// read topics from config
+	topics := broker.Topics
+
+	// one goroutine is started per topic; we need to wait for all of
+	// them to exit before closing the kafka connection
+	wg.Add(len(topics))
+
+	for _, topic := range topics {
+		t := topic
+		go topicListener(&t, master, wg)
+	}
 
 	wg.Wait()
 }
 
-func runServer() {
-	fmt.Println("Starting Server")
+func runServer(target TargetInfo) {
+	if target.Port == 0 {
+		logger.Warn("Prometheus target port not configured, using default 8080")
+		target.Port = 8080
+	}
+	logger.Debug("Starting HTTP Server on %d port", target.Port)
 	http.Handle("/metrics", prometheus.Handler())
-	http.ListenAndServe(":8080", nil)
+	http.ListenAndServe(":"+strconv.Itoa(target.Port), nil)
 }
 
 func init() {
 
-	// read config from cli flags
-	flag.Parse()
-	brokers = make([]string, 0)
-	brokers = []string{*broker}
-	fmt.Println("Connecting to broker: ", brokers)
-	fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
-	fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
-
 	// register metrics within Prometheus
 	prometheus.MustRegister(volthaTxBytesTotal)
 	prometheus.MustRegister(volthaRxBytesTotal)
@@ -91,7 +88,28 @@
 	prometheus.MustRegister(onosRxDropPacketsTotal)
 }
 
-func main() {
-	go kafkaInit(brokers)
-	runServer()
+func loadConfigFile() Config {
+	m := Config{}
+	// the config file comes from a configmap mounted at this path by the pod yaml
+	yamlFile, err := ioutil.ReadFile("/etc/config/conf.yaml")
+	if err != nil {
+		log.Printf("yamlFile.Get err: %v ", err)
+	}
+	err = yaml.Unmarshal(yamlFile, &m)
+	if err != nil {
+		log.Fatalf("Unmarshal: %v", err)
+	}
+	return m
 }
+
+func main() {
+	// load configuration
+	conf := loadConfigFile()
+
+	// logger setup
+	logger.Setup(conf.Logger.Host, strings.ToUpper(conf.Logger.LogLevel))
+	logger.Info("Connecting to broker: [%s]", conf.Broker.Host)
+
+	go kafkaInit(conf.Broker)
+	runServer(conf.Target)
+}
\ No newline at end of file