Packet in/out streaming from ofagent to core
Getting ready for packet streaming

Change-Id: I8d70d4d6ffbb23c0d7ab20582e9afac49f9f6461

Support flow_delete_strict

Change-Id: I5dab5f74a7daddcddfeb8691a3940347cb2fc11b

Packet-out halfway plumbed

Change-Id: I799d3f59d42ac9de0563b5e6b9a0064fd895a6f6

Refactored twisted_async

Change-Id: I68f8d12ce6fdbb70cee398f581669529b567d94d

Packet-in pipeline and ofagent refactoring

Change-Id: I31ecbf7d52fdd18c3884b8d1870f673488f808df
diff --git a/experiments/streaming_server.py b/experiments/streaming_server.py
index 588d4b3..c2ab286 100644
--- a/experiments/streaming_server.py
+++ b/experiments/streaming_server.py
@@ -7,7 +7,9 @@
 from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
 
 from common.utils.asleep import asleep
+from google.protobuf.empty_pb2 import Empty
 
+from common.utils.grpc_utils import twisted_async
 from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
     AsyncEvent, ExperimentalServiceServicer, Echo
 
@@ -19,57 +21,6 @@
 class ShuttingDown(Exception): pass
 
 
-def twisted_async(func):
-    """
-    This decorator can be used to implement a gRPC method on the twisted
-    thread, allowing asynchronous programming in Twisted while serving
-    a gRPC call.
-
-    gRPC methods normally are called on the futures.ThreadPool threads,
-    so these methods cannot directly use Twisted protocol constructs.
-    If the implementation of the methods needs to touch Twisted, it is
-    safer (or mandatory) to wrap the method with this decorator, which will
-    call the inner method from the external thread and ensure that the
-    result is passed back to the foreign thread.
-    """
-    def in_thread_wrapper(*args, **kw):
-
-        if ShutDown.stop:
-            raise ShuttingDown()
-        f = Future()
-
-        def twisted_wrapper():
-            try:
-                d = func(*args, **kw)
-                if isinstance(d, Deferred):
-
-                    def _done(result):
-                        f.set_result(result)
-                        f.done()
-
-                    def _error(e):
-                        f.set_exception(e)
-                        f.done()
-
-                    d.addCallback(_done)
-                    d.addErrback(_error)
-
-                else:
-                    f.set_result(d)
-                    f.done()
-
-            except Exception, e:
-                f.set_exception(e)
-                f.done()
-
-        reactor.callFromThread(twisted_wrapper)
-        result = f.result()
-
-        return result
-
-    return in_thread_wrapper
-
-
 class Service(ExperimentalServiceServicer):
 
     def __init__(self):
@@ -88,7 +39,7 @@
     @inlineCallbacks
     def get_next_event(self):
         """called on the twisted thread"""
-        yield asleep(0.0001)
+        yield asleep(0.000001)
         event = AsyncEvent(seq=self.event_seq, details='foo')
         self.event_seq += 1
         returnValue(event)
@@ -105,7 +56,12 @@
         pass
 
     def SendPackets(self, request, context):
-        pass
+        count = 0
+        for _ in request:
+            count += 1
+            if count % 1000 == 0:
+                print '%s got %d packets' % (20 * ' ', count)
+        return Empty()
 
 
 if __name__ == '__main__':