jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/dns.py
dns: don't assume that all answers are of the requested type (i.e. support CNAME...
[irc/quakenet/qwebirc.git] / qwebirc / dns.py
1 from twisted.names import client, dns
2 from twisted.internet import reactor, defer
3
class LookupException(Exception):
    """Raised by the lookup helpers when a DNS response holds no record of the wanted type."""


class VerificationException(Exception):
    """Raised when the forward lookup of a PTR name does not include the original IP."""


# Alias of Twisted's defer.TimeoutError, exposed next to this module's own
# exception names.
TimeoutException = defer.TimeoutError
7
def lookupPTR(ip, *args, **kwargs):
    """Look up the PTR record for a dotted-quad IPv4 address.

    The octets of *ip* are reversed and suffixed with ``.in-addr.arpa.`` to
    form the query name, which is resolved with ``client.lookupPointer``.
    Extra positional and keyword arguments are forwarded to that call.

    Returns the Deferred produced by the lookup; it fires with the name of
    the first PTR record in the answer section, as a ``str``.

    The Deferred fails with LookupException when the answer section holds no
    PTR record.
    """
    def callback(result):
        answer, _auth, _add = result
        # The answer section is not guaranteed to contain only records of the
        # requested type (e.g. CNAMEs may be present), so keep just the PTRs.
        ptr_records = [x for x in answer if x.type == dns.PTR]
        if not ptr_records:
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(ptr_records[0].payload.name)

    ptr = ".".join(ip.split(".")[::-1]) + ".in-addr.arpa."
    # Forward *args as well as **kwargs, matching lookupAs/lookupAAAAs.
    return client.lookupPointer(ptr, *args, **kwargs).addCallback(callback)
18
def expandIPv6(ip):
    """Expand a textual IPv6 address into 32 hex digits with no separators.

    Every colon-separated group is left-padded with zeros to four digits.
    If *ip* contains a ``::`` it is replaced by however many zero digits are
    needed to bring the result to 32 characters. An address written without
    ``::`` is returned as its padded groups joined together.

    Letter case of the hex digits is preserved.

    Raises ValueError if *ip* contains more than one ``::``.

    NOTE: groups are only zero-padded, not validated; a dotted-quad tail
    (as in ``::ffff:1.2.3.4``) is not converted to hex.
    """
    sections = ["".join("{:0>4}".format(group) for group in section.split(":"))
                for section in ip.split("::")]

    # No "::" present: there is nothing to fill in, and unpacking into two
    # names below would fail.
    if len(sections) == 1:
        return sections[0]

    expand_start, expand_end = sections
    fill = (32 - len(expand_start) - len(expand_end)) * "0"
    return expand_start + fill + expand_end
24
def lookupPTRv6(ip, *args, **kwargs):
    """Look up the PTR record for a textual IPv6 address.

    *ip* is expanded with expandIPv6, its hex digits are reversed and joined
    with dots, and ``.ip6.arpa.`` is appended to form the query name, which
    is resolved with ``client.lookupPointer``. Extra positional and keyword
    arguments are forwarded to that call.

    Returns the Deferred produced by the lookup; it fires with the name of
    the first PTR record in the answer section, as a ``str``.

    The Deferred fails with LookupException when the answer section holds no
    PTR record.
    """
    def callback(result):
        answer, _auth, _add = result
        # The answer section is not guaranteed to contain only records of the
        # requested type (e.g. CNAMEs may be present), so keep just the PTRs.
        ptr_records = [x for x in answer if x.type == dns.PTR]
        if not ptr_records:
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(ptr_records[0].payload.name)

    # One label per hex digit, least significant digit first.
    ptr = ".".join(reversed(expandIPv6(ip))) + ".ip6.arpa."
    # Forward *args as well as **kwargs, matching lookupAs/lookupAAAAs.
    return client.lookupPointer(ptr, *args, **kwargs).addCallback(callback)
