blob: 16f1ef1678afbf1dda637a7b83017091021af3b2 [file] [log] [blame]
import os
import sys
import time

# Make the XOS project importable and point Django at its settings before
# anything Django-related is loaded.
sys.path.append("/opt/xos")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")

# Standalone scripts must call django.setup() BEFORE importing any models;
# importing model modules first can raise AppRegistryNotReady on newer
# Django releases.
import django
django.setup()

from django.contrib.contenttypes.models import ContentType
from core.models import *
from hpc.models import *
from requestrouter.models import *

from dnslib.dns import DNSRecord, DNSHeader, DNSQuestion, QTYPE
from dnslib.digparser import DigParser

from threading import Thread, Condition
17
class WorkQueue:
    """Job/result queues shared between a dispatching thread and workers.

    The dispatching thread submits jobs with submit_job() and collects
    results with get_result(). Worker threads block in get_job() and hand
    finished work back with submit_result(). ``outstanding`` counts jobs
    submitted whose results have not yet been collected.
    """

    def __init__(self):
        self.job_cv = Condition()
        self.jobs = []
        self.result_cv = Condition()
        self.results = []
        # NOTE: incremented in submit_job() and decremented in get_result()
        # without holding a lock. This is only safe while a single
        # dispatching thread calls both methods.
        self.outstanding = 0

    def get_job(self):
        """Block until a job is queued, then remove and return it.

        Jobs are popped from the end of the list (most recent first).
        """
        # ``with`` releases the lock even if wait() is interrupted
        # (e.g. by KeyboardInterrupt), unlike a bare acquire()/release().
        with self.job_cv:
            while not self.jobs:
                self.job_cv.wait()
            return self.jobs.pop()

    def submit_job(self, job):
        """Queue ``job``, wake one waiting worker and count it as outstanding."""
        with self.job_cv:
            self.jobs.append(job)
            self.job_cv.notify()
        self.outstanding += 1

    def get_result(self):
        """Block until a result is available, then remove and return it.

        Decrements ``outstanding`` by one for the collected result.
        """
        with self.result_cv:
            while not self.results:
                self.result_cv.wait()
            result = self.results.pop()
        self.outstanding -= 1
        return result

    def submit_result(self, result):
        """Queue a finished ``result`` and wake one waiting collector."""
        with self.result_cv:
            self.results.append(result)
            self.result_cv.notify()
