jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/dns.py
Unbreak IPv6 address expansion where the address contains :: (that's not how list...
[irc/quakenet/qwebirc.git] / qwebirc / dns.py
1 from twisted.names import client, dns
2 from twisted.internet import reactor, defer
3
class LookupException(Exception):
    """Raised when a DNS response holds no record of the requested type."""
class VerificationException(Exception):
    """Raised when a PTR name does not resolve back to the original address."""
# Alias for Twisted's defer.TimeoutError, exposed under this module's
# "...Exception" naming next to LookupException and VerificationException.
TimeoutException = defer.TimeoutError
7
def lookupPTR(ip, *args, **kwargs):
    """Look up the PTR (reverse DNS) name of a dotted-quad IPv4 address.

    The octets of ``ip`` are reversed and suffixed with ``.in-addr.arpa.`` to
    form the query name. Extra positional and keyword arguments are forwarded
    to ``client.lookupPointer``, the same way ``lookupAs`` forwards them to
    its resolver call.

    Returns a Deferred that fires with the name carried by the first PTR
    record of the answer section, converted with ``str``. The Deferred fails
    with LookupException when the answer section contains no PTR record.
    """
    def callback(result):
        # The resolver result unpacks into three record lists; only the
        # first (the answer section) is inspected here.
        answer, _authority, _additional = result
        pointers = [record for record in answer if record.type == dns.PTR]
        if not pointers:
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(pointers[0].payload.name)

    # 192.0.2.1 -> 1.2.0.192.in-addr.arpa.
    ptr = ".".join(ip.split(".")[::-1]) + ".in-addr.arpa."
    return client.lookupPointer(ptr, *args, **kwargs).addCallback(callback)
18
def _embeddedIPv4ToGroups(tail):
    """Rewrite a dotted-quad ``tail`` as two colon-separated hex groups.

    ``"192.0.2.1"`` becomes ``"c000:0201"``. Anything that is not exactly
    four decimal octets in the range 0-255 is returned unchanged.
    """
    parts = tail.split(".")
    if len(parts) != 4 or not all(part.isdigit() for part in parts):
        return tail
    try:
        octets = [int(part) for part in parts]
    except ValueError:
        # isdigit() accepts a few characters int() cannot parse.
        return tail
    if max(octets) > 255:
        return tail
    return "%02x%02x:%02x%02x" % tuple(octets)


def expandIPv6(ip):
    """Expand a textual IPv6 address to 32 lower-case hex digits.

    Every colon-separated group is left-padded with zeros to four digits, a
    single ``::`` is replaced by as many zeros as are needed to reach 32
    digits, and the groups are concatenated without separators. A trailing
    embedded IPv4 address (for example ``::ffff:192.0.2.1``) is converted to
    its two hex groups first, and hex digits are lower-cased so that equal
    addresses always expand to equal strings.

    The input is not validated: a malformed address produces a string that
    is not 32 digits long rather than raising an exception.
    """
    ip = ip.lower()

    # Only the last colon-separated field can be a dotted quad.
    head, sep, tail = ip.rpartition(":")
    if sep and "." in tail:
        ip = head + sep + _embeddedIPv4ToGroups(tail)

    # Splitting on the first "::" gives the groups before and after the
    # compressed run. An empty side (address starts or ends with "::")
    # pads to one zero group, which the "::" run would have covered anyway.
    expand_sections = [
        "".join("{:0>4}".format(group) for group in section.split(":"))
        for section in ip.split("::", 1)
    ]
    if len(expand_sections) == 1:
        return expand_sections[0]

    missing = 32 - sum(len(section) for section in expand_sections)
    return expand_sections[0] + "0" * missing + expand_sections[1]
26
def lookupPTRv6(ip, *args, **kwargs):
    """Look up the PTR (reverse DNS) name of a textual IPv6 address.

    ``ip`` is expanded with ``expandIPv6``; the resulting hex digits are
    reversed, joined with dots and suffixed with ``.ip6.arpa.`` to form the
    query name. Extra positional and keyword arguments are forwarded to
    ``client.lookupPointer``, the same way ``lookupAAAAs`` forwards them to
    its resolver call.

    Returns a Deferred that fires with the name carried by the first PTR
    record of the answer section, converted with ``str``. The Deferred fails
    with LookupException when the answer section contains no PTR record.
    """
    def callback(result):
        # The resolver result unpacks into three record lists; only the
        # first (the answer section) is inspected here.
        answer, _authority, _additional = result
        pointers = [record for record in answer if record.type == dns.PTR]
        if not pointers:
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(pointers[0].payload.name)

    # One label per hex digit, least significant digit first.
    ptr = ".".join(reversed(expandIPv6(ip))) + ".ip6.arpa."
    return client.lookupPointer(ptr, *args, **kwargs).addCallback(callback)
37
def lookupAs(hostname, *args, **kwargs):
    """Look up the A records of ``hostname``.

    Extra positional and keyword arguments are forwarded to
    ``client.lookupAddress``.

    Returns a Deferred that fires with a list holding the ``dottedQuad()``
    value of every A record in the answer section. The Deferred fails with
    LookupException when the answer section contains no A record.
    """
    def callback(result):
        # The resolver result unpacks into three record lists; only the
        # first (the answer section) is inspected here.
        answer, _authority, _additional = result
        addresses = [record for record in answer if record.type == dns.A]
        if not addresses:
            raise LookupException("No ANSWERS in A response for %s." % repr(hostname))
        return [record.payload.dottedQuad() for record in addresses]

    return client.lookupAddress(hostname, *args, **kwargs).addCallback(callback)
47
def lookupAAAAs(hostname, *args, **kwargs):
    """Look up the AAAA records of ``hostname``.

    Extra positional and keyword arguments are forwarded to
    ``client.lookupIPV6Address``.

    Returns a Deferred that fires with a list holding, for every AAAA record
    in the answer section, the record's address passed through
    ``expandIPv6``. The Deferred fails with LookupException when the answer
    section contains no AAAA record.
    """
    def callback(result):
        # The resolver result unpacks into three record lists; only the
        # first (the answer section) is inspected here.
        answer, _authority, _additional = result
        addresses = [record for record in answer if record.type == dns.AAAA]
        if not addresses:
            raise LookupException("No ANSWERS in AAAA response for %s." % repr(hostname))
        # NOTE(review): _address is a private attribute of the record
        # payload; presumably the textual form of the address - confirm
        # against the installed Twisted version.
        return [expandIPv6(record.payload._address) for record in addresses]

    return client.lookupIPV6Address(hostname, *args, **kwargs).addCallback(callback)
57
def lookupAndVerifyPTR(ip, *args, **kwargs):
    """Resolve the PTR name of ``ip`` and confirm it maps back to ``ip``.

    An address containing ":" is handled as IPv6 (lookupPTRv6 followed by
    lookupAAAAs, compared in expandIPv6 form); anything else is handled as
    IPv4 (lookupPTR followed by lookupAs). Extra positional and keyword
    arguments are handed to both lookups.

    Returns a Deferred that fires with the PTR name once the forward lookup
    lists the address. It fails with VerificationException when the forward
    records do not include the address, or with whatever failure either
    lookup produced.
    """
    verified = defer.Deferred()
    is_v6 = ":" in ip

    if is_v6:
        reverse_lookup, forward_lookup = lookupPTRv6, lookupAAAAs
        mismatch = "IPv6 mismatch: %s != %s%s"
    else:
        reverse_lookup, forward_lookup = lookupPTR, lookupAs
        mismatch = "IP mismatch: %s != %s%s"

    def gotPointer(ptr):
        def gotForwardRecords(records):
            # IPv6 records arrive already expanded, so expand ip to match.
            wanted = expandIPv6(ip) if is_v6 else ip
            if wanted not in records:
                raise VerificationException(mismatch % (repr(ip), repr(ptr), repr(records)))
            verified.callback(ptr)

        forward = forward_lookup(ptr, *args, **kwargs)
        forward.addCallback(gotForwardRecords)
        forward.addErrback(verified.errback)

    reverse = reverse_lookup(ip, *args, **kwargs)
    reverse.addCallback(gotPointer)
    reverse.addErrback(verified.errback)
    return verified
82
# Manual check: python dns.py <ip-address> prints the verified PTR name, or
# the traceback of whatever went wrong.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        sys.exit("usage: %s <ip-address>" % sys.argv[0])

    def callback(x):
        # Parenthesised single-argument form prints the same on Python 2
        # and Python 3.
        print(x)
        reactor.stop()

    def errback(x):
        x.printTraceback()
        reactor.stop()

    # NOTE(review): timeout is passed as a one-element list; presumably the
    # resolver's sequence of per-attempt timeouts in seconds - confirm.
    d = lookupAndVerifyPTR(sys.argv[1], timeout=[3])
    d.addCallbacks(callback, errback)

    reactor.run()