Async/streaming gRPC client/server proto

The purpose of this experiment was to fine-tune how we implement
asynchronous gRPC client and server code inside a Twisted Python
application.

Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
diff --git a/experiments/streaming_client.py b/experiments/streaming_client.py
new file mode 100644
index 0000000..e7593f9
--- /dev/null
+++ b/experiments/streaming_client.py
@@ -0,0 +1,120 @@
+"""Experimental Twisted client for the streaming gRPC ExperimentalService.
+
+Blocking gRPC stub calls are run in the reactor thread pool so that they do
+not block the Twisted reactor thread.
+"""
+import time
+
+import grpc
+from google.protobuf.empty_pb2 import Empty
+from twisted.internet import reactor
+from twisted.internet import threads
+from twisted.internet.defer import Deferred, inlineCallbacks, DeferredQueue
+from twisted.python.failure import Failure
+
+from streaming_pb2 import ExperimentalServiceStub, Echo
+
+
+def asleep(t):
+    """Return a Deferred that fires with None after `t` seconds."""
+    d = Deferred()
+    reactor.callLater(t, d.callback, None)
+    return d
+
+
+# Reference time for the relative timestamps printed by pr().
+t0 = time.time()
+
+
+def pr(s):
+    """Print `s` prefixed with the seconds elapsed since module load."""
+    # Parenthesised single argument: same output under the Python 2 print
+    # statement and the Python 3 print function.
+    print('%lf %s' % (time.time() - t0, s))
+
+
+class ClientServices(object):
+    """Client-side helpers that drive the experimental service."""
+
+    # Delivered by async_receive_stream() once the stream has finished
+    # normally; consumers should stop iterating when they receive it.
+    STREAM_END = object()
+
+    def async_receive_stream(self, func, *args, **kw):
+        """Iterate a blocking response stream without blocking the reactor.
+
+        `func(*args, **kw)` must return a blocking iterable (for example a
+        server-streaming stub call); it is consumed in a reactor pool thread.
+
+        This is a generator of Deferreds.  Each Deferred fires with the next
+        streamed item, with STREAM_END once the stream has finished normally,
+        or with a Failure if consuming the stream raised.  Wait for each
+        Deferred to fire before requesting the next one.
+        """
+        queue = DeferredQueue()
+        # Mutable holder because Python 2 closures cannot rebind outer names.
+        state = {'ended': False, 'failure': None}
+
+        def _execute():
+            # Worker thread: hand every item over to the reactor thread.
+            for result in func(*args, **kw):
+                reactor.callFromThread(queue.put, result)
+
+        def _finished(outcome):
+            # Runs in the reactor thread once _execute has returned or raised.
+            if isinstance(outcome, Failure):
+                state['failure'] = outcome
+            # Queue the end marker behind the items still awaiting delivery.
+            queue.put(self.STREAM_END)
+
+        def _deliver(item):
+            if item is self.STREAM_END:
+                state['ended'] = True
+                if state['failure'] is not None:
+                    # Returning a Failure moves the Deferred to its errbacks.
+                    return state['failure']
+            return item
+
+        threads.deferToThread(_execute).addBoth(_finished)
+        while not state['ended']:
+            yield queue.get().addCallback(_deliver)
+
+    @inlineCallbacks
+    def echo_loop(self, stub, prefix='', interval=1.0):
+        """Send an echo request every `interval` seconds, forever.
+
+        Each request and its response message are printed with `prefix`.
+        """
+        seq = 0
+        while 1:
+            msg = 'ECHO%05d' % seq
+            pr('{}sending echo {}'.format(prefix, msg))
+            request = Echo(msg=msg, delay=interval)
+            # The stub call blocks, so run it in the reactor thread pool.
+            response = yield threads.deferToThread(stub.GetEcho, request)
+            pr('{}    got echo {}'.format(prefix, response.msg))
+            seq += 1
+            yield asleep(interval)
+
+    @inlineCallbacks
+    def receive_async_events(self, stub):
+        """Consume the server event stream, printing every 100th event."""
+        e = Empty()
+        stream = self.async_receive_stream(stub.ReceiveStreamedEvents, e)
+        for pending in stream:
+            event = yield pending
+            if event is self.STREAM_END:
+                break
+            if event.seq % 100 == 0:
+                pr('event received: %s %s %s' % (
+                   event.seq, event.type, event.details))
+
+
+if __name__ == '__main__':
+    client_services = ClientServices()
+    channel = grpc.insecure_channel('localhost:50050')
+    stub = ExperimentalServiceStub(channel)
+    reactor.callLater(0, client_services.echo_loop, stub, '', 0.2)
+    reactor.callLater(0, client_services.echo_loop, stub, 40*' ', 2)
+    reactor.callLater(0, client_services.receive_async_events, stub)
+    reactor.run()