blob: 9ab5e8ee23aa112f6ffdabb85ed87ff8574970e1 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
Akash Sonia8246972023-01-03 10:37:08 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package application
17
18import (
Tinoj Joseph07cc5372022-07-18 22:53:51 +053019 "context"
Naveen Sampath04696f72022-06-13 15:19:14 +053020 "encoding/json"
21 "errors"
22 "net"
23 "reflect"
Naveen Sampath04696f72022-06-13 15:19:14 +053024 "strings"
25 "sync"
26 "time"
Akash Sonia8246972023-01-03 10:37:08 +053027 common "voltha-go-controller/internal/pkg/types"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "github.com/google/gopacket"
30 "github.com/google/gopacket/layers"
31
Naveen Sampath04696f72022-06-13 15:19:14 +053032 "voltha-go-controller/database"
Akash Sonia8246972023-01-03 10:37:08 +053033 cntlr "voltha-go-controller/internal/pkg/controller"
Naveen Sampath04696f72022-06-13 15:19:14 +053034 "voltha-go-controller/internal/pkg/of"
Tinoj Joseph1d108322022-07-13 10:07:39 +053035 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053036)
37
38const (
39 // IgmpVersion0 constant (Default init value)
40 IgmpVersion0 uint8 = 0
41 // IgmpVersion1 constant
42 IgmpVersion1 uint8 = 1
43 // IgmpVersion2 constant
44 IgmpVersion2 uint8 = 2
45 // IgmpVersion3 constant
46 IgmpVersion3 uint8 = 3
47 // MinKeepAliveInterval constant
48 MinKeepAliveInterval uint32 = 10
49 // MaxDiffKAIntervalResp constant
50 MaxDiffKAIntervalResp uint32 = 5
51 // StaticGroup constant
52 StaticGroup string = "static"
53 // DynamicGroup constant
54 DynamicGroup string = "dynamic"
55 // StaticPort constant
56 StaticPort string = "static_port"
57 // DefaultIgmpProfID constant
58 DefaultIgmpProfID = ""
59 //GroupExpiryTime - group expiry time in minutes
60 GroupExpiryTime uint32 = 15
61)
62
63const (
64 // JoinUnsuccessful constant
65 JoinUnsuccessful string = "JOIN-UNSUCCESSFUL"
66 // JoinUnsuccessfulExceededIGMPChanel constant
67 JoinUnsuccessfulExceededIGMPChanel string = "Exceeded subscriber or PON port IGMP channels threshold"
68 // JoinUnsuccessfulAddFlowGroupFailed constant
69 JoinUnsuccessfulAddFlowGroupFailed string = "Failed to add flow or group for a channel"
70 // JoinUnsuccessfulGroupNotConfigured constant
71 JoinUnsuccessfulGroupNotConfigured string = "Join received from a subscriber on non-configured group"
72 // JoinUnsuccessfulVlanDisabled constant
73 JoinUnsuccessfulVlanDisabled string = "Vlan is disabled"
74 // JoinUnsuccessfulDescription constant
75 JoinUnsuccessfulDescription string = "igmp join unsuccessful"
76 // QueryExpired constant
77 QueryExpired string = "QUERY-EXPIRED"
78 // QueryExpiredGroupSpecific constant
79 QueryExpiredGroupSpecific string = "Group specific multicast query expired"
80 // QueryExpiredDescription constant
81 QueryExpiredDescription string = "igmp query expired"
82)
83
Naveen Sampath04696f72022-06-13 15:19:14 +053084// McastConfig structure
85type McastConfig struct {
86 OltSerialNum string
87 MvlanProfileID string
88 IgmpProfileID string
Naveen Sampath04696f72022-06-13 15:19:14 +053089 Version string
90 // This map will help in updating the igds whenever there is a igmp profile id update
91 IgmpGroupDevices sync.Map `json:"-"` // Key is group id
vinokuma926cb3e2023-03-29 11:41:06 +053092 IgmpProxyIP net.IP
93 OperState OperInProgress
Naveen Sampath04696f72022-06-13 15:19:14 +053094}
95
96var (
97 // NullIPAddr is null ip address var
98 NullIPAddr = net.ParseIP("0.0.0.0")
Tinoj Josephcf161be2022-07-07 19:47:47 +053099 // AllSystemsMulticastGroupIP
100 AllSystemsMulticastGroupIP = net.ParseIP("224.0.0.1")
Naveen Sampath04696f72022-06-13 15:19:14 +0530101 // igmpSrcMac for the proxy
102 igmpSrcMac string
103)
104
105func init() {
106 RegisterPacketHandler(IGMP, ProcessIgmpPacket)
107}
108
109// ProcessIgmpPacket : CallBack function registered with application to handle IGMP packetIn
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530110func ProcessIgmpPacket(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530111 GetApplication().IgmpPacketInd(device, port, pkt)
112}
113
114func ipv4ToUint(ip net.IP) uint32 {
115 result := uint32(0)
116 addr := ip.To4()
117 if addr == nil {
118 logger.Warnw(ctx, "Invalid Group Addr", log.Fields{"IP": ip})
119 return 0
120 }
121 result = result + uint32(addr[0])<<24
122 result = result + uint32(addr[1])<<16
123 result = result + uint32(addr[2])<<8
124 result = result + uint32(addr[3])
125 return result
126}
127
128func getPodMacAddr() (string, error) {
129 ifas, err := net.Interfaces()
130 if err != nil {
131 return "", err
132 }
133 var ipv4Addr net.IP
134 for _, ifa := range ifas {
135 addrs, err := ifa.Addrs()
136 if err != nil {
137 return "", err
138 }
139 for _, addr := range addrs {
140 if ipv4Addr = addr.(*net.IPNet).IP.To4(); ipv4Addr != nil {
141 if ipv4Addr.IsGlobalUnicast() {
142 logger.Infow(ctx, "Igmp Static config", log.Fields{"MacAddr": ifa.HardwareAddr.String(), "ipAddr": ipv4Addr})
143 return ifa.HardwareAddr.String(), nil
144 }
145 }
146 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 }
148 return "", errors.New("MAC Address not found,Setting default")
149}
150
151// IgmpUsEthLayer : Layers defined for upstream communication
152// Ethernet layer for upstream communication
153func IgmpUsEthLayer(mcip net.IP) *layers.Ethernet {
154 eth := &layers.Ethernet{}
155 // TODO: Set the source MAC properly and remove hardcoding
156 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
157 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
158 eth.DstMAC[3] = mcip[1] & 0x7f
159 eth.DstMAC[4] = mcip[2]
160 eth.DstMAC[5] = mcip[3]
161 eth.EthernetType = layers.EthernetTypeDot1Q
162 return eth
163}
164
165// IgmpUsDot1qLayer set US VLAN layer
166func IgmpUsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
167 dot1q := &layers.Dot1Q{}
168 dot1q.Priority = priority
169 dot1q.DropEligible = false
170 dot1q.VLANIdentifier = uint16(vlan)
171 dot1q.Type = layers.EthernetTypeIPv4
172 return dot1q
173}
174
175// Igmpv2UsIpv4Layer : Set the IP layer for IGMPv2
176// TODO - Identify correct way of obtaining source IP
177// This should be the configured IGMP proxy address which should be per OLT
178// We should probably be able to have a single function for both
179// upstream and downstream
180func Igmpv2UsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
181 ip := &layers.IPv4{}
182 ip.Version = 4
183 ip.Protocol = layers.IPProtocolIGMP
184 ip.TTL = 1
185 ip.SrcIP = src
186 ip.DstIP = mcip
187 return ip
188}
189
190// Igmpv3UsIpv4Layer : Set the IP layer for IGMPv3
191// TODO - Identify correct way of obtaining source IP
192// This should be the configured IGMP proxy address which should be per OLT
193// We should probably be able to have a single function for both
194// upstream and downstream
195func Igmpv3UsIpv4Layer(src net.IP) *layers.IPv4 {
196 ip := &layers.IPv4{}
197 ip.Version = 4
198 ip.Protocol = layers.IPProtocolIGMP
199 ip.TTL = 1
200 ip.SrcIP = src
201 ip.DstIP = net.ParseIP("224.0.0.22")
202 return ip
203}
204
205// IgmpDsEthLayer : Layers defined for downstream communication
206// Ethernet layer for downstream communication
207func IgmpDsEthLayer(mcip net.IP) *layers.Ethernet {
208 eth := &layers.Ethernet{}
209 // TODO: Set the source and dest MAC properly and remove hardcoding
210 eth.SrcMAC, _ = net.ParseMAC(igmpSrcMac)
211 eth.DstMAC, _ = net.ParseMAC("01:00:5e:00:00:00")
212 eth.DstMAC[3] = mcip[1] & 0x7f
213 eth.DstMAC[4] = mcip[2]
214 eth.DstMAC[5] = mcip[3]
215 eth.EthernetType = layers.EthernetTypeDot1Q
216 return eth
217}
218
219// IgmpDsDot1qLayer set the DS VLAN layer
220func IgmpDsDot1qLayer(vlan of.VlanType, priority uint8) *layers.Dot1Q {
221 dot1q := &layers.Dot1Q{}
222 dot1q.Priority = priority
223 dot1q.DropEligible = false
224 dot1q.VLANIdentifier = uint16(vlan)
225 dot1q.Type = layers.EthernetTypeIPv4
226 return dot1q
227}
228
229// IgmpDsIpv4Layer set the IP layer
230func IgmpDsIpv4Layer(src net.IP, mcip net.IP) *layers.IPv4 {
231 ip := &layers.IPv4{}
232 ip.Version = 4
233 ip.Protocol = layers.IPProtocolIGMP
234 ip.TTL = 1
235 ip.SrcIP = src
236 if mcip.Equal(net.ParseIP("0.0.0.0")) {
237 mcip = net.ParseIP("224.0.0.1")
238 }
239 ip.DstIP = mcip
240 return ip
241}
242
243// IgmpQueryv2Layer : IGMP Query Layer
244func IgmpQueryv2Layer(mcip net.IP, resptime time.Duration) *layers.IGMPv1or2 {
245 igmp := &layers.IGMPv1or2{}
246 igmp.Type = layers.IGMPMembershipQuery
247 igmp.GroupAddress = mcip
248 igmp.MaxResponseTime = resptime
249 return igmp
250}
251
252// IgmpQueryv3Layer : IGMP v3 Query Layer
253func IgmpQueryv3Layer(mcip net.IP, resptime time.Duration) *layers.IGMP {
254 igmp := &layers.IGMP{}
255 igmp.Type = layers.IGMPMembershipQuery
256 igmp.GroupAddress = mcip
257 igmp.MaxResponseTime = resptime
258 return igmp
259}
260
261// IgmpReportv2Layer : IGMP Layer
262func IgmpReportv2Layer(mcip net.IP) *layers.IGMPv1or2 {
263 igmp := &layers.IGMPv1or2{}
264 igmp.Type = layers.IGMPMembershipReportV2
265 igmp.GroupAddress = mcip
266 return igmp
267}
268
269// IgmpLeavev2Layer : IGMP Leave Layer
270func IgmpLeavev2Layer(mcip net.IP) *layers.IGMPv1or2 {
271 igmp := &layers.IGMPv1or2{}
272 igmp.Type = layers.IGMPLeaveGroup
273 igmp.GroupAddress = mcip
274 return igmp
275}
276
277// IgmpReportv3Layer : IGMP v3 Report Layer
278func IgmpReportv3Layer(mcip net.IP, incl bool, srclist []net.IP) *layers.IGMP {
279 // IGMP base
280 igmp := &layers.IGMP{}
281 igmp.Type = layers.IGMPMembershipReportV3
282 igmp.NumberOfGroupRecords = 1
283
284 // IGMP Group
285 group := layers.IGMPv3GroupRecord{}
286 if incl {
287 group.Type = layers.IGMPIsIn
288 } else {
289 group.Type = layers.IGMPIsEx
290 }
291 group.MulticastAddress = mcip
292 group.NumberOfSources = uint16(len(srclist))
293 group.SourceAddresses = srclist
294 igmp.GroupRecords = append(igmp.GroupRecords, group)
295
296 return igmp
297}
298
299// Igmpv2QueryPacket : IGMP Query in Downstream
300func Igmpv2QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
301 // Construct the layers that form the packet
302 eth := IgmpDsEthLayer(mcip)
303 dot1q := IgmpDsDot1qLayer(vlan, pbit)
304 ip := IgmpDsIpv4Layer(selfip, mcip)
305 igmp := IgmpQueryv2Layer(mcip, time.Duration(maxResp)*time.Second)
306
307 // Now prepare the buffer into which the layers are to be serialized
308 buff := gopacket.NewSerializeBuffer()
309 opts := gopacket.SerializeOptions{
310 FixLengths: true,
311 ComputeChecksums: true,
312 }
313 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
314 logger.Error(ctx, "Error in serializing layers")
315 return nil, err
316 }
317 return buff.Bytes(), nil
318}
319
320// Igmpv3QueryPacket : IGMPv3 Query in Downstream
321func Igmpv3QueryPacket(mcip net.IP, vlan of.VlanType, selfip net.IP, pbit uint8, maxResp uint32) ([]byte, error) {
322 // Construct the layers that form the packet
323 eth := IgmpDsEthLayer(mcip)
324 dot1q := IgmpDsDot1qLayer(vlan, pbit)
325 ip := IgmpDsIpv4Layer(selfip, mcip)
326 igmp := IgmpQueryv3Layer(mcip, time.Duration(maxResp)*time.Second)
327
328 // Now prepare the buffer into which the layers are to be serialized
329 buff := gopacket.NewSerializeBuffer()
330 opts := gopacket.SerializeOptions{
331 FixLengths: true,
332 ComputeChecksums: true,
333 }
334 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
335 logger.Error(ctx, "Error in serializing layers")
336 return nil, err
337 }
338 return buff.Bytes(), nil
339}
340
341// IgmpReportv2Packet : Packet - IGMP v2 report in upstream
342func IgmpReportv2Packet(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
343 // Construct the layers that form the packet
344 eth := IgmpUsEthLayer(mcip)
345 dot1q := IgmpUsDot1qLayer(vlan, priority)
346 ip := Igmpv2UsIpv4Layer(selfip, mcip)
347 igmp := IgmpReportv2Layer(mcip)
348
349 // Now prepare the buffer into which the layers are to be serialized
350 buff := gopacket.NewSerializeBuffer()
351 opts := gopacket.SerializeOptions{
352 FixLengths: true,
353 ComputeChecksums: true,
354 }
355 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
356 logger.Error(ctx, "Error in serializing layers")
357 return nil, err
358 }
359 return buff.Bytes(), nil
360}
361
362// Igmpv3ReportPacket : Packet - IGMP v3 report in upstream
363func Igmpv3ReportPacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP, incl bool, srclist []net.IP) ([]byte, error) {
364 // Construct the layers that form the packet
365 eth := IgmpUsEthLayer(net.ParseIP("224.0.0.22").To4())
366 dot1q := IgmpUsDot1qLayer(vlan, priority)
367 ip := Igmpv3UsIpv4Layer(selfip)
368 igmp := IgmpReportv3Layer(mcip, incl, srclist)
369
370 // Now prepare the buffer into which the layers are to be serialized
371 buff := gopacket.NewSerializeBuffer()
372 opts := gopacket.SerializeOptions{
373 FixLengths: true,
374 ComputeChecksums: true,
375 }
376 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
377 logger.Error(ctx, "Error in serializing layers")
378 return nil, err
379 }
380 return buff.Bytes(), nil
381}
382
383// IgmpLeavePacket : Packet- IGMP Leave in upstream
384func IgmpLeavePacket(mcip net.IP, vlan of.VlanType, priority uint8, selfip net.IP) ([]byte, error) {
385 // Construct the layers that form the packet
386 eth := IgmpUsEthLayer(mcip)
387 dot1q := IgmpUsDot1qLayer(vlan, priority)
388 ip := Igmpv2UsIpv4Layer(selfip, mcip)
389 igmp := IgmpLeavev2Layer(mcip)
390
391 // Now prepare the buffer into which the layers are to be serialized
392 buff := gopacket.NewSerializeBuffer()
393 opts := gopacket.SerializeOptions{
394 FixLengths: true,
395 ComputeChecksums: true,
396 }
397 if err := gopacket.SerializeLayers(buff, opts, eth, dot1q, ip, igmp); err != nil {
398 logger.Error(ctx, "Error in serializing layers")
399 return nil, err
400 }
401 return buff.Bytes(), nil
402}
403
404// getVersion to get igmp version type
405func getVersion(ver string) uint8 {
406 if ver == "2" || ver == "v2" {
407 return IgmpVersion2
408 }
409 return IgmpVersion3
410}
411
412// IsIPPresent is Utility to check if an IP address is in a list
413func IsIPPresent(i net.IP, ips []net.IP) bool {
414 for _, ip := range ips {
415 if i.Equal(ip) {
416 return true
417 }
418 }
419 return false
420}
421
vinokuma926cb3e2023-03-29 11:41:06 +0530422// AddToPendingPool - adds Igmp Device obj to pending pool
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530423func AddToPendingPool(cntx context.Context, device string, groupKey string) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530424 logger.Infow(ctx, "Add Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupKey": groupKey})
425 if grp, ok := GetApplication().IgmpGroups.Load(groupKey); ok {
426 ig := grp.(*IgmpGroup)
427 ig.PendingPoolLock.Lock()
428 logger.Infow(ctx, "Adding Device to IgmpGroup Pending Pool", log.Fields{"Device": device, "GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
429 ig.PendingGroupForDevice[device] = time.Now().Add(time.Duration(GroupExpiryTime) * time.Minute)
430 ig.PendingPoolLock.Unlock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530431 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530432 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
433 }
434 return true
435 }
436 return false
437}
438
439/*
440func checkIfForceGroupRemove(device string) bool {
441 if d := GetApplication().GetDevice(device); d != nil {
442 if d.State == cntlr.DeviceStateREBOOTED || d.State == cntlr.DeviceStateDOWN {
443 return true
444 }
445 }
446 return false
447}*/
448
Naveen Sampath04696f72022-06-13 15:19:14 +0530449// SendQueryExpiredEventGroupSpecific to send group specific query expired event.
450func SendQueryExpiredEventGroupSpecific(portKey string, igd *IgmpGroupDevice, igc *IgmpGroupChannel) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530451 logger.Info(ctx, "Processing-SendQueryExpiredEventGroupSpecific-Event")
452 va := GetApplication()
453 mvpName := va.GetMvlanProfileByTag(igd.Mvlan).Name
454
455 sendEvent := func(key interface{}, value interface{}) bool {
456 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
457 logger.Debugw(ctx, "sending-query-expired-group-specific-event", log.Fields{"EventType": QueryExpiredGroupSpecific, "ServiceName": value.(*VoltService).Name})
458 }
459 return false
460 }
461
462 // Fetching service name to send with query expired event.
463 vpvs, _ := va.VnetsByPort.Load(portKey)
464 if vpvs == nil {
465 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
466 return
467 }
468
469 for _, vpv := range vpvs.([]*VoltPortVnet) {
470 vpv.services.Range(sendEvent)
471 }
472}
473
474// GetMcastServiceForSubAlarm to get mcast service name for subscriber alarm.
475func GetMcastServiceForSubAlarm(uniPort *VoltPort, mvp *MvlanProfile) string {
Naveen Sampath04696f72022-06-13 15:19:14 +0530476 var serviceName string
477 mvpName := mvp.Name
478
479 va := GetApplication()
480
481 sendAlm := func(key interface{}, value interface{}) bool {
482 if value.(*VoltService).IgmpEnabled && value.(*VoltService).MvlanProfileName == mvpName {
483 serviceName = value.(*VoltService).Name
484 }
485 return true
486 }
487
488 // Fetching service name to send with active channels exceeded per subscriber alarm.
489 vpvs, _ := va.VnetsByPort.Load(uniPort.Name)
490 if vpvs == nil {
491 logger.Errorw(ctx, "volt-port-vnet-is-nil", log.Fields{"vpvs": vpvs})
492 return serviceName
493 }
494
495 for _, vpv := range vpvs.([]*VoltPortVnet) {
496 vpv.services.Range(sendAlm)
497 }
498
499 return serviceName
Naveen Sampath04696f72022-06-13 15:19:14 +0530500}
501
Naveen Sampath04696f72022-06-13 15:19:14 +0530502// RestoreIgmpGroupsFromDb to restore igmp groups from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503func (va *VoltApplication) RestoreIgmpGroupsFromDb(cntx context.Context) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530504 groups, _ := db.GetIgmpGroups(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530505 for _, group := range groups {
506 b, ok := group.Value.([]byte)
507 if !ok {
508 logger.Warn(ctx, "The value type is not []byte")
509 continue
510 }
511 var ig IgmpGroup
512 err := json.Unmarshal(b, &ig)
513 if err != nil {
514 logger.Warn(ctx, "Unmarshal of IGMP Group failed")
515 continue
516 }
517 ig.Devices = make(map[string]*IgmpGroupDevice)
518
519 //For Upgrade Case
520 if len(ig.PendingGroupForDevice) == 0 {
521 ig.PendingGroupForDevice = make(map[string]time.Time)
522 }
523 logger.Infow(ctx, "Restoring Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
524 grpKey := ig.getKey()
525 va.IgmpGroups.Store(grpKey, &ig)
526 // Just delete and lose the IGMP group with the same group Id
527 if _, err := va.GetIgmpGroupID(ig.GroupID); err != nil {
528 logger.Warnw(ctx, "GetIgmpGroupID Failed", log.Fields{"igGroupID": ig.GroupID, "Error": err})
529 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530530 ig.RestoreDevices(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530531
532 if ig.NumDevicesActive() == 0 {
533 va.AddGroupToPendingPool(&ig)
534 }
535 logger.Infow(ctx, "Restored Groups", log.Fields{"igGroupID": ig.GroupID, "igGroupName": ig.GroupName, "igMvlan": ig.Mvlan})
536 }
537}
538
539// AddIgmpGroup : When the first IGMP packet is received, the MVLAN profile is identified
540// for the IGMP group and grp obj is obtained from the available pending pool of groups.
541// If not, new group obj will be created based on available group IDs
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530542func (va *VoltApplication) AddIgmpGroup(cntx context.Context, mvpName string, gip net.IP, device string) *IgmpGroup {
Naveen Sampath04696f72022-06-13 15:19:14 +0530543 var ig *IgmpGroup
544 if mvp, grpName := va.GetMvlanProfileForMcIP(mvpName, gip); mvp != nil {
545 if ig = va.GetGroupFromPendingPool(mvp.Mvlan, device); ig != nil {
546 logger.Infow(ctx, "Igmp Group obtained from global pending pool", log.Fields{"MvlanProfile": mvpName, "GroupID": ig.GroupID, "Device": device, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String()})
547 oldKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530548 ig.IgmpGroupReInit(cntx, grpName, gip)
Naveen Sampath04696f72022-06-13 15:19:14 +0530549 ig.IsGroupStatic = mvp.Groups[grpName].IsStatic
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530550 ig.UpdateIgmpGroup(cntx, oldKey, ig.getKey())
Naveen Sampath04696f72022-06-13 15:19:14 +0530551 } else {
552 logger.Infow(ctx, "No Igmp Group available in global pending pool. Creating new Igmp Group", log.Fields{"MvlanProfile": mvpName, "Device": device, "GroupAddr": gip.String()})
553 if ig = va.GetAvailIgmpGroupID(); ig == nil {
554 logger.Error(ctx, "Igmp Group Creation Failed: Group Id Unavailable")
555 return nil
556 }
557 ig.IgmpGroupInit(grpName, gip, mvp)
558 grpKey := ig.getKey()
559 va.IgmpGroups.Store(grpKey, ig)
560 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530561 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530562 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
563 }
564 return ig
565 }
Tinoj Joseph1d108322022-07-13 10:07:39 +0530566 logger.Errorw(ctx, "GetMvlan Pro failed", log.Fields{"Group": gip})
Naveen Sampath04696f72022-06-13 15:19:14 +0530567 return nil
568}
569
570// GetIgmpGroup helps search for the IGMP group from the list of
571// active IGMP groups. For now, the assumption is that a group
572// cannot belong to more than on MVLAN. If we change that definition,
573// we have to take a relook at this implementation. The key will include
574// both MVLAN and the group IP.
575func (va *VoltApplication) GetIgmpGroup(mvlan of.VlanType, gip net.IP) *IgmpGroup {
Naveen Sampath04696f72022-06-13 15:19:14 +0530576 profile, _ := va.MvlanProfilesByTag.Load(mvlan)
577 if profile == nil {
578 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": mvlan, "GroupAddr": gip.String()})
579 return nil
580 }
581 mvp := profile.(*MvlanProfile)
582 _, gName := va.GetMvlanProfileForMcIP(mvp.Name, gip)
583 grpKey := mvp.generateGroupKey(gName, gip.String())
584 logger.Debugw(ctx, "Get IGMP Group", log.Fields{"Group": grpKey})
585 igIntf, ok := va.IgmpGroups.Load(grpKey)
586 if ok {
587 logger.Debugw(ctx, "Get IGMP Group Success", log.Fields{"Group": grpKey})
588 ig := igIntf.(*IgmpGroup)
589
590 //Case: Group was part of pending and Join came with same channel or different channel from same group
591 // (from same or different device)
592 // In that case, the same group will be allocated since the group is still part of va.IgmpGroups
593 // So, the groups needs to be removed from global pending pool
594 va.RemoveGroupDevicesFromPendingPool(ig)
595 return ig
596 }
597 return nil
598}
599
Naveen Sampath04696f72022-06-13 15:19:14 +0530600// DelIgmpGroup : When the last subscriber leaves the IGMP group across all the devices
601// the IGMP group is removed.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530602func (va *VoltApplication) DelIgmpGroup(cntx context.Context, ig *IgmpGroup) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530603 profile, found := GetApplication().MvlanProfilesByTag.Load(ig.Mvlan)
604 if found {
605 mvp := profile.(*MvlanProfile)
606
607 grpKey := mvp.generateGroupKey(ig.GroupName, ig.GroupAddr.String())
608
609 if igIntf, ok := va.IgmpGroups.Load(grpKey); ok {
610 ig := igIntf.(*IgmpGroup)
611 ig.IgmpGroupLock.Lock()
612 if ig.NumDevicesAll() == 0 {
613 logger.Debugw(ctx, "Deleting IGMP Group", log.Fields{"Group": grpKey})
614 va.PutIgmpGroupID(ig)
615 va.IgmpGroups.Delete(grpKey)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530616 _ = db.DelIgmpGroup(cntx, grpKey)
Naveen Sampath04696f72022-06-13 15:19:14 +0530617 } else {
618 logger.Infow(ctx, "Skipping IgmpGroup Device. Pending Igmp Group Devices present", log.Fields{"GroupID": ig.GroupID, "GroupName": ig.GroupName, "GroupAddr": ig.GroupAddr.String(), "PendingDevices": len(ig.Devices)})
619 va.AddGroupToPendingPool(ig)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530620 if err := ig.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530621 logger.Errorw(ctx, "Igmp group Write to DB failed", log.Fields{"groupName": ig.GroupName})
622 }
623 }
624 ig.IgmpGroupLock.Unlock()
625 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530626 }
627}
628
629// GetPonPortID Gets the PON port ID from uniPortID
630func (va *VoltApplication) GetPonPortID(device, uniPortID string) uint32 {
Naveen Sampath04696f72022-06-13 15:19:14 +0530631 isNNI := strings.Contains(uniPortID, "nni")
632 if isNNI || uniPortID == StaticPort {
633 logger.Debugw(ctx, "Cannot get pon port from UNI port", log.Fields{"port": uniPortID})
634 return 0xFF
635 }
636 dIntf, ok := va.DevicesDisc.Load(device)
637 if !ok {
638 return 0xFF
639 }
640 d := dIntf.(*VoltDevice)
641
642 uniPort := d.GetPort(uniPortID)
643 if uniPort == nil {
644 return 0xFF
645 }
646 return GetPonPortIDFromUNIPort(uniPort.ID)
647}
648
649// AggActiveChannelsCountPerSub aggregates the active channel count for given uni port.
650// It will iterate over all the groups and store the sum of active channels in VoltPort.
651func (va *VoltApplication) AggActiveChannelsCountPerSub(device, uniPort string, port *VoltPort) {
652 var activeChannelCount uint32
653
654 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
655 ig := value.(*IgmpGroup)
656 igd := ig.Devices[device]
657 if igd == nil {
658 return true
659 }
660 if portChannels, ok := igd.PortChannelMap.Load(uniPort); ok {
661 channelList := portChannels.([]net.IP)
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +0530662 channelLength := len(channelList)
663 // Check if the length exceeds uint32's maximum value
664 if channelLength > int(^uint32(0)) {
665 logger.Error(ctx, "Error converting string to uint32")
666 }
667 activeChannelCount += uint32(channelLength)
Naveen Sampath04696f72022-06-13 15:19:14 +0530668 }
669 return true
670 }
671 va.IgmpGroups.Range(collectActiveChannelCount)
672
673 logger.Debugw(ctx, "AggrActiveChannelCount for Subscriber",
674 log.Fields{"UniPortID": uniPort, "count": activeChannelCount})
675
676 port.ActiveChannels = activeChannelCount
677}
678
679// AggActiveChannelsCountForPonPort Aggregates the active channel count for given pon port.
680// It will iterate over all the groups and store the sum of active channels in VoltDevice.
681func (va *VoltApplication) AggActiveChannelsCountForPonPort(device string, ponPortID uint32, port *PonPortCfg) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530682 var activeChannelCount uint32
683
684 collectActiveChannelCount := func(key interface{}, value interface{}) bool {
685 ig := value.(*IgmpGroup)
686 igd := ig.Devices[device]
687 if igd == nil {
688 return true
689 }
690 if ponPortChannels, ok := igd.PonPortChannelMap.Get(ponPortID); ok {
691 activeChannelCount += ponPortChannels.(*PonPortChannels).GetActiveChannelCount()
692 }
693 return true
694 }
695 va.IgmpGroups.Range(collectActiveChannelCount)
696
697 logger.Debugw(ctx, "AggrActiveChannelCount for Pon Port",
698 log.Fields{"PonPortID": ponPortID, "count": activeChannelCount})
699
700 port.ActiveIGMPChannels = activeChannelCount
701}
702
703// UpdateActiveChannelCountForPonPort increments the global counter for active
704// channel count per pon port.
705func (va *VoltApplication) UpdateActiveChannelCountForPonPort(device, uniPortID string, ponPortID uint32, isAdd, isChannel bool, igd *IgmpGroupDevice) {
706 incrDecr := func(value uint32) uint32 {
707 if isAdd {
708 return value + 1
709 }
710 return value - 1
711 }
712 if d, exists := va.DevicesDisc.Load(device); exists {
713 voltDevice := d.(*VoltDevice)
714
715 if isChannel {
716 voltDevice.ActiveChannelCountLock.Lock()
717 // If New channel is added/deleted, then only update the ActiveChannelsPerPon
718 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
719 port := value.(*PonPortCfg)
720 port.ActiveIGMPChannels = incrDecr(port.ActiveIGMPChannels)
721 voltDevice.ActiveChannelsPerPon.Store(ponPortID, port)
722 logger.Debugw(ctx, "+++ActiveChannelsPerPon", log.Fields{"count": port.ActiveIGMPChannels}) // TODO: remove me
723 }
724 voltDevice.ActiveChannelCountLock.Unlock()
725 }
726 if uPort, ok := voltDevice.Ports.Load(uniPortID); ok {
727 uniPort := uPort.(*VoltPort)
728 uniPort.ActiveChannels = incrDecr(uniPort.ActiveChannels)
729 voltDevice.Ports.Store(uniPortID, uniPort)
730 logger.Debugw(ctx, "+++ActiveChannelsPerSub", log.Fields{"count": uniPort.ActiveChannels}) // TODO: remove me
731 }
732 }
733}
734
735// IsMaxChannelsCountExceeded checks if the PON port active channel
736// capacity and subscriber level channel capacity is reached to max allowed
737// channel per pon threshold. If Exceeds, return true else return false.
738func (va *VoltApplication) IsMaxChannelsCountExceeded(device, uniPortID string,
739 ponPortID uint32, ig *IgmpGroup, channelIP net.IP, mvp *MvlanProfile) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530740 // New receiver check is required to identify the IgmpReportMsg received
741 // in response to the IGMP Query sent from VGC.
742 if newReceiver := ig.IsNewReceiver(device, uniPortID, channelIP); !newReceiver {
743 logger.Debugw(ctx, "Not a new receiver. It is a response to IGMP Query",
744 log.Fields{"port": uniPortID, "channel": channelIP})
745 return false
746 }
747
748 if vDev, exists := va.DevicesDisc.Load(device); exists {
749 voltDevice := vDev.(*VoltDevice)
750
751 // Checking subscriber active channel count with maxChannelsAllowedPerSub
752 if uniPort, present := voltDevice.Ports.Load(uniPortID); present {
753 if uniPort.(*VoltPort).ActiveChannels >= mvp.MaxActiveChannels {
754 logger.Errorw(ctx, "Max allowed channels per subscriber is exceeded",
755 log.Fields{"activeCount": uniPort.(*VoltPort).ActiveChannels, "channel": channelIP, "UNI": uniPort.(*VoltPort).Name})
756 if !(uniPort.(*VoltPort).ChannelPerSubAlarmRaised) {
757 serviceName := GetMcastServiceForSubAlarm(uniPort.(*VoltPort), mvp)
758 logger.Debugw(ctx, "Raising-SendActiveChannelPerSubscriberAlarm-Initiated", log.Fields{"ActiveChannels": uniPort.(*VoltPort).ActiveChannels, "ServiceName": serviceName})
759 uniPort.(*VoltPort).ChannelPerSubAlarmRaised = true
760 }
761 return true
762 }
763 } else {
764 logger.Errorw(ctx, "UNI port not found in VoltDevice", log.Fields{"uniPortID": uniPortID})
765 }
766 if value, ok := voltDevice.ActiveChannelsPerPon.Load(ponPortID); ok {
767 ponPort := value.(*PonPortCfg)
768
769 logger.Debugw(ctx, "----Active channels count for PON port",
770 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels,
771 "maxAllowedChannelsPerPon": ponPort.MaxActiveChannels})
772
773 if ponPort.ActiveIGMPChannels < ponPort.MaxActiveChannels {
774 // PON port active channel capacity is not yet reached to max allowed channels per pon.
775 // So allowing to add receiver.
776 return false
777 } else if ponPort.ActiveIGMPChannels >= ponPort.MaxActiveChannels && ig != nil {
778 // PON port active channel capacity is reached to max allowed channels per pon.
779 // Check if same channel is already configured on that PON port.
780 // If that channel is present, then allow AddReceiver else it will be rejected.
781 igd, isPresent := ig.Devices[device]
782 if isPresent {
783 if channelListForPonPort, _ := igd.PonPortChannelMap.Get(ponPortID); channelListForPonPort != nil {
784 if _, isExists := channelListForPonPort.(*PonPortChannels).ChannelList.Get(channelIP.String()); isExists {
785 return false
786 }
787 }
788 }
789 }
790 logger.Errorw(ctx, "Active channels count for PON port exceeded",
791 log.Fields{"PonPortID": ponPortID, "activeChannels": ponPort.ActiveIGMPChannels, "channel": channelIP, "UNI": uniPortID})
792 } else {
793 logger.Warnw(ctx, "PON port level active channel count does not exists",
794 log.Fields{"ponPortID": ponPortID})
795 return false
796 }
797 }
798 logger.Warnw(ctx, "Max allowed channels per pon threshold is reached", log.Fields{"PonPortID": ponPortID})
799 return true
800}
801
802// ProcessIgmpv2Pkt : This is IGMPv2 packet.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530803func (va *VoltApplication) ProcessIgmpv2Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530804 // First get the layers of interest
805 dot1Q := pkt.Layer(layers.LayerTypeDot1Q).(*layers.Dot1Q)
806 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
807 igmpv2 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMPv1or2)
808
809 ponPortID := va.GetPonPortID(device, port)
810
811 var vpv *VoltPortVnet
812
813 logger.Debugw(ctx, "Received IGMPv2 Type", log.Fields{"Type": igmpv2.Type})
814
mgoudabb017dc2025-10-29 19:53:34 +0530815 switch igmpv2.Type {
816 case layers.IGMPMembershipReportV2, layers.IGMPMembershipReportV1:
Naveen Sampath04696f72022-06-13 15:19:14 +0530817 logger.Infow(ctx, "IGMP Join received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
818
819 // This is a report coming from the PON. We must be able to first find the
820 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
821 // enabled for the subscriber
822 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
823
824 if vpv == nil {
825 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
826 return
827 } else if !vpv.IgmpEnabled {
828 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
829 return
830 }
831
832 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
833 if mvp == nil {
834 logger.Errorw(ctx, "Igmp Packet Received for Subscriber with Missing Mvlan Profile",
835 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
836 return
837 }
838 mvlan := mvp.Mvlan
839
840 mvp.mvpLock.RLock()
841 defer mvp.mvpLock.RUnlock()
842 // The subscriber is validated and now process the IGMP report
843 ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress)
844
845 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, igmpv2.GroupAddress, mvp); yes {
846 logger.Warnw(ctx, "Dropping IGMP Join v2: Active channel threshold exceeded",
847 log.Fields{"PonPortID": ponPortID, "Addr": igmpv2.GroupAddress, "MvlanProfile": vpv.MvlanProfileName})
848 return
849 }
850 if ig != nil {
851 logger.Infow(ctx, "IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
852 // If the IGMP group is already created. just add the receiver
853 ig.IgmpGroupLock.Lock()
854 // Check for port state to avoid race condition where PortDown event
855 // acquired lock before packet processing
856 vd := GetApplication().GetDevice(device)
857 vp := vd.GetPort(port)
858 if vp == nil || vp.State != PortStateUp {
859 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
Akash Sonia8246972023-01-03 10:37:08 +0530860 log.Fields{"Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530861 ig.IgmpGroupLock.Unlock()
862 return
863 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530864 ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530865 ig.IgmpGroupLock.Unlock()
866 } else {
867 // Create the IGMP group and then add the receiver to the group
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530868 if ig := va.AddIgmpGroup(cntx, vpv.MvlanProfileName, igmpv2.GroupAddress, device); ig != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530869 logger.Infow(ctx, "New IGMP Group", log.Fields{"Group": ig.GroupID, "devices": ig.Devices})
870 ig.IgmpGroupLock.Lock()
871 // Check for port state to avoid race condition where PortDown event
872 // acquired lock before packet processing
873 vd := GetApplication().GetDevice(device)
874 vp := vd.GetPort(port)
875 if vp == nil || vp.State != PortStateUp {
876 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
Akash Sonia8246972023-01-03 10:37:08 +0530877 log.Fields{"Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530878 ig.IgmpGroupLock.Unlock()
879 return
880 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530881 ig.AddReceiver(cntx, device, port, igmpv2.GroupAddress, nil, IgmpVersion2, dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530882 ig.IgmpGroupLock.Unlock()
883 } else {
884 logger.Errorw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": igmpv2.GroupAddress})
885 return
886 }
887 }
mgoudabb017dc2025-10-29 19:53:34 +0530888 case layers.IGMPLeaveGroup:
Naveen Sampath04696f72022-06-13 15:19:14 +0530889 // This is a IGMP leave coming from one of the receivers. We essentially remove the
890 // the receiver.
891 logger.Infow(ctx, "IGMP Leave received: v2", log.Fields{"Addr": igmpv2.GroupAddress, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530892 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
893 if vpv == nil {
894 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
895 return
896 } else if !vpv.IgmpEnabled {
897 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
898 return
899 }
900
901 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
902 mvp.mvpLock.RLock()
903 defer mvp.mvpLock.RUnlock()
904 mvlan := mvp.Mvlan
905 // The subscriber is validated and now process the IGMP report
906 if ig := va.GetIgmpGroup(mvlan, igmpv2.GroupAddress); ig != nil {
907 ig.IgmpGroupLock.Lock()
908 // Delete the receiver once the IgmpGroup is identified
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530909 ig.DelReceiver(cntx, device, port, igmpv2.GroupAddress, nil, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530910 ig.IgmpGroupLock.Unlock()
911 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530912 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +0530913 }
914 }
mgoudabb017dc2025-10-29 19:53:34 +0530915 default:
Naveen Sampath04696f72022-06-13 15:19:14 +0530916 // This must be a query on the NNI port. However, we dont make that assumption.
917 // Need to look for the IGMP group based on the VLAN in the packet as
918 // the MVLAN
919
920 //Check if mvlan profile exist for the incoming pkt vlan
921 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
922 if profile == nil {
923 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
924 return
925 }
926 mvp := profile.(*MvlanProfile)
927 mvp.mvpLock.RLock()
928 defer mvp.mvpLock.RUnlock()
929
930 if net.ParseIP("0.0.0.0").Equal(igmpv2.GroupAddress) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530931 va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion2)
Naveen Sampath04696f72022-06-13 15:19:14 +0530932 } else {
933 if ig := va.GetIgmpGroup(pktVlan, igmpv2.GroupAddress); ig != nil {
934 ig.IgmpGroupLock.Lock()
935 igd, ok := ig.Devices[device]
936 if ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530937 igd.ProcessQuery(cntx, igmpv2.GroupAddress, IgmpVersion2)
Naveen Sampath04696f72022-06-13 15:19:14 +0530938 } else {
939 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv2.GroupAddress})
940 }
941 ig.IgmpGroupLock.Unlock()
942 }
943 }
944 }
945}
946
947// ProcessIgmpv3Pkt : Process IGMPv3 packet
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530948func (va *VoltApplication) ProcessIgmpv3Pkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530949 // First get the layers of interest
950 dot1QLayer := pkt.Layer(layers.LayerTypeDot1Q)
951
952 if dot1QLayer == nil {
953 logger.Error(ctx, "Igmp Packet Received without Vlan - Dropping pkt")
954 return
955 }
956 dot1Q := dot1QLayer.(*layers.Dot1Q)
957 pktVlan := of.VlanType(dot1Q.VLANIdentifier)
958 igmpv3 := pkt.Layer(layers.LayerTypeIGMP).(*layers.IGMP)
959
960 ponPortID := va.GetPonPortID(device, port)
961
962 var vpv *VoltPortVnet
963 logger.Debugw(ctx, "Received IGMPv3 Type", log.Fields{"Type": igmpv3.Type})
964
965 if igmpv3.Type == layers.IGMPMembershipReportV3 {
966 // This is a report coming from the PON. We must be able to first find the
967 // subscriber from the VLAN tag and port and verify if the IGMP proxy is
968 // enabled for the subscriber
969 vpv, _ = va.GetVnetFromPkt(device, port, pkt)
970 if vpv == nil {
971 logger.Errorw(ctx, "Couldn't find VNET associated with port", log.Fields{"Port": port})
972 return
973 } else if !vpv.IgmpEnabled {
974 logger.Errorw(ctx, "IGMP is not activated on the port", log.Fields{"Port": port})
975 return
976 }
977 mvp := va.GetMvlanProfileByName(vpv.MvlanProfileName)
978 if mvp == nil {
979 logger.Errorw(ctx, "Igmp Packet received for Subscriber with Missing Mvlan Profile",
980 log.Fields{"Receiver": vpv.Port, "MvlanProfile": vpv.MvlanProfileName})
981 return
982 }
983 mvp.mvpLock.RLock()
984 defer mvp.mvpLock.RUnlock()
985 mvlan := mvp.Mvlan
986
vinokuma926cb3e2023-03-29 11:41:06 +0530987 for i, group := range igmpv3.GroupRecords {
Naveen Sampath04696f72022-06-13 15:19:14 +0530988 isJoin := isIgmpJoin(group.Type, group.SourceAddresses)
989 // The subscriber is validated and now process the IGMP report
990 ig := va.GetIgmpGroup(mvlan, group.MulticastAddress)
991 if isJoin {
992 if yes := va.IsMaxChannelsCountExceeded(device, port, ponPortID, ig, group.MulticastAddress, mvp); yes {
993 logger.Warnw(ctx, "Dropping IGMP Join v3: Active channel threshold exceeded",
994 log.Fields{"PonPortID": ponPortID, "Addr": group.MulticastAddress, "MvlanProfile": vpv.MvlanProfileName})
995
996 return
997 }
998 if ig != nil {
999 // If the IGMP group is already created. just add the receiver
1000 logger.Infow(ctx, "IGMP Join received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1001 ig.IgmpGroupLock.Lock()
1002 // Check for port state to avoid race condition where PortDown event
1003 // acquired lock before packet processing
1004 vd := GetApplication().GetDevice(device)
1005 vp := vd.GetPort(port)
1006 if vp == nil || vp.State != PortStateUp {
1007 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
Akash Sonia8246972023-01-03 10:37:08 +05301008 log.Fields{"Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +05301009 ig.IgmpGroupLock.Unlock()
1010 return
1011 }
vinokuma926cb3e2023-03-29 11:41:06 +05301012 ig.AddReceiver(cntx, device, port, group.MulticastAddress, &igmpv3.GroupRecords[i], IgmpVersion3,
Naveen Sampath04696f72022-06-13 15:19:14 +05301013 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1014 ig.IgmpGroupLock.Unlock()
1015 } else {
1016 // Create the IGMP group and then add the receiver to the group
1017 logger.Infow(ctx, "IGMP Join received for new group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
vinokuma926cb3e2023-03-29 11:41:06 +05301018 if ig = va.AddIgmpGroup(cntx, vpv.MvlanProfileName, group.MulticastAddress, device); ig != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301019 ig.IgmpGroupLock.Lock()
1020 // Check for port state to avoid race condition where PortDown event
1021 // acquired lock before packet processing
1022 vd := GetApplication().GetDevice(device)
1023 vp := vd.GetPort(port)
1024 if vp == nil || vp.State != PortStateUp {
1025 logger.Warnw(ctx, "Join received from a Port that is DOWN or not present",
Akash Sonia8246972023-01-03 10:37:08 +05301026 log.Fields{"Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +05301027 ig.IgmpGroupLock.Unlock()
1028 return
1029 }
vinokuma926cb3e2023-03-29 11:41:06 +05301030 ig.AddReceiver(cntx, device, port, group.MulticastAddress, &igmpv3.GroupRecords[i], IgmpVersion3,
Naveen Sampath04696f72022-06-13 15:19:14 +05301031 dot1Q.VLANIdentifier, dot1Q.Priority, ponPortID)
1032 ig.IgmpGroupLock.Unlock()
1033 } else {
1034 logger.Warnw(ctx, "IGMP Group Creation Failed", log.Fields{"Addr": group.MulticastAddress})
1035 }
1036 }
1037 } else if ig != nil {
1038 logger.Infow(ctx, "IGMP Leave received for existing group", log.Fields{"Addr": group.MulticastAddress, "Port": port})
1039 ig.IgmpGroupLock.Lock()
vinokuma926cb3e2023-03-29 11:41:06 +05301040 ig.DelReceiver(cntx, device, port, group.MulticastAddress, &igmpv3.GroupRecords[i], ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301041 ig.IgmpGroupLock.Unlock()
1042 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301043 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301044 }
1045 } else {
1046 logger.Warnw(ctx, "IGMP Leave received for unknown group", log.Fields{"Addr": group.MulticastAddress})
1047 }
1048 }
1049 } else {
1050 // This must be a query on the NNI port. However, we dont make that assumption.
1051 // Need to look for the IGMP group based on the VLAN in the packet as
1052 // the MVLAN
1053
1054 //Check if mvlan profile exist for the incoming pkt vlan
1055 profile, _ := va.MvlanProfilesByTag.Load(pktVlan)
1056 if profile == nil {
1057 logger.Errorw(ctx, "Mvlan Profile not found for incoming packet. Dropping Request", log.Fields{"Mvlan": pktVlan})
1058 return
1059 }
1060 mvp := profile.(*MvlanProfile)
1061 mvp.mvpLock.RLock()
1062 defer mvp.mvpLock.RUnlock()
1063
1064 if net.ParseIP("0.0.0.0").Equal(igmpv3.GroupAddress) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301065 va.processIgmpQueries(cntx, device, pktVlan, IgmpVersion3)
Naveen Sampath04696f72022-06-13 15:19:14 +05301066 } else {
1067 if ig := va.GetIgmpGroup(pktVlan, igmpv3.GroupAddress); ig != nil {
1068 ig.IgmpGroupLock.Lock()
1069 igd, ok := ig.Devices[device]
1070 if ok {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301071 igd.ProcessQuery(cntx, igmpv3.GroupAddress, IgmpVersion3)
Naveen Sampath04696f72022-06-13 15:19:14 +05301072 } else {
1073 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device, "Group": igmpv3.GroupAddress})
1074 }
1075 ig.IgmpGroupLock.Unlock()
1076 }
1077 }
1078 }
1079}
1080
1081// processIgmpQueries to process the igmp queries
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301082func (va *VoltApplication) processIgmpQueries(cntx context.Context, device string, pktVlan of.VlanType, version uint8) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301083 // This is a generic query and respond with all the groups channels in currently being viewed.
1084 processquery := func(key interface{}, value interface{}) bool {
1085 ig := value.(*IgmpGroup)
1086 ig.IgmpGroupLock.Lock()
1087 if ig.Mvlan != pktVlan {
1088 ig.IgmpGroupLock.Unlock()
1089 return true
1090 }
1091 igd, ok := ig.Devices[device]
1092 if !ok {
1093 logger.Warnw(ctx, "IGMP Device not found", log.Fields{"Device": device})
1094 ig.IgmpGroupLock.Unlock()
1095 return true
1096 }
1097 processQueryForEachChannel := func(key interface{}, value interface{}) bool {
1098 groupAddr := key.(string)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301099 igd.ProcessQuery(cntx, net.ParseIP(groupAddr), version)
Naveen Sampath04696f72022-06-13 15:19:14 +05301100 return true
1101 }
1102 igd.GroupChannels.Range(processQueryForEachChannel)
1103 ig.IgmpGroupLock.Unlock()
1104 return true
1105 }
1106 va.IgmpGroups.Range(processquery)
1107}
1108
1109// isIgmpJoin to check if it is igmp join
1110func isIgmpJoin(recordType layers.IGMPv3GroupRecordType, sourceAddr []net.IP) bool {
1111 var join = false
1112
1113 if (layers.IGMPToEx == recordType) || (layers.IGMPIsEx == recordType) {
1114 join = true
1115 } else if layers.IGMPBlock == recordType {
1116 if len(sourceAddr) == 0 {
1117 join = true
1118 }
1119 } else if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1120 if len(sourceAddr) != 0 {
1121 join = true
1122 }
1123 }
1124 return join
1125}
1126
1127func isIncl(recordType layers.IGMPv3GroupRecordType) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301128 if (layers.IGMPToIn == recordType) || (layers.IGMPIsIn == recordType) || (layers.IGMPAllow == recordType) {
1129 return true
1130 }
1131 return false
1132}
1133
1134// IgmpProcessPkt to process the IGMP packet received. The packet received brings along with it
1135// the port on which the packet is received and the device the port is in.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301136func (va *VoltApplication) IgmpProcessPkt(cntx context.Context, device string, port string, pkt gopacket.Packet) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301137 igmpl := pkt.Layer(layers.LayerTypeIGMP)
1138 if igmpl == nil {
1139 logger.Error(ctx, "Invalid IGMP packet arrived as IGMP packet")
1140 return
1141 }
1142 if igmp, ok := igmpl.(*layers.IGMPv1or2); ok {
1143 // This is an IGMPv2 packet.
1144 logger.Debugw(ctx, "IGMPv2 Packet Received", log.Fields{"IPAddr": igmp.GroupAddress})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301145 va.ProcessIgmpv2Pkt(cntx, device, port, pkt)
Naveen Sampath04696f72022-06-13 15:19:14 +05301146 return
1147 }
1148 if igmpv3, ok := igmpl.(*layers.IGMP); ok {
1149 logger.Debugw(ctx, "IGMPv3 Packet Received", log.Fields{"NumOfGroups": igmpv3.NumberOfGroupRecords})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301150 va.ProcessIgmpv3Pkt(cntx, device, port, pkt)
Naveen Sampath04696f72022-06-13 15:19:14 +05301151 }
1152}
1153
1154// IgmpPacketInd for igmp packet indication
1155func (va *VoltApplication) IgmpPacketInd(device string, port string, pkt gopacket.Packet) {
1156 pt := NewIgmpPacketTask(device, port, pkt)
1157 va.IgmpTasks.AddTask(pt)
1158}
1159
Naveen Sampath04696f72022-06-13 15:19:14 +05301160// storeMvlansMap to store mvlan map
1161func (va *VoltApplication) storeMvlansMap(mvlan of.VlanType, name string, mvp *MvlanProfile) {
1162 va.MvlanProfilesByTag.Store(mvlan, mvp)
1163 va.MvlanProfilesByName.Store(name, mvp)
1164}
1165
1166// deleteMvlansMap to delete mvlan map
1167func (va *VoltApplication) deleteMvlansMap(mvlan of.VlanType, name string) {
1168 va.MvlanProfilesByTag.Delete(mvlan)
1169 va.MvlanProfilesByName.Delete(name)
1170}
1171
1172// RestoreMvlansFromDb to read from the DB and restore all the MVLANs
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301173func (va *VoltApplication) RestoreMvlansFromDb(cntx context.Context) {
1174 mvlans, _ := db.GetMvlans(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301175 for _, mvlan := range mvlans {
1176 b, ok := mvlan.Value.([]byte)
1177 if !ok {
1178 logger.Warn(ctx, "The value type is not []byte")
1179 continue
1180 }
1181 var mvp MvlanProfile
1182 err := json.Unmarshal(b, &mvp)
1183 if err != nil {
1184 logger.Warn(ctx, "Unmarshal of MVLAN failed")
1185 continue
1186 }
1187 va.storeMvlansMap(mvp.Mvlan, mvp.Name, &mvp)
1188
1189 for srNo := range mvp.DevicesList {
1190 if mvp.IgmpServVersion[srNo] == nil {
1191 servVersion := IgmpVersion0
1192 mvp.IgmpServVersion[srNo] = &servVersion
1193 }
1194 }
1195 logger.Infow(ctx, "Restored Mvlan Profile", log.Fields{"MVPName": mvp.Name})
1196 }
1197}
1198
1199// GetMvlanProfileByTag fetches MVLAN profile based on the MC VLAN
1200func (va *VoltApplication) GetMvlanProfileByTag(vlan of.VlanType) *MvlanProfile {
1201 if mvp, ok := va.MvlanProfilesByTag.Load(vlan); ok {
1202 return mvp.(*MvlanProfile)
1203 }
1204 return nil
1205}
1206
1207// GetMvlanProfileByName fetches MVLAN profile based on the profile name.
1208func (va *VoltApplication) GetMvlanProfileByName(name string) *MvlanProfile {
1209 if mvp, ok := va.MvlanProfilesByName.Load(name); ok {
1210 return mvp.(*MvlanProfile)
1211 }
1212 return nil
1213}
1214
vinokuma926cb3e2023-03-29 11:41:06 +05301215// UpdateMvlanProfile - only channel groups be updated
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301216func (va *VoltApplication) UpdateMvlanProfile(cntx context.Context, name string, vlan of.VlanType, groups map[string][]string, activeChannelCount uint32, proxy map[string]common.MulticastGroupProxy) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301217 mvpIntf, ok := va.MvlanProfilesByName.Load(name)
1218 if !ok {
1219 logger.Error(ctx, "Update Mvlan Failed: Profile does not exist")
1220 return errors.New("MVLAN profile not found")
1221 }
1222 mvp := mvpIntf.(*MvlanProfile)
1223 // check if groups are same then just update the OLTSerial numbers, push the config on new serial numbers
1224
1225 existingGroup := mvp.Groups
1226 existingProxy := mvp.Proxy
1227 mvp.Groups = make(map[string]*MvlanGroup)
1228 mvp.Proxy = make(map[string]*MCGroupProxy)
1229
1230 /* Need to protect groups and proxy write lock */
1231 mvp.mvpLock.Lock()
1232 for grpName, grpIPList := range groups {
1233 mvp.AddMvlanGroup(grpName, grpIPList)
1234 }
1235 for grpName, proxyInfo := range proxy {
1236 mvp.AddMvlanProxy(grpName, proxyInfo)
1237 }
1238 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1239 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1240 mvp.Groups[common.StaticGroup].IsStatic = true
1241 }
1242 }
1243 prevMaxActiveChannels := mvp.MaxActiveChannels
1244 if reflect.DeepEqual(mvp.Groups, existingGroup) && reflect.DeepEqual(mvp.Proxy, existingProxy) {
1245 logger.Info(ctx, "No change in groups config")
1246 if uint32(activeChannelCount) != mvp.MaxActiveChannels {
1247 mvp.MaxActiveChannels = uint32(activeChannelCount)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301248 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301249 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1250 }
1251 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1252 mvp.UpdateActiveChannelSubscriberAlarm()
1253 }
1254 }
1255 mvp.mvpLock.Unlock()
1256 return nil
1257 }
1258 mvp.mvpLock.Unlock()
1259 mvp.MaxActiveChannels = uint32(activeChannelCount)
1260
1261 // Status is maintained so that in the event of any crash or reboot during update,
1262 // the recovery is possible once the pod is UP again
1263 mvp.SetUpdateStatus("", UpdateInProgress)
1264 mvp.oldGroups = existingGroup
1265 mvp.oldProxy = existingProxy
1266 va.storeMvlansMap(vlan, name, mvp)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301267 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301268 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1269 }
1270 if prevMaxActiveChannels != mvp.MaxActiveChannels {
1271 mvp.UpdateActiveChannelSubscriberAlarm()
1272 }
1273
1274 // The update task is added as part of Igm p task list, so that any parallel igmp pkt processing is avoided
1275 // Until, the update operation is completed, the igmp pkt processing will be enqueued
1276 updateTask := NewUpdateMvlanTask(mvp, "")
1277 va.IgmpTasks.AddTask(updateTask)
1278 return nil
1279}
1280
1281// isDeviceInList to check if device is the list
1282func isDeviceInList(serialNum string, OLTSerialNums []string) bool {
1283 for _, oltSerialNum := range OLTSerialNums {
1284 if serialNum == oltSerialNum {
1285 return true
1286 }
1287 }
1288 return false
1289}
1290
1291// McastConfigKey creates the key using the olt serial number and mvlan profile id
1292func McastConfigKey(oltSerialNum string, mvlanProfID string) string {
1293 return oltSerialNum + "_" + mvlanProfID
1294}
1295
1296// GetMcastConfig to get McastConfig Information by OLT and Mvlan Profile ID
1297func (va *VoltApplication) GetMcastConfig(oltSerialNum string, mvlanProfID string) *McastConfig {
1298 if mc, ok := va.McastConfigMap.Load(McastConfigKey(oltSerialNum, mvlanProfID)); ok {
1299 return mc.(*McastConfig)
1300 }
1301 return nil
1302}
1303
1304func (va *VoltApplication) storeMcastConfig(oltSerialNum string, mvlanProfID string, mcastConfig *McastConfig) {
1305 va.McastConfigMap.Store(McastConfigKey(oltSerialNum, mvlanProfID), mcastConfig)
1306}
1307
1308func (va *VoltApplication) deleteMcastConfig(oltSerialNum string, mvlanProfID string) {
1309 va.McastConfigMap.Delete(McastConfigKey(oltSerialNum, mvlanProfID))
1310}
1311
1312// AddMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301313func (va *VoltApplication) AddMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301314 var mcastCfg *McastConfig
1315
1316 mcastCfg = va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1317 if mcastCfg == nil {
1318 mcastCfg = &McastConfig{}
1319 } else {
1320 logger.Debugw(ctx, "Mcast Config already exists", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum,
1321 "MVLAN Profile ID": mcastCfg.MvlanProfileID})
1322 }
1323
1324 // Update all igds available
1325 mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID)
1326 if !ok {
1327 return errors.New("MVLAN profile not found during add mcast config")
1328 }
1329 mvlan := mvpIntf.(*MvlanProfile).Mvlan
1330
1331 mcastCfg.OltSerialNum = OltSerialNum
1332 mcastCfg.MvlanProfileID = MvlanProfileID
1333 mcastCfg.IgmpProfileID = IgmpProfileID
1334 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1335
1336 proxyCfg := va.getIgmpProfileMap(IgmpProfileID)
1337
1338 iterIgmpGroups := func(key interface{}, value interface{}) bool {
1339 ig := value.(*IgmpGroup)
1340 if ig.Mvlan != mvlan {
1341 return true
1342 }
1343
1344 for _, igd := range ig.Devices {
1345 if igd.SerialNo != OltSerialNum {
1346 continue
1347 }
1348 igd.proxyCfg = proxyCfg
1349 if IgmpProfileID == "" {
1350 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1351 } else {
1352 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1353 }
1354 mcastCfg.IgmpGroupDevices.Store(igd.GroupID, igd)
1355 logger.Debugw(ctx, "Igd updated with proxyCfg and proxyIP", log.Fields{"name": igd.GroupName,
1356 "IgmpProfileID": IgmpProfileID, "ProxyIP": mcastCfg.IgmpProxyIP})
1357 }
1358 return true
1359 }
1360 va.IgmpGroups.Range(iterIgmpGroups)
1361
1362 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301363 if err := mcastCfg.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301364 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1365 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301366 va.addOltToMvlan(cntx, MvlanProfileID, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301367
1368 return nil
1369}
1370
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301371func (va *VoltApplication) addOltToMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301372 var mvp *MvlanProfile
1373 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1374 servVersion := IgmpVersion0
1375 mvp = mvpIntf.(*MvlanProfile)
1376 mvp.DevicesList[OltSerialNum] = NoOp
1377 mvp.IgmpServVersion[OltSerialNum] = &servVersion
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301378 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301379 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1380 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301381 mvp.pushIgmpMcastFlows(cntx, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301382 }
1383}
1384
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301385func (va *VoltApplication) delOltFromMvlan(cntx context.Context, MvlanProfileID string, OltSerialNum string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301386 var mvp *MvlanProfile
1387 if mvpIntf, ok := va.MvlanProfilesByName.Load(MvlanProfileID); ok {
1388 mvp = mvpIntf.(*MvlanProfile)
vinokuma926cb3e2023-03-29 11:41:06 +05301389 // Delete from mvp list
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301390 mvp.removeIgmpMcastFlows(cntx, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301391 delete(mvp.DevicesList, OltSerialNum)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301392 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301393 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1394 }
1395 }
1396}
1397
1398// DelMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301399func (va *VoltApplication) DelMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301400 va.delOltFromMvlan(cntx, MvlanProfileID, OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301401 va.deleteMcastConfig(OltSerialNum, MvlanProfileID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301402 _ = db.DelMcastConfig(cntx, McastConfigKey(OltSerialNum, MvlanProfileID))
Tinoj Joseph50d722c2022-12-06 22:53:22 +05301403 if d, _ := va.GetDeviceBySerialNo(OltSerialNum); d != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301404 if mvp := va.GetMvlanProfileByName(MvlanProfileID); mvp != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301405 va.RemoveGroupsFromPendingPool(cntx, d.Name, mvp.Mvlan)
Naveen Sampath04696f72022-06-13 15:19:14 +05301406 }
1407 }
1408}
1409
1410// DelAllMcastConfig for deletion of all mcast config
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301411func (va *VoltApplication) DelAllMcastConfig(cntx context.Context, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301412 deleteIndividualMcastConfig := func(key interface{}, value interface{}) bool {
1413 mcastCfg := value.(*McastConfig)
1414 if mcastCfg.OltSerialNum == OltSerialNum {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301415 va.DelMcastConfig(cntx, mcastCfg.MvlanProfileID, mcastCfg.IgmpProfileID, mcastCfg.IgmpProxyIP.String(), mcastCfg.OltSerialNum)
Naveen Sampath04696f72022-06-13 15:19:14 +05301416 }
1417 return true
1418 }
1419 va.McastConfigMap.Range(deleteIndividualMcastConfig)
1420 return nil
1421}
1422
1423// UpdateMcastConfig for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301424func (va *VoltApplication) UpdateMcastConfig(cntx context.Context, MvlanProfileID string, IgmpProfileID string, IgmpProxyIP string, OltSerialNum string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301425 mcastCfg := va.GetMcastConfig(OltSerialNum, MvlanProfileID)
1426 if mcastCfg == nil {
1427 logger.Warnw(ctx, "Mcast Config not found. Unable to update", log.Fields{"Mvlan Profile ID": MvlanProfileID, "OltSerialNum": OltSerialNum})
1428 return nil
1429 }
1430
1431 oldProfID := mcastCfg.IgmpProfileID
1432 mcastCfg.IgmpProfileID = IgmpProfileID
1433 mcastCfg.IgmpProxyIP = net.ParseIP(IgmpProxyIP)
1434
1435 va.storeMcastConfig(OltSerialNum, MvlanProfileID, mcastCfg)
1436
1437 // Update all igds
1438 if oldProfID != mcastCfg.IgmpProfileID {
1439 updateIgdProxyCfg := func(key interface{}, value interface{}) bool {
1440 igd := value.(*IgmpGroupDevice)
1441 igd.proxyCfg = va.getIgmpProfileMap(mcastCfg.IgmpProfileID)
1442 if IgmpProfileID == "" {
1443 igd.IgmpProxyIP = &igd.proxyCfg.IgmpSourceIP
1444 } else {
1445 igd.IgmpProxyIP = &mcastCfg.IgmpProxyIP
1446 }
1447 return true
1448 }
1449 mcastCfg.IgmpGroupDevices.Range(updateIgdProxyCfg)
1450 }
1451
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301452 if err := mcastCfg.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301453 logger.Errorw(ctx, "McastConfig Write to DB failed", log.Fields{"OltSerialNum": mcastCfg.OltSerialNum, "MvlanProfileID": mcastCfg.MvlanProfileID})
1454 }
1455
1456 return nil
1457}
1458
1459// WriteToDb is utility to write Mcast config Info to database
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301460func (mc *McastConfig) WriteToDb(cntx context.Context) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301461 mc.Version = database.PresentVersionMap[database.McastConfigPath]
1462 b, err := json.Marshal(mc)
1463 if err != nil {
1464 return err
1465 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301466 if err1 := db.PutMcastConfig(cntx, McastConfigKey(mc.OltSerialNum, mc.MvlanProfileID), string(b)); err1 != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301467 return err1
1468 }
1469 return nil
1470}
1471
1472// RestoreMcastConfigsFromDb to read from the DB and restore Mcast configs
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301473func (va *VoltApplication) RestoreMcastConfigsFromDb(cntx context.Context) {
1474 mcastConfigs, _ := db.GetMcastConfigs(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301475 for hash, mcastConfig := range mcastConfigs {
1476 b, ok := mcastConfig.Value.([]byte)
1477 if !ok {
1478 logger.Warn(ctx, "The value type is not []byte")
1479 continue
1480 }
1481 var mc McastConfig
1482 err := json.Unmarshal(b, &mc)
1483 if err != nil {
1484 logger.Warn(ctx, "Unmarshal of Mcast config failed")
1485 continue
1486 }
1487 va.storeMcastConfig(mc.OltSerialNum, mc.MvlanProfileID, &mc)
1488 logger.Infow(ctx, "Restored Mcast config", log.Fields{"OltSerialNum": mc.OltSerialNum, "MvlanProfileID": mc.MvlanProfileID, "hash": hash})
1489 }
1490}
1491
1492// AddMvlanProfile for addition of a MVLAN profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301493func (va *VoltApplication) AddMvlanProfile(cntx context.Context, name string, mvlan of.VlanType, ponVlan of.VlanType,
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301494 groups map[string][]string, isChannelBasedGroup bool, OLTSerialNum []string, activeChannelsPerPon uint32, proxy map[string]common.MulticastGroupProxy) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301495 var mvp *MvlanProfile
1496
1497 if mvp = va.GetMvlanProfileByTag(mvlan); mvp != nil {
1498 logger.Errorw(ctx, "Duplicate MVLAN ID configured", log.Fields{"mvlan": mvlan})
1499 return errors.New("MVLAN profile with same VLANID exists")
1500 }
1501 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1502 mvp = mvpIntf.(*MvlanProfile)
1503 for _, serialNum := range OLTSerialNum {
1504 if mvp.DevicesList[serialNum] != Nil {
1505 //This is backup restore scenario, just update the profile
1506 logger.Info(ctx, "Add Mvlan : Profile Name already exists, update-the-profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301507 return va.UpdateMvlanProfile(cntx, name, mvlan, groups, activeChannelsPerPon, proxy)
Naveen Sampath04696f72022-06-13 15:19:14 +05301508 }
1509 }
1510 }
1511
1512 if mvp == nil {
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301513 mvp = NewMvlanProfile(name, mvlan, ponVlan, isChannelBasedGroup, OLTSerialNum, activeChannelsPerPon)
Naveen Sampath04696f72022-06-13 15:19:14 +05301514 }
1515
1516 va.storeMvlansMap(mvlan, name, mvp)
1517
1518 /* Need to protect groups and proxy write lock */
1519 mvp.mvpLock.Lock()
1520 for grpName, grpInfo := range groups {
1521 mvp.AddMvlanGroup(grpName, grpInfo)
1522 }
1523 for grpName, proxyInfo := range proxy {
1524 mvp.AddMvlanProxy(grpName, proxyInfo)
1525 }
1526 if _, ok := mvp.Groups[common.StaticGroup]; ok {
1527 if _, yes := mvp.Proxy[common.StaticGroup]; !yes {
1528 mvp.Groups[common.StaticGroup].IsStatic = true
1529 }
1530 }
1531
1532 logger.Debugw(ctx, "Added MVLAN Profile", log.Fields{"MVLAN": mvp.Mvlan, "PonVlan": mvp.PonVlan, "Name": mvp.Name, "Grp IPs": mvp.Groups, "IsPonVlanPresent": mvp.IsPonVlanPresent})
1533 mvp.mvpLock.Unlock()
1534
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301535 if err := mvp.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301536 logger.Errorw(ctx, "Mvlan profile Write to DB failed", log.Fields{"ProfileName": mvp.Name})
1537 }
1538
1539 return nil
1540}
1541
Naveen Sampath04696f72022-06-13 15:19:14 +05301542// GetMvlanProfileForMcIP - Get an MVLAN profile for a given MC IP. This is used when an
1543// IGMP report is received from the PON port. The MVLAN profile
1544// located is used to idnetify the MC VLAN used in upstream for
1545// join/leave
1546func (va *VoltApplication) GetMvlanProfileForMcIP(profileName string, ip net.IP) (*MvlanProfile, string) {
1547 if mvpIntf, ok := va.MvlanProfilesByName.Load(profileName); ok {
1548 mvp := mvpIntf.(*MvlanProfile)
1549 if grpName := mvp.GetMvlanGroup(ip); grpName != "" {
1550 return mvp, grpName
1551 }
1552 } else {
1553 logger.Warnw(ctx, "Mvlan Profile not found for given profile name", log.Fields{"Profile": profileName})
1554 }
1555 return nil, ""
1556}
1557
Naveen Sampath04696f72022-06-13 15:19:14 +05301558// IgmpTick for igmp tick info
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301559func (va *VoltApplication) IgmpTick(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301560 tickCount++
1561 if (tickCount % 1000) == 0 {
1562 logger.Debugw(ctx, "Time @ Tick", log.Fields{"Tick": tickCount, "Time": time.Now()})
1563 }
1564 igmptick := func(key interface{}, value interface{}) bool {
1565 ig := value.(*IgmpGroup)
1566 if ig.NumDevicesActive() != 0 {
1567 if tickCount%10 == ig.Hash()%10 {
1568 ig.IgmpGroupLock.Lock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301569 ig.Tick(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301570 ig.IgmpGroupLock.Unlock()
1571 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301572 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301573 }
1574 }
1575 }
1576 return true
1577 }
1578 va.IgmpGroups.Range(igmptick)
1579}
1580
1581// Tick to add Tick Task
1582func (va *VoltApplication) Tick() {
1583 tt := NewTickTask()
1584 va.IgmpTasks.AddTask(tt)
1585 // va.IgmpTick()
1586}
1587
vinokuma926cb3e2023-03-29 11:41:06 +05301588// AddIgmpProfile for addition of IGMP Profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301589func (va *VoltApplication) AddIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301590 var igmpProfile *IgmpProfile
1591
1592 if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1593 logger.Info(ctx, "Updating default IGMP profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301594 return va.UpdateIgmpProfile(cntx, igmpProfileConfig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301595 }
1596
1597 igmpProfile = va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1598 if igmpProfile == nil {
1599 igmpProfile = newIgmpProfile(igmpProfileConfig)
1600 } else {
1601 logger.Errorw(ctx, "IGMP profile already exists", log.Fields{"IgmpProfile": igmpProfileConfig.ProfileID})
1602 return errors.New("IGMP Profile already exists")
1603 }
1604
1605 va.storeIgmpProfileMap(igmpProfileConfig.ProfileID, igmpProfile)
1606
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301607 if err := igmpProfile.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301608 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1609 }
1610
1611 return nil
1612}
1613
Naveen Sampath04696f72022-06-13 15:19:14 +05301614// checkIgmpProfileMap to get Igmp Profile. If not found return nil
1615func (va *VoltApplication) checkIgmpProfileMap(name string) *IgmpProfile {
1616 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1617 return igmpProfileIntf.(*IgmpProfile)
1618 }
1619 return nil
1620}
1621
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301622func (va *VoltApplication) resetIgmpProfileToDefault(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301623 igmpProf := va.getIgmpProfileMap(DefaultIgmpProfID)
1624 defIgmpProf := newDefaultIgmpProfile()
1625
1626 igmpProf.UnsolicitedTimeOut = defIgmpProf.UnsolicitedTimeOut
1627 igmpProf.MaxResp = defIgmpProf.MaxResp
1628 igmpProf.KeepAliveInterval = defIgmpProf.KeepAliveInterval
1629 igmpProf.KeepAliveCount = defIgmpProf.KeepAliveCount
1630 igmpProf.LastQueryInterval = defIgmpProf.LastQueryInterval
1631 igmpProf.LastQueryCount = defIgmpProf.LastQueryCount
1632 igmpProf.FastLeave = defIgmpProf.FastLeave
1633 igmpProf.PeriodicQuery = defIgmpProf.PeriodicQuery
1634 igmpProf.IgmpCos = defIgmpProf.IgmpCos
1635 igmpProf.WithRAUpLink = defIgmpProf.WithRAUpLink
1636 igmpProf.WithRADownLink = defIgmpProf.WithRADownLink
1637 igmpProf.IgmpVerToServer = defIgmpProf.IgmpVerToServer
1638 igmpProf.IgmpSourceIP = defIgmpProf.IgmpSourceIP
1639
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301640 if err := igmpProf.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301641 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProf.ProfileID})
1642 }
1643}
1644
1645// getIgmpProfileMap to get Igmp Profile. If not found return default IGMP config
1646func (va *VoltApplication) getIgmpProfileMap(name string) *IgmpProfile {
1647 if igmpProfileIntf, ok := va.IgmpProfilesByName.Load(name); ok {
1648 return igmpProfileIntf.(*IgmpProfile)
1649 }
1650
1651 // There will be always a default igmp profile.
1652 defaultIgmpProfileIntf, _ := va.IgmpProfilesByName.Load(DefaultIgmpProfID)
1653 return defaultIgmpProfileIntf.(*IgmpProfile)
1654}
1655
1656// storeIgmpProfileMap to store Igmp Profile
1657func (va *VoltApplication) storeIgmpProfileMap(name string, igmpProfile *IgmpProfile) {
1658 va.IgmpProfilesByName.Store(name, igmpProfile)
1659}
1660
1661// deleteIgmpProfileMap to delete Igmp Profile
1662func (va *VoltApplication) deleteIgmpProfileMap(name string) {
1663 va.IgmpProfilesByName.Delete(name)
1664}
1665
Akash Sonia8246972023-01-03 10:37:08 +05301666// TODO - DelIgmpProfile for deleting IGMP Profile based on profile Id
1667// func (va *VoltApplication) DelIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
1668// // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
1669// if igmpProfileConfig.ProfileID == DefaultIgmpProfID {
1670// logger.Info(ctx, "Resetting default IGMP profile")
1671// va.resetIgmpProfileToDefault(cntx)
1672// return nil
1673// }
1674// igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1675// if igmpProfile == nil {
1676// logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1677// return nil
1678// }
1679
1680// va.deleteIgmpProfileMap(igmpProfileConfig.ProfileID)
1681
1682// _ = db.DelIgmpProfile(cntx, igmpProfileConfig.ProfileID)
1683
1684// return nil
1685// }
1686
1687// DelIgmpProfile for deleting IGMP Profile based on profile Id
1688func (va *VoltApplication) DelIgmpProfile(cntx context.Context, profileID string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301689 // Deletion of default igmp profile is blocked from submgr. Keeping additional check for safety.
Akash Sonia8246972023-01-03 10:37:08 +05301690 if profileID == DefaultIgmpProfID {
Naveen Sampath04696f72022-06-13 15:19:14 +05301691 logger.Info(ctx, "Resetting default IGMP profile")
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301692 va.resetIgmpProfileToDefault(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301693 return nil
1694 }
Akash Sonia8246972023-01-03 10:37:08 +05301695 igmpProfile := va.checkIgmpProfileMap(profileID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301696 if igmpProfile == nil {
Akash Sonia8246972023-01-03 10:37:08 +05301697 logger.Warnw(ctx, "Igmp Profile not found. Unable to delete", log.Fields{"Profile ID": profileID})
Naveen Sampath04696f72022-06-13 15:19:14 +05301698 return nil
1699 }
1700
Akash Sonia8246972023-01-03 10:37:08 +05301701 va.deleteIgmpProfileMap(profileID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301702
Akash Sonia8246972023-01-03 10:37:08 +05301703 err := db.DelIgmpProfile(cntx, profileID)
1704 if err != nil {
1705 logger.Errorw(ctx, "Failed to delete Igmp profile from DB", log.Fields{"Error": err})
1706 return err
1707 }
Naveen Sampath04696f72022-06-13 15:19:14 +05301708
1709 return nil
1710}
1711
vinokuma926cb3e2023-03-29 11:41:06 +05301712// UpdateIgmpProfile for addition of IGMP Profile
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301713func (va *VoltApplication) UpdateIgmpProfile(cntx context.Context, igmpProfileConfig *common.IGMPConfig) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301714 igmpProfile := va.checkIgmpProfileMap(igmpProfileConfig.ProfileID)
1715 if igmpProfile == nil {
1716 logger.Errorw(ctx, "Igmp Profile not found. Unable to update", log.Fields{"Profile ID": igmpProfileConfig.ProfileID})
1717 return errors.New("IGMP Profile not found")
1718 }
1719
1720 igmpProfile.ProfileID = igmpProfileConfig.ProfileID
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301721 igmpProfile.UnsolicitedTimeOut = igmpProfileConfig.UnsolicitedTimeOut
1722 igmpProfile.MaxResp = igmpProfileConfig.MaxResp
Naveen Sampath04696f72022-06-13 15:19:14 +05301723
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301724 keepAliveInterval := igmpProfileConfig.KeepAliveInterval
Naveen Sampath04696f72022-06-13 15:19:14 +05301725
vinokuma926cb3e2023-03-29 11:41:06 +05301726 // KeepAliveInterval should have a min of 10 seconds
Naveen Sampath04696f72022-06-13 15:19:14 +05301727 if keepAliveInterval < MinKeepAliveInterval {
1728 keepAliveInterval = MinKeepAliveInterval
1729 logger.Infow(ctx, "Auto adjust keepAliveInterval - Value < 10", log.Fields{"Received": igmpProfileConfig.KeepAliveInterval, "Configured": keepAliveInterval})
1730 }
1731 igmpProfile.KeepAliveInterval = keepAliveInterval
1732
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301733 igmpProfile.KeepAliveCount = igmpProfileConfig.KeepAliveCount
1734 igmpProfile.LastQueryInterval = igmpProfileConfig.LastQueryInterval
1735 igmpProfile.LastQueryCount = igmpProfileConfig.LastQueryCount
Naveen Sampath04696f72022-06-13 15:19:14 +05301736 igmpProfile.FastLeave = *igmpProfileConfig.FastLeave
1737 igmpProfile.PeriodicQuery = *igmpProfileConfig.PeriodicQuery
Akash Reddy Kankanala105581b2024-09-11 05:20:38 +05301738 igmpProfile.IgmpCos = igmpProfileConfig.IgmpCos
Naveen Sampath04696f72022-06-13 15:19:14 +05301739 igmpProfile.WithRAUpLink = *igmpProfileConfig.WithRAUpLink
1740 igmpProfile.WithRADownLink = *igmpProfileConfig.WithRADownLink
1741
1742 if igmpProfileConfig.IgmpVerToServer == "2" || igmpProfileConfig.IgmpVerToServer == "v2" {
1743 igmpProfile.IgmpVerToServer = "2"
1744 } else {
1745 igmpProfile.IgmpVerToServer = "3"
1746 }
1747
1748 if igmpProfileConfig.IgmpSourceIP != "" {
1749 igmpProfile.IgmpSourceIP = net.ParseIP(igmpProfileConfig.IgmpSourceIP)
1750 }
1751
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301752 if err := igmpProfile.WriteToDb(cntx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301753 logger.Errorw(ctx, "Igmp profile Write to DB failed", log.Fields{"profileID": igmpProfile.ProfileID})
1754 }
1755
1756 return nil
1757}
1758
1759// RestoreIGMPProfilesFromDb to read from the DB and restore IGMP Profiles
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301760func (va *VoltApplication) RestoreIGMPProfilesFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301761 // Loading IGMP profiles
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301762 igmpProfiles, _ := db.GetIgmpProfiles(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301763 for _, igmpProfile := range igmpProfiles {
1764 b, ok := igmpProfile.Value.([]byte)
1765 if !ok {
1766 logger.Warn(ctx, "The value type is not []byte")
1767 continue
1768 }
1769 var igmpProf IgmpProfile
1770 err := json.Unmarshal(b, &igmpProf)
1771 if err != nil {
1772 logger.Warn(ctx, "Unmarshal of IGMP Profile failed")
1773 continue
1774 }
1775 va.storeIgmpProfileMap(igmpProf.ProfileID, &igmpProf)
1776 logger.Infow(ctx, "Restored Igmp Profile", log.Fields{"Conf": igmpProf})
1777 }
1778}
1779
1780// InitIgmpSrcMac for initialization of igmp source mac
1781func (va *VoltApplication) InitIgmpSrcMac() {
1782 srcMac, err := getPodMacAddr()
1783 if err != nil {
1784 igmpSrcMac = "00:11:11:11:11:11"
1785 return
1786 }
1787 igmpSrcMac = srcMac
1788}
1789
Naveen Sampath04696f72022-06-13 15:19:14 +05301790// DelMvlanProfile for deletion of a MVLAN group
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301791func (va *VoltApplication) DelMvlanProfile(cntx context.Context, name string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +05301792 if mvpIntf, ok := va.MvlanProfilesByName.Load(name); ok {
1793 mvp := mvpIntf.(*MvlanProfile)
1794
1795 if len(mvp.DevicesList) == 0 {
1796 mvp.DeleteInProgress = true
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301797 mvp.DelFromDb(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301798 va.deleteMvlansMap(mvp.Mvlan, name)
1799 logger.Debugw(ctx, "Deleted MVLAN Profile", log.Fields{"Name": mvp.Name})
1800 } else {
1801 logger.Errorw(ctx, "Unable to delete Mvlan Profile as there is still an OLT attached to it", log.Fields{"Name": mvp.Name,
1802 "Device List": mvp.DevicesList})
1803 return errors.New("MVLAN attached to devices")
1804 }
1805
1806 return nil
1807 }
1808 logger.Errorw(ctx, "MVLAN Profile not found", log.Fields{"MvlanProfile Name": name})
1809 return nil
1810}
1811
1812// ReceiverUpInd for receiver up indication
1813func (va *VoltApplication) ReceiverUpInd(device string, port string, mvpName string, vlan of.VlanType, pbits []of.PbitType) {
1814 logger.Infow(ctx, "Receiver Indication: UP", log.Fields{"device": device, "port": port, "MVP": mvpName, "vlan": vlan, "pbits": pbits})
1815 if mvpIntf, ok := va.MvlanProfilesByName.Load(mvpName); ok {
1816 mvp := mvpIntf.(*MvlanProfile)
1817 if devIntf, ok := va.DevicesDisc.Load(device); ok {
1818 dev := devIntf.(*VoltDevice)
1819 proxyCfg, proxyIP, _ := getIgmpProxyCfgAndIP(mvp.Mvlan, dev.SerialNum)
1820 for _, pbit := range pbits {
1821 sendGeneralQuery(device, port, vlan, uint8(pbit), proxyCfg, proxyIP)
1822 }
1823 } else {
1824 logger.Warnw(ctx, "Device not found for given port", log.Fields{"device": device, "port": port})
1825 }
1826 } else {
1827 logger.Warnw(ctx, "Mvlan Profile not found for given profileName", log.Fields{"MVP": mvpName, "vlan": vlan})
1828 }
1829}
1830
1831// sendGeneralQuery to send general query
1832func sendGeneralQuery(device string, port string, cVlan of.VlanType, pbit uint8, proxyCfg *IgmpProfile, proxyIP *net.IP) {
Tinoj Josephcf161be2022-07-07 19:47:47 +05301833 if queryPkt, err := Igmpv2QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301834 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1835 logger.Warnw(ctx, "General Igmpv2 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1836 } else {
1837 logger.Debugw(ctx, "General Igmpv2 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1838 }
1839 }
1840 if getVersion(proxyCfg.IgmpVerToServer) == IgmpVersion3 {
Tinoj Josephcf161be2022-07-07 19:47:47 +05301841 if queryPkt, err := Igmpv3QueryPacket(AllSystemsMulticastGroupIP, cVlan, *proxyIP, pbit, proxyCfg.MaxResp); err == nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301842 if err := cntlr.GetController().PacketOutReq(device, port, port, queryPkt, false); err != nil {
1843 logger.Warnw(ctx, "General Igmpv3 Query Failed to send", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1844 } else {
1845 logger.Debugw(ctx, "General Igmpv3 Query Sent", log.Fields{"Device": device, "Port": port, "Packet": queryPkt, "Pbit": pbit})
1846 }
1847 }
1848 }
1849}
1850
1851// ReceiverDownInd to send receiver down indication
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301852func (va *VoltApplication) ReceiverDownInd(cntx context.Context, device string, port string) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301853 logger.Infow(ctx, " Receiver Indication: DOWN", log.Fields{"device": device, "port": port})
1854
1855 ponPortID := va.GetPonPortID(device, port)
1856
1857 del := func(key interface{}, value interface{}) bool {
1858 ig := value.(*IgmpGroup)
1859 ig.IgmpGroupLock.Lock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301860 ig.DelReceiveronDownInd(cntx, device, port, ponPortID)
Naveen Sampath04696f72022-06-13 15:19:14 +05301861 ig.IgmpGroupLock.Unlock()
1862 if ig.NumDevicesActive() == 0 {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301863 va.DelIgmpGroup(cntx, ig)
Naveen Sampath04696f72022-06-13 15:19:14 +05301864 }
1865 return true
1866 }
1867 va.IgmpGroups.Range(del)
1868}