--- /dev/null
+#!/usr/bin/python3
+
+import struct
+import logging
+from datetime import datetime
+
+from unbound import ub_ctx, ub_strerror
+from unbound import RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_RRSIG
+
+from ldns import ldns_wire2pkt
+from ldns import LDNS_SECTION_ANSWER
+
+
+def _parse_rrsig_date(expirestring):
+ expires = datetime(int(expirestring[:4]),
+ int(expirestring[4:6]),
+ int(expirestring[6:8]),
+ int(expirestring[8:10]),
+ int(expirestring[10:12]),
+ int(expirestring[12:14]))
+ return expires
+
+
+def format_address(data, datatype):
+ """Given a answer packet for an A or AAAA query, return the string
+ representation of the address
+ """
+ if datatype == RR_TYPE_A:
+ return '.'.join([str(a) for a in data])
+ elif datatype == RR_TYPE_AAAA:
+ data = list(struct.iter_unpack("!H", data))
+ return ":".join(["%x" % a for a in data])
+ else:
+ return None
+
+
+def dnssec_verify_rrsig_validity(data, warn=-1, critical=0):
+ """Given a answer packet confirm validity of rrsigs (with safety) """
+ now = datetime.utcnow()
+
+ s, packet = ldns_wire2pkt(data)
+ if s != 0:
+ logging.error("Parsing packet failed with errorcode %d", s)
+ return 2
+
+ rrsigs = packet.rr_list_by_type(RR_TYPE_RRSIG, LDNS_SECTION_ANSWER).rrs()
+ rrsig = next(rrsigs)
+
+ expire = _parse_rrsig_date(str(rrsig.rrsig_expiration()))
+ incept = _parse_rrsig_date(str(rrsig.rrsig_inception()))
+
+ if now < incept:
+ logging.error("Signature not yet valid, only from %s", incept)
+ return 2
+
+ stillvalid = expire - now
+ deltastr = str(stillvalid).split(",")
+
+ if stillvalid.days < max(0, critical):
+ logging.error("expires in %8s,%16s", deltastr[0], deltastr[1])
+ return 2
+ elif stillvalid.days < warn:
+ logging.warning("expires in %8s,%16s", deltastr[0], deltastr[1])
+ return 1
+
+
+class ResolverException(BaseException):
+ def __init__(self, message):
+ BaseException.__init__(self)
+ self.message = message
+
+
+class Resolver:
+ def __init__(self, ancor, fwd=None):
+ self._resolver = ub_ctx()
+ status = self._resolver.add_ta_file(ancor)
+ if status != 0:
+ raise ResolverException(ub_strerror(status))
+
+ if fwd is not None:
+ status = self._resolver.set_fwd(fwd)
+ if status != 0:
+ raise ResolverException(ub_strerror(status))
+
+
+ def resolve(self, name, rrtype, secure=False):
+ status, result = self._resolver.resolve(name, rrtype)
+ if 0 != status:
+ raise ResolverException(ub_strerror(status))
+
+ if secure and not result.secure:
+ raise ResolverException("Response was not signed")
+
+ return result
--- /dev/null
+#!/usr/bin/python3
+
+from __future__ import print_function
+
+import sys
+import argparse
+import logging
+import hashlib
+import codecs
+
+from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_NS
+from unbound import RR_TYPE_A, RR_TYPE_AAAA
+from ldns import LDNS_SECTION_ANSWER
+from ldns import ldns_wire2pkt
+
+
+from check_dane.resolve import Resolver, ResolverException
+from check_dane.resolve import format_address, dnssec_verify_rrsig_validity
+
+
+def check_main_records(resolver, zone, args):
+ """Confirms that the necessary records on a zone all verify"""
+ retval = 0
+
+ for rrtype in [RR_TYPE_DNSKEY, RR_TYPE_NS, RR_TYPE_SOA]:
+ result = resolver.resolve(zone, rrtype=rrtype, secure=True)
+ nretval = dnssec_verify_rrsig_validity(result.packet, args.warndays, args.critdays)
+ retval = max(nretval, retval)
+
+ return retval
+
+
+def check_nsec_cycle(resolver, zone, args):
+ """Confirms that NSEC records are completely available"""
+ return 0
+
+
+def check_synced(resolver, zone, args):
+ """Makes sure the zone is at the same serial on all secondaries"""
+ try:
+ result = resolver.resolve(zone, RR_TYPE_NS, secure=True)
+
+ if result.data is None:
+ logging.error("No nameservers found for zone %s", zone)
+ return 2
+
+ nameservers = result.data.as_domain_list()
+ nameserver_ips = []
+ for nameserver in nameservers:
+ ips = []
+ for rrtype in [RR_TYPE_AAAA, RR_TYPE_A]:
+ result = resolver.resolve(nameserver, rrtype=rrtype, secure=True)
+ if result.data is not None:
+ ips = ips + [format_address(data, rrtype) for data in result.data.data]
+
+ if ips == []:
+ logging.warning("Could not find any address for nameserver %s", nameserver)
+
+ nameserver_ips = nameserver_ips + ips
+
+ if nameserver_ips == []:
+ logging.error("No authoritive nameserver for %s could be resolved", zone)
+ return 2
+
+ results = dict()
+ for ip in nameserver_ips:
+ newresolver = Resolver(args.ancor, ip)
+
+ # We can't request secure here as the authoritative
+ # nameservers for the zone won't let us retrieve the
+ # signature chain below their own zone. We'll later
+ # recheck the SOA record using the main recursor
+ #
+ # Alternatively one could get the DS / DNSKEY for the zone with
+ # resolver and add it to newresolver as a hint.
+ result = newresolver.resolve(zone, rrtype=RR_TYPE_SOA)
+
+ s, result = ldns_wire2pkt(result.packet)
+ if s != 0:
+ logging.error("Parsing packet failed with errorcode %d", s)
+ return 2
+
+ rrs = result.rr_list_by_type(RR_TYPE_SOA, LDNS_SECTION_ANSWER).rrs()
+ soa = next(rrs)
+
+ serial = str(soa).split()[6]
+ results[serial] = results.get(serial, []) + [ip]
+
+ if len(results) == 1:
+ return 0
+ else:
+ logging.error("different SOAs: %s", results)
+ return 2
+
+ except ResolverException as e:
+ logging.error(e.message)
+
+
+def main():
+ logging.basicConfig(format='%(levelname)5s %(message)s')
+ parser = argparse.ArgumentParser()
+ parser.add_argument("Zone")
+
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("--quiet", action="store_true")
+
+ parser.add_argument("-a", "--ancor",
+ action="store", type=str, default="/etc/unbound/root.key",
+ help="DNSSEC root ancor")
+
+ parser.add_argument("--nsec", action="store_false",
+ help="Verifies the complete NSEC/NSEC3 cycle (default: false)")
+ parser.add_argument("--warndays", type=int, default=-1,
+ help="Days before rrsig expiration to warn")
+ parser.add_argument("--critdays", type=int, default=-1,
+ help="Days before rrsig expiration to raise error")
+
+
+ args = parser.parse_args()
+
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ elif args.quiet:
+ logging.getLogger().setLevel(logging.WARNING)
+ else:
+ logging.getLogger().setLevel(logging.INFO)
+
+ resolver = Resolver(args.ancor)
+ zone = args.Zone.encode('idna').decode()
+
+ retval1 = check_synced(resolver, zone, args)
+ retval2 = check_main_records(resolver, zone, args)
+ if args.nsec:
+ retval3 = check_nsec_cycle(resolver, zone, args)
+ return max(retval1, retval2, retval3)
+ else:
+ return max(retval1, retval2)
+
+if __name__ == '__main__':
+ sys.exit(main())