]> git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dane_smtp
README beautifications
[dane-monitoring-plugins.git] / check_dane_smtp
1 #!/usr/bin/python3
2 #
3 #
4
5 from __future__ import print_function
6
7 import sys
8 import argparse
9 import logging
10
11 from socket import socket, AF_INET6, AF_INET, create_connection
12 from ssl import SSLError, CertificateError, SSLContext
13 from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED
14 from unbound import ub_ctx
15
16 from check_dane.tlsa import verify_tlsa_record
17 from check_dane.cert import verify_certificate, add_certificate_options
18
19 def init_connection(sslcontext, args, family):
20 host = args.Host
21
22 if args.ssl:
23 port = 465 if args.port == 0 else args.port
24 connection = sslcontext.wrap_socket(socket(family),
25 server_hostname=host)
26 connection.connect((host, port))
27 answer = connection.recv(512)
28 logging.debug(answer)
29
30 connection.send(b"EHLO localhost\r\n")
31 answer = connection.recv(512)
32 logging.debug(answer)
33
34 else:
35 port = 25 if args.port == 0 else args.port
36
37 connection = socket(family=family)
38 connection.connect((host, port))
39 answer = connection.recv(512)
40 logging.debug(answer)
41
42 connection.send(b"EHLO localhost\r\n")
43 answer = connection.recv(512)
44 logging.debug(answer)
45
46 connection.send(b"STARTTLS\r\n")
47 answer = connection.recv(512)
48 logging.debug(answer)
49
50 connection = sslcontext.wrap_socket(connection, server_hostname=host)
51 connection.do_handshake()
52
53 connection.send(b"EHLO localhost\r\n")
54 answer = connection.recv(512)
55 logging.debug(answer)
56
57 return connection
58
59
60 def close_connection(connection):
61 connection.send(b"QUIT\r\n")
62 answer = connection.recv(512)
63 logging.debug(answer)
64
65
66 def init(args):
67 sslcontext = SSLContext(PROTOCOL_TLSv1_2)
68 sslcontext.verify_mode = CERT_REQUIRED
69 sslcontext.load_verify_locations(args.castore)
70
71 resolver = ub_ctx()
72 resolver.add_ta_file(args.ancor)
73
74 return sslcontext, resolver
75
76
77 def main():
78 logging.basicConfig(format='%(levelname)5s %(message)s')
79 parser = argparse.ArgumentParser()
80 parser.add_argument("Host")
81
82 parser.add_argument("--verbose", action="store_true")
83 parser.add_argument("--quiet", action="store_true")
84 parser.add_argument("-p", "--port",
85 action="store", type=int, default=0,
86 help="SMTP port")
87 parser.add_argument("--ssl",
88 action="store_true",
89 help="Use direct TLS connection instead of starttls (default: disabled)")
90 parser.add_argument("--check-dane",
91 action="store_false",
92 help="Verify presented certificate via DANE (default: enabled)")
93 parser.add_argument("--check-ca",
94 action="store_false",
95 help="Verify presented certificate via the CA system (default: enabled)")
96 parser.add_argument("--check-expire",
97 action="store_false",
98 help="Verify presented certificate for expiration (default: enabled)")
99
100 parser.add_argument("-a", "--ancor",
101 action="store", type=str, default="/etc/unbound/root.key",
102 help="DNSSEC root ancor")
103 parser.add_argument("--castore", action="store", type=str,
104 default="/etc/ssl/certs/ca-certificates.crt",
105 help="ca certificate bundle")
106
107 group = parser.add_mutually_exclusive_group()
108 group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
109 group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
110 group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)")
111
112 add_certificate_options(parser)
113
114 args = parser.parse_args()
115
116 if args.verbose:
117 logging.getLogger().setLevel(logging.DEBUG)
118 elif args.quiet:
119 logging.getLogger().setLevel(logging.WARNING)
120 else:
121 logging.getLogger().setLevel(logging.INFO)
122
123 port = args.port
124 if port == 0:
125 port = 465 if args.ssl else 25
126 host = args.Host.encode('idna').decode()
127
128 sslcontext, resolver = init(args)
129
130 if args.use6:
131 afamilies = [AF_INET6]
132 elif args.use4:
133 afamilies = [AF_INET6]
134 else:
135 afamilies = [AF_INET, AF_INET6]
136
137 retval = 0
138 for afamily in afamilies:
139 try:
140 connection = init_connection(sslcontext, args, afamily)
141 except ConnectionRefusedError:
142 logging.error("Connection refused")
143 return 2
144
145 nretval = verify_certificate(connection.getpeercert(), args)
146 retval = max(retval, nretval)
147 nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host),
148 connection.getpeercert(binary_form=True))
149 retval = max(retval, nretval)
150
151 close_connection(connection)
152
153 return retval
154
155
156 if __name__ == '__main__':
157 sys.exit(main())