blob: 106421f63a004f56b012c82c205bf50c8b761d8d [file] [log] [blame]
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -07001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
Matteo Scandolo189526a2018-07-13 09:10:23 -070015package main
16
17import (
18 "encoding/json"
19 "flag"
20 "fmt"
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070021 "log"
Matteo Scandolo189526a2018-07-13 09:10:23 -070022 "net/http"
23 "os"
24 "os/signal"
Matteo Scandolo189526a2018-07-13 09:10:23 -070025
26 "github.com/Shopify/sarama"
27 "github.com/prometheus/client_golang/prometheus"
28)
29
30var (
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070031 broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
32 topic = flag.String("topic", "voltha.kpis", "The Kafka topic")
Matteo Scandolo189526a2018-07-13 09:10:23 -070033)
34
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070035var brokers []string
Matteo Scandolo189526a2018-07-13 09:10:23 -070036
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070037func kafkaInit(brokers []string) {
Matteo Scandolo189526a2018-07-13 09:10:23 -070038 config := sarama.NewConfig()
39 config.Consumer.Return.Errors = true
40
41 master, err := sarama.NewConsumer(brokers, config)
42 if err != nil {
43 panic(err)
44 }
45 defer func() {
46 if err := master.Close(); err != nil {
47 panic(err)
48 }
49 }()
50 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
51 if err != nil {
52 panic(err)
53 }
54 signals := make(chan os.Signal, 1)
55 signal.Notify(signals, os.Interrupt)
56 doneCh := make(chan struct{})
57 go func() {
58 for {
59 select {
60 case err := <-consumer.Errors():
61 fmt.Println(err)
62 case msg := <-consumer.Messages():
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070063 // fmt.Println(string(msg.Value))
Matteo Scandolo189526a2018-07-13 09:10:23 -070064
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070065 kpi := KPI{}
Matteo Scandolo189526a2018-07-13 09:10:23 -070066
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070067 err := json.Unmarshal(msg.Value, &kpi)
Matteo Scandolo189526a2018-07-13 09:10:23 -070068
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070069 if err != nil {
70 log.Fatal(err)
Matteo Scandolo189526a2018-07-13 09:10:23 -070071 }
72
Matteo Scandoloa8bd93e2018-09-13 13:36:50 -070073 export(kpi)
Matteo Scandolo189526a2018-07-13 09:10:23 -070074
75 case <-signals:
76 fmt.Println("Interrupt is detected")
77 doneCh <- struct{}{}
78 }
79 }
80 }()
81 <-doneCh
Matteo Scandolo189526a2018-07-13 09:10:23 -070082}
83
84func runServer() {
85 fmt.Println("Starting Server")
86 http.Handle("/metrics", prometheus.Handler())
87 http.ListenAndServe(":8080", nil)
88}
89
90func init() {
91
92 // read config from cli flags
93 flag.Parse()
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -070094 brokers = make([]string, 0)
95 brokers = []string{*broker}
96 fmt.Println("Connecting to broker: ", brokers)
Matteo Scandolo189526a2018-07-13 09:10:23 -070097 fmt.Println("Listening to topic: ", *topic)
98
99 // register metrics within Prometheus
100 prometheus.MustRegister(txBytesTotal)
101 prometheus.MustRegister(rxBytesTotal)
102 prometheus.MustRegister(txPacketsTotal)
103 prometheus.MustRegister(rxPacketsTotal)
104}
105
106func main() {
Matteo Scandolo8bc8ba62018-07-27 12:16:20 -0700107 go kafkaInit(brokers)
Matteo Scandolo189526a2018-07-13 09:10:23 -0700108 runServer()
109}