from __future__ import print_function
from optparse import OptionParser
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError
from socket import socket, AF_INET6
from datetime import datetime, timedelta
+from smtplib import SMTP
VERBOSE=False
-def check_cert(host, port, ca, warn, crit):
- context = SSLContext(PROTOCOL_TLSv1_2)
- context.verify_mode = CERT_REQUIRED
- context.load_verify_locations(ca)
- connection = context.wrap_socket(socket(AF_INET6),
+class Verifier:
+ def __init__(self, cafile, warn, crit):
+ self.cafile = cafile
+ self.crit = crit
+ self.warn = warn
+
+ def check(self, proto, host, port):
+ context = SSLContext(PROTOCOL_TLSv1_2)
+ context.verify_mode = CERT_REQUIRED
+ context.load_verify_locations(self.cafile)
+ if hasattr(self, 'remote_check_%s' % proto):
+ getattr(self, 'remote_check_%s' % proto)(context, host, port)
+
+ def remote_check_smtp(self, context, host, port):
+ smtp = SMTP(host, port)
+ try:
+ smtp.starttls(context=context)
+ except SSLError:
+ print("CRIT (invalid certificate) %s:%d" % (host, port))
+ return 2
+
+ cert = smtp.sock.getpeercert()
+ return self.check_cert(cert, host, port)
+
+ def remote_check_ssl(self, context, host, port):
+ connection = context.wrap_socket(socket(AF_INET6),
server_hostname=host)
- try:
- connection.connect((host, port))
- except SSLError:
- print("CRIT (invalid certificate) %s:%d" % (host, port))
- return 2
-
- expiretimestamp = cert_time_to_seconds(connection.getpeercert()['notAfter'])
- delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow()
+ try:
+ connection.connect((host, port))
+ except SSLError:
+ print("CRIT (invalid certificate) %s:%d" % (host, port))
+ return 2
+
+ cert = connection.getpeercert()
+ return self.check_cert(cert, host, port)
- if delta < crit:
- print("CRIT (expires in %s) %s:%d" % (delta, host, port))
- return 2
- elif delta < warn:
- print("WARN (expires in %s) %s:%d" % (delta, host, port))
- return 1
-
+ def check_cert(self, data, host, port):
+ expiretimestamp = cert_time_to_seconds(data['notAfter'])
+ delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow()
+
+ if delta < self.crit:
+ print("CRIT (expires in %s) %s:%d" % (delta, host, port))
+ return 2
+ elif delta < self.warn:
+ print("WARN (expires in %s) %s:%d" % (delta, host, port))
+ return 1
def main():
global VERBOSE
if not opts.hosts:
parser.error("needs at least one host")
+ verifier = Verifier(opts.ca, timedelta(opts.warn), timedelta(opts.crit))
+
try:
- hosts = [ (i[0], int(i[1])) for i in [ j.split(':', 1) for j in opts.hosts ] ]
+ hosts = [ (i[0], i[1], int(i[2])) for i in [ j.split(':', 2) for j in opts.hosts ] ]
except (ValueError, IndexError):
- parser.error("names need to be in DNSNAME:PORT format")
+ parser.error("names need to be in PROTO:DNSNAME:PORT format")
- for host, port in hosts:
- check_cert(host, port, opts.ca, timedelta(opts.warn), timedelta(opts.crit))
+ for proto, host, port in hosts:
+ verifier.check(proto, host, port)
if __name__ == "__main__":
main()