#!/usr/bin/python3 # # from __future__ import print_function import sys import argparse import logging from socket import socket, AF_INET6, AF_INET, create_connection from ssl import SSLError, CertificateError, SSLContext from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED from unbound import ub_ctx from check_dane.resolve import Resolver, ResolverException, srv_lookup from check_dane.tlsa import verify_tlsa_record from check_dane.cert import verify_certificate, add_certificate_options XMPP_OPEN = ("") XMPP_CLOSE = "" XMPP_STARTTLS = "" def init_connection(sslcontext, args, family, endpoint, metadata): host, _ = endpoint logging.debug("Connecting to %s", endpoint) connection = socket(family=family) connection.connect(endpoint) connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode()) answer = connection.recv(4096) logging.debug(answer) if not '' in answer: answer = connection.recv(4096) logging.debug(answer) connection.sendall(XMPP_STARTTLS.encode()) answer = connection.recv(4096) logging.debug(answer) connection = sslcontext.wrap_socket(connection, server_hostname=host) connection.do_handshake() connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode()) answer = connection.recv(4096) logging.debug(answer) if not '' in answer: answer = connection.recv(4096) logging.debug(answer) return connection def close_connection(connection): connection.send(XMPP_CLOSE.encode()) answer = connection.recv(512) logging.debug(answer) def init(args): sslcontext = SSLContext(PROTOCOL_TLSv1_2) sslcontext.verify_mode = CERT_REQUIRED sslcontext.load_verify_locations(args.castore) resolver = ub_ctx() resolver.add_ta_file(args.ancor) return sslcontext, resolver def main(): logging.basicConfig(format='%(levelname)5s %(message)s') parser = argparse.ArgumentParser() parser.add_argument("Host") parser.add_argument("--verbose", action="store_true") parser.add_argument("--quiet", action="store_true") # parser.add_argument("-p", "--port", # action="store", type=int, default=0, # help="XMPP port") group = parser.add_mutually_exclusive_group() group.add_argument("--s2s", action="store_true", help="Only check server-to-server connections") group.add_argument("--c2s", action="store_true", help="Only check client-to-server connections") # parser.add_argument("--ssl", # action="store_true", # help="Use direct TLS connection instead of starttls (default: disabled)") parser.add_argument("--check-dane", action="store_false", help="Verify presented certificate via DANE (default: enabled)") parser.add_argument("--check-ca", action="store_false", help="Verify presented certificate via the CA system (default: enabled)") parser.add_argument("--check-expire", action="store_false", help="Verify presented certificate for expiration (default: enabled)") parser.add_argument("-a", "--ancor", action="store", type=str, default="/etc/unbound/root.key", help="DNSSEC root ancor") parser.add_argument("--castore", action="store", type=str, default="/etc/ssl/certs/ca-certificates.crt", help="ca certificate bundle") group = parser.add_mutually_exclusive_group() group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only") group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only") group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)") add_certificate_options(parser) args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) elif args.quiet: logging.getLogger().setLevel(logging.WARNING) else: logging.getLogger().setLevel(logging.INFO) host = args.Host.encode('idna').decode() sslcontext, resolver = init(args) if args.use6: afamilies = [AF_INET6] elif args.use4: afamilies = [AF_INET6] else: afamilies = [AF_INET, AF_INET6] xresolver = Resolver(args.ancor) endpoints = [] if not args.s2s: for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" % host, xresolver): meta['type'] = 'client' endpoints.append((endpoint, meta)) if not args.c2s: for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" % host, xresolver): meta['type'] = 'server' endpoints.append((endpoint, meta)) retval = 0 for afamily in afamilies: for endpoint, metadata in endpoints: host, port = endpoint try: connection = init_connection(sslcontext, args, afamily, endpoint, metadata) except: logging.exception("Connection refused") return 2 nretval = verify_certificate(connection.getpeercert(), args) retval = max(retval, nretval) nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host), connection.getpeercert(binary_form=True)) retval = max(retval, nretval) close_connection(connection) return retval if __name__ == '__main__': sys.exit(main())