Async/streaming gRPC client/server proto
This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.
Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
Major cleanup of openflow_13.proto
Change-Id: I4e54eaf87b682124ec518a0ade1a6050a6ec6da8
Relocated openflow_13.proto to voltha
Change-Id: I66ae45a9142d180c2c6651e75c7a1ee08aef7ef8
Removed forced utest from make build
Change-Id: If0da58e9d135ebde6ca68c3316688a03a7b10f2f
twisted openflow agent first pass
Change-Id: Ibe5b4727ccfe92e6fd464ccd3baf6275569ef5d3
store openflow derived files
Change-Id: Ib3e1384bb2ca2a9c0872767f7b793f96b0a154e2
Minor cleanup
Change-Id: I1280ed3acb606121b616a0efd573f5f59d010dca
Factored out common utils
Change-Id: Icd86fcd50f60d0900924674cbcd65e13e47782a1
Refactored twisted agent
Change-Id: I71f26ce5357a4f98477df60b8c5ddc068cf75d43
Relocated openflow agent to ofagent
... and preserved the obsolete working (non-twisted) agent under
~/obsolete, so we can still run the olt-oftest and pass tests
until the new twisted-based agent reaches that maturity point.
Change-Id: I727f8d7144b1291a40276dad2966b7643bd7bc4b
olt-oftest in fake mode works with new agent
Change-Id: I43b4f5812e8dfaa9f45e4a77fdcf6c30ac520f8d
Initial ofagent/voltha operation
Change-Id: Ia8104f1285a6b1c51635d36d7d78fc113f800e79
Additional callouts to Voltha
Change-Id: If8f483d5140d3c9d45f22b480b8d33249a29cd4e
More gRPC calls
Change-Id: I7d24fadf9425217fb26ffe18f25359d072ef38fa
Flow add/list now works
Change-Id: Ie3e3e73108645b47891cef798fc61372a022fd93
Missed some files
Change-Id: I29e81238ff1a26c095c0c73e521579edf7092e21
diff --git a/ofagent/of_connection.py b/ofagent/of_connection.py
new file mode 100644
index 0000000..ab05011
--- /dev/null
+++ b/ofagent/of_connection.py
@@ -0,0 +1,126 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from hexdump import hexdump
+from twisted.internet import protocol
+
+import loxi.of14
+from common.utils.message_queue import MessageQueue
+
+log = structlog.get_logger()
+
+
+class OpenFlowConnection(protocol.Protocol):
+
+ def __init__(self, agent):
+ self.agent = agent # the protocol will call agent.enter_disconnected()
+ # and agent.enter_connected() methods to indicate
+ # when state change is necessary
+ self.next_xid = 1
+ self.read_buffer = None
+ self.rx = MessageQueue()
+
+ def connectionLost(self, reason):
+ self.agent.enter_disconnected('connection-lost', reason)
+
+ def connectionMade(self):
+ self.agent.enter_connected()
+
+ def dataReceived(self, data):
+ log.debug('data-received', len=len(data),
+ received=hexdump(data, result='return'))
+
+ assert len(data) # connection close shall be handled by the protocol
+ buf = self.read_buffer
+ if buf:
+ buf += data
+ else:
+ buf = data
+
+ offset = 0
+ while offset < len(buf):
+ if offset + 8 > len(buf):
+ break # not enough data for the OpenFlow header
+
+ # parse the header to get type
+ _version, _type, _len, _xid = \
+ loxi.of14.message.parse_header(buf[offset:])
+
+ ofp = loxi.protocol(_version)
+
+ if (offset + _len) > len(buf):
+ break # not enough data to cover whole message
+
+ rawmsg = buf[offset : offset + _len]
+ offset += _len
+
+ msg = ofp.message.parse_message(rawmsg)
+ if not msg:
+ log.warn('could-not-parse',
+ data=hexdump(rawmsg, result='return'))
+ log.debug('received-msg', module=type(msg).__module__,
+ name=type(msg).__name__, xid=msg.xid, len=len(buf))
+ self.rx.put(msg)
+
+ if offset == len(buf):
+ self.read_buffer = None
+ else:
+ self.read_buffer = buf[offset:]
+ log.debug('remaining', len=len(self.read_buffer))
+
+ def send_raw(self, buf):
+ """
+ Send raw bytes on the socket
+ :param buf: bytes buffer
+ :return: None
+ """
+ assert self.connected
+ log.debug('sending-raw', len=len(buf))
+ self.transport.write(buf)
+
+ def send(self, msg):
+ """
+ Send a message
+ :param msg: An OpenFlow protocol message
+ :return: None
+ """
+ assert self.connected
+
+ if msg.xid is None:
+ msg.xid = self._gen_xid()
+ buf = msg.pack()
+ log.debug('sending', module=type(msg).__module__,
+ name=type(msg).__name__, xid=msg.xid, len=len(buf))
+ self.transport.write(buf)
+ log.debug('data-sent', sent=hexdump(buf, result='return'))
+
+ def recv(self, predicate):
+ assert self.connected
+ return self.rx.get(predicate)
+
+ def recv_any(self):
+ return self.recv(lambda _: True)
+
+ def recv_xid(self, xid):
+ return self.recv(lambda msg: msg.xid == xid)
+
+ def recv_class(self, klass):
+ return self.recv(lambda msg: isinstance(msg, klass))
+
+ def _gen_xid(self):
+ xid = self.next_xid
+ self.next_xid += 1
+ return xid