from __future__ import print_function
import ldns
-from unbound import ub_ctx, idn2dname, RR_TYPE_SOA, RR_TYPE_RRSIG, ub_strerror
+from unbound import ub_ctx, idn2dname, ub_strerror
+from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_RRSIG, RR_TYPE_NS
+from unbound import RR_TYPE_A, RR_TYPE_AAAA
from optparse import OptionParser
import sys
from datetime import datetime, timedelta
return delta
def check_dnssec_expire(resolver, name, warn, crit):
- s, result = resolver.resolve(name, rrtype=RR_TYPE_SOA)
+ for rrtype in [RR_TYPE_SOA, RR_TYPE_DNSKEY]:
+ s, result = resolver.resolve(name, rrtype=rrtype)
+ if 0 != s:
+ ub_strerror(s)
+ return 3
+
+ if not result.secure:
+ print("CRIT (does not verify) %s" % (name, ))
+ return 2
+
+ s, packet = ldns.ldns_wire2pkt(result.packet)
+ rrsigs = packet.rr_list_by_type(RR_TYPE_RRSIG, ldns.LDNS_SECTION_ANSWER).rrs()
+
+ for rrsig in rrsigs:
+ delta = parse_rrsig_expire(str(rrsig.rrsig_expiration()))
+
+ if delta < crit:
+ print("CRIT (expires in %s) %s" % (delta, name))
+ return 2
+ elif delta < warn:
+ print("WARN (expires in %s) %s" % (delta, name))
+ return 1
+ return 0
+
+def check_zone_synced(resolver, name):
+ s, result = resolver.resolve(name, RR_TYPE_NS)
if 0 != s:
ub_strerror(s)
return 3
print("CRIT (does not verify) %s" % (name, ))
return 2
- s, packet = ldns.ldns_wire2pkt(result.packet)
- rrsigs = packet.rr_list_by_type(RR_TYPE_RRSIG, ldns.LDNS_SECTION_ANSWER).rrs()
- for rrsig in rrsigs:
- delta = parse_rrsig_expire(str(rrsig.rrsig_expiration()))
-
- if delta < crit:
- print("CRIT (expires in %s) %s" % (delta, name))
- return 2
- elif delta < warn:
- print("WARN (expires in %s) %s" % (delta, name))
- return 1
- return 0
+ nameservers = result.data.as_domain_list()
+ nsips = []
+ for nameserver in nameservers:
+ for rrtype in [RR_TYPE_AAAA, RR_TYPE_A]:
+ s, result = resolver.resolve(nameserver, rrtype=rrtype)
+ if 0 != s:
+ ub_strerror(s)
+ return 3
+ nsips = nsips + result.data.as_address_list()
+ results = dict()
+ for ip in nsips:
+ newres = ub_ctx()
+ newres.set_fwd(ip)
+ s, result = newres.resolve(name, rrtype=RR_TYPE_SOA)
+ if 0 != s:
+ ub_strerror(s)
+ return 3
+
+ s, result = ldns.ldns_wire2pkt(result.packet)
+ soas = list(result.rr_list_by_type(RR_TYPE_SOA,
+ ldns.LDNS_SECTION_ANSWER).rrs())
+
+ if len(soas) != 1:
+ return 3
+ serial = str(soas[0]).split()[6]
+ results[serial] = results.get(serial, []) + [ip]
+
+ if len(results) == 1:
+ return 0
+ else:
+ print("CRIT (different SOAs): %s", results)
+ return 2
def main():
parser = OptionParser()
parser.add_option("-c", "--critical-days",
action="store", type=int, dest="crit", default=2,
help="minimum remaining validity in days before a warning is issued")
-
+ parser.add_option("-v", action="store_true", dest="verbose", default=False)
+ parser.add_option("-q", action="store_false", dest="verbose")
opts, _args = parser.parse_args()
+ if not opts.names:
+ parser.error("needs at least one DNS name")
+
resolver = ub_ctx()
resolver.add_ta_file(opts.ancor)
encoding = sys.getfilesystemencoding()
final = 0
- for name in opts.names:
- result = check_dnssec_expire(resolver, idn2dname(name.decode(encoding)),
+ for name in opts.names:
+ name = idn2dname(name.decode(encoding))
+ result1 = check_zone_synced(resolver, name)
+ if result1 == 2:
+ final = 2
+ elif result1 == 1 and final != 2:
+ final = 1
+ elif result1 == 3 and final not in [1, 2]:
+ final = 3
+
+ result2 = check_dnssec_expire(resolver, name,
timedelta(opts.warn), timedelta(opts.crit))
- if result == 2:
+ if result1 + result2 == 0 and opts.verbose:
+ print("OK %s" % name)
+ if result2 == 2:
final = 2
- elif result == 1 and final != 2:
+ elif result2 == 1 and final != 2:
final = 1
- elif result == 3 and final not in [1, 2]:
+ elif result2 == 3 and final not in [1, 2]:
final = 3
sys.exit(final)