]> git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dane_smtp
Initial implementation of DANE SMTP check
[dane-monitoring-plugins.git] / check_dane_smtp
1 #!/usr/bin/python3
2 #
3 #
4
5 from __future__ import print_function
6
7 import sys
8 import argparse
9
10 from socket import socket, AF_INET6, AF_INET, create_connection
11 from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError, create_default_context
12 from unbound import ub_ctx, idn2dname, ub_strerror
13
14 from check_dane.tlsa import verify_tlsa_record
15
16 def init_connection(sslcontext, args):
17 host = args.Host
18
19 if args.ssl:
20 port = 465 if args.port == 0 else args.port
21 connection = context.wrap_socket(socket(AF_INET),
22 server_hostname=host)
23 connection.connect(host, port)
24
25 else:
26 port = 25 if args.port == 0 else args.port
27 connection = create_connection((host, port))
28 print(connection.recv(512))
29 connection.send(b"EHLO localhost\r\n")
30 print(connection.recv(512))
31 connection.send(b"STARTTLS\r\n")
32 print(connection.recv(512))
33 connection = sslcontext.wrap_socket(connection, server_hostname=host)
34 connection.do_handshake()
35
36 return connection
37
38
39 def close_connection(connection):
40 connection.send(b"QUIT\r\n")
41 print(connection.recv(512))
42
43
44 def init(args):
45 sslcontext = SSLContext(PROTOCOL_TLSv1_2)
46 sslcontext.verify_mode = CERT_REQUIRED
47 sslcontext.load_verify_locations(args.castore)
48
49 resolver = ub_ctx()
50 resolver.add_ta_file(args.ancor)
51
52 return sslcontext, resolver
53
54
55 def main():
56 parser = argparse.ArgumentParser()
57 parser.add_argument("Host")
58
59 parser.add_argument("-p", "--port",
60 action="store", type=int, default=0,
61 help="SMTP port")
62 parser.add_argument("--ssl",
63 action="store_true",
64 help="Use direct TLS connection instead of starttls (default: disabled)")
65 parser.add_argument("--check-dane",
66 action="store_false",
67 help="Verify presented certificate via DANE (default: enabled)")
68 parser.add_argument("--check-ca",
69 action="store_false",
70 help="Verify presented certificate via the CA system (default: enabled)")
71 parser.add_argument("--check-expire",
72 action="store_false",
73 help="Verify presented certificate for expiration (default: enabled)")
74
75 parser.add_argument("-a", "--ancor",
76 action="store", type=str, default="/etc/unbound/root.key",
77 help="DNSSEC root ancor")
78 parser.add_argument("--castore", action="store", type=str,
79 default="/etc/ssl/certs/ca-certificates.crt",
80 help="ca certificate bundle")
81
82 group = parser.add_mutually_exclusive_group()
83 group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only")
84 group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only")
85 group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
86
87 args = parser.parse_args()
88 sslcontext, resolver = init(args)
89 print(args)
90
91 connection = init_connection(sslcontext, args)
92
93 verify_tlsa_record(resolver, "_25._tcp.%s" % args.Host, connection.getpeercert(binary_form=True))
94
95 close_connection(connection)
96
97
98 if __name__ == '__main__':
99 main()