#!/usr/bin/python
"""Check the TLS certificates of remote services for validity and expiry.

Targets are given as ``PROTO:DNSNAME:PORT[:NAME]`` where PROTO selects one of
the ``Verifier.remote_check_<proto>`` methods (``ssl``, ``smtp``, ``xmpp``).
One line is printed for every certificate that is invalid or about to
expire, and the process exit status is the worst status encountered
(0 = ok, 1 = warning, 2 = critical, 3 = unknown protocol).
"""
from __future__ import print_function

import sys
import time
from datetime import datetime, timedelta
from optparse import OptionParser
from smtplib import SMTP
from socket import socket, AF_INET6, create_connection
from ssl import (SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED,
                 cert_time_to_seconds, SSLError, CertificateError)

try:
    import yaml
except ImportError:
    # PyYAML is only required when --config is used; main() reports the
    # missing dependency in that case instead of failing at import time.
    yaml = None

VERBOSE = False

# Status codes returned by the checks and used as process exit status.
STATUS_OK = 0
STATUS_WARNING = 1
STATUS_CRITICAL = 2
STATUS_UNKNOWN = 3

DEFAULT_CA_BUNDLE = '/etc/ssl/certs/ca-certificates.crt'

# Stanzas used to negotiate STARTTLS on an XMPP client stream.
# NOTE(review): the literals in the previous revision were empty (the markup
# appears to have been stripped); these are reconstructed from RFC 6120 --
# confirm against the servers being monitored.
XMPP_STREAM_OPEN = ("<stream:stream xmlns='jabber:client' "
                    "xmlns:stream='http://etherx.jabber.org/streams' "
                    "to='{0}' version='1.0'>")
XMPP_STARTTLS = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
XMPP_FEATURES_END = '</stream:features>'


def _split_remaining(delta):
    """Split ``str(delta)`` into its ("N days", " H:MM:SS") parts.

    ``str(timedelta)`` omits the day part when fewer than one day remains,
    so a "0 days" part is synthesised to keep the report format stable.
    """
    text = str(delta)
    if "," in text:
        days_part, clock_part = text.split(",", 1)
    else:
        days_part, clock_part = "0 days", " " + text
    return days_part, clock_part


def _parse_targets(names):
    """Turn ``PROTO:DNSNAME:PORT[:NAME]`` strings into 4-tuples.

    NAME defaults to DNSNAME. Raises ValueError for a non-numeric port and
    IndexError when fewer than three fields are present.
    """
    targets = []
    for spec in names:
        fields = spec.split(':', 3)
        proto, host, port = fields[0], fields[1], int(fields[2])
        name = fields[3] if len(fields) == 4 else host
        targets.append((proto, host, port, name))
    return targets


