"""
    hpc_watcher.py

    Daemon to watch the health of HPC and RR slivers.

    This daemon uses HpcHealthCheck objects in the Data Model to conduct
    periodic tests of HPC and RR nodes. Two types of Health Checks are
    supported:

       kind="dns": checks the request routers to make sure that a DNS
           name is resolvable and returns the right kind of records.

           resource_name should be set to the domain name to look up.

           result_contains is optional and can be used to hold "A", "CNAME", or
           a particular address or hostname that should be contained in the
           query's answer.

       kind="http": checks the hpc nodes to make sure that a URL can be
           retrieved from the node.

           resource_name should be set to the HostName:Url to fetch. For
           example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4

    In addition to the above, HPC heartbeat probes are conducted, similar to
    the ones that dnsredir conducts.

    The results of health checks are stored in a tag attached to the Sliver
    the healthcheck was conducted against. If all healthchecks of a particular
    variety were successful for a sliver, then "success" will be stored in
    the tag. Otherwise, the first healthcheck to fail will be stored in the
    tag.
"""

import os
import socket
import sys
sys.path.append("/opt/xos")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
import django
from django.contrib.contenttypes.models import ContentType
from core.models import *
from hpc.models import *
from requestrouter.models import *
django.setup()
import time
import pycurl
import traceback
from StringIO import StringIO

from dnslib.dns import DNSRecord, DNSHeader, DNSQuestion, QTYPE
from dnslib.digparser import DigParser

from threading import Thread, Condition

"""
from dnslib import *
q = DNSRecord(q=DNSQuestion("cdn-stream.htm.fiu.edu"))
a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
a = DNSRecord.parse(a_pkt)

from dnslib import *
q = DNSRecord(q=DNSQuestion("onlab.vicci.org"))
a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
a = DNSRecord.parse(a_pkt)
"""

class WorkQueue:
    """ Simple thread-safe queue used to hand jobs to worker threads and to
        collect their results.
    """
    def __init__(self):
        self.job_cv = Condition()
        self.jobs = []
        self.result_cv = Condition()
        self.results = []
        self.outstanding = 0

    def get_job(self):
        self.job_cv.acquire()
        while not self.jobs:
            self.job_cv.wait()
        result = self.jobs.pop()
        self.job_cv.release()
        return result

    def submit_job(self, job):
        self.job_cv.acquire()
        self.jobs.append(job)
        self.job_cv.notify()
        self.job_cv.release()
        self.outstanding = self.outstanding + 1

    def get_result(self):
        self.result_cv.acquire()
        while not self.results:
            self.result_cv.wait()
        result = self.results.pop()
        self.result_cv.release()
        self.outstanding = self.outstanding - 1
        return result

    def submit_result(self, result):
        self.result_cv.acquire()
        self.results.append(result)
        self.result_cv.notify()
        self.result_cv.release()

class DnsResolver(Thread):
    """ Worker thread that resolves one DNS healthcheck job against a
        request router and stores the outcome in job["status"].
    """
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def handle_job(self, job):
        domain = job["domain"]
        server = job["server"]
        port = job["port"]
        result_contains = job.get("result_contains", None)

        try:
            q = DNSRecord(q=DNSQuestion(domain)) #, getattr(QTYPE,"A")))

            a_pkt = q.send(server, port, tcp=False, timeout=10)
            a = DNSRecord.parse(a_pkt)

            found_record = False
            for record in a.rr:
                if (not result_contains):
                    # with no expectation configured, any A or CNAME answer is a pass
                    if (record.rtype == QTYPE.A) or (record.rtype == QTYPE.CNAME):
                        found_record = True
                else:
                    # otherwise look for result_contains in the record type name and rdata
                    tmp = QTYPE.get(record.rtype) + str(record.rdata)
                    if (result_contains in tmp):
                        found_record = True

            if not found_record:
                if result_contains:
                    job["status"] = "%s,No %s records" % (domain, result_contains)
                else:
                    job["status"] = "%s,No A or CNAME records" % domain

                return

        except Exception, e:
            job["status"] = "%s,Exception: %s" % (domain, str(e))
            return

        job["status"] = "success"

