Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 1 | # |
Zsolt Haraszti | 3eb27a5 | 2017-01-03 21:56:48 -0800 | [diff] [blame] | 2 | # Copyright 2017 the original author or authors. |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ Consul-based coordinator services """ |
| 18 | |
Zsolt Haraszti | 109db83 | 2016-09-16 16:32:36 -0700 | [diff] [blame] | 19 | from consul import ConsulException |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 20 | from consul.twisted import Consul |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 21 | from requests import ConnectionError |
| 22 | from structlog import get_logger |
| 23 | from twisted.internet import reactor |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 24 | from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 25 | from twisted.internet.task import LoopingCall |
Zsolt Haraszti | dafefe1 | 2016-11-14 21:29:58 -0800 | [diff] [blame] | 26 | from zope.interface import implementer |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 27 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 28 | from leader import Leader |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 29 | from common.utils.asleep import asleep |
khenaidoo | 08d48d2 | 2017-06-29 19:42:49 -0400 | [diff] [blame^] | 30 | from common.utils.message_queue import MessageQueue |
Zsolt Haraszti | dafefe1 | 2016-11-14 21:29:58 -0800 | [diff] [blame] | 31 | from voltha.registry import IComponent |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 32 | from worker import Worker |
khenaidoo | a8588f2 | 2017-06-16 12:13:34 -0400 | [diff] [blame] | 33 | from simplejson import dumps, loads |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 34 | |
Zsolt Haraszti | 2bdb6b3 | 2016-11-03 16:56:17 -0700 | [diff] [blame] | 35 | log = get_logger() |
| 36 | |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 37 | |
class StaleMembershipEntryException(Exception):
    """
    Raised when the membership record for this instance could not be
    acquired in consul (the k/v put with ``acquire=<session>`` returned a
    falsy result). The retry machinery treats it as a transient condition
    and backs off before trying again.
    """
