import logging
import hashlib
import codecs
+import struct
+from collections import namedtuple
+from pprint import pprint
+from hashlib import sha256
from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_NS
-from unbound import RR_TYPE_A, RR_TYPE_AAAA
+from unbound import RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_DS
from ldns import LDNS_SECTION_ANSWER
from ldns import ldns_wire2pkt
from check_dane.resolve import format_address, dnssec_verify_rrsig_validity
+DSRecord = namedtuple('DSRecord', ['keytag', 'algorithm', 'digesttype', 'digest'])
+DNSKEYRecord = namedtuple('DNSKEYRecord', ['flags', 'protocol', 'algorithm', 'key', 'digest'])
+
+
+def _keytag(data):
+ keytag = 0
+
+ while len(data) > 0:
+ a, b = data[:2]
+ keytag = keytag + ord(b) + 256 * ord(a)
+ data = data[2:]
+
+ keytag = keytag + ((keytag / 65536) & 0xFFFF)
+ return keytag & 0xFFFF
+
+
def check_main_records(resolver, zone, args):
"""Confirms that the necessary records on a zone all verify"""
retval = 0
return retval
+def check_ds_delegation(resolver, zone, args):
+ retval = 0
+ try:
+ dses = dict()
+ result = resolver.resolve(zone, RR_TYPE_DS, secure=True)
+
+ for entry in result.data.data:
+ tag, algo, digest = struct.unpack("!HBB", entry[:4])
+ value = entry[4:]
+ dses[tag] = DSRecord(tag, algo, digest, value)
+
+ dnskeys = dict()
+ result = resolver.resolve(zone, RR_TYPE_DNSKEY, secure=True)
+
+ for entry in result.data.data:
+ flags, protocol, algorithm = struct.unpack("!HBB", entry[:4])
+ value = entry[4:]
+ digest = sha256()
+ for label in zone.split('.'):
+ digest.update(struct.pack('b', len(label)))
+ digest.update(label)
+ digest.update(struct.pack('b', 0))
+ digest.update(entry)
+ if flags & 0x1 == 1 and (flags >> 7) & 0x1 == 0:
+ dnskeys[_keytag(entry)] = DNSKEYRecord(flags, protocol, algorithm, value,digest.digest())
+
+ for key in dnskeys:
+ dnskey = dnskeys[key]
+ if not key in dses:
+ logging.warn("No DS record found for %s", dnskey)
+ retval = max(retval, 1)
+
+ else:
+ ds = dses[key]
+ if ds.digest != dnskey.digest:
+ logging.error("DS and DNSKEY do not match: %s %s", ds, dnskey)
+ retval = 2
+
+ for ds in dses:
+ if not ds in dnskeys:
+ logging.warn("Unused DS record: %s", dses[ds])
+ retval = max(retval, 1)
+
+ return retval
+
+ except ResolverException as e:
+ logging.exception("check_ds_delegation: %s", e.message)
+
+
def check_nsec_cycle(resolver, zone, args):
"""Confirms that NSEC records are completely available"""
return 0
return 2
except ResolverException as e:
- logging.error(e.message)
+ logging.exception("check_synced: %s", e.message)
def main():
retval1 = check_synced(resolver, zone, args)
retval2 = check_main_records(resolver, zone, args)
+ retval3 = check_ds_delegation(resolver, zone, args)
if args.nsec:
- retval3 = check_nsec_cycle(resolver, zone, args)
- return max(retval1, retval2, retval3)
+ retval4 = check_nsec_cycle(resolver, zone, args)
+ return max(retval1, retval2, retval3, retval4)
else:
- return max(retval1, retval2)
+ return max(retval1, retval2, retval4)
if __name__ == '__main__':
sys.exit(main())