Packet in/out streaming from ofagent to core
Getting ready for packet streaming
Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461
Support flow_delete_strict
Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b
Packet out halfway plumbed
Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6
Refactored async_twisted
Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d
Packet in pipeline and ofagent refactoring
Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 9a17121..13fb6de 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,60 +28,53 @@
from agent import Agent
+log = get_logger()
+
+
class ConnectionManager(object):
- log = get_logger()
-
def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
- voltha_retry_interval=0.5, devices_refresh_interval=60):
+ voltha_retry_interval=0.5, devices_refresh_interval=5):
- self.log.info('Initializing connection manager')
+ log.info('init-connection-manager')
self.controller_endpoint = controller_endpoint
self.consul_endpoint = consul_endpoint
self.voltha_endpoint = voltha_endpoint
self.channel = None
- self.connected_devices = None
- self.unprocessed_devices = None
- self.agent_map = {}
- self.grpc_client = None
- self.device_id_map = None
+ self.grpc_client = None # single, shared gRPC client to Voltha
+
+ self.agent_map = {} # datapath_id -> Agent()
+ self.device_id_to_datapath_id_map = {}
self.voltha_retry_interval = voltha_retry_interval
self.devices_refresh_interval = devices_refresh_interval
self.running = False
- @inlineCallbacks
def run(self):
+
if self.running:
return
- self.log.info('Running connection manager')
+ log.info('run-connection-manager')
self.running = True
# Get voltha grpc endpoint
self.channel = self.get_grpc_channel_with_voltha()
- # Connect to voltha using grpc and fetch the list of logical devices
- yield self.get_list_of_logical_devices_from_voltha()
-
# Create shared gRPC API object
- self.grpc_client = GrpcClient(self.channel, self.device_id_map)
+ self.grpc_client = GrpcClient(self, self.channel)
- # Instantiate an OpenFlow agent for each logical device
- self.refresh_openflow_agent_connections()
+ # Start monitoring logical devices and manage agents accordingly
+ reactor.callLater(0, self.monitor_logical_devices)
- reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
- reactor.callLater(0, self.monitor_connections)
-
- returnValue(self)
-
+ return self
def shutdown(self):
# clean up all controller connections
- for key, value in enumerate(self.agent_map):
+ for value in self.agent_map.values():
value.stop()
self.running = False
# TODO: close grpc connection to voltha
@@ -92,11 +85,11 @@
try:
ip_port_endpoint = get_endpoint_from_consul(
self.consul_endpoint, endpoint[1:])
- self.log.info(
+ log.info(
'Found endpoint {} service at {}'.format(endpoint,
ip_port_endpoint))
except Exception as e:
- self.log.error('Failure to locate {} service from '
+ log.error('Failure to locate {} service from '
'consul {}:'.format(endpoint, repr(e)))
return
if ip_port_endpoint:
@@ -104,128 +97,108 @@
return host, int(port)
def get_grpc_channel_with_voltha(self):
- self.log.info('Resolving voltha endpoint {} from consul'.format(
+ log.info('Resolving voltha endpoint {} from consul'.format(
self.voltha_endpoint))
host, port = self.resolve_endpoint(self.voltha_endpoint)
assert host is not None
assert port is not None
# Create grpc channel to Voltha
channel = grpc.insecure_channel('{}:{}'.format(host, port))
- self.log.info('Acquired a grpc channel to voltha')
+ log.info('Acquired a grpc channel to voltha')
return channel
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
+
while True:
- self.log.info('Retrieve devices from voltha')
+ log.info('Retrieve devices from voltha')
try:
stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
devices = stub.ListLogicalDevices(
voltha_pb2.NullMessage()).items
for device in devices:
- self.log.info("Devices {} -> {}".format(device.id,
+ log.info("Devices {} -> {}".format(device.id,
device.datapath_id))
- self.unprocessed_devices = devices
- self.device_id_map = dict(
- (device.datapath_id, device.id) for device in devices)
- return
+ returnValue(devices)
+
except Exception as e:
- self.log.error('Failure to retrieve devices from '
+ log.error('Failure to retrieve devices from '
'voltha: {}'.format(repr(e)))
- self.log.info('reconnect', after_delay=self.voltha_retry_interval)
+ log.info('reconnect', after_delay=self.voltha_retry_interval)
yield asleep(self.voltha_retry_interval)
- def refresh_openflow_agent_connections(self):
- # Compare the new device list again the previous
- # For any new device, an agent connection will be created. For
- # existing device that are no longer part of the list then that
- # agent connection will be stopped
+ def refresh_agent_connections(self, devices):
+ """
+ Based on the new device list, update the following state in the class:
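+ * agent_map (datapath_id -> Agent)
+ * device_id_to_datapath_id_map (device id -> datapath_id)
+ Both are updated together as agents are created and deleted.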
+ :param devices: full device list freshly received from Voltha
+ :return: None
+ """
- # If the ofagent has no previous devices then just add them
- if self.connected_devices is None:
- datapath_ids_to_add = [device.datapath_id for device in self.unprocessed_devices]
- else:
- previous_datapath_ids = [device.datapath_id for device in self.connected_devices]
- current_datapath_ids = [device.datapath_id for device in self.unprocessed_devices]
- datapath_ids_to_add = [d for d in current_datapath_ids if
- d not in previous_datapath_ids]
- datapath_ids_to_remove = [d for d in previous_datapath_ids if
- d not in current_datapath_ids]
+ # Use datapath ids for deciding what's new and what's obsolete
+ desired_datapath_ids = set(d.datapath_id for d in devices)
+ current_datapath_ids = set(self.agent_map.iterkeys())
- # Check for no change
- if not datapath_ids_to_add and not datapath_ids_to_remove:
- self.log.info('No new devices found. No OF agent update '
- 'required')
- return
+ # if identical, nothing to do
+ if desired_datapath_ids == current_datapath_ids:
+ return
- self.log.info('Updating OF agent connections.')
- print self.agent_map
+ # ... otherwise calculate differences
+ to_add = desired_datapath_ids.difference(current_datapath_ids)
+ to_del = current_datapath_ids.difference(desired_datapath_ids)
- # Stop previous agents
- for datapath_id in datapath_ids_to_remove:
- if self.agent_map.has_key(datapath_id):
- self.agent_map[datapath_id].stop()
- del self.agent_map[datapath_id]
- self.log.info('Removed OF agent with datapath id {'
- '}'.format(datapath_id))
+ # remove what we don't need
+ for datapath_id in to_del:
+ self.delete_agent(datapath_id)
- # Add the new agents
- for datapath_id in datapath_ids_to_add:
- self.agent_map[datapath_id] = Agent(self.controller_endpoint,
- datapath_id,
- self.grpc_client)
- self.agent_map[datapath_id].run()
- self.log.info('Launched OF agent with datapath id {}'.format(
- datapath_id))
+ # start new agents as needed
+ for device in devices:
+ if device.datapath_id in to_add:
+ self.create_agent(device)
- # replace the old device list with the new ones
- self.connected_devices = self.unprocessed_devices
- self.unprocessed_devices = None
+ log.debug('updated-agent-list', count=len(self.agent_map))
+ log.debug('updated-device-id-to-datapath-id-map',
+ map=str(self.device_id_to_datapath_id_map))
+
+ def create_agent(self, device):
+ datapath_id = device.datapath_id
+ device_id = device.id
+ agent = Agent(self.controller_endpoint, datapath_id,
+ device_id, self.grpc_client)
+ agent.run()
+ self.agent_map[datapath_id] = agent
+ self.device_id_to_datapath_id_map[device_id] = datapath_id
+
+ def delete_agent(self, datapath_id):
+ agent = self.agent_map[datapath_id]
+ device_id = agent.get_device_id()
+ agent.stop()
+ del self.agent_map[datapath_id]
+ del self.device_id_to_datapath_id_map[device_id]
@inlineCallbacks
- def monitor_connections(self):
+ def monitor_logical_devices(self):
while True:
- # sleep first
+ # TODO @khen We should replace this polling loop with a
+ # streaming gRPC method
+
+ # get current list from Voltha
+ devices = yield self.get_list_of_logical_devices_from_voltha()
+
+ # update agent list and mapping tables as needed
+ self.refresh_agent_connections(devices)
+
+ # wait before next poll
yield asleep(self.devices_refresh_interval)
- self.log.info('Monitor connections')
- yield self.get_list_of_logical_devices_from_voltha()
- self.refresh_openflow_agent_connections()
+ log.info('Monitor connections')
-# class Model(object):
-# def __init__(self, id, path):
-# self.id=id
-# self.datapath_id=path,
-
-
-# if __name__ == '__main__':
-# conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
-# "10.100.198.150:6633")
-#
-# conn.connected_devices = None
-# model1 = Model('12311', 'wdadsa1')
-# model2 = Model('12312', 'wdadsa2')
-# model3 = Model('12313', 'wdadsa3')
-# model4 = Model('12314', 'wdadsa4')
-#
-# conn.unprocessed_devices = [model1, model2, model3]
-#
-# conn.refresh_openflow_agent_connections()
-#
-#
-# # val = [device.datapath_id for device in conn.connected_devices]
-# # print val
-# #
-# # for (id,n) in enumerate(val):
-# # print n
-#
-#
-# conn.unprocessed_devices = [model1, model2, model3]
-#
-# conn.refresh_openflow_agent_connections()
-#
-# conn.unprocessed_devices = [model1, model2, model4]
-#
-# conn.refresh_openflow_agent_connections()
\ No newline at end of file
+ def forward_packet_in(self, device_id, ofp_packet_in):
+ datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+ if datapath_id:
+ agent = self.agent_map[datapath_id]
+ agent.forward_packet_in(ofp_packet_in)