[VOL-5527]: Implement new KPI event to handle publishing of MEs with 64-bit attributes
Change-Id: I216ef0fba07b80fcfb8e1c39139d8fdc5a963166
Signed-off-by: pnalmas <praneeth.nalmas@radisys.com>
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index 7d08e51..d93a67b 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -180,6 +180,33 @@
}
+// SendKpiEvent3 is to send kpi events with 64-bit counter support to voltha.event topic
+func (ep *EventProxy) SendKpiEvent3(ctx context.Context, id string, kpiEvent *voltha.KpiEvent3, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+ if kpiEvent == nil {
+ logger.Error(ctx, "Received empty kpi event3")
+ return errors.New("KPI event3 nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_KpiEvent3
+ var err error
+ de.KpiEvent3 = kpiEvent
+ if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT3, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &de
+
+ if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
+ logger.Errorw(ctx, "Failed to send kpi event3 to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ return err
+ }
+ logger.Debugw(ctx, "Successfully sent kpi event3 to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
+
+ return nil
+
+}
+
func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {