blob: 07abb0bb21d45ada42bcedebb99ec93d3716940d [file] [log] [blame]
Sapan Bhatia037c9472016-01-14 11:44:43 -05001import os
2import base64
3from datetime import datetime
4from xos.config import Config
Scott Baker3f417a82016-01-14 16:07:32 -08005from xos.logger import Logger, logging
Sapan Bhatiaf0538b82016-01-15 11:05:52 -05006from synchronizers.base.steps import *
Sapan Bhatia037c9472016-01-14 11:44:43 -05007from django.db.models import F, Q
Sapan Bhatia80b83cb2016-05-06 15:20:13 -04008from django.utils import timezone
Sapan Bhatia037c9472016-01-14 11:44:43 -05009from core.models import *
10from django.db import reset_queries
11import json
12import time
13import pdb
14import traceback
15
# Module-wide logger used by SyncStep to record exceptions raised while syncing.
logger = Logger(level=logging.INFO)
17
def f7(seq):
    """Return the items of ``seq`` as a list with repeats removed.

    The first occurrence of every item is kept and the original order is
    preserved. Items must be hashable.
    """
    already = set()
    unique = []
    for item in seq:
        if item in already:
            continue
        already.add(item)
        unique.append(item)
    return unique
22
def elim_dups(backend_str):
    """Collapse a '/'-separated status string into its distinct components.

    Each component is cut at its first '(' (dropping any parenthesised
    suffix). Repeated components are then removed, keeping first
    occurrences, and the survivors are re-joined with '/'.
    """
    heads = [part.split('(')[0] for part in backend_str.split('/')]
    return '/'.join(f7(heads))
28
def deepgetattr(obj, attr):
    """Resolve a dotted attribute path on ``obj``.

    ``deepgetattr(o, 'a.b.c')`` is equivalent to ``o.a.b.c``. Any
    AttributeError raised while walking the path propagates to the caller.
    """
    # reduce is a builtin only on Python 2; functools.reduce exists on 2.6+ and 3.
    import functools
    return functools.reduce(getattr, attr.split('.'), obj)
31
32
class InnocuousException(Exception):
    """Failure raised by a sync step that should not be reported as an error.

    SyncStep.call records these with a '1 - ' backend_status prefix rather
    than the '2 - ' prefix used for other exceptions.
    """
35
class FailedDependency(Exception):
    """Raised when an object cannot be synced because a peer it depends on failed."""