class HpcHeartbeat(Thread):
    """ Worker thread that requests /heartbeat from an HPC node and stores
        the outcome in job["status"].
    """
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        if e.args[0] == 6:
            return "couldn't resolve host"
        if e.args[0] == 7:
            return "failed to connect"
        return "curl error %d" % e.args[0]

    def handle_job(self, job):
        server = job["server"]
        port = job["port"]

        try:
            buffer = StringIO()
            c = pycurl.Curl()

            c.setopt(c.URL, "http://%s:%s/heartbeat" % (server, port))
            c.setopt(c.WRITEDATA, buffer)
            c.setopt(c.HTTPHEADER, ['host: hpc-heartbeat', 'X-heartbeat: 1'])
            c.setopt(c.TIMEOUT, 10)
            c.setopt(c.CONNECTTIMEOUT, 10)
            c.setopt(c.NOSIGNAL, 1)

            try:
                c.perform()
                response_code = c.getinfo(c.RESPONSE_CODE)
            except Exception, e:
                #traceback.print_exc()
                job["status"] = self.curl_error_message(e)
                return
            finally:
                c.close()

            if response_code != 200:
                job["status"] = "error response %d" % response_code
                return

        except Exception, e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"

class HpcFetchUrl(Thread):
    """ Worker thread that fetches a healthcheck URL from an HPC node and
        stores the outcome in job["status"].
    """
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        if e.args[0] == 6:
            return "couldn't resolve host"
        if e.args[0] == 7:
            return "failed to connect"
        return "curl error %d" % e.args[0]

    def handle_job(self, job):
        server = job["server"]
        port = job["port"]
        url = job["url"]
        domain = job["domain"]

        def progress(download_t, download_d, upload_t, upload_d):
            # limit download size to a megabyte
            if (download_d > 1024*1024):
                return 1
            else:
                return 0

        try:
            buffer = StringIO()
            c = pycurl.Curl()

            c.setopt(c.URL, "http://%s:%s/%s" % (server, port, url))
            c.setopt(c.WRITEDATA, buffer)
            c.setopt(c.HTTPHEADER, ['host: ' + domain])
            c.setopt(c.TIMEOUT, 10)
            c.setopt(c.CONNECTTIMEOUT, 10)
            c.setopt(c.NOSIGNAL, 1)
            c.setopt(c.NOPROGRESS, 0)
            c.setopt(c.PROGRESSFUNCTION, progress)

            try:
                try:
                    c.perform()
                except Exception, e:
                    # prevent callback abort from raising exception
                    if (e.args[0] != pycurl.E_ABORTED_BY_CALLBACK):
                        raise
                response_code = c.getinfo(c.RESPONSE_CODE)
                bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
            except Exception, e:
                #traceback.print_exc()
                job["status"] = self.curl_error_message(e)
                return
            finally:
                c.close()

            if response_code != 200:
                job["status"] = "error response %s" % str(response_code)
                return

        except Exception, e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"

class BaseWatcher(Thread):
    """ Common machinery for the watcher threads; records healthcheck results
        as tags on slivers.
    """
    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def set_status(self, sliver, service, kind, msg):
        # store the result in <kind>.msg and update <kind>.time on the sliver
        #print sliver.node.name, kind, msg
        sliver.has_error = (msg!="success")

        sliver_type = ContentType.objects.get_for_model(sliver)

        t = Tag.objects.filter(service=service, name=kind+".msg", content_type__pk=sliver_type.id, object_id=sliver.id)
        if t:
            t=t[0]
            if (t.value != msg):
                t.value = msg
                t.save()
        else:
            Tag(service=service, name=kind+".msg", content_object = sliver, value=msg).save()

        t = Tag.objects.filter(service=service, name=kind+".time", content_type__pk=sliver_type.id, object_id=sliver.id)
        if t:
            t=t[0]
            t.value = str(time.time())
            t.save()
        else:
            Tag(service=service, name=kind+".time", content_object = sliver, value=str(time.time())).save()

    def get_service_slices(self, service, kind):
        try:
            slices = service.slices.all()
        except:
            # buggy data model
            slices = service.service.all()

        return [x for x in slices if (kind in x.name)]

class RRWatcher(BaseWatcher):
    """ Submits the kind="dns" health checks to the dnsdemux slivers. """
    def __init__(self):
        BaseWatcher.__init__(self)

        self.resolver_queue = WorkQueue()
        for i in range(0, 10):
            DnsResolver(queue = self.resolver_queue)

    def check_request_routers(self, service, slivers):
        for sliver in slivers:
            sliver.has_error = False

            ip = sliver.get_public_ip()
            if not ip:
                ip = socket.gethostbyname(sliver.node.name)

            #if not ip:
            #    self.set_status(sliver, service, "watcher.DNS", "no public IP")
            #    continue

            checks = HpcHealthCheck.objects.filter(kind="dns")
            if not checks:
                self.set_status(sliver, service, "watcher.DNS", "no DNS HealthCheck tests configured")

            for check in checks:
                self.resolver_queue.submit_job({"domain": check.resource_name, "server": ip, "port": 53, "sliver": sliver, "result_contains": check.result_contains})

        while self.resolver_queue.outstanding > 0:
            result = self.resolver_queue.get_result()
            sliver = result["sliver"]
            if (result["status"]!="success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.DNS", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.DNS", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "dnsdemux"):
                self.check_request_routers(hpcService, slice.slivers.all())

        for rrService in RequestRouterService.objects.all():
            for slice in self.get_service_slices(rrService, "dnsdemux"):
                self.check_request_routers(rrService, slice.slivers.all())

    def run(self):
        while True:
            self.run_once()
            time.sleep(10)

class HpcProber(BaseWatcher):
    """ Sends heartbeat probes to the hpc slivers (port 8009). """
    def __init__(self):
        BaseWatcher.__init__(self)

        self.heartbeat_queue = WorkQueue()
        for i in range(0, 10):
            HpcHeartbeat(queue = self.heartbeat_queue)

    def probe_hpc(self, service, slivers):
        for sliver in slivers:
            sliver.has_error = False

            self.heartbeat_queue.submit_job({"server": sliver.node.name, "port": 8009, "sliver": sliver})

        while self.heartbeat_queue.outstanding > 0:
            result = self.heartbeat_queue.get_result()
            sliver = result["sliver"]
            if (result["status"]!="success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.HPC-hb", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.HPC-hb", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "hpc"):
                self.probe_hpc(hpcService, slice.slivers.all())

    def run(self):
        while True:
            self.run_once()
            time.sleep(10)

class HpcFetcher(BaseWatcher):
    """ Fetches the kind="http" healthcheck URLs from the hpc slivers. """
    def __init__(self):
        BaseWatcher.__init__(self)

        self.fetch_queue = WorkQueue()
        for i in range(0, 10):
            HpcFetchUrl(queue = self.fetch_queue)

    def fetch_hpc(self, service, slivers):
        for sliver in slivers:
            sliver.has_error = False

            checks = HpcHealthCheck.objects.filter(kind="http")
            if not checks:
                self.set_status(sliver, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")

            for check in checks:
                if (not check.resource_name) or (":" not in check.resource_name):
                    self.set_status(sliver, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
                    break

                (domain, url) = check.resource_name.split(":", 1)

                self.fetch_queue.submit_job({"server": sliver.node.name, "port": 80, "sliver": sliver, "domain": domain, "url": url})

        while self.fetch_queue.outstanding > 0:
            result = self.fetch_queue.get_result()
            sliver = result["sliver"]
            if (result["status"]!="success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.HPC-fetch", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.HPC-fetch", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "hpc"):
                self.fetch_hpc(hpcService, slice.slivers.all())

    def run(self):
        while True:
            self.run_once()
            time.sleep(10)

if __name__ == "__main__":
    if "--once" in sys.argv:
        RRWatcher().run_once()
        HpcProber().run_once()
        HpcFetcher().run_once()
    else:
        RRWatcher().start()
        HpcProber().start()
        HpcFetcher().start()

        print "Running forever..."
        while True:
            time.sleep(60)