blob: 93fae1477bda7e7f0fd756f021836881d9a9a494 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
43// PortState type
44type PortState string
45
46const (
47 // PortStateDown constant
48 PortStateDown PortState = "DOWN"
49 // PortStateUp constant
50 PortStateUp PortState = "UP"
51 // DefaultMaxFlowQueues constant
52 DefaultMaxFlowQueues = 67
53 //ErrDuplicateFlow - indicates flow already exists in DB
Akash Sonid36d23b2023-08-18 12:51:40 +053054 ErrDuplicateFlow string = "duplicate flow"
55 //Unknown_Port_ID - indicates that the port id is unknown
56 Unknown_Port_ID = "unknown port id"
57 //Duplicate_Port - indicates the port is already exist in controller
58 Duplicate_Port = "duplicate port"
Naveen Sampath04696f72022-06-13 15:19:14 +053059)
60
61// DevicePort structure
62type DevicePort struct {
vinokuma926cb3e2023-03-29 11:41:06 +053063 Name string
64 State PortState
65 Version string
66 HwAddr string
Naveen Sampath04696f72022-06-13 15:19:14 +053067 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053068 CurrSpeed uint32
69 MaxSpeed uint32
vinokuma926cb3e2023-03-29 11:41:06 +053070 ID uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053071}
72
73// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053074func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053075 var port DevicePort
76
Tinoj Joseph429b9d92022-11-16 18:51:05 +053077 port.ID = mp.PortNo
78 port.Name = mp.Name
79
80 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
81 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
82 port.CurrSpeed = mp.CurrSpeed
83 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053084 port.State = PortStateDown
85 return &port
86}
87
88// UniIDFlowQueue structure which maintains flows in queue.
89type UniIDFlowQueue struct {
90 tasks.Tasks
91 ID uint32
92}
93
94// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
95func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
96 var flowQueue UniIDFlowQueue
97 flowQueue.ID = id
98 return &flowQueue
99}
100
101// DeviceState type
102type DeviceState string
103
104const (
105
106 // DeviceStateUNKNOWN constant
107 DeviceStateUNKNOWN DeviceState = "UNKNOWN"
108 // DeviceStateINIT constant
109 DeviceStateINIT DeviceState = "INIT"
110 // DeviceStateUP constant
111 DeviceStateUP DeviceState = "UP"
112 // DeviceStateDOWN constant
113 DeviceStateDOWN DeviceState = "DOWN"
114 // DeviceStateREBOOTED constant
115 DeviceStateREBOOTED DeviceState = "REBOOTED"
116 // DeviceStateDISABLED constant
117 DeviceStateDISABLED DeviceState = "DISABLED"
118 // DeviceStateDELETED constant
119 DeviceStateDELETED DeviceState = "DELETED"
120)
121
Akash Soni6f369452023-09-19 11:18:28 +0530122type DeviceInterface interface {
123 SetFlowHash(cntx context.Context, hash uint32)
124}
125
Naveen Sampath04696f72022-06-13 15:19:14 +0530126// Device structure
127type Device struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530128 ctx context.Context
129 cancel context.CancelFunc
130 vclientHolder *holder.VolthaServiceClientHolder
131 packetOutChannel chan *ofp.PacketOut
132 PortsByName map[string]*DevicePort
133 flows map[uint64]*of.VoltSubFlow
134 PortsByID map[uint32]*DevicePort
135 meters map[uint32]*of.Meter
136 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
137 SouthBoundID string
138 MfrDesc string
139 HwDesc string
140 SwDesc string
141 ID string
142 SerialNum string
143 State DeviceState
144 TimeStamp time.Time
145 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
Naveen Sampath04696f72022-06-13 15:19:14 +0530146 tasks.Tasks
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 portLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530148 flowLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530149 meterLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530150 flowQueueLock sync.RWMutex
151 flowHash uint32
vinokuma926cb3e2023-03-29 11:41:06 +0530152 auditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530153 deviceAuditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530154}
155
156// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530157func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530158 var device Device
159 device.ID = id
160 device.SerialNum = slno
161 device.State = DeviceStateDOWN
162 device.PortsByID = make(map[uint32]*DevicePort)
163 device.PortsByName = make(map[string]*DevicePort)
164 device.vclientHolder = vclientHldr
165 device.flows = make(map[uint64]*of.VoltSubFlow)
166 device.meters = make(map[uint32]*of.Meter)
167 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530168 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530169 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530170 device.MfrDesc = mfr
171 device.HwDesc = hwDesc
172 device.SwDesc = swDesc
173 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530174 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530175 if err != nil {
176 device.flowHash = DefaultMaxFlowQueues
177 } else {
178 var hash uint32
179 err = json.Unmarshal([]byte(flowHash), &hash)
180 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +0530181 logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 } else {
183 device.flowHash = hash
184 }
185 }
186 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
187 return &device
188}
189
190// ResetCache to reset cache
191func (d *Device) ResetCache() {
192 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
193 d.flows = make(map[uint64]*of.VoltSubFlow)
194 d.meters = make(map[uint32]*of.Meter)
195 d.groups = sync.Map{}
196}
197
198// GetFlow - Get the flow from device obj
199func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
200 d.flowLock.RLock()
201 defer d.flowLock.RUnlock()
Naveen Sampath04696f72022-06-13 15:19:14 +0530202 flow, ok := d.flows[cookie]
Akash Sonief452f12024-12-12 18:20:28 +0530203 logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530204 return flow, ok
205}
206
Tinoj Josephec742f62022-09-29 19:11:10 +0530207// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530208func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530209 d.flowLock.RLock()
210 defer d.flowLock.RUnlock()
211 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530212 logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530213 for _, f := range d.flows {
214 flows = append(flows, f)
215 }
216 return flows
217}
218
219// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530220func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530221 d.flowLock.RLock()
222 defer d.flowLock.RUnlock()
223 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530224 logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530225 for _, f := range d.flows {
226 if f.State == of.FlowAddPending {
227 flows = append(flows, f)
228 }
229 }
230 return flows
231}
232
Naveen Sampath04696f72022-06-13 15:19:14 +0530233// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 d.flowLock.Lock()
236 defer d.flowLock.Unlock()
Akash Soni6168f312023-05-18 20:57:33 +0530237 logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
Akash Sonief452f12024-12-12 18:20:28 +0530238 if dbFlow, ok := d.flows[flow.Cookie]; ok {
239 // In case of ONU reboot after flow delete failure, try to install flow in the device by checking for previous flow state
240 if dbFlow.State != of.FlowDelFailure {
241 return errors.New(ErrDuplicateFlow)
242 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530243 }
244 d.flows[flow.Cookie] = flow
Naveen Sampath04696f72022-06-13 15:19:14 +0530245 return nil
246}
247
248// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530249func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530250 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530251 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530252 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
253 }
254 }
255}
256
257// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530258func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530259 d.flowLock.Lock()
260 defer d.flowLock.Unlock()
261 if _, ok := d.flows[flow.Cookie]; ok {
262 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530263 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530264 return nil
265 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530266 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530267}
268
269// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530270func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
271 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530272}
273
274// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
275func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
276 d.flowLock.RLock()
277 defer d.flowLock.RUnlock()
278 if _, ok := d.flows[flow.Cookie]; ok {
279 return false
280 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
281 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530282 logger.Warnw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530283 return true
284 }
285 }
286 return false
287}
288
289// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530290func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530291 d.flowLock.Lock()
292 defer d.flowLock.Unlock()
293 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530294 logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
Naveen Sampath04696f72022-06-13 15:19:14 +0530295 log.Fields{"OldCookie": flow.OldCookie})
296 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530297 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530298 return nil
299 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530300 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530301}
302
303// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530304func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
305 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530306 for _, flow := range flows {
307 b, ok := flow.Value.([]byte)
308 if !ok {
309 logger.Warn(ctx, "The value type is not []byte")
310 continue
311 }
312 d.CreateFlowFromString(b)
313 }
314}
315
316// CreateFlowFromString to create flow from string
317func (d *Device) CreateFlowFromString(b []byte) {
318 var flow of.VoltSubFlow
319 if err := json.Unmarshal(b, &flow); err == nil {
320 if _, ok := d.flows[flow.Cookie]; !ok {
321 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
322 d.flows[flow.Cookie] = &flow
323 } else {
324 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
325 }
326 } else {
327 logger.Warn(ctx, "Unmarshal failed")
328 }
329}
330
331// ----------------------------------------------------------
332// Database related functionality
333// Group operations at the device which include update and delete
334
335// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530336func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Akash Soni6168f312023-05-18 20:57:33 +0530337 logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530338 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530339 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530340}
341
342// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530343func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530344 if b, err := json.Marshal(group); err == nil {
Akash Soni6168f312023-05-18 20:57:33 +0530345 logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530346 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530347 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
348 }
349 }
350}
351
352// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530353func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530354 if _, ok := d.groups.Load(group.GroupID); ok {
355 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530356 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530357 }
358}
359
360// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530361func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
362 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530363}
364
vinokuma926cb3e2023-03-29 11:41:06 +0530365// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530366func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530367 logger.Debug(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530368 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530369 for _, group := range groups {
370 b, ok := group.Value.([]byte)
371 if !ok {
372 logger.Warn(ctx, "The value type is not []byte")
373 continue
374 }
375 d.CreateGroupFromString(b)
376 }
377}
378
vinokuma926cb3e2023-03-29 11:41:06 +0530379// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530380func (d *Device) CreateGroupFromString(b []byte) {
381 var group of.Group
382 if err := json.Unmarshal(b, &group); err == nil {
383 if _, ok := d.groups.Load(group.GroupID); !ok {
384 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
385 d.groups.Store(group.GroupID, &group)
386 } else {
387 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
388 }
389 } else {
390 logger.Warn(ctx, "Unmarshal failed")
391 }
392}
393
394// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530395func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530396 d.meterLock.Lock()
397 defer d.meterLock.Unlock()
398 if _, ok := d.meters[meter.ID]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530399 return errors.New("duplicate meter")
Naveen Sampath04696f72022-06-13 15:19:14 +0530400 }
401 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530402 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530403 return nil
404}
405
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530406// UpdateMeter to update meter
407func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530408 d.meterLock.Lock()
409 defer d.meterLock.Unlock()
410 if _, ok := d.meters[meter.ID]; ok {
411 d.meters[meter.ID] = meter
412 d.AddMeterToDb(cntx, meter)
413 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530414 return errors.New("meter not found for updation")
vinokuma926cb3e2023-03-29 11:41:06 +0530415 }
416 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530417}
418
Naveen Sampath04696f72022-06-13 15:19:14 +0530419// GetMeter to get meter
420func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
421 d.meterLock.RLock()
422 defer d.meterLock.RUnlock()
423 if m, ok := d.meters[id]; ok {
424 return m, nil
425 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530426 return nil, errors.New("meter not found")
Naveen Sampath04696f72022-06-13 15:19:14 +0530427}
428
429// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530430func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530431 d.meterLock.Lock()
432 defer d.meterLock.Unlock()
433 if _, ok := d.meters[meter.ID]; ok {
434 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530435 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530436 return true
437 }
438 return false
439}
440
441// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530442func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530443 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530444 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530445 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
446 }
447 }
448}
449
450// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530451func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
452 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530453}
454
455// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530456func (d *Device) RestoreMetersFromDb(cntx context.Context) {
457 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530458 for _, meter := range meters {
459 b, ok := meter.Value.([]byte)
460 if !ok {
461 logger.Warn(ctx, "The value type is not []byte")
462 continue
463 }
464 d.CreateMeterFromString(b)
465 }
466}
467
468// CreateMeterFromString to create meter from string
469func (d *Device) CreateMeterFromString(b []byte) {
470 var meter of.Meter
471 if err := json.Unmarshal(b, &meter); err == nil {
472 if _, ok := d.meters[meter.ID]; !ok {
473 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
474 d.meters[meter.ID] = &meter
475 } else {
476 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
477 }
478 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530479 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530480 }
481}
482
483// VolthaClient to get voltha client
484func (d *Device) VolthaClient() voltha.VolthaServiceClient {
485 return d.vclientHolder.Get()
486}
487
488// AddPort to add the port as requested by the device/VOLTHA
489// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530490func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530491 d.portLock.Lock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530492 id := mp.PortNo
493 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530494 if _, ok := d.PortsByID[id]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530495 d.portLock.Unlock()
Akash Sonid36d23b2023-08-18 12:51:40 +0530496 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530497 }
498 if _, ok := d.PortsByName[name]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530499 d.portLock.Unlock()
Akash Sonid36d23b2023-08-18 12:51:40 +0530500 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530501 }
502
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530503 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530504 d.PortsByID[id] = p
505 d.PortsByName[name] = p
Akash Sonief452f12024-12-12 18:20:28 +0530506 d.portLock.Unlock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530507 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530508 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
509 return nil
510}
511
512// DelPort to delete the port as requested by the device/VOLTHA
513// Inform the application if the port is successfully deleted
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530514func (d *Device) DelPort(cntx context.Context, id uint32, portName string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530515 p := d.GetPortByID(id)
516 if p == nil {
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530517 p = d.GetPortByName(portName)
518 if p == nil {
519 return errors.New("unknown port")
520 } else {
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530521 logger.Debugw(ctx, "Found port by name", log.Fields{"PortName": p.Name, "PortID": p.ID})
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530522 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530523 }
524 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530525 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530526 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530527 GetController().PortDelInd(cntx, d.ID, p.Name)
528
Naveen Sampath04696f72022-06-13 15:19:14 +0530529 d.portLock.Lock()
530 defer d.portLock.Unlock()
531
Naveen Sampath04696f72022-06-13 15:19:14 +0530532 delete(d.PortsByID, p.ID)
533 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530534 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530535 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
536 return nil
537}
538
Akash Sonief452f12024-12-12 18:20:28 +0530539// CheckAndDeletePort deletes the port if the port name matches with VGC and one sent from voltha in OFPPR_DELETE
540func (d *Device) CheckAndDeletePort(cntx context.Context, portNo uint32, portName string) {
541 if p := d.GetPortByID(portNo); p != nil {
542 if p.Name != portName {
543 logger.Warnw(ctx, "Dropping Del Port event: Port name mismatch", log.Fields{"vgcPortName": p.Name, "ofpPortName": portName, "ID": p.ID})
544 return
545 }
546 if err := d.DelPort(cntx, portNo, portName); err != nil {
547 logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
548 }
549 }
550}
551
Naveen Sampath04696f72022-06-13 15:19:14 +0530552// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530553func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530554 d.portLock.Lock()
555 defer d.portLock.Unlock()
556
557 p, ok := d.PortsByName[name]
558 if !ok {
559 return
560 }
561 delete(d.PortsByID, p.ID)
562 p.ID = port
563 d.PortsByID[port] = p
Naveen Sampath04696f72022-06-13 15:19:14 +0530564 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
565 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
566}
567
568// GetPortName to get the name of the port by its id
569func (d *Device) GetPortName(id uint32) (string, error) {
570 d.portLock.RLock()
571 defer d.portLock.RUnlock()
572
573 if p, ok := d.PortsByID[id]; ok {
574 return p.Name, nil
575 }
576 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
Akash Sonid36d23b2023-08-18 12:51:40 +0530577 return "", errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530578}
579
580// GetPortByID is utility to retrieve the port by ID
581func (d *Device) GetPortByID(id uint32) *DevicePort {
582 d.portLock.RLock()
583 defer d.portLock.RUnlock()
584
585 p, ok := d.PortsByID[id]
586 if ok {
587 return p
588 }
589 return nil
590}
591
592// GetPortByName is utility to retrieve the port by Name
593func (d *Device) GetPortByName(name string) *DevicePort {
594 d.portLock.RLock()
595 defer d.portLock.RUnlock()
596
597 p, ok := d.PortsByName[name]
598 if ok {
599 return p
600 }
601 return nil
602}
603
604// GetPortState to get the state of the port by name
605func (d *Device) GetPortState(name string) (PortState, error) {
606 d.portLock.RLock()
607 defer d.portLock.RUnlock()
608
609 if p, ok := d.PortsByName[name]; ok {
610 return p.State, nil
611 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530612 return PortStateDown, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530613}
614
615// GetPortID to get the port-id by the port name
616func (d *Device) GetPortID(name string) (uint32, error) {
617 d.portLock.RLock()
618 defer d.portLock.RUnlock()
619
620 if p, ok := d.PortsByName[name]; ok {
621 return p.ID, nil
622 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530623 return 0, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530624}
625
626// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530627func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530628 port.Version = database.PresentVersionMap[database.DevicePortPath]
629 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530630 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530631 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
632 }
633 }
634}
635
636// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530637func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
638 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530639}
640
641// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530642func (d *Device) RestorePortsFromDb(cntx context.Context) {
643 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530644 for _, port := range ports {
645 b, ok := port.Value.([]byte)
646 if !ok {
647 logger.Warn(ctx, "The value type is not []byte")
648 continue
649 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530650 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530651 }
652}
653
654// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530655func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530656 var port DevicePort
657 if err := json.Unmarshal(b, &port); err == nil {
658 if _, ok := d.PortsByID[port.ID]; !ok {
659 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
660 d.PortsByID[port.ID] = &port
661 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530662 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530663 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530664 logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530665 }
666 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530667 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530668 }
669}
670
671// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
672// resources should have been removed by this time. It is an error
673// scenario if the OLT has resources associated with it.
674func (d *Device) Delete() {
675 d.StopAll()
676}
677
678// Stop to stop the task
679func (d *Device) Stop() {
680}
681
682// ConnectInd is called when the connection between VGC and the VOLTHA is
683// restored. This will perform audit of the device post reconnection
684func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
685 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
686 ctx1, cancel := context.WithCancel(ctx)
687 d.cancel = cancel
688 d.ctx = ctx1
689 d.Tasks.Initialize(ctx1)
690
Akash Soni6168f312023-05-18 20:57:33 +0530691 logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530692 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530693 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530694 GetController().DeviceUpInd(d.ID)
695
Akash Soni6168f312023-05-18 20:57:33 +0530696 logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530697 t := NewAuditDevice(d, AuditEventDeviceDisc)
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530698 // During VGC restart or when a device is added. skip pushing flows to voltha during audit device task
699 // When device is added, if required the flows will get pushed during the next audit table task called soon after this audit device task
700 if discType == intf.DeviceDisc {
701 t.skipFlowOnRestart = true
702 }
703
Naveen Sampath04696f72022-06-13 15:19:14 +0530704 d.Tasks.AddTask(t)
705
706 t1 := NewAuditTablesTask(d)
707 d.Tasks.AddTask(t1)
708
709 t2 := NewPendingProfilesTask(d)
710 d.Tasks.AddTask(t2)
711
712 go d.synchronizeDeviceTables()
713}
714
715func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530716 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530717loop:
718 for {
719 select {
720 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530721 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530722 break loop
723 case <-tick.C:
724 t1 := NewAuditTablesTask(d)
725 d.Tasks.AddTask(t1)
726 }
727 }
728 tick.Stop()
729}
730
731// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
732func (d *Device) DeviceUpInd() {
733 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
734 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530735 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530736 GetController().DeviceUpInd(d.ID)
737
738 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
739 t := NewAuditDevice(d, AuditEventDeviceDisc)
740 d.Tasks.AddTask(t)
741
742 t1 := NewAuditTablesTask(d)
743 d.Tasks.AddTask(t1)
744
745 t2 := NewPendingProfilesTask(d)
746 d.Tasks.AddTask(t2)
747}
748
749// DeviceDownInd is called when the logical device state changes to Down.
750func (d *Device) DeviceDownInd() {
751 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
752 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530753 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530754 GetController().DeviceDownInd(d.ID)
755}
756
757// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530758func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530759 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
760
761 if d.State == DeviceStateREBOOTED {
762 d.State = DeviceStateREBOOTED
763 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
764 return
765 }
766
767 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530768 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530769 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530770 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
771 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530772}
773
774// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530775func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530776 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
777 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530778 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530779 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530780}
781
vinokuma926cb3e2023-03-29 11:41:06 +0530782// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530783func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530784 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
785
786 d.portLock.Lock()
787 defer d.portLock.Unlock()
788
789 for _, port := range d.PortsByID {
790 if port.State != PortStateDown {
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530791 logger.Debugw(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530792 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530793 port.State = PortStateDown
Naveen Sampath04696f72022-06-13 15:19:14 +0530794 }
795 }
796}
797
vinokuma926cb3e2023-03-29 11:41:06 +0530798// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530799func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530800 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
801
802 d.portLock.Lock()
803 defer d.portLock.Unlock()
804
805 for _, port := range d.PortsByID {
806 if port.State != PortStateDown {
Akash Soni6168f312023-05-18 20:57:33 +0530807 logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530808 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530809 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530810 }
811 }
812}
813
814// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
815// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530816func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530817 if p := d.GetPortByName(portName); p != nil {
818 if p.ID != port {
Akash Sonief452f12024-12-12 18:20:28 +0530819 logger.Warnw(ctx, "Port update indication received with mismatching ID", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
820 return
821 //Do not process port update received from change event, as we will only handle port updates during polling
Naveen Sampath04696f72022-06-13 15:19:14 +0530822 }
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530823 d.ProcessPortState(cntx, port, state, portName, false)
Naveen Sampath04696f72022-06-13 15:19:14 +0530824 }
825}
826
827// ***Operations Performed on Port state Transitions***
828//
829// |-----------------------------------------------------------------------------|
830// | State | Action |
831// |--------------------|--------------------------------------------------------|
832// | UP | UNI - Trigger Flow addition for service configured |
833// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
834// | | |
835// | DOWN | UNI - Trigger Flow deletion for service configured |
836// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
837// | | |
838// |-----------------------------------------------------------------------------|
839//
840
841// ProcessPortState deals with the change in port status and taking action
842// based on the new state and the old state
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530843func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string, skipFlowPushToVoltha bool) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530844 if d.State != DeviceStateUP && !util.IsNniPort(port) {
845 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
846 return
847 }
848 if p := d.GetPortByID(port); p != nil {
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530849 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State, "port": port, "portName": portName, "Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530850
Akash Sonief452f12024-12-12 18:20:28 +0530851 if p.Name != portName {
852 logger.Warnw(ctx, "Dropping Port State processing: Port name does not match", log.Fields{"vgcPort": p.Name, "ofpPort": portName, "ID": port})
853 return
854 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530855 // Avoid blind initialization as the current tasks in the queue will be lost
856 // Eg: Service Del followed by Port Down - The flows will be dangling
857 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
858 p.Tasks.CheckAndInitialize(d.ctx)
859 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
860 // Transition from DOWN to UP
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530861 logger.Debugw(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530862 GetController().PortUpInd(cntx, d.ID, p.Name, skipFlowPushToVoltha)
Naveen Sampath04696f72022-06-13 15:19:14 +0530863 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530864 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530865 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
866 // Transition from UP to Down
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530867 logger.Debugw(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530868 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530869 p.State = PortStateDown
Naveen Sampath04696f72022-06-13 15:19:14 +0530870 } else {
871 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
872 }
873 }
874}
875
876// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530877func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530878 if d.State != DeviceStateUP && !util.IsNniPort(port) {
879 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
880 return
881 }
882 if p := d.GetPortByID(port); p != nil {
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530883 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State, "port": port, "Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530884 p.Tasks.Initialize(d.ctx)
mgoudabb017dc2025-10-29 19:53:34 +0530885 switch p.State {
886 case PortStateUp:
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530887 logger.Debugw(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +0530888 GetController().PortUpInd(cntx, d.ID, p.Name, false)
mgoudabb017dc2025-10-29 19:53:34 +0530889 case PortStateDown:
balaji.nagarajan182b64f2025-09-04 11:25:17 +0530890 logger.Debugw(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530891 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530892 }
893 }
894}
895
896// ChangeEvent : Change event brings in ports related changes such as addition/deletion
897// or modification where the port status change up/down is indicated to the
898// controller
899func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
900 cet := NewChangeEventTask(d.ctx, event, d)
901 d.AddTask(cet)
902 return nil
903}
904
905// PacketIn handle the incoming packet-in and deliver to the application for the
906// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530907func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530908 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
909 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
910 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
911 return
912 }
913 data := pkt.PacketIn.Data
914 port := PacketInGetPort(pkt.PacketIn)
915 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530916 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530917 } else {
918 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
919 }
920}
921
922// PacketInGetPort to get the port on which the packet-in is reported
923func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
924 for _, field := range pkt.Match.OxmFields {
925 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
926 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
927 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
928 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
929 return port.Port
930 }
931 }
932 }
933 }
934 }
935 return 0
936}
937
938// PacketOutReq receives the packet out request from the application via the
939// controller. The interface from the application uses name as the identity.
940func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
941 inp, err := d.GetPortID(inport)
942 if err != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530943 return errors.New("unknown inport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530944 }
945 outp, err1 := d.GetPortID(outport)
946 if err1 != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530947 return errors.New("unknown outport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530948 }
949 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
950 return d.SendPacketOut(outp, inp, data, isCustomPkt)
951}
952
953// SendPacketOut is responsible for building the OF structure and send the
954// packet-out to the VOLTHA
955func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
956 pout := &ofp.PacketOut{}
957 pout.Id = d.ID
958 opout := &ofp.OfpPacketOut{}
959 pout.PacketOut = opout
960 opout.InPort = inport
961 opout.Data = data
962 opout.Actions = []*ofp.OfpAction{
963 {
964 Type: ofp.OfpActionType_OFPAT_OUTPUT,
965 Action: &ofp.OfpAction_Output{
966 Output: &ofp.OfpActionOutput{
967 Port: outport,
968 MaxLen: 65535,
969 },
970 },
971 },
972 }
973 d.packetOutChannel <- pout
974 return nil
975}
976
977// UpdateFlows receives the flows in the form that is implemented
978// in the VGC and transforms them to the OF format. This is handled
979// as a port of the task that is enqueued to do the same.
980func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
981 t := NewAddFlowsTask(d.ctx, flow, d)
982 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
983 // check if port isNni , if yes flows will be added to device port queues.
984 if util.IsNniPort(devPort.ID) {
985 // Adding the flows to device port queues.
986 devPort.AddTask(t)
987 return
988 }
989 // If the flowHash is enabled then add the flows to the flowhash generated queues.
990 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
991 if flowQueue != nil {
992 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
993 flowQueue.AddTask(t)
994 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
995 } else {
996 //FlowThrotling disabled, add to the device port queue
997 devPort.AddTask(t)
998 return
999 }
1000}
1001
1002// UpdateGroup to update group info
1003func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
1004 task := NewModGroupTask(d.ctx, group, d)
1005 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
1006 devPort.AddTask(task)
1007}
1008
1009// ModMeter for mod meter task
1010func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
1011 if command == of.MeterCommandAdd {
1012 if _, err := d.GetMeter(meter.ID); err == nil {
1013 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
1014 return
1015 }
1016 }
1017 t := NewModMeterTask(d.ctx, command, meter, d)
1018 devPort.AddTask(t)
1019}
1020
1021func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
1022 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +05301023 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +05301024 if d.flowHash == 0 {
1025 d.flowQueueLock.RUnlock()
1026 return nil
1027 }
1028 flowHashID := id % uint32(d.flowHash)
1029 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1030 d.flowQueueLock.RUnlock()
1031 return value
1032 }
1033 d.flowQueueLock.RUnlock()
1034 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1035
1036 return d.addFlowQueueForUniID(id)
1037}
1038
1039func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301040 d.flowQueueLock.Lock()
1041 defer d.flowQueueLock.Unlock()
1042 flowHashID := id % uint32(d.flowHash)
1043 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1044 flowQueue.Tasks.Initialize(d.ctx)
1045 d.flowQueue[flowHashID] = flowQueue
1046 return flowQueue
1047}
1048
1049// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301050func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301051 d.flowQueueLock.Lock()
1052 defer d.flowQueueLock.Unlock()
1053
1054 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301055 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301056}
1057
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301058func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301059 hash, err := json.Marshal(d.flowHash)
1060 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301061 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301062 return
1063 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301064 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301065 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301066 }
1067}
1068
vinokuma926cb3e2023-03-29 11:41:06 +05301069// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301070func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301071 if d.State == DeviceStateUP {
1072 return true
1073 }
1074
1075 if d.State == DeviceStateDISABLED && forceAction {
1076 return true
1077 }
1078
1079 return false
1080}
1081
Akash Sonief452f12024-12-12 18:20:28 +05301082// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
1083func (d *Device) IsFlowDelThresholdReached(flowCount uint32, cookie uint64) bool {
1084 logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301085 return flowCount >= uint32(GetController().GetMaxFlowRetryAttempt())
Naveen Sampath04696f72022-06-13 15:19:14 +05301086}
1087
Sridhar Ravindra64b19ca2026-01-26 22:19:07 +05301088// IsUSTable0Flow - check if the flow is for US Table 0
1089func (d *Device) IsUSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
1090 if flow.TableID == 0 && !util.IsNniPort(flow.Match.InPort) {
1091 return true
1092 }
1093 return false
1094}
1095
1096// IsDSTable0Flow - check if the flow is for DS Table 0
1097func (d *Device) IsDSTable0Flow(cntx context.Context, flow *of.VoltSubFlow) bool {
1098 if flow.TableID == 0 && util.IsNniPort(flow.Match.InPort) {
1099 return true
1100 }
1101 return false
1102}
1103
1104// GetDeviceFlow - get the DS or US Table 1 flow based on isDsFlow flag
1105func (d *Device) GetDeviceFlow(cntx context.Context, flow *of.VoltSubFlow, deviceSerialNum string, devID string, isDsFlow bool) *of.VoltSubFlow {
1106 cookies := GetController().GetAllFlowsForSvc(cntx, flow, devID, deviceSerialNum)
1107 for _, cookie := range cookies {
1108 if dbFlow, ok := d.flows[cookie]; ok {
1109 logger.Debugw(ctx, "Found flow in device", log.Fields{"Cookie": cookie, "Flow": dbFlow})
1110 if isDsFlow {
1111 // return DS Table1 flow
1112 if dbFlow.TableID == 1 && util.IsNniPort(dbFlow.Match.InPort) {
1113 return dbFlow
1114 }
1115 } else {
1116 // return US Table1 flow
1117 if dbFlow.TableID == 1 && !util.IsNniPort(dbFlow.Match.InPort) {
1118 return dbFlow
1119 }
1120 }
1121 }
1122 }
1123 return nil
1124}
1125
Akash Sonief452f12024-12-12 18:20:28 +05301126// IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
1127func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
1128 logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301129 return flowCount >= uint32(GetController().GetMaxFlowRetryAttempt())
Akash Sonief452f12024-12-12 18:20:28 +05301130}
1131
1132func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
1133 if dbFlow, ok := d.flows[cookie]; ok {
1134 dbFlow.FlowCount++
Akash Sonief452f12024-12-12 18:20:28 +05301135 }
1136}
1137
1138func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
bseeniva1fc88162025-01-15 12:32:24 +05301139 flow, ok := d.GetFlow(cookie)
1140 if ok {
1141 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
1142 } else {
1143 logger.Warnw(ctx, "Flow not found", log.Fields{"device-id": d.ID, "Cookie": cookie})
1144 }
Akash Sonief452f12024-12-12 18:20:28 +05301145}
1146
1147func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301148 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1149 success := isFlowOperSuccess(statusCode, oper)
1150
Akash Sonief452f12024-12-12 18:20:28 +05301151 updateFlowStatus := func(cookie uint64, state int, reason string) {
1152 d.flowLock.Lock()
1153 defer d.flowLock.Unlock()
1154 if dbFlow, ok := d.flows[cookie]; ok {
Naveen Sampath04696f72022-06-13 15:19:14 +05301155 dbFlow.State = uint8(state)
1156 dbFlow.ErrorReason = reason
Naveen Sampath04696f72022-06-13 15:19:14 +05301157 }
1158 }
1159
vinokuma926cb3e2023-03-29 11:41:06 +05301160 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301161 // Add - Update Success or Failure status with reason
1162 // Del - Delete entry from DB on success else update error reason
1163 if oper == of.CommandAdd {
1164 state := of.FlowAddSuccess
1165 reason := ""
1166 if !success {
1167 state = of.FlowAddFailure
1168 reason = statusMsg
1169 }
Akash Sonief452f12024-12-12 18:20:28 +05301170 updateFlowStatus(cookie, state, reason)
1171 logger.Debugw(ctx, "Add flow updated to DB", log.Fields{"Cookie": cookie, "State": state})
Naveen Sampath04696f72022-06-13 15:19:14 +05301172 } else {
1173 if success && flow != nil {
Akash Sonief452f12024-12-12 18:20:28 +05301174 logger.Debugw(ctx, "Deleted flow from device and DB", log.Fields{"Cookie": cookie})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301175 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301176 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1177 }
bseeniva1fc88162025-01-15 12:32:24 +05301178 } else if !success && flow != nil {
Akash Sonief452f12024-12-12 18:20:28 +05301179 if d.IsFlowDelThresholdReached(flow.FlowCount, flow.Cookie) {
1180 logger.Debugw(ctx, "Deleted flow from device and DB after delete threshold reached", log.Fields{"Cookie": cookie})
1181 if err := d.DelFlow(cntx, flow); err != nil {
1182 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1183 }
1184 } else {
1185 updateFlowStatus(cookie, of.FlowDelFailure, statusMsg)
1186 logger.Debugw(ctx, "Delete flow updated to DB", log.Fields{"Cookie": cookie})
1187 }
Naveen Sampath04696f72022-06-13 15:19:14 +05301188 }
1189 }
1190
1191 flowResult := intf.FlowStatus{
1192 Cookie: strconv.FormatUint(cookie, 10),
1193 Device: d.ID,
1194 FlowModType: oper,
1195 Flow: flow,
1196 Status: statusCode,
1197 Reason: statusMsg,
1198 AdditionalData: bwDetails,
1199 }
1200
Akash Sonief452f12024-12-12 18:20:28 +05301201 logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
1202 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301203}