blob: bdab8f32f2fba29f13be8627c0eb205f19b9b6a1 [file] [log] [blame]
Sapan Bhatiafe16ae42016-01-14 11:44:43 -05001import os
2import base64
3from datetime import datetime
4from xos.config import Config
Scott Bakerf154cc22016-01-14 16:07:32 -08005from xos.logger import Logger, logging
Sapan Bhatia003e84c2016-01-15 11:05:52 -05006from synchronizers.base.steps import *
Sapan Bhatiafe16ae42016-01-14 11:44:43 -05007from django.db.models import F, Q
8from core.models import *
9from django.db import reset_queries
Sapan Bhatia003e84c2016-01-15 11:05:52 -050010from synchronizers.base.ansible import *
Sapan Bhatiaed1883d2016-01-14 14:16:33 -050011from generate.dependency_walker import *
Sapan Bhatiafe16ae42016-01-14 11:44:43 -050012
13from time import time
14import json
15import time
16import pdb
17
18logger = Logger(level=logging.INFO)
19
def f7(seq):
    """Return the items of seq as a list, keeping only the first occurrence of each.

    The relative order of first occurrences is preserved. Items must be
    hashable because a set is used to remember what has been seen.
    """
    unique = []
    already_seen = set()
    for item in seq:
        if item in already_seen:
            continue
        already_seen.add(item)
        unique.append(item)
    return unique
24
def elim_dups(backend_str):
    """Drop repeated ' // '-separated segments from backend_str.

    The string is split on ' // ', duplicates are removed with f7 (first
    occurrence wins, order kept), and the survivors are re-joined with the
    same separator.
    """
    separator = ' // '
    segments = backend_str.split(separator)
    return separator.join(f7(segments))
29
def deepgetattr(obj, attr):
    """Resolve a dotted attribute path such as 'a.b.c' starting from obj.

    Each component is looked up with getattr on the result of the previous
    one, so a missing component raises AttributeError.
    """
    target = obj
    for component in attr.split('.'):
        target = getattr(target, component)
    return target
32
33
class InnocuousException(Exception):
    """Failure that SyncStep.call records with a '1 - ' backend status prefix.

    Raised by SyncStep.sync_record / delete_record when the controller's
    backend register is marked disabled.
    """
36
class DeferredException(Exception):
    """Failure for which SyncStep.call backs off in 1-minute rather than 10-minute steps."""
39
class FailedDependency(Exception):
    """Raised by SyncStep.check_dependencies when a peer object has already failed to sync."""
