]> git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dnssec
Add some debian packaging
[dane-monitoring-plugins.git] / check_dnssec
1 #!/usr/bin/python3
2
3 from __future__ import print_function
4
5 import sys
6 import argparse
7 import logging
8 import hashlib
9 import codecs
10 import struct
11 from collections import namedtuple
12 from pprint import pprint
13 from hashlib import sha256
14
15 from unbound import RR_TYPE_SOA, RR_TYPE_DNSKEY, RR_TYPE_NS
16 from unbound import RR_TYPE_A, RR_TYPE_AAAA, RR_TYPE_DS
17 from ldns import LDNS_SECTION_ANSWER
18 from ldns import ldns_wire2pkt
19
20
21 from check_dane.resolve import Resolver, ResolverException
22 from check_dane.resolve import format_address, dnssec_verify_rrsig_validity
23
24
# Parsed DS RDATA: the three fixed header fields (unpacked with "!HBB") plus
# the remaining bytes of the record, which hold the digest itself.
DSRecord = namedtuple('DSRecord', ['keytag', 'algorithm', 'digesttype', 'digest'])
# Parsed DNSKEY RDATA: the three fixed header fields, the raw key material
# that follows them, and a SHA-256 digest computed over the zone's owner
# name followed by the complete RDATA (used for comparison with DS records).
DNSKEYRecord = namedtuple('DNSKEYRecord', ['flags', 'protocol', 'algorithm', 'key', 'digest'])
27
28
29 def _keytag(data):
30 keytag = 0
31
32 while len(data) > 0:
33 a, b = data[:2]
34 keytag = keytag + ord(b) + 256 * ord(a)
35 data = data[2:]
36
37 keytag = keytag + ((keytag / 65536) & 0xFFFF)
38 return keytag & 0xFFFF
39
40
def check_main_records(resolver, zone, args):
    """Verify the signatures on the zone's DNSKEY, NS and SOA record sets.

    Each record type is resolved with DNSSEC validation requested, and the
    answer packet is handed to ``dnssec_verify_rrsig_validity`` together
    with the warning and critical thresholds from ``args``.

    Args:
        resolver: Resolver used for the lookups.
        zone: Name of the zone to check.
        args: Parsed command line options; ``warndays`` and ``critdays``
            are read.

    Returns:
        The highest status reported for any of the three record types
        (0 if none of them reported anything higher).
    """
    worst = 0
    record_types = (RR_TYPE_DNSKEY, RR_TYPE_NS, RR_TYPE_SOA)

    for record_type in record_types:
        answer = resolver.resolve(zone, rrtype=record_type, secure=True)
        status = dnssec_verify_rrsig_validity(
            answer.packet, args.warndays, args.critdays)
        worst = max(status, worst)

    return worst
51
52
def _ds_digest(zone, rdata, digesttype):
    """Compute the DS digest of a DNSKEY for the given digest type.

    The digest covers the owner name in canonical wire format (lower-case,
    length-prefixed labels terminated by the root label) followed by the
    complete DNSKEY RDATA, as specified in RFC 4034, section 5.1.4.

    Args:
        zone: Owner name of the DNSKEY as ASCII text; a trailing dot is
            tolerated.
        rdata: Complete DNSKEY RDATA as bytes.
        digesttype: DS digest type number (1 = SHA-1, 2 = SHA-256,
            4 = SHA-384).

    Returns:
        The raw digest bytes, or None if the digest type is not supported.
    """
    constructors = {1: hashlib.sha1, 2: hashlib.sha256, 4: hashlib.sha384}
    constructor = constructors.get(digesttype)
    if constructor is None:
        return None

    digest = constructor()
    for label in zone.rstrip('.').lower().split('.'):
        if not label:
            # Happens for the root zone, which has no labels besides root
            continue
        raw = label.encode('ascii')
        digest.update(struct.pack('B', len(raw)))
        digest.update(raw)
    # Terminating root label
    digest.update(b'\x00')
    digest.update(rdata)
    return digest.digest()