| 40 | |
| 41 | |
@implementer(IComponent)
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    communication with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Successive wait times (seconds) used by _backoff(); the last entry is
    # reused once the number of retries exceeds the length of the list.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    # Port used when the `consul` endpoint string carries no ':port' part.
    DEFAULT_CONSUL_PORT = 8500

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 consul='localhost:8500'):
        """
        Set up the coordinator state; no consul traffic happens here.

        :param internal_host_address: stored on the instance as-is
        :param external_host_address: published as 'host_address' in the
            membership record of this instance
        :param instance_id: id of this instance; used as the last segment
            of the membership key and as the value of the leader key
        :param rest_port: stored on the instance as-is
        :param config: mapping with 'coordinator', 'worker' and 'leader'
            sections
        :param consul: consul endpoint as 'host[:port]'; the port defaults
            to DEFAULT_CONSUL_PORT when omitted
        """
        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']

        def timing_option(name, default):
            # A value at the top level of `config` keeps precedence (this is
            # where these options were historically read from); otherwise
            # fall back to the coordinator section, then to the default.
            return config.get(name, self.config.get(name, default))

        self.membership_watch_relatch_delay = timing_option(
            'membership_watch_relatch_delay', 0.1)
        self.tracking_loop_delay = timing_option('tracking_loop_delay', 1)

        # Key layout in the consul k/v store. Each segment is taken from the
        # option named below (NOT from an option named after its value).
        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        self.leader_prefix = '/'.join((
            self.prefix, self.config.get('leader_key', 'leader')))
        # The trailing '' yields a trailing '/', so these can be used
        # directly as recursive prefixes.
        self.membership_prefix = '/'.join((
            self.prefix, self.config.get('membership_key', 'members'), ''))
        self.assignment_prefix = '/'.join((
            self.prefix, self.config.get('assignment_key', 'assignments'),
            ''))
        self.workload_prefix = '/'.join((
            self.prefix, self.config.get('workload_key', 'work'), ''))
        self.core_store_prefix = '/'.join((
            self.prefix, self.config.get('core_store_key', 'data/core')))
        self.core_store_assignment_key = self.core_store_prefix + \
                                         '/assignment'
        self.core_storage_suffix = 'core_store'

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.session_renew_timer = None

        self.worker = Worker(self.instance_id, self)

        endpoint = consul.split(':')
        host = endpoint[0].strip()
        port_text = endpoint[1].strip() if len(endpoint) > 1 else ''
        port = int(port_text) if port_text else self.DEFAULT_CONSUL_PORT

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=host, port=port)

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        """Schedule the asynchronous initialization and return self."""
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        """
        Stop session renewal, destroy the consul session, then stop the
        worker and (if present) the leader. Safe to call even if the
        session was never established.
        """
        log.debug('stopping')
        self.shutting_down = True
        timer = self.session_renew_timer
        # The timer only exists once a session was created, and
        # LoopingCall.stop() may only be called on a running loop.
        if timer is not None and timer.running:
            timer.stop()
        if self.session_id is not None:
            yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            # leader already known: fire right away
            d.callback(self.leader_id)
        else:
            # fired later from _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        return d

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        """
        :return: Deferred firing with a (core_store_id, core_store_prefix)
            tuple, where the id is obtained from the worker.
        """
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        """Return whatever the peers mapping queue's get() returns."""
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        """Put `msg` on the peers mapping queue."""
        self.peers_mapping_queue.put(msg)

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        """consul.kv.get wrapped in the retry loop; returns a Deferred."""
        return self._retry(self.consul.kv.get, *args, **kw)

    def kv_put(self, *args, **kw):
        """consul.kv.put wrapped in the retry loop; returns a Deferred."""
        return self._retry(self.consul.kv.put, *args, **kw)

    def kv_delete(self, *args, **kw):
        """consul.kv.delete wrapped in the retry loop; returns a Deferred."""
        return self._retry(self.consul.kv.delete, *args, **kw)

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """
        Return list of all members (the membership keys with the membership
        prefix stripped). The list is empty when no membership record
        exists.
        """
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        prefix_len = len(self.membership_prefix)
        # consul reports "nothing under this prefix" as None, not as []
        returnValue([member['Key'][prefix_len:] for member in members or []])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """Create the session and membership record, then start the leader
        tracking loop and the worker."""
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """
        Log `msg` as an error and return a deferred sleep whose duration
        is taken from RETRY_BACKOFF according to the current retry count.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter after a successful call."""
        if self.retries:
            log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """Create the consul session (with retries) and start renewing it
        periodically."""

        @inlineCallbacks
        def _renew_session():
            # Failures are logged and otherwise ignored, so the looping
            # call keeps running and tries again at the next tick.
            try:
                result = yield self.consul.session.renew(
                    session_id=self.session_id)
                log.debug('just renewed session', result=result)
            except Exception as e:
                log.exception('could-not-renew-session', e=e)

        @inlineCallbacks
        def _do_create_session():

            # create consul session
            self.session_id = yield self.consul.session.create(
                behavior='release', ttl=60, lock_delay=1)
            log.info('created-consul-session', session_id=self.session_id)

            # renew the session every 3 seconds, i.e. many times within
            # the 60 second ttl
            self.session_renew_timer = LoopingCall(_renew_session)
            self.session_renew_timer.start(3)

        yield self._retry(_do_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """Destroy the consul session of this instance."""
        yield self.consul.session.destroy(self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Create the membership record (with retries) and start the loop
        that keeps it in place."""
        yield self._retry(self._do_create_membership_record)
        reactor.callLater(0, self._maintain_membership_record)

    def _create_membership_record_data(self):
        """Build the dict stored (JSON-encoded) as the membership record."""
        member_record = dict()
        member_record['status'] = 'alive'
        member_record['host_address'] = self.external_host_address
        return member_record

    @inlineCallbacks
    def _do_create_membership_record(self):
        """
        Single attempt at writing the membership record, acquired with the
        current session.

        :raises StaleMembershipEntryException: if the put is rejected
        """
        result = yield self.consul.kv.put(
            self.membership_record_key,
            dumps(self._create_membership_record_data()),
            acquire=self.session_id)
        if not result:
            raise StaleMembershipEntryException(self.instance_id)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Watch the membership record of this instance and re-create it
        whenever it is missing or not held by the current session.
        """
        index = None
        try:
            while 1:
                (index, record) = yield self._retry(self.consul.kv.get,
                                                    self.membership_record_key,
                                                    index=index)
                log.debug('membership-record-change-detected',
                          index=index, record=record)
                if record is None or \
                        'Session' not in record or \
                        record['Session'] != self.session_id:
                    log.debug('remaking-membership-record')
                    yield self._retry(self._do_create_membership_record)

        except Exception as e:
            log.exception('unexpected-error-membership-tracking', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _start_leader_tracking(self):
        """Schedule the first run of the leadership tracking loop."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election/tracking: try to take the leader
        lock, apply the resulting role, then watch the leader key until it
        changes. Re-schedules itself unless shutting down.
        """

        try:

            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. Its instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            log.debug('leadership-attempt')
            result = yield self._retry(self.consul.kv.put,
                                       self.leader_prefix,
                                       self.instance_id,
                                       acquire=self.session_id)

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self._retry(self.consul.kv.get,
                                                self.leader_prefix)
            log.debug('leadership-key',
                      i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    # A record that is not locked carries no 'Session'
                    # entry at all; treat that like a foreign session.
                    if 'Session' in record and \
                            record['Session'] == self.session_id:
                        yield self._assert_leadership()
                    else:
                        pass  # confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again

            # using consul's watch feature, start tracking any changes to key
            last = record
            while last is not None:
                # this shall return only when update is made to leader key
                (index, updated) = yield self._retry(self.consul.kv.get,
                                                     self.leader_prefix,
                                                     index=index)
                log.debug('leader-key-change',
                          index=index, updated=updated)
                if updated is None or updated != last:
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    break
                last = updated

        except Exception as e:
            log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.tracking_loop_delay,
                                  self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """Record the leader id and fire everyone parked in
        wait_for_a_leader()."""
        self.leader_id = leader_id
        # swap the list out first so callbacks that wait again do not
        # interfere with the iteration
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        """Instantiate and start the leader; returns what start() returns."""
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Shut the leader down; returns what _halt_leader() returns."""
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """
        Stop and drop the leader object, if there is one.

        :return: the result of leader.stop(), or None when no leader object
            exists (e.g. its construction failed earlier)
        """
        if self.leader is None:
            return None
        d = self.leader.stop()
        self.leader = None
        return d

    @inlineCallbacks
    def _retry(self, func, *args, **kw):
        """
        Call func(*args, **kw) until it succeeds, sleeping according to
        the backoff schedule after every failure.

        :return: Deferred firing with the result of the successful call
        """
        while 1:
            try:
                result = yield func(*args, **kw)
                self._clear_backoff()
                break
            except ConsulException:
                yield self._backoff('consul-not-up')
            except ConnectionError:
                yield self._backoff('cannot-connect-to-consul')
            except StaleMembershipEntryException:
                yield self._backoff('stale-membership-record-in-the-way')
            except Exception as e:
                # during shutdown errors are expected, so only the
                # traceback logging is skipped; the retry still happens
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('unknown-error')

        returnValue(result)