blob: a1f242beaaf8e68539611851418659fe51b72d4c [file] [log] [blame]
Sapan Bhatia037c9472016-01-14 11:44:43 -05001import os
2import base64
3from datetime import datetime
4from xos.config import Config
5from util.logger import Logger, logging
6from observer.steps import *
7from django.db.models import F, Q
8from core.models import *
9import json
10import time
11import pdb
12import traceback
13
14logger = Logger(level=logging.INFO)
15
16def f7(seq):
17 seen = set()
18 seen_add = seen.add
19 return [ x for x in seq if not (x in seen or seen_add(x))]
20
21def elim_dups(backend_str):
22 strs = backend_str.split(' // ')
23 strs2 = f7(strs)
24 return ' // '.join(strs2)
25
26def deepgetattr(obj, attr):
27 return reduce(getattr, attr.split('.'), obj)
28
29
30class InnocuousException(Exception):
31 pass
32
33class FailedDependency(Exception):
34 pass
35
36class SyncStep(object):
37 """ An XOS Sync step.
38
39 Attributes:
40 psmodel Model name the step synchronizes
41 dependencies list of names of models that must be synchronized first if the current model depends on them
42 """
43 slow=False
44 def get_prop(self, prop):
45 try:
46 sync_config_dir = Config().sync_config_dir
47 except:
48 sync_config_dir = '/etc/xos/sync'
49 prop_config_path = '/'.join(sync_config_dir,self.name,prop)
50 return open(prop_config_path).read().rstrip()
51
52 def __init__(self, **args):
53 """Initialize a sync step
54 Keyword arguments:
55 name -- Name of the step
56 provides -- XOS models sync'd by this step
57 """
58 dependencies = []
59 self.driver = args.get('driver')
60 self.error_map = args.get('error_map')
61
62 try:
63 self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
64 except:
65 self.soft_deadline = 5 # 5 seconds
66
67 return
68
69 def fetch_pending(self, deletion=False):
70 # This is the most common implementation of fetch_pending
71 # Steps should override it if they have their own logic
72 # for figuring out what objects are outstanding.
73 main_obj = self.observes
74 if (not deletion):
75 objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False))
76 else:
77 objs = main_obj.deleted_objects.all()
78
79 return objs
80 #return Sliver.objects.filter(ip=None)
81
82 def check_dependencies(self, obj, failed):
83 for dep in self.dependencies:
84 peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
85
86 try:
87 peer_object = deepgetattr(obj, peer_name)
88 try:
89 peer_objects = peer_object.all()
90 except AttributeError:
91 peer_objects = [peer_object]
92 except:
93 peer_objects = []
94
95 if (hasattr(obj,'controller')):
96 try:
97 peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
98 except AttributeError:
99 pass
100
101 if (failed in peer_objects):
102 if (obj.backend_status!=failed.backend_status):
103 obj.backend_status = failed.backend_status
104 obj.save(update_fields=['backend_status'])
105 raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj,"pk","no_pk")), peer_object.__class__.__name__, str(getattr(peer_object,"pk","no_pk")), failed.__class__.__name__, str(getattr(failed,"pk","no_pk"))))
106
107 def call(self, failed=[], deletion=False):
108 pending = self.fetch_pending(deletion)
109 for o in pending:
110 sync_failed = False
111 try:
112 backoff_disabled = Config().observer_backoff_disabled
113 except:
114 backoff_disabled = 0
115
116 try:
117 scratchpad = json.loads(o.backend_register)
118 if (scratchpad):
119 next_run = scratchpad['next_run']
120 if (not backoff_disabled and next_run>time.time()):
121 sync_failed = True
122 except:
123 logger.log_exc("Exception while loading scratchpad")
124 pass
125
126 if (not sync_failed):
127 try:
128 for f in failed:
129 self.check_dependencies(o,f) # Raises exception if failed
130 if (deletion):
131 self.delete_record(o)
132 o.delete(purge=True)
133 else:
134 self.sync_record(o)
135 o.enacted = datetime.now() # Is this the same timezone? XXX
136 scratchpad = {'next_run':0, 'exponent':0}
137 o.backend_register = json.dumps(scratchpad)
138 o.backend_status = "1 - OK"
139 o.save(update_fields=['enacted','backend_status','backend_register'])
140 except (InnocuousException,Exception) as e:
141 logger.log_exc("sync step failed!")
142 try:
143 if (o.backend_status.startswith('2 - ')):
144 str_e = '%s // %r'%(o.backend_status[4:],e)
145 str_e = elim_dups(str_e)
146 else:
147 str_e = '%r'%e
148 except:
149 str_e = '%r'%e
150
151 try:
152 error = self.error_map.map(str_e)
153 except:
154 error = '2 - %s'%str_e
155
156 if isinstance(e, InnocuousException) and not force_error:
157 o.backend_status = '1 - %s'%error
158 else:
159 o.backend_status = '3 - %s'%error
160
161 try:
162 scratchpad = json.loads(o.backend_register)
163 scratchpad['exponent']
164 except:
165 logger.log_exc("Exception while updating scratchpad")
166 scratchpad = {'next_run':0, 'exponent':0}
167
168 # Second failure
169 if (scratchpad['exponent']):
170 delay = scratchpad['exponent'] * 600 # 10 minutes
171 if (delay<1440):
172 delay = 1440
173 scratchpad['next_run'] = time.time() + delay
174
175 scratchpad['exponent']+=1
176
177 o.backend_register = json.dumps(scratchpad)
178
179 # TOFIX:
180 # DatabaseError: value too long for type character varying(140)
181 if (o.pk):
182 try:
183 o.backend_status = o.backend_status[:1024]
184 o.save(update_fields=['backend_status','backend_register','updated'])
185 except:
186 print "Could not update backend status field!"
187 pass
188 sync_failed = True
189
190
191 if (sync_failed):
192 failed.append(o)
193
194 return failed
195
196 def sync_record(self, o):
197 return
198
199 def delete_record(self, o):
200 return
201
202 def __call__(self, **args):
203 return self.call(**args)