VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openomci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/core/config/config_node.py b/python/core/config/config_node.py
new file mode 100644
index 0000000..ab73484
--- /dev/null
+++ b/python/core/config/config_node.py
@@ -0,0 +1,617 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from copy import copy
+
+from jsonpatch import JsonPatch
+from jsonpatch import make_patch
+
+from common.utils.json_format import MessageToDict
+from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
+from voltha.core.config.config_proxy import CallbackType, ConfigProxy
+from voltha.core.config.config_rev import is_proto_message, children_fields, \
+ ConfigRevision, access_rights
+from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+import structlog
+
+log = structlog.get_logger()
+
+def message_to_dict(m):
+    """
+    Return the ``MessageToDict`` rendering of message ``m``.
+
+    The three positional flags handed to ``MessageToDict`` are fixed to
+    (True, True, False).
+    NOTE(review): their meaning is defined by common.utils.json_format and
+    is not visible from here -- confirm against that module.
+    """
+    as_dict = MessageToDict(m, True, True, False)
+    return as_dict
+
+
+def check_access_violation(new_msg, old_msg):
+    """
+    Raise ValueError if attempt is made to change a read-only field.
+
+    Every field that ``access_rights`` reports as ``meta_pb2.READ_ONLY`` for
+    the class of ``new_msg`` is compared between the two messages; all
+    offending field names are reported together in one exception.
+
+    :param new_msg: message carrying the proposed values
+    :param old_msg: message carrying the current values
+    :raises ValueError: if at least one read-only field differs
+    """
+    rights = access_rights(new_msg.__class__)
+    changed = [
+        name for name, mode in rights.iteritems()
+        if mode == meta_pb2.READ_ONLY
+        and getattr(new_msg, name) != getattr(old_msg, name)
+    ]
+    if not changed:
+        return
+    quoted = ', '.join('"%s"' % name for name in changed)
+    raise ValueError('Cannot change read-only field(s) %s' % quoted)
+
+
+def find_rev_by_key(revs, keyname, value):
+    """
+    Locate the first revision whose data attribute ``keyname`` equals
+    ``value``.
+
+    :param revs: sequence of revisions; each is read via ``_config._data``
+    :param keyname: name of the attribute to compare
+    :param value: value the attribute must be equal to
+    :return: tuple of (position in ``revs``, matching revision)
+    :raises KeyError: if no revision matches
+    """
+    position = 0
+    for candidate in revs:
+        if getattr(candidate._config._data, keyname) == value:
+            return position, candidate
+        position += 1
+    raise KeyError('key {}={} not found'.format(keyname, value))
+
+
+class ConfigNode(object):
+ """
+ Represents a configuration node which can hold a number of revisions
+ of the configuration for this node.
+ When the configuration changes, the new version is appended to the
+ node.
+ Initial data is normally a protobuf message, whose class determines the
+ type of this node; a class may be passed instead, in which case the node
+ is typed but no initial revision is created.
+ """
+ __slots__ = (
+ '_root', # ref to root node
+ '_type', # node type, as __class__ of protobuf message
+ '_branches', # dict of transaction branches and a default (committed)
+ # branch
+ '_tags', # dict of tag-name to ref of ConfigRevision
+ '_proxy', # ref to proxy observer or None if no proxy assigned
+ '_event_bus', # ref to event_bus or None if no event bus is assigned
+ '_auto_prune' # flag forwarded to ConfigBranch by _initialize/load_latest
+ )
+
+ def __init__(self, root, initial_data, auto_prune=True, txid=None):
+     """
+     Create a node that belongs to ``root``.
+
+     :param root: root object; kept as ``_root`` and used to build child
+         nodes and revisions
+     :param initial_data: either a class (the node is typed but gets no
+         revision) or a protobuf message instance (its class becomes the
+         node type and a private copy becomes the first revision)
+     :param auto_prune: flag handed on to the ConfigBranch objects
+     :param txid: key under which the initial branch is registered
+     :raises NotImplementedError: for any other kind of ``initial_data``
+     """
+     self._root = root
+     self._auto_prune = auto_prune
+     self._branches = {}
+     self._tags = {}
+     self._proxy = None
+     self._event_bus = None
+
+     if isinstance(initial_data, type):
+         # only the type is known at this point; no revision is built
+         self._type = initial_data
+         return
+
+     if not is_proto_message(initial_data):
+         raise NotImplementedError()
+
+     message_class = initial_data.__class__
+     self._type = message_class
+     # _initialize clears child fields on the message it is given, so it
+     # works on a private copy and the caller's message stays untouched
+     private_copy = message_class()
+     private_copy.CopyFrom(initial_data)
+     self._initialize(private_copy, txid)
+
+ def _mknode(self, *args, **kw):
+     """Build another ConfigNode that shares this node's root."""
+     shared_root = self._root
+     return ConfigNode(shared_root, *args, **kw)
+
+ def _mkrev(self, *args, **kw):
+     """Ask the root to create a revision; all arguments pass through."""
+     make_revision = self._root.mkrev
+     return make_revision(*args, **kw)
+
+ def _initialize(self, data, txid):
+     """
+     Build the first revision of this node from ``data``.
+
+     Fields that ``children_fields`` lists for the node type are moved out
+     of ``data`` into child nodes of their own (one per list element, or a
+     single one for a non-container field) and then cleared on ``data``.
+     What is left of ``data`` and the child revisions form the first
+     revision, stored on a new branch registered under ``txid``.
+
+     :param data: protobuf message; modified in place (child fields cleared)
+     :param txid: key for the new branch in ``_branches``
+     :raises ValueError: if a keyed container holds the same key twice
+     """
+     child_revs = {}
+     for name, spec in children_fields(self._type).iteritems():
+         value = getattr(data, name)
+         if not spec.is_container:
+             child_revs[name] = [self._mknode(value, txid=txid).latest]
+         elif spec.key:
+             seen_keys = set()
+             revs = []
+             child_revs[name] = revs
+             for item in value:
+                 # the child node is created before its key is checked
+                 item_rev = self._mknode(item, txid=txid).latest
+                 item_key = getattr(item, spec.key)
+                 if item_key in seen_keys:
+                     raise ValueError(
+                         'Duplicate key "{}"'.format(item_key))
+                 revs.append(item_rev)
+                 seen_keys.add(item_key)
+         else:
+             child_revs[name] = [
+                 self._mknode(item, txid=txid).latest for item in value]
+         # children now live in their own nodes; drop them from local data
+         data.ClearField(name)
+
+     first_branch = ConfigBranch(self, auto_prune=self._auto_prune)
+     first_rev = self._mkrev(first_branch, data, child_revs)
+     self._make_latest(first_branch, first_rev)
+     self._branches[txid] = first_branch
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # these convenience short-cuts only work for the committed branch
+
+ @property
+ def revisions(self):
+     """Hashes (``_hash``) of all revisions held by the committed branch."""
+     committed = self._branches[None]
+     hashes = []
+     for revision in committed._revs.itervalues():
+         hashes.append(revision._hash)
+     return hashes
+
+ @property
+ def latest(self):
+     """The ``_latest`` revision of the committed branch."""
+     committed = self._branches[None]
+     return committed._latest
+
+ def __getitem__(self, hash):
+     """Return the committed-branch revision stored under ``hash``."""
+     committed_revs = self._branches[None]._revs
+     return committed_revs[hash]
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
+     """
+     Read data at ``path`` below this node.
+
+     :param path: slash separated path; ``None`` or empty means this node.
+         Leading slashes are ignored.
+     :param hash: revision hash to read from; the branch's latest revision
+         is used when omitted
+     :param depth: depth value handed to the revision's ``get``
+     :param deep: when true, ``depth`` is overridden with -1
+     :param txid: transaction whose branch is read; the committed branch
+         is used if no such branch exists
+     :return: whatever ``_get`` produces for the selected revision
+     """
+     wanted_depth = -1 if deep else depth
+
+     clean_path = '' if path is None else path
+     clean_path = clean_path.lstrip('/')
+
+     # determine branch; if lookup fails, it is ok to use default branch
+     branch = self._branches.get(txid, None) or self._branches[None]
+
+     if hash is None:
+         rev = branch.latest
+     else:
+         rev = branch._revs[hash]
+
+     return self._get(rev, clean_path, wanted_depth)
+
+ def _get(self, rev, path, depth):
+     """
+     Resolve ``path`` starting at revision ``rev`` of this node.
+
+     An empty path yields this node's own data. Otherwise the first path
+     segment names a child field: a non-container field is descended
+     into; a keyed container is either listed (no further segment) or
+     descended into through the key given by the next segment; a
+     non-keyed container can only be listed.
+
+     :raises LookupError: when indexing into a container with no key
+     :raises KeyError: when the field or the key does not exist
+     """
+     if not path:
+         return self._do_get(rev, depth)
+
+     name, _, remainder = path.partition('/')
+     spec = children_fields(self._type)[name]
+
+     if not spec.is_container:
+         only_child = rev._children[name][0]
+         return only_child.node._get(only_child, remainder, depth)
+
+     if spec.key:
+         siblings = rev._children[name]
+         if remainder:
+             # need to escalate further
+             raw_key, _, remainder = remainder.partition('/')
+             wanted = spec.key_from_str(raw_key)
+             _, match = find_rev_by_key(siblings, spec.key, wanted)
+             return match.node._get(match, remainder, depth)
+     else:
+         if remainder:
+             raise LookupError(
+                 'Cannot index into container with no key defined')
+         siblings = rev._children[name]
+
+     # the container itself is the target: collect every child's data
+     return [
+         child_rev.node._do_get(child_rev, depth) for child_rev in siblings]
+
+ def _do_get(self, rev, depth):
+     """
+     Fetch ``rev.get(depth)`` and, if a proxy is attached to this node,
+     return what its GET callbacks make of it instead.
+     """
+     result = rev.get(depth)
+     proxy = self._proxy
+     if proxy is None:
+         return result
+     return proxy.invoke_callbacks(CallbackType.GET, result)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def update(self, path, data, strict=False, txid=None, mk_branch=None):
+ """
+ Replace the data of the node addressed by ``path``.
+
+ :param path: slash separated path; leading slashes are ignored and an
+ empty path addresses this node itself
+ :param data: new data, handed to ``_do_update`` of the target node
+ :param strict: passed through to ``_do_update``
+ :param txid: branch to work on; if this node has no such branch yet,
+ ``mk_branch(self)`` is called to obtain one
+ :param mk_branch: callable taking this node and returning a branch
+ :return: the resulting latest revision of this node on that branch
+ :raises ValueError: when the path addresses a list, a non-keyed
+ container, or the update would change a child's key field
+ :raises KeyError: when the field or key in the path does not exist
+ """
+
+ while path.startswith('/'):
+ path = path[1:]
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ if not path:
+ return self._do_update(branch, data, strict)
+
+ rev = branch._latest # change is always made to the latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot update a list')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
+ child_node = child_rev.node
+ # check if deep copy will work better
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ if new_child_rev.hash == child_rev.hash:
+ # When the new_child_rev goes out of scope,
+ # its destructor gets invoked as it is not being
+ # referred to by any other data structure. To prevent
+ # this from causing the hash it is holding to be
+ # erased in the db, its hash is set to None. If the
+ # new_child_rev object is pointing at the same address
+ # as the child_rev address then do not clear the hash
+ if new_child_rev != child_rev:
+ log.debug('clear-hash',
+ hash=new_child_rev.hash, object_ref=new_child_rev)
+ new_child_rev.clear_hash()
+ # nothing changed below, so this node's latest stays as it is
+ return branch._latest
+ if getattr(new_child_rev.data, field.key) != key:
+ raise ValueError('Cannot change key field')
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ # NOTE(review): unlike the keyed branch above, there is no
+ # unchanged-hash short cut here -- confirm that is intended
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ rev = rev.update_children(name, [new_child_rev], branch)
+ self._make_latest(branch, rev)
+ return rev
+
+ def _do_update(self, branch, data, strict):
+     """
+     Apply ``data`` to this very node on ``branch``.
+
+     :param branch: branch whose latest revision is to be superseded
+     :param data: new message; must be an instance of the node type and
+         must not carry any child data
+     :param strict: when true, changes to read-only fields are rejected
+     :return: the new revision, or the current latest one if ``data`` does
+         not differ from it
+     :raises ValueError: on a wrong data type or (strict mode) on a
+         read-only violation
+     :raises NotImplementedError: if ``data`` carries child data
+     """
+     if not isinstance(data, self._type):
+         type_name = data.__class__.__name__
+         raise ValueError(
+             '"{}" is not a valid data type for this node'.format(type_name))
+     self._test_no_children(data)
+     if self._proxy is not None:
+         self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
+
+     # read the head only after the PRE_UPDATE callbacks have run
+     head = branch._latest
+     if head.data != data:
+         if strict:
+             # check if attempt is made to change read-only field
+             check_access_violation(data, head.data)
+         new_rev = head.update_data(data, branch)
+         announcements = ((CallbackType.POST_UPDATE, new_rev.data),)
+         self._make_latest(branch, new_rev, announcements)
+         return new_rev
+     return head
+
+ def _make_latest(self, branch, rev, change_announcements=()):
+     """
+     Record ``rev`` on ``branch`` and make it the branch head.
+
+     The revision is stored only if its hash is not yet known to the
+     branch, and the head is replaced only if the hash differs from the
+     current head's (or there is no head yet); per the original note,
+     replacing a same-hash revision would cause the data already saved in
+     the db under that hash to be erased.
+
+     :param branch: branch to update
+     :param rev: revision to record
+     :param change_announcements: iterable of (change type, data) pairs;
+         acted upon only when ``branch._txid`` is None
+     """
+     known_revs = branch._revs
+     if rev.hash not in known_revs:
+         known_revs[rev.hash] = rev
+
+     head = branch._latest
+     if not head or rev.hash != head.hash:
+         branch._latest = rev
+
+     # announce only if this is main branch
+     if not change_announcements or branch._txid is not None:
+         return
+
+     proxy = self._proxy
+     if proxy is not None:
+         # since the callback may operate on the config tree, we have to
+         # defer the execution of the callbacks till the change is
+         # propagated to the root, then root will call the callbacks
+         for kind, payload in change_announcements:
+             self._root.enqueue_callback(
+                 proxy.invoke_callbacks,
+                 kind,
+                 payload,
+                 proceed_on_errors=1,
+             )
+
+     for kind, payload in change_announcements:
+         self._root.enqueue_notification_callback(
+             self._mk_event_bus().advertise,
+             kind,
+             payload,
+             hash=rev.hash
+         )
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def add(self, path, data, txid=None, mk_branch=None):
+     """
+     Add ``data`` as a new child to the keyed container at ``path``.
+
+     :param path: slash separated path to a keyed container field;
+         leading slashes are ignored
+     :param data: message to add; its key attribute must be unused
+     :param txid: branch to work on; if this node has no such branch yet,
+         ``mk_branch(self)`` is called to obtain one
+     :param mk_branch: callable taking this node and returning a branch
+     :return: the new latest revision of this node on that branch
+     :raises ValueError: for an empty path, a non-container field, a
+         non-keyed container, or a duplicate key
+     :raises KeyError: when the field or key in the path does not exist
+     """
+     path = path.lstrip('/')
+     if not path:
+         raise ValueError('Cannot add to non-container node')
+
+     try:
+         branch = self._branches[txid]
+     except KeyError:
+         branch = mk_branch(self)
+
+     head = branch._latest  # change is always made to latest
+     name, _, remainder = path.partition('/')
+     spec = children_fields(self._type)[name]
+
+     if not spec.is_container:
+         raise ValueError('Cannot add to non-container field')
+
+     if remainder:
+         # the target container lives further down: need to escalate
+         if not spec.key:
+             raise ValueError(
+                 'Cannot index into container with no keys')
+         raw_key, _, remainder = remainder.partition('/')
+         wanted = spec.key_from_str(raw_key)
+         siblings = copy(head._children[name])
+         position, old_child = find_rev_by_key(siblings, spec.key, wanted)
+         siblings[position] = old_child.node.add(
+             remainder, data, txid, mk_branch)
+         new_rev = head.update_children(name, siblings, branch)
+         self._make_latest(branch, new_rev)
+         return new_rev
+
+     # we do need to add a new child to the field
+     if not spec.key:
+         # adding to non-keyed containers not implemented yet
+         raise ValueError('Cannot add to non-keyed container')
+
+     # PRE_ADD callbacks run before the duplicate-key check
+     if self._proxy is not None:
+         self._proxy.invoke_callbacks(CallbackType.PRE_ADD, data)
+     siblings = copy(head._children[name])
+     new_key = getattr(data, spec.key)
+     try:
+         find_rev_by_key(siblings, spec.key, new_key)
+     except KeyError:
+         pass
+     else:
+         raise ValueError('Duplicate key "{}"'.format(new_key))
+     siblings.append(self._mknode(data).latest)
+     new_rev = head.update_children(name, siblings, branch)
+     self._make_latest(branch, new_rev, ((CallbackType.POST_ADD, data),))
+     return new_rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def remove(self, path, txid=None, mk_branch=None):
+     """
+     Remove the child addressed by ``path`` from its keyed container.
+
+     :param path: slash separated path ending in the key of the child to
+         remove; leading slashes are ignored
+     :param txid: branch to work on; if this node has no such branch yet,
+         ``mk_branch(self)`` is called to obtain one
+     :param mk_branch: callable taking this node and returning a branch
+     :return: the new latest revision of this node on that branch
+     :raises ValueError: for an empty path, a path without a key, a
+         non-keyed container, or a non-container field
+     :raises KeyError: when the field or key in the path does not exist
+     """
+     path = path.lstrip('/')
+     if not path:
+         raise ValueError('Cannot remove from non-container node')
+
+     try:
+         branch = self._branches[txid]
+     except KeyError:
+         branch = mk_branch(self)
+
+     rev = branch._latest  # change is always made to latest
+     name, _, path = path.partition('/')
+     field = children_fields(self._type)[name]
+
+     if not field.is_container:
+         raise ValueError('Cannot remove non-container field')
+     if not path:
+         raise ValueError("Cannot remove without a key")
+     if not field.key:
+         raise ValueError('Cannot remove from non-keyed container')
+
+     key, _, path = path.partition('/')
+     key = field.key_from_str(key)
+     children = copy(rev._children[name])
+     idx, child_rev = find_rev_by_key(children, field.key, key)
+
+     if path:
+         # need to escalate
+         children[idx] = child_rev.node.remove(path, txid, mk_branch)
+         rev = rev.update_children(name, children, branch)
+         self._make_latest(branch, rev)
+         return rev
+
+     # need to remove from this very node
+     data = child_rev.data
+     if self._proxy is not None:
+         self._proxy.invoke_callbacks(CallbackType.PRE_REMOVE, data)
+     post_anno = ((CallbackType.POST_REMOVE, data),)
+     del children[idx]
+     rev = rev.update_children(name, children, branch)
+     self._make_latest(branch, rev, post_anno)
+     return rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _mk_txbranch(self, txid):
+     """
+     Create and register a branch for transaction ``txid``, forked from
+     the latest revision of the committed branch.
+
+     :return: the new branch
+     """
+     fork_point = self._branches[None].latest
+     tx_branch = ConfigBranch(self, txid, fork_point)
+     self._branches[txid] = tx_branch
+     return tx_branch
+
+ def _del_txbranch(self, txid):
+     """Forget the branch of ``txid``; KeyError if there is none."""
+     self._branches.pop(txid)
+
+ def _merge_txbranch(self, txid, dry_run=False):
+     """
+     Make latest in branch to be latest in the common branch, but only
+     if no conflict is detected. Conflict is where the txbranch branch
+     point no longer matches the latest in the default branch. This has
+     to be verified recursively.
+
+     :param txid: transaction whose branch is merged
+     :param dry_run: when true, the merge is computed but the committed
+         branch is not updated and the transaction branch is kept
+     :return: the revision produced by ``merge_3way``
+     """
+     source = self._branches[txid]
+     target = self._branches[None]
+
+     def merge_child(child_rev):
+         # children that have a branch in this transaction are merged
+         # recursively; all others are taken as they are
+         owner = child_rev._branch
+         if owner._txid == txid:
+             return owner._node._merge_txbranch(txid, dry_run)
+         return child_rev
+
+     merged_rev, changes = merge_3way(
+         source.origin,   # rev from which src branch was made
+         source.latest,   # head rev of source branch
+         target.latest,   # head rev of target branch
+         merge_child,
+         dry_run)
+
+     if dry_run:
+         return merged_rev
+
+     self._make_latest(target, merged_rev, change_announcements=changes)
+     del self._branches[txid]
+     return merged_rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def diff(self, hash1, hash2=None, txid=None):
+     """
+     Compute the JSON patch leading from one revision to another.
+
+     :param hash1: hash of the starting revision
+     :param hash2: hash of the end revision; the branch's latest revision
+         is used when this is omitted (or otherwise falsy)
+     :param txid: branch to look the revisions up in
+     :return: an empty ``JsonPatch`` if both revisions have the same hash,
+         otherwise the result of ``make_patch`` on their dict renderings
+     """
+     branch = self._branches[txid]
+     older = branch[hash1]
+     if hash2:
+         newer = branch[hash2]
+     else:
+         newer = branch._latest
+
+     if older.hash == newer.hash:
+         return JsonPatch([])
+     return make_patch(
+         message_to_dict(older.data), message_to_dict(newer.data))
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def tag(self, tag, hash=None):
+     """
+     Attach the name ``tag`` to a committed revision.
+
+     :param tag: tag name; an existing tag of that name is re-pointed
+     :param hash: hash of the revision to tag; latest one if omitted
+     :return: this node
+     """
+     committed = self._branches[None]  # tag only what has been committed
+     if hash is None:
+         target = committed._latest
+     else:
+         target = committed._revs[hash]
+     self._tags[tag] = target
+     self.persist_tags()
+     return self
+
+ @property
+ def tags(self):
+     """All registered tag names, as a sorted list."""
+     names = list(self._tags.iterkeys())
+     names.sort()
+     return names
+
+ def by_tag(self, tag):
+     """
+     Look up the revision a tag refers to.
+     :param tag: previously registered tag value
+     :return: revision object
+     :raises KeyError: if no such tag is registered
+     """
+     registry = self._tags
+     return registry[tag]
+
+ def diff_by_tag(self, tag1, tag2):
+     """Same as ``diff``, with both revisions given by tag name."""
+     first_hash = self._tags[tag1].hash
+     second_hash = self._tags[tag2].hash
+     return self.diff(first_hash, second_hash)
+
+ def delete_tag(self, tag):
+     """Drop one tag (KeyError if unknown), then persist the tags."""
+     self._tags.pop(tag)
+     self.persist_tags()
+
+ def delete_tags(self, *tags):
+     """
+     Drop several tags, then persist the tags once.
+
+     An unknown tag raises KeyError right away: tags listed before it are
+     already gone at that point and nothing is persisted.
+     """
+     registry = self._tags
+     for name in tags:
+         registry.pop(name)
+     self.persist_tags()
+
+ def prune_untagged(self):
+     """
+     Drop every committed revision that is neither tagged nor the latest.
+
+     :return: this node
+     """
+     committed = self._branches[None]
+     keep = set(tagged.hash for tagged in self._tags.itervalues())
+     keep.add(committed._latest.hash)
+     stale = [h for h in committed._revs.keys() if h not in keep]
+     for h in stale:
+         del committed._revs[h]
+     return self
+
+ def persist_tags(self):
+ """
+ Persist tag information to the backend.
+
+ The body consists of this docstring only, so calling it does nothing
+ and returns None.
+ NOTE(review): presumably a placeholder still to be implemented or
+ overridden -- confirm.
+ """
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _test_no_children(self, data):
+     """
+     Make sure ``data`` carries nothing in its child fields.
+
+     Child fields (as listed by ``children_fields`` for the node type) are
+     kept in nodes of their own, so they cannot be changed through an
+     update of this node.
+
+     :param data: message about to be applied to this node
+     :raises NotImplementedError: if a container child field is not empty
+         or a non-container child field is set
+     """
+     for field_name, field in children_fields(self._type).items():
+         if field.is_container:
+             has_children = len(getattr(data, field_name)) > 0
+         else:
+             has_children = data.HasField(field_name)
+         if has_children:
+             raise NotImplementedError('Cannot update external children')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get_proxy(self, path, exclusive=False):
+     """
+     Obtain the proxy of the node at ``path``, with this node acting as
+     the root the proxy is created for.
+
+     :param path: slash separated path to the node to be proxied
+     :param exclusive: passed on to the proxy when one has to be created
+     """
+     proxy_root = self
+     return self._get_proxy(path, proxy_root, path, exclusive)
+
+ def _get_proxy(self, path, root, full_path, exclusive):
+     """
+     Walk down ``path`` along the committed branch and return the proxy
+     of the node found there.
+
+     :param path: remaining path below this node; leading slashes ignored
+     :param root: node the proxy request started from
+     :param full_path: the complete path as originally requested
+     :param exclusive: passed on to ``_mk_proxy``
+     :raises ValueError: when the path ends at a container field or
+         indexes into a container with no keys
+     :raises KeyError: when the field or key in the path does not exist
+     """
+     path = path.lstrip('/')
+     if not path:
+         return self._mk_proxy(root, full_path, exclusive)
+
+     # need to escalate
+     head = self._branches[None]._latest
+     name, _, remainder = path.partition('/')
+     spec = children_fields(self._type)[name]
+
+     if not spec.is_container:
+         target = head._children[name][0]
+     else:
+         if not remainder:
+             raise ValueError('Cannot proxy a container field')
+         if not spec.key:
+             raise ValueError('Cannot index into container with no keys')
+         raw_key, _, remainder = remainder.partition('/')
+         wanted = spec.key_from_str(raw_key)
+         _, target = find_rev_by_key(
+             head._children[name], spec.key, wanted)
+
+     return target.node._get_proxy(remainder, root, full_path, exclusive)
+
+ def _mk_proxy(self, root, full_path, exclusive):
+     """
+     Return this node's proxy, creating it on first use.
+
+     :raises ValueError: if a proxy exists already and is exclusive
+     """
+     existing = self._proxy
+     if existing is None:
+         self._proxy = ConfigProxy(root, self, full_path, exclusive)
+         return self._proxy
+     if existing.exclusive:
+         raise ValueError('Node is already owned exclusively')
+     return existing
+
+ def _mk_event_bus(self):
+     """Return this node's event bus, creating it on first use."""
+     bus = self._event_bus
+     if bus is None:
+         bus = ConfigEventBus()
+         self._event_bus = bus
+     return bus
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def load_latest(self, latest_hash):
+     """
+     Set up the committed branch of this node from the root's kv store.
+
+     A fresh branch is created, the revision stored under ``latest_hash``
+     is loaded into it through ``PersistedConfigRevision.load`` and made
+     its head, and the branch is installed as the committed branch.
+
+     :param latest_hash: hash of the revision to load
+     """
+     kv_store = self._root._kv_store
+
+     committed = ConfigBranch(node=self, auto_prune=self._auto_prune)
+     loaded_rev = PersistedConfigRevision.load(
+         committed, kv_store, self._type, latest_hash)
+     self._make_latest(committed, loaded_rev)
+     self._branches[None] = committed