]>
jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/dns.py
1 from twisted
.names
import client
, dns
2 from twisted
.internet
import reactor
, defer
4 class LookupException(Exception): pass
5 class VerificationException(Exception): pass
6 TimeoutException
= defer
.TimeoutError
8 def lookupPTR(ip
, *args
, **kwargs
):
10 answer
, auth
, add
= result
11 answer
= [x
for x
in answer
if x
.type == dns
.PTR
]
13 raise LookupException
, "No ANSWERS in PTR response for %s." % repr(ip
)
14 return str(answer
[0].payload
.name
)
16 ptr
= ".".join(ip
.split(".")[::-1]) + ".in-addr.arpa."
17 return client
.lookupPointer(ptr
, **kwargs
).addCallback(callback
)
20 expand_start
, expand_end
= ["".join(["{:0>4}".format(group
)
21 for group
in section
.split(":")])
22 for section
in ip
.split("::")]
23 return expand_start
+ (32-len(expand_start
)-len(expand_end
))*"0" + expand_end
25 def lookupPTRv6(ip
, *args
, **kwargs
):
27 answer
, auth
, add
= result
28 answer
= [x
for x
in answer
if x
.type == dns
.PTR
]
30 raise LookupException
, "No ANSWERS in PTR response for %s." % repr(ip
)
31 return str(answer
[0].payload
.name
)
33 ptr
= ".".join(reversed(expandIPv6(ip
))) + ".ip6.arpa."
34 return client
.lookupPointer(ptr
, **kwargs
).addCallback(callback
)
36 def lookupAs(hostname
, *args
, **kwargs
):
38 answer
, auth
, add
= result
39 answer
= [x
for x
in answer
if x
.type == dns
.A
]
41 raise LookupException
, "No ANSWERS in A response for %s." % repr(hostname
)
42 return [x
.payload
.dottedQuad() for x
in answer
]
44 return client
.lookupAddress(hostname
, *args
, **kwargs
).addCallback(callback
)
46 def lookupAAAAs(hostname
, *args
, **kwargs
):
48 answer
, auth
, add
= result
49 answer
= [x
for x
in answer
if x
.type == dns
.AAAA
]
51 raise LookupException
, "No ANSWERS in AAAA response for %s." % repr(hostname
)
52 return [expandIPv6(x
.payload
._address
) for x
in answer
]
54 return client
.lookupIPV6Address(hostname
, *args
, **kwargs
).addCallback(callback
)
56 def lookupAndVerifyPTR(ip
, *args
, **kwargs
):
59 def gotPTRResult(ptr
):
60 def gotAResult(a_records
):
64 raise VerificationException("IP mismatch: %s != %s%s" % (repr(ip
), repr(ptr
), repr(a_records
)))
65 lookupAs(ptr
, *args
, **kwargs
).addCallback(gotAResult
).addErrback(d
.errback
)
67 def gotPTRv6Result(ptr
):
68 def gotAAAAResult(aaaa_records
):
69 if expandIPv6(ip
) in aaaa_records
:
72 raise VerificationException("IPv6 mismatch: %s != %s%s" % (repr(ip
), repr(ptr
), repr(aaaa_records
)))
73 lookupAAAAs(ptr
, *args
, **kwargs
).addCallback(gotAAAAResult
).addErrback(d
.errback
)
76 lookupPTRv6(ip
, *args
, **kwargs
).addCallback(gotPTRv6Result
).addErrback(d
.errback
)
78 lookupPTR(ip
, *args
, **kwargs
).addCallback(gotPTRResult
).addErrback(d
.errback
)
81 if __name__
== "__main__":
92 d
= lookupAndVerifyPTR(sys
.argv
[1], timeout
=[3])
93 d
.addCallbacks(callback
, errback
)