blob: d93a67b51d1e50f04f6f654565b2c4af9c5d5982 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
Joey Armstrong9cdee9f2024-01-03 04:56:14 -05002 * Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors
Scott Baker2c1c4822019-10-16 11:02:41 -07003
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Himani Chawla4d2eb5d2020-11-12 17:19:20 +053017package events
Scott Baker2c1c4822019-10-16 11:02:41 -070018
19import (
Himani Chawla6c38e572021-03-23 19:44:25 +053020 "container/ring"
Neha Sharma94f16a92020-06-26 04:17:55 +000021 "context"
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "errors"
23 "fmt"
24 "strconv"
25 "strings"
Himani Chawla6c38e572021-03-23 19:44:25 +053026 "sync"
Scott Baker2c1c4822019-10-16 11:02:41 -070027 "time"
28
khenaidoo26721882021-08-11 17:42:52 -040029 "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
30 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
31 "github.com/opencord/voltha-lib-go/v7/pkg/log"
32 "github.com/opencord/voltha-protos/v5/go/voltha"
33 "google.golang.org/protobuf/types/known/timestamppb"
Scott Baker2c1c4822019-10-16 11:02:41 -070034)
35
Himani Chawla6c38e572021-03-23 19:44:25 +053036// TODO: Make configurable through helm chart
37const EVENT_THRESHOLD = 1000
38
39type lastEvent struct{}
40
Scott Baker2c1c4822019-10-16 11:02:41 -070041type EventProxy struct {
Himani Chawla6c38e572021-03-23 19:44:25 +053042 kafkaClient kafka.Client
43 eventTopic kafka.Topic
44 eventQueue *EventQueue
45 queueCtx context.Context
46 queueCancelCtx context.CancelFunc
Scott Baker2c1c4822019-10-16 11:02:41 -070047}
48
49func NewEventProxy(opts ...EventProxyOption) *EventProxy {
50 var proxy EventProxy
51 for _, option := range opts {
52 option(&proxy)
53 }
Himani Chawla6c38e572021-03-23 19:44:25 +053054 proxy.eventQueue = newEventQueue()
55 proxy.queueCtx, proxy.queueCancelCtx = context.WithCancel(context.Background())
Scott Baker2c1c4822019-10-16 11:02:41 -070056 return &proxy
57}
58
59type EventProxyOption func(*EventProxy)
60
61func MsgClient(client kafka.Client) EventProxyOption {
62 return func(args *EventProxy) {
63 args.kafkaClient = client
64 }
65}
66
67func MsgTopic(topic kafka.Topic) EventProxyOption {
68 return func(args *EventProxy) {
69 args.eventTopic = topic
70 }
71}
72
73func (ep *EventProxy) formatId(eventName string) string {
74 return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
75}
76
Scott Baker8e2be6b2020-02-10 17:27:15 -080077func (ep *EventProxy) getEventHeader(eventName string,
Himani Chawla4d2eb5d2020-11-12 17:19:20 +053078 category eventif.EventCategory,
79 subCategory *eventif.EventSubCategory,
80 eventType eventif.EventType,
Scott Baker8e2be6b2020-02-10 17:27:15 -080081 raisedTs int64) (*voltha.EventHeader, error) {
Scott Baker2c1c4822019-10-16 11:02:41 -070082 var header voltha.EventHeader
83 if strings.Contains(eventName, "_") {
84 eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
85 } else {
86 eventName = "UNKNOWN_EVENT"
87 }
88 /* Populating event header */
89 header.Id = ep.formatId(eventName)
90 header.Category = category
Himani Chawla4d2eb5d2020-11-12 17:19:20 +053091 if subCategory != nil {
92 header.SubCategory = *subCategory
Himani Chawlaadc1b312021-02-04 13:12:33 +053093 } else {
94 header.SubCategory = voltha.EventSubCategory_NONE
Himani Chawla4d2eb5d2020-11-12 17:19:20 +053095 }
Scott Baker2c1c4822019-10-16 11:02:41 -070096 header.Type = eventType
Himani Chawla4d2eb5d2020-11-12 17:19:20 +053097 header.TypeVersion = eventif.EventTypeVersion
Scott Baker8e2be6b2020-02-10 17:27:15 -080098
Girish Gowdradcd54062021-04-22 12:49:17 -070099 // raisedTs is in seconds
khenaidoo26721882021-08-11 17:42:52 -0400100 header.RaisedTs = timestamppb.New(time.Unix(raisedTs, 0))
101 header.ReportedTs = timestamppb.New(time.Now())
Scott Baker8e2be6b2020-02-10 17:27:15 -0800102
103 return &header, nil
Scott Baker2c1c4822019-10-16 11:02:41 -0700104}
105
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530106/* Send out rpc events*/
107func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
108 if rpcEvent == nil {
109 logger.Error(ctx, "Received empty rpc event")
110 return errors.New("rpc event nil")
111 }
112 var event voltha.Event
113 var err error
114 if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
115 return err
116 }
117 event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
Himani Chawla6c38e572021-03-23 19:44:25 +0530118 ep.eventQueue.push(&event)
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530119 return nil
120
121}
122
Scott Baker2c1c4822019-10-16 11:02:41 -0700123/* Send out device events*/
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530124func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
kesavandd85e52b2022-03-15 16:38:08 +0530125 return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
126}
127
128/* Send out device events with key*/
129func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
Scott Baker2c1c4822019-10-16 11:02:41 -0700130 if deviceEvent == nil {
Abhay Kumar40252eb2025-10-13 13:25:53 +0000131 logger.Error(ctx, "recieved empty device event")
132 return errors.New("device event nil")
Scott Baker2c1c4822019-10-16 11:02:41 -0700133 }
134 var event voltha.Event
135 var de voltha.Event_DeviceEvent
Scott Baker8e2be6b2020-02-10 17:27:15 -0800136 var err error
Scott Baker2c1c4822019-10-16 11:02:41 -0700137 de.DeviceEvent = deviceEvent
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530138 if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
Scott Baker8e2be6b2020-02-10 17:27:15 -0800139 return err
140 }
Scott Baker2c1c4822019-10-16 11:02:41 -0700141 event.EventType = &de
kesavandd85e52b2022-03-15 16:38:08 +0530142
143 if err := ep.sendEvent(ctx, &event, key); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000144 logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
Scott Baker2c1c4822019-10-16 11:02:41 -0700145 return err
146 }
bseeniva206459b2026-02-12 11:56:29 +0530147 logger.Debugw(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
Scott Baker2c1c4822019-10-16 11:02:41 -0700148 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
149 "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
150 "DeviceEventName": deviceEvent.DeviceEventName})
151
152 return nil
153
154}
155
Naga Manjunatha59c9152019-10-30 12:48:49 +0530156// SendKpiEvent is to send kpi events to voltha.event topic
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530157func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530158 if kpiEvent == nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000159 logger.Error(ctx, "Recieved empty kpi event")
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530160 return errors.New("KPI event nil")
161 }
162 var event voltha.Event
163 var de voltha.Event_KpiEvent2
Scott Baker8e2be6b2020-02-10 17:27:15 -0800164 var err error
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530165 de.KpiEvent2 = kpiEvent
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530166 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
Scott Baker8e2be6b2020-02-10 17:27:15 -0800167 return err
168 }
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530169 event.EventType = &de
kesavandd85e52b2022-03-15 16:38:08 +0530170
171 if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000172 logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530173 return err
174 }
bseeniva206459b2026-02-12 11:56:29 +0530175 logger.Debugw(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
Naga Manjunath86e9d2e2019-10-25 15:21:49 +0530176 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
177 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
178
179 return nil
180
181}
182
pnalmas81f3ed22026-03-31 13:33:24 +0530183// SendKpiEvent3 is to send kpi events with 64-bit counter support to voltha.event topic
184func (ep *EventProxy) SendKpiEvent3(ctx context.Context, id string, kpiEvent *voltha.KpiEvent3, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
185 if kpiEvent == nil {
186 logger.Error(ctx, "Received empty kpi event3")
187 return errors.New("KPI event3 nil")
188 }
189 var event voltha.Event
190 var de voltha.Event_KpiEvent3
191 var err error
192 de.KpiEvent3 = kpiEvent
193 if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT3, raisedTs); err != nil {
194 return err
195 }
196 event.EventType = &de
197
198 if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
199 logger.Errorw(ctx, "Failed to send kpi event3 to KAFKA bus", log.Fields{"device-event": kpiEvent})
200 return err
201 }
202 logger.Debugw(ctx, "Successfully sent kpi event3 to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
203 "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
204 "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
205
206 return nil
207
208}
209
kesavandd85e52b2022-03-15 16:38:08 +0530210func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
Himani Chawla4d2eb5d2020-11-12 17:19:20 +0530211 logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
kesavandd85e52b2022-03-15 16:38:08 +0530212 if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
Scott Baker2c1c4822019-10-16 11:02:41 -0700213 return err
214 }
Neha Sharma94f16a92020-06-26 04:17:55 +0000215 logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
Scott Baker2c1c4822019-10-16 11:02:41 -0700216
217 return nil
218}
Matteo Scandolo2ca74462021-03-01 14:03:17 -0800219
220func (ep *EventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
221 return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
222}
223
224func (ep *EventProxy) SendLiveness(ctx context.Context) error {
225 return ep.kafkaClient.SendLiveness(ctx)
226}
Himani Chawla6c38e572021-03-23 19:44:25 +0530227
228// Start the event proxy
khenaidoo26721882021-08-11 17:42:52 -0400229func (ep *EventProxy) Start() error {
kesavandd85e52b2022-03-15 16:38:08 +0530230 if !ep.eventTopicExits(context.Background()) {
231 logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
232 return fmt.Errorf("event topic doesn't exist in kafka")
233 }
234
Himani Chawla6c38e572021-03-23 19:44:25 +0530235 eq := ep.eventQueue
kesavandd85e52b2022-03-15 16:38:08 +0530236
Himani Chawla6c38e572021-03-23 19:44:25 +0530237 go eq.start(ep.queueCtx)
238 logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
239 for {
240 // Notify the queue I am ready
241 eq.readyToSendToKafkaCh <- struct{}{}
242 // Wait for an event
243 elem, ok := <-eq.eventChannel
244 if !ok {
245 logger.Debug(context.Background(), "event-channel-closed-exiting")
246 break
247 }
248 // Check for last event
249 if _, ok := elem.(*lastEvent); ok {
250 // close the queuing loop
251 logger.Info(context.Background(), "received-last-event")
252 ep.queueCancelCtx()
253 break
254 }
255 ctx := context.Background()
256 event, ok := elem.(*voltha.Event)
257 if !ok {
258 logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
259 continue
260 }
kesavandd85e52b2022-03-15 16:38:08 +0530261 if err := ep.sendEvent(ctx, event, ""); err != nil {
262 logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
Himani Chawla6c38e572021-03-23 19:44:25 +0530263 } else {
264 logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
265 "sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
266 "reported-ts": event.Header.ReportedTs, "event-type": event.EventType})
267 }
268 }
khenaidoo26721882021-08-11 17:42:52 -0400269 return nil
Himani Chawla6c38e572021-03-23 19:44:25 +0530270}
271
272func (ep *EventProxy) Stop() {
khenaidoo26721882021-08-11 17:42:52 -0400273 if ep.eventQueue != nil {
274 ep.eventQueue.stop()
275 }
Himani Chawla6c38e572021-03-23 19:44:25 +0530276}
277
278type EventQueue struct {
279 mutex sync.RWMutex
280 eventChannel chan interface{}
281 insertPosition *ring.Ring
282 popPosition *ring.Ring
283 dataToSendAvailable chan struct{}
284 readyToSendToKafkaCh chan struct{}
285 eventQueueStopped chan struct{}
286}
287
288func newEventQueue() *EventQueue {
289 ev := &EventQueue{
290 eventChannel: make(chan interface{}),
291 insertPosition: ring.New(EVENT_THRESHOLD),
292 dataToSendAvailable: make(chan struct{}),
293 readyToSendToKafkaCh: make(chan struct{}),
294 eventQueueStopped: make(chan struct{}),
295 }
296 ev.popPosition = ev.insertPosition
297 return ev
298}
299
300// push is invoked to push an event at the back of a queue
301func (eq *EventQueue) push(event interface{}) {
302 eq.mutex.Lock()
303
304 if eq.insertPosition != nil {
305 // Handle Queue is full.
306 // TODO: Current default is to overwrite old data if queue is full. Is there a need to
307 // block caller if max threshold is reached?
308 if eq.insertPosition.Value != nil && eq.insertPosition == eq.popPosition {
309 eq.popPosition = eq.popPosition.Next()
310 }
311
312 // Insert data and move pointer to next empty position
313 eq.insertPosition.Value = event
314 eq.insertPosition = eq.insertPosition.Next()
315
316 // Check for last event
317 if _, ok := event.(*lastEvent); ok {
318 eq.insertPosition = nil
319 }
320 eq.mutex.Unlock()
321 // Notify waiting thread of data availability
322 eq.dataToSendAvailable <- struct{}{}
323
324 } else {
325 logger.Debug(context.Background(), "event-queue-is-closed-as-insert-position-is-cleared")
326 eq.mutex.Unlock()
327 }
328}
329
330// start starts the routine that extracts an element from the event queue and
331// send it to the kafka sending routine to process.
332func (eq *EventQueue) start(ctx context.Context) {
333 logger.Info(ctx, "starting-event-queue")
334loop:
335 for {
336 select {
337 case <-eq.dataToSendAvailable:
338 // Do nothing - use to prevent caller pushing data to block
339 case <-eq.readyToSendToKafkaCh:
340 {
341 // Kafka sending routine is ready to process an event
342 eq.mutex.Lock()
343 element := eq.popPosition.Value
344 if element == nil {
345 // No events to send. Wait
346 eq.mutex.Unlock()
347 select {
348 case _, ok := <-eq.dataToSendAvailable:
349 if !ok {
350 // channel closed
351 eq.eventQueueStopped <- struct{}{}
352 return
353 }
354 case <-ctx.Done():
355 logger.Info(ctx, "event-queue-context-done")
356 eq.eventQueueStopped <- struct{}{}
357 return
358 }
359 eq.mutex.Lock()
360 element = eq.popPosition.Value
361 }
362 eq.popPosition.Value = nil
363 eq.popPosition = eq.popPosition.Next()
364 eq.mutex.Unlock()
365 eq.eventChannel <- element
366 }
367 case <-ctx.Done():
368 logger.Info(ctx, "event-queue-context-done")
369 eq.eventQueueStopped <- struct{}{}
370 break loop
371 }
372 }
373 logger.Info(ctx, "event-queue-stopped")
374
375}
376
377func (eq *EventQueue) stop() {
378 // Flush all
379 eq.push(&lastEvent{})
380 <-eq.eventQueueStopped
381 eq.mutex.Lock()
382 close(eq.readyToSendToKafkaCh)
383 close(eq.dataToSendAvailable)
384 close(eq.eventChannel)
385 eq.mutex.Unlock()
386
387}
kesavandd85e52b2022-03-15 16:38:08 +0530388
389func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
390
391 // check if voltha.events topic exists
392 topics, err := ep.kafkaClient.ListTopics(ctx)
393 if err != nil {
394 logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
395 return false
396 }
397
398 logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
399 for _, topic := range topics {
400 if topic == ep.eventTopic.Name {
401 return true
402 }
403 }
404 return false
405}