[SEBA-450] (part 2)
Add tox testing support for additional XOS library modules:
- xos-api
- xos-kafka (has no tests)
- xos-migrate (has no tests)
- xos-synchronizer
Change-Id: I98195bc9747971d3515882d517affe058dd86ac5
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
index 5cbfda9..15f005c 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -16,23 +16,28 @@
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
-import time
-import threading
-import json
+from __future__ import absolute_import
+import json
+import threading
+import time
from collections import defaultdict
+
+from multistructlog import create_logger
from networkx import (
DiGraph,
- weakly_connected_component_subgraphs,
- all_shortest_paths,
NetworkXNoPath,
+ all_shortest_paths,
+ weakly_connected_component_subgraphs,
)
from networkx.algorithms.dag import topological_sort
-
-from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
-
from xosconfig import Config
-from multistructlog import create_logger
+from xossynchronizer.steps.syncstep import (
+ DeferredException,
+ InnocuousException,
+ SyncStep,
+)
+from six.moves import range
log = create_logger(Config().get("logging"))
@@ -113,10 +118,12 @@
# src_port is the field that accesses Model2 from Model1
# dst_port is the field that accesses Model1 from Model2
static_dependencies = json.loads(dep_graph_str)
- dynamic_dependencies = [] # Dropped Service and ServiceInstance dynamic dependencies
+ dynamic_dependencies = (
+ []
+ ) # Dropped Service and ServiceInstance dynamic dependencies
joint_dependencies = dict(
- static_dependencies.items() + dynamic_dependencies
+ list(static_dependencies.items()) + dynamic_dependencies
)
model_dependency_graph = DiGraph()
@@ -267,7 +274,6 @@
def handle_sync_exception(self, o, e):
self.log.exception("sync step failed!", e=e, **o.tologdict())
- current_code = o.backend_code
if hasattr(e, "message"):
status = str(e.message)
@@ -359,7 +365,6 @@
sc_log = self.log.new(thread_id=threading.current_thread().ident)
try:
- start_time = time.time()
sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
cohort_emptied = False
@@ -417,14 +422,13 @@
self.run_once()
def fetch_pending(self, deletion=False):
- unique_model_list = list(set(self.model_to_step.keys()))
pending_objects = []
pending_steps = []
- step_list = self.step_lookup.values()
+ step_list = list(self.step_lookup.values())
for e in self.external_dependencies:
s = SyncStep
- if isinstance(e,str):
+ if isinstance(e, str):
# external dependency is a string that names a model class
s.observes = self.model_accessor.get_model_class(e)
else:
@@ -439,7 +443,9 @@
if not hasattr(step, "call"):
pending = step.fetch_pending(deletion)
for obj in pending:
- step = step_class(driver=self.driver, model_accessor=self.model_accessor)
+ step = step_class(
+ driver=self.driver, model_accessor=self.model_accessor
+ )
step.log = self.log.new(step=step)
obj.synchronizer_step = step
@@ -460,7 +466,7 @@
if o is None:
return [], None
try:
- o_lst = [o for o in o.all()]
+ o_lst = [oa for oa in o.all()]
edge_type = PROXY_EDGE
except (AttributeError, TypeError):
o_lst = [o]
@@ -606,10 +612,8 @@
"""
def compute_dependent_cohorts(self, objects, deletion):
- model_map = defaultdict(list)
n = len(objects)
- r = range(n)
- indexed_objects = zip(r, objects)
+ r = list(range(n))
oG = DiGraph()
@@ -652,8 +656,6 @@
# Why are we checking the DB connection here?
self.model_accessor.check_db_connection_okay()
- loop_start = time.time()
-
# Two passes. One for sync, the other for deletion.
for deletion in (False, True):
objects_to_process = []
@@ -693,8 +695,6 @@
except Exception as e:
self.log.exception("Legacy step failed", step=step, e=e)
- loop_end = time.time()
-
except Exception as e:
self.log.exception(
"Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",