#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Consul-based coordinator services """
| 18 | |
Zsolt Haraszti | 109db83 | 2016-09-16 16:32:36 -0700 | [diff] [blame] | 19 | from consul import ConsulException |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 20 | from consul.twisted import Consul |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 21 | from requests import ConnectionError |
| 22 | from structlog import get_logger |
| 23 | from twisted.internet import reactor |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 24 | from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 25 | from twisted.internet.task import LoopingCall |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 26 | |
| 27 | from asleep import asleep |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 28 | from leader import Leader |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 29 | from worker import Worker |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 30 | |
| 31 | |
class StaleMembershipEntryException(Exception):
    """
    Raised when the membership record of this instance cannot be acquired
    with the current consul session (the k/v put with acquire returned a
    falsy result).
    """
| 34 | |
| 35 | |
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interactions with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Delays (in seconds) used by _backoff(); the last entry is reused once
    # the number of consecutive failures exceeds the length of the list.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    LEADER_KEY = 'service/voltha/leader'

    MEMBERSHIP_PREFIX = 'service/voltha/members/'
    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
    WORKLOAD_PREFIX = 'service/voltha/work/'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 consul='localhost:8500',
                 leader_class=Leader):
        """
        Set up the coordinator state and schedule the asynchronous part of
        the initialization (_async_init) on the reactor.

        :param internal_host_address: stored as-is on the instance
        :param external_host_address: stored as-is on the instance
        :param instance_id: id of this instance; used as the suffix of the
            membership key and as the value written under the leader key
        :param rest_port: stored as-is on the instance
        :param consul: consul agent address in 'host:port' form
        :param leader_class: class instantiated (with this coordinator as its
            only argument) whenever this instance gains leadership
        """

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id

        self.session_id = None
        self.session_renew_timer = None  # LoopingCall renewing the session
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader_class = leader_class
        self.leader = None

        # Must exist before the coordinator is handed to any collaborator
        # that may call wait_for_a_leader().
        self.wait_for_leader_deferreds = []

        self.worker = Worker(self.instance_id, self)

        self.log = get_logger()
        self.log.info('initializing-coordinator')

        consul_parts = consul.split(':')
        host = consul_parts[0].strip()
        port = int(consul_parts[1].strip())

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=host, port=port)

        reactor.callLater(0, self._async_init)
        self.log.info('initialized-coordinator')

    @inlineCallbacks
    def shutdown(self):
        """
        Stop coordination: mark the instance as shutting down (which stops
        the tracking loops from rescheduling themselves), destroy the consul
        session, then halt the worker and, if present, the leader.
        """
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.halt()
        if self.leader is not None:
            yield self.leader.halt()
            self.leader = None

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            # leader already known: fire right away
            d.callback(self.leader_id)
        else:
            # fired later from _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        return d

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        """Call consul.kv.get through _retry; returns a Deferred."""
        return self._retry(self.consul.kv.get, *args, **kw)

    def kv_put(self, *args, **kw):
        """Call consul.kv.put through _retry; returns a Deferred."""
        return self._retry(self.consul.kv.put, *args, **kw)

    def kv_delete(self, *args, **kw):
        """Call consul.kv.delete through _retry; returns a Deferred."""
        return self._retry(self.consul.kv.delete, *args, **kw)

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """
        Asynchronous part of the initialization: create the consul session,
        register the membership record, start leader tracking and finally
        start the worker.
        """
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """
        Log msg as an error and return a deferred sleep whose length is taken
        from RETRY_BACKOFF according to the number of consecutive failures.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        self.log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the failure counter after a successful call."""
        if self.retries:
            self.log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """
        Create the consul session (retrying until it succeeds) and start the
        periodic renewal of that session.
        """

        @inlineCallbacks
        def _renew_session():
            try:
                result = yield self.consul.session.renew(
                    session_id=self.session_id)
                self.log.debug('just renewed session', result=result)
            except Exception as e:
                # best effort: a failed renewal is logged and retried on the
                # next tick of the looping call
                self.log.exception('could-not-renew-session', e=e)

        @inlineCallbacks
        def _create_session():

            # create consul session
            self.session_id = yield self.consul.session.create(
                behavior='delete', ttl=10, lock_delay=1)
            self.log.info('created-consul-session', session_id=self.session_id)

            # renew the session every 3 seconds, i.e. about 3 times within
            # its ttl; keep the timer so that it can be stopped on shutdown
            self._stop_session_renewal()
            self.session_renew_timer = LoopingCall(_renew_session)
            self.session_renew_timer.start(3)

        yield self._retry(_create_session)

    def _stop_session_renewal(self):
        """Stop the periodic session renewal, if it is running."""
        timer, self.session_renew_timer = self.session_renew_timer, None
        if timer is not None and timer.running:
            timer.stop()

    @inlineCallbacks
    def _delete_session(self):
        """
        Stop renewing the consul session and destroy it. Nothing is destroyed
        if no session has been created yet.
        """
        # stop first, otherwise the timer keeps renewing a destroyed session
        self._stop_session_renewal()
        if self.session_id is not None:
            yield self.consul.session.destroy(self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """
        Create the membership record (retrying until it succeeds) and kick
        off the loop that keeps it in place.
        """
        yield self._retry(self._do_create_membership_record)
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _do_create_membership_record(self):
        """
        Put the membership record under this instance's key, acquiring it
        with the current session.

        :raises StaleMembershipEntryException: if the put returns a falsy
            result
        """
        result = yield self.consul.kv.put(
            self.membership_record_key, 'alive',
            acquire=self.session_id)
        if not result:
            raise StaleMembershipEntryException(self.instance_id)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Watch the membership record and re-create it whenever it is missing
        or is held by a session other than ours. Reschedules itself unless
        the coordinator is shutting down.
        """
        index = None
        try:
            while 1:
                # passing the last seen index turns the get into a watch
                (index, record) = yield self._retry(self.consul.kv.get,
                                                    self.membership_record_key,
                                                    index=index)
                self.log.debug('membership-record-change-detected',
                               index=index, record=record)
                if record is None or record['Session'] != self.session_id:
                    self.log.debug('remaking-membership-record')
                    yield self._retry(self._do_create_membership_record)

        except Exception as e:
            self.log.exception('unexpected-error-membership-tracking', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(0.1, self._maintain_membership_record)

    def _start_leader_tracking(self):
        """Schedule the first iteration of the leadership tracking loop."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election and tracking: try to acquire the leader
        key, apply the resulting role, then watch the key until it changes.
        Reschedules itself unless the coordinator is shutting down.
        """

        try:

            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. Its instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            self.log.debug('leadership-attempt')
            result = yield self._retry(self.consul.kv.put,
                                       self.LEADER_KEY,
                                       self.instance_id,
                                       acquire=self.session_id)

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self._retry(self.consul.kv.get,
                                                self.LEADER_KEY)
            self.log.debug('leadership-key',
                           i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    if record['Session'] == self.session_id:
                        yield self._assert_leadership()
                    else:
                        pass  # confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again

            # using consul's watch feature, start tracking any changes to key
            last = record
            while last is not None:
                # this shall return only when update is made to leader key
                (index, updated) = yield self._retry(self.consul.kv.get,
                                                     self.LEADER_KEY,
                                                     index=index)
                self.log.debug('leader-key-change',
                               index=index, updated=updated)
                if updated is None or updated != last:
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    break
                last = updated

        except Exception as e:
            self.log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(1, self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """
        Record the current leader's id and fire every deferred handed out by
        wait_for_a_leader() that is still pending.
        """
        self.leader_id = leader_id
        # swap the list out first so that callbacks registering new waiters
        # do not interfere with the iteration
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        """Instantiate the configured leader class and start it."""
        self.log.info('became-leader')
        self.leader = self.leader_class(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Halt the leader object after leadership was lost."""
        self.log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """
        Halt and forget the leader object.

        :return: whatever the leader's halt() returns, or None if there is no
            leader object (e.g. shutdown() has already cleared it)
        """
        if self.leader is None:
            return None
        d = self.leader.halt()
        self.leader = None
        return d

    @inlineCallbacks
    def _retry(self, func, *args, **kw):
        """
        Call func(*args, **kw) until it succeeds, sleeping between attempts
        as dictated by _backoff(). There is no upper bound on the number of
        attempts.

        :return: (via Deferred) the result of the first successful call
        """
        while 1:
            try:
                result = yield func(*args, **kw)
                self._clear_backoff()
                break
            except ConsulException:
                yield self._backoff('consul-not-up')
            except ConnectionError:
                yield self._backoff('cannot-connect-to-consul')
            except StaleMembershipEntryException:
                yield self._backoff('stale-membership-record-in-the-way')
            except Exception as e:
                # the traceback is only logged outside of shutdown
                if not self.shutting_down:
                    self.log.exception(e)
                yield self._backoff('unknown-error')

        returnValue(result)