[SEBA-293] Agg swith portstats

Change-Id: Ie11fe348d60841fda1fdb52ba57c3accf063ce10
diff --git a/main.go b/main.go
index 106421f..0eba8ba 100644
--- a/main.go
+++ b/main.go
@@ -15,21 +15,22 @@
 package main
 
 import (
-	"encoding/json"
 	"flag"
 	"fmt"
-	"log"
 	"net/http"
-	"os"
-	"os/signal"
+	"sync"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
-	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	topic  = flag.String("topic", "voltha.kpis", "The Kafka topic")
+	broker      = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+	volthaTopic = "voltha.kpis"
+	onosTopic   = "onos.kpis"
+
+	volthaTopicPointer = &volthaTopic
+	onosTopicPointer   = &onosTopic
 )
 
 var brokers []string
@@ -37,48 +38,25 @@
 func kafkaInit(brokers []string) {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
+	var wg sync.WaitGroup
+
+	wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
 
 	master, err := sarama.NewConsumer(brokers, config)
 	if err != nil {
+		fmt.Println("kafkaInit panic")
 		panic(err)
 	}
 	defer func() {
+		fmt.Println("kafkaInit close connection")
 		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
-	consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
-	if err != nil {
-		panic(err)
-	}
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-	doneCh := make(chan struct{})
-	go func() {
-		for {
-			select {
-			case err := <-consumer.Errors():
-				fmt.Println(err)
-			case msg := <-consumer.Messages():
-				// fmt.Println(string(msg.Value))
+	go VOLTHAListener(volthaTopicPointer, master, wg)
+	go ONOSListener(onosTopicPointer, master, wg)
 
-				kpi := KPI{}
-
-				err := json.Unmarshal(msg.Value, &kpi)
-
-				if err != nil {
-					log.Fatal(err)
-				}
-
-				export(kpi)
-
-			case <-signals:
-				fmt.Println("Interrupt is detected")
-				doneCh <- struct{}{}
-			}
-		}
-	}()
-	<-doneCh
+	wg.Wait()
 }
 
 func runServer() {
@@ -94,13 +72,23 @@
 	brokers = make([]string, 0)
 	brokers = []string{*broker}
 	fmt.Println("Connecting to broker: ", brokers)
-	fmt.Println("Listening to topic: ", *topic)
+	fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
+	fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
 
 	// register metrics within Prometheus
-	prometheus.MustRegister(txBytesTotal)
-	prometheus.MustRegister(rxBytesTotal)
-	prometheus.MustRegister(txPacketsTotal)
-	prometheus.MustRegister(rxPacketsTotal)
+	prometheus.MustRegister(volthaTxBytesTotal)
+	prometheus.MustRegister(volthaRxBytesTotal)
+	prometheus.MustRegister(volthaTxPacketsTotal)
+	prometheus.MustRegister(volthaRxPacketsTotal)
+	prometheus.MustRegister(volthaTxErrorPacketsTotal)
+	prometheus.MustRegister(volthaRxErrorPacketsTotal)
+
+	prometheus.MustRegister(onosTxBytesTotal)
+	prometheus.MustRegister(onosRxBytesTotal)
+	prometheus.MustRegister(onosTxPacketsTotal)
+	prometheus.MustRegister(onosRxPacketsTotal)
+	prometheus.MustRegister(onosTxDropPacketsTotal)
+	prometheus.MustRegister(onosRxDropPacketsTotal)
 }
 
 func main() {