#!/usr/bin/python3 # # from __future__ import print_function import sys import argparse import logging import hashlib import codecs from unbound import ub_ctx, ub_strerror import paramiko try: from unbound import RR_TYPE_SSHFP except ImportError: RR_TYPE_SSHFP=44 class HostKeyMatchSSHFP(BaseException): pass class HostKeyMismatchSSHFP(BaseException): pass class HostKeyLookup(paramiko.client.MissingHostKeyPolicy): def __init__(self, args): self._args = args self._resolver = ub_ctx() self._resolver.add_ta_file(args.ancor) def missing_host_key(self, client, hostname, key): actualhostkey = key.asbytes() actualkeytype = key.get_name() hexencoder = codecs.getencoder('hex') s, r = self._resolver.resolve(hostname, RR_TYPE_SSHFP) if 0 != s: ub_strerror(s) return if r.data is None: logging.error("No SSHFP record returned") return 2 for record in r.data.data: keytype = record[0] hashtype = record[1] data = record[2:] if hashtype == 1: actualhash = hashlib.sha1(actualhostkey).digest() elif hashtype == 2: actualhash = hashlib.sha256(actualhostkey).digest() else: logging.warn("Only hashtypes 1 and 2 supported") if keytype == 1 and actualkeytype == 'ssh-rsa': if data == actualhash: raise HostKeyMatchSSHFP elif keytype == 2 and actualkeytype == 'ssh-dss': if data == actualhash: raise HostKeyMatchSSHFP elif keytype == 3 and actualkeytype == 'ssh-ecdsa': if data == actualhash: raise HostKeyMatchSSHFP elif keytype == 4 and actualkeytype == 'ssh-ed25519': if data == actualhash: raise HostKeyMatchSSHFP logging.error("No matching SSHFP record found") raise HostKeyMismatchSSHFP def init_connection(args): connection = paramiko.client.SSHClient() connection.set_missing_host_key_policy(HostKeyLookup(args)) return connection def main(): logging.basicConfig(format='%(levelname)5s %(message)s') parser = argparse.ArgumentParser() parser.add_argument("Host") parser.add_argument("--verbose", action="store_true") parser.add_argument("--quiet", action="store_true") parser.add_argument("-p", "--port", action="store", type=int, default=22, help="SMTP port") parser.add_argument("-a", "--ancor", action="store", type=str, default="/etc/unbound/root.key", help="DNSSEC root ancor") group = parser.add_mutually_exclusive_group() group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only") group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only") group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) elif args.quiet: logging.getLogger().setLevel(logging.WARNING) else: logging.getLogger().setLevel(logging.INFO) connection = init_connection(args) try: connection.connect(args.Host) except HostKeyMatchSSHFP: return 0 except HostKeyMismatchSSHFP: return 2 if __name__ == '__main__': sys.exit(main())