Async/streaming gRPC client/server proto
This experiment fine-tunes how we can implement asynchronous gRPC
client and server code inside a Twisted Python
application.
Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
new file mode 100644
index 0000000..e7593f9
--- /dev/null
+++ b/experiments/streaming_client.py
@@ -0,0 +1,119 @@
+"""Experimental gRPC client driven from a Twisted reactor.
+
+Blocking gRPC stub calls are run in the reactor's thread pool so that the
+reactor thread never blocks. Run as a script, it starts two echo loops and
+one event-stream consumer against localhost:50050.
+"""
+import time
+
+import grpc
+from google.protobuf.empty_pb2 import Empty
+from twisted.internet import reactor
+from twisted.internet import threads
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+
+from streaming_pb2 import ExperimentalServiceStub, Echo
+
+
+def asleep(t):
+    """Return a Deferred that fires with None after `t` seconds.
+
+    Lets @inlineCallbacks code pause with `yield asleep(t)` without
+    blocking the reactor.
+    """
+    d = Deferred()
+    reactor.callLater(t, d.callback, None)
+    return d
+
+
+# Reference point for the relative timestamps printed by pr().
+t0 = time.time()
+
+
+def pr(s):
+    """Print `s` prefixed with the seconds elapsed since `t0`."""
+    # A single parenthesised argument prints the same text whether `print`
+    # is the Python 2 statement or the Python 3 function.
+    print('%lf %s' % (time.time() - t0, s))
+
+
+class ClientServices(object):
+    """Client-side helpers that wrap blocking gRPC stub calls in Deferreds."""
+
+    def async_receive_stream(self, func, *args, **kw):
+        """Expose a blocking streaming call as a series of Deferreds.
+
+        `func(*args, **kw)` is called and iterated in a worker thread; each
+        item it produces is passed to the reactor thread through a
+        DeferredQueue.
+
+        This generator never finishes: each iteration yields a Deferred
+        that fires with the next item. If the call raises, the error is
+        printed; once the stream has ended or failed, no further Deferreds
+        fire.
+        """
+        queue = DeferredQueue()
+
+        def _execute():
+            # Runs in a worker thread. Twisted objects are not thread-safe,
+            # so items are queued via reactor.callFromThread.
+            for result in func(*args, **kw):
+                reactor.callFromThread(queue.put, result)
+
+        def _report_failure(failure):
+            pr('stream failed: %s' % failure.getErrorMessage())
+
+        # Attach an errback so that an error raised by the call is reported
+        # rather than silently dropped with the worker's Deferred.
+        threads.deferToThread(_execute).addErrback(_report_failure)
+
+        while True:
+            yield queue.get()
+
+    @inlineCallbacks
+    def echo_loop(self, stub, prefix='', interval=1.0):
+        """Send numbered echo requests forever, printing each exchange.
+
+        Each request carries `interval` as its `delay` field. After the
+        reply arrives, the loop waits `interval` seconds before sending the
+        next request. `prefix` is prepended to every printed line. The
+        blocking `stub.GetEcho` call runs in a worker thread.
+        """
+        seq = 0
+        while True:
+            msg = 'ECHO%05d' % seq
+            pr('{}sending echo {}'.format(prefix, msg))
+            request = Echo(msg=msg, delay=interval)
+            response = yield threads.deferToThread(stub.GetEcho, request)
+            pr('{} got echo {}'.format(prefix, response.msg))
+            seq += 1
+            yield asleep(interval)
+
+    @inlineCallbacks
+    def receive_async_events(self, stub):
+        """Consume `stub.ReceiveStreamedEvents` and print sampled events.
+
+        Only events whose `seq` is a multiple of 100 are printed.
+        """
+        request = Empty()
+        stream = self.async_receive_stream(stub.ReceiveStreamedEvents, request)
+        # The loop variable is deliberately not called `next`, which would
+        # shadow the builtin.
+        for pending in stream:
+            event = yield pending
+            if event.seq % 100 == 0:
+                pr('event received: %s %s %s' % (
+                    event.seq, event.type, event.details))
+
+
+if __name__ == '__main__':
+    client_services = ClientServices()
+    channel = grpc.insecure_channel('localhost:50050')
+    stub = ExperimentalServiceStub(channel)
+    # Schedule two echo loops with different intervals (the second prints
+    # with a 40-space prefix) plus the event consumer, then run the
+    # reactor.
+    reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
+    reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
+    reactor.callLater(0, client_services.receive_async_events, stub)
+    reactor.run()