X-Git-Url: https://git.siccegge.de//index.cgi?p=tools.git;a=blobdiff_plain;f=tls-check;h=19a8dfc65d061f4c7e5d5e5dd6db4de564a8d906;hp=d16e80ab158e7a691f4fd7d0357e79972b192209;hb=HEAD;hpb=64ea73a6ab1d9d792167502984f775a95813ce3a diff --git a/tls-check b/tls-check index d16e80a..19a8dfc 100644 --- a/tls-check +++ b/tls-check @@ -2,67 +2,139 @@ from __future__ import print_function from optparse import OptionParser -from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError -from socket import socket, AF_INET6 +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError +from socket import socket, AF_INET6, create_connection from datetime import datetime, timedelta +from smtplib import SMTP +import yaml VERBOSE=False -def check_cert(host, port, ca, warn, crit): - context = SSLContext(PROTOCOL_TLSv1_2) - context.verify_mode = CERT_REQUIRED - context.load_verify_locations(ca) - connection = context.wrap_socket(socket(AF_INET6), - server_hostname=host) - try: - connection.connect((host, port)) - except SSLError: - print("CRIT (invalid certificate) %s:%d" % (host, port)) - return 2 - - expiretimestamp = cert_time_to_seconds(connection.getpeercert()['notAfter']) - delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow() +class Verifier: + def __init__(self, cafile, warn, crit): + self.cafile = cafile + self.crit = crit + self.warn = warn + + def check(self, proto, host, port, name): + context = SSLContext(PROTOCOL_TLSv1_2) + context.verify_mode = CERT_REQUIRED + context.load_verify_locations(self.cafile) + if hasattr(self, 'remote_check_%s' % proto): + getattr(self, 'remote_check_%s' % proto)(context, host, port, name) + + def remote_check_xmpp(self, context, host, port, name): + xmpp_open = ("" ) + xmpp_starttls = "" + + connection = create_connection((host, port)) + connection.sendall(xmpp_open.format(name).encode('utf-8')) + response = connection.recv(4096).decode('utf-8') + + if not '' in response: + connection.recv(4096) + + connection.sendall(xmpp_starttls.encode('utf-8')) + connection.recv(4096) + + connection = context.wrap_socket(connection, server_hostname=name) + connection.do_handshake() + + cert = connection.getpeercert() + return self.check_cert(cert, host, port, name) - if delta < crit: - print("CRIT (expires in %s) %s:%d" % (delta, host, port)) - return 2 - elif delta < warn: - print("WARN (expires in %s) %s:%d" % (delta, host, port)) - return 1 - + def remote_check_smtp(self, context, host, port, name): + smtp = SMTP(host, port) + try: + smtp.starttls(context=context) + except SSLError: + print("CRIT (invalid certificate) %s:%d" % (host, port)) + return 2 + + cert = smtp.sock.getpeercert() + return self.check_cert(cert, host, port, name) + + def remote_check_ssl(self, context, host, port, name): + connection = context.wrap_socket(socket(AF_INET6), + server_hostname=name) + try: + connection.connect((host, port)) + except SSLError: + print("CRIT (invalid certificate) %s:%d" % (host, port)) + return 2 + + cert = connection.getpeercert() + return self.check_cert(cert, host, port, name) + + def check_cert(self, data, host, port, name): + expiretimestamp = cert_time_to_seconds(data['notAfter']) + delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow() + deltastr = str(delta).split(",") + + if delta < self.crit: + print("CRIT (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port)) + return 2 + elif delta < self.warn: + print("WARN (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port)) + return 1 def main(): global VERBOSE parser = OptionParser() + parser.add_option("--config", action="store", type="string", dest="config", + help="configuration file to use") parser.add_option("-n", "--name", - action="append", type="string", dest="hosts", + action="append", type="string", dest="names", help="hostname:port to check for expired certificates") parser.add_option("-w", "--warning-days", - action="store", type=int, dest="warn", default=15, + action="store", type=int, dest="warn", help="minimum remaining validity in days before a warning is issued") parser.add_option("-c", "--critical-days", - action="store", type=int, dest="crit", default=5, + action="store", type=int, dest="crit", help="minimum remaining validity in days before a warning is issued") parser.add_option("-v", action="store_true", dest="verbose", default=False) parser.add_option("-q", action="store_false", dest="verbose") parser.add_option("--ca", action="store", type="string", dest="ca", - default="/etc/ssl/certs/ca-certificates.crt", help="ca certificate bundle") opts, _args = parser.parse_args() - VERBOSE = opts.verbose - if not opts.hosts: + if opts.config: + configuration = yaml.load(open(opts.config)) + else: + configuration = dict() + + if opts.names: + configuration['names'] = opts.names + if opts.warn: + configuration['warn_days'] = opts.warn + if opts.crit: + configuration['crit_days'] = opts.crit + if opts.ca: + configuration['cacertificates'] = opts.ca + if opts.verbose: + configuration['verbose'] = opts.verbose + + if 'verbose' in configuration: + VERBOSE = configuration['verbose'] + + if not 'names' in configuration: parser.error("needs at least one host") + verifier = Verifier(configuration['cacertificates'] if 'cacertificates' in configuration else '/etc/ssl/certs/ca-certificates.crt', + timedelta(configuration['warn_days'] if 'warn_days' in configuration else 15), + timedelta(configuration['crit_days'] if 'crit_days' in configuration else 5)) + try: - hosts = [ (i[0], int(i[1])) for i in [ j.split(':', 1) for j in opts.hosts ] ] + hosts = [ (i[0], i[1], int(i[2]), i[3] if len(i) == 4 else i[1]) for i in [ j.split(':', 3) for j in configuration['names'] ] ] except (ValueError, IndexError): - parser.error("names need to be in DNSNAME:PORT format") + parser.error("names need to be in PROTO:DNSNAME:PORT format") - for host, port in hosts: - check_cert(host, port, opts.ca, timedelta(opts.warn), timedelta(opts.crit)) + for proto, host, port, name in hosts: + verifier.check(proto, host, port, name) if __name__ == "__main__": main()