blob: 6eecc112975acf6747b9053762e33a77950b2cfb [file] [log] [blame]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Consul-based coordinator services """
18
Zsolt Haraszti109db832016-09-16 16:32:36 -070019from consul import ConsulException
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070020from consul.twisted import Consul
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070021from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070025from twisted.internet.task import LoopingCall
Zsolt Harasztidafefe12016-11-14 21:29:58 -080026from zope.interface import implementer
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070027
Zsolt Harasztia3410312016-09-18 23:29:04 -070028from leader import Leader
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040030from common.utils.message_queue import MessageQueue
Zsolt Harasztidafefe12016-11-14 21:29:58 -080031from voltha.registry import IComponent
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070032from worker import Worker
khenaidooa8588f22017-06-16 12:13:34 -040033from simplejson import dumps, loads
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070034
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070035log = get_logger()
36
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070037
Zsolt Haraszti1420def2016-09-18 00:07:31 -070038class StaleMembershipEntryException(Exception):
39 pass
40
41
Zsolt Harasztidafefe12016-11-14 21:29:58 -080042@implementer(IComponent)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070043class Coordinator(object):
Zsolt Harasztia3410312016-09-18 23:29:04 -070044 """
45 An app shall instantiate only one Coordinator (singleton).
46 A single instance of this object shall take care of all external
47 with consul, and via consul, all coordination activities with its
48 clustered peers. Roles include:
49 - registering an ephemeral membership entry (k/v record) in consul
50 - participating in a symmetric leader election, and potentially assuming
51 the leader's role. What leadership entails is not a concern for the
52 coordination, it simply instantiates (and shuts down) a leader class
53 when it gains (or looses) leadership.
54 """
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070055
56 CONNECT_RETRY_INTERVAL_SEC = 1
57 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
58
Zsolt Harasztia3410312016-09-18 23:29:04 -070059 # Public methods:
60
Zsolt Harasztide22bbc2016-09-14 15:27:33 -070061 def __init__(self,
62 internal_host_address,
63 external_host_address,
64 instance_id,
65 rest_port,
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040066 config,
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070067 consul='localhost:8500'):
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070068
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070069 log.info('initializing-coordinator')
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040070 self.config = config['coordinator']
71 self.worker_config = config['worker']
72 self.leader_config = config['leader']
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040073 self.membership_watch_relatch_delay = config.get(
74 'membership_watch_relatch_delay', 0.1)
75 self.tracking_loop_delay = config.get(
76 'tracking_loop_delay', 1)
77 self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080078 self.leader_prefix = '/'.join((self.prefix, self.config.get(
khenaidoo08d48d22017-06-29 19:42:49 -040079 self.config['leader_key'], 'leader')))
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080080 self.membership_prefix = '/'.join((self.prefix, self.config.get(
khenaidoo08d48d22017-06-29 19:42:49 -040081 self.config['membership_key'], 'members'), ''))
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080082 self.assignment_prefix = '/'.join((self.prefix, self.config.get(
khenaidoo08d48d22017-06-29 19:42:49 -040083 self.config['assignment_key'], 'assignments'), ''))
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080084 self.workload_prefix = '/'.join((self.prefix, self.config.get(
khenaidoo08d48d22017-06-29 19:42:49 -040085 self.config['workload_key'], 'work'), ''))
khenaidoo032d3302017-06-09 14:50:04 -040086 self.core_store_prefix = '/'.join((self.prefix, self.config.get(
khenaidoo08d48d22017-06-29 19:42:49 -040087 self.config['core_store_key'], 'data/core')))
88 self.core_store_assignment_key = self.core_store_prefix + \
89 '/assignment'
90 self.core_storage_suffix = 'core_store'
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040091
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070092 self.retries = 0
93 self.instance_id = instance_id
94 self.internal_host_address = internal_host_address
95 self.external_host_address = external_host_address
Zsolt Harasztide22bbc2016-09-14 15:27:33 -070096 self.rest_port = rest_port
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040097 self.membership_record_key = self.membership_prefix + self.instance_id
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070098
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070099 self.session_id = None
100 self.i_am_leader = False
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700101 self.leader_id = None # will be the instance id of the current leader
102 self.shutting_down = False
Zsolt Harasztia3410312016-09-18 23:29:04 -0700103 self.leader = None
Zsolt Haraszti3300f742017-01-09 01:14:20 -0800104 self.session_renew_timer = None
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700105
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700106 self.worker = Worker(self.instance_id, self)
107
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700108 host = consul.split(':')[0].strip()
109 port = int(consul.split(':')[1].strip())
Zsolt Haraszti109db832016-09-16 16:32:36 -0700110
111 # TODO need to handle reconnect events properly
112 self.consul = Consul(host=host, port=port)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700113
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700114 self.wait_for_leader_deferreds = []
115
khenaidoo08d48d22017-06-29 19:42:49 -0400116 self.peers_mapping_queue = MessageQueue()
117
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700118 def start(self):
119 log.debug('starting')
120 reactor.callLater(0, self._async_init)
121 log.info('started')
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800122 return self
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700123
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700124 @inlineCallbacks
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700125 def stop(self):
126 log.debug('stopping')
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700127 self.shutting_down = True
Zsolt Haraszti3300f742017-01-09 01:14:20 -0800128 self.session_renew_timer.stop()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700129 yield self._delete_session() # this will delete the leader lock too
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700130 yield self.worker.stop()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700131 if self.leader is not None:
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800132 yield self.leader.stop()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700133 self.leader = None
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700134 log.info('stopped')
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700135
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700136 def wait_for_a_leader(self):
137 """
138 Async wait till a leader is detected/elected. The deferred will be
139 called with the leader's instance_id
140 :return: Deferred.
141 """
142 d = Deferred()
143 if self.leader_id is not None:
144 d.callback(self.leader_id)
145 return d
146 else:
147 self.wait_for_leader_deferreds.append(d)
148 return d
149
khenaidoo032d3302017-06-09 14:50:04 -0400150 # Wait for a core data id to be assigned to this voltha instance
151 @inlineCallbacks
152 def get_core_store_id_and_prefix(self):
153 core_store_id = yield self.worker.get_core_store_id()
154 returnValue((core_store_id, self.core_store_prefix))
155
khenaidoo08d48d22017-06-29 19:42:49 -0400156 def recv_peers_map(self):
157 return self.peers_mapping_queue.get()
158
159 def publish_peers_map_change(self, msg):
160 self.peers_mapping_queue.put(msg)
161
Zsolt Harasztia3410312016-09-18 23:29:04 -0700162 # Proxy methods for consul with retry support
163
164 def kv_get(self, *args, **kw):
165 return self._retry(self.consul.kv.get, *args, **kw)
166
167 def kv_put(self, *args, **kw):
168 return self._retry(self.consul.kv.put, *args, **kw)
169
170 def kv_delete(self, *args, **kw):
171 return self._retry(self.consul.kv.delete, *args, **kw)
172
Zsolt Haraszti00d9a842016-11-23 11:18:23 -0800173 # Methods exposing key membership information
174
175 @inlineCallbacks
176 def get_members(self):
177 """Return list of all members"""
178 _, members = yield self.kv_get(self.membership_prefix, recurse=True)
179 returnValue([member['Key'][len(self.membership_prefix):]
180 for member in members])
181
Zsolt Harasztia3410312016-09-18 23:29:04 -0700182 # Private (internal) methods:
183
184 @inlineCallbacks
185 def _async_init(self):
186 yield self._create_session()
187 yield self._create_membership_record()
188 yield self._start_leader_tracking()
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700189 yield self.worker.start()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700190
191 def _backoff(self, msg):
Zsolt Haraszti109db832016-09-16 16:32:36 -0700192 wait_time = self.RETRY_BACKOFF[min(self.retries,
193 len(self.RETRY_BACKOFF) - 1)]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700194 self.retries += 1
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700195 log.error(msg, retry_in=wait_time)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700196 return asleep(wait_time)
197
Zsolt Harasztia3410312016-09-18 23:29:04 -0700198 def _clear_backoff(self):
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700199 if self.retries:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700200 log.info('reconnected-to-consul', after_retries=self.retries)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700201 self.retries = 0
202
203 @inlineCallbacks
Zsolt Harasztia3410312016-09-18 23:29:04 -0700204 def _create_session(self):
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700205
206 @inlineCallbacks
207 def _renew_session():
208 try:
Zsolt Haraszti109db832016-09-16 16:32:36 -0700209 result = yield self.consul.session.renew(
210 session_id=self.session_id)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700211 log.debug('just renewed session', result=result)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700212 except Exception, e:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700213 log.exception('could-not-renew-session', e=e)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700214
215 @inlineCallbacks
216 def _create_session():
217
218 # create consul session
Zsolt Haraszti109db832016-09-16 16:32:36 -0700219 self.session_id = yield self.consul.session.create(
khenaidoo032d3302017-06-09 14:50:04 -0400220 behavior='release', ttl=60, lock_delay=1)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700221 log.info('created-consul-session', session_id=self.session_id)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700222
223 # start renewing session it 3 times within the ttl
Zsolt Haraszti3300f742017-01-09 01:14:20 -0800224 self.session_renew_timer = LoopingCall(_renew_session)
225 self.session_renew_timer.start(3)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700226
227 yield self._retry(_create_session)
228
229 @inlineCallbacks
Zsolt Harasztia3410312016-09-18 23:29:04 -0700230 def _delete_session(self):
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700231 yield self.consul.session.destroy(self.session_id)
232
233 @inlineCallbacks
Zsolt Harasztia3410312016-09-18 23:29:04 -0700234 def _create_membership_record(self):
235 yield self._retry(self._do_create_membership_record)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700236 reactor.callLater(0, self._maintain_membership_record)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700237
khenaidooa8588f22017-06-16 12:13:34 -0400238 def _create_membership_record_data(self):
239 member_record = dict()
240 member_record['status'] = 'alive'
241 member_record['host_address'] = self.external_host_address
242 return member_record
243
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700244 @inlineCallbacks
Zsolt Harasztia3410312016-09-18 23:29:04 -0700245 def _do_create_membership_record(self):
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700246 result = yield self.consul.kv.put(
khenaidooa8588f22017-06-16 12:13:34 -0400247 self.membership_record_key,
248 dumps(self._create_membership_record_data()),
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700249 acquire=self.session_id)
250 if not result:
251 raise StaleMembershipEntryException(self.instance_id)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700252
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700253 @inlineCallbacks
254 def _maintain_membership_record(self):
255 index = None
256 try:
257 while 1:
258 (index, record) = yield self._retry(self.consul.kv.get,
259 self.membership_record_key,
260 index=index)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700261 log.debug('membership-record-change-detected',
khenaidoo08d48d22017-06-29 19:42:49 -0400262 index=index, record=record)
khenaidooa8588f22017-06-16 12:13:34 -0400263 if record is None or \
khenaidoo08d48d22017-06-29 19:42:49 -0400264 'Session' not in record or \
265 record['Session'] != self.session_id:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700266 log.debug('remaking-membership-record')
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700267 yield self._retry(self._do_create_membership_record)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700268
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700269 except Exception, e:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700270 log.exception('unexpected-error-leader-trackin', e=e)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700271
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700272 finally:
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700273 # except in shutdown, the loop must continue (after a short delay)
274 if not self.shutting_down:
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400275 reactor.callLater(self.membership_watch_relatch_delay,
276 self._maintain_membership_record)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700277
Zsolt Harasztia3410312016-09-18 23:29:04 -0700278 def _start_leader_tracking(self):
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700279 reactor.callLater(0, self._leadership_tracking_loop)
280
281 @inlineCallbacks
282 def _leadership_tracking_loop(self):
283
284 try:
285
Zsolt Harasztia3410312016-09-18 23:29:04 -0700286 # Attempt to acquire leadership lock. True indicates success;
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700287 # False indicates there is already a leader. It's instance id
288 # is then the value under the leader key service/voltha/leader.
289
290 # attempt acquire leader lock
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700291 log.debug('leadership-attempt')
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700292 result = yield self._retry(self.consul.kv.put,
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400293 self.leader_prefix,
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700294 self.instance_id,
295 acquire=self.session_id)
296
297 # read it back before being too happy; seeing our session id is a
298 # proof and now we have the change id that we can use to reliably
299 # track any changes. In an unlikely scenario where the leadership
300 # key gets wiped out administratively since the previous line,
301 # the returned record can be None. Handle it.
302 (index, record) = yield self._retry(self.consul.kv.get,
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400303 self.leader_prefix)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700304 log.debug('leadership-key',
khenaidoo08d48d22017-06-29 19:42:49 -0400305 i_am_leader=result, index=index, record=record)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700306
307 if record is not None:
308 if result is True:
309 if record['Session'] == self.session_id:
Zsolt Harasztia3410312016-09-18 23:29:04 -0700310 yield self._assert_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700311 else:
312 pass # confusion; need to retry leadership
313 else:
314 leader_id = record['Value']
Zsolt Harasztia3410312016-09-18 23:29:04 -0700315 yield self._assert_nonleadership(leader_id)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700316
317 # if record was none, we shall try leadership again
318
319 # using consul's watch feature, start tracking any changes to key
320 last = record
321 while last is not None:
322 # this shall return only when update is made to leader key
323 (index, updated) = yield self._retry(self.consul.kv.get,
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400324 self.leader_prefix,
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700325 index=index)
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700326 log.debug('leader-key-change',
khenaidoo08d48d22017-06-29 19:42:49 -0400327 index=index, updated=updated)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700328 if updated is None or updated != last:
329 # leadership has changed or vacated (or forcefully
330 # removed), apply now
331 break
332 last = updated
333
334 except Exception, e:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700335 log.exception('unexpected-error-leader-trackin', e=e)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700336
337 finally:
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700338 # except in shutdown, the loop must continue (after a short delay)
339 if not self.shutting_down:
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400340 reactor.callLater(self.tracking_loop_delay,
341 self._leadership_tracking_loop)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700342
Zsolt Harasztia3410312016-09-18 23:29:04 -0700343 @inlineCallbacks
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700344 def _assert_leadership(self):
345 """(Re-)assert leadership"""
346 if not self.i_am_leader:
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700347 self.i_am_leader = True
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700348 self._set_leader_id(self.instance_id)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700349 yield self._just_gained_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700350
Zsolt Harasztia3410312016-09-18 23:29:04 -0700351 @inlineCallbacks
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700352 def _assert_nonleadership(self, leader_id):
353 """(Re-)assert non-leader role"""
354
355 # update leader_id anyway
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700356 self._set_leader_id(leader_id)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700357
358 if self.i_am_leader:
359 self.i_am_leader = False
Zsolt Harasztia3410312016-09-18 23:29:04 -0700360 yield self._just_lost_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700361
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700362 def _set_leader_id(self, leader_id):
363 self.leader_id = leader_id
364 deferreds, self.wait_for_leader_deferreds = \
365 self.wait_for_leader_deferreds, []
366 for d in deferreds:
367 d.callback(leader_id)
368
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700369 def _just_gained_leadership(self):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700370 log.info('became-leader')
Zsolt Harasztia3410312016-09-18 23:29:04 -0700371 self.leader = Leader(self)
372 return self.leader.start()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700373
374 def _just_lost_leadership(self):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700375 log.info('lost-leadership')
Zsolt Harasztia3410312016-09-18 23:29:04 -0700376 return self._halt_leader()
377
378 def _halt_leader(self):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700379 d = self.leader.stop()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700380 self.leader = None
381 return d
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700382
383 @inlineCallbacks
384 def _retry(self, func, *args, **kw):
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700385 while 1:
386 try:
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700387 result = yield func(*args, **kw)
Zsolt Harasztid4226ed2016-10-05 17:49:27 -0700388 self._clear_backoff()
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700389 break
390 except ConsulException, e:
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700391 yield self._backoff('consul-not-up')
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700392 except ConnectionError, e:
Zsolt Harasztia3410312016-09-18 23:29:04 -0700393 yield self._backoff('cannot-connect-to-consul')
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700394 except StaleMembershipEntryException, e:
Zsolt Harasztia3410312016-09-18 23:29:04 -0700395 yield self._backoff('stale-membership-record-in-the-way')
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700396 except Exception, e:
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700397 if not self.shutting_down:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700398 log.exception(e)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700399 yield self._backoff('unknown-error')
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700400
401 returnValue(result)