#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import copy

from jsonpatch import JsonPatch
from jsonpatch import make_patch

from common.utils.json_format import MessageToDict
from voltha.core.config.config_branch import ConfigBranch
from voltha.core.config.config_event_bus import ConfigEventBus
from voltha.core.config.config_proxy import CallbackType, ConfigProxy
from voltha.core.config.config_rev import is_proto_message, children_fields, \
    ConfigRevision, access_rights
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
from voltha.core.config.merge_3way import merge_3way
from voltha.protos import third_party
from voltha.protos import meta_pb2

import structlog

log = structlog.get_logger()

def message_to_dict(m):
    return MessageToDict(m, True, True, False)


def check_access_violation(new_msg, old_msg):
    """Raise ValueError if attempt is made to change a read-only field"""
    access_map = access_rights(new_msg.__class__)
    violated_fields = []
    for field_name, access in access_map.iteritems():
        if access == meta_pb2.READ_ONLY:
            if getattr(new_msg, field_name) != getattr(old_msg, field_name):
                violated_fields.append(field_name)
    if violated_fields:
        raise ValueError('Cannot change read-only field(s) %s' %
                         ', '.join('"%s"' % f for f in violated_fields))


def find_rev_by_key(revs, keyname, value):
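    """
    Linearly scan a list of child revisions and return the (index, revision)
    pair whose underlying config data has ``keyname`` equal to ``value``.
    Raises KeyError if no matching revision is found.
    """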
    for i, rev in enumerate(revs):
        if getattr(rev._config._data, keyname) == value:
            return i, rev
    raise KeyError('key {}={} not found'.format(keyname, value))


class ConfigNode(object):
    """
    Represents a configuration node which can hold a number of revisions
    of the configuration for this node.
    When the configuration changes, the new version is appended to the
    node.
    Initial data must be a protobuf message and it will determine the type of
    this node.
    """
    __slots__ = (
        '_root',  # ref to root node
        '_type',  # node type, as __class__ of protobuf message
        '_branches',  # dict of transaction branches and a default (committed)
                      # branch
        '_tags',  # dict of tag-name to ref of ConfigRevision
        '_proxy',  # ref to proxy observer or None if no proxy assigned
        '_event_bus',  # ref to event_bus or None if no event bus is assigned
        '_auto_prune'
    )

    def __init__(self, root, initial_data, auto_prune=True, txid=None):
        self._root = root
        self._branches = {}
        self._tags = {}
        self._proxy = None
        self._event_bus = None
        self._auto_prune = auto_prune

        if isinstance(initial_data, type):
            self._type = initial_data
        elif is_proto_message(initial_data):
            self._type = initial_data.__class__
            copied_data = initial_data.__class__()
            copied_data.CopyFrom(initial_data)
            self._initialize(copied_data, txid)
        else:
            raise NotImplementedError()

    def _mknode(self, *args, **kw):
        return ConfigNode(self._root, *args, **kw)

    def _mkrev(self, *args, **kw):
        return self._root.mkrev(*args, **kw)

    def _initialize(self, data, txid):
        # separate external children data away from locally stored data
        # based on child_node annotations in protobuf
        children = {}
        for field_name, field in children_fields(self._type).iteritems():
            field_value = getattr(data, field_name)
            if field.is_container:
                if field.key:
                    keys_seen = set()
                    children[field_name] = lst = []
                    for v in field_value:
                        rev = self._mknode(v, txid=txid).latest
                        key = getattr(v, field.key)
                        if key in keys_seen:
                            raise ValueError('Duplicate key "{}"'.format(key))
                        lst.append(rev)
                        keys_seen.add(key)
                else:
                    children[field_name] = [
                        self._mknode(v, txid=txid).latest for v in field_value]
            else:
                children[field_name] = [
                    self._mknode(field_value, txid=txid).latest]
            data.ClearField(field_name)

        branch = ConfigBranch(self, auto_prune=self._auto_prune)
        rev = self._mkrev(branch, data, children)
        self._make_latest(branch, rev)
        self._branches[txid] = branch

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # these convenience short-cuts only work for the committed branch

    @property
    def revisions(self):
        return [r._hash for r in self._branches[None]._revs.itervalues()]

    @property
    def latest(self):
        return self._branches[None]._latest

    def __getitem__(self, hash):
        return self._branches[None]._revs[hash]

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
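        """
        Retrieve the config data addressed by ``path``.

        :param path: sub-path relative to this node; '' or None means this node
        :param hash: optional revision hash; defaults to the branch's latest
        :param depth: number of child levels to include (-1 for unlimited)
        :param deep: shorthand for depth=-1
        :param txid: optional transaction id; the committed branch is used
            when the transaction has no branch on this node
        :return: a protobuf message, or a list of messages when the path
            ends at a container field

        Illustrative use (assuming the node type has a keyed child field
        named 'devices'):
            node.get('/devices/1234', depth=1)
        """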

        # depth preparation
        if deep:
            depth = -1

        # path preparation
        path = '' if path is None else path
        while path.startswith('/'):
            path = path[1:]

        # determine branch; if lookup fails, it is ok to use default branch
        branch = self._branches.get(txid, None) or self._branches[None]

        # determine rev
        if hash is not None:
            rev = branch._revs[hash]
        else:
            rev = branch.latest

        return self._get(rev, path, depth)

    def _get(self, rev, path, depth):

        if not path:
            return self._do_get(rev, depth)

        # ... otherwise
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]
        if field.is_container:
            if field.key:
                children = rev._children[name]
                if path:
                    # need to escalate further
                    key, _, path = path.partition('/')
                    key = field.key_from_str(key)
                    _, child_rev = find_rev_by_key(children, field.key, key)
                    child_node = child_rev.node
                    return child_node._get(child_rev, path, depth)
                else:
                    # we are the node of interest
                    response = []
                    for child_rev in children:
                        child_node = child_rev.node
                        value = child_node._do_get(child_rev, depth)
                        response.append(value)
                    return response
            else:
                if path:
                    raise LookupError(
                        'Cannot index into container with no key defined')
                response = []
                for child_rev in rev._children[name]:
                    child_node = child_rev.node
                    value = child_node._do_get(child_rev, depth)
                    response.append(value)
                return response
        else:
            child_rev = rev._children[name][0]
            child_node = child_rev.node
            return child_node._get(child_rev, path, depth)

    def _do_get(self, rev, depth):
        msg = rev.get(depth)
        if self._proxy is not None:
            msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
        return msg

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def update(self, path, data, strict=False, txid=None, mk_branch=None):
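        """
        Replace the config data addressed by ``path`` with ``data``,
        recording a new revision on the selected branch (the current
        latest revision is returned unchanged if the data is identical).

        :param path: sub-path to the target node; '' updates this node
        :param data: protobuf message of the same type as the target node
        :param strict: if True, reject changes to READ_ONLY fields
        :param txid: optional transaction id; ``mk_branch`` is called to
            create a branch when the transaction has none on this node
        :return: the resulting latest ConfigRevision
        """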

        while path.startswith('/'):
            path = path[1:]

        try:
            branch = self._branches[txid]
        except KeyError:
            branch = mk_branch(self)

        if not path:
            return self._do_update(branch, data, strict)

        rev = branch._latest  # change is always made to the latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]
        if field.is_container:
            if not path:
                raise ValueError('Cannot update a list')
            if field.key:
                key, _, path = path.partition('/')
                key = field.key_from_str(key)
                children = copy(rev._children[name])
                idx, child_rev = find_rev_by_key(children, field.key, key)
                child_node = child_rev.node
                # check if deep copy will work better
                new_child_rev = child_node.update(
                    path, data, strict, txid, mk_branch)
                if new_child_rev.hash == child_rev.hash:
                    # When the new_child_rev goes out of scope, its
                    # destructor gets invoked as it is not referenced by
                    # any other data structure. To prevent that from
                    # erasing the hash it holds from the db, its hash is
                    # set to None. If the new_child_rev object points at
                    # the same object as child_rev, do not clear the hash.
                    if new_child_rev != child_rev:
                        log.debug('clear-hash',
                                  hash=new_child_rev.hash,
                                  object_ref=new_child_rev)
                        new_child_rev.clear_hash()
                    return branch._latest
                if getattr(new_child_rev.data, field.key) != key:
                    raise ValueError('Cannot change key field')
                children[idx] = new_child_rev
                rev = rev.update_children(name, children, branch)
                self._make_latest(branch, rev)
                return rev
            else:
                raise ValueError('Cannot index into container with no keys')

        else:
            child_rev = rev._children[name][0]
            child_node = child_rev.node
            new_child_rev = child_node.update(
                path, data, strict, txid, mk_branch)
            rev = rev.update_children(name, [new_child_rev], branch)
            self._make_latest(branch, rev)
            return rev

    def _do_update(self, branch, data, strict):
        if not isinstance(data, self._type):
            raise ValueError(
                '"{}" is not a valid data type for this node'.format(
                    data.__class__.__name__))
        self._test_no_children(data)
        if self._proxy is not None:
            self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)

        if branch._latest.data != data:
            if strict:
                # check if attempt is made to change read-only field
                check_access_violation(data, branch._latest.data)
            rev = branch._latest.update_data(data, branch)
            self._make_latest(branch, rev,
                              ((CallbackType.POST_UPDATE, rev.data),))
            return rev
        else:
            return branch._latest

    def _make_latest(self, branch, rev, change_announcements=()):
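        """
        Record ``rev`` on ``branch`` and advance the branch's latest
        pointer when the revision hash has changed. On the committed
        (main) branch, queue the given ``change_announcements`` with the
        root so proxy callbacks (if a proxy is attached) and event-bus
        notifications are delivered once the change has propagated.
        """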
        # Update the branch's latest pointer only when the new rev's hash
        # differs from the previous one; otherwise the data already saved
        # in the db under that hash would end up being erased
        if rev.hash not in branch._revs:
            branch._revs[rev.hash] = rev

        if not branch._latest or rev.hash != branch._latest.hash:
            branch._latest = rev

        # announce only if this is main branch
        if change_announcements and branch._txid is None:

            if self._proxy is not None:
                for change_type, data in change_announcements:
                    # since the callback may operate on the config tree,
                    # we have to defer the execution of the callbacks till
                    # the change is propagated to the root, then root will
                    # call the callbacks
                    self._root.enqueue_callback(
                        self._proxy.invoke_callbacks,
                        change_type,
                        data,
                        proceed_on_errors=1,
                    )

            for change_type, data in change_announcements:
                self._root.enqueue_notification_callback(
                    self._mk_event_bus().advertise,
                    change_type,
                    data,
                    hash=rev.hash
                )

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def add(self, path, data, txid=None, mk_branch=None):
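        """
        Add ``data`` as a new child in the keyed container field addressed
        by ``path``; the key is taken from the message itself and must not
        collide with an existing child.

        :return: the resulting latest ConfigRevision
        """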
        while path.startswith('/'):
            path = path[1:]
        if not path:
            raise ValueError('Cannot add to non-container node')

        try:
            branch = self._branches[txid]
        except KeyError:
            branch = mk_branch(self)

        rev = branch._latest  # change is always made to latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]
        if field.is_container:
            if not path:
                # we do need to add a new child to the field
                if field.key:
                    if self._proxy is not None:
                        self._proxy.invoke_callbacks(
                            CallbackType.PRE_ADD, data)
                    children = copy(rev._children[name])
                    key = getattr(data, field.key)
                    try:
                        find_rev_by_key(children, field.key, key)
                    except KeyError:
                        pass
                    else:
                        raise ValueError('Duplicate key "{}"'.format(key))
                    child_rev = self._mknode(data).latest
                    children.append(child_rev)
                    rev = rev.update_children(name, children, branch)
                    self._make_latest(branch, rev,
                                      ((CallbackType.POST_ADD, data),))
                    return rev
                else:
                    # adding to non-keyed containers not implemented yet
                    raise ValueError('Cannot add to non-keyed container')
            else:
                if field.key:
                    # need to escalate
                    key, _, path = path.partition('/')
                    key = field.key_from_str(key)
                    children = copy(rev._children[name])
                    idx, child_rev = find_rev_by_key(children, field.key, key)
                    child_node = child_rev.node
                    new_child_rev = child_node.add(path, data, txid, mk_branch)
                    children[idx] = new_child_rev
                    rev = rev.update_children(name, children, branch)
                    self._make_latest(branch, rev)
                    return rev
                else:
                    raise ValueError(
                        'Cannot index into container with no keys')
        else:
            raise ValueError('Cannot add to non-container field')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def remove(self, path, txid=None, mk_branch=None):
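        """
        Remove the child addressed by ``path`` from its keyed container
        field, invoking PRE_REMOVE on the attached proxy (if any) and
        announcing POST_REMOVE once the change is committed.

        :return: the resulting latest ConfigRevision
        """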
        while path.startswith('/'):
            path = path[1:]
        if not path:
            raise ValueError('Cannot remove from non-container node')

        try:
            branch = self._branches[txid]
        except KeyError:
            branch = mk_branch(self)

        rev = branch._latest  # change is always made to latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]
        if field.is_container:
            if not path:
                raise ValueError("Cannot remove without a key")
            if field.key:
                key, _, path = path.partition('/')
                key = field.key_from_str(key)
                if path:
                    # need to escalate
                    children = copy(rev._children[name])
                    idx, child_rev = find_rev_by_key(children, field.key, key)
                    child_node = child_rev.node
                    new_child_rev = child_node.remove(path, txid, mk_branch)
                    children[idx] = new_child_rev
                    rev = rev.update_children(name, children, branch)
                    self._make_latest(branch, rev)
                    return rev
                else:
                    # need to remove from this very node
                    children = copy(rev._children[name])
                    idx, child_rev = find_rev_by_key(children, field.key, key)
                    if self._proxy is not None:
                        data = child_rev.data
                        self._proxy.invoke_callbacks(
                            CallbackType.PRE_REMOVE, data)
                        post_anno = ((CallbackType.POST_REMOVE, data),)
                    else:
                        post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
                    del children[idx]
                    rev = rev.update_children(name, children, branch)
                    self._make_latest(branch, rev, post_anno)
                    return rev
            else:
                raise ValueError('Cannot remove from non-keyed container')
        else:
            raise ValueError('Cannot remove non-container field')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _mk_txbranch(self, txid):
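        """
        Fork a transaction branch off the latest committed revision and
        register it under ``txid``.
        """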
        branch_point = self._branches[None].latest
        branch = ConfigBranch(self, txid, branch_point)
        self._branches[txid] = branch
        return branch

    def _del_txbranch(self, txid):
        del self._branches[txid]

    def _merge_txbranch(self, txid, dry_run=False):
        """
        Make the latest revision of this transaction branch the latest of
        the common (committed) branch, but only if no conflict is detected.
        A conflict exists when the txbranch's branch point no longer
        matches the latest revision of the default branch. This has to be
        verified recursively.
        """

        def merge_child(child_rev):
            child_branch = child_rev._branch
            if child_branch._txid == txid:
                child_rev = child_branch._node._merge_txbranch(txid, dry_run)
            return child_rev

        src_branch = self._branches[txid]
        dst_branch = self._branches[None]

        fork_rev = src_branch.origin  # rev from which src branch was made
        src_rev = src_branch.latest  # head rev of source branch
        dst_rev = dst_branch.latest  # head rev of target branch

        rev, changes = merge_3way(
            fork_rev, src_rev, dst_rev, merge_child, dry_run)

        if not dry_run:
            self._make_latest(dst_branch, rev, change_announcements=changes)
            del self._branches[txid]

        return rev

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def diff(self, hash1, hash2=None, txid=None):
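        """
        Return a JsonPatch describing the difference between two revisions
        of this node; ``hash2`` defaults to the branch's latest revision.
        """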
        branch = self._branches[txid]
        rev1 = branch[hash1]
        rev2 = branch[hash2] if hash2 else branch._latest
        if rev1.hash == rev2.hash:
            return JsonPatch([])
        else:
            dict1 = message_to_dict(rev1.data)
            dict2 = message_to_dict(rev2.data)
            return make_patch(dict1, dict2)

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def tag(self, tag, hash=None):
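        """
        Associate ``tag`` with a committed revision (the latest one when
        ``hash`` is None) and persist the tag information.
        """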
        branch = self._branches[None]  # tag only what has been committed
        rev = branch._latest if hash is None else branch._revs[hash]
        self._tags[tag] = rev
        self.persist_tags()
        return self

    @property
    def tags(self):
        return sorted(self._tags.iterkeys())

    def by_tag(self, tag):
        """
        Return revision based on tag
        :param tag: previously registered tag value
        :return: revision object
        """
        return self._tags[tag]

    def diff_by_tag(self, tag1, tag2):
        return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)

    def delete_tag(self, tag):
        del self._tags[tag]
        self.persist_tags()

    def delete_tags(self, *tags):
        for tag in tags:
            del self._tags[tag]
        self.persist_tags()

    def prune_untagged(self):
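        """
        Drop all revisions from the committed branch except the tagged
        ones and the current latest revision.
        """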
        branch = self._branches[None]
        keep = set(rev.hash for rev in self._tags.itervalues())
        keep.add(branch._latest.hash)
        for hash in branch._revs.keys():
            if hash not in keep:
                del branch._revs[hash]
        return self

    def persist_tags(self):
        """
        Persist tag information to the backend
        """

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def _test_no_children(self, data):
        for field_name, field in children_fields(self._type).items():
            field_value = getattr(data, field_name)
            if field.is_container:
                if len(field_value):
                    raise NotImplementedError(
                        'Cannot update external children')
            else:
                if data.HasField(field_name):
                    raise NotImplementedError(
                        'Cannot update external children')

    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def get_proxy(self, path, exclusive=False):
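        """
        Return a ConfigProxy attached to the node addressed by ``path``.
        If the node is already proxied exclusively, a further claim raises
        ValueError.

        Illustrative use (assuming a keyed child field named 'devices'):
            proxy = root_node.get_proxy('/devices/1234')
        """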
        return self._get_proxy(path, self, path, exclusive)

    def _get_proxy(self, path, root, full_path, exclusive):
        while path.startswith('/'):
            path = path[1:]
        if not path:
            return self._mk_proxy(root, full_path, exclusive)

        # need to escalate
        rev = self._branches[None]._latest
        name, _, path = path.partition('/')
        field = children_fields(self._type)[name]
        if field.is_container:
            if not path:
                raise ValueError('Cannot proxy a container field')
            if field.key:
                key, _, path = path.partition('/')
                key = field.key_from_str(key)
                children = rev._children[name]
                _, child_rev = find_rev_by_key(children, field.key, key)
                child_node = child_rev.node
                return child_node._get_proxy(path, root, full_path, exclusive)

            raise ValueError('Cannot index into container with no keys')

        else:
            child_rev = rev._children[name][0]
            child_node = child_rev.node
            return child_node._get_proxy(path, root, full_path, exclusive)

    def _mk_proxy(self, root, full_path, exclusive):
        if self._proxy is None:
            self._proxy = ConfigProxy(root, self, full_path, exclusive)
        else:
            if self._proxy.exclusive:
                raise ValueError('Node is already owned exclusively')
        return self._proxy

    def _mk_event_bus(self):
        if self._event_bus is None:
            self._event_bus = ConfigEventBus()
        return self._event_bus

    # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    def load_latest(self, latest_hash):
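        """
        Rebuild this node's committed branch from the key/value store by
        loading the persisted revision identified by ``latest_hash``.
        """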

        root = self._root
        kv_store = root._kv_store

        branch = ConfigBranch(node=self, auto_prune=self._auto_prune)
        rev = PersistedConfigRevision.load(
            branch, kv_store, self._type, latest_hash)
        self._make_latest(branch, rev)
        self._branches[None] = branch