Add ssh / SSHFP service check
authorChristoph Egger <christoph@christoph-egger.org>
Mon, 29 Aug 2016 21:58:18 +0000 (23:58 +0200)
committerChristoph Egger <christoph@christoph-egger.org>
Mon, 29 Aug 2016 21:58:18 +0000 (23:58 +0200)
check_dane_ssh [new file with mode: 0755]

diff --git a/check_dane_ssh b/check_dane_ssh
new file mode 100755 (executable)
index 0000000..fe56dea
--- /dev/null
@@ -0,0 +1,130 @@
+#!/usr/bin/python3
+#
+#
+
+from __future__ import print_function
+
+import sys
+import argparse
+import logging
+import hashlib
+import codecs
+
+from unbound import ub_ctx, ub_strerror
+import paramiko
+
+try:
+    from unbound import RR_TYPE_SSHFP
+except ImportError:
+    RR_TYPE_SSHFP=44
+
+
+class HostKeyMatchSSHFP(BaseException):
+    pass
+
+
+class HostKeyMismatchSSHFP(BaseException):
+    pass
+
+
+class HostKeyLookup(paramiko.client.MissingHostKeyPolicy):
+    def __init__(self, args):
+        self._args = args
+        self._resolver = ub_ctx()
+        self._resolver.add_ta_file(args.ancor)
+
+
+    def missing_host_key(self, client, hostname, key):
+        actualhostkey = key.asbytes()
+        actualkeytype = key.get_name()
+        hexencoder = codecs.getencoder('hex')
+
+        s, r = self._resolver.resolve(hostname, RR_TYPE_SSHFP)
+        if 0 != s:
+            ub_strerror(s)
+            return
+
+        if r.data is None:
+            logging.error("No SSHFP record returned")
+            return 2
+
+        for record in r.data.data:
+            keytype = record[0]
+            hashtype = record[1]
+            data = record[2:]
+
+            if hashtype == 1:
+                actualhash = hashlib.sha1(actualhostkey).digest()
+            elif hashtype == 2:
+                actualhash = hashlib.sha256(actualhostkey).digest()
+            else:
+                logging.warn("Only hashtypes 1 and 2 supported")
+
+            if keytype == 1 and actualkeytype == 'ssh-rsa':
+                if data == actualhash:
+                    raise HostKeyMatchSSHFP
+
+            elif keytype == 2 and actualkeytype == 'ssh-dss':
+                if data == actualhash:
+                    raise HostKeyMatchSSHFP
+
+            elif keytype == 3 and actualkeytype == 'ssh-ecdsa':
+                if data == actualhash:
+                    raise HostKeyMatchSSHFP
+
+            elif keytype == 4 and actualkeytype == 'ssh-ed25519':
+                if data == actualhash:
+                    raise HostKeyMatchSSHFP
+
+        logging.error("No matching SSHFP record found")
+        raise HostKeyMismatchSSHFP
+
+
+def init_connection(args):
+    connection = paramiko.client.SSHClient()
+    connection.set_missing_host_key_policy(HostKeyLookup(args))
+
+    return connection
+
+
+def main():
+    logging.basicConfig(format='%(levelname)5s %(message)s')
+    parser = argparse.ArgumentParser()
+    parser.add_argument("Host")
+
+    parser.add_argument("--verbose", action="store_true")
+    parser.add_argument("--quiet", action="store_true")
+    parser.add_argument("-p", "--port",
+                        action="store", type=int, default=22,
+                        help="SMTP port")
+
+    parser.add_argument("-a", "--ancor",
+                        action="store", type=str, default="/etc/unbound/root.key",
+                        help="DNSSEC root ancor")
+
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only")
+    group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only")
+    group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
+
+    args = parser.parse_args()
+
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+    elif args.quiet:
+        logging.getLogger().setLevel(logging.WARNING)
+    else:
+        logging.getLogger().setLevel(logging.INFO)
+
+    connection = init_connection(args)
+
+    try:
+        connection.connect(args.Host)
+    except HostKeyMatchSSHFP:
+        return 0
+    except HostKeyMismatchSSHFP:
+        return 2
+
+
+if __name__ == '__main__':
+    sys.exit(main())