From 19426697a401ad52cf88d88700b3e14f05a9d4a7 Mon Sep 17 00:00:00 2001 From: Christoph Egger Date: Sat, 6 Jan 2018 22:46:52 +0100 Subject: [PATCH] Refactor TLSA service checks --- check_dane/abstract.py | 53 ++++++------- check_dane/https.py | 25 +++--- check_dane/smtp.py | 133 +++++++++++++++++++++++++++++++ check_dane/tlsa.py | 13 ++-- check_dane/xmpp.py | 172 ++++++++++++++++++++++++++++++++++++++++ check_dane_https | 115 --------------------------- check_dane_smtp | 157 ------------------------------------- check_dane_xmpp | 173 ----------------------------------------- setup.py | 4 +- 9 files changed, 350 insertions(+), 495 deletions(-) create mode 100644 check_dane/smtp.py create mode 100644 check_dane/xmpp.py delete mode 100755 check_dane_https delete mode 100755 check_dane_smtp delete mode 100755 check_dane_xmpp diff --git a/check_dane/abstract.py b/check_dane/abstract.py index 1373ed0..4a3265b 100644 --- a/check_dane/abstract.py +++ b/check_dane/abstract.py @@ -1,10 +1,9 @@ -from abc import ABCMeta, abstractmethod -from unbound import ub_ctx -from socket import socket, AF_INET6, AF_INET -from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED +from abc import ABC, abstractmethod +from socket import AF_INET6, AF_INET +from unbound import ub_ctx -from check_dane.cert import verify_certificate, add_certificate_options +from check_dane.cert import verify_certificate from check_dane.tlsa import get_tlsa_records, match_tlsa_records @@ -15,18 +14,18 @@ class DaneError: pass -class DaneChecker: +class DaneChecker(ABC): def __init__(self): pass @abstractmethod - def _init_connection(self): + def _init_connection(self, family, host, port): pass @abstractmethod - def _close_connection(self): + def _close_connection(self, connection): pass @@ -35,16 +34,12 @@ class DaneChecker: def port(self): pass - + def _gather_certificates(self): retval = 0 certificates = set() for afamily in self._afamilies: - try: - connection = self._init_connection(afamily, self._host, self.port) - except ConnectionRefusedError: - logging.error("Connection refused") - return 2 + connection = self._init_connection(afamily, self._host, self.port) nretval = verify_certificate(connection.getpeercert(), self._args) retval = max(retval, nretval) @@ -53,38 +48,38 @@ class DaneChecker: self._close_connection(connection) return certificates - - + + def _gather_records(self): return get_tlsa_records(self._resolver, "_%d._tcp.%s" % (self.port, self._host)) - + def generate_menu(self, argparser): argparser.add_argument("Host") argparser.add_argument("--check-dane", - action="store_false", - help="Verify presented certificate via DANE (default: enabled)") + action="store_false", + help="Verify presented certificate via DANE (default: enabled)") argparser.add_argument("--check-ca", - action="store_false", - help="Verify presented certificate via the CA system (default: enabled)") + action="store_false", + help="Verify presented certificate via the CA system (default: enabled)") argparser.add_argument("--check-expire", - action="store_false", - help="Verify presented certificate for expiration (default: enabled)") + action="store_false", + help="Verify presented certificate for expiration (default: enabled)") argparser.add_argument("-a", "--ancor", - action="store", type=str, default="/usr/share/dns/root.key", - help="DNSSEC root ancor") + action="store", type=str, default="/usr/share/dns/root.key", + help="DNSSEC root ancor") argparser.add_argument("--castore", action="store", type=str, - default="/etc/ssl/certs/ca-certificates.crt", - help="ca certificate bundle") + default="/etc/ssl/certs/ca-certificates.crt", + help="ca certificate bundle") group = argparser.add_mutually_exclusive_group() group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only") group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only") - def set_args(self, args): + def set_args(self, args): self._args = args resolver = ub_ctx() resolver.add_ta_file(args.ancor) @@ -98,7 +93,7 @@ class DaneChecker: self._afamilies = [AF_INET, AF_INET6] self._host = args.Host.encode('idna').decode() - + def check(self): records = self._gather_records() diff --git a/check_dane/https.py b/check_dane/https.py index c437e47..a5b6895 100644 --- a/check_dane/https.py +++ b/check_dane/https.py @@ -2,20 +2,16 @@ from __future__ import print_function -import sys import argparse import logging +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED from socket import socket -from check_dane.tlsa import get_tlsa_records, match_tlsa_records -from check_dane.cert import verify_certificate, add_certificate_options +from check_dane.cert import add_certificate_options from check_dane.abstract import DaneChecker -from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED - - class HttpsDaneChecker(DaneChecker): def _init_connection(self, family, host, port): connection = self._sslcontext.wrap_socket(socket(family), @@ -30,34 +26,36 @@ class HttpsDaneChecker(DaneChecker): @property def port(self): - return 443 + return self._port + - def _close_connection(self, connection): connection.close() - + def __init__(self): DaneChecker.__init__(self) def set_args(self, args): DaneChecker.set_args(self, args) - + + self._port = args.port + sslcontext = SSLContext(PROTOCOL_TLSv1_2) sslcontext.verify_mode = CERT_REQUIRED sslcontext.load_verify_locations(args.castore) self._sslcontext = sslcontext - + def generate_menu(self, argparser): DaneChecker.generate_menu(self, argparser) argparser.add_argument("-p", "--port", action="store", type=int, default=443, help="HTTPS port") - + def main(): @@ -80,9 +78,10 @@ def main(): logging.getLogger().setLevel(logging.WARNING) else: logging.getLogger().setLevel(logging.INFO) - + return checker.check() + if __name__ == '__main__': import sys sys.exit(main()) diff --git a/check_dane/smtp.py b/check_dane/smtp.py new file mode 100644 index 0000000..6450fc8 --- /dev/null +++ b/check_dane/smtp.py @@ -0,0 +1,133 @@ +#!/usr/bin/python3 + +#!/usr/bin/python3 + +from __future__ import print_function + +import argparse +import logging + +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED +from socket import socket + +from check_dane.cert import add_certificate_options +from check_dane.abstract import DaneChecker + + +class SmtpDaneChecker(DaneChecker): + def _init_connection(self, family, host, port): + + if self.ssl: + connection = self._sslcontext.wrap_socket(socket(family), + server_hostname=host) + connection.connect((host, port)) + answer = connection.recv(512) + logging.debug(answer) + + connection.send(b"EHLO localhost\r\n") + answer = connection.recv(512) + logging.debug(answer) + + else: + connection = socket(family=family) + connection.connect((host, port)) + answer = connection.recv(512) + logging.debug(answer) + + connection.send(b"EHLO localhost\r\n") + answer = connection.recv(512) + logging.debug(answer) + + connection.send(b"STARTTLS\r\n") + answer = connection.recv(512) + logging.debug(answer) + + connection = self._sslcontext.wrap_socket(connection, server_hostname=host) + connection.do_handshake() + + connection.send(b"EHLO localhost\r\n") + answer = connection.recv(512) + logging.debug(answer) + + return connection + + + @property + def port(self): + return self._port + + + @property + def ssl(self): + return self._ssl + + + def _close_connection(self, connection): + connection.send(b"QUIT\r\n") + answer = connection.recv(512) + logging.debug(answer) + connection.close() + + + def __init__(self): + self._port = None + self._ssl = None + DaneChecker.__init__(self) + + + def set_args(self, args): + DaneChecker.set_args(self, args) + + self._ssl = args.ssl + if args.port == 0: + self._port = 465 if args.ssl else 25 + else: + self._port = args.port + + sslcontext = SSLContext(PROTOCOL_TLSv1_2) + sslcontext.verify_mode = CERT_REQUIRED + sslcontext.load_verify_locations(args.castore) + + self._sslcontext = sslcontext + + + def generate_menu(self, argparser): + DaneChecker.generate_menu(self, argparser) + argparser.add_argument("-p", "--port", + action="store", type=int, default=0, + help="SMTP port") + argparser.add_argument("--ssl", + action="store_true", + help="Use direct TLS connection instead of starttls (default: disabled)") + + + + + +def main(): + logging.basicConfig(format='%(levelname)5s %(message)s') + checker = SmtpDaneChecker() + parser = argparse.ArgumentParser() + + parser.add_argument("--verbose", action="store_true") + parser.add_argument("--quiet", action="store_true") + + checker.generate_menu(parser) + add_certificate_options(parser) + + args = parser.parse_args() + checker.set_args(args) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + elif args.quiet: + logging.getLogger().setLevel(logging.WARNING) + else: + logging.getLogger().setLevel(logging.INFO) + + return checker.check() + + +if __name__ == '__main__': + import sys + sys.exit(main()) diff --git a/check_dane/tlsa.py b/check_dane/tlsa.py index 9d31b5d..a7992b2 100644 --- a/check_dane/tlsa.py +++ b/check_dane/tlsa.py @@ -95,18 +95,17 @@ def get_tlsa_records(resolver, name): return if r.data is None: - logging.warn("No TLSA record returned") + logging.warning("No TLSA record returned") return set() result = set() for record in r.data.data: - hexencoder = codecs.getencoder('hex') - usage = ord(record[0]) - selector = ord(record[1]) - matching = ord(record[2]) + usage = record[0] + selector = record[1] + matching = record[2] data = record[3:] result.add(TLSARecord(usage, selector, matching, data)) - + return result @@ -131,7 +130,7 @@ def match_tlsa_records(records, certificates): for record in records: if not record in usedrecords: - logging.warn("Unused record %s", record) + logging.warning("Unused record %s", record) if result == 0: result = 1 diff --git a/check_dane/xmpp.py b/check_dane/xmpp.py new file mode 100644 index 0000000..9452f8e --- /dev/null +++ b/check_dane/xmpp.py @@ -0,0 +1,172 @@ +#!/usr/bin/python3 + +#!/usr/bin/python3 + +from __future__ import print_function + +import argparse +import logging + +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED +from socket import socket + +from check_dane.tlsa import get_tlsa_records +from check_dane.cert import add_certificate_options +from check_dane.abstract import DaneChecker +from check_dane.resolve import Resolver, srv_lookup + +XMPP_OPEN = ("") +XMPP_CLOSE = "" +XMPP_STARTTLS = "" + +class XmppDaneChecker(DaneChecker): + def _init_connection(self, family, host, port): + + logging.debug("Connecting to %s:%d", host, port) + + connection = socket(family=family) + connection.connect((host, port)) + + connection.sendall(XMPP_OPEN.format(self.servicetype, self._hostname).encode()) + answer = connection.recv(4096) + logging.debug(answer) + + if not b'' in answer: + answer = connection.recv(4096) + logging.debug(answer) + + connection.sendall(XMPP_STARTTLS.encode()) + answer = connection.recv(4096) + logging.debug(answer) + + print(host, self._hostname) + connection = self._sslcontext.wrap_socket(connection, server_hostname=self._hostname) + connection.do_handshake() + + connection.sendall(XMPP_OPEN.format(self.servicetype, self._hostname).encode()) + answer = connection.recv(4096) + logging.debug(answer) + + if not b'' in answer: + answer = connection.recv(4096) + logging.debug(answer) + + return connection + + + @property + def port(self): + return self._port + + + @property + def servicetype(self): + return self._type + + + def _gather_certificates(self): + result = set() + for (host, port), meta in self._endpoints: + self._host = host + self._port = port + self._type = meta['type'] + result.update(DaneChecker._gather_certificates(self)) + + return result + + + def _gather_records(self): + result = set() + for (host, port), _ in self._endpoints: + print(repr((host, port))) + result.update(get_tlsa_records(self._resolver, "_%d._tcp.%s" % (port, host))) + + return result + + + def _close_connection(self, connection): + connection.send(XMPP_CLOSE.encode()) + answer = connection.recv(512) + logging.debug(answer) + connection.close() + + + def __init__(self): + self._port = None + self._ssl = None + self._host = None + self._hostname = None + DaneChecker.__init__(self) + + + def set_args(self, args): + DaneChecker.set_args(self, args) + + sslcontext = SSLContext(PROTOCOL_TLSv1_2) + sslcontext.verify_mode = CERT_REQUIRED + sslcontext.load_verify_locations(args.castore) + + cresolver = Resolver(args.ancor) + self._sslcontext = sslcontext + + self._hostname = args.Host.encode('idna').decode() + endpoints = [] + if not args.s2s: + for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" % + self._hostname, + cresolver): + meta['type'] = 'client' + endpoints.append((endpoint, meta)) + if not args.c2s: + for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" % + self._hostname, + cresolver): + meta['type'] = 'server' + endpoints.append((endpoint, meta)) + + self._endpoints = endpoints + + + def generate_menu(self, argparser): + DaneChecker.generate_menu(self, argparser) + group = argparser.add_mutually_exclusive_group() + group.add_argument("--s2s", action="store_true", + help="Only check server-to-server connections") + group.add_argument("--c2s", action="store_true", + help="Only check client-to-server connections") + argparser.add_argument("-p", "--port", + action="store", type=int, default=0, + help="SMTP port") + + + + +def main(): + logging.basicConfig(format='%(levelname)5s %(message)s') + checker = XmppDaneChecker() + parser = argparse.ArgumentParser() + + parser.add_argument("--verbose", action="store_true") + parser.add_argument("--quiet", action="store_true") + + checker.generate_menu(parser) + add_certificate_options(parser) + + args = parser.parse_args() + checker.set_args(args) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + elif args.quiet: + logging.getLogger().setLevel(logging.WARNING) + else: + logging.getLogger().setLevel(logging.INFO) + + return checker.check() + + +if __name__ == '__main__': + import sys + sys.exit(main()) diff --git a/check_dane_https b/check_dane_https deleted file mode 100755 index 3d758c2..0000000 --- a/check_dane_https +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/python3 - -from __future__ import print_function - -import sys -import argparse -import logging - -from socket import socket, AF_INET6, AF_INET -from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED -from unbound import ub_ctx - -from check_dane.tlsa import verify_tlsa_record -from check_dane.cert import verify_certificate, add_certificate_options - -def init_connection(sslcontext, family, host, port): - connection = sslcontext.wrap_socket(socket(family), - server_hostname=host) - connection.connect((host, port)) - connection.send(b"HEAD / HTTP/1.1\r\nHost: %s\r\n\r\n" % host.encode()) - answer = connection.recv(512) - logging.debug(answer) - - return connection - - -def close_connection(connection): - connection.close() - - -def init(args): - sslcontext = SSLContext(PROTOCOL_TLSv1_2) - sslcontext.verify_mode = CERT_REQUIRED - sslcontext.load_verify_locations(args.castore) - - resolver = ub_ctx() - resolver.add_ta_file(args.ancor) - - return sslcontext, resolver - - -def main(): - logging.basicConfig(format='%(levelname)5s %(message)s') - parser = argparse.ArgumentParser() - parser.add_argument("Host") - - parser.add_argument("--verbose", action="store_true") - parser.add_argument("--quiet", action="store_true") - parser.add_argument("-p", "--port", - action="store", type=int, default=443, - help="HTTPS port") - parser.add_argument("--check-dane", - action="store_false", - help="Verify presented certificate via DANE (default: enabled)") - parser.add_argument("--check-ca", - action="store_false", - help="Verify presented certificate via the CA system (default: enabled)") - parser.add_argument("--check-expire", - action="store_false", - help="Verify presented certificate for expiration (default: enabled)") - - parser.add_argument("-a", "--ancor", - action="store", type=str, default="/etc/unbound/root.key", - help="DNSSEC root ancor") - parser.add_argument("--castore", action="store", type=str, - default="/etc/ssl/certs/ca-certificates.crt", - help="ca certificate bundle") - - group = parser.add_mutually_exclusive_group() - group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only") - group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only") - group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)") - - add_certificate_options(parser) - - args = parser.parse_args() - - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - elif args.quiet: - logging.getLogger().setLevel(logging.WARNING) - else: - logging.getLogger().setLevel(logging.INFO) - - host = args.Host.encode('idna').decode() - sslcontext, resolver = init(args) - - if args.use6: - afamilies = [AF_INET6] - elif args.use4: - afamilies = [AF_INET6] - else: - afamilies = [AF_INET, AF_INET6] - - retval = 0 - for afamily in afamilies: - try: - connection = init_connection(sslcontext, afamily, host, args.port) - except ConnectionRefusedError: - logging.error("Connection refused") - return 2 - - nretval = verify_certificate(connection.getpeercert(), args) - retval = max(retval, nretval) - nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (args.port, host), - connection.getpeercert(binary_form=True)) - retval = max(retval, nretval) - - close_connection(connection) - - return retval - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/check_dane_smtp b/check_dane_smtp deleted file mode 100755 index 28c6efe..0000000 --- a/check_dane_smtp +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/python3 -# -# - -from __future__ import print_function - -import sys -import argparse -import logging - -from socket import socket, AF_INET6, AF_INET, create_connection -from ssl import SSLError, CertificateError, SSLContext -from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED -from unbound import ub_ctx - -from check_dane.tlsa import verify_tlsa_record -from check_dane.cert import verify_certificate, add_certificate_options - -def init_connection(sslcontext, args, family): - host = args.Host - - if args.ssl: - port = 465 if args.port == 0 else args.port - connection = sslcontext.wrap_socket(socket(family), - server_hostname=host) - connection.connect((host, port)) - answer = connection.recv(512) - logging.debug(answer) - - connection.send(b"EHLO localhost\r\n") - answer = connection.recv(512) - logging.debug(answer) - - else: - port = 25 if args.port == 0 else args.port - - connection = socket(family=family) - connection.connect((host, port)) - answer = connection.recv(512) - logging.debug(answer) - - connection.send(b"EHLO localhost\r\n") - answer = connection.recv(512) - logging.debug(answer) - - connection.send(b"STARTTLS\r\n") - answer = connection.recv(512) - logging.debug(answer) - - connection = sslcontext.wrap_socket(connection, server_hostname=host) - connection.do_handshake() - - connection.send(b"EHLO localhost\r\n") - answer = connection.recv(512) - logging.debug(answer) - - return connection - - -def close_connection(connection): - connection.send(b"QUIT\r\n") - answer = connection.recv(512) - logging.debug(answer) - - -def init(args): - sslcontext = SSLContext(PROTOCOL_TLSv1_2) - sslcontext.verify_mode = CERT_REQUIRED - sslcontext.load_verify_locations(args.castore) - - resolver = ub_ctx() - resolver.add_ta_file(args.ancor) - - return sslcontext, resolver - - -def main(): - logging.basicConfig(format='%(levelname)5s %(message)s') - parser = argparse.ArgumentParser() - parser.add_argument("Host") - - parser.add_argument("--verbose", action="store_true") - parser.add_argument("--quiet", action="store_true") - parser.add_argument("-p", "--port", - action="store", type=int, default=0, - help="SMTP port") - parser.add_argument("--ssl", - action="store_true", - help="Use direct TLS connection instead of starttls (default: disabled)") - parser.add_argument("--check-dane", - action="store_false", - help="Verify presented certificate via DANE (default: enabled)") - parser.add_argument("--check-ca", - action="store_false", - help="Verify presented certificate via the CA system (default: enabled)") - parser.add_argument("--check-expire", - action="store_false", - help="Verify presented certificate for expiration (default: enabled)") - - parser.add_argument("-a", "--ancor", - action="store", type=str, default="/etc/unbound/root.key", - help="DNSSEC root ancor") - parser.add_argument("--castore", action="store", type=str, - default="/etc/ssl/certs/ca-certificates.crt", - help="ca certificate bundle") - - group = parser.add_mutually_exclusive_group() - group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only") - group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only") - group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)") - - add_certificate_options(parser) - - args = parser.parse_args() - - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - elif args.quiet: - logging.getLogger().setLevel(logging.WARNING) - else: - logging.getLogger().setLevel(logging.INFO) - - port = args.port - if port == 0: - port = 465 if args.ssl else 25 - host = args.Host.encode('idna').decode() - - sslcontext, resolver = init(args) - - if args.use6: - afamilies = [AF_INET6] - elif args.use4: - afamilies = [AF_INET6] - else: - afamilies = [AF_INET, AF_INET6] - - retval = 0 - for afamily in afamilies: - try: - connection = init_connection(sslcontext, args, afamily) - except ConnectionRefusedError: - logging.error("Connection refused") - return 2 - - nretval = verify_certificate(connection.getpeercert(), args) - retval = max(retval, nretval) - nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host), - connection.getpeercert(binary_form=True)) - retval = max(retval, nretval) - - close_connection(connection) - - return retval - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/check_dane_xmpp b/check_dane_xmpp deleted file mode 100755 index 1c1f4a6..0000000 --- a/check_dane_xmpp +++ /dev/null @@ -1,173 +0,0 @@ -#!/usr/bin/python3 -# -# - -from __future__ import print_function - -import sys -import argparse -import logging - -from socket import socket, AF_INET6, AF_INET, create_connection -from ssl import SSLError, CertificateError, SSLContext -from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED - -from unbound import ub_ctx - -from check_dane.resolve import Resolver, ResolverException, srv_lookup -from check_dane.tlsa import verify_tlsa_record -from check_dane.cert import verify_certificate, add_certificate_options - -XMPP_OPEN = ("") -XMPP_CLOSE = "" -XMPP_STARTTLS = "" - - -def init_connection(sslcontext, args, family, endpoint, metadata): - host, _ = endpoint - logging.debug("Connecting to %s", endpoint) - - connection = socket(family=family) - connection.connect(endpoint) - - connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode()) - answer = connection.recv(4096) - logging.debug(answer) - - if not '' in answer: - answer = connection.recv(4096) - logging.debug(answer) - - connection.sendall(XMPP_STARTTLS.encode()) - answer = connection.recv(4096) - logging.debug(answer) - - connection = sslcontext.wrap_socket(connection, server_hostname=host) - connection.do_handshake() - - connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode()) - answer = connection.recv(4096) - logging.debug(answer) - - if not '' in answer: - answer = connection.recv(4096) - logging.debug(answer) - - return connection - - -def close_connection(connection): - connection.send(XMPP_CLOSE.encode()) - answer = connection.recv(512) - logging.debug(answer) - - -def init(args): - sslcontext = SSLContext(PROTOCOL_TLSv1_2) - sslcontext.verify_mode = CERT_REQUIRED - sslcontext.load_verify_locations(args.castore) - - resolver = ub_ctx() - resolver.add_ta_file(args.ancor) - - return sslcontext, resolver - - -def main(): - logging.basicConfig(format='%(levelname)5s %(message)s') - parser = argparse.ArgumentParser() - parser.add_argument("Host") - - parser.add_argument("--verbose", action="store_true") - parser.add_argument("--quiet", action="store_true") - # parser.add_argument("-p", "--port", - # action="store", type=int, default=0, - # help="XMPP port") - group = parser.add_mutually_exclusive_group() - group.add_argument("--s2s", action="store_true", - help="Only check server-to-server connections") - group.add_argument("--c2s", action="store_true", - help="Only check client-to-server connections") - # parser.add_argument("--ssl", - # action="store_true", - # help="Use direct TLS connection instead of starttls (default: disabled)") - parser.add_argument("--check-dane", - action="store_false", - help="Verify presented certificate via DANE (default: enabled)") - parser.add_argument("--check-ca", - action="store_false", - help="Verify presented certificate via the CA system (default: enabled)") - parser.add_argument("--check-expire", - action="store_false", - help="Verify presented certificate for expiration (default: enabled)") - - parser.add_argument("-a", "--ancor", - action="store", type=str, default="/etc/unbound/root.key", - help="DNSSEC root ancor") - parser.add_argument("--castore", action="store", type=str, - default="/etc/ssl/certs/ca-certificates.crt", - help="ca certificate bundle") - - group = parser.add_mutually_exclusive_group() - group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only") - group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only") - group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)") - - add_certificate_options(parser) - - args = parser.parse_args() - - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - elif args.quiet: - logging.getLogger().setLevel(logging.WARNING) - else: - logging.getLogger().setLevel(logging.INFO) - - host = args.Host.encode('idna').decode() - - sslcontext, resolver = init(args) - - if args.use6: - afamilies = [AF_INET6] - elif args.use4: - afamilies = [AF_INET6] - else: - afamilies = [AF_INET, AF_INET6] - - xresolver = Resolver(args.ancor) - endpoints = [] - if not args.s2s: - for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" % host, xresolver): - meta['type'] = 'client' - endpoints.append((endpoint, meta)) - if not args.c2s: - for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" % host, xresolver): - meta['type'] = 'server' - endpoints.append((endpoint, meta)) - - retval = 0 - for afamily in afamilies: - for endpoint, metadata in endpoints: - host, port = endpoint - try: - connection = init_connection(sslcontext, args, afamily, endpoint, metadata) - except: - logging.exception("Connection refused") - return 2 - - nretval = verify_certificate(connection.getpeercert(), args) - retval = max(retval, nretval) - nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host), - connection.getpeercert(binary_form=True)) - retval = max(retval, nretval) - - close_connection(connection) - - return retval - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/setup.py b/setup.py index 4bf82e8..64c7636 100755 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='DANE monitoring plugins', - version='0.1-alpha1', + version='0.1', description='DANE aware monitoring plugins', author='Christoph Egger', author_email='christoph@christoph-egger.org', @@ -13,6 +13,8 @@ setup(name='DANE monitoring plugins', entry_points={ 'console_scripts': [ 'check_dane_https = check_dane.https:main', + 'check_dane_smtp = check_dane.smtp:main', + 'check_dane_xmpp = check_dane.xmpp:main', ], } ) -- 2.39.5