55
class DnsResolver(Thread):
    """Daemon worker thread that runs DNS A-record probes.

    The thread starts itself on construction. It then loops forever: it
    pulls job dicts from ``queue`` (a WorkQueue-like object with get_job()
    and submit_result()), runs handle_job() on each, and hands the same
    dict back through ``queue.submit_result()``.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        """Process jobs forever, always returning each job to the queue.

        A job must be handed back even if handle_job() raises unexpectedly
        (e.g. a KeyError on a malformed job dict). Otherwise the dispatcher
        waiting in get_result() would block forever.
        """
        while True:
            job = self.queue.get_job()
            try:
                self.handle_job(job)
            except Exception as e:
                job["status"] = "%s,Exception: %s" % (job.get("domain"), str(e))
            finally:
                self.queue.submit_result(job)

    def handle_job(self, job):
        """Ask ``job["server"]:job["port"]`` (UDP) for the A record of ``job["domain"]``.

        Sets ``job["status"]`` to:
          * "success" if the response's answer section holds an A record;
          * "<domain>,No A records" if it holds none;
          * "<domain>,Exception: <error>" if the query or parse fails.
        Raises KeyError if the job lacks "domain", "server" or "port".
        """
        domain = job["domain"]
        server = job["server"]
        port = job["port"]

        try:
            query = DNSRecord(q=DNSQuestion(domain, QTYPE.A))
            reply_pkt = query.send(server, port, tcp=False, timeout=10)
            reply = DNSRecord.parse(reply_pkt)
            # Answers to the question live in the answer section (``rr``).
            # The additional section (``ar``) only carries extra records
            # such as glue.
            a_records = [r for r in reply.rr if r.rtype == QTYPE.A]
        except Exception as e:
            job["status"] = "%s,Exception: %s" % (domain, str(e))
            return

        for record in a_records:
            print(record)

        if not a_records:
            job["status"] = "%s,No A records" % domain
            return

        job["status"] = "success"
95
class HpcWatcher:
    """Probe request-router slivers over DNS and record the outcome as Tags."""

    # Domains resolved against every request-router sliver's public IP.
    DNS_TEST_DOMAINS = ["onlab1.vicci.org"]

    def __init__(self, num_resolvers=10):
        """Create the shared work queue and start ``num_resolvers`` resolver threads."""
        self.resolver_queue = WorkQueue()
        for _ in range(num_resolvers):
            DnsResolver(queue=self.resolver_queue)

    def _upsert_tag(self, service, sliver, sliver_type, name, value, always_save):
        """Set Tag ``name`` on ``sliver`` to ``value``, creating it if missing.

        An existing tag is saved when its value differs or ``always_save``
        is true.
        """
        existing = Tag.objects.filter(service=service, name=name,
                                      content_type__pk=sliver_type.id,
                                      object_id=sliver.id)
        if existing:
            tag = existing[0]
            if always_save or tag.value != value:
                tag.value = value
                tag.save()
        else:
            Tag(service=service, name=name, content_object=sliver, value=value).save()

    def set_status(self, sliver, service, kind, msg):
        """Record ``msg`` in the ``<kind>.msg`` tag and timestamp ``<kind>.time``.

        Also sets ``sliver.has_error`` (in memory; the sliver is not saved
        here) to True unless ``msg`` is "success".
        """
        print("%s %s %s" % (sliver.node.name, kind, msg))
        sliver.has_error = (msg != "success")

        sliver_type = ContentType.objects.get_for_model(sliver)
        # The message tag is only rewritten when it changes; the time tag
        # is refreshed on every call.
        self._upsert_tag(service, sliver, sliver_type, kind + ".msg", msg,
                         always_save=False)
        self._upsert_tag(service, sliver, sliver_type, kind + ".time",
                         str(time.time()), always_save=True)

    def check_request_routers(self, service, slivers):
        """DNS-probe each sliver's public IP and tag each sliver with a status.

        A sliver without a public IP is tagged "no public IP". Otherwise one
        probe per entry of DNS_TEST_DOMAINS is queued to the resolver threads.
        The first failing probe collected for a sliver sets its status.
        Slivers with no failure are tagged "success". ``slivers`` is
        iterated twice, so it must be re-iterable.
        """
        for sliver in slivers:
            sliver.has_error = False

            # NOTE(review): the sliver is passed to its own method here;
            # confirm the expected get_public_ip() signature.
            ip = sliver.get_public_ip(sliver)
            if not ip:
                self.set_status(sliver, service, "watcher.DNS", "no public IP")
                continue

            for domain in self.DNS_TEST_DOMAINS:
                self.resolver_queue.submit_job({"domain": domain, "server": ip,
                                                "port": 53, "sliver": sliver})

        print(self.resolver_queue.outstanding)

        # Drain every submitted probe; only the first failure per sliver is
        # recorded because set_status() then marks it has_error.
        while self.resolver_queue.outstanding > 0:
            result = self.resolver_queue.get_result()
            sliver = result["sliver"]
            if result["status"] != "success" and not sliver.has_error:
                self.set_status(sliver, service, "watcher.DNS", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.DNS", "success")

    def get_service_slices(self, service, kind):
        """Return the slices of ``service`` whose name contains ``kind``."""
        try:
            slices = service.slices.all()
        except Exception:
            # Buggy data model: some services expose their slices via a
            # ``service`` relation instead of ``slices``.
            slices = service.service.all()

        return [x for x in slices if kind in x.name]

    def run_once(self):
        """Check the "dnsdemux" slices of every HpcService, then every RequestRouterService."""
        for service_model in (HpcService, RequestRouterService):
            for service in service_model.objects.all():
                for svc_slice in self.get_service_slices(service, "dnsdemux"):
                    self.check_request_routers(service, svc_slice.slivers.all())
168
169
if __name__ == "__main__":
    # Script entry point: build the watcher (which starts its resolver
    # threads) and perform a single pass over all request routers.
    watcher = HpcWatcher()
    watcher.run_once()
172