Add leader, membership, and workload tracking

Still work in progress, but leader correctly detects workload
and/or membership change events and properly soaks before
rerunning reassignment routine (which is not flashed out yet).
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 432c0fe..8992ccb 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -16,14 +16,6 @@
 
 """ Consul-based coordinator services """
 
-# TODO move this to the consul.twisted async client once it is available.
-# Note:
-# We use https://github.com/cablehead/python-consul for consul client.
-# It's master branch already provides support for Twisted, but the latest
-# released version (0.6.1) was cut before twisted support was added. So keep
-# an eye on when 0.6.2 comes out and move over to the twisted interface once
-# it's available.
-
 from consul import ConsulException
 from consul.twisted import Consul
 from requests import ConnectionError
@@ -33,6 +25,7 @@
 from twisted.internet.task import LoopingCall
 
 from asleep import asleep
+from leader import Leader
 
 
 class StaleMembershipEntryException(Exception):
@@ -40,18 +33,32 @@
 
 
 class Coordinator(object):
+    """
+    An app shall instantiate only one Coordinator (singleton).
+    A single instance of this object shall take care of all external
+    with consul, and via consul, all coordination activities with its
+    clustered peers. Roles include:
+    - registering an ephemeral membership entry (k/v record) in consul
+    - participating in a symmetric leader election, and potentially assuming
+      the leader's role. What leadership entails is not a concern for the
+      coordination, it simply instantiates (and shuts down) a leader class
+      when it gains (or looses) leadership.
+    """
 
     CONNECT_RETRY_INTERVAL_SEC = 1
     RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
     LEADER_KEY = 'service/voltha/leader'
     MEMBERSHIP_PREFIX = 'service/voltha/members/'
 
+    # Public methods:
+
     def __init__(self,
                  internal_host_address,
                  external_host_address,
                  instance_id,
                  rest_port,
-                 consul='localhost:8500'):
+                 consul='localhost:8500',
+                 leader_class=Leader):
 
         self.retries = 0
         self.instance_id = instance_id
@@ -64,6 +71,7 @@
         self.i_am_leader = False
         self.leader_id = None  # will be the instance id of the current leader
         self.shutting_down = False
+        self.leader = None
 
         self.log = get_logger()
         self.log.info('initializing-coordinator')
@@ -74,34 +82,50 @@
         # TODO need to handle reconnect events properly
         self.consul = Consul(host=host, port=port)
 
-        reactor.callLater(0, self.async_init)
+        reactor.callLater(0, self._async_init)
         self.log.info('initialized-coordinator')
 
     @inlineCallbacks
-    def async_init(self):
-        yield self.create_session()
-        yield self.create_membership_record()
-        yield self.start_leader_tracking()
-
-    @inlineCallbacks
     def shutdown(self):
         self.shutting_down = True
-        yield self.delete_session()  # this will delete the leader lock too
+        yield self._delete_session()  # this will delete the leader lock too
+        if self.leader is not None:
+            yield self.leader.halt()
+            self.leader = None
 
-    def backoff(self, msg):
+    # Proxy methods for consul with retry support
+
+    def kv_get(self, *args, **kw):
+        return self._retry(self.consul.kv.get, *args, **kw)
+
+    def kv_put(self, *args, **kw):
+        return self._retry(self.consul.kv.put, *args, **kw)
+
+    def kv_delete(self, *args, **kw):
+        return self._retry(self.consul.kv.delete, *args, **kw)
+
+    # Private (internal) methods:
+
+    @inlineCallbacks
+    def _async_init(self):
+        yield self._create_session()
+        yield self._create_membership_record()
+        yield self._start_leader_tracking()
+
+    def _backoff(self, msg):
         wait_time = self.RETRY_BACKOFF[min(self.retries,
                                            len(self.RETRY_BACKOFF) - 1)]
         self.retries += 1
         self.log.error(msg, retry_in=wait_time)
         return asleep(wait_time)
 
-    def clear_backoff(self):
+    def _clear_backoff(self):
         if self.retries:
             self.log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
     @inlineCallbacks
