[SEBA-293] Agg switch portstats
Change-Id: Ie11fe348d60841fda1fdb52ba57c3accf063ce10
diff --git a/main.go b/main.go
index 106421f..0eba8ba 100644
--- a/main.go
+++ b/main.go
@@ -15,21 +15,22 @@
package main
import (
- "encoding/json"
"flag"
"fmt"
- "log"
"net/http"
- "os"
- "os/signal"
+ "sync"
"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
)
var (
- broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
- topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
+ broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+ volthaTopic = "voltha.kpis"
+ onosTopic = "onos.kpis"
+
+ volthaTopicPointer = &volthaTopic
+ onosTopicPointer = &onosTopic
)
var brokers []string
@@ -37,48 +38,25 @@
func kafkaInit(brokers []string) {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
+ var wg sync.WaitGroup
+
+ wg.Add(2) // we are spinning up two threads and we need to wait for them to exit before stopping the kafka connection
master, err := sarama.NewConsumer(brokers, config)
if err != nil {
+ fmt.Println("kafkaInit panic")
panic(err)
}
defer func() {
+ fmt.Println("kafkaInit close connection")
if err := master.Close(); err != nil {
panic(err)
}
}()
- consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
- if err != nil {
- panic(err)
- }
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- doneCh := make(chan struct{})
- go func() {
- for {
- select {
- case err := <-consumer.Errors():
- fmt.Println(err)
- case msg := <-consumer.Messages():
- // fmt.Println(string(msg.Value))
+ go VOLTHAListener(volthaTopicPointer, master, wg)
+ go ONOSListener(onosTopicPointer, master, wg)
- kpi := KPI{}
-
- err := json.Unmarshal(msg.Value, &kpi)
-
- if err != nil {
- log.Fatal(err)
- }
-
- export(kpi)
-
- case <-signals:
- fmt.Println("Interrupt is detected")
- doneCh <- struct{}{}
- }
- }
- }()
- <-doneCh
+ wg.Wait()
}
func runServer() {
@@ -94,13 +72,23 @@
brokers = make([]string, 0)
brokers = []string{*broker}
fmt.Println("Connecting to broker: ", brokers)
- fmt.Println("Listening to topic: ", *topic)
+ fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
+ fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
// register metrics within Prometheus
- prometheus.MustRegister(txBytesTotal)
- prometheus.MustRegister(rxBytesTotal)
- prometheus.MustRegister(txPacketsTotal)
- prometheus.MustRegister(rxPacketsTotal)
+ prometheus.MustRegister(volthaTxBytesTotal)
+ prometheus.MustRegister(volthaRxBytesTotal)
+ prometheus.MustRegister(volthaTxPacketsTotal)
+ prometheus.MustRegister(volthaRxPacketsTotal)
+ prometheus.MustRegister(volthaTxErrorPacketsTotal)
+ prometheus.MustRegister(volthaRxErrorPacketsTotal)
+
+ prometheus.MustRegister(onosTxBytesTotal)
+ prometheus.MustRegister(onosRxBytesTotal)
+ prometheus.MustRegister(onosTxPacketsTotal)
+ prometheus.MustRegister(onosRxPacketsTotal)
+ prometheus.MustRegister(onosTxDropPacketsTotal)
+ prometheus.MustRegister(onosRxDropPacketsTotal)
}
func main() {