Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2016 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ Consul-based coordinator services """ |
| 18 | |
Zsolt Haraszti | 109db83 | 2016-09-16 16:32:36 -0700 | [diff] [blame] | 19 | from consul import ConsulException |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 20 | from consul.twisted import Consul |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 21 | from requests import ConnectionError |
| 22 | from structlog import get_logger |
| 23 | from twisted.internet import reactor |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 24 | from twisted.internet.defer import inlineCallbacks, returnValue |
| 25 | from twisted.internet.task import LoopingCall |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 26 | |
| 27 | from asleep import asleep |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame^] | 28 | from leader import Leader |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 29 | |
| 30 | |
class StaleMembershipEntryException(Exception):
    """Raised when the membership record could not be acquired.

    Signals that the key/value put for this instance's membership key was
    rejected, i.e. an older record is still in the way.
    """
| 33 | |
| 34 | |
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interactions with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # wait times (in seconds) used by _backoff; the last entry is reused
    # once the number of consecutive retries exceeds the list length
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    LEADER_KEY = 'service/voltha/leader'
    MEMBERSHIP_PREFIX = 'service/voltha/members/'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 consul='localhost:8500',
                 leader_class=Leader):
        """
        Record the instance identity, build the consul client and schedule
        the asynchronous part of the initialization on the reactor.

        :param internal_host_address: stored as-is on the instance
        :param external_host_address: stored as-is on the instance
        :param instance_id: id of this instance; appended to
            MEMBERSHIP_PREFIX to form the membership key and written as the
            value of the leader key when competing for leadership
        :param rest_port: stored as-is on the instance
        :param consul: consul endpoint given as 'host:port'
        :param leader_class: class instantiated (with this coordinator as
            its only argument) when leadership is gained
        """

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id

        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader_class = leader_class
        self.leader = None

        # LoopingCall that keeps the consul session alive; created once the
        # session exists and stopped again when the session is deleted
        self._session_renew_loop = None

        self.log = get_logger()
        self.log.info('initializing-coordinator')

        endpoint = consul.split(':')
        host = endpoint[0].strip()
        port = int(endpoint[1].strip())

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=host, port=port)

        reactor.callLater(0, self._async_init)
        self.log.info('initialized-coordinator')

    @inlineCallbacks
    def shutdown(self):
        """
        Stop coordination: delete the consul session and halt the leader
        (if this instance currently runs one).

        Setting shutting_down first stops the tracking loops from
        rescheduling themselves.
        """
        self.shutting_down = True
        try:
            yield self._delete_session()  # this will delete the leader lock too
        finally:
            # halt the leader even if the session could not be deleted; any
            # error from the deletion still propagates to the caller
            yield self._halt_leader()

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        """Call consul.kv.get through _retry; returns a deferred."""
        return self._retry(self.consul.kv.get, *args, **kw)

    def kv_put(self, *args, **kw):
        """Call consul.kv.put through _retry; returns a deferred."""
        return self._retry(self.consul.kv.put, *args, **kw)

    def kv_delete(self, *args, **kw):
        """Call consul.kv.delete through _retry; returns a deferred."""
        return self._retry(self.consul.kv.delete, *args, **kw)

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """Create the session, then the membership record, then start
        tracking leadership (in that order)."""
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()

    def _backoff(self, msg):
        """
        Log `msg` as an error and return the result of asleep() for the next
        wait time taken from RETRY_BACKOFF (presumably a deferred firing
        after that many seconds -- asleep is defined outside this file).
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        self.log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter, logging the recovery if there were
        retries."""
        if self.retries:
            self.log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """Create the consul session (retrying until it succeeds) and start
        renewing it periodically."""

        @inlineCallbacks
        def _renew_session():
            try:
                result = yield self.consul.session.renew(
                    session_id=self.session_id)
                self.log.debug('just renewed session', result=result)
            except Exception as e:
                # swallow the error so that the LoopingCall keeps running
                self.log.exception('could-not-renew-session', e=e)

        @inlineCallbacks
        def _create_session():

            # create consul session
            self.session_id = yield self.consul.session.create(
                behavior='delete', ttl=10, lock_delay=1)
            self.log.info('created-consul-session', session_id=self.session_id)

            # renew the session every 3 seconds, i.e. about 3 times within
            # the ttl; keep a handle so that the renewal can be stopped
            self._session_renew_loop = LoopingCall(_renew_session)
            self._session_renew_loop.start(3)

        yield self._retry(_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """Stop renewing the consul session and destroy it (no-op for the
        destroy part if no session was ever created)."""
        renew_loop = self._session_renew_loop
        self._session_renew_loop = None
        if renew_loop is not None and renew_loop.running:
            renew_loop.stop()

        if self.session_id is not None:
            yield self.consul.session.destroy(self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Create the membership record (with retries), then schedule the
        loop that keeps it in place."""
        yield self._retry(self._do_create_membership_record)
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _do_create_membership_record(self):
        """
        Put the membership record, acquiring it with our session.

        :raises StaleMembershipEntryException: if the put is not successful
        """
        result = yield self.consul.kv.put(
            self.membership_record_key, 'alive',
            acquire=self.session_id)
        if not result:
            raise StaleMembershipEntryException(self.instance_id)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Watch the membership record and re-create it whenever it is missing
        or no longer held by our session. Reschedules itself unless the
        coordinator is shutting down.
        """
        index = None
        try:
            while not self.shutting_down:
                (index, record) = yield self.kv_get(
                    self.membership_record_key, index=index)
                if self.shutting_down:
                    # the session is (being) deleted; do not re-create
                    break
                self.log.debug('membership-record-change-detected',
                               index=index, record=record)
                # .get(): tolerate a record not bound to any session
                if record is None or \
                        record.get('Session') != self.session_id:
                    self.log.debug('remaking-membership-record')
                    yield self._do_create_membership_record()

        except Exception as e:
            self.log.exception('unexpected-error-membership-tracking', e=e)

        finally:
            # except in shutdown, the loop needs to continue (after a short
            # delay)
            if not self.shutting_down:
                reactor.callLater(0.1, self._maintain_membership_record)

    def _start_leader_tracking(self):
        """Schedule the first iteration of the leadership tracking loop."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election: try to acquire the leader key, apply
        the resulting role, then watch the key until it changes. Reschedules
        itself unless the coordinator is shutting down.
        """

        if self.shutting_down:
            # the session may already be gone; competing for the lock with
            # it would only fail and be retried
            return

        try:

            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. Its instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            self.log.debug('leadership-attempt')
            result = yield self.kv_put(self.LEADER_KEY,
                                       self.instance_id,
                                       acquire=self.session_id)

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self.kv_get(self.LEADER_KEY)
            self.log.debug('leadership-key',
                           i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    if record.get('Session') == self.session_id:
                        yield self._assert_leadership()
                    # else: confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again

            # using consul's watch feature, start tracking any changes to key
            last = record
            while last is not None and not self.shutting_down:
                # this shall return only when update is made to leader key
                (index, updated) = yield self.kv_get(self.LEADER_KEY,
                                                     index=index)
                self.log.debug('leader-key-change',
                               index=index, updated=updated)
                if updated is None or updated != last:
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    break
                last = updated

        except Exception as e:
            self.log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # except in shutdown, the loop needs to continue (after a short
            # delay)
            if not self.shutting_down:
                reactor.callLater(1, self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self.leader_id = self.instance_id
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self.leader_id = leader_id

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _just_gained_leadership(self):
        """Instantiate the configured leader class and start it; returns
        whatever its start() returns."""
        self.log.info('became-leader')
        self.leader = self.leader_class(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Halt the leader after leadership was lost."""
        self.log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """Halt and forget the leader object, if any; returns whatever its
        halt() returns, or None when there is no leader to halt."""
        leader = self.leader
        if leader is None:
            return None
        self.leader = None
        return leader.halt()

    @inlineCallbacks
    def _retry(self, func, *args, **kw):
        """
        Call func(*args, **kw) until it succeeds, waiting between attempts
        as dictated by _backoff. Fires with the result of the first
        successful call.
        """
        while 1:
            try:
                result = yield func(*args, **kw)
            except ConsulException:
                yield self._backoff('consul-not-up')
            except ConnectionError:
                yield self._backoff('cannot-connect-to-consul')
            except StaleMembershipEntryException:
                yield self._backoff('stale-membership-record-in-the-way')
            except Exception as e:
                self.log.exception(e)
                yield self._backoff('unknown-error')
            else:
                # success: start the next failure from the shortest wait
                self._clear_backoff()
                break

        returnValue(result)