https://jira.opencord.org/browse/CORD-824
Support for connecting multiple controllers in voltha provided.
Addressed review comments patch 7
Change-Id: I2d2375c7754014743c305a0f3841debe5eb3e795
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 11bc0c0..8a4081c 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -39,18 +39,19 @@
class ConnectionManager(object):
- def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoint,
+ def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
voltha_retry_interval=0.5, devices_refresh_interval=5):
log.info('init-connection-manager')
- self.controller_endpoint = controller_endpoint
+ log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+ self.controller_endpoints = controller_endpoints
self.consul_endpoint = consul_endpoint
self.voltha_endpoint = voltha_endpoint
self.channel = None
self.grpc_client = None # single, shared gRPC client to Voltha
- self.agent_map = {} # datapath_id -> Agent()
+ self.agent_map = {} # (datapath_id, controller_endpoint) -> Agent()
self.device_id_to_datapath_id_map = {}
self.voltha_retry_interval = voltha_retry_interval
@@ -156,7 +157,7 @@
# Use datapath ids for deciding what's new and what's obsolete
desired_datapath_ids = set(d.datapath_id for d in devices)
- current_datapath_ids = set(self.agent_map.iterkeys())
+ current_datapath_ids = set(datapath_ids[0] for datapath_ids in self.agent_map.iterkeys())
# if identical, nothing to do
if desired_datapath_ids == current_datapath_ids:
@@ -182,18 +183,20 @@
def create_agent(self, device):
datapath_id = device.datapath_id
device_id = device.id
- agent = Agent(self.controller_endpoint, datapath_id,
+ for controller_endpoint in self.controller_endpoints:
+ agent = Agent(controller_endpoint, datapath_id,
device_id, self.grpc_client)
- agent.start()
- self.agent_map[datapath_id] = agent
- self.device_id_to_datapath_id_map[device_id] = datapath_id
+ agent.start()
+ self.agent_map[(datapath_id,controller_endpoint)] = agent
+ self.device_id_to_datapath_id_map[device_id] = datapath_id
def delete_agent(self, datapath_id):
- agent = self.agent_map[datapath_id]
- device_id = agent.get_device_id()
- agent.stop()
- del self.agent_map[datapath_id]
- del self.device_id_to_datapath_id_map[device_id]
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id,controller_endpoint)]
+ device_id = agent.get_device_id()
+ agent.stop()
+ del self.agent_map[(datapath_id,controller_endpoint)]
+ del self.device_id_to_datapath_id_map[device_id]
@inlineCallbacks
def monitor_logical_devices(self):
@@ -214,11 +217,13 @@
def forward_packet_in(self, device_id, ofp_packet_in):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- agent = self.agent_map[datapath_id]
- agent.forward_packet_in(ofp_packet_in)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id,controller_endpoint)]
+ agent.forward_packet_in(ofp_packet_in)
def forward_change_event(self, device_id, event):
datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
if datapath_id:
- agent = self.agent_map[datapath_id]
- agent.forward_change_event(event)
+ for controller_endpoint in self.controller_endpoints:
+ agent = self.agent_map[(datapath_id,controller_endpoint)]
+ agent.forward_change_event(event)
diff --git a/ofagent/main.py b/ofagent/main.py
index f2e8122..4811380 100755
--- a/ofagent/main.py
+++ b/ofagent/main.py
@@ -60,10 +60,10 @@
default=defs['consul'],
help=_help)
- _help = '<hostname>:<port> to openflow controller (default: %s)' % \
+ _help = '<hostname1>:<port1> <hostname2>:<port2> <hostname3>:<port3> ... <hostnamen>:<portn> to openflow controller (default: %s)' % \
defs['controller']
parser.add_argument(
- '-O', '--controller', dest='controller', action='store',
+ '-O', '--controller',nargs = '*', dest='controller', action='store',
default=defs['controller'],
help=_help)
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index a91a4a1..f4af2b9 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -42,6 +42,7 @@
self.agent = agent
self.cxn = cxn
self.rpc = rpc
+ self.role = None
@inlineCallbacks
def start(self):
@@ -108,12 +109,16 @@
raise NotImplementedError()
def handle_flow_mod_request(self, req):
- try:
- grpc_req = to_grpc(req)
- except Exception, e:
- log.exception('failed-to-convert', e=e)
- else:
- return self.rpc.update_flow_table(self.device_id, grpc_req)
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ try:
+ grpc_req = to_grpc(req)
+ except Exception, e:
+ log.exception('failed-to-convert', e=e)
+ else:
+ return self.rpc.update_flow_table(self.device_id, grpc_req)
+
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_get_async_request(self, req):
raise NotImplementedError()
@@ -126,21 +131,33 @@
@inlineCallbacks
def handle_group_mod_request(self, req):
- yield self.rpc.update_group_table(self.device_id, to_grpc(req))
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ yield self.rpc.update_group_table(self.device_id, to_grpc(req))
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+
def handle_meter_mod_request(self, req):
raise NotImplementedError()
def handle_role_request(self, req):
- # Handle role messages appropriately to support multiple controllers
- # see https://jira.opencord.org/browse/CORD-824
- if req.role != ofp.OFPCR_ROLE_MASTER:
- raise NotImplementedError()
- self.cxn.send(ofp.message.role_reply(
+ # https://jira.opencord.org/browse/CORD-1174
+ # Need to handle generator_id
+ if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
+ self.role = req.role
+ self.cxn.send(ofp.message.role_reply(
xid=req.xid, role=req.role, generation_id=req.generation_id))
+ elif req.role == ofp.OFPCR_ROLE_EQUAL:
+ self.role = req.role
+ self.cxn.send(ofp.message.role_reply(
+ xid=req.xid, role=req.role))
def handle_packet_out_request(self, req):
- self.rpc.send_packet_out(self.device_id, to_grpc(req))
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ self.rpc.send_packet_out(self.device_id, to_grpc(req))
+
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
def handle_set_config_request(self, req):
# Handle set config appropriately
@@ -270,8 +287,9 @@
}
def forward_packet_in(self, ofp_packet_in):
- log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
- self.cxn.send(to_loxi(ofp_packet_in))
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ log.info('sending-packet-in', ofp_packet_in=ofp_packet_in)
+ self.cxn.send(to_loxi(ofp_packet_in))
def forward_port_status(self, ofp_port_status):
self.cxn.send(to_loxi(ofp_port_status))