38
class SyncStep(object):
    """An XOS Sync step.

    Subclasses provide ``observes`` (the model class whose pending objects
    ``fetch_pending`` queries) and ``dependencies``, and override
    ``sync_record`` / ``delete_record`` to do the actual work.

    Attributes:
        psmodel       Model name the step synchronizes
        dependencies  list of names of models that must be synchronized first
                      if the current model depends on them
    """
    slow = False

    def get_prop(self, prop):
        """Return the stripped contents of this step's config file for ``prop``.

        The file is ``<sync_config_dir>/<self.name>/<prop>``. ``sync_config_dir``
        comes from ``Config()`` and falls back to ``/etc/xos/sync`` when it
        cannot be read.

        Raises:
            AttributeError: the step has no ``name``.
            IOError/OSError: the file cannot be opened.
        """
        try:
            sync_config_dir = Config().sync_config_dir
        except Exception:
            sync_config_dir = '/etc/xos/sync'
        prop_config_path = os.path.join(sync_config_dir, self.name, prop)
        with open(prop_config_path) as prop_file:
            return prop_file.read().rstrip()

    def __init__(self, **args):
        """Initialize a sync step.

        Keyword arguments:
            driver    -- stored as ``self.driver``
            error_map -- object whose ``map(str)`` translates raw error text
                         before it is written to ``backend_status``
        Other keyword arguments are accepted and ignored.
        """
        self.driver = args.get('driver')
        self.error_map = args.get('error_map')

        # Optional per-step override. Any problem (no ``name``, missing file,
        # non-integer contents) keeps the default.
        try:
            self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
        except Exception:
            self.soft_deadline = 5  # 5 seconds

    def fetch_pending(self, deletion=False):
        """Return the objects this step still has to process.

        This is the most common implementation. Steps should override it if
        they have their own logic for figuring out what is outstanding.

        Args:
            deletion: when True, return ``self.observes.deleted_objects.all()``.
                Otherwise return objects never enacted or enacted before their
                last update, excluding ones with ``lazy_blocked`` set.
        """
        main_obj = self.observes
        if deletion:
            return main_obj.deleted_objects.all()
        return main_obj.objects.filter(
            Q(enacted__lt=F('updated')) | Q(enacted=None),
            Q(lazy_blocked=False))

    def check_dependencies(self, obj, failed):
        """Raise FailedDependency if ``failed`` is one of ``obj``'s dependency peers.

        For each model name in ``self.dependencies``, the matching attribute
        of ``obj`` (first letter lowercased) is resolved. Related managers are
        expanded with ``.all()``. When ``obj`` has a ``controller``, only peers
        on the same controller are considered. On a match, ``obj`` takes the
        failed peer's backend_status (saved only if it changed) before raising.
        """
        for dep in self.dependencies:
            # django names are camelCased with the first letter lower
            peer_name = dep[0].lower() + dep[1:]

            try:
                peer_object = deepgetattr(obj, peer_name)
                try:
                    peer_objects = peer_object.all()
                except AttributeError:
                    peer_objects = [peer_object]
            except Exception:
                # Relation missing or unresolvable: nothing to depend on.
                peer_object = None
                peer_objects = []

            if hasattr(obj, 'controller'):
                try:
                    # Evaluated eagerly so a peer without ``controller`` is
                    # caught here (a lazy Python 3 ``filter`` would not be).
                    peer_objects = [peer for peer in peer_objects
                                    if peer.controller == obj.controller]
                except AttributeError:
                    pass

            if failed in peer_objects:
                if obj.backend_status != failed.backend_status:
                    obj.backend_status = failed.backend_status
                    obj.save(update_fields=['backend_status'])
                raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj, "pk", "no_pk")), peer_object.__class__.__name__, str(getattr(peer_object, "pk", "no_pk")), failed.__class__.__name__, str(getattr(failed, "pk", "no_pk"))))

    def call(self, failed=None, deletion=False):
        """Sync (or delete) every pending object and record per-object status.

        Args:
            failed: objects that already failed earlier. Objects depending on
                any of them fail here with FailedDependency. Objects that fail
                in this step, or are skipped because of retry backoff, are
                appended to this list. A fresh list is used when omitted.
            deletion: process deleted objects (``delete_record`` then purge)
                instead of pending ones (``sync_record`` then mark enacted).

        Returns:
            The ``failed`` list, extended with this step's failures.
        """
        if failed is None:
            failed = []

        try:
            backoff_disabled = Config().observer_backoff_disabled
        except Exception:
            backoff_disabled = 0

        pending = self.fetch_pending(deletion)
        for o in pending:
            # another spot to clean up debug state
            try:
                reset_queries()
            except Exception:
                # this shouldn't happen, but in case it does, catch it...
                logger.log_exc("exception in reset_queries", extra=o.tologdict())

            sync_failed = self._in_backoff(o, backoff_disabled)

            if not sync_failed:
                try:
                    for f in failed:
                        self.check_dependencies(o, f)  # Raises exception if failed
                    if deletion:
                        self.delete_record(o)
                        o.delete(purge=True)
                    else:
                        self.sync_record(o)
                        o.enacted = timezone.now()  # Is this the same timezone? XXX
                        o.backend_register = json.dumps({'next_run': 0, 'exponent': 0})
                        o.backend_status = "1 - OK"
                        o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
                except Exception as e:
                    logger.log_exc("Syncstep caught exception", extra=o.tologdict())
                    self._record_failure(o, e)
                    sync_failed = True

            if sync_failed:
                failed.append(o)

        return failed

    def _in_backoff(self, o, backoff_disabled):
        """Return True if ``o``'s scratchpad schedules its next retry in the future."""
        try:
            scratchpad = json.loads(o.backend_register)
            if scratchpad:
                next_run = scratchpad['next_run']
                if not backoff_disabled and next_run > time.time():
                    return True
        except Exception:
            logger.log_exc("Exception while loading scratchpad", extra=o.tologdict())
        return False

    def _record_failure(self, o, e):
        """Write exception ``e`` into ``o``'s backend_status and bump its retry backoff."""
        try:
            if o.backend_status.startswith('2 - '):
                # Already in error state: merge the new message with the old one.
                str_e = '%s/%s' % (o.backend_status[4:], str(e))
                str_e = elim_dups(str_e)
            else:
                str_e = str(e)
        except Exception:
            str_e = str(e)

        if not str_e:
            str_e = 'Unknown'

        try:
            error = self.error_map.map(str_e)
        except Exception:
            error = str_e

        if isinstance(e, InnocuousException):
            o.backend_status = '1 - %s' % error
        else:
            o.backend_status = '2 - %s' % error

        self._push_error_log(error)

        try:
            scratchpad = json.loads(o.backend_register)
            scratchpad['exponent']
        except Exception:
            logger.log_exc("Exception while updating scratchpad", extra=o.tologdict())
            scratchpad = {'next_run': 0, 'exponent': 0}

        # Second and later failures: postpone the next attempt by
        # exponent * 600 seconds, but never by less than 1440 seconds.
        if scratchpad['exponent']:
            delay = max(scratchpad['exponent'] * 600, 1440)
            scratchpad['next_run'] = time.time() + delay

        scratchpad['exponent'] += 1

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if o.pk:
            try:
                o.backend_status = o.backend_status[:1024]
                o.save(update_fields=['backend_status', 'backend_register', 'updated'])
            except Exception:
                logger.log_exc("Could not update backend status field!", extra=o.tologdict())

    def _push_error_log(self, error):
        """Best-effort report of ``error`` to the xoslnprof pushlog endpoint via wget.

        wget is run without a shell, so characters in the error text cannot be
        interpreted as shell syntax. The text is URL-quoted into the path.
        NOTE(review): this sends error text to an external host on every
        failure; consider removing it or making it configurable.
        """
        import subprocess
        try:
            from urllib import quote  # Python 2
        except ImportError:
            from urllib.parse import quote  # Python 3
        try:
            url = ('http://xoslnprof.appspot.com/command?action=pushlog&node=1'
                   '&log_path=/%s/%s' % (self.__class__.__name__, quote('%s' % (error,))))
            subprocess.call(['wget', '-O', '/dev/null', '-q', url])
        except Exception:
            # Profiling is optional: never let it break status bookkeeping.
            pass

    def sync_record(self, o):
        """Push ``o`` to the backend. No-op here; subclasses override it."""
        return

    def delete_record(self, o):
        """Remove ``o`` from the backend. No-op here; subclasses override it."""
        return

    def __call__(self, **args):
        """Run the step: forwards keyword arguments (``failed``, ``deletion``) to ``call``."""
        return self.call(**args)
Sapan Bhatia91c497e2016-04-06 19:04:05 +0200222 return self.call(**args)