--- /dev/null
+from __future__ import print_function
+import sys
+import argparse
+import logging
+import hashlib
+import codecs
+from unbound import ub_ctx, ub_strerror
+import paramiko
+ from unbound import RR_TYPE_SSHFP
+except ImportError:
+class HostKeyMatchSSHFP(BaseException):
+ pass
+class HostKeyMismatchSSHFP(BaseException):
+ pass
+class HostKeyLookup(paramiko.client.MissingHostKeyPolicy):
+ def __init__(self, args):
+ self._args = args
+ self._resolver = ub_ctx()
+ self._resolver.add_ta_file(args.ancor)
+ def missing_host_key(self, client, hostname, key):
+ actualhostkey = key.asbytes()
+ actualkeytype = key.get_name()
+ hexencoder = codecs.getencoder('hex')
+ s, r = self._resolver.resolve(hostname, RR_TYPE_SSHFP)
+ if 0 != s:
+ ub_strerror(s)
+ return
+ if r.data is None:
+ logging.error("No SSHFP record returned")
+ return 2
+ for record in r.data.data:
+ keytype = record[0]
+ hashtype = record[1]
+ data = record[2:]
+ if hashtype == 1:
+ actualhash = hashlib.sha1(actualhostkey).digest()
+ elif hashtype == 2:
+ actualhash = hashlib.sha256(actualhostkey).digest()
+ else:
+ logging.warn("Only hashtypes 1 and 2 supported")
+ if keytype == 1 and actualkeytype == 'ssh-rsa':
+ if data == actualhash:
+ raise HostKeyMatchSSHFP
+ elif keytype == 2 and actualkeytype == 'ssh-dss':
+ if data == actualhash:
+ raise HostKeyMatchSSHFP
+ elif keytype == 3 and actualkeytype == 'ssh-ecdsa':
+ if data == actualhash:
+ raise HostKeyMatchSSHFP
+ elif keytype == 4 and actualkeytype == 'ssh-ed25519':
+ if data == actualhash:
+ raise HostKeyMatchSSHFP
+ logging.error("No matching SSHFP record found")
+ raise HostKeyMismatchSSHFP
+def init_connection(args):
+ connection = paramiko.client.SSHClient()
+ connection.set_missing_host_key_policy(HostKeyLookup(args))
+ return connection
+def main():
+ logging.basicConfig(format='%(levelname)5s %(message)s')
+ parser = argparse.ArgumentParser()
+ parser.add_argument("Host")
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("--quiet", action="store_true")
+ parser.add_argument("-p", "--port",
+ action="store", type=int, default=22,
+ help="SMTP port")
+ parser.add_argument("-a", "--ancor",
+ action="store", type=str, default="/etc/unbound/root.key",
+ help="DNSSEC root ancor")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only")
+ group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only")
+ group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
+ args = parser.parse_args()
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ elif args.quiet:
+ logging.getLogger().setLevel(logging.WARNING)
+ else:
+ logging.getLogger().setLevel(logging.INFO)
+ connection = init_connection(args)
+ try:
+ connection.connect(args.Host)
+ except HostKeyMatchSSHFP:
+ return 0
+ except HostKeyMismatchSSHFP:
+ return 2
+if __name__ == '__main__':
+ sys.exit(main())