ptr = ".".join(ip.split(".")[::-1]) + ".in-addr.arpa."
return client.lookupPointer(ptr, **kwargs).addCallback(callback)
def expandIPv6(ip):
    """Expand a textual IPv6 address to its 32 hexadecimal digits.

    The result is lowercase and contains no ":" separators, e.g.
    "2001:db8::1" becomes "20010db8" + 20 zeros + "0001".

    Accepted spellings:
      * compressed addresses containing a single "::";
      * fully written-out addresses with all eight groups and no "::";
      * addresses whose last element is a dotted-quad IPv4 address
        (e.g. "::ffff:1.2.3.4").

    This is a normaliser, not a full validator: individual groups are
    not checked for being valid hexadecimal.

    Raises ValueError when the address contains more than one "::",
    when the groups do not add up to 128 bits, or when an embedded
    IPv4 part is malformed.
    """
    def section_to_hex(section):
        # Nothing on this side of "::" contributes no digits at all.
        if not section:
            return ""
        groups = section.split(":")
        if "." in groups[-1]:
            # A trailing dotted quad stands for the final two 16-bit groups.
            octets = [int(octet) for octet in groups.pop().split(".")]
            if len(octets) != 4 or not all(0 <= o <= 255 for o in octets):
                raise ValueError(
                    "Invalid embedded IPv4 address in %r" % (ip,))
            groups.append("{:02x}{:02x}".format(octets[0], octets[1]))
            groups.append("{:02x}{:02x}".format(octets[2], octets[3]))
        return "".join("{:0>4}".format(group) for group in groups)

    # Lowercase first so that equal addresses always compare equal.
    sections = ip.lower().split("::")
    if len(sections) > 2:
        raise ValueError("More than one '::' in IPv6 address %r" % (ip,))

    head = section_to_hex(sections[0])
    compressed = len(sections) == 2
    tail = section_to_hex(sections[1]) if compressed else ""

    # "::" stands for however many zero digits are needed to reach 32.
    missing = 32 - len(head) - len(tail)
    if missing < 0 or (missing and not compressed):
        raise ValueError("Malformed IPv6 address %r" % (ip,))
    return head + "0" * missing + tail
+
def lookupPTRv6(ip, *args, **kwargs):
    """Look up the PTR name for the IPv6 address *ip*.

    The query name is built by passing *ip* through expandIPv6(),
    reversing the resulting characters, joining them with "." (one
    character per label) and appending ".ip6.arpa.".

    Positional ``*args`` are accepted but not forwarded; only
    ``**kwargs`` are passed on to ``client.lookupPointer``.

    Returns the Deferred from ``client.lookupPointer`` with a callback
    attached that yields ``str(answer[0].payload.name)``, i.e. the name
    carried by the first answer record.

    The callback raises LookupException when the answer section of the
    response is empty.
    """
    def callback(result):
        answer, _auth, _add = result
        if not answer:
            # Call-style raise: same behaviour on Python 2, and also valid
            # syntax on Python 3 (the "raise Exc, msg" statement is not).
            raise LookupException(
                "No ANSWERS in PTR response for %s." % repr(ip))
        return str(answer[0].payload.name)

    ptr = ".".join(reversed(expandIPv6(ip))) + ".ip6.arpa."
    return client.lookupPointer(ptr, **kwargs).addCallback(callback)
+
# Query client.lookupAddress for *hostname*, forwarding *args and **kwargs,
# and post-process the response with the nested callback.
def lookupAs(hostname, *args, **kwargs):
# Receives the lookup result, which is unpacked as (answer, auth, add).
def callback(result):
answer, auth, add = result
# NOTE(review): the remainder of callback's body is not visible in this
# excerpt, so what the returned Deferred fires with cannot be stated here.
return client.lookupAddress(hostname, *args, **kwargs).addCallback(callback)
def lookupAAAAs(hostname, *args, **kwargs):
    """Look up the IPv6 address records for *hostname*.

    All positional and keyword arguments after *hostname* are forwarded
    to ``client.lookupIPV6Address``.

    Returns the Deferred from that call with a callback attached that
    yields a list holding, for every answer record, the record's
    address normalised with expandIPv6().

    The callback raises LookupException when the answer section of the
    response is empty.
    """
    def callback(result):
        answer, _auth, _add = result
        if not answer:
            # Call-style raise: same behaviour on Python 2, and also valid
            # syntax on Python 3 (the "raise Exc, msg" statement is not).
            raise LookupException(
                "No ANSWERS in AAAA response for %s." % repr(hostname))
        # NOTE(review): _address is a private attribute of the payload;
        # presumably the textual form of the address - confirm.
        return [expandIPv6(record.payload._address) for record in answer]

    return client.lookupIPV6Address(hostname, *args, **kwargs).addCallback(callback)
+
# Look up the PTR name for *ip* and confirm that the name resolves back to
# *ip*. Returns the Deferred `d`; on the IPv6 path it is fired with the PTR
# name once the check succeeds, and every lookup chain below routes its
# failures to d.errback.
# NOTE(review): parts of the IPv4 path (gotPTRResult / gotAResult) are not
# visible in this excerpt.
def lookupAndVerifyPTR(ip, *args, **kwargs):
d = defer.Deferred()
raise VerificationException("IP mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(a_records)))
lookupAs(ptr, *args, **kwargs).addCallback(gotAResult).addErrback(d.errback)
- lookupPTR(ip, *args, **kwargs).addCallback(gotPTRResult).addErrback(d.errback)
# IPv6 path, step 1: called with the PTR name found for *ip*.
+ def gotPTRv6Result(ptr):
# IPv6 path, step 2: called with the list produced by lookupAAAAs for ptr.
+ def gotAAAAResult(aaaa_records):
# lookupAAAAs returns addresses already passed through expandIPv6, so the
# queried address is expanded the same way before the membership test.
+ if expandIPv6(ip) in aaaa_records:
+ d.callback(ptr)
+ else:
# Raising inside the callback fails this chain; the errback attached right
# after gotAAAAResult below hands that failure to d.errback.
+ raise VerificationException("IPv6 mismatch: %s != %s%s" % (repr(ip), repr(ptr), repr(aaaa_records)))
+ lookupAAAAs(ptr, *args, **kwargs).addCallback(gotAAAAResult).addErrback(d.errback)
+
# Choose the lookup path: an address containing ":" is handled as IPv6,
# anything else goes through the IPv4 lookupPTR path.
+ if ":" in ip:
+ lookupPTRv6(ip, *args, **kwargs).addCallback(gotPTRv6Result).addErrback(d.errback)
+ else:
+ lookupPTR(ip, *args, **kwargs).addCallback(gotPTRResult).addErrback(d.errback)
return d
# Script entry point: verify the address given as the first command-line
# argument, then run the reactor.
# NOTE(review): the callback/errback definitions are only partly visible in
# this excerpt; the two calls below appear to be the tail of the errback.
if __name__ == "__main__":
x.printTraceback()
reactor.stop()
- d = lookupAndVerifyPTR(sys.argv[1], timeout=[.001])
# NOTE(review): timeout is handed down to the lookups as a keyword argument;
# presumably a list of per-attempt timeouts in seconds - confirm against the
# resolver API.
+ d = lookupAndVerifyPTR(sys.argv[1], timeout=[3])
d.addCallbacks(callback, errback)
reactor.run()