Async/streaming gRPC client/server proto
This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.
Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
diff --git a/experiments/streaming_server.py b/experiments/streaming_server.py
new file mode 100644
index 0000000..a207477
--- /dev/null
+++ b/experiments/streaming_server.py
@@ -0,0 +1,139 @@
+import grpc
+from concurrent import futures
+from concurrent.futures import Future
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
+
+from openflow_13_pb2 import add_OpenFlowServicer_to_server, \
+ OpenFlowServicer
+from streaming_pb2 import add_ExperimentalServiceServicer_to_server, \
+ AsyncEvent, ExperimentalServiceServicer, Echo
+
+
+class ShutDown(object):
+ stop = False # semaphore for all loops to stop when this flag is set
+
+
+def asleep(t):
+ d = Deferred()
+ reactor.callLater(t, d.callback, None)
+ return d
+
+
+class ShuttingDown(Exception): pass
+
+
+def twisted_async(func):
+ """
+ This decorator can be used to implement a gRPC method on the twisted
+ thread, allowing asynchronous programming in Twisted while serving
+ a gRPC call.
+
+ gRPC methods normally are called on the futures.ThreadPool threads,
+ so these methods cannot directly use Twisted protocol constructs.
+ If the implementation of the methods needs to touch Twisted, it is
+ safer (or mandatory) to wrap the method with this decorator, which will
+ call the inner method from the external thread and ensure that the
+ result is passed back to the foreign thread.
+ """
+ def in_thread_wrapper(*args, **kw):
+
+ if ShutDown.stop:
+ raise ShuttingDown()
+ f = Future()
+
+ def twisted_wrapper():
+ try:
+ d = func(*args, **kw)
+ if isinstance(d, Deferred):
+
+ def _done(result):
+ f.set_result(result)
+ f.done()
+
+ def _error(e):
+ f.set_exception(e)
+ f.done()
+
+ d.addCallback(_done)
+ d.addErrback(_error)
+
+ else:
+ f.set_result(d)
+ f.done()
+
+ except Exception, e:
+ f.set_exception(e)
+ f.done()
+
+ reactor.callFromThread(twisted_wrapper)
+ result = f.result()
+
+ return result
+
+ return in_thread_wrapper
+
+
+class Service(ExperimentalServiceServicer):
+
+ def __init__(self):
+ self.event_seq = 0
+
+ @twisted_async
+ @inlineCallbacks
+ def GetEcho(self, request, context):
+ print 'got Echo({}) request'.format(request.msg)
+ yield asleep(request.delay)
+ msg = request.msg + ' <<'
+ print ' Echo({}) reply'.format(msg)
+ returnValue(Echo(msg=msg))
+
+ @twisted_async
+ @inlineCallbacks
+ def get_next_event(self):
+ """called on the twisted thread"""
+ yield asleep(0.000001)
+ event = AsyncEvent(seq=self.event_seq, details='foo')
+ self.event_seq += 1
+ returnValue(event)
+
+ def ReceiveStreamedEvents(self, request, context):
+ """called on a thread-pool thread"""
+ print 'got ReceiveStreamedEvents request'
+ while 1:
+ if ShutDown.stop:
+ break
+ yield self.get_next_event()
+
+ def ReceivePackets(self, request, context):
+ pass
+
+ def SendPackets(self, request, context):
+ pass
+
+
+class OpenFlow(OpenFlowServicer):
+
+ def EchoRequest(self, request, context):
+ pass
+
+ def SendPacketsOutMessages(self, request, context):
+ pass
+
+ def ReceivePacketInMessages(self, request, context):
+ pass
+
+
+if __name__ == '__main__':
+ thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+ server = grpc.server(thread_pool)
+ add_ExperimentalServiceServicer_to_server(Service(), server)
+ add_OpenFlowServicer_to_server(OpenFlow(), server)
+ server.add_insecure_port('[::]:50050')
+ server.start()
+ def shutdown():
+ ShutDown.stop = True
+ thread_pool.shutdown(wait=True)
+ server.stop(0)
+ reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
+ reactor.run()