Add leader, membership, and workload tracking
Still work in progress, but leader correctly detects workload
and/or membership change events and properly soaks before
rerunning reassignment routine (which is not fleshed out yet).
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 432c0fe..8992ccb 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -16,14 +16,6 @@
""" Consul-based coordinator services """
-# TODO move this to the consul.twisted async client once it is available.
-# Note:
-# We use https://github.com/cablehead/python-consul for consul client.
-# It's master branch already provides support for Twisted, but the latest
-# released version (0.6.1) was cut before twisted support was added. So keep
-# an eye on when 0.6.2 comes out and move over to the twisted interface once
-# it's available.
-
from consul import ConsulException
from consul.twisted import Consul
from requests import ConnectionError
@@ -33,6 +25,7 @@
from twisted.internet.task import LoopingCall
from asleep import asleep
+from leader import Leader
class StaleMembershipEntryException(Exception):
@@ -40,18 +33,32 @@
class Coordinator(object):
+ """
+ An app shall instantiate only one Coordinator (singleton).
+ A single instance of this object shall take care of all external
+ interactions with consul, and via consul, all coordination activities with its
+ clustered peers. Roles include:
+ - registering an ephemeral membership entry (k/v record) in consul
+ - participating in a symmetric leader election, and potentially assuming
+ the leader's role. What leadership entails is not a concern for the
+ coordinator; it simply instantiates (and shuts down) a leader class
+ when it gains (or loses) leadership.
+ """
CONNECT_RETRY_INTERVAL_SEC = 1
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
LEADER_KEY = 'service/voltha/leader'
MEMBERSHIP_PREFIX = 'service/voltha/members/'
+ # Public methods:
+
def __init__(self,
internal_host_address,
external_host_address,
instance_id,
rest_port,
- consul='localhost:8500'):
+ consul='localhost:8500',
+ leader_class=Leader):
self.retries = 0
self.instance_id = instance_id
@@ -64,6 +71,7 @@
self.i_am_leader = False
self.leader_id = None # will be the instance id of the current leader
self.shutting_down = False
+ self.leader = None
self.log = get_logger()
self.log.info('initializing-coordinator')
@@ -74,34 +82,50 @@
# TODO need to handle reconnect events properly
self.consul = Consul(host=host, port=port)
- reactor.callLater(0, self.async_init)
+ reactor.callLater(0, self._async_init)
self.log.info('initialized-coordinator')
@inlineCallbacks
- def async_init(self):
- yield self.create_session()
- yield self.create_membership_record()
- yield self.start_leader_tracking()
-
- @inlineCallbacks
def shutdown(self):
self.shutting_down = True
- yield self.delete_session() # this will delete the leader lock too
+ yield self._delete_session() # this will delete the leader lock too
+ if self.leader is not None:
+ yield self.leader.halt()
+ self.leader = None
- def backoff(self, msg):
+ # Proxy methods for consul with retry support
+
+ def kv_get(self, *args, **kw):
+ return self._retry(self.consul.kv.get, *args, **kw)
+
+ def kv_put(self, *args, **kw):
+ return self._retry(self.consul.kv.put, *args, **kw)
+
+ def kv_delete(self, *args, **kw):
+ return self._retry(self.consul.kv.delete, *args, **kw)
+
+ # Private (internal) methods:
+
+ @inlineCallbacks
+ def _async_init(self):
+ yield self._create_session()
+ yield self._create_membership_record()
+ yield self._start_leader_tracking()
+
+ def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
self.log.error(msg, retry_in=wait_time)
return asleep(wait_time)
- def clear_backoff(self):
+ def _clear_backoff(self):
if self.retries:
self.log.info('reconnected-to-consul', after_retries=self.retries)
self.retries = 0
@inlineCallbacks
- def create_session(self):
+ def _create_session(self):
@inlineCallbacks
def _renew_session():
@@ -127,16 +151,16 @@
yield self._retry(_create_session)
@inlineCallbacks
- def delete_session(self):
+ def _delete_session(self):
yield self.consul.session.destroy(self.session_id)
@inlineCallbacks
- def create_membership_record(self):
- yield self._retry(self._create_membership_record)
+ def _create_membership_record(self):
+ yield self._retry(self._do_create_membership_record)
reactor.callLater(0, self._maintain_membership_record)
@inlineCallbacks
- def _create_membership_record(self):
+ def _do_create_membership_record(self):
result = yield self.consul.kv.put(
self.membership_record_key, 'alive',
acquire=self.session_id)
@@ -155,16 +179,16 @@
index=index, record=record)
if record is None or record['Session'] != self.session_id:
self.log.debug('remaking-membership-record')
- yield self._create_membership_record()
+ yield self._do_create_membership_record()
except Exception, e:
self.log.exception('unexpected-error-leader-trackin', e=e)
finally:
# no matter what, the loop need to continue (after a short delay)
- reactor.callAfter(0.1, self._maintain_membership_record)
+ reactor.callLater(0.1, self._maintain_membership_record)
- def start_leader_tracking(self):
+ def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
@inlineCallbacks
@@ -172,7 +196,7 @@
try:
- # Attempt to acquire leadership lock. True indicates success,
+ # Attempt to acquire leadership lock. True indicates success;
# False indicates there is already a leader. It's instance id
# is then the value under the leader key service/voltha/leader.
@@ -196,12 +220,12 @@
if record is not None:
if result is True:
if record['Session'] == self.session_id:
- self._assert_leadership()
+ yield self._assert_leadership()
else:
pass # confusion; need to retry leadership
else:
leader_id = record['Value']
- self._assert_nonleadership(leader_id)
+ yield self._assert_nonleadership(leader_id)
# if record was none, we shall try leadership again
@@ -227,13 +251,15 @@
# no matter what, the loop need to continue (after a short delay)
reactor.callLater(1, self._leadership_tracking_loop)
+ @inlineCallbacks
def _assert_leadership(self):
"""(Re-)assert leadership"""
if not self.i_am_leader:
self.i_am_leader = True
self.leader_id = self.instance_id
- self._just_gained_leadership()
+ yield self._just_gained_leadership()
+ @inlineCallbacks
def _assert_nonleadership(self, leader_id):
"""(Re-)assert non-leader role"""
@@ -242,13 +268,21 @@
if self.i_am_leader:
self.i_am_leader = False
- self._just_lost_leadership()
+ yield self._just_lost_leadership()
def _just_gained_leadership(self):
self.log.info('became-leader')
+ self.leader = Leader(self)
+ return self.leader.start()
def _just_lost_leadership(self):
self.log.info('lost-leadership')
+ return self._halt_leader()
+
+ def _halt_leader(self):
+ d = self.leader.halt()
+ self.leader = None
+ return d
@inlineCallbacks
def _retry(self, func, *args, **kw):
@@ -257,13 +291,13 @@
result = yield func(*args, **kw)
break
except ConsulException, e:
- yield self.backoff('consul-not-upC')
+ yield self._backoff('consul-not-upC')
except ConnectionError, e:
- yield self.backoff('cannot-connect-to-consul')
+ yield self._backoff('cannot-connect-to-consul')
except StaleMembershipEntryException, e:
- yield self.backoff('stale-membership-record-in-the-way')
+ yield self._backoff('stale-membership-record-in-the-way')
except Exception, e:
self.log.exception(e)
- yield self.backoff('unknown-error')
+ yield self._backoff('unknown-error')
returnValue(result)