blob: 9c4c7122556cf22f3e55e7221887454b24eb1b7a [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +053020 "errors"
Naveen Sampath04696f72022-06-13 15:19:14 +053021 "time"
22
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "voltha-go-controller/internal/pkg/of"
24 "voltha-go-controller/internal/pkg/tasks"
25 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053026 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053027
Naveen Sampath04696f72022-06-13 15:19:14 +053028 "github.com/opencord/voltha-protos/v5/go/common"
29 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31)
32
var (
	// rcvdGroups holds the groups reported by VOLTHA for the group audit in
	// progress, keyed by group ID. AuditGroups rebuilds it on every run and
	// compareGroupEntries deletes each entry that also exists at the
	// controller, so whatever remains is the set of excess groups.
	rcvdGroups map[uint32]*ofp.OfpGroupDesc
	// groupsToAdd collects the controller groups that VOLTHA did not report.
	groupsToAdd []*of.Group
	// groupsToMod collects the controller groups whose port list differs from
	// the one VOLTHA reported.
	// NOTE(review): all three variables are package-level and therefore shared
	// by every AuditTablesTask; confirm that audits of different devices can
	// never run concurrently.
	groupsToMod []*of.Group
)
38
// AuditTablesTask is the task that audits the ports, meters, groups and flows
// of one device against what VOLTHA reports for it.
type AuditTablesTask struct {
	// ctx is the context handed to Start; it is used for the VOLTHA calls
	// issued by the audit helpers.
	ctx context.Context
	// device is the device being audited.
	device *Device
	// timestamp is the task creation time formatted as RFC3339Nano.
	timestamp string
	// taskID is the identifier assigned when Start is invoked.
	taskID uint8
	// stop is set by Stop and polled by the audit steps to abandon the work.
	stop bool
}
47
48// NewAuditTablesTask is constructor for AuditTablesTask
49func NewAuditTablesTask(device *Device) *AuditTablesTask {
50 var att AuditTablesTask
51 att.device = device
52 att.stop = false
53 tstamp := (time.Now()).Format(time.RFC3339Nano)
54 att.timestamp = tstamp
55 return &att
56}
57
58// Name returns name of the task
59func (att *AuditTablesTask) Name() string {
60 return "Audit Table Task"
61}
62
63// TaskID to return task id of the task
64func (att *AuditTablesTask) TaskID() uint8 {
65 return att.taskID
66}
67
68// Timestamp to return timestamp for the task
69func (att *AuditTablesTask) Timestamp() string {
70 return att.timestamp
71}
72
73// Stop to stop the task
74func (att *AuditTablesTask) Stop() {
75 att.stop = true
76}
77
78// Start is called by the framework and is responsible for implementing
79// the actual task.
80func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
Akash Sonie863fe42023-11-30 14:35:01 +053081 logger.Debugw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +053082 att.taskID = taskID
83 att.ctx = ctx
84 var errInfo error
85 var err error
86
Tinoj Josephaf37ce82022-12-28 11:59:43 +053087 // Audit ports
88 if err = att.AuditPorts(); err != nil {
89 logger.Errorw(ctx, "Audit Ports Failed", log.Fields{"Reason": err.Error()})
90 errInfo = err
91 }
92
Naveen Sampath04696f72022-06-13 15:19:14 +053093 // Audit the meters
94 if err = att.AuditMeters(); err != nil {
95 logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
96 errInfo = err
97 }
98
99 // Audit the Groups
100 if rcvdGroups, err = att.AuditGroups(); err != nil {
101 logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
102 errInfo = err
103 }
104
105 // Audit the flows
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530106 if err = att.AuditFlows(ctx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530107 logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
108 errInfo = err
109 }
110
111 // Triggering deletion of excess groups from device after the corresponding flows are removed
112 // to avoid flow dependency error during group deletion
Akash Soni6168f312023-05-18 20:57:33 +0530113 logger.Debugw(ctx, "Excess Groups", log.Fields{"Groups": rcvdGroups})
Naveen Sampath04696f72022-06-13 15:19:14 +0530114 att.DelExcessGroups(rcvdGroups)
Akash Sonie863fe42023-11-30 14:35:01 +0530115 logger.Debugw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530116 return errInfo
Naveen Sampath04696f72022-06-13 15:19:14 +0530117}
118
119// AuditMeters : Audit the meters which includes fetching the existing meters at the
120// voltha and identifying the delta between the ones held here and the
121// ones held at VOLTHA. The delta must be cleaned up to keep both the
122// components in sync
123func (att *AuditTablesTask) AuditMeters() error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530124 if att.stop {
125 return tasks.ErrTaskCancelError
126 }
127 var vc voltha.VolthaServiceClient
128 if vc = att.device.VolthaClient(); vc == nil {
129 logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
130 return nil
131 }
132
133 //-----------------------------
134 // Perform the audit of meters
135 // Fetch the meters
136 ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
137 if err != nil {
138 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
139 return err
140 }
141
142 // Build the map for easy and faster processing
143 rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
144 for _, m := range ms.Items {
145 rcvdMeters[m.Stats.MeterId] = m.Stats
146 }
147
148 // Verify all meters that are in the controller but not in the device
149 missingMeters := []*of.Meter{}
150 for _, meter := range att.device.meters {
Naveen Sampath04696f72022-06-13 15:19:14 +0530151 if att.stop {
152 break
153 }
154 logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
155
156 if _, ok := rcvdMeters[meter.ID]; ok {
157 // The meter exists in the device too. Just remove it from
158 // the received meters
159 delete(rcvdMeters, meter.ID)
160 } else {
161 // The flow exists at the controller but not at the device
162 // Push the flow to the device
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530163 logger.Infow(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530164 missingMeters = append(missingMeters, meter)
165 }
166 }
167 if !att.stop {
168 att.AddMissingMeters(missingMeters)
169 att.DelExcessMeters(rcvdMeters)
170 } else {
171 err = tasks.ErrTaskCancelError
172 }
173 return err
174}
175
176// AddMissingMeters adds the missing meters detected by AuditMeters
177func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
178 logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
179 for _, meter := range meters {
180 meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, meter)
181 if err != nil {
182 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
183 continue
184 }
185 if vc := att.device.VolthaClient(); vc != nil {
186 if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
187 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
188 }
189 } else {
190 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
191 }
192 }
193}
194
195// DelExcessMeters to delete excess meters
196func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
197 logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
198 for _, meter := range meters {
199 meterMod := &ofp.OfpMeterMod{}
200 meterMod.Command = ofp.OfpMeterModCommand_OFPMC_DELETE
201 meterMod.MeterId = meter.MeterId
202 meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
203 if vc := att.device.VolthaClient(); vc != nil {
204 if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
205 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
206 }
207 } else {
208 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
209 }
210 }
211}
212
// AuditFlows audits the flows: it fetches the flows VOLTHA holds for the
// device and identifies the delta between the ones held here and the
// ones held at VOLTHA. The delta must be cleaned up to keep both the
// components in sync.
//
// It returns tasks.ErrTaskCancelError when the task has been stopped, the
// VOLTHA error when the flow list cannot be fetched, and nil when no VOLTHA
// client is available.
// NOTE(review): err is also overwritten by each DeleteDeviceFlow call inside
// the loop, so on the normal path the returned value is the result of the
// last such call (or nil if none was made) - confirm this is intended.
func (att *AuditTablesTask) AuditFlows(cntx context.Context) error {
	if att.stop {
		return tasks.ErrTaskCancelError
	}

	var vc voltha.VolthaServiceClient
	if vc = att.device.VolthaClient(); vc == nil {
		logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
		return nil
	}

	// ---------------------------------
	// Perform the audit of flows first
	// Retrieve the flows from the device
	f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
	if err != nil {
		logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
		return err
	}

	// defaultSuccessFlowStatus := intf.FlowStatus{
	// Device: att.device.ID,
	// FlowModType: of.CommandAdd,
	// Status: 0,
	// Reason: "",
	// }

	// Build the map for easy and faster processing.
	// rcvdFlows is pruned while auditing so that it ends up holding only the
	// excess flows; volthaFlows keeps every flow VOLTHA reported so that the
	// table-1 lookups below still see flows already removed from rcvdFlows.
	rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
	volthaFlows := make(map[uint64]*ofp.OfpFlowStats)
	flowsToAdd := &of.VoltFlow{}
	flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
	for _, flow := range f.Items {
		rcvdFlows[flow.Cookie] = flow
		volthaFlows[flow.Cookie] = flow
	}

	att.device.flowLock.Lock()
	// Verify all flows that are in the controller but not in the device
	for _, flow := range att.device.flows {
		if att.stop {
			break
		}

		logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie, "State": flow.State})
		if _, ok := rcvdFlows[flow.Cookie]; ok {
			// The flow exists in the device too. Just remove it from
			// the received flows & trigger flow success indication unless
			// the flow in del failure/pending state

			if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
				delete(rcvdFlows, flow.Cookie)
			} else {
				// Update flow delete count since we are retrying the flow delete due to failure
				att.device.UpdateFlowCount(cntx, flow.Cookie)
			}
			// defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
		} else {
			// Do not add the flow to device whose state was marked as delete failure
			// Remove the flow from DB as it is no longer reported by voltha
			if flow.State == of.FlowDelFailure {
				delete(att.device.flows, flow.Cookie)
				att.device.DelFlowFromDb(cntx, flow.Cookie)
				logger.Warnw(ctx, "Found flow with state DelFailure while adding to device, will remove from DB", log.Fields{"Cookie": flow.Cookie})
				continue
			}
			// The flow exists at the controller but not at the device
			// Push the flow to the device

			// If UST0 flow is missing in voltha but the UST1 flow is present in voltha,
			// then delete the UST1 flow from voltha and add both US flows to voltha
			if att.device.IsUSTable0Flow(ctx, flow) {
				logger.Debugw(ctx, "UST0 flow found, checking for UST1 flow", log.Fields{"Cookie": flow.Cookie})
				ust1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, false)
				if ust1Flow != nil {
					flowToDelete, ok := volthaFlows[ust1Flow.Cookie]
					if ok {
						// Sometimes the audit would happen even before all flows are installed for a service.
						// If the UST0 flow is still missing in voltha on second retry attempt, then remove the UST1 flow from voltha.
						if flow.FlowCount == 0 {
							att.device.UpdateFlowCount(cntx, flow.Cookie)
							continue
						}
						logger.Infow(ctx, "UST1 flow already present in Voltha, delete and install US flows", log.Fields{"UST1Flow": ust1Flow.Cookie, "UST0Flow": flow.Cookie})
						// The UST1 flow is queued for re-add only if its delete succeeded
						err = att.DeleteDeviceFlow(ctx, flowToDelete)
						if err == nil {
							flowsToAdd.SubFlows[ust1Flow.Cookie] = ust1Flow
						} else {
							logger.Warnw(ctx, "UST1 flow delete failed", log.Fields{"Cookie": ust1Flow.Cookie, "Reason": err.Error()})
						}
					}
				}
			}

			// If DST0 flow is missing in voltha but the DST1 flow is present in voltha,
			// then delete the DST1 flow from voltha and add both DS flows to voltha
			if att.device.IsDSTable0Flow(ctx, flow) {
				logger.Debugw(ctx, "DST0 flow found, checking for DST1 flow", log.Fields{"Cookie": flow.Cookie})
				dst1Flow := att.device.GetDeviceFlow(ctx, flow, att.device.SerialNum, att.device.ID, true)
				if dst1Flow != nil {
					flowToDelete, ok := volthaFlows[dst1Flow.Cookie]
					if ok {
						// Sometimes the audit would happen even before all flows are installed for a service.
						// If the DST0 flow is still missing in voltha on second retry attempt, then remove the DST1 flow from voltha.
						if flow.FlowCount == 0 {
							att.device.UpdateFlowCount(cntx, flow.Cookie)
							continue
						}
						logger.Infow(ctx, "DST1 flow already present in Voltha, delete and install DS flows", log.Fields{"DST1Flow": dst1Flow.Cookie, "DST0Flow": flow.Cookie})
						// The DST1 flow is queued for re-add only if its delete succeeded
						err = att.DeleteDeviceFlow(ctx, flowToDelete)
						if err == nil {
							flowsToAdd.SubFlows[dst1Flow.Cookie] = dst1Flow
						} else {
							logger.Warnw(ctx, "DST1 flow delete failed", log.Fields{"Cookie": dst1Flow.Cookie, "Reason": err.Error()})
						}
					}
				}
			}
			logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
			if !att.device.IsFlowAddThresholdReached(flow.FlowCount, flow.Cookie) {
				flowsToAdd.SubFlows[flow.Cookie] = flow
				att.device.UpdateFlowCount(cntx, flow.Cookie)
			} else if flow.State != of.FlowDelFailure {
				// Flows in FlowDelFailure state already left this iteration
				// through the continue above, so this condition holds here.
				// Release the lock before deactivating service, as we acquire the same lock to delete flows
				att.device.flowLock.Unlock()
				// If flow add threshold has reached, deactivate the service corresponding to the UNI
				GetController().CheckAndDeactivateService(cntx, flow, att.device.SerialNum, att.device.ID)
				// Acquire the lock again for processing remaining flows
				att.device.flowLock.Lock()
			}
		}
	}
	att.device.flowLock.Unlock()

	if !att.stop {
		// The flows remaining in the received flows are the excess flows at
		// the device. Delete those flows
		att.DelExcessFlows(cntx, rcvdFlows)
		// Add the flows missing at the device
		att.AddMissingFlows(cntx, flowsToAdd)
	} else {
		err = tasks.ErrTaskCancelError
	}
	return err
}
362
363// AddMissingFlows : The flows missing from the device are reinstalled att the audit
364// The flows are added into a VoltFlow structure.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530365func (att *AuditTablesTask) AddMissingFlows(cntx context.Context, mflow *of.VoltFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530366 logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
367 mflow.Command = of.CommandAdd
368 ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
369 var vc voltha.VolthaServiceClient
370 var bwConsumedInfo of.BwAvailDetails
371 if vc = att.device.VolthaClient(); vc == nil {
372 logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
373 return
374 }
375 for _, flow := range ofFlows {
Naveen Sampath04696f72022-06-13 15:19:14 +0530376 if flow.FlowMod != nil {
Akash Sonief452f12024-12-12 18:20:28 +0530377 if _, present := att.device.GetFlow(flow.FlowMod.Cookie); !present {
Tinoj Joseph1d108322022-07-13 10:07:39 +0530378 logger.Warnw(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530379 continue
380 }
381 }
382 var err error
383 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
384 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
385 }
Akash Sonief452f12024-12-12 18:20:28 +0530386 att.device.triggerFlowNotification(cntx, flow.FlowMod.Cookie, of.CommandAdd, bwConsumedInfo, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530387 }
388}
389
390// DelExcessFlows delete the excess flows held at the VOLTHA
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530391func (att *AuditTablesTask) DelExcessFlows(cntx context.Context, flows map[uint64]*ofp.OfpFlowStats) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530392 logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
393
394 var vc voltha.VolthaServiceClient
395 if vc = att.device.VolthaClient(); vc == nil {
396 logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
397 return
398 }
399
400 // Let's cycle through the flows to delete the excess flows
401 for _, flow := range flows {
Akash Sonief452f12024-12-12 18:20:28 +0530402 if dbFlow, present := att.device.GetFlow(flow.Cookie); present {
403 if dbFlow.State != of.FlowDelFailure && dbFlow.State != of.FlowDelPending {
404 logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
405 continue
406 }
407 } else {
408 logger.Debugw(ctx, "Flow removed from DB after delete threshold reached. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530409 continue
410 }
411
Naveen Sampath04696f72022-06-13 15:19:14 +0530412 logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
413 // Create the flowMod structure and fill it out
414 flowMod := &ofp.OfpFlowMod{}
415 flowMod.Cookie = flow.Cookie
416 flowMod.TableId = flow.TableId
417 flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
418 flowMod.IdleTimeout = flow.IdleTimeout
419 flowMod.HardTimeout = flow.HardTimeout
420 flowMod.Priority = flow.Priority
421 flowMod.BufferId = of.DefaultBufferID
422 flowMod.OutPort = of.DefaultOutPort
423 flowMod.OutGroup = of.DefaultOutGroup
424 flowMod.Flags = flow.Flags
425 flowMod.Match = flow.Match
426 flowMod.Instructions = flow.Instructions
427
428 // Create FlowTableUpdate
429 flowUpdate := &ofp.FlowTableUpdate{
430 Id: att.device.ID,
431 FlowMod: flowMod,
432 }
433
434 var err error
435 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
436 logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
437 }
Akash Sonief452f12024-12-12 18:20:28 +0530438 att.device.triggerFlowNotification(cntx, flow.Cookie, of.CommandDel, of.BwAvailDetails{}, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530439 }
440}
441
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530442func (att *AuditTablesTask) DeleteDeviceFlow(cntx context.Context, flow *ofp.OfpFlowStats) error {
443 logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
444
445 // Create the flowMod structure and fill it out
446 flowMod := &ofp.OfpFlowMod{}
447 flowMod.Cookie = flow.Cookie
448 flowMod.TableId = flow.TableId
449 flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
450 flowMod.IdleTimeout = flow.IdleTimeout
451 flowMod.HardTimeout = flow.HardTimeout
452 flowMod.Priority = flow.Priority
453 flowMod.BufferId = of.DefaultBufferID
454 flowMod.OutPort = of.DefaultOutPort
455 flowMod.OutGroup = of.DefaultOutGroup
456 flowMod.Flags = flow.Flags
457 flowMod.Match = flow.Match
458 flowMod.Instructions = flow.Instructions
459
460 // Create FlowTableUpdate
461 flowUpdate := &ofp.FlowTableUpdate{
462 Id: att.device.ID,
463 FlowMod: flowMod,
464 }
465
466 var err error
467 var vc voltha.VolthaServiceClient
468 if vc = att.device.VolthaClient(); vc == nil {
469 logger.Error(ctx, "Delete flow failed: Voltha Client Unavailable")
470 return errors.New("voltha client unavailable")
471 }
472
473 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
474 logger.Errorw(ctx, "Flow delete failed", log.Fields{"Reason": err.Error()})
475 return err
476 }
477 return nil
478}
479
Naveen Sampath04696f72022-06-13 15:19:14 +0530480// AuditGroups audit the groups which includes fetching the existing groups at the
481// voltha and identifying the delta between the ones held here and the
482// ones held at VOLTHA. The delta must be cleaned up to keep both the
483// components in sync
484func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530485 // Build the map for easy and faster processing
486 rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc)
487
488 if att.stop {
489 return rcvdGroups, tasks.ErrTaskCancelError
490 }
491
492 var vc voltha.VolthaServiceClient
493 if vc = att.device.VolthaClient(); vc == nil {
494 logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
495 return rcvdGroups, nil
496 }
497
498 // ---------------------------------
499 // Perform the audit of groups first
500 // Retrieve the groups from the device
501 g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
502 if err != nil {
503 logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
504 return rcvdGroups, err
505 }
506
507 groupsToAdd = []*of.Group{}
508 groupsToMod = []*of.Group{}
509 for _, group := range g.Items {
510 rcvdGroups[group.Desc.GroupId] = group.Desc
511 }
Akash Sonie863fe42023-11-30 14:35:01 +0530512 logger.Debugw(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
Naveen Sampath04696f72022-06-13 15:19:14 +0530513
514 // Verify all groups that are in the controller but not in the device
515 att.device.groups.Range(att.compareGroupEntries)
516
517 if !att.stop {
518 // Add the groups missing at the device
Akash Sonie863fe42023-11-30 14:35:01 +0530519 logger.Debugw(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
Naveen Sampath04696f72022-06-13 15:19:14 +0530520 att.AddMissingGroups(groupsToAdd)
521
522 // Update groups with group member mismatch
Akash Sonie863fe42023-11-30 14:35:01 +0530523 logger.Debugw(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
Naveen Sampath04696f72022-06-13 15:19:14 +0530524 att.UpdateMismatchGroups(groupsToMod)
525
526 // Note: Excess groups will be deleted after ensuring the connected
527 // flows are also removed as part fo audit flows
528 } else {
529 err = tasks.ErrTaskCancelError
530 }
531 // The groups remaining in the received groups are the excess groups at
532 // the device
533 return rcvdGroups, err
534}
535
536// compareGroupEntries to compare the group entries
537func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530538 if att.stop {
539 return false
540 }
541
542 groupID := key.(uint32)
543 dbGroup := value.(*of.Group)
544 logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
545 if rcvdGrp, ok := rcvdGroups[groupID]; ok {
546 // The group exists in the device too.
547 // Compare the group members and add to modify list if required
548 compareGroupMembers(dbGroup, rcvdGrp)
549 delete(rcvdGroups, groupID)
550 } else {
551 // The group exists at the controller but not at the device
552 // Push the group to the device
553 logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
554 groupsToAdd = append(groupsToAdd, value.(*of.Group))
555 }
556 return true
557}
558
559func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530560 portList := []uint32{}
561 refPortList := []uint32{}
562
vinokuma926cb3e2023-03-29 11:41:06 +0530563 // Collect port list from response Group Mod structure
564 // If PON is configured even for one group, then only PON shall be considered for compared for all groups
Naveen Sampath04696f72022-06-13 15:19:14 +0530565 for _, bucket := range rcvdGroup.Buckets {
566 for _, actionBucket := range bucket.Actions {
567 if actionBucket.Type == ofp.OfpActionType_OFPAT_OUTPUT {
568 action := actionBucket.GetOutput()
569 portList = append(portList, action.Port)
570 }
571 }
572 }
573
574 refPortList = append(refPortList, refGroup.Buckets...)
575
vinokuma926cb3e2023-03-29 11:41:06 +0530576 // Is port list differs, trigger group update
Naveen Sampath04696f72022-06-13 15:19:14 +0530577 if !util.IsSliceSame(refPortList, portList) {
578 groupsToMod = append(groupsToMod, refGroup)
579 }
580}
581
vinokuma926cb3e2023-03-29 11:41:06 +0530582// AddMissingGroups - addmissing groups to Voltha
Naveen Sampath04696f72022-06-13 15:19:14 +0530583func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
584 att.PushGroups(groupList, of.GroupCommandAdd)
585}
586
vinokuma926cb3e2023-03-29 11:41:06 +0530587// UpdateMismatchGroups - updates mismatched groups to Voltha
Naveen Sampath04696f72022-06-13 15:19:14 +0530588func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
589 att.PushGroups(groupList, of.GroupCommandMod)
590}
591
592// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
593func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
594 logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
595
596 var vc voltha.VolthaServiceClient
597 if vc = att.device.VolthaClient(); vc == nil {
598 logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
599 return
600 }
601 for _, group := range groupList {
602 group.Command = grpCommand
603 groupUpdate := of.CreateGroupTableUpdate(group)
604 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
605 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
606 }
607 }
608}
609
610// DelExcessGroups - Delete the excess groups held at the VOLTHA
611func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
612 logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
613
614 var vc voltha.VolthaServiceClient
615 if vc = att.device.VolthaClient(); vc == nil {
616 logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
617 return
618 }
619
620 // Let's cycle through the groups to delete the excess groups
621 for _, groupDesc := range groups {
622 logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
623 group := &of.Group{}
624 group.Device = att.device.ID
625 group.GroupID = groupDesc.GroupId
626
vinokuma926cb3e2023-03-29 11:41:06 +0530627 // Group Members should be deleted before triggered group delete
Naveen Sampath04696f72022-06-13 15:19:14 +0530628 group.Command = of.GroupCommandMod
629 groupUpdate := of.CreateGroupTableUpdate(group)
630 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
631 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
632 }
633
634 group.Command = of.GroupCommandDel
635 groupUpdate = of.CreateGroupTableUpdate(group)
636 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
637 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
638 }
639 }
640}
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530641
642func (att *AuditTablesTask) AuditPorts() error {
vinokuma926cb3e2023-03-29 11:41:06 +0530643 if att.stop {
644 return tasks.ErrTaskCancelError
645 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530646
vinokuma926cb3e2023-03-29 11:41:06 +0530647 var vc voltha.VolthaServiceClient
648 if vc = att.device.VolthaClient(); vc == nil {
649 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
650 return nil
651 }
652 ofpps, err := vc.ListLogicalDevicePorts(att.ctx, &common.ID{Id: att.device.ID})
653 if err != nil {
654 return err
655 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530656
vinokuma926cb3e2023-03-29 11:41:06 +0530657 // Compute the difference between the ports received and ports at VGC
658 // First build a map of all the received ports under missing ports. We
659 // will eliminate the ports that are in the device from the missing ports
660 // so that the elements remaining are missing ports. The ones that are
661 // not in missing ports are added to excess ports which should be deleted
662 // from the VGC.
663 missingPorts := make(map[uint32]*ofp.OfpPort)
664 for _, ofpp := range ofpps.Items {
665 missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
666 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530667
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530668 excessPorts := make(map[uint32]*DevicePort)
vinokuma926cb3e2023-03-29 11:41:06 +0530669 processPortState := func(id uint32, vgcPort *DevicePort) {
670 logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530671
vinokuma926cb3e2023-03-29 11:41:06 +0530672 if ofpPort, ok := missingPorts[id]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530673 if vgcPort.Name != ofpPort.Name {
674 logger.Infow(ctx, "Port Name Mismatch", log.Fields{"vgcPort": vgcPort.Name, "ofpPort": ofpPort.Name, "ID": id})
675 att.DeleteMismatchPorts(ctx, vgcPort, ofpPort.Name)
676 return
677 }
vinokuma926cb3e2023-03-29 11:41:06 +0530678 if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
679 // This port exists in the received list and the map at
680 // VGC. This is common so delete it
681 logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530682 att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name, false)
vinokuma926cb3e2023-03-29 11:41:06 +0530683 }
684 delete(missingPorts, id)
685 } else {
686 // This port is missing from the received list. This is an
687 // excess port at VGC. This must be added to excess ports
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530688 excessPorts[id] = vgcPort
vinokuma926cb3e2023-03-29 11:41:06 +0530689 }
690 logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
691 }
692 // 1st process the NNI port before all other ports so that the device state can be updated.
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530693 for id, vgcPort := range att.device.PortsByID {
694 if util.IsNniPort(id) {
695 logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
696 processPortState(id, vgcPort)
697 }
vinokuma926cb3e2023-03-29 11:41:06 +0530698 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530699
vinokuma926cb3e2023-03-29 11:41:06 +0530700 for id, vgcPort := range att.device.PortsByID {
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530701 if util.IsNniPort(id) {
vinokuma926cb3e2023-03-29 11:41:06 +0530702 // NNI port already processed
703 continue
704 }
705 if att.stop {
706 break
707 }
708 processPortState(id, vgcPort)
709 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530710
711 if att.stop {
Akash Soni6168f312023-05-18 20:57:33 +0530712 logger.Warnw(ctx, "Audit Device Task Canceled", log.Fields{"Context": att.ctx, "Task": att.taskID})
vinokuma926cb3e2023-03-29 11:41:06 +0530713 return tasks.ErrTaskCancelError
714 }
715 att.AddMissingPorts(ctx, missingPorts)
716 att.DelExcessPorts(ctx, excessPorts)
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530717 return nil
718}
719
720// AddMissingPorts to add the missing ports
721func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530722 logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530723
vinokuma926cb3e2023-03-29 11:41:06 +0530724 addMissingPort := func(mp *ofp.OfpPort) {
725 logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530726
vinokuma926cb3e2023-03-29 11:41:06 +0530727 if err := att.device.AddPort(cntx, mp); err != nil {
728 logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
729 }
730 if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530731 att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name, false)
vinokuma926cb3e2023-03-29 11:41:06 +0530732 }
733 logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
734 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530735
vinokuma926cb3e2023-03-29 11:41:06 +0530736 // 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530737 for portNo, mp := range mps {
738 if util.IsNniPort(portNo) {
739 logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo, "Port Name": mp.Name, "Port Status": mp.State})
740 addMissingPort(mp)
741 }
vinokuma926cb3e2023-03-29 11:41:06 +0530742 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530743
vinokuma926cb3e2023-03-29 11:41:06 +0530744 for portNo, mp := range mps {
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530745 if !util.IsNniPort(portNo) {
vinokuma926cb3e2023-03-29 11:41:06 +0530746 addMissingPort(mp)
747 }
748 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530749}
750
751// DelExcessPorts to delete the excess ports
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530752func (att *AuditTablesTask) DelExcessPorts(cntx context.Context, eps map[uint32]*DevicePort) {
vinokuma926cb3e2023-03-29 11:41:06 +0530753 logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530754 for portNo, ep := range eps {
vinokuma926cb3e2023-03-29 11:41:06 +0530755 // Now delete the port from the device @ VGC
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530756 logger.Debugw(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": portNo})
757 if err := att.device.DelPort(cntx, ep.ID, ep.Name); err != nil {
758 logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": portNo, "Reason": err})
vinokuma926cb3e2023-03-29 11:41:06 +0530759 }
760 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530761}
Akash Sonief452f12024-12-12 18:20:28 +0530762
763func (att *AuditTablesTask) DeleteMismatchPorts(cntx context.Context, vgcPort *DevicePort, ofpPortName string) {
764 logger.Infow(ctx, "Deleting port in VGC due to mismatch with voltha", log.Fields{"vgcPortID": vgcPort.ID, "vgcPortName": vgcPort.Name})
765 _ = att.device.DelPort(cntx, vgcPort.ID, vgcPort.Name)
766 if p := att.device.GetPortByName(ofpPortName); p != nil {
767 logger.Infow(ctx, "Delete port by name in VGC due to mismatch with voltha", log.Fields{"portID": p.ID, "portName": p.Name})
768 _ = att.device.DelPort(cntx, p.ID, p.Name)
769 }
770}