-    def create_session(self):
+    def _create_session(self):
 
         @inlineCallbacks
         def _renew_session():
@@ -127,16 +151,16 @@
         yield self._retry(_create_session)
 
     @inlineCallbacks
-    def delete_session(self):
+    def _delete_session(self):
         yield self.consul.session.destroy(self.session_id)
 
     @inlineCallbacks
-    def create_membership_record(self):
-        yield self._retry(self._create_membership_record)
+    def _create_membership_record(self):
+        yield self._retry(self._do_create_membership_record)
         reactor.callLater(0, self._maintain_membership_record)
 
     @inlineCallbacks
-    def _create_membership_record(self):
+    def _do_create_membership_record(self):
         result = yield self.consul.kv.put(
             self.membership_record_key, 'alive',
             acquire=self.session_id)
@@ -155,16 +179,16 @@
                                index=index, record=record)
                 if record is None or record['Session'] != self.session_id:
                     self.log.debug('remaking-membership-record')
-                    yield self._create_membership_record()
+                    yield self._do_create_membership_record()
 
         except Exception, e:
             self.log.exception('unexpected-error-leader-trackin', e=e)
 
         finally:
             # no matter what, the loop need to continue (after a short delay)
-            reactor.callAfter(0.1, self._maintain_membership_record)
+            reactor.callLater(0.1, self._maintain_membership_record)
 
-    def start_leader_tracking(self):
+    def _start_leader_tracking(self):
         reactor.callLater(0, self._leadership_tracking_loop)
 
     @inlineCallbacks
@@ -172,7 +196,7 @@
 
         try:
 
-            # Attempt to acquire leadership lock. True indicates success,
+            # Attempt to acquire leadership lock. True indicates success;
             # False indicates there is already a leader. It's instance id
             # is then the value under the leader key service/voltha/leader.
 
@@ -196,12 +220,12 @@
             if record is not None:
                 if result is True:
                     if record['Session'] == self.session_id:
-                        self._assert_leadership()
+                        yield self._assert_leadership()
                     else:
                         pass  # confusion; need to retry leadership
                 else:
                     leader_id = record['Value']
-                    self._assert_nonleadership(leader_id)
+                    yield self._assert_nonleadership(leader_id)
 
             # if record was none, we shall try leadership again
 
@@ -227,13 +251,15 @@
             # no matter what, the loop need to continue (after a short delay)
             reactor.callLater(1, self._leadership_tracking_loop)
 
+    @inlineCallbacks
     def _assert_leadership(self):
         """(Re-)assert leadership"""
         if not self.i_am_leader:
             self.i_am_leader = True
             self.leader_id = self.instance_id
-            self._just_gained_leadership()
+            yield self._just_gained_leadership()
 
+    @inlineCallbacks
     def _assert_nonleadership(self, leader_id):
         """(Re-)assert non-leader role"""
 
@@ -242,13 +268,21 @@
 
         if self.i_am_leader:
             self.i_am_leader = False
-            self._just_lost_leadership()
+            yield self._just_lost_leadership()
 
     def _just_gained_leadership(self):
         self.log.info('became-leader')
+        self.leader = Leader(self)
+        return self.leader.start()
 
     def _just_lost_leadership(self):
         self.log.info('lost-leadership')
+        return self._halt_leader()
+
+    def _halt_leader(self):
+        d = self.leader.halt()
+        self.leader = None
+        return d
 
     @inlineCallbacks
     def _retry(self, func, *args, **kw):
@@ -257,13 +291,13 @@
                 result = yield func(*args, **kw)
                 break
             except ConsulException, e:
-                yield self.backoff('consul-not-upC')
+                yield self._backoff('consul-not-upC')
             except ConnectionError, e:
-                yield self.backoff('cannot-connect-to-consul')
+                yield self._backoff('cannot-connect-to-consul')
             except StaleMembershipEntryException, e:
-                yield self.backoff('stale-membership-record-in-the-way')
+                yield self._backoff('stale-membership-record-in-the-way')
             except Exception, e:
                 self.log.exception(e)
-                yield self.backoff('unknown-error')
+                yield self._backoff('unknown-error')
 
         returnValue(result)