# jfr.im git - irc/quakenet/qwebirc.git/blob - qwebirc/dns.py
from twisted.internet import defer, reactor
from twisted.names import client, dns
class LookupException(Exception):
    """Raised when a DNS response contains no record of the requested type."""
class VerificationException(Exception):
    """Raised when a PTR name does not resolve back to the original address."""
# Alias so callers can refer to Twisted's defer.TimeoutError through this
# module alongside the two exception classes defined above.
TimeoutException = defer.TimeoutError
def lookupPTR(ip, *args, **kwargs):
    """Look up the PTR record for a dotted IPv4 address.

    The query name is built by reversing the dot-separated parts of ``ip``
    and appending ``.in-addr.arpa.``; it is then passed to
    ``client.lookupPointer``.

    Args:
        ip: IPv4 address as a dot-separated string.
        *args: Accepted for signature compatibility; NOT forwarded to
            ``client.lookupPointer`` (only ``**kwargs`` is).
        **kwargs: Forwarded unchanged to ``client.lookupPointer``.

    Returns:
        Whatever ``client.lookupPointer(...).addCallback(...)`` returns; the
        attached callback yields ``str()`` of the first PTR payload's name.

    Raises:
        LookupException: raised inside the callback when the answer section
            holds no PTR records.
    """
    def callback(result):
        # The resolver result unpacks into three sections; only the first
        # (answers) is used here.
        answer, _auth, _add = result
        answer = [x for x in answer if x.type == dns.PTR]
        if not answer:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(answer[0].payload.name)

    ptr = ".".join(ip.split(".")[::-1]) + ".in-addr.arpa."
    return client.lookupPointer(ptr, **kwargs).addCallback(callback)
def expandIPv6(ip):
    """Expand a textual IPv6 address into its hex digits without colons.

    The address is split once on ``::``. Within each side, every
    colon-separated group is left-padded with ``0`` to four characters and
    the groups are concatenated. If ``::`` was present, enough ``0``
    characters are inserted between the two sides to bring the total length
    to 32. An empty side of ``::`` (as in ``::1``) contributes one ``0000``
    group, which is accounted for when the filler length is computed.

    Args:
        ip: IPv6 address string made of colon-separated hex groups,
            optionally containing one ``::`` abbreviation.

    Returns:
        The concatenated, zero-padded groups as a single string. Case of the
        input digits is preserved; no validation is performed.
    """
    expand_sections = [
        "".join("{:0>4}".format(group) for group in section.split(":"))
        for section in ip.split("::", 1)
    ]
    if len(expand_sections) == 1:
        # No "::" abbreviation: the groups already cover the whole address.
        return expand_sections[0]
    # Number of zero digits the "::" stands for.
    missing = 32 - sum(len(x) for x in expand_sections)
    return expand_sections[0] + "0" * missing + expand_sections[1]
def lookupPTRv6(ip, *args, **kwargs):
    """Look up the PTR record for an IPv6 address.

    The query name is built from ``expandIPv6(ip)``: its characters are
    reversed, joined with dots, and suffixed with ``.ip6.arpa.``. The name
    is then passed to ``client.lookupPointer``.

    Args:
        ip: IPv6 address string accepted by ``expandIPv6``.
        *args: Accepted for signature compatibility; NOT forwarded to
            ``client.lookupPointer`` (only ``**kwargs`` is).
        **kwargs: Forwarded unchanged to ``client.lookupPointer``.

    Returns:
        Whatever ``client.lookupPointer(...).addCallback(...)`` returns; the
        attached callback yields ``str()`` of the first PTR payload's name.

    Raises:
        LookupException: raised inside the callback when the answer section
            holds no PTR records.
    """
    def callback(result):
        # Only the answer section of the three-part result is inspected.
        answer, _auth, _add = result
        answer = [x for x in answer if x.type == dns.PTR]
        if not answer:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise LookupException("No ANSWERS in PTR response for %s." % repr(ip))
        return str(answer[0].payload.name)

    # One label per hex digit, least-significant digit first.
    ptr = ".".join(reversed(expandIPv6(ip))) + ".ip6.arpa."
    return client.lookupPointer(ptr, **kwargs).addCallback(callback)
