]> git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dane/abstract.py
1373ed0e12613ddfa73bedb92a64c589bf9a14cc
[dane-monitoring-plugins.git] / check_dane / abstract.py
1 from abc import ABCMeta, abstractmethod
2 from unbound import ub_ctx
3 from socket import socket, AF_INET6, AF_INET
4 from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
5
6
7 from check_dane.cert import verify_certificate, add_certificate_options
8 from check_dane.tlsa import get_tlsa_records, match_tlsa_records
9
10
class DaneWarning(Exception):
    """Exception type for DANE check conditions of warning severity.

    Derives from ``Exception`` so that it can actually be raised and caught;
    a plain class cannot be used in a ``raise`` statement in Python 3.
    Nothing in this module raises it.
    """
13
class DaneError(Exception):
    """Exception type for DANE check conditions of error severity.

    Derives from ``Exception`` so that it can actually be raised and caught;
    a plain class cannot be used in a ``raise`` statement in Python 3.
    Nothing in this module raises it.
    """
16
17
class DaneChecker:
    """Base class for checking a TLS service's certificates against TLSA records.

    Subclasses provide the protocol-specific pieces: ``_init_connection``,
    ``_close_connection`` and the ``port`` property.  ``set_args`` reads the
    namespace attributes that ``generate_menu`` registers (``Host``, ``ancor``,
    ``use6``, ``use4``), and ``check`` relies on the state ``set_args`` stores.

    NOTE(review): the class does not use ``ABCMeta`` as its metaclass, so the
    ``@abstractmethod`` markers below are not enforced at instantiation time.
    """

    def __init__(self):
        pass

    @abstractmethod
    def _init_connection(self, afamily, host, port):
        """Open a connection to ``host`` on ``port`` using ``afamily``.

        The returned object must support ``getpeercert()`` and
        ``getpeercert(binary_form=True)``, because ``_gather_certificates``
        calls both on it.
        """

    @abstractmethod
    def _close_connection(self, connection):
        """Release a connection previously returned by ``_init_connection``."""

    @property
    @abstractmethod
    def port(self):
        """Port of the checked service; formatted with ``%d`` for the TLSA name."""

    def _gather_certificates(self):
        """Collect the certificate presented on each configured address family.

        Returns:
            A set holding the ``getpeercert(binary_form=True)`` result of every
            connection, or the integer ``2`` if a connection was refused.
        """
        # Local import: the module header does not import ``logging``.
        import logging

        # NOTE(review): the worst verify_certificate() result is tracked here
        # but never returned to the caller -- confirm whether check() is
        # supposed to take it into account.
        retval = 0
        certificates = set()
        for afamily in self._afamilies:
            try:
                connection = self._init_connection(afamily, self._host, self.port)
            except ConnectionRefusedError:
                logging.error("Connection refused")
                return 2

            try:
                nretval = verify_certificate(connection.getpeercert(), self._args)
                retval = max(retval, nretval)
                certificates.add(connection.getpeercert(binary_form=True))
            finally:
                # Close even when verification or certificate retrieval fails,
                # so a failing check does not leak the connection.
                self._close_connection(connection)

        return certificates

    def _gather_records(self):
        """Look up the TLSA records at ``_<port>._tcp.<host>`` via the resolver."""
        return get_tlsa_records(self._resolver, "_%d._tcp.%s" % (self.port, self._host))

    def generate_menu(self, argparser):
        """Register the command line options of the checker on ``argparser``.

        NOTE: the ``--check-*`` options use ``store_false``, so their
        destinations default to ``True`` and passing the flag sets them to
        ``False``.
        """
        argparser.add_argument("Host")

        argparser.add_argument("--check-dane",
                               action="store_false",
                               help="Verify presented certificate via DANE (default: enabled)")
        argparser.add_argument("--check-ca",
                               action="store_false",
                               help="Verify presented certificate via the CA system (default: enabled)")
        argparser.add_argument("--check-expire",
                               action="store_false",
                               help="Verify presented certificate for expiration (default: enabled)")

        argparser.add_argument("-a", "--ancor",
                               action="store", type=str, default="/usr/share/dns/root.key",
                               help="DNSSEC root ancor")
        argparser.add_argument("--castore", action="store", type=str,
                               default="/etc/ssl/certs/ca-certificates.crt",
                               help="ca certificate bundle")

        # -6 and -4 cannot be combined; with neither, both families are used.
        group = argparser.add_mutually_exclusive_group()
        group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
        group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")

    def set_args(self, args):
        """Store the parsed arguments and derive resolver, families and host.

        Args:
            args: namespace providing ``ancor``, ``use6``, ``use4`` and
                ``Host``, as registered by ``generate_menu``.
        """
        self._args = args
        resolver = ub_ctx()
        resolver.add_ta_file(args.ancor)
        self._resolver = resolver

        if args.use6:
            self._afamilies = [AF_INET6]
        elif args.use4:
            self._afamilies = [AF_INET]
        else:
            self._afamilies = [AF_INET, AF_INET6]

        # Convert an internationalized host name to its ASCII (IDNA) form.
        self._host = args.Host.encode('idna').decode()

    def check(self):
        """Match the presented certificates against the host's TLSA records.

        Returns:
            The result of ``match_tlsa_records``, or the integer status from
            ``_gather_certificates`` when no certificates could be gathered.
        """
        records = self._gather_records()
        certificates = self._gather_certificates()
        if isinstance(certificates, int):
            # _gather_certificates reported a failure code instead of a set;
            # pass it on rather than handing it to the matcher as certificates.
            return certificates
        return match_tlsa_records(records, certificates)