class Verifier:
    """Connects to services and evaluates the certificate they present.

    cafile -- path of the CA bundle used to verify certificate chains
    warn   -- timedelta; less remaining validity than this yields a warning
    crit   -- timedelta; less remaining validity than this yields a critical
    """

    def __init__(self, cafile, warn, crit):
        self.cafile = cafile
        self.crit = crit
        self.warn = warn

    def check(self, proto, host, port, name):
        """Run the check for `proto` against host:port and return its status.

        `name` is passed on to the protocol handler (used for SNI and in the
        report). An unsupported protocol is reported and yields
        STATUS_UNKNOWN instead of being skipped silently.
        """
        handler = getattr(self, 'remote_check_%s' % proto, None)
        if handler is None:
            print("UNKNOWN (unsupported protocol %s) %s:%d"
                  % (proto, host, port))
            return STATUS_UNKNOWN

        # NOTE(review): check_hostname is left at its default here, so the
        # certificate is verified against the CA bundle but not matched
        # against `name` -- confirm whether that is intended.
        context = SSLContext(PROTOCOL_TLSv1_2)
        context.verify_mode = CERT_REQUIRED
        context.load_verify_locations(self.cafile)
        return handler(context, host, port, name)

    @staticmethod
    def _report_invalid(host, port):
        """Print the invalid-certificate line and return STATUS_CRITICAL."""
        print("CRIT (invalid certificate) %s:%d" % (host, port))
        return STATUS_CRITICAL

    @staticmethod
    def _tls_peer_cert(context, sock, name):
        """Perform the TLS handshake on a connected socket, return the cert.

        wrap_socket() handshakes immediately on a connected socket, so no
        separate do_handshake() call is needed. The TLS socket is closed
        before returning.
        """
        tls = context.wrap_socket(sock, server_hostname=name)
        try:
            return tls.getpeercert()
        finally:
            tls.close()

    def remote_check_xmpp(self, context, host, port, name):
        """Check an XMPP client port by negotiating STARTTLS first."""
        sock = create_connection((host, port))
        try:
            sock.sendall(XMPP_STREAM_OPEN.format(name).encode('utf-8'))
            response = sock.recv(4096).decode('utf-8')
            if XMPP_FEATURES_END not in response:
                # The stream features may arrive in a second segment.
                sock.recv(4096)
            sock.sendall(XMPP_STARTTLS.encode('utf-8'))
            sock.recv(4096)
            try:
                cert = self._tls_peer_cert(context, sock, name)
            except (SSLError, CertificateError):
                return self._report_invalid(host, port)
        finally:
            sock.close()
        return self.check_cert(cert, host, port, name)

    def remote_check_smtp(self, context, host, port, name):
        """Check an SMTP port by issuing STARTTLS."""
        smtp = SMTP(host, port)
        try:
            try:
                smtp.starttls(context=context)
            except (SSLError, CertificateError):
                return self._report_invalid(host, port)
            cert = smtp.sock.getpeercert()
        finally:
            smtp.close()
        return self.check_cert(cert, host, port, name)

    def remote_check_ssl(self, context, host, port, name):
        """Check a port that speaks TLS from the first byte."""
        # create_connection tries every resolved address, so IPv4-only and
        # IPv6-only hosts are both reachable.
        sock = create_connection((host, port))
        try:
            cert = self._tls_peer_cert(context, sock, name)
        except (SSLError, CertificateError):
            return self._report_invalid(host, port)
        finally:
            sock.close()
        return self.check_cert(cert, host, port, name)

    def check_cert(self, data, host, port, name):
        """Compare the certificate's notAfter date with the thresholds.

        data -- certificate dict as returned by getpeercert(); only the
                'notAfter' entry is read.
        Prints a CRIT or WARN line when the remaining validity is below the
        respective threshold and returns the matching status, else
        STATUS_OK.
        """
        expiry = cert_time_to_seconds(data['notAfter'])
        remaining = timedelta(seconds=expiry - time.time())
        days_part, clock_part = _split_remaining(remaining)
        if remaining < self.crit:
            print("CRIT (expires in %8s,%16s) %s:%d"
                  % (days_part, clock_part, name, port))
            return STATUS_CRITICAL
        if remaining < self.warn:
            print("WARN (expires in %8s,%16s) %s:%d"
                  % (days_part, clock_part, name, port))
            return STATUS_WARNING
        return STATUS_OK


def main(argv=None):
    """Parse options, run every configured check, return the worst status.

    argv -- argument list to parse; None means sys.argv[1:].
    Command-line options override values read from the --config file.
    """
    global VERBOSE
    parser = OptionParser()
    parser.add_option("--config", action="store", type="string",
                      dest="config", help="configuration file to use")
    parser.add_option("-n", "--name", action="append", type="string",
                      dest="names",
                      help="PROTO:DNSNAME:PORT[:NAME] target to check for "
                           "expired certificates")
    parser.add_option("-w", "--warning-days", action="store", type=int,
                      dest="warn",
                      help="minimum remaining validity in days before a "
                           "warning is issued")
    parser.add_option("-c", "--critical-days", action="store", type=int,
                      dest="crit",
                      help="minimum remaining validity in days before a "
                           "critical is issued")
    # default=None lets -q be told apart from "not given", so it can
    # override a verbose setting coming from the configuration file.
    parser.add_option("-v", action="store_true", dest="verbose",
                      default=None)
    parser.add_option("-q", action="store_false", dest="verbose")
    parser.add_option("--ca", action="store", type="string", dest="ca",
                      help="ca certificate bundle")
    opts, _args = parser.parse_args(argv)

    configuration = {}
    if opts.config:
        if yaml is None:
            parser.error("--config requires the PyYAML package")
        with open(opts.config) as config_file:
            # safe_load: the file is plain data; an empty file yields None.
            configuration = yaml.safe_load(config_file) or {}

    if opts.names:
        configuration['names'] = opts.names
    if opts.warn is not None:
        configuration['warn_days'] = opts.warn
    if opts.crit is not None:
        configuration['crit_days'] = opts.crit
    if opts.ca:
        configuration['cacertificates'] = opts.ca
    if opts.verbose is not None:
        configuration['verbose'] = opts.verbose

    if 'verbose' in configuration:
        VERBOSE = configuration['verbose']

    if 'names' not in configuration:
        parser.error("needs at least one host")

    verifier = Verifier(
        configuration.get('cacertificates', DEFAULT_CA_BUNDLE),
        timedelta(configuration.get('warn_days', 15)),
        timedelta(configuration.get('crit_days', 5)))

    try:
        hosts = _parse_targets(configuration['names'])
    except (ValueError, IndexError):
        parser.error("names need to be in PROTO:DNSNAME:PORT format")

    worst = STATUS_OK
    for proto, host, port, name in hosts:
        worst = max(worst, verifier.check(proto, host, port, name))
    return worst


if __name__ == "__main__":
    sys.exit(main())