def check_ds_delegation(resolver, zone, args):
    """Confirms that the DS records in the parent match the zone's keys.

    Every non-revoked DNSKEY with the SEP flag set is expected to be
    covered by at least one DS record with the same key tag, and every DS
    record is expected to refer to such a key. Several DS records may share
    one key tag (typically one per digest type); each of them is verified
    using its own digest type.

    Args:
        resolver: Resolver used for the lookups.
        zone: Name of the zone to check.
        args: Parsed command line options (currently unused).

    Returns:
        0 if everything matches, 1 if a key has no DS record, a DS record
        is unused or uses an unsupported digest type, 2 if a DS digest does
        not match its DNSKEY or the records could not be resolved.
    """
    retval = 0
    try:
        # key tag -> list of DS records carrying that tag
        dses = dict()
        result = resolver.resolve(zone, RR_TYPE_DS, secure=True)
        ds_entries = result.data.data if result.data is not None else []

        for entry in ds_entries:
            tag, algo, digesttype = struct.unpack("!HBB", entry[:4])
            dses.setdefault(tag, []).append(
                DSRecord(tag, algo, digesttype, entry[4:]))

        # key tag -> (complete RDATA, parsed record)
        # NOTE(review): assumes the resolver hands out RDATA as bytes
        dnskeys = dict()
        result = resolver.resolve(zone, RR_TYPE_DNSKEY, secure=True)
        dnskey_entries = result.data.data if result.data is not None else []

        for entry in dnskey_entries:
            flags, protocol, algorithm = struct.unpack("!HBB", entry[:4])
            # Only keys with the SEP flag (0x0001) set and the REVOKE flag
            # (0x0080) clear are expected to have a DS record.
            if flags & 0x1 == 1 and (flags >> 7) & 0x1 == 0:
                record = DNSKEYRecord(flags, protocol, algorithm, entry[4:],
                                      _ds_digest(zone, entry, 2))
                dnskeys[_keytag(entry)] = (entry, record)

        for tag, (rdata, dnskey) in dnskeys.items():
            if tag not in dses:
                logging.warning("No DS record found for %s", dnskey)
                retval = max(retval, 1)
                continue

            for ds in dses[tag]:
                expected = _ds_digest(zone, rdata, ds.digesttype)
                if expected is None:
                    logging.warning("Unsupported DS digest type %d: %s",
                                    ds.digesttype, ds)
                    retval = max(retval, 1)
                elif ds.digest != expected:
                    logging.error("DS and DNSKEY do not match: %s %s", ds, dnskey)
                    retval = 2

        for tag, records in dses.items():
            if tag not in dnskeys:
                for ds in records:
                    logging.warning("Unused DS record: %s", ds)
                retval = max(retval, 1)

        return retval

    except ResolverException as e:
        # Not every exception carries a ``message`` attribute on Python 3
        logging.exception("check_ds_delegation: %s", getattr(e, 'message', e))
        # Report a definite status so callers can always compare results
        return 2
100
101
def check_nsec_cycle(resolver, zone, args):
    """Check that the zone's NSEC records are completely available.

    Placeholder: no lookups are performed and none of the arguments are
    used, so every zone is reported as OK.

    Returns:
        Always 0.
    """
    status_ok = 0
    return status_ok
