Also check for DS <-> DNSKEY consistency
authorChristoph Egger <christoph@christoph-egger.org>
Mon, 12 Jun 2017 20:52:15 +0000 (22:52 +0200)
committerChristoph Egger <christoph@christoph-egger.org>
Mon, 12 Jun 2017 20:52:15 +0000 (22:52 +0200)
check_dnssec

index 1d30853..ca96061 100755 (executable)
@@ -7,9 +7,13 @@ import argparse
 import logging
 import hashlib
 import codecs
+import struct
+from collections import namedtuple
+from pprint import pprint
+from hashlib import sha256
 
 from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_NS
-from unbound import RR_TYPE_A, RR_TYPE_AAAA
+from unbound import RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_DS
 from ldns import LDNS_SECTION_ANSWER
 from ldns import ldns_wire2pkt
 
@@ -18,6 +22,22 @@ from check_dane.resolve import Resolver, ResolverException
 from check_dane.resolve import format_address, dnssec_verify_rrsig_validity
 
 
+DSRecord = namedtuple('DSRecord', ['keytag', 'algorithm', 'digesttype', 'digest'])
+DNSKEYRecord = namedtuple('DNSKEYRecord', ['flags', 'protocol', 'algorithm', 'key', 'digest'])
+
+
+def _keytag(data):
+    keytag = 0
+
+    while len(data) > 0:
+        a, b = data[:2]
+        keytag = keytag + ord(b) + 256 * ord(a)
+        data = data[2:]
+
+    keytag = keytag + ((keytag / 65536) & 0xFFFF)
+    return keytag & 0xFFFF
+
+
 def check_main_records(resolver, zone, args):
     """Confirms that the necessary records on a zone all verify"""
     retval = 0
@@ -30,6 +50,55 @@ def check_main_records(resolver, zone, args):
     return retval
 
 
+def check_ds_delegation(resolver, zone, args):
+    retval = 0
+    try:
+        dses = dict()
+        result = resolver.resolve(zone, RR_TYPE_DS, secure=True)
+
+        for entry in result.data.data:
+            tag, algo, digest = struct.unpack("!HBB", entry[:4])
+            value = entry[4:]
+            dses[tag] = DSRecord(tag, algo, digest, value)
+
+        dnskeys = dict()
+        result = resolver.resolve(zone, RR_TYPE_DNSKEY, secure=True)
+
+        for entry in result.data.data:
+            flags, protocol, algorithm = struct.unpack("!HBB", entry[:4])
+            value = entry[4:]
+            digest = sha256()
+            for label in zone.split('.'):
+                digest.update(struct.pack('b', len(label)))
+                digest.update(label)
+            digest.update(struct.pack('b', 0))
+            digest.update(entry)
+            if flags & 0x1 == 1 and (flags >> 7) & 0x1 == 0:
+                dnskeys[_keytag(entry)] = DNSKEYRecord(flags, protocol, algorithm, value,digest.digest())
+
+        for key in dnskeys:
+            dnskey = dnskeys[key]
+            if not key in dses:
+                logging.warn("No DS record found for %s", dnskey)
+                retval = max(retval, 1)
+
+            else:
+                ds = dses[key]
+                if ds.digest != dnskey.digest:
+                    logging.error("DS and DNSKEY do not match: %s %s", ds, dnskey)
+                    retval = 2
+
+        for ds in dses:
+            if not ds in dnskeys:
+                logging.warn("Unused DS record: %s", dses[ds])
+                retval = max(retval, 1)
+
+        return retval
+
+    except ResolverException as e:
+        logging.exception("check_ds_delegation: %s", e.message)
+
+
 def check_nsec_cycle(resolver, zone, args):
     """Confirms that NSEC records are completely available"""
     return 0
@@ -93,7 +162,7 @@ def check_synced(resolver, zone, args):
             return 2
 
     except ResolverException as e:
-        logging.error(e.message)
+        logging.exception("check_synced: %s", e.message)
 
 
 def main():
@@ -130,11 +199,12 @@ def main():
 
     retval1 = check_synced(resolver, zone, args)
     retval2 = check_main_records(resolver, zone, args)
+    retval3 = check_ds_delegation(resolver, zone, args)
     if args.nsec:
-        retval3 = check_nsec_cycle(resolver, zone, args)
-        return max(retval1, retval2, retval3)
+        retval4 = check_nsec_cycle(resolver, zone, args)
+        return max(retval1, retval2, retval3, retval4)
     else:
-        return max(retval1, retval2)
+        return max(retval1, retval2, retval4)
 
 if __name__ == '__main__':
     sys.exit(main())