Exporting PPPoE stats from ONOS

Change-Id: I031b9e747e77ecf4220238c0f27d7618851da7e0
diff --git a/topic-exporter.go b/topic-exporter.go
index c306975..5d33458 100644
--- a/topic-exporter.go
+++ b/topic-exporter.go
@@ -18,6 +18,8 @@
 	"encoding/json"
 	"gerrit.opencord.org/kafka-topic-exporter/common/logger"
 	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+	"strconv"
 )
 
 var (
@@ -215,6 +217,72 @@
 			Name: "onosaaa_request_re_tx",
 			Help: "Number of access request packets retransmitted to the server",
 		})
+
+	// ONOS PPPoE KPIs: per-subscriber gauges populated from the "pppoe.stats" topic
+
+	onosPppoeUpTermBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupTermBytes",
+			Help: "onosPppoeupTermBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpTermPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupTermPackets",
+			Help: "onosPppoeupTermPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpDropBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupDropBytes",
+			Help: "onosPppoeupDropBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpDropPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupDropPackets",
+			Help: "onosPppoeupDropPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeUpControlPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoeupControlPackets",
+			Help: "onosPppoeupControlPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownRxBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownRxBytes",
+			Help: "onosPppoedownRxBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownRxPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownRxPackets",
+			Help: "onosPppoedownRxPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownTxBytes = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownTxBytes",
+			Help: "onosPppoedownTxBytes",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
+	onosPppoeDownTxPackets = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "onosPppoedownTxPackets",
+			Help: "onosPppoedownTxPackets",
+		},
+		[]string{"mac_address", "ip", "session_id", "s_tag", "c_tag", "onu_serial"},
+	)
 )
 
 func exportVolthaKPI(kpi VolthaKPI) {
@@ -385,7 +453,6 @@
 				data.Metadata.Title,
 			).Set(data.Metrics.ReceivedOpticalPower)
 
-
 		case "Ethernet_UNI_History":
 			// ONU. Do nothing.
 
@@ -533,6 +600,93 @@
 	onosaaaRequestReTx.Set(kpi.RequestReTx)
 }
 
+// exportOnosPppoeKPI publishes the PPPoE counters of every subscriber in kpi.
+// For each subscriber it logs the identifying fields at trace level and then
+// sets the nine onosPppoe* gauges, all labelled with the same six values:
+// MAC, IP, session id, S-tag, C-tag and ONU serial number.
+func exportOnosPppoeKPI(kpi OnosPppoeKPI) {
+	for _, s := range kpi.Subscribers {
+		// The numeric identifiers are needed by the log entry and by all nine
+		// gauges, so convert them to strings only once per subscriber.
+		sessionID := strconv.Itoa(s.SessionId)
+		sTag := strconv.Itoa(s.STag)
+		cTag := strconv.Itoa(s.CTag)
+
+		logger.WithFields(log.Fields{
+			"Mac":             s.Mac,
+			"Ip":              s.Ip,
+			"SessionId":       sessionID,
+			"STag":            sTag,
+			"CTag":            cTag,
+			"onuSerialNumber": s.SerialNumber,
+		}).Trace("Received OnosPppoeKPI message")
+
+		// Label values are positional: keep the order the vectors declare.
+		onosPppoeUpTermBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.UpTermBytes)
+
+		onosPppoeUpTermPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.UpTermPackets)
+
+		onosPppoeUpDropBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.UpDropBytes)
+
+		onosPppoeUpDropPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.UpDropPackets)
+
+		onosPppoeUpControlPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.UpControlPackets)
+
+		onosPppoeDownRxBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.DownRxBytes)
+
+		onosPppoeDownRxPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.DownRxPackets)
+
+		onosPppoeDownTxBytes.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.DownTxBytes)
+
+		onosPppoeDownTxPackets.WithLabelValues(
+			s.Mac,
+			s.Ip,
+			sessionID, sTag, cTag,
+			s.SerialNumber,
+		).Set(s.DownTxPackets)
+	}
+}
+
 func export(topic *string, data []byte) {
 	switch *topic {
 	case "voltha.kpis":
@@ -567,6 +721,14 @@
 			break
 		}
 		exportOnosAaaKPI(kpi)
+	case "pppoe.stats":
+		kpi := OnosPppoeKPI{}
+		err := json.Unmarshal(data, &kpi)
+		if err != nil {
+			logger.Error("Invalid msg on pppoe.stats: %s, Unprocessed Msg: %s", err.Error(), string(data))
+			break
+		}
+		exportOnosPppoeKPI(kpi)
 	default:
 		logger.Warn("Unexpected export. Topic [%s] not supported. Should not come here", *topic)
 	}