From: Christoph Egger Date: Mon, 29 Aug 2016 18:40:52 +0000 (+0200) Subject: Initial implementation of DANE SMTP check X-Git-Url: https://git.siccegge.de//index.cgi?a=commitdiff_plain;h=43ff512931d365648e65d2a8e88ecfd15fbf2752;p=dane-monitoring-plugins.git Initial implementation of DANE SMTP check --- 43ff512931d365648e65d2a8e88ecfd15fbf2752 diff --git a/check_dane/cert.py b/check_dane/cert.py new file mode 100644 index 0000000..289f514 --- /dev/null +++ b/check_dane/cert.py @@ -0,0 +1,10 @@ +#!/usr/bin/python3 + +from pyasn1_modules import rfc2459 +from pyasn1.codec.der import decoder, encoder + + +def get_spki(certificate): + cert = decoder.decode(certificate, asn1Spec=rfc2459.Certificate())[0] + spki = cert['tbsCertificate']["subjectPublicKeyInfo"] + return encoder.encode(spki) diff --git a/check_dane/tlsa.py b/check_dane/tlsa.py new file mode 100644 index 0000000..ead2d3f --- /dev/null +++ b/check_dane/tlsa.py @@ -0,0 +1,55 @@ +#!/usr/bin/python3 + +import sys +import codecs +import hashlib + +from .cert import get_spki + +from unbound import RR_TYPE_A, RR_TYPE_AAAA +from unbound import idn2dname, ub_strerror + +def verify_tlsa_record(resolver, record, certificate): + print(record) + print(hashlib.sha256(certificate).hexdigest()) + s, r = resolver.resolve(record, rrtype=52) + if 0 != s: + ub_strerror(s) + return + + for record in r.data.data: + hexencoder = codecs.getencoder('hex') + usage = record[0] + selector = record[1] + matching = record[2] + data = record[3:] + + if usage != 3: + sys.stderr.write("Only 'Domain-issued certificate' records supported\n") + + if selector == 0: + verifieddata = certificate + elif selector == 1: + verifieddata = get_spki(certificate) + else: + # currently only 0 and 1 are assigned + sys.stderr.write("Only selectors 0 and 1 supported\n") + + if matching == 0: + if verifieddata == data: + print("success") + return 0 + elif matching == 1: + if hashlib.sha256(verifieddata).digest() == data: + print("success") + return 0 + elif matching == 2: + if hashlib.sha512(verifieddata).digest() == data: + print("success") + return 0 + else: + # currently only 0, 1 and 2 are assigned + sys.stderr.write("Only matching types 0, 1 and 2 supported\n") + + sys.stderr.write("could not verify any tlsa record\n") + return -1 diff --git a/check_dane_smtp b/check_dane_smtp new file mode 100755 index 0000000..33ee6a9 --- /dev/null +++ b/check_dane_smtp @@ -0,0 +1,99 @@ +#!/usr/bin/python3 +# +# + +from __future__ import print_function + +import sys +import argparse + +from socket import socket, AF_INET6, AF_INET, create_connection +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError, create_default_context +from unbound import ub_ctx, idn2dname, ub_strerror + +from check_dane.tlsa import verify_tlsa_record + +def init_connection(sslcontext, args): + host = args.Host + + if args.ssl: + port = 465 if args.port == 0 else args.port + connection = context.wrap_socket(socket(AF_INET), + server_hostname=host) + connection.connect(host, port) + + else: + port = 25 if args.port == 0 else args.port + connection = create_connection((host, port)) + print(connection.recv(512)) + connection.send(b"EHLO localhost\r\n") + print(connection.recv(512)) + connection.send(b"STARTTLS\r\n") + print(connection.recv(512)) + connection = sslcontext.wrap_socket(connection, server_hostname=host) + connection.do_handshake() + + return connection + + +def close_connection(connection): + connection.send(b"QUIT\r\n") + print(connection.recv(512)) + + +def init(args): + sslcontext = SSLContext(PROTOCOL_TLSv1_2) + sslcontext.verify_mode = CERT_REQUIRED + sslcontext.load_verify_locations(args.castore) + + resolver = ub_ctx() + resolver.add_ta_file(args.ancor) + + return sslcontext, resolver + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("Host") + + parser.add_argument("-p", "--port", + action="store", type=int, default=0, + help="SMTP port") + parser.add_argument("--ssl", + action="store_true", + help="Use direct TLS connection instead of starttls (default: disabled)") + parser.add_argument("--check-dane", + action="store_false", + help="Verify presented certificate via DANE (default: enabled)") + parser.add_argument("--check-ca", + action="store_false", + help="Verify presented certificate via the CA system (default: enabled)") + parser.add_argument("--check-expire", + action="store_false", + help="Verify presented certificate for expiration (default: enabled)") + + parser.add_argument("-a", "--ancor", + action="store", type=str, default="/etc/unbound/root.key", + help="DNSSEC root ancor") + parser.add_argument("--castore", action="store", type=str, + default="/etc/ssl/certs/ca-certificates.crt", + help="ca certificate bundle") + + group = parser.add_mutually_exclusive_group() + group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only") + group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only") + group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)") + + args = parser.parse_args() + sslcontext, resolver = init(args) + print(args) + + connection = init_connection(sslcontext, args) + + verify_tlsa_record(resolver, "_25._tcp.%s" % args.Host, connection.getpeercert(binary_form=True)) + + close_connection(connection) + + +if __name__ == '__main__': + main()