From df4f69fb2aa6fd14e505c8137ec0826616f90e44 Mon Sep 17 00:00:00 2001 From: Christoph Egger Date: Sat, 22 Nov 2014 19:17:53 +0100 Subject: [PATCH] Make tls-check more flexible and add smtp/starttls check --- tls-check | 77 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 25 deletions(-) diff --git a/tls-check b/tls-check index d16e80a..2c3199e 100644 --- a/tls-check +++ b/tls-check @@ -2,34 +2,59 @@ from __future__ import print_function from optparse import OptionParser -from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError from socket import socket, AF_INET6 from datetime import datetime, timedelta +from smtplib import SMTP VERBOSE=False -def check_cert(host, port, ca, warn, crit): - context = SSLContext(PROTOCOL_TLSv1_2) - context.verify_mode = CERT_REQUIRED - context.load_verify_locations(ca) - connection = context.wrap_socket(socket(AF_INET6), +class Verifier: + def __init__(self, cafile, warn, crit): + self.cafile = cafile + self.crit = crit + self.warn = warn + + def check(self, proto, host, port): + context = SSLContext(PROTOCOL_TLSv1_2) + context.verify_mode = CERT_REQUIRED + context.load_verify_locations(self.cafile) + if hasattr(self, 'remote_check_%s' % proto): + getattr(self, 'remote_check_%s' % proto)(context, host, port) + + def remote_check_smtp(self, context, host, port): + smtp = SMTP(host, port) + try: + smtp.starttls(context=context) + except SSLError: + print("CRIT (invalid certificate) %s:%d" % (host, port)) + return 2 + + cert = smtp.sock.getpeercert() + return self.check_cert(cert, host, port) + + def remote_check_ssl(self, context, host, port): + connection = context.wrap_socket(socket(AF_INET6), server_hostname=host) - try: - connection.connect((host, port)) - except SSLError: - print("CRIT (invalid certificate) %s:%d" % (host, port)) - return 2 - - expiretimestamp = cert_time_to_seconds(connection.getpeercert()['notAfter']) - delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow() + try: + connection.connect((host, port)) + except SSLError: + print("CRIT (invalid certificate) %s:%d" % (host, port)) + return 2 + + cert = connection.getpeercert() + return self.check_cert(cert, host, port) - if delta < crit: - print("CRIT (expires in %s) %s:%d" % (delta, host, port)) - return 2 - elif delta < warn: - print("WARN (expires in %s) %s:%d" % (delta, host, port)) - return 1 - + def check_cert(self, data, host, port): + expiretimestamp = cert_time_to_seconds(data['notAfter']) + delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow() + + if delta < self.crit: + print("CRIT (expires in %s) %s:%d" % (delta, host, port)) + return 2 + elif delta < self.warn: + print("WARN (expires in %s) %s:%d" % (delta, host, port)) + return 1 def main(): global VERBOSE @@ -56,13 +81,15 @@ def main(): if not opts.hosts: parser.error("needs at least one host") + verifier = Verifier(opts.ca, timedelta(opts.warn), timedelta(opts.crit)) + try: - hosts = [ (i[0], int(i[1])) for i in [ j.split(':', 1) for j in opts.hosts ] ] + hosts = [ (i[0], i[1], int(i[2])) for i in [ j.split(':', 2) for j in opts.hosts ] ] except (ValueError, IndexError): - parser.error("names need to be in DNSNAME:PORT format") + parser.error("names need to be in PROTO:DNSNAME:PORT format") - for host, port in hosts: - check_cert(host, port, opts.ca, timedelta(opts.warn), timedelta(opts.crit)) + for proto, host, port in hosts: + verifier.check(proto, host, port) if __name__ == "__main__": main() -- 2.39.2