35
def lookupAs(hostname, *args, **kwargs):
    """Look up the A records for *hostname*.

    The query is made with ``client.lookupAddress``; extra positional and
    keyword arguments are forwarded to it.

    Returns the Deferred produced by the lookup; it fires with a list holding
    the ``dottedQuad()`` value of every A record in the answer section.

    The Deferred fails with LookupException when the answer section holds no
    A record.
    """
    def callback(result):
        answer, _auth, _add = result
        # The answer section is not guaranteed to contain only records of the
        # requested type (e.g. CNAMEs may be present), so keep just the As.
        a_records = [x for x in answer if x.type == dns.A]
        if not a_records:
            raise LookupException("No ANSWERS in A response for %s." % repr(hostname))
        return [x.payload.dottedQuad() for x in a_records]

    return client.lookupAddress(hostname, *args, **kwargs).addCallback(callback)
45
def lookupAAAAs(hostname, *args, **kwargs):
    """Look up the AAAA records for *hostname*.

    The query is made with ``client.lookupIPV6Address``; extra positional and
    keyword arguments are forwarded to it.

    Returns the Deferred produced by the lookup; it fires with a list holding
    the expandIPv6 form of every AAAA record in the answer section.

    The Deferred fails with LookupException when the answer section holds no
    AAAA record.
    """
    def callback(result):
        answer, _auth, _add = result
        # The answer section is not guaranteed to contain only records of the
        # requested type (e.g. CNAMEs may be present), so keep just the AAAAs.
        aaaa_records = [x for x in answer if x.type == dns.AAAA]
        if not aaaa_records:
            raise LookupException("No ANSWERS in AAAA response for %s." % repr(hostname))
        # NOTE(review): relies on the payload's private `_address` attribute,
        # presumably the textual form of the address - confirm against Twisted.
        return [expandIPv6(x.payload._address) for x in aaaa_records]

    return client.lookupIPV6Address(hostname, *args, **kwargs).addCallback(callback)
55
def lookupAndVerifyPTR(ip, *args, **kwargs):
    """Resolve *ip* to a hostname and check that the hostname resolves back to *ip*.

    An *ip* containing ``:`` is treated as IPv6 (lookupPTRv6 followed by
    lookupAAAAs); anything else is treated as IPv4 (lookupPTR followed by
    lookupAs). Extra positional and keyword arguments are forwarded to every
    lookup.

    Returns a Deferred that fires with the PTR name once the forward lookup
    of that name includes *ip*. It fails with VerificationException when the
    forward lookup does not include *ip*, and with whatever failure either
    lookup produced otherwise.
    """
    d = defer.Deferred()

    def gotPTRResult(ptr):
        def gotAResult(a_records):
            if ip in a_records:
                d.callback(ptr)
            else:
                raise VerificationException("IP mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(a_records)))
        # Any failure, including the mismatch raised above, is handed to d.
        lookupAs(ptr, *args, **kwargs).addCallback(gotAResult).addErrback(d.errback)

    def gotPTRv6Result(ptr):
        def gotAAAAResult(aaaa_records):
            # Hex digits are case-insensitive, so compare in lower case to
            # avoid rejecting an address that differs only in letter case.
            wanted = expandIPv6(ip).lower()
            if any(record.lower() == wanted for record in aaaa_records):
                d.callback(ptr)
            else:
                raise VerificationException("IPv6 mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(aaaa_records)))
        # Any failure, including the mismatch raised above, is handed to d.
        lookupAAAAs(ptr, *args, **kwargs).addCallback(gotAAAAResult).addErrback(d.errback)

    if ":" in ip:
        lookupPTRv6(ip, *args, **kwargs).addCallback(gotPTRv6Result).addErrback(d.errback)
    else:
        lookupPTR(ip, *args, **kwargs).addCallback(gotPTRResult).addErrback(d.errback)
    return d
80
if __name__ == "__main__":
    # Manual test harness: reverse-resolve and verify the IP address given as
    # the first command-line argument, print the outcome, then stop the reactor.
    import sys

    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s <ip>\n" % sys.argv[0])
        sys.exit(1)

    def callback(x):
        # Call form prints a single value identically on Python 2 and 3.
        print(x)
        reactor.stop()

    def errback(x):
        x.printTraceback()
        reactor.stop()

    # NOTE(review): timeout is passed as a one-element list, presumably a
    # sequence of per-attempt timeouts in seconds - confirm against Twisted.
    d = lookupAndVerifyPTR(sys.argv[1], timeout=[3])
    d.addCallbacks(callback, errback)

    reactor.run()