105
106
def check_synced(resolver, zone, args):
    """Makes sure the zone is at the same serial on all secondaries

    The zone's nameservers are looked up through ``resolver``, each of
    their addresses is then queried directly for the SOA record and the
    serials of all answers are compared.

    Args:
        resolver: Resolver used to find the nameservers and their addresses.
        zone: Name of the zone to check.
        args: Parsed command line options; ``ancor`` is passed on to the
            per-nameserver resolvers.

    Returns:
        0 if all nameservers report the same serial, 2 if they differ, if
        no nameserver (address) could be found, if a nameserver's answer
        could not be parsed or contained no SOA record, or if resolving
        failed.
    """
    try:
        ns_result = resolver.resolve(zone, RR_TYPE_NS, secure=True)

        if ns_result.data is None:
            logging.error("No nameservers found for zone %s", zone)
            return 2

        nameserver_ips = []
        for nameserver in ns_result.data.as_domain_list():
            ips = []
            for rrtype in [RR_TYPE_AAAA, RR_TYPE_A]:
                addr_result = resolver.resolve(nameserver, rrtype=rrtype, secure=True)
                if addr_result.data is not None:
                    ips.extend(format_address(data, rrtype)
                               for data in addr_result.data.data)

            if not ips:
                logging.warning("Could not find any address for nameserver %s", nameserver)

            nameserver_ips.extend(ips)

        if not nameserver_ips:
            logging.error("No authoritive nameserver for %s could be resolved", zone)
            return 2

        # serial -> addresses of the nameservers reporting it
        results = dict()
        for ip in nameserver_ips:
            newresolver = Resolver(args.ancor, ip)

            # We can't request secure here as the authoritative
            # nameservers for the zone won't let us retrieve the
            # signature chain below their own zone. We'll later
            # recheck the SOA record using the main recursor
            #
            # Alternatively one could get the DS / DNSKEY for the zone with
            # resolver and add it to newresolver as a hint.
            soa_result = newresolver.resolve(zone, rrtype=RR_TYPE_SOA)

            status, packet = ldns_wire2pkt(soa_result.packet)
            if status != 0:
                logging.error("Parsing packet failed with errorcode %d", status)
                return 2

            # A nameserver that is not (yet) serving the zone answers
            # without a SOA record; report that instead of crashing.
            rr_list = packet.rr_list_by_type(RR_TYPE_SOA, LDNS_SECTION_ANSWER)
            soa = next(rr_list.rrs(), None) if rr_list is not None else None
            if soa is None:
                logging.error("Nameserver %s returned no SOA record for zone %s", ip, zone)
                return 2

            # Presentation format: owner ttl class type mname rname serial ...
            serial = str(soa).split()[6]
            results.setdefault(serial, []).append(ip)

        if len(results) == 1:
            return 0

        logging.error("different SOAs: %s", results)
        return 2

    except ResolverException as e:
        # Not every exception carries a ``message`` attribute on Python 3
        logging.exception("check_synced: %s", getattr(e, 'message', e))
        # Report a definite status so callers can always compare results
        return 2
166
167
def main():
    """Parse the command line, run all checks and return the worst status.

    The synchronisation, main record and DS delegation checks are always
    run; the NSEC cycle check is only run when ``--nsec`` is given.

    Returns:
        The highest status reported by any executed check, intended to be
        used as the process exit code.
    """
    logging.basicConfig(format='%(levelname)5s %(message)s')
    parser = argparse.ArgumentParser()
    parser.add_argument("Zone")

    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--quiet", action="store_true")

    parser.add_argument("-a", "--ancor",
                        action="store", type=str, default="/etc/unbound/root.key",
                        help="DNSSEC root ancor")

    # store_true so the option is off by default, as the help text states
    parser.add_argument("--nsec", action="store_true",
                        help="Verifies the complete NSEC/NSEC3 cycle (default: false)")
    parser.add_argument("--warndays", type=int, default=-1,
                        help="Days before rrsig expiration to warn")
    parser.add_argument("--critdays", type=int, default=-1,
                        help="Days before rrsig expiration to raise error")

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    elif args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    else:
        logging.getLogger().setLevel(logging.INFO)

    resolver = Resolver(args.ancor)
    # Convert internationalised names to their ASCII (IDNA) form
    zone = args.Zone.encode('idna').decode()

    retvals = [
        check_synced(resolver, zone, args),
        check_main_records(resolver, zone, args),
        check_ds_delegation(resolver, zone, args),
    ]
    if args.nsec:
        retvals.append(check_nsec_cycle(resolver, zone, args))

    # A check that ended without a verdict (None) is counted as an error so
    # that it neither breaks the comparison nor goes unnoticed.
    return max(2 if retval is None else retval for retval in retvals)
208
# Run the checks only when executed as a script and hand the value returned
# by main() to the interpreter as the process exit status.
if __name__ == '__main__':
    sys.exit(main())