[SEBA-128] Using new KPI format

Change-Id: I25aee363f35dd1380af990bdfd9eca65d30ffc54
diff --git a/main.go b/main.go
index 512dba3..106421f 100644
--- a/main.go
+++ b/main.go
@@ -1,91 +1,38 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package main
 
 import (
 	"encoding/json"
 	"flag"
 	"fmt"
-	"math"
+	"log"
 	"net/http"
 	"os"
 	"os/signal"
-	"reflect"
-	"strings"
 
 	"github.com/Shopify/sarama"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
-	txBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_bytes_total",
-			Help: "Number of total bytes transmitted, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	rxBytesTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_bytes_total",
-			Help: "Number of total bytes received, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	txPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "tx_packets_total",
-			Help: "Number of total packets transmitted, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-	rxPacketsTotal = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "rx_packets_total",
-			Help: "Number of total packets received, partitioned by device_id, port_type and port_id",
-		},
-		[]string{"device_id", "port_type", "port_id"},
-	)
-
-	// config
-	broker  = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
-	topic   = flag.String("topic", "voltha.kpis", "The Kafka topic")
+	broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+	topic  = flag.String("topic", "voltha.kpis", "The Kafka topic")
 )
 
 var brokers []string
-var messageCountStart int
-
-func prefixToLabels(prefix string) (string, string, string) {
-	var p = strings.Split(prefix, ".")
-	var deviceId, portType, portId string = "", "", ""
-	if len(p) == 5 {
-		// format is voltha.openolt.000130af0b0b2c51.pon.0
-		deviceId = p[2]
-		portType = p[3]
-		portId = p[4]
-	}
-	if len(p) == 4 {
-		// fomrat is voltha.openolt.000130af0b0b2c51nni.129
-		s := p[2]
-		deviceId = string(s[0 : len(s)-3])
-		portType = string(s[len(s)-3:])
-		portId = p[3]
-	}
-
-	return deviceId, portType, portId
-
-}
-
-func interfaceToFloat(unk interface{}) float64 {
-	switch i := unk.(type) {
-	case float64:
-		return i
-	case float32:
-		return float64(i)
-	case int64:
-		return float64(i)
-	default:
-		return math.NaN()
-	}
-}
 
 func kafkaInit(brokers []string) {
 	config := sarama.NewConfig()
@@ -113,53 +60,17 @@
 			case err := <-consumer.Errors():
 				fmt.Println(err)
 			case msg := <-consumer.Messages():
-				messageCountStart++
+				// Decode the message payload into a KPI and pass it to export.
 
-				var label map[string]interface{}
-				json.Unmarshal(msg.Value, &label)
+				kpi := KPI{}
 
-				var data map[string]map[string]map[string]interface{}
-				json.Unmarshal(msg.Value, &data)
+				err := json.Unmarshal(msg.Value, &kpi)
 
-				var tagString = reflect.ValueOf(label["prefixes"]).MapKeys()[0].String()
-
-				fmt.Println("tagString: "+tagString, "\n")
-				fmt.Println("data: ", data["prefixes"][tagString]["metrics"], "\n")
-
-				v, ok := data["prefixes"][tagString]["metrics"].(map[string]interface{})
-				if !ok {
-					// Can't assert, handle error.
-					fmt.Println("Eroror")
-				}
-				for k, s := range v {
-					fmt.Println("Type k: ", reflect.TypeOf(k))
-					fmt.Println("Type: ", reflect.TypeOf(s))
-					fmt.Printf("Value: %v\n", s)
-
-					d, pt, pi := prefixToLabels(tagString)
-
-					if k == "tx_bytes" {
-						txBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "rx_bytes" {
-						rxBytesTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "tx_packets" {
-						txPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
-					if k == "rx_packets" {
-						rxPacketsTotal.WithLabelValues(d, pt, pi).Set(interfaceToFloat(s))
-					}
+				if err != nil {
+					log.Fatal(err)
 				}
 
-				// fmt.Println("data: ", data["prefixes"][tagString]["metrics"].tx_bytes, "\n")
-				// var txBytesTotalValue = data["prefixes"][tagString]["metrics"]["tx_bytes"]
-
-				// d, pt, pi := prefixToLabels(tagString)
-
-				// txBytesTotal.WithLabelValues(d, pt, pi).Set(float64(txBytesTotalValue))
-
-				// fmt.Println("Adding txBytesTotal metric: ", d, pt, pi, txBytesTotalValue)
+				export(kpi)
 
 			case <-signals:
 				fmt.Println("Interrupt is detected")
@@ -168,7 +79,6 @@
 		}
 	}()
 	<-doneCh
-	fmt.Println("Processed", messageCountStart, "messages")
 }
 
 func runServer() {