42
class SyncStep(object):
    """ An XOS Sync step.

    Attributes set by __init__:
        driver          value of the 'driver' keyword argument (None if absent)
        error_map       value of the 'error_map' keyword argument; call() uses its
                        map() method, when available, to translate error strings
        soft_deadline   integer read from the 'soft_deadline_seconds' property
                        file, or 5 when that cannot be read

    Attributes read but not assigned by this class (presumably supplied by
    subclasses -- confirm against the concrete steps):
        psmodel         Model name the step synchronizes
        observes        model class, or list of model classes, handled by the step
        playbook        template handed to run_template
        name            step name; a path component for get_prop
        dependencies    list of names of models that must be synchronized first
                        if the current model depends on them
    """

    # map_sync_inputs can return this value to cause a step to be marked
    # successful without running ansible. Used for sync_network_controllers
    # on nat networks.
    SYNC_WITHOUT_RUNNING = "sync_without_running"

    # Not read anywhere inside this class.
    slow = False

    def get_prop(self, prop):
        """Return the stripped contents of <sync_config_dir>/<self.name>/<prop>.

        sync_config_dir comes from Config(), falling back to '/etc/xos/sync'
        when the configuration cannot be consulted. Raises whatever
        open() raises if the file is missing, and AttributeError if the step
        has no 'name'.
        """
        try:
            sync_config_dir = Config().sync_config_dir
        except Exception:
            sync_config_dir = '/etc/xos/sync'
        # str.join takes a single iterable; build the path from its parts.
        prop_config_path = os.path.join(sync_config_dir, self.name, prop)
        with open(prop_config_path) as prop_file:
            return prop_file.read().rstrip()

    def __init__(self, **args):
        """Initialize a sync step

        Keyword arguments:
        driver -- stored as self.driver
        error_map -- stored as self.error_map
        """
        self.driver = args.get('driver')
        self.error_map = args.get('error_map')

        try:
            self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
        except Exception:
            # Missing file, missing step name or unparsable content.
            self.soft_deadline = 5  # 5 seconds

    def fetch_pending(self, deletion=False):
        """Return the list of objects this step still has to process.

        This is the most common implementation of fetch_pending. Steps should
        override it if they have their own logic for figuring out what
        objects are outstanding.

        deletion -- when False, select objects never enacted or updated since
                    they were last enacted (excluding lazy_blocked and no_sync
                    ones); when True, return every entry of deleted_objects.
        """
        main_objs = self.observes
        if not isinstance(main_objs, list):
            main_objs = [main_objs]

        objs = []
        for main_obj in main_objs:
            if not deletion:
                lobjs = main_obj.objects.filter(
                    Q(enacted__lt=F('updated')) | Q(enacted=None),
                    Q(lazy_blocked=False),
                    Q(no_sync=False))
            else:
                lobjs = main_obj.deleted_objects.all()
            objs.extend(lobjs)

        return objs

    def check_dependencies(self, obj, failed):
        """Raise FailedDependency if 'failed' is one of obj's dependency peers.

        For every model name in self.dependencies, the attribute of obj named
        after the model (first letter lowered) and its plural form are
        collected. Values exposing all() are expanded, anything else is taken
        as a single peer. When obj has a 'controller', only peers with the
        same controller are considered. On a match, failed.backend_status is
        copied onto obj (and saved) before raising.
        """
        for dep in self.dependencies:
            # django names are camelCased with the first letter lower
            peer_name = dep[0].lower() + dep[1:]

            peer_objects = []
            try:
                peer_names = plural(peer_name)
                peer_object_list = []

                for relation in (peer_name, peer_names):
                    try:
                        peer_object_list.append(deepgetattr(obj, relation))
                    except Exception:
                        # obj simply has no such relation
                        pass

                for peer_object in peer_object_list:
                    try:
                        peer_objects.extend(peer_object.all())
                    except AttributeError:
                        peer_objects.append(peer_object)
            except Exception:
                peer_objects = []

            if hasattr(obj, 'controller'):
                try:
                    peer_objects = [p for p in peer_objects
                                    if p.controller == obj.controller]
                except AttributeError:
                    # some peer has no controller: leave the list unfiltered
                    pass

            if failed in peer_objects:
                if obj.backend_status != failed.backend_status:
                    obj.backend_status = failed.backend_status
                    obj.save(update_fields=['backend_status'])
                # peer_object is the last relation value inspected above.
                raise FailedDependency(
                    "Failed dependency for %s:%s peer %s:%s failed %s:%s" % (
                        obj.__class__.__name__, str(getattr(obj, "pk", "no_pk")),
                        peer_object.__class__.__name__, str(getattr(peer_object, "pk", "no_pk")),
                        failed.__class__.__name__, str(getattr(failed, "pk", "no_pk"))))

    def _raise_if_controller_disabled(self, o):
        """Raise InnocuousException when o's controller register says 'disabled'.

        Objects lacking get_controller() or any link of the
        node.site_deployment.controller chain are silently accepted.
        """
        try:
            # Called only so that objects without a controller skip the check.
            o.get_controller()
            controller = o.node.site_deployment.controller
            controller_register = json.loads(controller.backend_register)

            if controller_register.get('disabled', False):
                raise InnocuousException('Controller %s is disabled' % controller.name)
        except AttributeError:
            pass

    def _playbook_path(self):
        """Return the lower-cased name of the (first) observed model class."""
        main_obj = self.observes
        if isinstance(main_obj, list):
            main_obj = main_obj[0]
        return main_obj.__name__.lower()

    def sync_record(self, o):
        """Synchronize o by running the step's playbook.

        Inputs come from self.map_sync_inputs(o); if that returns
        SYNC_WITHOUT_RUNNING the method returns without running anything.
        The run_template result is passed to self.map_sync_outputs(o, res);
        an AttributeError from that call is ignored.
        Raises InnocuousException if the controller is disabled.
        """
        self._raise_if_controller_disabled(o)

        tenant_fields = self.map_sync_inputs(o)
        if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
            return

        path = self._playbook_path()
        res = run_template(self.playbook, tenant_fields, path=path)

        try:
            self.map_sync_outputs(o, res)
        except AttributeError:
            pass

    def delete_record(self, o):
        """Run the step's playbook in delete mode for o.

        Inputs come from self.map_delete_inputs(o) with 'delete' set to True.
        The run_template result is passed to self.map_delete_outputs(o, res);
        an AttributeError from that call is ignored.
        Raises InnocuousException if the controller is disabled.
        """
        self._raise_if_controller_disabled(o)

        tenant_fields = self.map_delete_inputs(o)
        path = self._playbook_path()

        tenant_fields['delete'] = True
        res = run_template(self.playbook, tenant_fields, path=path)
        try:
            self.map_delete_outputs(o, res)
        except AttributeError:
            pass

    def _backoff_pending(self, o):
        """Return True when o's backend_register schedules its next run in the future.

        Always False when Config().observer_backoff_disabled is truthy or the
        register cannot be parsed (the parse problem is logged).
        """
        try:
            backoff_disabled = Config().observer_backoff_disabled
        except Exception:
            backoff_disabled = 0

        try:
            scratchpad = json.loads(o.backend_register)
            if scratchpad:
                next_run = scratchpad['next_run']
                if not backoff_disabled and next_run > time.time():
                    return True
        except Exception:
            logger.log_exc("Exception while loading scratchpad")
        return False

    def _record_failure(self, o, e):
        """Store exception e in o's backend_status and advance its backoff register."""
        try:
            if o.backend_status.startswith('2 - '):
                # Append to the previous error text, dropping repeats.
                str_e = elim_dups('%s // %r' % (o.backend_status[4:], e))
            else:
                str_e = '%r' % (e,)
        except Exception:
            str_e = '%r' % (e,)

        try:
            error = self.error_map.map(str_e)
        except Exception:
            error = '%s' % str_e

        if isinstance(e, InnocuousException):
            o.backend_status = '1 - %s' % error
        else:
            o.backend_status = '2 - %s' % error

        try:
            scratchpad = json.loads(o.backend_register)
            scratchpad['exponent']  # probe: unusable registers are reset below
        except Exception:
            logger.log_exc("Exception while updating scratchpad")
            scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time(), 'failures': 0}

        # Second failure
        if scratchpad['exponent']:
            if isinstance(e, DeferredException):
                delay = scratchpad['exponent'] * 60  # 1 minute
            else:
                delay = scratchpad['exponent'] * 600  # 10 minutes
            # cap delays at 8 hours
            if delay > 8 * 60 * 60:
                delay = 8 * 60 * 60
            scratchpad['next_run'] = time.time() + delay

        try:
            scratchpad['exponent'] += 1
        except Exception:
            scratchpad['exponent'] = 1

        try:
            scratchpad['failures'] += 1
        except KeyError:
            scratchpad['failures'] = 1

        scratchpad['last_failure'] = time.time()

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if o.pk:
            try:
                o.backend_status = o.backend_status[:1024]
                o.save(update_fields=['backend_status', 'backend_register', 'updated'])
            except Exception:
                logger.log_exc("Could not update backend status field!")

    def call(self, failed=None, deletion=False):
        """Process every pending object and return the list of failed ones.

        failed -- objects that already failed in earlier steps; each pending
                  object is checked against them with check_dependencies.
                  The list is extended in place and returned. A fresh list is
                  used when omitted.
        deletion -- when True objects are deleted (delete_record followed by
                  o.delete(purge=True)) instead of synchronized.

        Objects still inside their backoff window are not attempted and are
        added to the failed list as well.
        """
        if failed is None:
            failed = []

        pending = self.fetch_pending(deletion)

        for o in pending:
            # another spot to clean up debug state
            try:
                reset_queries()
            except Exception:
                # this shouldn't happen, but in case it does, catch it...
                logger.log_exc("exception in reset_queries")

            sync_failed = self._backoff_pending(o)

            if not sync_failed:
                try:
                    for f in failed:
                        self.check_dependencies(o, f)  # Raises exception if failed
                    if deletion:
                        self.delete_record(o)
                        o.delete(purge=True)
                    else:
                        self.sync_record(o)
                        o.enacted = datetime.now()  # Is this the same timezone? XXX
                        scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time()}
                        o.backend_register = json.dumps(scratchpad)
                        o.backend_status = "1 - OK"
                        o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
                except Exception as e:
                    logger.log_exc("sync step failed!")
                    self._record_failure(o, e)
                    sync_failed = True

            if sync_failed:
                failed.append(o)

        return failed

    def __call__(self, **args):
        """Invoke call() with the given keyword arguments."""
        return self.call(**args)