]> git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dane/abstract.py
Refactor TLSA service checks
[dane-monitoring-plugins.git] / check_dane / abstract.py
1 from abc import ABC, abstractmethod
2 from socket import AF_INET6, AF_INET
3
4 from unbound import ub_ctx
5
6 from check_dane.cert import verify_certificate
7 from check_dane.tlsa import get_tlsa_records, match_tlsa_records
8
9
10 class DaneWarning:
11 pass
12
13 class DaneError:
14 pass
15
16
17 class DaneChecker(ABC):
18 def __init__(self):
19 pass
20
21
22 @abstractmethod
23 def _init_connection(self, family, host, port):
24 pass
25
26
27 @abstractmethod
28 def _close_connection(self, connection):
29 pass
30
31
32 @property
33 @abstractmethod
34 def port(self):
35 pass
36
37
38 def _gather_certificates(self):
39 retval = 0
40 certificates = set()
41 for afamily in self._afamilies:
42 connection = self._init_connection(afamily, self._host, self.port)
43
44 nretval = verify_certificate(connection.getpeercert(), self._args)
45 retval = max(retval, nretval)
46 certificates.add(connection.getpeercert(binary_form=True))
47
48 self._close_connection(connection)
49
50 return certificates
51
52
53 def _gather_records(self):
54 return get_tlsa_records(self._resolver, "_%d._tcp.%s" % (self.port, self._host))
55
56
57 def generate_menu(self, argparser):
58 argparser.add_argument("Host")
59
60 argparser.add_argument("--check-dane",
61 action="store_false",
62 help="Verify presented certificate via DANE (default: enabled)")
63 argparser.add_argument("--check-ca",
64 action="store_false",
65 help="Verify presented certificate via the CA system (default: enabled)")
66 argparser.add_argument("--check-expire",
67 action="store_false",
68 help="Verify presented certificate for expiration (default: enabled)")
69
70 argparser.add_argument("-a", "--ancor",
71 action="store", type=str, default="/usr/share/dns/root.key",
72 help="DNSSEC root ancor")
73 argparser.add_argument("--castore", action="store", type=str,
74 default="/etc/ssl/certs/ca-certificates.crt",
75 help="ca certificate bundle")
76
77 group = argparser.add_mutually_exclusive_group()
78 group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
79 group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
80
81
82 def set_args(self, args):
83 self._args = args
84 resolver = ub_ctx()
85 resolver.add_ta_file(args.ancor)
86 self._resolver = resolver
87
88 if args.use6:
89 self._afamilies = [AF_INET6]
90 elif args.use4:
91 self._afamilies = [AF_INET]
92 else:
93 self._afamilies = [AF_INET, AF_INET6]
94
95 self._host = args.Host.encode('idna').decode()
96
97
98 def check(self):
99 records = self._gather_records()
100 certificates = self._gather_certificates()
101 return match_tlsa_records(records, certificates)