SEBA-949 support for publishing bbsim events on kafka
Change-Id: I4354cd026bbadc801e4d6d08b2f9cd3462917b4c
diff --git a/internal/bbsim/devices/helpers.go b/internal/bbsim/devices/helpers.go
index 5a27202..c4bab28 100644
--- a/internal/bbsim/devices/helpers.go
+++ b/internal/bbsim/devices/helpers.go
@@ -17,9 +17,12 @@
package devices
import (
- "github.com/looplab/fsm"
- "github.com/opencord/voltha-protos/v2/go/openolt"
"strconv"
+ "time"
+
+ "github.com/looplab/fsm"
+ "github.com/opencord/bbsim/internal/common"
+ "github.com/opencord/voltha-protos/v2/go/openolt"
)
type mode int
@@ -65,3 +68,20 @@
}
return s
}
+
+// publishEvent builds a common.Event stamped with the current time and queues
+// it on olt.EventChannel. It does nothing unless olt.PublishEvents is set.
+//
+// eventType is copied into the event verbatim. Callers in this package pass -1
+// for intfID and onuID and "" for onuSerial when the event concerns the OLT
+// itself rather than a single ONU.
+func publishEvent(eventType string, intfID int32, onuID int32, onuSerial string) {
+	if !olt.PublishEvents {
+		return
+	}
+
+	now := time.Now()
+	// The send blocks while the channel's buffer is full.
+	olt.EventChannel <- common.Event{
+		EventType: eventType,
+		OltID:     olt.ID,
+		IntfID:    intfID,
+		OnuID:     onuID,
+		OnuSerial: onuSerial,
+		Timestamp: now.Format("2006-01-02 15:04:05.000000000"),
+		// Milliseconds since the Unix epoch.
+		EpochTime: now.UnixNano() / int64(time.Millisecond),
+	}
+}
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index d5ba84f..fa0ba5d 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -61,6 +61,8 @@
Flows map[FlowKey]openolt.Flow
Delay int
ControlledActivation mode
+ EventChannel chan common.Event
+ PublishEvents bool
Pons []*PonPort
Nnis []*NniPort
@@ -82,7 +84,7 @@
return &olt
}
-func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, isMock bool) *OltDevice {
+func CreateOLT(oltId int, nni int, pon int, onuPerPon int, sTag int, cTagInit int, auth bool, dhcp bool, delay int, ca string, enablePerf bool, event bool, isMock bool) *OltDevice {
oltLogger.WithFields(log.Fields{
"ID": oltId,
"NumNni": nni,
@@ -104,6 +106,7 @@
Delay: delay,
Flows: make(map[FlowKey]openolt.Flow),
enablePerf: enablePerf,
+ PublishEvents: event,
}
if val, ok := ControlledActivationModes[ca]; ok {
@@ -165,6 +168,12 @@
}
}
+ if olt.PublishEvents {
+ log.Debugf("BBSim event publishing is enabled")
+ // Create a channel to write event messages
+ olt.EventChannel = make(chan common.Event, 100)
+ }
+
return &olt
}
@@ -728,6 +737,7 @@
oltLogger.WithFields(log.Fields{
"OnuSn": onuSnToString(onu.SerialNumber),
}).Info("Received ActivateOnu call from VOLTHA")
+ publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
pon, _ := o.GetPonById(onu.IntfId)
_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
@@ -797,6 +807,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Disabling OLT")
+ publishEvent("OLT-disable-received", -1, -1, "")
for _, pon := range o.Pons {
if pon.InternalState.Current() == "enabled" {
@@ -833,6 +844,7 @@
func (o *OltDevice) EnableIndication(_ *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error {
oltLogger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA")
+ publishEvent("OLT-enable-received", -1, -1, "")
o.Enable(stream)
return nil
}
@@ -1051,6 +1063,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Shutting down")
+ publishEvent("OLT-reboot-received", -1, -1, "")
go o.RestartOLT()
return new(openolt.Empty), nil
}
@@ -1059,6 +1072,7 @@
oltLogger.WithFields(log.Fields{
"oltId": o.ID,
}).Info("Received ReenableOlt request from VOLTHA")
+ publishEvent("OLT-reenable-received", -1, -1, "")
// enable OLT
oltMsg := Message{
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index 3622ee3..ee16cb8 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -213,6 +213,9 @@
}
o.Channel <- msg
},
+ "enter_eap_response_success_received": func(e *fsm.Event) {
+ publishEvent("ONU-authentication-done", int32(o.PonPortID), int32(o.ID), o.Sn())
+ },
"enter_auth_failed": func(e *fsm.Event) {
onuLogger.WithFields(log.Fields{
"OnuId": o.ID,
@@ -235,6 +238,9 @@
}
o.Channel <- msg
},
+ "enter_dhcp_ack_received": func(e *fsm.Event) {
+ publishEvent("ONU-DHCP-ACK-received", int32(o.PonPortID), int32(o.ID), o.Sn())
+ },
"enter_dhcp_failed": func(e *fsm.Event) {
onuLogger.WithFields(log.Fields{
"OnuId": o.ID,
@@ -516,6 +522,7 @@
"OnuSn": msg.Onu.Sn(),
"OnuId": o.ID,
}).Debug("Sent Indication_OnuDiscInd")
+ publishEvent("ONU-discovery-indication-sent", int32(msg.Onu.PonPortID), int32(o.ID), msg.Onu.Sn())
// after DiscoveryRetryDelay check if the state is the same and in case send a new OnuDiscIndication
go func(delay time.Duration) {
@@ -553,6 +560,25 @@
}
+// publishOmciEvent inspects an incoming OMCI message and publishes MIB upload
+// progress events. It is a no-op unless olt.PublishEvents is set.
+//
+// A MibUpload message resets o.seqNumber and emits "MIB-upload-received". Each
+// MibUploadNext message increments o.seqNumber, and "MIB-upload-done" is
+// emitted whenever the counter is above 290. A packet that cannot be parsed is
+// logged and otherwise ignored.
+func (o *Onu) publishOmciEvent(msg OmciMessage) {
+	if !olt.PublishEvents {
+		return
+	}
+
+	_, _, msgType, _, _, _, err := omcisim.ParsePkt(HexDecode(msg.omciMsg.Pkt))
+	if err != nil {
+		log.Errorf("error in getting msgType %v", err)
+		return
+	}
+
+	switch msgType {
+	case omcisim.MibUpload:
+		// A new upload starts: restart the MibUploadNext counter.
+		o.seqNumber = 0
+		publishEvent("MIB-upload-received", int32(o.PonPortID), int32(o.ID), common.OnuSnToString(o.SerialNumber))
+	case omcisim.MibUploadNext:
+		o.seqNumber++
+		// NOTE(review): 290 presumably tracks the number of MibUploadNext
+		// requests in a full upload - confirm against omcisim.
+		if o.seqNumber > 290 {
+			publishEvent("MIB-upload-done", int32(o.PonPortID), int32(o.ID), common.OnuSnToString(o.SerialNumber))
+		}
+	}
+}
+
func (o *Onu) handleOmciMessage(msg OmciMessage, stream openolt.Openolt_EnableIndicationServer) {
onuLogger.WithFields(log.Fields{
@@ -561,6 +587,8 @@
"omciPacket": msg.omciMsg.Pkt,
}).Tracef("Received OMCI message")
+ o.publishOmciEvent(msg)
+
var omciInd openolt.OmciIndication
respPkt, err := omcisim.OmciSim(o.PonPortID, o.ID, HexDecode(msg.omciMsg.Pkt))
if err != nil {
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
new file mode 100644
index 0000000..9ddd79c
--- /dev/null
+++ b/internal/common/kafka_utils.go
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "encoding/json"
+ "strconv"
+ "time"
+
+ "github.com/Shopify/sarama"
+ log "github.com/sirupsen/logrus"
+)
+
+var producer sarama.AsyncProducer
+var topic string
+
+// Event defines structure for bbsim events.
+//
+// KafkaPublisher serializes values of this type with json.Marshal; the struct
+// carries no json tags, so the field names below are the JSON keys.
+type Event struct {
+ // EventType is the event name, e.g. "OLT-enable-received".
+ EventType string
+ // OnuSerial is the ONU serial number; "" for OLT-level events.
+ OnuSerial string
+ // OltID is the ID of the OLT that emitted the event.
+ OltID int
+ // IntfID is the PON interface ID; -1 for OLT-level events.
+ IntfID int32
+ // OnuID is the ONU ID; -1 for OLT-level events.
+ OnuID int32
+ // EpochTime is the event time in milliseconds since the Unix epoch.
+ EpochTime int64
+ // Timestamp is the event time formatted as "2006-01-02 15:04:05.000000000".
+ Timestamp string
+}
+
+// InitializePublisher initializes the kafka publisher for the given OLT.
+//
+// It routes sarama's logging to a new logrus logger, derives the client ID
+// ("BBSim-OLT-<oltID>") and the topic ("BBSim-OLT-<oltID>-Events") from oltID,
+// and creates the package-level async producer against
+// Options.BBSim.KafkaAddress. An error from sarama.NewAsyncProducer is
+// returned unchanged.
+//
+// On success a goroutine is started that logs every delivery error reported
+// by the producer; it exits when the producer's error channel is closed.
+func InitializePublisher(oltID int) error {
+	sarama.Logger = log.New()
+
+	// producer config
+	config := sarama.NewConfig()
+	config.Producer.Retry.Max = 5
+	config.Metadata.Retry.Max = 10
+	config.Metadata.Retry.Backoff = 10 * time.Second
+	config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
+	topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+
+	var err error
+	producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+	if err != nil {
+		return err
+	}
+
+	// With the default config sarama returns failed messages on Errors() and
+	// requires that channel to be read; otherwise the producer stalls once
+	// the channel fills up. The channel is passed in so the goroutine does
+	// not touch the package-level producer variable.
+	go func(errs <-chan *sarama.ProducerError) {
+		for perr := range errs {
+			log.Errorf("Failed to publish event on kafka %v", perr)
+		}
+	}(producer.Errors())
+
+	return nil
+}
+
+// KafkaPublisher receives events on eventChannel and publishes them to kafka.
+//
+// Each event is JSON-encoded and handed to the package-level async producer
+// on the configured topic. Events that cannot be encoded are logged and
+// skipped. The function blocks until eventChannel is closed and drained, so
+// it is meant to run in its own goroutine; InitializePublisher must have
+// succeeded beforehand, since the producer is used without a nil check.
+func KafkaPublisher(eventChannel chan Event) {
+	defer log.Debugf("KafkaPublisher stopped")
+
+	// Ranging over the channel ends the loop once the channel is closed,
+	// instead of spinning on zero-value events from a closed channel.
+	for event := range eventChannel {
+		log.Debugf("Received event on channel %v", event)
+		jsonEvent, err := json.Marshal(event)
+		if err != nil {
+			log.Errorf("Failed to get json event %v", err)
+			continue
+		}
+		producer.Input() <- &sarama.ProducerMessage{
+			Topic: topic,
+			Value: sarama.ByteEncoder(jsonEvent),
+		}
+		log.Debugf("Event sent on kafka")
+	}
+}
diff --git a/internal/common/options.go b/internal/common/options.go
index d5317e1..f1f5f0a 100644
--- a/internal/common/options.go
+++ b/internal/common/options.go
@@ -70,6 +70,8 @@
LegacyRestApiAddress string `yaml:"legacy_rest_api_address"`
SadisRestAddress string `yaml:"sadis_rest_address"`
SadisServer bool `yaml:"sadis_server"`
+ KafkaAddress string `yaml:"kafka_address"`
+ Events bool `yaml:"enable_events"`
ControlledActivation string `yaml:"controlled_activation"`
EnablePerf bool `yaml:"enable_perf"`
}
@@ -105,6 +107,8 @@
LegacyRestApiAddress: ":50073",
SadisRestAddress: ":50074",
SadisServer: true,
+ KafkaAddress: ":9092",
+ Events: false,
ControlledActivation: "default",
EnablePerf: false,
},
@@ -171,6 +175,8 @@
controlledActivation := flag.String("ca", conf.BBSim.ControlledActivation, "Set the mode for controlled activation of PON ports and ONUs")
enablePerf := flag.Bool("enableperf", conf.BBSim.EnablePerf, "Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc..")
+ enableEvents := flag.Bool("enableEvents", conf.BBSim.Events, "Enable sending BBSim events on configured kafka server")
+ kafkaAddress := flag.String("kafkaAddress", conf.BBSim.KafkaAddress, "IP:Port for kafka")
flag.Parse()
conf.Olt.ID = int(*olt_id)
@@ -187,6 +193,8 @@
conf.BBSim.Delay = *delay
conf.BBSim.ControlledActivation = *controlledActivation
conf.BBSim.EnablePerf = *enablePerf
+ conf.BBSim.Events = *enableEvents
+ conf.BBSim.KafkaAddress = *kafkaAddress
// update device id if not set
if conf.Olt.DeviceId == "" {