from __future__ import print_function
from optparse import OptionParser
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError
-from socket import socket, AF_INET6
+from socket import socket, AF_INET6, create_connection
from datetime import datetime, timedelta
from smtplib import SMTP
+import yaml
VERBOSE=False
self.crit = crit
self.warn = warn
- def check(self, proto, host, port):
+ def check(self, proto, host, port, name):
context = SSLContext(PROTOCOL_TLSv1_2)
context.verify_mode = CERT_REQUIRED
context.load_verify_locations(self.cafile)
if hasattr(self, 'remote_check_%s' % proto):
- getattr(self, 'remote_check_%s' % proto)(context, host, port)
+ getattr(self, 'remote_check_%s' % proto)(context, host, port, name)
- def remote_check_smtp(self, context, host, port):
+ def remote_check_xmpp(self, context, host, port, name):
+ xmpp_open = ("<stream:stream xmlns='jabber:client' xmlns:stream='"
+ "http://etherx.jabber.org/streams' xmlns:tls='http://www.ietf.org/rfc/"
+ "rfc2595.txt' to='{0}' xml:lang='en' version='1.0'>" )
+ xmpp_starttls = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
+
+ connection = create_connection((host, port))
+ connection.sendall(xmpp_open.format(name).encode('utf-8'))
+ response = connection.recv(4096).decode('utf-8')
+
+ if not '</stream:features>' in response:
+ connection.recv(4096)
+
+ connection.sendall(xmpp_starttls.encode('utf-8'))
+ connection.recv(4096)
+
+ connection = context.wrap_socket(connection, server_hostname=name)
+ connection.do_handshake()
+
+ cert = connection.getpeercert()
+ return self.check_cert(cert, host, port, name)
+
+ def remote_check_smtp(self, context, host, port, name):
smtp = SMTP(host, port)
try:
smtp.starttls(context=context)
return 2
cert = smtp.sock.getpeercert()
- return self.check_cert(cert, host, port)
+ return self.check_cert(cert, host, port, name)
- def remote_check_ssl(self, context, host, port):
+ def remote_check_ssl(self, context, host, port, name):
connection = context.wrap_socket(socket(AF_INET6),
- server_hostname=host)
+ server_hostname=name)
try:
connection.connect((host, port))
except SSLError:
return 2
cert = connection.getpeercert()
- return self.check_cert(cert, host, port)
+ return self.check_cert(cert, host, port, name)
- def check_cert(self, data, host, port):
+ def check_cert(self, data, host, port, name):
expiretimestamp = cert_time_to_seconds(data['notAfter'])
delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow()
+ deltastr = str(delta).split(",")
if delta < self.crit:
- print("CRIT (expires in %s) %s:%d" % (delta, host, port))
+ print("CRIT (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port))
return 2
elif delta < self.warn:
- print("WARN (expires in %s) %s:%d" % (delta, host, port))
+ print("WARN (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port))
return 1
def main():
global VERBOSE
parser = OptionParser()
+ parser.add_option("--config", action="store", type="string", dest="config",
+ help="configuration file to use")
parser.add_option("-n", "--name",
- action="append", type="string", dest="hosts",
+ action="append", type="string", dest="names",
help="hostname:port to check for expired certificates")
parser.add_option("-w", "--warning-days",
- action="store", type=int, dest="warn", default=15,
+ action="store", type=int, dest="warn",
help="minimum remaining validity in days before a warning is issued")
parser.add_option("-c", "--critical-days",
- action="store", type=int, dest="crit", default=5,
+ action="store", type=int, dest="crit",
help="minimum remaining validity in days before a warning is issued")
parser.add_option("-v", action="store_true", dest="verbose", default=False)
parser.add_option("-q", action="store_false", dest="verbose")
parser.add_option("--ca", action="store", type="string", dest="ca",
- default="/etc/ssl/certs/ca-certificates.crt",
help="ca certificate bundle")
opts, _args = parser.parse_args()
- VERBOSE = opts.verbose
- if not opts.hosts:
+ if opts.config:
+ configuration = yaml.load(open(opts.config))
+ else:
+ configuration = dict()
+
+ if opts.names:
+ configuration['names'] = opts.names
+ if opts.warn:
+ configuration['warn_days'] = opts.warn
+ if opts.crit:
+ configuration['crit_days'] = opts.crit
+ if opts.ca:
+ configuration['cacertificates'] = opts.ca
+ if opts.verbose:
+ configuration['verbose'] = opts.verbose
+
+ if 'verbose' in configuration:
+ VERBOSE = configuration['verbose']
+
+ if not 'names' in configuration:
parser.error("needs at least one host")
- verifier = Verifier(opts.ca, timedelta(opts.warn), timedelta(opts.crit))
-
+ verifier = Verifier(configuration['cacertificates'] if 'cacertificates' in configuration else '/etc/ssl/certs/ca-certificates.crt',
+ timedelta(configuration['warn_days'] if 'warn_days' in configuration else 15),
+ timedelta(configuration['crit_days'] if 'crit_days' in configuration else 5))
+
try:
- hosts = [ (i[0], i[1], int(i[2])) for i in [ j.split(':', 2) for j in opts.hosts ] ]
+ hosts = [ (i[0], i[1], int(i[2]), i[3] if len(i) == 4 else i[1]) for i in [ j.split(':', 3) for j in configuration['names'] ] ]
except (ValueError, IndexError):
parser.error("names need to be in PROTO:DNSNAME:PORT format")
- for proto, host, port in hosts:
- verifier.check(proto, host, port)
+ for proto, host, port, name in hosts:
+ verifier.check(proto, host, port, name)
if __name__ == "__main__":
main()