Async/streaming gRPC client/server proto
This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.
Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
Major cleanup of openflow_13.proto
Change-Id: I4e54eaf87b682124ec518a0ade1a6050a6ec6da8
Relocated openflow_13.proto to voltha
Change-Id: I66ae45a9142d180c2c6651e75c7a1ee08aef7ef8
Removed forced utest from make build
Change-Id: If0da58e9d135ebde6ca68c3316688a03a7b10f2f
twisted openflow agent first pass
Change-Id: Ibe5b4727ccfe92e6fd464ccd3baf6275569ef5d3
store openflow derived files
Change-Id: Ib3e1384bb2ca2a9c0872767f7b793f96b0a154e2
Minor cleanup
Change-Id: I1280ed3acb606121b616a0efd573f5f59d010dca
Factored out common utils
Change-Id: Icd86fcd50f60d0900924674cbcd65e13e47782a1
Refactored twisted agent
Change-Id: I71f26ce5357a4f98477df60b8c5ddc068cf75d43
Relocated openflow agent to ofagent
... and preserved obsolete working (non-twisted) agent under
~/obsolete, so we can still run the olt-oftest and pass tests,
unit the new twisted based agent reaches that maturity point.
Change-Id: I727f8d7144b1291a40276dad2966b7643bd7bc4b
olt-oftest in fake mode works with new agent
Change-Id: I43b4f5812e8dfaa9f45e4a77fdcf6c30ac520f8d
Initial ofagent/voltha operation
Change-Id: Ia8104f1285a6b1c51635d36d7d78fc113f800e79
Additional callouts to Voltha
Change-Id: If8f483d5140d3c9d45f22b480b8d33249a29cd4e
More gRPC calls
Change-Id: I7d24fadf9425217fb26ffe18f25359d072ef38fa
Flow add/list now works
Change-Id: Ie3e3e73108645b47891cef798fc61372a022fd93
Missed some files
Change-Id: I29e81238ff1a26c095c0c73e521579edf7092e21
diff --git a/obsolete/agent.py b/obsolete/agent.py
new file mode 100644
index 0000000..a0a14fc
--- /dev/null
+++ b/obsolete/agent.py
@@ -0,0 +1,280 @@
+import logging
+import loxi.of13 as ofp
+import socket
+import sys
+import time
+
+from loxi.connection import Connection
+from ofagent.utils import pp
+
+
+class Agent(object):
+
+ def __init__(self, controller, datapath_id,
+ store, backend, retry_interval=1):
+ self.ip = controller.split(':')[0]
+ self.port = int(controller.split(':')[1])
+ self.datapath_id = datapath_id
+ self.store = store
+ self.backend = backend
+ self.exiting = False
+ self.retry_interval = retry_interval
+ self.cxn = None
+ self.soc = None
+
+ def run(self):
+ self.connect()
+
+ def connect(self):
+ """
+ Connect to a controller
+ """
+ while not self.exiting:
+ self.cxn = None
+ self.soc = soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ soc.connect((self.ip, self.port))
+ except socket.error, e:
+ logging.info(
+ "Cannot connect to controller (errno=%d), "
+ "retrying in %s secs" %
+ (e.errno, self.retry_interval))
+ else:
+ logging.info("Connected to controller")
+ soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
+ self.cxn = cxn = Connection(self.soc)
+ cxn.daemon = False
+ cxn.start()
+ try:
+ self.handle_protocol()
+ except Exception, e:
+ logging.info(
+ "Connection was lost (%s), will retry in %s secs" %
+ (e, self.retry_interval))
+ time.sleep(self.retry_interval)
+
+ def stop(self):
+ if self.cxn is not None:
+ self.cxn.stop()
+ if self.soc is not None:
+ self.soc.close()
+
+ def signal_flow_mod_error(self, code, data):
+ msg = ofp.message.flow_mod_failed_error_msg(code=code, data=data)
+ self.cxn.send(msg)
+
+ def signal_group_mod_error(self, code, data):
+ msg = ofp.message.group_mod_failed_error_msg(code=code, data=data)
+ self.cxn.send(msg)
+
+ def signal_flow_removal(self, flow):
+ assert isinstance(flow, ofp.common.flow_stats_entry)
+ msg = ofp.message.flow_removed(
+ cookie=flow.cookie,
+ priority=flow.priority,
+ reason=None, # TODO
+ table_id=flow.table_id,
+ duration_sec=flow.duration_sec,
+ duration_nsec=flow.duration_nsec,
+ idle_timeout=flow.idle_timeout,
+ hard_timeout=flow.hard_timeout,
+ packet_count=flow.packet_count,
+ byte_count=flow.byte_count,
+ match=flow.match)
+ self.cxn.send(msg)
+
+ def send_packet_in(self, data, in_port):
+ match = ofp.match()
+ match.oxm_list.append(ofp.oxm.in_port(in_port))
+ msg = ofp.message.packet_in(
+ reason=ofp.OFPR_ACTION,
+ match=match,
+ data=data)
+ self.cxn.send(msg)
+
+ def handle_protocol(self):
+
+ cxn = self.cxn
+
+ # Send initial hello
+ cxn.send(ofp.message.hello())
+
+ if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
+ raise Exception("Did not receive initial HELLO")
+
+ while True:
+
+ try:
+ req = cxn.recv(lambda msg: True)
+ except AssertionError, e:
+ raise Exception("Connection is no longer alive")
+
+ print(pp(req))
+
+ if req is None:
+ # this simply means we timed out
+ # later we can use this to do other stuff
+ # for now we simply ignore this and loop back
+ pass
+
+ elif req.type == ofp.OFPT_FEATURES_REQUEST:
+ msg = ofp.message.features_reply(
+ xid=req.xid,
+ datapath_id=self.datapath_id,
+ n_buffers=256,
+ n_tables=2,
+ capabilities= (
+ ofp.OFPC_FLOW_STATS
+ | ofp.OFPC_TABLE_STATS
+ | ofp.OFPC_PORT_STATS
+ | ofp.OFPC_GROUP_STATS
+ )
+ )
+ cxn.send(msg)
+
+ elif req.type == ofp.OFPT_STATS_REQUEST:
+
+ if req.stats_type == ofp.OFPST_PORT_DESC:
+ # port stats request
+ msg = ofp.message.port_desc_stats_reply(
+ xid=req.xid,
+ #flags=None,
+ entries=self.store.port_list())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_DESC:
+ # device description
+ msg = ofp.message.desc_stats_reply(
+ xid=req.xid,
+ flags=None,
+ mfr_desc=self.backend.mfr_desc,
+ hw_desc=self.backend.hw_desc,
+ sw_desc="pyofagent",
+ serial_num=self.backend.get_serial_num(),
+ dp_desc=self.backend.get_dp_desc())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_FLOW:
+ # flow stats requested
+ msg = ofp.message.flow_stats_reply(
+ xid=req.xid, entries=self.store.flow_list())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_TABLE:
+ # table stats requested
+ msg = ofp.message.table_stats_reply(
+ xid=req.xid, entries=self.store.table_stats())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_PORT:
+ # port list
+ msg = ofp.message.port_stats_reply(
+ xid=req.xid, entries=self.store.port_stats())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_GROUP:
+ msg = ofp.message.group_stats_reply(
+ xid=req.xid, entries=self.store.group_stats())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_GROUP_DESC:
+ msg = ofp.message.group_desc_stats_reply(
+ xid=req.xid, entries=self.store.group_list())
+ cxn.send(msg)
+
+ elif req.stats_type == ofp.OFPST_METER:
+ msg = ofp.message.meter_stats_reply(
+ xid=req.xid, entries=[])
+ cxn.send(msg)
+
+ else:
+ logging.error("Unhandled stats type: %d in request:"
+ % req.stats_type)
+ logging.error(pp(req))
+
+ elif req.type == ofp.OFPT_SET_CONFIG:
+ # TODO ignored for now
+ pass
+
+ elif req.type == ofp.OFPT_BARRIER_REQUEST:
+ # TODO this will be the place to commit all changes before
+ # replying
+ # but now we send a reply right away
+ msg = ofp.message.barrier_reply(xid=req.xid)
+ cxn.send(msg)
+
+ elif req.type == ofp.OFPT_GET_CONFIG_REQUEST:
+ # send back configuration reply
+ msg = ofp.message.get_config_reply(
+ xid=req.xid, miss_send_len=ofp.OFPCML_NO_BUFFER)
+ cxn.send(msg)
+
+ elif req.type == ofp.OFPT_ROLE_REQUEST:
+ # TODO this is where we shall manage which connection is active
+ # now we simply verify that the role request is for active and
+ # reply
+ if req.role != ofp.OFPCR_ROLE_MASTER:
+ self.stop()
+ sys.exit(1)
+ msg = ofp.message.role_reply(
+ xid=req.xid, role=req.role,
+ generation_id=req.generation_id)
+ cxn.send(msg)
+
+ elif req.type == ofp.OFPT_PACKET_OUT:
+ in_port = req.in_port
+ data = req.data
+ for action in req.actions:
+ if action.type == ofp.OFPAT_OUTPUT:
+ port = action.port
+ self.backend.packet_out(in_port, port, data)
+ else:
+ logging.warn("Unhandled packet out action type %s"
+ % action.type)
+
+ elif req.type == ofp.OFPT_FLOW_MOD:
+
+ command = req._command
+
+ if command == ofp.OFPFC_ADD:
+ self.store.flow_add(req)
+
+ elif command == ofp.OFPFC_DELETE:
+ self.store.flow_delete(req)
+
+ elif command == ofp.OFPFC_DELETE_STRICT:
+ self.store.flow_delete_strict(req)
+
+ elif command == ofp.OFPFC_MODIFY:
+ self.store.flow_modify(req)
+
+ elif command == ofp.OFPFC_MODIFY_STRICT:
+ self.store.flow_modify_strict(req)
+
+ else:
+ logging.warn("Unhandled flow mod command %s in message:"
+ % command)
+ logging.warn(pp(req))
+
+ elif req.type == ofp.OFPT_GROUP_MOD:
+
+ command = req.command
+
+ if command == ofp.OFPGC_DELETE:
+ self.store.group_delete(req)
+
+ elif command == ofp.OFPGC_ADD:
+ self.store.group_add(req)
+
+ elif command == ofp.OFPGC_MODIFY:
+ self.store.group_modify(req)
+
+ else:
+ logging.warn("Unhandled group command %s in message:"
+ % command)
+ logging.warn(pp(req))
+
+ else:
+ logging.warn("Unhandled message from controller:")
+ logging.warn(pp(req))
+