Async/streaming gRPC client/server proto

This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.

Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3

Major cleanup of openflow_13.proto

Change-Id: I4e54eaf87b682124ec518a0ade1a6050a6ec6da8

Relocated openflow_13.proto to voltha

Change-Id: I66ae45a9142d180c2c6651e75c7a1ee08aef7ef8

Removed forced utest from make build

Change-Id: If0da58e9d135ebde6ca68c3316688a03a7b10f2f

twisted openflow agent first pass

Change-Id: Ibe5b4727ccfe92e6fd464ccd3baf6275569ef5d3

store openflow derived files

Change-Id: Ib3e1384bb2ca2a9c0872767f7b793f96b0a154e2

Minor cleanup

Change-Id: I1280ed3acb606121b616a0efd573f5f59d010dca

Factored out common utils

Change-Id: Icd86fcd50f60d0900924674cbcd65e13e47782a1

Refactored twisted agent

Change-Id: I71f26ce5357a4f98477df60b8c5ddc068cf75d43

Relocated openflow agent to ofagent

... and preserved obsolete working (non-twisted) agent under
~/obsolete, so we can still run the olt-oftest and pass tests,
unit the new twisted based agent reaches that maturity point.

Change-Id: I727f8d7144b1291a40276dad2966b7643bd7bc4b

olt-oftest in fake mode works with new agent

Change-Id: I43b4f5812e8dfaa9f45e4a77fdcf6c30ac520f8d

Initial ofagent/voltha operation

Change-Id: Ia8104f1285a6b1c51635d36d7d78fc113f800e79

Additional callouts to Voltha

Change-Id: If8f483d5140d3c9d45f22b480b8d33249a29cd4e

More gRPC calls

Change-Id: I7d24fadf9425217fb26ffe18f25359d072ef38fa

Flow add/list now works

Change-Id: Ie3e3e73108645b47891cef798fc61372a022fd93

Missed some files

Change-Id: I29e81238ff1a26c095c0c73e521579edf7092e21
diff --git a/ofagent/of_connection.py b/ofagent/of_connection.py
new file mode 100644
index 0000000..ab05011
--- /dev/null
+++ b/ofagent/of_connection.py
@@ -0,0 +1,126 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from hexdump import hexdump
+from twisted.internet import protocol
+
+import loxi.of14
+from common.utils.message_queue import MessageQueue
+
+log = structlog.get_logger()
+
+
+class OpenFlowConnection(protocol.Protocol):
+
+    def __init__(self, agent):
+        self.agent = agent  # the protocol will call agent.enter_disconnected()
+                            # and agent.enter_connected() methods to indicate
+                            # when state change is necessary
+        self.next_xid = 1
+        self.read_buffer = None
+        self.rx = MessageQueue()
+
+    def connectionLost(self, reason):
+        self.agent.enter_disconnected('connection-lost', reason)
+
+    def connectionMade(self):
+        self.agent.enter_connected()
+
+    def dataReceived(self, data):
+        log.debug('data-received', len=len(data),
+                  received=hexdump(data, result='return'))
+
+        assert len(data)  # connection close shall be handled by the protocol
+        buf = self.read_buffer
+        if buf:
+            buf += data
+        else:
+            buf = data
+
+        offset = 0
+        while offset < len(buf):
+            if offset + 8 > len(buf):
+                break  # not enough data for the OpenFlow header
+
+            # parse the header to get type
+            _version, _type, _len, _xid = \
+                loxi.of14.message.parse_header(buf[offset:])
+
+            ofp = loxi.protocol(_version)
+
+            if (offset + _len) > len(buf):
+                break  # not enough data to cover whole message
+
+            rawmsg = buf[offset : offset + _len]
+            offset += _len
+
+            msg = ofp.message.parse_message(rawmsg)
+            if not msg:
+                log.warn('could-not-parse',
+                         data=hexdump(rawmsg, result='return'))
+            log.debug('received-msg', module=type(msg).__module__,
+                  name=type(msg).__name__, xid=msg.xid, len=len(buf))
+            self.rx.put(msg)
+
+        if offset == len(buf):
+            self.read_buffer = None
+        else:
+            self.read_buffer = buf[offset:]
+            log.debug('remaining', len=len(self.read_buffer))
+
+    def send_raw(self, buf):
+        """
+        Send raw bytes on the socket
+        :param buf: bytes buffer
+        :return: None
+        """
+        assert self.connected
+        log.debug('sending-raw', len=len(buf))
+        self.transport.write(buf)
+
+    def send(self, msg):
+        """
+        Send a message
+        :param msg: An OpenFlow protocol message
+        :return: None
+        """
+        assert self.connected
+
+        if msg.xid is None:
+            msg.xid = self._gen_xid()
+        buf = msg.pack()
+        log.debug('sending', module=type(msg).__module__,
+                  name=type(msg).__name__, xid=msg.xid, len=len(buf))
+        self.transport.write(buf)
+        log.debug('data-sent', sent=hexdump(buf, result='return'))
+
+    def recv(self, predicate):
+        assert self.connected
+        return self.rx.get(predicate)
+
+    def recv_any(self):
+        return self.recv(lambda _: True)
+
+    def recv_xid(self, xid):
+        return self.recv(lambda msg: msg.xid == xid)
+
+    def recv_class(self, klass):
+        return self.recv(lambda msg: isinstance(msg, klass))
+
+    def _gen_xid(self):
+        xid = self.next_xid
+        self.next_xid += 1
+        return xid