#!/usr/bin/python3
#
#
from __future__ import print_function
import sys
import argparse
import logging
from socket import socket, AF_INET6, AF_INET, create_connection
from ssl import SSLError, CertificateError, SSLContext
from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED
from unbound import ub_ctx
from check_dane.resolve import Resolver, ResolverException, srv_lookup
from check_dane.tlsa import verify_tlsa_record
from check_dane.cert import verify_certificate, add_certificate_options
XMPP_OPEN = ("")
XMPP_CLOSE = ""
XMPP_STARTTLS = ""
def init_connection(sslcontext, args, family, endpoint, metadata):
host, _ = endpoint
logging.debug("Connecting to %s", endpoint)
connection = socket(family=family)
connection.connect(endpoint)
connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode())
answer = connection.recv(4096)
logging.debug(answer)
if not '' in answer:
answer = connection.recv(4096)
logging.debug(answer)
connection.sendall(XMPP_STARTTLS.encode())
answer = connection.recv(4096)
logging.debug(answer)
connection = sslcontext.wrap_socket(connection, server_hostname=host)
connection.do_handshake()
connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode())
answer = connection.recv(4096)
logging.debug(answer)
if not '' in answer:
answer = connection.recv(4096)
logging.debug(answer)
return connection
def close_connection(connection):
connection.send(XMPP_CLOSE.encode())
answer = connection.recv(512)
logging.debug(answer)
def init(args):
sslcontext = SSLContext(PROTOCOL_TLSv1_2)
sslcontext.verify_mode = CERT_REQUIRED
sslcontext.load_verify_locations(args.castore)
resolver = ub_ctx()
resolver.add_ta_file(args.ancor)
return sslcontext, resolver
def main():
logging.basicConfig(format='%(levelname)5s %(message)s')
parser = argparse.ArgumentParser()
parser.add_argument("Host")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--quiet", action="store_true")
# parser.add_argument("-p", "--port",
# action="store", type=int, default=0,
# help="XMPP port")
group = parser.add_mutually_exclusive_group()
group.add_argument("--s2s", action="store_true",
help="Only check server-to-server connections")
group.add_argument("--c2s", action="store_true",
help="Only check client-to-server connections")
# parser.add_argument("--ssl",
# action="store_true",
# help="Use direct TLS connection instead of starttls (default: disabled)")
parser.add_argument("--check-dane",
action="store_false",
help="Verify presented certificate via DANE (default: enabled)")
parser.add_argument("--check-ca",
action="store_false",
help="Verify presented certificate via the CA system (default: enabled)")
parser.add_argument("--check-expire",
action="store_false",
help="Verify presented certificate for expiration (default: enabled)")
parser.add_argument("-a", "--ancor",
action="store", type=str, default="/etc/unbound/root.key",
help="DNSSEC root ancor")
parser.add_argument("--castore", action="store", type=str,
default="/etc/ssl/certs/ca-certificates.crt",
help="ca certificate bundle")
group = parser.add_mutually_exclusive_group()
group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)")
add_certificate_options(parser)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
elif args.quiet:
logging.getLogger().setLevel(logging.WARNING)
else:
logging.getLogger().setLevel(logging.INFO)
host = args.Host.encode('idna').decode()
sslcontext, resolver = init(args)
if args.use6:
afamilies = [AF_INET6]
elif args.use4:
afamilies = [AF_INET6]
else:
afamilies = [AF_INET, AF_INET6]
xresolver = Resolver(args.ancor)
endpoints = []
if not args.s2s:
for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" % host, xresolver):
meta['type'] = 'client'
endpoints.append((endpoint, meta))
if not args.c2s:
for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" % host, xresolver):
meta['type'] = 'server'
endpoints.append((endpoint, meta))
retval = 0
for afamily in afamilies:
for endpoint, metadata in endpoints:
host, port = endpoint
try:
connection = init_connection(sslcontext, args, afamily, endpoint, metadata)
except:
logging.exception("Connection refused")
return 2
nretval = verify_certificate(connection.getpeercert(), args)
retval = max(retval, nretval)
nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host),
connection.getpeercert(binary_form=True))
retval = max(retval, nretval)
close_connection(connection)
return retval
if __name__ == '__main__':
sys.exit(main())