blob: 54415ba3fdc913b116dc5b546d33fd8fccd9c929 [file] [log] [blame]
Shad Ansarie969afc2019-04-05 15:16:41 -07001#
2# Copyright 2019 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import threading
18from google.protobuf.json_format import Parse
19from simplejson import loads
20from twisted.internet import reactor
21import structlog
Shad Ansari995ca632019-04-08 19:43:46 -070022from scapy.layers.l2 import Ether, Packet
23from common.frameio.frameio import hexify
Shad Ansarie969afc2019-04-05 15:16:41 -070024
25from voltha.adapters.openolt.protos import openolt_pb2
26from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
27
28
29class OpenoltIndications(object):
30 def __init__(self, device):
31 self.log = structlog.get_logger()
32 self.device = device
33 self.indications_thread_handle = threading.Thread(
34 target=self.indications_thread)
35 self.indications_thread_handle.setDaemon(True)
36
37 def start(self):
38 self.indications_thread_handle.start()
39
40 def stop(self):
41 pass
42
43 def indications_thread(self):
44 self.log.debug('openolt indications thread starting')
Shad Ansari929e6a02019-04-06 23:41:42 -070045 KConsumer(self.indications_process, "openolt.ind")
Shad Ansarie969afc2019-04-05 15:16:41 -070046
47 def indications_process(self, topic, msg):
Shad Ansarie969afc2019-04-05 15:16:41 -070048
Shad Ansari929e6a02019-04-06 23:41:42 -070049 ind = Parse(loads(msg), openolt_pb2.Indication(),
50 ignore_unknown_fields=True)
51
52 self.log.debug("received openolt indication", ind=ind)
53
54 # indication handlers run in the main event loop
55 if ind.HasField('olt_ind'):
56 reactor.callFromThread(self.device.olt_indication, ind.olt_ind)
57 elif ind.HasField('intf_ind'):
58 reactor.callFromThread(self.device.intf_indication, ind.intf_ind)
59 elif ind.HasField('intf_oper_ind'):
60 reactor.callFromThread(self.device.intf_oper_indication,
61 ind.intf_oper_ind)
62 elif ind.HasField('onu_disc_ind'):
63 reactor.callFromThread(self.device.onu_discovery_indication,
64 ind.onu_disc_ind)
65 elif ind.HasField('onu_ind'):
66 reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
67 elif ind.HasField('omci_ind'):
68 reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
69 elif ind.HasField('pkt_ind'):
Shad Ansari995ca632019-04-08 19:43:46 -070070 self.send_packet_in(ind.pkt_ind)
Shad Ansari929e6a02019-04-06 23:41:42 -070071 elif ind.HasField('port_stats'):
Shad Ansarie969afc2019-04-05 15:16:41 -070072 reactor.callFromThread(
Shad Ansari929e6a02019-04-06 23:41:42 -070073 self.device.stats_mgr.port_statistics_indication,
74 ind.port_stats)
75 elif ind.HasField('flow_stats'):
76 reactor.callFromThread(
77 self.device.stats_mgr.flow_statistics_indication,
78 ind.flow_stats)
79 elif ind.HasField('alarm_ind'):
80 reactor.callFromThread(
81 self.device.alarm_mgr.process_alarms, ind.alarm_ind)
82 else:
83 self.log.warn('unknown indication type')
Shad Ansari995ca632019-04-08 19:43:46 -070084
85 def send_packet_in(self, pkt_indication):
86 self.log.debug("packet indication",
87 intf_type=pkt_indication.intf_type,
88 intf_id=pkt_indication.intf_id,
89 port_no=pkt_indication.port_no,
90 cookie=pkt_indication.cookie,
91 gemport_id=pkt_indication.gemport_id,
92 flow_id=pkt_indication.flow_id)
93 try:
94 logical_port_num = self.device.data_model.logical_port_num(
95 pkt_indication.intf_type,
96 pkt_indication.intf_id,
97 pkt_indication.port_no,
98 pkt_indication.gemport_id)
99 except ValueError:
100 self.log.error('No logical port found',
101 intf_type=pkt_indication.intf_type,
102 intf_id=pkt_indication.intf_id,
103 port_no=pkt_indication.port_no,
104 gemport_id=pkt_indication.gemport_id)
105 return
106
107 ether_pkt = Ether(pkt_indication.pkt)
108
109 if isinstance(ether_pkt, Packet):
110 ether_pkt = str(ether_pkt)
111
112 logical_device_id = self.device.data_model.logical_device_id
113 topic = 'packet-in:' + logical_device_id
114
115 self.log.debug('send-packet-in', logical_device_id=logical_device_id,
116 logical_port_num=logical_port_num,
117 packet=hexify(ether_pkt))
118
119 self.device.data_model.adapter_agent.event_bus.publish(
120 topic, (logical_port_num, str(ether_pkt)))