#!/usr/bin/python3
"""DNSSEC health check for a single zone.

Runs a series of checks against a zone (serial synchronisation between
nameservers, RRSIG validity of the main records, DS/DNSKEY delegation
consistency) and exits with the highest severity seen.  Throughout this
file the check functions return 0 when everything is fine, 1 for a
warning and 2 for an error.
"""
from __future__ import print_function

import sys
import argparse
import logging
import hashlib
import codecs
import struct
from collections import namedtuple
from pprint import pprint
from hashlib import sha256

from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_NS
from unbound import RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_DS
from ldns import LDNS_SECTION_ANSWER
from ldns import ldns_wire2pkt

from check_dane.resolve import Resolver, ResolverException
from check_dane.resolve import format_address, dnssec_verify_rrsig_validity

DSRecord = namedtuple('DSRecord',
                      ['keytag', 'algorithm', 'digesttype', 'digest'])
DNSKEYRecord = namedtuple('DNSKEYRecord',
                          ['flags', 'protocol', 'algorithm', 'key', 'digest'])

# DS digest type number -> hashlib constructor (RFC 4034 / 4509 / 6605).
_DS_DIGESTS = {
    1: hashlib.sha1,
    2: hashlib.sha256,
    4: hashlib.sha384,
}


def _keytag(data):
    """Return the 16-bit key tag of a DNSKEY RDATA blob.

    Implements the checksum from RFC 4034 Appendix B: octets at even
    offsets are added as the high byte, octets at odd offsets as the low
    byte, then the carry is folded back in once.

    ``data`` is the raw RDATA (flags, protocol, algorithm, public key) as
    a bytes-like object.  Odd-length input is handled.
    """
    total = 0
    # bytearray() yields ints on both Python 2 (str) and Python 3 (bytes).
    for index, octet in enumerate(bytearray(data)):
        total += octet if index & 1 else octet << 8
    total += (total >> 16) & 0xFFFF
    return total & 0xFFFF


def _owner_wire(zone):
    """Return ``zone`` as a canonical (lower-case) wire-format owner name.

    A trailing dot is ignored, so ``example.com`` and ``example.com.``
    produce the same bytes; the root zone becomes a single zero octet.
    """
    name = zone.rstrip('.').lower()
    wire = bytearray()
    if name:
        for label in name.split('.'):
            raw = label.encode('ascii')
            wire.append(len(raw))
            wire.extend(raw)
    wire.append(0)  # root label terminates the name
    return bytes(wire)


def _rdata_entries(result):
    """Return the list of raw RDATA entries of a resolver result.

    A result without data (``result.data is None``) yields an empty list.
    """
    data = result.data
    return [] if data is None else data.data


def _ds_matches(ds, owner, dnskey_rdata):
    """Tell whether ``ds`` is the digest of ``owner`` + ``dnskey_rdata``.

    The hash function is chosen by the DS record's digest type; records
    with a digest type not listed in ``_DS_DIGESTS`` never match.
    """
    hash_factory = _DS_DIGESTS.get(ds.digesttype)
    if hash_factory is None:
        logging.debug("Unsupported DS digest type %d in %s",
                      ds.digesttype, ds)
        return False
    digest = hash_factory()
    digest.update(owner)
    digest.update(dnskey_rdata)
    return digest.digest() == ds.digest


def check_main_records(resolver, zone, args):
    """Confirms that the necessary records on a zone all verify

    Resolves DNSKEY, NS and SOA for ``zone`` with ``secure=True`` and hands
    each response packet to ``dnssec_verify_rrsig_validity`` together with
    ``args.warndays`` / ``args.critdays``.  Returns the highest value seen,
    or 2 if the resolver raises ``ResolverException``.
    """
    retval = 0
    try:
        for rrtype in (RR_TYPE_DNSKEY, RR_TYPE_NS, RR_TYPE_SOA):
            result = resolver.resolve(zone, rrtype=rrtype, secure=True)
            nretval = dnssec_verify_rrsig_validity(
                result.packet, args.warndays, args.critdays)
            retval = max(nretval, retval)
    except ResolverException as e:
        logging.exception("check_main_records: %s", e)
        return 2
    return retval


