| Scott Baker | 4df05ee | 2015-03-16 16:43:51 -0700 | [diff] [blame^] | 1 | import os |
| 2 | import sys |
| 3 | sys.path.append("/opt/xos") |
| 4 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings") |
| 5 | import django |
| 6 | from django.contrib.contenttypes.models import ContentType |
| 7 | from core.models import *
|
| 8 | from hpc.models import *
|
| 9 | from requestrouter.models import *
|
| 10 | django.setup() |
| 11 | import time |
| 12 | |
| 13 | from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE |
| 14 | from dnslib.digparser import DigParser |
| 15 | |
| 16 | from threading import Thread, Condition |
| 17 | |
class WorkQueue:
    """Two condition-guarded lists: one hands jobs to workers, the other
    collects their results.

    ``outstanding`` counts jobs passed to submit_job() whose results have not
    yet been taken with get_result().  It is updated without holding either
    lock, so submit_job() and get_result() must not be called concurrently
    from several threads.
    """

    def __init__(self):
        self.job_cv = Condition()
        self.jobs = []
        self.result_cv = Condition()
        self.results = []
        self.outstanding = 0

    def get_job(self):
        """Block until a job is available, then remove and return it.

        Jobs are popped from the end of the list, i.e. the most recently
        submitted job is returned first.
        """
        # "with" releases the lock even if wait() raises or is interrupted.
        with self.job_cv:
            while not self.jobs:
                self.job_cv.wait()
            return self.jobs.pop()

    def submit_job(self, job):
        """Queue ``job`` for a worker and count it as outstanding."""
        with self.job_cv:
            self.jobs.append(job)
            self.job_cv.notify()
        self.outstanding = self.outstanding + 1

    def get_result(self):
        """Block until a result is available, then remove and return it.

        Decrements ``outstanding`` once the result has been taken.
        """
        with self.result_cv:
            while not self.results:
                self.result_cv.wait()
            result = self.results.pop()
        self.outstanding = self.outstanding - 1
        return result

    def submit_result(self, result):
        """Publish ``result`` and wake one thread blocked in get_result()."""
        with self.result_cv:
            self.results.append(result)
            self.result_cv.notify()
| 55 | |
class DnsResolver(Thread):
    """Daemon worker thread that takes DNS lookup jobs from a queue, runs
    them, and hands each job dict back through the queue's result list.

    A job is a dict with the keys "domain", "server" and "port"; the outcome
    is written into job["status"].
    """

    def __init__(self, queue):
        """Create the worker and start it immediately.

        queue -- object providing get_job() and submit_result().
        """
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        """Process jobs forever (the thread is a daemon)."""
        while True:
            job = self.queue.get_job()
            try:
                self.handle_job(job)
            except Exception as e:
                # e.g. a job dict missing one of the required keys
                job["status"] = "Exception: %s" % str(e)
            finally:
                # Always hand the job back: a consumer that counts outstanding
                # jobs would otherwise wait forever for this result.
                self.queue.submit_result(job)

    def handle_job(self, job):
        """Send an A query for job["domain"] to job["server"]:job["port"]
        over UDP (timeout=10) and record the outcome in job["status"].

        job["status"] becomes "success" when the reply contains at least one
        A record, "<domain>,No A records" when it contains none, and
        "<domain>,Exception: <text>" when the lookup raised.
        """
        domain = job["domain"]
        server = job["server"]
        port = job["port"]

        try:
            query = DNSRecord(q=DNSQuestion(domain, QTYPE.A))

            reply_pkt = query.send(server, port, tcp=False, timeout=10)
            reply = DNSRecord.parse(reply_pkt)

            # Answers to the question live in the answer section (rr).  The
            # additional section (ar) is still scanned so that replies which
            # were accepted before continue to be accepted.
            found_a_record = False
            for record in list(reply.rr) + list(reply.ar):
                if record.rtype == QTYPE.A:
                    found_a_record = True
                    # single-argument print(...) behaves the same as a
                    # Python 2 print statement and as the Python 3 function
                    print(record)

            if not found_a_record:
                job["status"] = "%s,No A records" % domain
                return

        except Exception as e:
            job["status"] = "%s,Exception: %s" % (domain, str(e))
            return

        job["status"] = "success"
