Packet in/out streaming from ofagent to core
Getting ready for packet streaming

Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461

Support flow_delete_strict

Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b

Packet out halfway plumbed

Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6

refactored async_twisted

Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d

Packet in pipeline and ofagent refactoring

Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 9a17121..13fb6de 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,60 +28,53 @@
 
 from agent import Agent
 
+log = get_logger()
+
+
 class ConnectionManager(object):
 
-    log = get_logger()
-
     def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
-                 voltha_retry_interval=0.5, devices_refresh_interval=60):
+                 voltha_retry_interval=0.5, devices_refresh_interval=5):
 
-        self.log.info('Initializing connection manager')
+        log.info('init-connection-manager')
         self.controller_endpoint = controller_endpoint
         self.consul_endpoint = consul_endpoint
         self.voltha_endpoint = voltha_endpoint
 
         self.channel = None
-        self.connected_devices = None
-        self.unprocessed_devices = None
-        self.agent_map = {}
-        self.grpc_client = None
-        self.device_id_map = None
+        self.grpc_client = None  # single, shared gRPC client to Voltha
+
+        self.agent_map = {}  # datapath_id -> Agent()
+        self.device_id_to_datapath_id_map = {}
 
         self.voltha_retry_interval = voltha_retry_interval
         self.devices_refresh_interval = devices_refresh_interval
 
         self.running = False
 
-    @inlineCallbacks
     def run(self):
+
         if self.running:
             return
 
-        self.log.info('Running connection manager')
+        log.info('run-connection-manager')
 
         self.running = True
 
         # Get voltha grpc endpoint
         self.channel = self.get_grpc_channel_with_voltha()
 
-        # Connect to voltha using grpc and fetch the list of logical devices
-        yield self.get_list_of_logical_devices_from_voltha()
-
         # Create shared gRPC API object
-        self.grpc_client = GrpcClient(self.channel, self.device_id_map)
+        self.grpc_client = GrpcClient(self, self.channel)
 
-        # Instantiate an OpenFlow agent for each logical device
-        self.refresh_openflow_agent_connections()
+        # Start monitoring logical devices and manage agents accordingly
+        reactor.callLater(0, self.monitor_logical_devices)
 
-        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
-        reactor.callLater(0, self.monitor_connections)
-
-        returnValue(self)
-
+        return self
 
     def shutdown(self):
         # clean up all controller connections
-        for key, value in enumerate(self.agent_map):
+        for _, value in enumerate(self.agent_map):
             value.stop()
         self.running = False
         # TODO: close grpc connection to voltha
@@ -92,11 +85,11 @@
             try:
                 ip_port_endpoint = get_endpoint_from_consul(
                     self.consul_endpoint, endpoint[1:])