def check_ds_delegation(resolver, zone, args):
    """Cross-checks the DS records of a zone against its DNSKEY records

    Only DNSKEYs with the SEP flag set and the REVOKE flag clear are
    considered.  For each of them at least one DS record with the same
    key tag must carry a matching digest.

    Returns 0 if every such key has a matching DS and no DS is unused,
    1 if a key has no DS record or a DS record refers to no key,
    2 if a DS record exists for a key tag but none of them matches, or
    if the resolver raises ``ResolverException``.
    """
    retval = 0
    try:
        # Several DS records may share a key tag (e.g. one per digest
        # type), so keep all of them rather than the last one seen.
        ds_by_tag = dict()
        result = resolver.resolve(zone, RR_TYPE_DS, secure=True)
        for entry in _rdata_entries(result):
            tag, algo, digesttype = struct.unpack("!HBB", entry[:4])
            record = DSRecord(tag, algo, digesttype, entry[4:])
            ds_by_tag.setdefault(tag, []).append(record)

        owner = _owner_wire(zone)

        # key tag -> (parsed record, full RDATA needed to recompute digests)
        dnskeys = dict()
        result = resolver.resolve(zone, RR_TYPE_DNSKEY, secure=True)
        for entry in _rdata_entries(result):
            flags, protocol, algorithm = struct.unpack("!HBB", entry[:4])
            is_sep = (flags & 0x0001) == 0x0001
            is_revoked = (flags & 0x0080) == 0x0080
            if not is_sep or is_revoked:
                continue
            digest = sha256()
            digest.update(owner)
            digest.update(entry)
            record = DNSKEYRecord(flags, protocol, algorithm, entry[4:],
                                  digest.digest())
            dnskeys[_keytag(entry)] = (record, entry)

        for tag, (dnskey, rdata) in dnskeys.items():
            candidates = ds_by_tag.get(tag)
            if not candidates:
                logging.warning("No DS record found for %s", dnskey)
                retval = max(retval, 1)
                continue
            if not any(_ds_matches(ds, owner, rdata) for ds in candidates):
                for ds in candidates:
                    logging.error("DS and DNSKEY do not match: %s %s",
                                  ds, dnskey)
                retval = 2

        for tag, records in ds_by_tag.items():
            if tag not in dnskeys:
                for ds in records:
                    logging.warning("Unused DS record: %s", ds)
                retval = max(retval, 1)

        return retval
    except ResolverException as e:
        logging.exception("check_ds_delegation: %s", e)
        return 2


def check_nsec_cycle(resolver, zone, args):
    """Confirms that NSEC records are completely available

    Not implemented yet: always returns 0.
    """
    return 0


def check_synced(resolver, zone, args):
    """Makes sure the zone is at the same serial on all secondaries

    Resolves the NS set of ``zone``, resolves every nameserver to its
    AAAA/A addresses and asks each address directly for the SOA record.

    Returns 0 if all addresses report the same serial, otherwise 2.  2 is
    also returned when no nameserver or address can be found, a response
    cannot be parsed or contains no SOA, or the resolver raises
    ``ResolverException``.
    """
    try:
        result = resolver.resolve(zone, RR_TYPE_NS, secure=True)
        if result.data is None:
            logging.error("No nameservers found for zone %s", zone)
            return 2

        nameserver_ips = []
        for nameserver in result.data.as_domain_list():
            ips = []
            for rrtype in (RR_TYPE_AAAA, RR_TYPE_A):
                result = resolver.resolve(nameserver, rrtype=rrtype,
                                          secure=True)
                if result.data is not None:
                    ips.extend(format_address(data, rrtype)
                               for data in result.data.data)
            if not ips:
                logging.warning(
                    "Could not find any address for nameserver %s",
                    nameserver)
            nameserver_ips.extend(ips)

        if not nameserver_ips:
            logging.error("No authoritive nameserver for %s could be resolved",
                          zone)
            return 2

        # serial -> list of addresses that reported it
        serials = dict()
        for ip in nameserver_ips:
            newresolver = Resolver(args.ancor, ip)
            # We can't request secure here as the authoritative
            # nameservers for the zone won't let us retrieve the
            # signature chain below their own zone. We'll later
            # recheck the SOA record using the main recursor
            #
            # Alternatively one could get the DS / DNSKEY for the zone with
            # resolver and add it to newresolver as a hint.
            result = newresolver.resolve(zone, rrtype=RR_TYPE_SOA)
            status, packet = ldns_wire2pkt(result.packet)
            if status != 0:
                logging.error("Parsing packet failed with errorcode %d",
                              status)
                return 2

            rr_list = packet.rr_list_by_type(RR_TYPE_SOA,
                                             LDNS_SECTION_ANSWER)
            soa = None if rr_list is None else next(rr_list.rrs(), None)
            if soa is None:
                logging.error("No SOA record in answer from %s for zone %s",
                              ip, zone)
                return 2

            # Presentation format: owner ttl class SOA mname rname serial ...
            serial = str(soa).split()[6]
            serials.setdefault(serial, []).append(ip)

        if len(serials) == 1:
            return 0
        logging.error("different SOAs: %s", serials)
        return 2
    except ResolverException as e:
        logging.exception("check_synced: %s", e)
        return 2


def main():
    """Parse the command line, run the checks and return the exit code.

    The exit code is the highest value returned by any executed check.
    The NSEC cycle check only runs when ``--nsec`` is given.
    """
    logging.basicConfig(format='%(levelname)5s %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument("Zone")
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument("-a", "--ancor",
                        action="store", type=str,
                        default="/etc/unbound/root.key",
                        help="DNSSEC root ancor")
    # store_true so the flag is off unless given, as the help text states.
    parser.add_argument("--nsec", action="store_true",
                        help="Verifies the complete NSEC/NSEC3 cycle "
                             "(default: false)")
    parser.add_argument("--warndays", type=int, default=-1,
                        help="Days before rrsig expiration to warn")
    parser.add_argument("--critdays", type=int, default=-1,
                        help="Days before rrsig expiration to raise error")
    args = parser.parse_args()

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING
    else:
        level = logging.INFO
    logging.getLogger().setLevel(level)

    resolver = Resolver(args.ancor)
    zone = args.Zone.encode('idna').decode()

    retvals = [
        check_synced(resolver, zone, args),
        check_main_records(resolver, zone, args),
        check_ds_delegation(resolver, zone, args),
    ]
    if args.nsec:
        retvals.append(check_nsec_cycle(resolver, zone, args))
    return max(retvals)


if __name__ == '__main__':
    sys.exit(main())