Add ssh / SSHFP service check
[dane-monitoring-plugins.git] / check_dane_ssh
1 #!/usr/bin/python3
2 #
3 #
4
5 from __future__ import print_function
6
7 import sys
8 import argparse
9 import logging
10 import hashlib
11 import codecs
12
13 from unbound import ub_ctx, ub_strerror
14 import paramiko
15
16 try:
17 from unbound import RR_TYPE_SSHFP
18 except ImportError:
19 RR_TYPE_SSHFP=44
20
21
22 class HostKeyMatchSSHFP(BaseException):
23 pass
24
25
26 class HostKeyMismatchSSHFP(BaseException):
27 pass
28
29
30 class HostKeyLookup(paramiko.client.MissingHostKeyPolicy):
31 def __init__(self, args):
32 self._args = args
33 self._resolver = ub_ctx()
34 self._resolver.add_ta_file(args.ancor)
35
36
37 def missing_host_key(self, client, hostname, key):
38 actualhostkey = key.asbytes()
39 actualkeytype = key.get_name()
40 hexencoder = codecs.getencoder('hex')
41
42 s, r = self._resolver.resolve(hostname, RR_TYPE_SSHFP)
43 if 0 != s:
44 ub_strerror(s)
45 return
46
47 if r.data is None:
48 logging.error("No SSHFP record returned")
49 return 2
50
51 for record in r.data.data:
52 keytype = record[0]
53 hashtype = record[1]
54 data = record[2:]
55
56 if hashtype == 1:
57 actualhash = hashlib.sha1(actualhostkey).digest()
58 elif hashtype == 2:
59 actualhash = hashlib.sha256(actualhostkey).digest()
60 else:
61 logging.warn("Only hashtypes 1 and 2 supported")
62
63 if keytype == 1 and actualkeytype == 'ssh-rsa':
64 if data == actualhash:
65 raise HostKeyMatchSSHFP
66
67 elif keytype == 2 and actualkeytype == 'ssh-dss':
68 if data == actualhash:
69 raise HostKeyMatchSSHFP
70
71 elif keytype == 3 and actualkeytype == 'ssh-ecdsa':
72 if data == actualhash:
73 raise HostKeyMatchSSHFP
74
75 elif keytype == 4 and actualkeytype == 'ssh-ed25519':
76 if data == actualhash:
77 raise HostKeyMatchSSHFP
78
79 logging.error("No matching SSHFP record found")
80 raise HostKeyMismatchSSHFP
81
82
83 def init_connection(args):
84 connection = paramiko.client.SSHClient()
85 connection.set_missing_host_key_policy(HostKeyLookup(args))
86
87 return connection
88
89
90 def main():
91 logging.basicConfig(format='%(levelname)5s %(message)s')
92 parser = argparse.ArgumentParser()
93 parser.add_argument("Host")
94
95 parser.add_argument("--verbose", action="store_true")
96 parser.add_argument("--quiet", action="store_true")
97 parser.add_argument("-p", "--port",
98 action="store", type=int, default=22,
99 help="SMTP port")
100
101 parser.add_argument("-a", "--ancor",
102 action="store", type=str, default="/etc/unbound/root.key",
103 help="DNSSEC root ancor")
104
105 group = parser.add_mutually_exclusive_group()
106 group.add_argument("-6", "--6", action="store_true", help="check via IPv6 only")
107 group.add_argument("-4", "--4", action="store_true", help="check via IPv4 only")
108 group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
109
110 args = parser.parse_args()
111
112 if args.verbose:
113 logging.getLogger().setLevel(logging.DEBUG)
114 elif args.quiet:
115 logging.getLogger().setLevel(logging.WARNING)
116 else:
117 logging.getLogger().setLevel(logging.INFO)
118
119 connection = init_connection(args)
120
121 try:
122 connection.connect(args.Host)
123 except HostKeyMatchSSHFP:
124 return 0
125 except HostKeyMismatchSSHFP:
126 return 2
127
128
129 if __name__ == '__main__':
130 sys.exit(main())