Async/streaming gRPC client/server proto
This experiment was to fine-tune how we can implement
async gRPC client and server code inside a Twisted
python app.
Change-Id: I945014e27f4b9d6ed624666e0284cc298548adb3
Major cleanup of openflow_13.proto
Change-Id: I4e54eaf87b682124ec518a0ade1a6050a6ec6da8
Relocated openflow_13.proto to voltha
Change-Id: I66ae45a9142d180c2c6651e75c7a1ee08aef7ef8
Removed forced utest from make build
Change-Id: If0da58e9d135ebde6ca68c3316688a03a7b10f2f
twisted openflow agent first pass
Change-Id: Ibe5b4727ccfe92e6fd464ccd3baf6275569ef5d3
store openflow derived files
Change-Id: Ib3e1384bb2ca2a9c0872767f7b793f96b0a154e2
Minor cleanup
Change-Id: I1280ed3acb606121b616a0efd573f5f59d010dca
Factored out common utils
Change-Id: Icd86fcd50f60d0900924674cbcd65e13e47782a1
Refactored twisted agent
Change-Id: I71f26ce5357a4f98477df60b8c5ddc068cf75d43
Relocated openflow agent to ofagent
... and preserved the obsolete working (non-twisted) agent under
~/obsolete, so we can still run the olt-oftest and pass tests
until the new twisted-based agent reaches that maturity point.
Change-Id: I727f8d7144b1291a40276dad2966b7643bd7bc4b
olt-oftest in fake mode works with new agent
Change-Id: I43b4f5812e8dfaa9f45e4a77fdcf6c30ac520f8d
Initial ofagent/voltha operation
Change-Id: Ia8104f1285a6b1c51635d36d7d78fc113f800e79
Additional callouts to Voltha
Change-Id: If8f483d5140d3c9d45f22b480b8d33249a29cd4e
More gRPC calls
Change-Id: I7d24fadf9425217fb26ffe18f25359d072ef38fa
Flow add/list now works
Change-Id: Ie3e3e73108645b47891cef798fc61372a022fd93
Missed some files
Change-Id: I29e81238ff1a26c095c0c73e521579edf7092e21
diff --git a/ofagent/agent.py b/ofagent/agent.py
new file mode 100644
index 0000000..5196e6d
--- /dev/null
+++ b/ofagent/agent.py
@@ -0,0 +1,203 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+import structlog
+from twisted.internet import protocol
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+
+import loxi.of13 as of13
+from common.utils.asleep import asleep
+from of_connection import OpenFlowConnection
+from of_protocol_handler import OpenFlowProtocolHandler
+
+
+log = structlog.get_logger()
+
+
+class Agent(protocol.ClientFactory):
+    """Twisted client factory keeping one OpenFlow controller session alive.
+
+    After run() is called, the agent connects to the controller endpoint
+    over TCP and, each time a disconnect is signalled through
+    enter_disconnected(), waits retry_interval and connects again, until
+    stop() is called.
+    """
+
+    def __init__(self, controller_endpoint, datapath_id, rpc_stub,
+                 conn_retry_interval=1):
+        """Store the configuration; nothing is connected until run().
+
+        :param controller_endpoint: controller address as a 'host:port'
+            string
+        :param datapath_id: datapath id handed to the protocol handler
+        :param rpc_stub: RPC stub handed to the protocol handler
+        :param conn_retry_interval: delay passed to asleep() between a
+            disconnect and the next connection attempt
+        """
+        self.controller_endpoint = controller_endpoint
+        self.datapath_id = datapath_id
+        self.rpc_stub = rpc_stub
+        self.retry_interval = conn_retry_interval
+
+        self.running = False
+        self.connector = None  # will be a Connector instance once connected
+        # deferred fired to wake the reconnect loop when a disconnect occurs
+        self.d_disconnected = None
+        self.proto_handler = None  # set by protocol()
+        self.connected = False
+        self.exiting = False
+
+    def run(self):
+        """Start the reconnect loop (at most once) and return the agent.
+
+        The loop is scheduled on the reactor rather than started inline.
+        The agent is returned on every call, including repeated ones, so
+        the result can always be chained or collected.
+        """
+        if not self.running:
+            self.running = True
+            reactor.callLater(0, self.keep_connected)
+        return self
+
+    def resolve_endpoint(self, endpoint):
+        """Split a 'host:port' string into a (host, int(port)) tuple.
+
+        The split is made at the last colon, so a host part that itself
+        contains colons is kept intact.
+
+        :raises ValueError: if there is no colon or the port is not an
+            integer
+        """
+        # TODO allow resolution via consul
+        host, port = endpoint.rsplit(':', 1)
+        return host, int(port)
+
+    @inlineCallbacks
+    def keep_connected(self):
+        """Keep reconnecting to the controller"""
+        while not self.exiting:
+            host, port = self.resolve_endpoint(self.controller_endpoint)
+            log.info('connecting', host=host, port=port)
+            # Arm the deferred before starting the attempt, so that any
+            # disconnect signal for this attempt finds a fresh deferred.
+            self.d_disconnected = Deferred()
+            self.connector = reactor.connectTCP(host, port, self)
+            yield self.d_disconnected
+            if self.exiting:
+                break
+            log.debug('reconnect', after_delay=self.retry_interval)
+            yield asleep(self.retry_interval)
+
+    def stop(self):
+        """Stop the reconnect loop and drop the current connection, if any."""
+        self.connected = False
+        self.exiting = True
+        # stop() can be called before the first connection attempt was made
+        # (run() only schedules it), in which case there is no connector.
+        if self.connector is not None:
+            self.connector.disconnect()
+        log.info('stopped')
+
+    def enter_disconnected(self, event, reason):
+        """Internally signal entering disconnected state"""
+        log.error(event, reason=reason)
+        self.connected = False
+        # Fire the deferred at most once: a repeated or early signal must
+        # not raise AlreadyCalledError / AttributeError in the caller.
+        d = self.d_disconnected
+        if d is not None and not d.called:
+            d.callback(None)
+
+    def enter_connected(self):
+        """Handle transitioning from disconnected to connected state"""
+        log.info('connected')
+        self.connected = True
+        self.read_buffer = None
+        reactor.callLater(0, self.proto_handler.run)
+
+    # protocol.ClientFactory methods
+
+    def protocol(self):
+        """Build the connection object and its protocol handler.
+
+        Takes the place of the factory's 'protocol' attribute; a new
+        handler is stored in self.proto_handler on every call.
+        """
+        cxn = OpenFlowConnection(self)  # Low level message handler
+        self.proto_handler = OpenFlowProtocolHandler(
+            self.datapath_id, self, cxn, self.rpc_stub)
+        return cxn
+
+    def clientConnectionFailed(self, connector, reason):
+        """Treat a failed connection attempt as a disconnect."""
+        self.enter_disconnected('connection-failed', reason)
+
+    def clientConnectionLost(self, connector, reason):
+        """Log the loss of an established connection.
+
+        NOTE(review): this does not call enter_disconnected(); presumably
+        the connection object does so itself -- confirm, otherwise the
+        reconnect loop is never woken after a lost connection.
+        """
+        log.error('client-connection-lost',
+                  reason=reason, connector=connector)
+
+
+if __name__ == '__main__':
+    # Run this to test the agent for N concurrent sessions:
+    # python agent [<number-of-desired-instances>]
+
+    n = 1 if len(sys.argv) < 2 else int(sys.argv[1])
+
+    from utils import mac_str_to_tuple
+
+    class MockRpc(object):
+        """Stand-in RPC stub serving a fixed list of three ports."""
+
+        @staticmethod
+        def get_port_list(_):
+            """Return port_desc objects for onu1, onu2 and olt."""
+            ports = []
+            cap = of13.OFPPF_1GB_FD | of13.OFPPF_FIBER
+            for pno, mac, nam, cur, adv, sup, spe in (
+                    (1, '00:00:00:00:00:01', 'onu1', cap, cap, cap,
+                     of13.OFPPF_1GB_FD),
+                    (2, '00:00:00:00:00:02', 'onu2', cap, cap, cap,
+                     of13.OFPPF_1GB_FD),
+                    (129, '00:00:00:00:00:81', 'olt', cap, cap, cap,
+                     of13.OFPPF_1GB_FD)
+            ):
+                port = of13.common.port_desc(pno, mac_str_to_tuple(mac), nam,
+                                             curr=cur, advertised=adv,
+                                             supported=sup,
+                                             curr_speed=spe, max_speed=spe)
+                ports.append(port)
+            return ports
+
+    stub = MockRpc()
+    agents = [Agent('localhost:6633', 256 + i, stub).run() for i in range(n)]
+
+    def shutdown():
+        """Stop every agent before the reactor shuts down."""
+        for a in agents:
+            a.stop()
+
+    reactor.addSystemEventTrigger('before', 'shutdown', shutdown)
+    reactor.run()
+