]> git.siccegge.de Git - dane-monitoring-plugins.git/commitdiff
Initial implementation of DANE SMTP check
authorChristoph Egger <christoph@christoph-egger.org>
Mon, 29 Aug 2016 18:40:52 +0000 (20:40 +0200)
committerChristoph Egger <christoph@christoph-egger.org>
Mon, 29 Aug 2016 18:40:52 +0000 (20:40 +0200)
check_dane/cert.py [new file with mode: 0644]
check_dane/tlsa.py [new file with mode: 0644]
check_dane_smtp [new file with mode: 0755]

diff --git a/check_dane/cert.py b/check_dane/cert.py
new file mode 100644 (file)
index 0000000..289f514
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/python3
+
+from pyasn1_modules import rfc2459
+from pyasn1.codec.der import decoder, encoder
+
+
+def get_spki(certificate):
+    cert = decoder.decode(certificate, asn1Spec=rfc2459.Certificate())[0]
+    spki = cert['tbsCertificate']["subjectPublicKeyInfo"]
+    return encoder.encode(spki)
diff --git a/check_dane/tlsa.py b/check_dane/tlsa.py
new file mode 100644 (file)
index 0000000..ead2d3f
--- /dev/null
@@ -0,0 +1,55 @@
+#!/usr/bin/python3
+
+import sys
+import codecs
+import hashlib
+
+from .cert import get_spki
+
+from unbound import RR_TYPE_A, RR_TYPE_AAAA
+from unbound import idn2dname, ub_strerror
+
+def verify_tlsa_record(resolver, record, certificate):
+    print(record)
+    print(hashlib.sha256(certificate).hexdigest())
+    s, r = resolver.resolve(record, rrtype=52)
+    if 0 != s:
+        ub_strerror(s)
+        return
+
+    for record in r.data.data:
+        hexencoder = codecs.getencoder('hex')
+        usage = record[0]
+        selector = record[1]
+        matching = record[2]
+        data = record[3:]
+
+        if usage != 3:
+            sys.stderr.write("Only 'Domain-issued certificate' records supported\n")
+
+        if selector == 0:
+            verifieddata = certificate
+        elif selector == 1:
+            verifieddata = get_spki(certificate)
+        else:
+            # currently only 0 and 1 are assigned
+            sys.stderr.write("Only selectors 0 and 1 supported\n")
+
+        if matching == 0:
+            if verifieddata == data:
+                print("success")
+                return 0
+        elif matching == 1:
+            if hashlib.sha256(verifieddata).digest() == data:
+                print("success")
+                return 0
+        elif matching == 2:
+            if hashlib.sha512(verifieddata).digest() == data:
+                print("success")
+                return 0
+        else:
+            # currently only 0, 1 and 2 are assigned
+            sys.stderr.write("Only matching types 0, 1 and 2 supported\n")
+
+    sys.stderr.write("could not verify any tlsa record\n")
+    return -1
diff --git a/check_dane_smtp b/check_dane_smtp
new file mode 100755 (executable)
index 0000000..33ee6a9
--- /dev/null
@@ -0,0 +1,99 @@
+#!/usr/bin/python3
+#
+#
+
+from __future__ import print_function
+
+import sys
+import argparse
+
+from socket import socket, AF_INET6, AF_INET, create_connection
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError, create_default_context
+from unbound import ub_ctx, idn2dname, ub_strerror
+
+from check_dane.tlsa import verify_tlsa_record
+
+def init_connection(sslcontext, args):
+    host = args.Host
+
+    if args.ssl:
+        port = 465 if args.port == 0 else args.port
+        connection = context.wrap_socket(socket(AF_INET),
+                                         server_hostname=host)
+        connection.connect(host, port)
+
+    else:
+        port = 25 if args.port == 0 else args.port
+        connection = create_connection((host, port))
+        print(connection.recv(512))
+        connection.send(b"EHLO localhost\r\n")
+        print(connection.recv(512))
+        connection.send(b"STARTTLS\r\n")
+        print(connection.recv(512))
+        connection = sslcontext.wrap_socket(connection, server_hostname=host)
+        connection.do_handshake()
+
+    return connection
+
+
+def close_connection(connection):
+    connection.send(b"QUIT\r\n")
+    print(connection.recv(512))
+
+
+def init(args):
+    sslcontext = SSLContext(PROTOCOL_TLSv1_2)
+    sslcontext.verify_mode = CERT_REQUIRED
+    sslcontext.load_verify_locations(args.castore)
+
+    resolver = ub_ctx()
+    resolver.add_ta_file(args.ancor)
+
+    return sslcontext, resolver
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("Host")
+
+    parser.add_argument("-p", "--port",
+                        action="store", type=int, default=0,
+                        help="SMTP port")
+    parser.add_argument("--ssl",
+                        action="store_true",
+                        help="Use direct TLS connection instead of starttls (default: disabled)")
+    parser.add_argument("--check-dane",
+                        action="store_false",
+                        help="Verify presented certificate via DANE (default: enabled)")
+    parser.add_argument("--check-ca",
+                        action="store_false",
+                        help="Verify presented certificate via the CA system (default: enabled)")
+    parser.add_argument("--check-expire",
+                        action="store_false",
+                        help="Verify presented certificate for expiration (default: enabled)")
+
+    parser.add_argument("-a", "--ancor",
+                        action="store", type=str, default="/etc/unbound/root.key",
+                        help="DNSSEC root ancor")
+    parser.add_argument("--castore", action="store", type=str,
+                        default="/etc/ssl/certs/ca-certificates.crt",
+                        help="ca certificate bundle")
+
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only")
+    group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only")
+    group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
+
+    args = parser.parse_args()
+    sslcontext, resolver = init(args)
+    print(args)
+
+    connection = init_connection(sslcontext, args)
+
+    verify_tlsa_record(resolver, "_25._tcp.%s" % args.Host, connection.getpeercert(binary_form=True))
+
+    close_connection(connection)
+
+
+if __name__ == '__main__':
+   main()