blob: d023d6b3a16f79eacdf0338c730fadabe5155b56 [file] [log] [blame]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Consul-based coordinator services """
18
Zsolt Haraszti109db832016-09-16 16:32:36 -070019from consul import ConsulException
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070020from consul.twisted import Consul
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070021from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070025from twisted.internet.task import LoopingCall
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070026
27from asleep import asleep
Zsolt Harasztia3410312016-09-18 23:29:04 -070028from leader import Leader
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070029from worker import Worker
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070030
31
class StaleMembershipEntryException(Exception):
    """
    Signals that this instance's membership record could not be acquired
    under the current consul session (the k/v put with ``acquire`` was
    refused). The exception argument is the instance id.
    """
34
35
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interactions with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    - starting (and halting) the local Worker.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Successive wait times (seconds) between retries of a failing consul
    # call; the last entry is reused once the list is exhausted.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    LEADER_KEY = 'service/voltha/leader'

    MEMBERSHIP_PREFIX = 'service/voltha/members/'
    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
    WORKLOAD_PREFIX = 'service/voltha/work/'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 consul='localhost:8500',
                 leader_class=Leader):
        """
        Set up coordinator state and schedule the asynchronous consul
        registration (session, membership record, leader tracking, worker
        start) for the next reactor iteration.

        :param internal_host_address: stored as-is on the instance
        :param external_host_address: stored as-is on the instance
        :param instance_id: id of this instance; used as the suffix of the
            membership key and as the value written under LEADER_KEY
        :param rest_port: stored as-is on the instance
        :param consul: consul agent address in 'host:port' form
        :param leader_class: class instantiated as ``leader_class(self)``
            whenever this instance gains leadership
        """

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id
        self.leader_class = leader_class

        self.session_id = None
        self.session_renew_timer = None  # LoopingCall renewing the session
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None

        # Must exist before anything (e.g. the Worker below) may call
        # wait_for_a_leader().
        self.wait_for_leader_deferreds = []

        self.worker = Worker(self.instance_id, self)

        self.log = get_logger()
        self.log.info('initializing-coordinator')

        host = consul.split(':')[0].strip()
        port = int(consul.split(':')[1].strip())

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=host, port=port)

        reactor.callLater(0, self._async_init)
        self.log.info('initialized-coordinator')

    @inlineCallbacks
    def shutdown(self):
        """
        Leave the cluster: stop session renewal, destroy the consul session,
        then halt the worker and (if leading) the leader.
        """
        self.shutting_down = True
        # stop renewing first, otherwise renewals of the destroyed session
        # keep failing (and logging) every few seconds
        self._stop_session_renewal()
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.halt()
        if self.leader is not None:
            yield self.leader.halt()
            self.leader = None

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            d.callback(self.leader_id)
        else:
            self.wait_for_leader_deferreds.append(d)
        return d

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        return self._retry(self.consul.kv.get, *args, **kw)

    def kv_put(self, *args, **kw):
        return self._retry(self.consul.kv.put, *args, **kw)

    def kv_delete(self, *args, **kw):
        return self._retry(self.consul.kv.delete, *args, **kw)

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """Log msg, bump the retry counter and return a Deferred that fires
        after the next RETRY_BACKOFF delay."""
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        self.log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter after a successful consul call."""
        if self.retries:
            self.log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """
        Create a consul session (retrying until it succeeds) with
        behavior='delete', ttl=10 and lock_delay=1, then renew it every
        3 seconds so it does not expire.
        """

        @inlineCallbacks
        def _renew_session():
            try:
                result = yield self.consul.session.renew(
                    session_id=self.session_id)
                self.log.debug('just renewed session', result=result)
            except Exception as e:
                self.log.exception('could-not-renew-session', e=e)

        @inlineCallbacks
        def _do_create_session():

            # create consul session
            self.session_id = yield self.consul.session.create(
                behavior='delete', ttl=10, lock_delay=1)
            self.log.info('created-consul-session', session_id=self.session_id)

            # start renewing session it 3 times within the ttl; keep the
            # handle so shutdown() can stop it
            self._stop_session_renewal()
            self.session_renew_timer = LoopingCall(_renew_session)
            self.session_renew_timer.start(3)

        yield self._retry(_do_create_session)

    def _stop_session_renewal(self):
        """Stop the periodic session renewal, if it is running."""
        timer, self.session_renew_timer = self.session_renew_timer, None
        if timer is not None and timer.running:
            timer.stop()

    @inlineCallbacks
    def _delete_session(self):
        """Destroy our consul session (no-op if none was created yet)."""
        if self.session_id is None:
            return
        yield self.consul.session.destroy(self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        yield self._retry(self._do_create_membership_record)
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _do_create_membership_record(self):
        """
        Write 'alive' under our membership key, locked by our session.
        :raises StaleMembershipEntryException: if the lock is refused
        """
        result = yield self.consul.kv.put(
            self.membership_record_key, 'alive',
            acquire=self.session_id)
        if not result:
            raise StaleMembershipEntryException(self.instance_id)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Watch our membership record (consul blocking queries via `index`)
        and re-create it whenever it is missing or not held by our session.
        Reschedules itself 0.1 s after the watch loop exits, unless shutting
        down.
        """
        index = None
        try:
            while True:
                (index, record) = yield self._retry(self.consul.kv.get,
                                                    self.membership_record_key,
                                                    index=index)
                self.log.debug('membership-record-change-detected',
                               index=index, record=record)
                # 'Session' is presumably only present when a session holds
                # the key's lock; a missing one must trigger a remake rather
                # than a KeyError (which would loop without ever remaking).
                if record is None or \
                        record.get('Session') != self.session_id:
                    self.log.debug('remaking-membership-record')
                    yield self._retry(self._do_create_membership_record)

        except Exception as e:
            self.log.exception('unexpected-error-membership-tracking', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(0.1, self._maintain_membership_record)

    def _start_leader_tracking(self):
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election: try to acquire LEADER_KEY, apply the
        resulting role, then block on changes to the key. Reschedules itself
        1 s after the round ends, unless shutting down.
        """

        try:

            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. It's instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            self.log.debug('leadership-attempt')
            result = yield self._retry(self.consul.kv.put,
                                       self.LEADER_KEY,
                                       self.instance_id,
                                       acquire=self.session_id)

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self._retry(self.consul.kv.get,
                                                self.LEADER_KEY)
            self.log.debug('leadership-key',
                           i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    if record.get('Session') == self.session_id:
                        yield self._assert_leadership()
                    else:
                        pass  # confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again

            # using consul's watch feature, start tracking any changes to key
            last = record
            while last is not None:
                # this shall return only when update is made to leader key
                (index, updated) = yield self._retry(self.consul.kv.get,
                                                     self.LEADER_KEY,
                                                     index=index)
                self.log.debug('leader-key-change',
                               index=index, updated=updated)
                if updated is None or updated != last:
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    break
                last = updated

        except Exception as e:
            self.log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(1, self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """Record the current leader and fire all pending
        wait_for_a_leader() deferreds with its id."""
        self.leader_id = leader_id
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        self.log.info('became-leader')
        # honour the leader_class given to the constructor
        self.leader = self.leader_class(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        self.log.info('lost-leadership')
        return self._halt_leader()

    @inlineCallbacks
    def _halt_leader(self):
        """Halt and drop the leader object, if there is one."""
        if self.leader is not None:
            d = self.leader.halt()
            self.leader = None
            yield d

    @inlineCallbacks
    def _retry(self, func, *args, **kw):
        """
        Call func(*args, **kw) until it succeeds, backing off between
        attempts, and return its (deferred) result.
        """
        while True:
            try:
                result = yield func(*args, **kw)
                self._clear_backoff()
                break
            except ConsulException as e:
                yield self._backoff('consul-not-up')
            except ConnectionError as e:
                yield self._backoff('cannot-connect-to-consul')
            except StaleMembershipEntryException as e:
                yield self._backoff('stale-membership-record-in-the-way')
            except Exception as e:
                if not self.shutting_down:
                    self.log.exception(e)
                yield self._backoff('unknown-error')

        returnValue(result)