[VOL-5483] Add support in the openonu adapter for fetching current PM data from supported ONT devices.
Change-Id: I205b4a59b2a4eacff71e2114de8ceb07d7d131e5
Signed-off-by: pnalmas <praneeth.nalmas@radisys.com>
diff --git a/internal/pkg/pmmgr/onu_metrics_manager.go b/internal/pkg/pmmgr/onu_metrics_manager.go
index e6eda1c..c333ba4 100755
--- a/internal/pkg/pmmgr/onu_metrics_manager.go
+++ b/internal/pkg/pmmgr/onu_metrics_manager.go
@@ -259,7 +259,7 @@
)
// Defines the type for generic metric population function
-type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int) error
+type groupMetricPopulateFunc func(context.Context, me.ClassID, uint16, me.AttributeValueMap, me.AttributeValueMap, map[string]float32, *int, bool) error
// *** Classical L2 PM Counters end ***
@@ -1233,6 +1233,12 @@
me.EthernetFrameExtendedPm64BitClassID:
mm.extendedPmMeChan <- meAttributes
return nil
+ case me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
+ mm.l2PmChan <- meAttributes
+ return nil
+ case me.FecPerformanceMonitoringHistoryDataClassID:
+ mm.opticalMetricsChan <- meAttributes
+ return nil
default:
logger.Errorw(ctx, "unhandled omci get current data response message",
log.Fields{"device-id": mm.deviceID, "class-id": msgObj.EntityClass})
@@ -1246,6 +1252,15 @@
// AttributeFailure error code, while correctly populating other counters it supports
mm.extendedPmMeChan <- meAttributes
return nil
+ case me.GemPortNetworkCtpPerformanceMonitoringHistoryDataClassID:
+ // Handle attribute failure for GemPort data similar to Extended PM
+ mm.l2PmChan <- meAttributes
+ return nil
+ case me.FecPerformanceMonitoringHistoryDataClassID:
+ // Handle attribute failure for FEC data similar to Extended PM
+ mm.opticalMetricsChan <- meAttributes
+ return nil
+
default:
logger.Errorw(ctx, "unhandled omci get current data response message",
log.Fields{"device-id": mm.deviceID, "class-id": msgObj.EntityClass})
@@ -1541,13 +1556,13 @@
case FecHistoryName:
for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectFecHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ if metricInfo := mm.collectFecHistoryData(ctx, entityID, false); metricInfo != nil { // upstream
metricInfoSlice = append(metricInfoSlice, metricInfo)
}
}
case GemPortHistoryName:
for _, entityID := range copyOfEntityIDs {
- if metricInfo := mm.collectGemHistoryData(ctx, entityID); metricInfo != nil { // upstream
+ if metricInfo := mm.collectGemHistoryData(ctx, entityID, false); metricInfo != nil { // upstream
metricInfoSlice = append(metricInfoSlice, metricInfo)
}
}
@@ -2037,7 +2052,7 @@
intervalEndTime := -1
ethPMHistData := make(map[string]float32)
- if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethPMHistData, &intervalEndTime); err != nil {
+ if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethPMHistData, &intervalEndTime, false); err != nil {
return nil
}
@@ -2074,7 +2089,7 @@
intervalEndTime := -1
ethUniHistData := make(map[string]float32)
- if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethUniHistData, &intervalEndTime); err != nil {
+ if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, ethUniHistData, &intervalEndTime, false); err != nil {
return nil
}
@@ -2089,7 +2104,7 @@
return &metricInfo
}
-func (mm *OnuMetricsManager) collectFecHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
+func (mm *OnuMetricsManager) collectFecHistoryData(ctx context.Context, entityID uint16, isCurrent bool) *voltha.MetricInformation {
var mEnt *me.ManagedEntity
var omciErr me.OmciErrors
var classID me.ClassID
@@ -2104,7 +2119,7 @@
intervalEndTime := -1
fecHistData := make(map[string]float32)
- if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, fecHistData, &intervalEndTime); err != nil {
+ if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, fecHistData, &intervalEndTime, isCurrent); err != nil {
return nil
}
@@ -2119,7 +2134,7 @@
return &metricInfo
}
-func (mm *OnuMetricsManager) collectGemHistoryData(ctx context.Context, entityID uint16) *voltha.MetricInformation {
+func (mm *OnuMetricsManager) collectGemHistoryData(ctx context.Context, entityID uint16, isCurrent bool) *voltha.MetricInformation {
var mEnt *me.ManagedEntity
var omciErr me.OmciErrors
var classID me.ClassID
@@ -2134,7 +2149,7 @@
intervalEndTime := -1
gemHistData := make(map[string]float32)
- if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, gemHistData, &intervalEndTime); err != nil {
+ if err := mm.populateGroupSpecificMetrics(ctx, mEnt, classID, entityID, meAttributes, gemHistData, &intervalEndTime, isCurrent); err != nil {
return nil
}
@@ -2151,16 +2166,22 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) populateEthernetBridgeHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
- meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMHistData map[string]float32, intervalEndTime *int) error {
- //nolint:staticcheck
- upstream := false
- if classID == me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID {
- upstream = true
- }
+ meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMHistData map[string]float32, intervalEndTime *int, isCurrent bool) error {
+ upstream := classID == me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID
// Insert "IntervalEndTime" as part of the requested attributes as we need this to compare the get responses when get request is multipart
requestedAttributes[me.EthernetFramePerformanceMonitoringHistoryDataUpstream_IntervalEndTime] = 0
- meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
- mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ var meInstance *me.ManagedEntity
+ var err error
+
+ // Use GetCurrentDataMEInstance if isCurrent is true, otherwise use GetMeInstance
+ if isCurrent {
+ meInstance, err = mm.GetCurrentDataMEInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ } else {
+ meInstance, err = mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ }
+
if err != nil {
if CheckMeInstanceStatusCode(err) {
return err // Device is being deleted, so we stop processing
@@ -2269,13 +2290,23 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) populateEthernetUniHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
- meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMUniHistData map[string]float32, intervalEndTime *int) error {
+ meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, ethPMUniHistData map[string]float32, intervalEndTime *int, isCurrent bool) error {
// Insert "IntervalEndTime" as part of the requested attributes as we need this to compare the get responses when get request is multipart
if _, ok := requestedAttributes["IntervalEndTime"]; !ok {
requestedAttributes["IntervalEndTime"] = 0
}
- meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
- mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ var meInstance *me.ManagedEntity
+ var err error
+
+ // Use GetCurrentDataMEInstance if isCurrent is true, otherwise use GetMeInstance
+ if isCurrent {
+ meInstance, err = mm.GetCurrentDataMEInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ } else {
+ meInstance, err = mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ }
+
if err != nil {
if CheckMeInstanceStatusCode(err) {
return err // Device is being deleted, so we stop processing
@@ -2378,13 +2409,23 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) populateFecHistoryMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
- meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, fecHistData map[string]float32, intervalEndTime *int) error {
+ meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, fecHistData map[string]float32, intervalEndTime *int, isCurrent bool) error {
// Insert "IntervalEndTime" as part of the requested attributes as we need this to compare the get responses when get request is multipart
if _, ok := requestedAttributes[me.FecPerformanceMonitoringHistoryData_IntervalEndTime]; !ok {
requestedAttributes[me.FecPerformanceMonitoringHistoryData_IntervalEndTime] = 0
}
- meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
- mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ var meInstance *me.ManagedEntity
+ var err error
+
+ // Use GetCurrentDataMEInstance if isCurrent is true, otherwise use GetMeInstance
+ if isCurrent {
+ meInstance, err = mm.GetCurrentDataMEInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ } else {
+ meInstance, err = mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ }
+
if err != nil {
if CheckMeInstanceStatusCode(err) {
return err // Device is being deleted, so we stop processing
@@ -2400,9 +2441,14 @@
logger.Warnw(ctx, "Deleting the device, stopping FEC history metrics collection for the device ", log.Fields{"device-id": mm.deviceID})
return fmt.Errorf("deleting the device, stopping FEC history metrics collection for the device %v", mm.deviceID)
}
- case meAttributes = <-mm.l2PmChan:
+ case meAttributes = <-func() chan me.AttributeValueMap {
+ if isCurrent {
+ return mm.opticalMetricsChan
+ }
+ return mm.l2PmChan
+ }():
logger.Debugw(ctx, "received fec history data metrics",
- log.Fields{"device-id": mm.deviceID, "entityID": entityID})
+ log.Fields{"device-id": mm.deviceID, "entityID": entityID, "isCurrent": isCurrent})
case <-time.After(mm.pOnuDeviceEntry.GetDevOmciCC().GetMaxOmciTimeoutWithRetries() * time.Second):
logger.Errorw(ctx, "timeout waiting for omci-get response for fec history data",
log.Fields{"device-id": mm.deviceID, "entityID": entityID})
@@ -2469,6 +2515,25 @@
return nil, nil
}
+// GetCurrentDataMEInstance sends an OMCI "get current data" request for the given ME instance through the device's OMCI CC.
+// It returns a codes.NotFound status error once the device-delete channel is readable, and (nil, nil) when no OMCI CC is available.
+func (mm *OnuMetricsManager) GetCurrentDataMEInstance(ctx context.Context, classID me.ClassID, entityID uint16, requestedAttributes me.AttributeValueMap,
+	timeout int, highPrio bool, rxChan chan cmn.Message, isExtendedOmci bool) (*me.ManagedEntity, error) {
+	select {
+	case <-mm.pDeviceHandler.GetDeviceDeleteCommChan(ctx):
+		errMsg := "deleting the device, stopping GetCurrentDataMEInstance for the device " + mm.deviceID
+		logger.Warn(ctx, errMsg)
+		return nil, status.Error(codes.NotFound, errMsg)
+	default:
+		// Read the OMCI CC once so the nil check and the call operate on the same value.
+		if omciCC := mm.pOnuDeviceEntry.GetDevOmciCC(); omciCC != nil {
+			// Forward the caller-supplied request options instead of silently overriding them.
+			return omciCC.SendGetCurrentDataME(ctx, classID, entityID, requestedAttributes, timeout, highPrio, rxChan, isExtendedOmci)
+		}
+	}
+	return nil, nil
+}
+
// CheckMeInstanceStatusCode checked status code if not found returns true
func CheckMeInstanceStatusCode(err error) bool {
if err != nil {
@@ -2482,13 +2547,23 @@
// nolint: gocyclo
func (mm *OnuMetricsManager) populateGemPortMetrics(ctx context.Context, classID me.ClassID, entityID uint16,
- meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, gemPortHistData map[string]float32, intervalEndTime *int) error {
+ meAttributes me.AttributeValueMap, requestedAttributes me.AttributeValueMap, gemPortHistData map[string]float32, intervalEndTime *int, isCurrent bool) error {
// Insert "IntervalEndTime" is part of the requested attributes as we need this to compare the get responses when get request is multipart
if _, ok := requestedAttributes[me.GemPortNetworkCtpPerformanceMonitoringHistoryData_IntervalEndTime]; !ok {
requestedAttributes[me.GemPortNetworkCtpPerformanceMonitoringHistoryData_IntervalEndTime] = 0
}
- meInstance, err := mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
- mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ var meInstance *me.ManagedEntity
+ var err error
+
+ // Use GetCurrentDataMEInstance if isCurrent is true, otherwise use GetMeInstance
+ if isCurrent {
+ meInstance, err = mm.GetCurrentDataMEInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ } else {
+ meInstance, err = mm.GetMeInstance(ctx, classID, entityID, requestedAttributes,
+ mm.pDeviceHandler.GetOmciTimeout(), true, mm.PAdaptFsm.CommChan, mm.isExtendedOmci)
+ }
+
if err != nil {
if CheckMeInstanceStatusCode(err) {
return err // Device is being deleted, so we stop processing
@@ -2661,7 +2736,7 @@
// nolint: unparam
func (mm *OnuMetricsManager) populateGroupSpecificMetrics(ctx context.Context, mEnt *me.ManagedEntity, classID me.ClassID, entityID uint16,
- meAttributes me.AttributeValueMap, data map[string]float32, intervalEndTime *int) error {
+ meAttributes me.AttributeValueMap, data map[string]float32, intervalEndTime *int, isCurrent bool) error {
var grpFunc groupMetricPopulateFunc
switch classID {
case me.EthernetFramePerformanceMonitoringHistoryDataUpstreamClassID, me.EthernetFramePerformanceMonitoringHistoryDataDownstreamClassID:
@@ -2690,7 +2765,7 @@
size = v.Size + size
} else { // We exceeded the allow omci get size
// Let's collect the attributes via get now and collect remaining in the next iteration
- if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
+ if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime, isCurrent); err != nil {
logger.Errorw(ctx, "error during metric collection",
log.Fields{"device-id": mm.deviceID, "entityID": entityID, "err": err})
return err
@@ -2701,7 +2776,7 @@
}
}
// Collect the omci get attributes for the last bunch of attributes.
- if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime); err != nil {
+ if err := grpFunc(ctx, classID, entityID, meAttributes, requestedAttributes, data, intervalEndTime, isCurrent); err != nil {
logger.Errorw(ctx, "error during metric collection",
log.Fields{"device-id": mm.deviceID, "entityID": entityID, "err": err})
return err
@@ -4130,6 +4205,29 @@
mm.deviceDeletionInProgress = true
}
+// safeGetMetricValue converts the float32 metric stored under key into a uint32 counter value.
+// It logs a warning and returns 0 when the map is nil, the key is absent, or the value is negative or NaN.
+// Values too large for uint32 saturate at the maximum, because out-of-range float-to-integer conversions are implementation-dependent.
+func (mm *OnuMetricsManager) safeGetMetricValue(ctx context.Context, metrics map[string]float32, key string, deviceID string) uint32 {
+	const maxCounter = ^uint32(0)
+	if metrics == nil {
+		logger.Warnw(ctx, "Metrics map is nil", log.Fields{"device-id": deviceID, "key": key})
+		return 0
+	}
+	value, exists := metrics[key]
+	switch {
+	case !exists:
+		logger.Warnw(ctx, "Metric key not found in metrics map", log.Fields{"device-id": deviceID, "key": key})
+		return 0
+	case !(value >= 0): // true for negative values and also for NaN, which fails every comparison
+		logger.Warnw(ctx, "Negative metric value, setting to 0", log.Fields{"device-id": deviceID, "key": key, "value": value})
+		return 0
+	case float64(value) >= float64(maxCounter): // only float32 values of 2^32 and above (including +Inf) reach this bound
+		return maxCounter
+	}
+	return uint32(value)
+}
+
// Obtain the ONU GEM counters for the ONU device
func (mm *OnuMetricsManager) GetONUGEMCounters(ctx context.Context) *extension.SingleGetValueResponse {
@@ -4167,15 +4265,16 @@
// Loop through each element in the slice
for _, gem := range gemSlice {
logger.Debugw(ctx, "Collecting stats for Gem: ", log.Fields{"GEMID": gem})
- if metricInfo := mm.collectGemHistoryData(ctx, gem); metricInfo != nil {
+ if metricInfo := mm.collectGemHistoryData(ctx, gem, true); metricInfo != nil {
logger.Infow(ctx, "Metricinfo for GEM", log.Fields{"GEMID": gem, "metricInfo": metricInfo})
gemHistoryData := extension.OnuGemPortHistoryData{}
gemHistoryData.GemId = uint32(gem)
- gemHistoryData.TransmittedGEMFrames = uint32(metricInfo.GetMetrics()["transmitted_gem_frames"])
- gemHistoryData.ReceivedGEMFrames = uint32(metricInfo.GetMetrics()["received_gem_frames"])
- gemHistoryData.ReceivedPayloadBytes = uint32(metricInfo.GetMetrics()["received_payload_bytes"])
- gemHistoryData.TransmittedPayloadBytes = uint32(metricInfo.GetMetrics()["transmitted_payload_bytes"])
- gemHistoryData.EncryptionKeyErrors = uint32(metricInfo.GetMetrics()["encryption_key_errors"])
+ metrics := metricInfo.GetMetrics()
+ gemHistoryData.TransmittedGEMFrames = mm.safeGetMetricValue(ctx, metrics, "transmitted_gem_frames", mm.deviceID)
+ gemHistoryData.ReceivedGEMFrames = mm.safeGetMetricValue(ctx, metrics, "received_gem_frames", mm.deviceID)
+ gemHistoryData.ReceivedPayloadBytes = mm.safeGetMetricValue(ctx, metrics, "received_payload_bytes", mm.deviceID)
+ gemHistoryData.TransmittedPayloadBytes = mm.safeGetMetricValue(ctx, metrics, "transmitted_payload_bytes", mm.deviceID)
+ gemHistoryData.EncryptionKeyErrors = mm.safeGetMetricValue(ctx, metrics, "encryption_key_errors", mm.deviceID)
allocIdGemData.GemPortInfo = append(allocIdGemData.GemPortInfo, &gemHistoryData)
logger.Debugw(ctx, " allocIdGemData value ", log.Fields{"AllocIDGemData": allocIdGemData})
@@ -4188,3 +4287,54 @@
return &resp
}
+
+// GetONUFECCounters collects the FEC PM counters for the first ANI-G instance key and returns them in a FecHistory response.
+// It returns nil when device deletion is already in progress; otherwise the response status is OK, with zeroed counters when
+// no ANI-G instance key exists or the collection fails. NOTE(review): confirm callers expect OK status on a failed collection.
+func (mm *OnuMetricsManager) GetONUFECCounters(ctx context.Context) *extension.SingleGetValueResponse {
+	if mm.GetdeviceDeletionInProgress() {
+		// Current() is invoked so the state name is logged rather than the method value itself.
+		logger.Infow(ctx, "device already deleted, return", log.Fields{"curr-state": mm.PAdaptFsm.PFsm.Current(), "deviceID": mm.deviceID})
+		return nil
+	}
+
+	fecResponse := &extension.GetOnuFecHistoryResponse{}
+	resp := extension.SingleGetValueResponse{
+		Response: &extension.GetValueResponse{
+			Status:   extension.GetValueResponse_OK,
+			Response: &extension.GetValueResponse_FecHistory{FecHistory: fecResponse},
+		},
+	}
+
+	mm.OnuMetricsManagerLock.RLock()
+	defer mm.OnuMetricsManagerLock.RUnlock()
+
+	// The entity ID handed to collectFecHistoryData is the first ANI-G instance key held in the ONU DB.
+	aniGInstKeys := mm.pOnuDeviceEntry.GetOnuDB().GetSortedInstKeys(ctx, me.AniGClassID)
+	if len(aniGInstKeys) > 0 {
+		firstInstanceKey := aniGInstKeys[0]
+		logger.Debugw(ctx, "First FEC History Instance Key and Length", log.Fields{"InstanceKey": firstInstanceKey, "FecHistoryInstKeys": aniGInstKeys, "Length": len(aniGInstKeys)})
+		logger.Debugw(ctx, "Collecting FEC stats for ONU", log.Fields{"EntityID": firstInstanceKey})
+
+		// isCurrent=true routes the collection through the OMCI get-current-data request path.
+		if metricInfo := mm.collectFecHistoryData(ctx, firstInstanceKey, true); metricInfo != nil {
+			logger.Debugw(ctx, "FEC metrics collected successfully", log.Fields{"metricInfo": metricInfo})
+
+			metrics := metricInfo.GetMetrics()
+			fecResponse.CorrectedBytes = mm.safeGetMetricValue(ctx, metrics, "corrected_bytes", mm.deviceID)
+			fecResponse.CorrectedCodeWords = mm.safeGetMetricValue(ctx, metrics, "corrected_code_words", mm.deviceID)
+			fecResponse.UncorrectableCodeWords = mm.safeGetMetricValue(ctx, metrics, "uncorrectable_code_words", mm.deviceID)
+			fecResponse.TotalCodeWords = mm.safeGetMetricValue(ctx, metrics, "total_code_words", mm.deviceID)
+			fecResponse.FecSeconds = mm.safeGetMetricValue(ctx, metrics, "fec_seconds", mm.deviceID)
+
+			logger.Debugw(ctx, "FEC response populated successfully",
+				log.Fields{"device-id": mm.deviceID, "correctedBytes": fecResponse.CorrectedBytes,
+					"correctedCodeWords": fecResponse.CorrectedCodeWords, "fecSeconds": fecResponse.FecSeconds})
+		} else {
+			logger.Warnw(ctx, "Failed to collect FEC metrics", log.Fields{"device-id": mm.deviceID, "EntityID": firstInstanceKey})
+		}
+	}
+
+	logger.Debugw(ctx, "FEC History collection completed", log.Fields{"device-id": mm.deviceID})
+	return &resp
+}