def lookupAs(hostname, *args, **kwargs):
    """Look up the A records for ``hostname``.

    Args:
        hostname: Name to resolve; passed as-is to ``client.lookupAddress``.
        *args: Forwarded unchanged to ``client.lookupAddress``.
        **kwargs: Forwarded unchanged to ``client.lookupAddress``.

    Returns:
        Whatever ``client.lookupAddress(...).addCallback(...)`` returns; the
        attached callback yields a list with ``payload.dottedQuad()`` of
        every A record in the answer section.

    Raises:
        LookupException: raised inside the callback when the answer section
            holds no A records.
    """
    def callback(result):
        # Only the answer section of the three-part result is inspected.
        answer, _auth, _add = result
        answer = [x for x in answer if x.type == dns.A]
        if not answer:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise LookupException("No ANSWERS in A response for %s." % repr(hostname))
        return [x.payload.dottedQuad() for x in answer]

    return client.lookupAddress(hostname, *args, **kwargs).addCallback(callback)
def lookupAAAAs(hostname, *args, **kwargs):
    """Look up the AAAA records for ``hostname``.

    Args:
        hostname: Name to resolve; passed as-is to
            ``client.lookupIPV6Address``.
        *args: Forwarded unchanged to ``client.lookupIPV6Address``.
        **kwargs: Forwarded unchanged to ``client.lookupIPV6Address``.

    Returns:
        Whatever ``client.lookupIPV6Address(...).addCallback(...)`` returns;
        the attached callback yields a list with the ``expandIPv6`` form of
        every AAAA record in the answer section.

    Raises:
        LookupException: raised inside the callback when the answer section
            holds no AAAA records.
    """
    def callback(result):
        # Only the answer section of the three-part result is inspected.
        answer, _auth, _add = result
        answer = [x for x in answer if x.type == dns.AAAA]
        if not answer:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise LookupException("No ANSWERS in AAAA response for %s." % repr(hostname))
        # NOTE(review): reads the payload's private ``_address`` attribute;
        # presumably the textual IPv6 address - confirm against the
        # installed Twisted version.
        return [expandIPv6(x.payload._address) for x in answer]

    return client.lookupIPV6Address(hostname, *args, **kwargs).addCallback(callback)
def lookupAndVerifyPTR(ip, *args, **kwargs):
    """Resolve the PTR name for ``ip`` and verify it maps back to ``ip``.

    A PTR lookup is performed first (``lookupPTRv6`` or ``lookupPTR``). The
    resulting name is then resolved forward (``lookupAAAAs`` or
    ``lookupAs``) and the original address must appear among the returned
    records. For IPv6 the comparison uses the ``expandIPv6`` form on both
    sides.

    Args:
        ip: Address string to verify.
        *args: Passed to every lookup helper called here.
        **kwargs: Passed to every lookup helper called here.

    Returns:
        A ``defer.Deferred``. Failures from any stage, including
        ``VerificationException`` on a mismatch, are routed to its errback.

    NOTE(review): the line creating the Deferred, the success branches
    (``d.callback(ptr)``), the IPv4/IPv6 dispatch and the final ``return d``
    were not present in the reviewed text. They are reconstructed from the
    visible ``d.errback`` references, the surviving ``if ... in
    aaaa_records:`` test and the caller's use of the return value - confirm
    against the upstream file.
    """
    d = defer.Deferred()

    def gotPTRResult(ptr):
        def gotAResult(a_records):
            if ip in a_records:
                d.callback(ptr)
            else:
                # Raising here turns into a failure on the lookupAs chain,
                # which the errback below hands to ``d``.
                raise VerificationException("IP mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(a_records)))
        lookupAs(ptr, *args, **kwargs).addCallback(gotAResult).addErrback(d.errback)

    def gotPTRv6Result(ptr):
        def gotAAAAResult(aaaa_records):
            if expandIPv6(ip) in aaaa_records:
                d.callback(ptr)
            else:
                # Same routing as the IPv4 branch: failure -> d.errback.
                raise VerificationException("IPv6 mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(aaaa_records)))
        lookupAAAAs(ptr, *args, **kwargs).addCallback(gotAAAAResult).addErrback(d.errback)

    # A colon can only occur in an IPv6 literal, so it selects the v6 path.
    if ":" in ip:
        lookupPTRv6(ip, *args, **kwargs).addCallback(gotPTRv6Result).addErrback(d.errback)
    else:
        lookupPTR(ip, *args, **kwargs).addCallback(gotPTRResult).addErrback(d.errback)
    return d
83 if __name__
== "__main__":
94 d
= lookupAndVerifyPTR(sys
.argv
[1], timeout
=[3])
95 d
.addCallbacks(callback
, errback
)