| 95 | |
class HpcWatcher:
    """Checks that the "dnsdemux" slivers of HPC and RequestRouter services
    answer DNS queries, and records the outcome as Tag objects on each
    sliver ("<kind>.msg" and "<kind>.time").
    """

    def __init__(self):
        """Create the shared work queue and start ten resolver threads."""
        self.resolver_queue = WorkQueue()
        # Each DnsResolver starts itself as a daemon thread in its
        # constructor, so no reference needs to be kept here.
        for _ in range(0, 10):
            DnsResolver(queue=self.resolver_queue)

    def _save_tag(self, sliver, sliver_type, service, name, value, always_save):
        """Create or update the Tag called ``name`` attached to ``sliver``.

        An existing tag is rewritten when ``always_save`` is true or when its
        stored value differs from ``value``; otherwise it is left untouched.
        """
        matches = Tag.objects.filter(service=service, name=name,
                                     content_type__pk=sliver_type.id,
                                     object_id=sliver.id)
        if matches:
            tag = matches[0]
            if always_save or (tag.value != value):
                tag.value = value
                tag.save()
        else:
            Tag(service=service, name=name, content_object=sliver,
                value=value).save()

    def set_status(self, sliver, service, kind, msg):
        """Record ``msg`` as the current status of ``sliver`` for ``kind``.

        Sets sliver.has_error (true unless msg is "success"), stores the
        message in the "<kind>.msg" tag (saved only when it changed) and the
        current time in the "<kind>.time" tag (always saved).
        """
        # Same output as the Python 2 statement "print name, kind, msg".
        print("%s %s %s" % (sliver.node.name, kind, msg))
        sliver.has_error = (msg != "success")

        sliver_type = ContentType.objects.get_for_model(sliver)

        self._save_tag(sliver, sliver_type, service, kind + ".msg", msg,
                       always_save=False)
        self._save_tag(sliver, sliver_type, service, kind + ".time",
                       str(time.time()), always_save=True)

    def check_request_routers(self, service, slivers):
        """Query every sliver's public IP for the test domain and record a
        "watcher.DNS" status on each sliver.

        service -- the service the status tags are filed under.
        slivers -- iterable of sliver objects to check.
        """
        # Materialise once: the has_error flags set in the first loop must
        # still be on the same objects when they are read in the last loop.
        slivers = list(slivers)

        for sliver in slivers:
            sliver.has_error = False

            # NOTE(review): the sliver is passed to its own method here;
            # confirm get_public_ip's signature really expects that.
            ip = sliver.get_public_ip(sliver)
            if not ip:
                self.set_status(sliver, service, "watcher.DNS", "no public IP")
                continue

            for domain in ["onlab1.vicci.org"]:
                self.resolver_queue.submit_job({"domain": domain, "server": ip, "port": 53, "sliver": sliver})

        print(self.resolver_queue.outstanding)

        # Drain every result; only the first failure per sliver is recorded.
        while self.resolver_queue.outstanding > 0:
            result = self.resolver_queue.get_result()
            sliver = result["sliver"]
            if (result["status"] != "success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.DNS", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.DNS", "success")

    def get_service_slices(self, service, kind):
        """Return the slices of ``service`` whose name contains ``kind``."""
        try:
            slices = service.slices.all()
        except Exception:
            # buggy data model: some services expose their slices under
            # the "service" attribute instead of "slices"
            slices = service.service.all()

        return [x for x in slices if (kind in x.name)]

    def run_once(self):
        """Check the "dnsdemux" slivers of all HPC services, then of all
        RequestRouter services."""
        for service_model in (HpcService, RequestRouterService):
            for service in service_model.objects.all():
                for demux_slice in self.get_service_slices(service, "dnsdemux"):
                    self.check_request_routers(service, demux_slice.slivers.all())
| 168 | |
| 169 | |
if __name__ == "__main__":
    # Script entry point: build the watcher (its constructor starts the
    # resolver threads) and perform a single monitoring pass.
    watcher = HpcWatcher()
    watcher.run_once()
| 172 | |