seba-631-632: exporter enhancements for logger and topics
Change-Id: If10e56a7ccfce758712ea02df9656d4f413dbf84
diff --git a/topic-exporter.go b/topic-exporter.go
new file mode 100644
index 0000000..3313f46
--- /dev/null
+++ b/topic-exporter.go
@@ -0,0 +1,381 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "encoding/json"
+ "log"
+ "github.com/prometheus/client_golang/prometheus"
+ "gerrit.opencord.org/kafka-topic-exporter/common/logger"
+)
+
+var (
+ // voltha kpis
+ volthaTxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_tx_bytes_total",
+ Help: "Number of total bytes transmitted",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ volthaRxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_rx_bytes_total",
+ Help: "Number of total bytes received",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ volthaTxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_tx_packets_total",
+ Help: "Number of total packets transmitted",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+ volthaRxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_rx_packets_total",
+ Help: "Number of total packets received",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+
+ volthaTxErrorPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_tx_error_packets_total",
+ Help: "Number of total transmitted packets error",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+
+ volthaRxErrorPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "voltha_rx_error_packets_total",
+ Help: "Number of total received packets error",
+ },
+ []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+ )
+
+ // onos kpis
+ onosTxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_bytes_total",
+ Help: "Number of total bytes transmitted",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosRxBytesTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_bytes_total",
+ Help: "Number of total bytes received",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosTxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_packets_total",
+ Help: "Number of total packets transmitted",
+ },
+ []string{"device_id", "port_id"},
+ )
+ onosRxPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_packets_total",
+ Help: "Number of total packets received",
+ },
+ []string{"device_id", "port_id"},
+ )
+
+ onosTxDropPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_tx_drop_packets_total",
+ Help: "Number of total transmitted packets dropped",
+ },
+ []string{"device_id", "port_id"},
+ )
+
+ onosRxDropPacketsTotal = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "onos_rx_drop_packets_total",
+ Help: "Number of total received packets dropped",
+ },
+ []string{"device_id", "port_id"},
+ )
+)
+
+func exportVolthaKPI(kpi VolthaKPI) {
+
+ for _, data := range kpi.SliceDatas {
+ switch title := data.Metadata.Title; title {
+ case "Ethernet", "PON":
+ volthaTxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxBytes)
+
+ volthaRxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxBytes)
+
+ volthaTxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxPackets)
+
+ volthaRxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxPackets)
+
+ volthaTxErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxErrorPackets)
+
+ volthaRxErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxErrorPackets)
+
+ // TODO add metrics for:
+ // TxBcastPackets
+ // TxUnicastPackets
+ // TxMulticastPackets
+ // RxBcastPackets
+ // RxMulticastPackets
+
+ case "Ethernet_Bridge_Port_History":
+ if data.Metadata.Context.Upstream == "True" {
+ // ONU. Extended Ethernet statistics.
+ volthaTxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ "NA", // InterfaceID
+ "NA", // PonID
+ "NA", // PortNumber
+ data.Metadata.Title,
+ ).Add(data.Metrics.Packets)
+
+ volthaTxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ "NA", // InterfaceID
+ "NA", // PonID
+ "NA", // PortNumber
+ data.Metadata.Title,
+ ).Add(data.Metrics.Octets)
+ } else {
+ // ONU. Extended Ethernet statistics.
+ volthaRxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ "NA", // InterfaceID
+ "NA", // PonID
+ "NA", // PortNumber
+ data.Metadata.Title,
+ ).Add(data.Metrics.Packets)
+
+ volthaRxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ "NA", // InterfaceID
+ "NA", // PonID
+ "NA", // PortNumber
+ data.Metadata.Title,
+ ).Add(data.Metrics.Octets)
+ }
+
+ case "Ethernet_UNI_History":
+ // ONU. Do nothing.
+
+ case "FEC_History":
+ // ONU. Do Nothing.
+
+ volthaTxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxBytes)
+
+ volthaRxBytesTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxBytes)
+
+ volthaTxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxPackets)
+
+ volthaRxPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxPackets)
+
+ volthaTxErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.TxErrorPackets)
+
+ volthaRxErrorPacketsTotal.WithLabelValues(
+ data.Metadata.LogicalDeviceID,
+ data.Metadata.SerialNumber,
+ data.Metadata.DeviceID,
+ data.Metadata.Context.InterfaceID,
+ data.Metadata.Context.PonID,
+ data.Metadata.Context.PortNumber,
+ data.Metadata.Title,
+ ).Set(data.Metrics.RxErrorPackets)
+
+ // TODO add metrics for:
+ // TxBcastPackets
+ // TxUnicastPackets
+ // TxMulticastPackets
+ // RxBcastPackets
+ // RxMulticastPackets
+
+ case "voltha.internal":
+ // Voltha Internal. Do nothing.
+ }
+ }
+}
+
+func exportOnosKPI(kpi OnosKPI) {
+
+ for _, data := range kpi.Ports {
+
+ onosTxBytesTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxBytes)
+
+ onosRxBytesTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxBytes)
+
+ onosTxPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxPackets)
+
+ onosRxPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxPackets)
+
+ onosTxDropPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.TxPacketsDrop)
+
+ onosRxDropPacketsTotal.WithLabelValues(
+ kpi.DeviceID,
+ data.PortID,
+ ).Set(data.RxPacketsDrop)
+ }
+}
+
+func exportImporterKPI(kpi ImporterKPI) {
+ // TODO: add metrics for importer data
+ logger.Info("To be implemented")
+}
+
+func export(topic *string, data []byte) {
+ switch *topic {
+ case "voltha.kpis":
+ kpi := VolthaKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportVolthaKPI(kpi)
+ case "onos.kpis":
+ kpi := OnosKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportOnosKPI(kpi)
+ case "importer.kpis":
+ kpi := ImporterKPI{}
+ err := json.Unmarshal(data, &kpi)
+ if err != nil {
+ log.Fatal(err)
+ }
+ exportImporterKPI(kpi)
+ default:
+ logger.Warn("Unexpected export. Should not come here")
+ }
+}