blob: 8992ccb3b6bc2c4dbbb9f1cf633fba734beb037c [file] [log] [blame]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Consul-based coordinator services """
18
Zsolt Haraszti109db832016-09-16 16:32:36 -070019from consul import ConsulException
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070020from consul.twisted import Consul
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070021from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070024from twisted.internet.defer import inlineCallbacks, returnValue
25from twisted.internet.task import LoopingCall
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070026
27from asleep import asleep
Zsolt Harasztia3410312016-09-18 23:29:04 -070028from leader import Leader
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070029
30
class StaleMembershipEntryException(Exception):
    """
    Signals that the membership k/v record of an instance could not be
    acquired with the current consul session (presumably an older entry
    from a previous session is still in the way). The instance id is
    passed as the exception argument.
    """
33
34
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interactions with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    LEADER_KEY = 'service/voltha/leader'
    MEMBERSHIP_PREFIX = 'service/voltha/members/'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 consul='localhost:8500',
                 leader_class=Leader):
        """
        Set up the coordinator and schedule its asynchronous start-up
        (session creation, membership record, leader tracking) on the reactor.

        :param internal_host_address: stored as self.internal_host_address
        :param external_host_address: stored as self.external_host_address
        :param instance_id: id of this instance; suffix of the membership key
            and the value written under LEADER_KEY when leading
        :param rest_port: stored as self.rest_port
        :param consul: consul agent address as 'host:port'; when the port is
            omitted, 8500 (the port of the default address) is used
        :param leader_class: called as leader_class(self) when this instance
            gains leadership; the result must provide start() and halt()
        """
        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.leader_class = leader_class
        self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id

        self.session_id = None
        self.session_renew_timer = None  # LoopingCall renewing the session
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None

        self.log = get_logger()
        self.log.info('initializing-coordinator')

        host, _, port = consul.partition(':')
        host = host.strip()
        port = port.strip()
        port = int(port) if port else 8500

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=host, port=port)

        reactor.callLater(0, self._async_init)
        self.log.info('initialized-coordinator')

    @inlineCallbacks
    def shutdown(self):
        """
        Stop coordinating: stop the background loops, destroy the consul
        session and halt the local leader instance, if any.
        """
        # checked by the membership and leadership loops so they stop
        # rescheduling themselves
        self.shutting_down = True
        timer = self.session_renew_timer
        if timer is not None and timer.running:
            timer.stop()
        yield self._delete_session()  # this will delete the leader lock too
        self.i_am_leader = False
        yield self._halt_leader()

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        return self._retry(self.consul.kv.get, *args, **kw)

    def kv_put(self, *args, **kw):
        return self._retry(self.consul.kv.put, *args, **kw)

    def kv_delete(self, *args, **kw):
        return self._retry(self.consul.kv.delete, *args, **kw)

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()

    def _backoff(self, msg):
        """Log msg and return a Deferred firing after the next backoff delay."""
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        self.log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the backoff counter after a successful consul call."""
        if self.retries:
            self.log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """Create the consul session (with retry) and start renewing it."""

        @inlineCallbacks
        def _renew_session():
            try:
                result = yield self.consul.session.renew(
                    session_id=self.session_id)
                self.log.debug('just renewed session', result=result)
            except Exception as e:
                self.log.exception('could-not-renew-session', e=e)

        @inlineCallbacks
        def _do_create_session():

            # create consul session
            self.session_id = yield self.consul.session.create(
                behavior='delete', ttl=10, lock_delay=1)
            self.log.info('created-consul-session', session_id=self.session_id)

            # renew the session every 3s, i.e. about 3 times within the 10s
            # ttl; keep the handle so that shutdown() can stop it
            self.session_renew_timer = LoopingCall(_renew_session)
            self.session_renew_timer.start(3)

        yield self._retry(_do_create_session)

    @inlineCallbacks
    def _delete_session(self):
        if self.session_id is None:
            return  # the session was never created; nothing to destroy
        yield self.consul.session.destroy(self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        yield self._retry(self._do_create_membership_record)
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _do_create_membership_record(self):
        """
        Put 'alive' under the membership key, acquiring it with our session.
        Raises StaleMembershipEntryException if the acquire is refused.
        """
        result = yield self.consul.kv.put(
            self.membership_record_key, 'alive',
            acquire=self.session_id)
        if not result:
            raise StaleMembershipEntryException(self.instance_id)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Watch our membership record (blocking gets on its index) and re-create
        it whenever it disappears or is no longer held by our session.
        Reschedules itself on error until shutdown.
        """
        index = None
        try:
            while not self.shutting_down:
                (index, record) = yield self._retry(self.consul.kv.get,
                                                    self.membership_record_key,
                                                    index=index)
                self.log.debug('membership-record-change-detected',
                               index=index, record=record)
                if self.shutting_down:
                    # the record goes away with the destroyed session; do
                    # not try to re-acquire it with a dead session
                    break
                if record is None or record['Session'] != self.session_id:
                    self.log.debug('remaking-membership-record')
                    yield self._do_create_membership_record()

        except Exception as e:
            self.log.exception('unexpected-error-membership-maintenance', e=e)

        finally:
            # the loop needs to continue (after a short delay) unless we are
            # shutting down
            if not self.shutting_down:
                reactor.callLater(0.1, self._maintain_membership_record)

    def _start_leader_tracking(self):
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election: try to acquire the leader key, apply
        the outcome, then block on the key until it changes. Reschedules
        itself (after 1s) until shutdown.
        """
        try:
            if self.shutting_down:
                return

            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. Its instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            self.log.debug('leadership-attempt')
            result = yield self._retry(self.consul.kv.put,
                                       self.LEADER_KEY,
                                       self.instance_id,
                                       acquire=self.session_id)

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self._retry(self.consul.kv.get,
                                                self.LEADER_KEY)
            self.log.debug('leadership-key',
                           i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    if record['Session'] == self.session_id:
                        yield self._assert_leadership()
                    else:
                        pass  # confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again

            # using consul's watch feature, start tracking any changes to key
            last = record
            while last is not None and not self.shutting_down:
                # this shall return only when update is made to leader key
                (index, updated) = yield self._retry(self.consul.kv.get,
                                                     self.LEADER_KEY,
                                                     index=index)
                self.log.debug('leader-key-change',
                               index=index, updated=updated)
                if updated is None or updated != last:
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    break
                last = updated

        except Exception as e:
            self.log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # the loop needs to continue (after a short delay) unless we are
            # shutting down
            if not self.shutting_down:
                reactor.callLater(1, self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self.leader_id = self.instance_id
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self.leader_id = leader_id

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _just_gained_leadership(self):
        self.log.info('became-leader')
        # honour the leader_class passed to the constructor
        self.leader = self.leader_class(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        self.log.info('lost-leadership')
        return self._halt_leader()

    @inlineCallbacks
    def _halt_leader(self):
        """Halt and drop the local leader instance; no-op if there is none."""
        leader = self.leader
        if leader is None:
            return
        d = leader.halt()
        self.leader = None
        yield d

    @inlineCallbacks
    def _retry(self, func, *args, **kw):
        """
        Call func(*args, **kw) until it succeeds, backing off between
        attempts, and return its result. The shared backoff counter is reset
        after a success.
        """
        while 1:
            try:
                result = yield func(*args, **kw)
                break
            except ConsulException:
                yield self._backoff('consul-not-up')
            except ConnectionError:
                yield self._backoff('cannot-connect-to-consul')
            except StaleMembershipEntryException:
                yield self._backoff('stale-membership-record-in-the-way')
            except Exception as e:
                self.log.exception(e)
                yield self._backoff('unknown-error')

        self._clear_backoff()
        returnValue(result)