git.siccegge.de Git - dane-monitoring-plugins.git/blob - check_dane/smtp.py
Refactor TLSA service checks
[dane-monitoring-plugins.git] / check_dane / smtp.py
1 #!/usr/bin/python3
2
3 #!/usr/bin/python3
4
5 from __future__ import print_function
6
7 import argparse
8 import logging
9
10 from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
11 from socket import socket
12
13 from check_dane.cert import add_certificate_options
14 from check_dane.abstract import DaneChecker
15
16
class SmtpDaneChecker(DaneChecker):
    """DANE checker that talks to an SMTP server.

    The TLS session is established either directly on connect (``--ssl``)
    or by upgrading a plain-text session with STARTTLS.  Everything beyond
    the connection handling below is provided by ``DaneChecker``.
    """

    # Upper bound on a single buffered SMTP reply, so a misbehaving server
    # cannot make us accumulate data without limit.
    _MAX_REPLY_SIZE = 65536

    def __init__(self):
        """Create a checker with no port, TLS mode or SSL context set yet."""
        self._port = None
        self._ssl = None
        self._sslcontext = None
        DaneChecker.__init__(self)

    @property
    def port(self):
        """Port to connect to, as resolved by :meth:`set_args`."""
        return self._port

    @property
    def ssl(self):
        """True when TLS is spoken from the first byte instead of STARTTLS."""
        return self._ssl

    def _smtp_read_reply(self, connection):
        """Read one complete SMTP reply from *connection* and return it.

        A reply may span several lines and several TCP segments.  Lines of
        the form ``NNN-text`` announce a continuation, so reading goes on
        until the last complete line is not such a continuation line, the
        peer closes the connection, or ``_MAX_REPLY_SIZE`` is reached.
        The raw reply is logged at debug level.
        """
        reply = b""
        while len(reply) < self._MAX_REPLY_SIZE:
            chunk = connection.recv(512)
            if not chunk:
                # Peer closed the connection; hand back what we got.
                break
            reply += chunk
            if not reply.endswith(b"\n"):
                # Last line is still incomplete.
                continue
            last_line = reply.rstrip(b"\r\n").rsplit(b"\n", 1)[-1]
            is_continuation = (last_line[:3].isdigit()
                               and last_line[3:4] == b"-")
            if not is_continuation:
                break
        logging.debug(reply)
        return reply

    def _smtp_command(self, connection, command):
        """Send *command* (bytes, CRLF-terminated) and return the reply."""
        # sendall() keeps writing until the whole command is out; plain
        # send() is allowed to transmit only a prefix.
        connection.sendall(command)
        return self._smtp_read_reply(connection)

    def _init_connection(self, family, host, port):
        """Open a TLS-protected SMTP session to ``host:port``.

        With ``self.ssl`` set, the socket is wrapped before connecting so
        the TLS handshake happens first; otherwise the session starts in
        plain text and is upgraded with STARTTLS.  In both cases an EHLO
        has been exchanged over TLS when this returns.

        Returns the TLS-wrapped socket.  Any exception raised while
        connecting or handshaking is propagated unchanged after the socket
        has been closed.
        """
        plain = socket(family)
        connection = plain
        try:
            if self.ssl:
                connection = self._sslcontext.wrap_socket(
                    plain, server_hostname=host)
                connection.connect((host, port))
                self._smtp_read_reply(connection)  # server greeting
                self._smtp_command(connection, b"EHLO localhost\r\n")
            else:
                connection.connect((host, port))
                self._smtp_read_reply(connection)  # server greeting
                self._smtp_command(connection, b"EHLO localhost\r\n")

                answer = self._smtp_command(connection, b"STARTTLS\r\n")
                if not answer.startswith(b"220"):
                    # The handshake below is still attempted (and will
                    # most likely fail); this makes the cause visible.
                    logging.warning("STARTTLS not accepted by %s:%s: %r",
                                    host, port, answer)

                connection = self._sslcontext.wrap_socket(
                    plain, server_hostname=host)
                # wrap_socket() on a connected socket normally performs the
                # handshake already; the explicit call is kept as a no-op
                # safeguard.
                connection.do_handshake()

                # Capabilities must be re-requested after the TLS upgrade.
                self._smtp_command(connection, b"EHLO localhost\r\n")
        except Exception:
            # Do not leak the file descriptor when setup fails.
            connection.close()
            raise

        return connection

    def _close_connection(self, connection):
        """Send QUIT, log the server's reply and close *connection*.

        The socket is closed even if the QUIT exchange raises; the
        exception is still propagated.
        """
        try:
            self._smtp_command(connection, b"QUIT\r\n")
        finally:
            connection.close()

    def set_args(self, args):
        """Take over the parsed command line options.

        Reads ``args.ssl``, ``args.port`` and ``args.castore``.  A port of
        0 (the option's default) selects 465 in direct-TLS mode and 25
        otherwise.  Also builds the SSL context used for every connection.
        """
        DaneChecker.set_args(self, args)

        self._ssl = args.ssl
        if args.port == 0:
            self._port = 465 if args.ssl else 25
        else:
            self._port = args.port

        # NOTE(review): PROTOCOL_TLSv1_2 pins the connection to TLS 1.2 and
        # is deprecated in recent Python versions; consider a
        # version-negotiating protocol constant once the supported Python
        # range is confirmed.
        sslcontext = SSLContext(PROTOCOL_TLSv1_2)
        sslcontext.verify_mode = CERT_REQUIRED
        # args.castore is presumably registered by the certificate options
        # added elsewhere -- verify against the caller.
        sslcontext.load_verify_locations(args.castore)

        self._sslcontext = sslcontext

    def generate_menu(self, argparser):
        """Register the SMTP-specific options on *argparser*.

        Adds ``-p/--port`` and ``--ssl`` on top of whatever
        ``DaneChecker.generate_menu`` registers.
        """
        DaneChecker.generate_menu(self, argparser)
        argparser.add_argument("-p", "--port",
                               action="store", type=int, default=0,
                               help="SMTP port")
        argparser.add_argument("--ssl",
                               action="store_true",
                               help="Use direct TLS connection instead of starttls (default: disabled)")
102
103
104
105
106
def main():
    """Command line entry point for the SMTP DANE check.

    Sets up logging, builds the argument parser from the generic
    ``--verbose``/``--quiet`` flags plus the checker's and the certificate
    options, hands the parsed arguments to the checker and runs it.

    Returns whatever ``checker.check()`` returns.
    """
    logging.basicConfig(format='%(levelname)5s %(message)s')
    checker = SmtpDaneChecker()

    parser = argparse.ArgumentParser()
    for flag in ("--verbose", "--quiet"):
        parser.add_argument(flag, action="store_true")
    checker.generate_menu(parser)
    add_certificate_options(parser)

    args = parser.parse_args()
    checker.set_args(args)

    # --verbose takes precedence over --quiet when both are given.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING
    else:
        level = logging.INFO
    logging.getLogger().setLevel(level)

    return checker.check()
129
130
# Run the check when executed as a script; the result of main() becomes the
# process exit status.
if __name__ == '__main__':
    import sys

    exit_status = main()
    sys.exit(exit_status)