-                self.log.info(
+                log.info(
                     'Found endpoint {} service at {}'.format(endpoint,
                                                              ip_port_endpoint))
             except Exception as e:
-                self.log.error('Failure to locate {} service from '
+                log.error('Failure to locate {} service from '
                                'consul {}:'.format(endpoint, repr(e)))
                 return
         if ip_port_endpoint:
@@ -104,128 +97,108 @@
             return host, int(port)
 
     def get_grpc_channel_with_voltha(self):
-        self.log.info('Resolving voltha endpoint {} from consul'.format(
+        log.info('Resolving voltha endpoint {} from consul'.format(
             self.voltha_endpoint))
         host, port = self.resolve_endpoint(self.voltha_endpoint)
         assert host is not None
         assert port is not None
         # Create grpc channel to Voltha
         channel = grpc.insecure_channel('{}:{}'.format(host, port))
-        self.log.info('Acquired a grpc channel to voltha')
+        log.info('Acquired a grpc channel to voltha')
         return channel
 
 
     @inlineCallbacks
     def get_list_of_logical_devices_from_voltha(self):
+
         while True:
-            self.log.info('Retrieve devices from voltha')
+            log.info('Retrieve devices from voltha')
             try:
                 stub = voltha_pb2.VolthaLogicalLayerStub(self.channel)
                 devices = stub.ListLogicalDevices(
                     voltha_pb2.NullMessage()).items
                 for device in devices:
-                    self.log.info("Devices {} -> {}".format(device.id,
+                    log.info("Devices {} -> {}".format(device.id,
                                                             device.datapath_id))
-                self.unprocessed_devices = devices
-                self.device_id_map = dict(
-                    (device.datapath_id, device.id) for device in devices)
-                return
+                returnValue(devices)
+
             except Exception as e:
-                self.log.error('Failure to retrieve devices from '
+                log.error('Failure to retrieve devices from '
                                'voltha: {}'.format(repr(e)))
 
-            self.log.info('reconnect', after_delay=self.voltha_retry_interval)
+            log.info('reconnect', after_delay=self.voltha_retry_interval)
             yield asleep(self.voltha_retry_interval)
 
 
-    def refresh_openflow_agent_connections(self):
-        # Compare the new device list again the previous
-        # For any new device, an agent connection will be created.  For
-        # existing device that are no longer part of the list then that
-        # agent connection will be stopped
+    def refresh_agent_connections(self, devices):
+        """
+        Based on the new device list, update the following state in the class:
+        * agent_map
+        * datapath_map
+        * device_id_map
+        :param devices: full device list freshly received from Voltha
+        :return: None
+        """
 
-        # If the ofagent has no previous devices then just add them
-        if self.connected_devices is None:
-            datapath_ids_to_add = [device.datapath_id for device in self.unprocessed_devices]
-        else:
-            previous_datapath_ids = [device.datapath_id for device in self.connected_devices]
-            current_datapath_ids = [device.datapath_id for device in self.unprocessed_devices]
-            datapath_ids_to_add = [d for d in current_datapath_ids if
-                                 d not in previous_datapath_ids]
-            datapath_ids_to_remove = [d for d in previous_datapath_ids if
-                                 d not in current_datapath_ids]
+        # Use datapath ids for deciding what's new and what's obsolete
+        desired_datapath_ids = set(d.datapath_id for d in devices)
+        current_datapath_ids = set(self.agent_map.iterkeys())
 
-            # Check for no change
-            if not datapath_ids_to_add and not datapath_ids_to_remove:
-                self.log.info('No new devices found.  No OF agent update '
-                              'required')
-                return
+        # if identical, nothing to do
+        if desired_datapath_ids == current_datapath_ids:
+            return
 
-            self.log.info('Updating OF agent connections.')
-            print self.agent_map
+        # ... otherwise calculate differences
+        to_add = desired_datapath_ids.difference(current_datapath_ids)
+        to_del = current_datapath_ids.difference(desired_datapath_ids)
 
-            # Stop previous agents
-            for datapath_id in datapath_ids_to_remove:
-                if self.agent_map.has_key(datapath_id):
-                    self.agent_map[datapath_id].stop()
-                    del self.agent_map[datapath_id]
-                    self.log.info('Removed OF agent with datapath id {'
-                                  '}'.format(datapath_id))
+        # remove what we don't need
+        for datapath_id in to_del:
+            self.delete_agent(datapath_id)
 
-        # Add the new agents
-        for datapath_id in datapath_ids_to_add:
-            self.agent_map[datapath_id] = Agent(self.controller_endpoint,
-                                                datapath_id,
-                                                self.grpc_client)
-            self.agent_map[datapath_id].run()
-            self.log.info('Launched OF agent with datapath id {}'.format(
-                datapath_id))
+        # start new agents as needed
+        for device in devices:
+            if device.datapath_id in to_add:
+                self.create_agent(device)
 
-        # replace the old device list with the new ones
-        self.connected_devices = self.unprocessed_devices
-        self.unprocessed_devices = None
+        log.debug('updated-agent-list', count=len(self.agent_map))
+        log.debug('updated-device-id-to-datapath-id-map',
+                  map=str(self.device_id_to_datapath_id_map))
+
+    def create_agent(self, device):
+        datapath_id = device.datapath_id
+        device_id = device.id
+        agent = Agent(self.controller_endpoint, datapath_id,
+                      device_id, self.grpc_client)
+        agent.run()
+        self.agent_map[datapath_id] = agent
+        self.device_id_to_datapath_id_map[device_id] = datapath_id
+
+    def delete_agent(self, datapath_id):
+        agent = self.agent_map[datapath_id]
+        device_id = agent.get_device_id()
+        agent.stop()
+        del self.agent_map[datapath_id]
+        del self.device_id_to_datapath_id_map[device_id]
 
     @inlineCallbacks
-    def monitor_connections(self):
+    def monitor_logical_devices(self):
         while True:
-            # sleep first
+            # TODO @khen We should switch to a polling mode based on a
+            # streaming gRPC method
+
+            # get current list from Voltha
+            devices = yield self.get_list_of_logical_devices_from_voltha()
+
+            # update agent list and mapping tables as needed
+            self.refresh_agent_connections(devices)
+
+            # wait before next poll
             yield asleep(self.devices_refresh_interval)
-            self.log.info('Monitor connections')
-            yield self.get_list_of_logical_devices_from_voltha()
-            self.refresh_openflow_agent_connections()
+            log.info('Monitor connections')
 
-# class Model(object):
-#     def __init__(self, id, path):
-#         self.id=id
-#         self.datapath_id=path,
-
-
-# if __name__ == '__main__':
-#     conn = ConnectionManager("10.0.2.15:3181", "localhost:50555",
-#                              "10.100.198.150:6633")
-#
-#     conn.connected_devices = None
-#     model1 = Model('12311', 'wdadsa1')
-#     model2 = Model('12312', 'wdadsa2')
-#     model3 = Model('12313', 'wdadsa3')
-#     model4 = Model('12314', 'wdadsa4')
-#
-#     conn.unprocessed_devices = [model1, model2, model3]
-#
-#     conn.refresh_openflow_agent_connections()
-#
-#
-#     # val = [device.datapath_id for device in conn.connected_devices]
-#     # print val
-#     #
-#     # for (id,n) in enumerate(val):
-#     #     print n
-#
-#
-#     conn.unprocessed_devices = [model1, model2, model3]
-#
-#     conn.refresh_openflow_agent_connections()
-#
-#     conn.unprocessed_devices = [model1, model2, model4]
-#
-#     conn.refresh_openflow_agent_connections()
\ No newline at end of file
+    def forward_packet_in(self, device_id, ofp_packet_in):
+        datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
+        if datapath_id:
+            agent = self.agent_map[datapath_id]
+            agent.forward_packet_in(ofp_packet_in)