VOL-1670 - close and reestablish connections
When gRPC connectivity to the core is broken, the ofagent will
break the connection to ONOS and then work to reconnect to
the core. After connecting to the core, the connection to
ONOS will be reestablished.
Change-Id: I75e645de3784a64ef4f9992df8baf37959cbbd86
diff --git a/python/ofagent/connection_mgr.py b/python/ofagent/connection_mgr.py
index 3b10280..9c64854 100755
--- a/python/ofagent/connection_mgr.py
+++ b/python/ofagent/connection_mgr.py
@@ -37,12 +37,6 @@
# _ = third_party
class ConnectionManager(object):
- running = False
- core_ready = False
- channel = None
- subscription = None
- grpc_client = None
-
def __init__(self, consul_endpoint,
vcore_endpoint, vcore_grpc_timeout, vcore_binding_key,
vcore_transaction_key, controller_endpoints, instance_id,
@@ -73,6 +67,8 @@
self.devices_refresh_interval = devices_refresh_interval
self.subscription_refresh_interval = subscription_refresh_interval
self.subscription = None
+ self.connecting = True
+ self.monitor = True
self.running = False
@@ -84,8 +80,6 @@
log.debug('starting')
self.running = True
- ConnectionManager.core_ready = True # Assume core is ready until proven otherwise
- ConnectionManager.running = True
# Get a subscription to vcore
reactor.callInThread(self.get_vcore_subscription)
@@ -97,25 +91,25 @@
return self
- @classmethod
- def liveness_probe(cls):
- # Pod restarts when liveness condition fails
- return ConnectionManager.running
+ def grpc_client_terminated(self):
+ if not self.connecting and self.grpc_client is not None:
+ self.connecting = True
+ self._reset_grpc_attributes()
+ self.delete_all_agents()
+ reactor.callInThread(self.get_vcore_subscription)
- @classmethod
- def readiness_probe(cls):
+ def liveness_probe(self):
+ return self.running
+
+ def readiness_probe(self):
# Pod is isolated when readiness condition fails
- return bool(ConnectionManager.core_ready and ConnectionManager.channel and ConnectionManager.subscription and ConnectionManager.grpc_client)
+ return bool(not self.connecting and self.channel and self.subscription and self.grpc_client)
def stop(self):
log.debug('stopping')
- # clean up all controller connections
- for agent in self.agent_map.itervalues():
- agent.stop()
- self.running = False
-
+ self.delete_all_agents()
self._reset_grpc_attributes()
-
+ self.running = False
log.info('stopped')
def resolve_endpoint(self, endpoint):
@@ -148,10 +142,6 @@
self.subscription = None
self.grpc_client = None
- ConnectionManager.channel = None
- ConnectionManager.subscription = None
- ConnectionManager.grpc_client = None
-
log.debug('stop-reset-grpc-attributes')
def _assign_grpc_attributes(self):
@@ -166,15 +156,13 @@
# Establish a connection to the vcore GRPC server
self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
- # For Readiness probe
- ConnectionManager.channel = self.channel
-
log.debug('stop-assign-grpc-attributes')
@inlineCallbacks
def get_vcore_subscription(self):
log.debug('start-get-vcore-subscription')
+ self.connecting = True
while self.running and self.subscription is None:
try:
# If a subscription is not yet assigned then establish new GRPC connection
@@ -190,9 +178,6 @@
subscription = yield self.grpc_client.subscribe(
OfAgentSubscriber(ofagent_id=container_name))
- #For Readiness probes
- ConnectionManager.subscription = subscription
- ConnectionManager.grpc_client = self.grpc_client
# If the subscriber id matches the current instance
# ... then the subscription has succeeded
if subscription is not None and subscription.ofagent_id == container_name:
@@ -201,6 +186,7 @@
log.debug('subscription-with-vcore-successful', subscription=subscription)
self.subscription = subscription
self.grpc_client.start()
+ self.connecting = False
# Sleep a bit in between each subscribe
yield asleep(self.subscription_refresh_interval)
@@ -228,30 +214,32 @@
log.debug('stop-get-vcore-subscription')
+ def get_rpc_client(self):
+ return self.grpc_client if not self.connecting else None
+
@inlineCallbacks
def get_list_of_logical_devices_from_voltha(self):
while self.running:
log.info('retrieve-logical-device-list')
- try:
- devices = yield \
- self.grpc_client.list_logical_devices()
+ rpc = self.get_rpc_client()
+ if rpc is not None:
+ try:
+ devices = yield rpc.list_logical_devices()
- ConnectionManager.core_ready = True # We've successfully talked to the core
+ for device in devices:
+ log.info("logical-device-entry", id=device.id,
+ datapath_id=device.datapath_id)
- for device in devices:
- log.info("logical-device-entry", id=device.id,
- datapath_id=device.datapath_id)
+ returnValue(devices)
- returnValue(devices)
+ except _Rendezvous, e:
+ rpc.stop()
+ status = e.code()
+ log.error('vcore-communication-failure', exception=e, status=status)
- except _Rendezvous, e:
- status = e.code()
- log.error('vcore-communication-failure', exception=e, status=status)
- ConnectionManager.core_ready = False # Will be reflected in readiness probe
-
- except Exception as e:
- log.exception('logical-devices-retrieval-failure', exception=e)
- ConnectionManager.core_ready = False # will be reflected in readiness probe
+ except Exception as e:
+ rpc.stop()
+ log.exception('logical-devices-retrieval-failure', exception=e)
log.info('reconnect', after_delay=self.vcore_retry_interval)
yield asleep(self.vcore_retry_interval)
@@ -295,13 +283,18 @@
datapath_id = device.datapath_id
device_id = device.id
for controller_endpoint in self.controller_endpoints:
- agent = Agent(controller_endpoint, datapath_id,
- device_id, self.grpc_client, self.enable_tls,
+ agent = Agent(self, controller_endpoint, datapath_id,
+ device_id, self.enable_tls,
self.key_file, self.cert_file)
agent.start()
self.agent_map[(datapath_id,controller_endpoint)] = agent
self.device_id_to_datapath_id_map[device_id] = datapath_id
+ def delete_all_agents(self):
+ for agent in self.agent_map.itervalues(): agent.stop()
+ self.agent_map = {}
+ self.device_id_to_datapath_id_map = {}
+
def delete_agent(self, datapath_id):
for controller_endpoint in self.controller_endpoints:
agent = self.agent_map[(datapath_id,controller_endpoint)]
@@ -321,11 +314,9 @@
# see https://jira.opencord.org/browse/CORD-821
try:
- if self.channel is not None and self.grpc_client is not None and \
- self.subscription is not None:
+ if self.channel is not None and self.get_rpc_client() is not None and self.subscription is not None:
# get current list from Voltha
- devices = yield \
- self.get_list_of_logical_devices_from_voltha()
+ devices = yield self.get_list_of_logical_devices_from_voltha()
# update agent list and mapping tables as needed
self.refresh_agent_connections(devices)