[VOL-5527]: Implement new KPI event to handle publishing of MEs with 64-bit attributes

Change-Id: I216ef0fba07b80fcfb8e1c39139d8fdc5a963166
Signed-off-by: pnalmas <praneeth.nalmas@radisys.com>
diff --git a/pkg/events/eventif/events_proxy_if.go b/pkg/events/eventif/events_proxy_if.go
index e0538b5..836df11 100644
--- a/pkg/events/eventif/events_proxy_if.go
+++ b/pkg/events/eventif/events_proxy_if.go
@@ -30,6 +30,8 @@
 		subCategory EventSubCategory, raisedTs int64, key string) error
 	SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
 		subCategory EventSubCategory, raisedTs int64) error
+	SendKpiEvent3(ctx context.Context, id string, deviceEvent *voltha.KpiEvent3, category EventCategory,
+		subCategory EventSubCategory, raisedTs int64) error
 	SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
 		subCategory *EventSubCategory, raisedTs int64) error
 	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index 7d08e51..d93a67b 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -180,6 +180,33 @@
 
 }
 
+// SendKpiEvent3 wraps a KpiEvent3 (KPI event with 64-bit counter support) in a voltha.Event of type KPI_EVENT3 and sends it through sendEvent, keyed by the decimal raisedTs; a nil kpiEvent, a header error or a send error is returned to the caller.
+func (ep *EventProxy) SendKpiEvent3(ctx context.Context, id string, kpiEvent *voltha.KpiEvent3, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+	if kpiEvent == nil {
+		logger.Error(ctx, "Received empty kpi event3")
+		return errors.New("KPI event3 nil")
+	}
+	var event voltha.Event
+	var de voltha.Event_KpiEvent3
+	var err error
+	de.KpiEvent3 = kpiEvent
+	if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT3, raisedTs); err != nil {
+		return err
+	}
+	event.EventType = &de
+
+	if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
+		logger.Errorw(ctx, "Failed to send kpi event3 to KAFKA bus", log.Fields{"device-event": kpiEvent})
+		return err
+	}
+	logger.Debugw(ctx, "Successfully sent kpi event3 to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+		"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+		"ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
+
+	return nil
+
+}
+
 func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
 	logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
 	if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {