blob: 4b1006d490171a12bf3c4b81b4a7fba6a868a1ce [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from uuid import uuid4
17
18import structlog
19from simplejson import dumps, loads
20
21from voltha.core.config.config_node import ConfigNode
22from voltha.core.config.config_rev import ConfigRevision
23from voltha.core.config.config_rev_persisted import PersistedConfigRevision
24from voltha.core.config.merge_3way import MergeConflictException
25
26log = structlog.get_logger()
27
28
29class ConfigRoot(ConfigNode):
30
31 __slots__ = (
32 '_dirty_nodes', # holds set of modified nodes per transaction branch
33 '_kv_store',
34 '_loading',
35 '_rev_cls',
36 '_deferred_callback_queue',
37 '_notification_deferred_callback_queue'
38 )
39
40 def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
41 self._kv_store = kv_store
42 self._dirty_nodes = {}
43 self._loading = False
44 if kv_store is not None and \
45 not issubclass(rev_cls, PersistedConfigRevision):
46 rev_cls = PersistedConfigRevision
47 self._rev_cls = rev_cls
48 self._deferred_callback_queue = []
49 self._notification_deferred_callback_queue = []
50 super(ConfigRoot, self).__init__(self, initial_data, False)
51
52 @property
53 def kv_store(self):
54 if self._loading:
55 # provide fake store for storing things
56 # TODO this shall be a fake_dict providing noop for all relevant
57 # operations
58 return dict()
59 else:
60 return self._kv_store
61
62 def mkrev(self, *args, **kw):
63 return self._rev_cls(*args, **kw)
64
65 def mk_txbranch(self):
66 txid = uuid4().hex[:12]
67 self._dirty_nodes[txid] = {self}
68 self._mk_txbranch(txid)
69 return txid
70
71 def del_txbranch(self, txid):
72 for dirty_node in self._dirty_nodes[txid]:
73 dirty_node._del_txbranch(txid)
74 del self._dirty_nodes[txid]
75
76 def fold_txbranch(self, txid):
77 try:
78 self._merge_txbranch(txid, dry_run=1)
79 except MergeConflictException:
80 self.del_txbranch(txid)
81 raise
82
83 try:
84 self._merge_txbranch(txid)
85 finally:
86 self.execute_deferred_callbacks()
87
88 # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
89
90 def update(self, path, data, strict=None, txid=None, mk_branch=None):
91 assert mk_branch is None
92 self.check_callback_queue()
93 try:
94 if txid is not None:
95 dirtied = self._dirty_nodes[txid]
96
97 def track_dirty(node):
98 dirtied.add(node)
99 return node._mk_txbranch(txid)
100
101 res = super(ConfigRoot, self).update(path, data, strict,
102 txid, track_dirty)
103 else:
104 res = super(ConfigRoot, self).update(path, data, strict)
105 finally:
106 self.execute_deferred_callbacks()
107 return res
108
109 def add(self, path, data, txid=None, mk_branch=None):
110 assert mk_branch is None
111 self.check_callback_queue()
112 try:
113 if txid is not None:
114 dirtied = self._dirty_nodes[txid]
115
116 def track_dirty(node):
117 dirtied.add(node)
118 return node._mk_txbranch(txid)
119
120 res = super(ConfigRoot, self).add(path, data, txid, track_dirty)
121 else:
122 res = super(ConfigRoot, self).add(path, data)
123 finally:
124 self.execute_deferred_callbacks()
125 return res
126
127 def remove(self, path, txid=None, mk_branch=None):
128 assert mk_branch is None
129 self.check_callback_queue()
130 try:
131 if txid is not None:
132 dirtied = self._dirty_nodes[txid]
133
134 def track_dirty(node):
135 dirtied.add(node)
136 return node._mk_txbranch(txid)
137
138 res = super(ConfigRoot, self).remove(path, txid, track_dirty)
139 else:
140 res = super(ConfigRoot, self).remove(path)
141 finally:
142 self.execute_deferred_callbacks()
143 return res
144
145 def check_callback_queue(self):
146 assert len(self._deferred_callback_queue) == 0
147
148 def enqueue_callback(self, func, *args, **kw):
149 self._deferred_callback_queue.append((func, args, kw))
150
151 def enqueue_notification_callback(self, func, *args, **kw):
152 """
153 A separate queue is required for notification. Previously, when the
154 notifications were added to the self._deferred_callback_queue there
155 was a deadlock condition where two callbacks were added (one
156 related to the model change and one for the notification related to
157 that model change). Since the model change requires the
158 self._deferred_callback_queue to be empty then there was a deadlock
159 in that scenario. The simple approach to avoid this problem is to
160 have separate queues for model and notification.
161 TODO: Investigate whether there is a need for the
162 self._deferred_callback_queue to handle multiple model events at the same time
163 :param func: callback function
164 :param args: args
165 :param kw: key-value args
166 :return: None
167 """
168 self._notification_deferred_callback_queue.append((func, args, kw))
169
170 def execute_deferred_callbacks(self):
171 # First process the model-triggered related callbacks
172 while self._deferred_callback_queue:
173 func, args, kw = self._deferred_callback_queue.pop(0)
174 func(*args, **kw)
175
176 # Execute the notification callbacks
177 while self._notification_deferred_callback_queue:
178 func, args, kw = self._notification_deferred_callback_queue.pop(0)
179 func(*args, **kw)
180
181
182 # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
183
184 @classmethod
185 def load(cls, root_msg_cls, kv_store):
186 # need to use fake kv store during initial load for not to override
187 # our real k vstore
188 fake_kv_store = dict() # shall use more efficient mock dict
189 root = cls(root_msg_cls(), kv_store=fake_kv_store,
190 rev_cls=PersistedConfigRevision)
191 # we can install the real store now
192 root._kv_store = kv_store
193 root.load_from_persistence(root_msg_cls)
194 return root
195
196 def _make_latest(self, branch, *args, **kw):
197 super(ConfigRoot, self)._make_latest(branch, *args, **kw)
198 # only persist the committed branch
199 if self._kv_store is not None and branch._txid is None:
200 root_data = dict(
201 latest=branch._latest._hash,
202 tags=dict((k, v._hash) for k, v in self._tags.iteritems())
203 )
204 blob = dumps(root_data)
205 self._kv_store['root'] = blob
206
207 def persist_tags(self):
208 if self._kv_store is not None:
209 root_data = loads(self.kv_store['root'])
210 root_data = dict(
211 latest=root_data['latest'],
212 tags=dict((k, v._hash) for k, v in self._tags.iteritems())
213 )
214 blob = dumps(root_data)
215 self._kv_store['root'] = blob
216
217 def load_from_persistence(self, root_msg_cls):
218 self._loading = True
219 blob = self._kv_store['root']
220 root_data = loads(blob)
221
222 for tag, hash in root_data['tags'].iteritems():
223 self.load_latest(hash)
224 self._tags[tag] = self.latest
225
226 self.load_latest(root_data['latest'])
227
228 self._loading = False
229