]> git.siccegge.de Git - tools.git/blobdiff - tls-check
rebuild actual zonefiles
[tools.git] / tls-check
index d16e80ab158e7a691f4fd7d0357e79972b192209..19a8dfc65d061f4c7e5d5e5dd6db4de564a8d906 100644 (file)
--- a/tls-check
+++ b/tls-check
 
 from __future__ import print_function
 from optparse import OptionParser
 
 from __future__ import print_function
 from optparse import OptionParser
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError
-from socket import socket, AF_INET6
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError
+from socket import socket, AF_INET6, create_connection
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
+from smtplib import SMTP
+import yaml
 
 VERBOSE=False
 
 
 VERBOSE=False
 
-def check_cert(host, port, ca, warn, crit):
-    context = SSLContext(PROTOCOL_TLSv1_2)
-    context.verify_mode = CERT_REQUIRED
-    context.load_verify_locations(ca)
-    connection = context.wrap_socket(socket(AF_INET6),
-                                     server_hostname=host)
-    try:
-        connection.connect((host, port))
-    except SSLError:
-        print("CRIT (invalid certificate) %s:%d" % (host, port))
-        return 2
-        
-    expiretimestamp = cert_time_to_seconds(connection.getpeercert()['notAfter'])
-    delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow()
+class Verifier:
+    def __init__(self, cafile, warn, crit):
+        self.cafile = cafile
+        self.crit = crit
+        self.warn = warn
+
+    def check(self, proto, host, port, name):
+        context = SSLContext(PROTOCOL_TLSv1_2)
+        context.verify_mode = CERT_REQUIRED
+        context.load_verify_locations(self.cafile)
+        if hasattr(self, 'remote_check_%s' % proto):
+            getattr(self, 'remote_check_%s' % proto)(context, host, port, name)
+
+    def remote_check_xmpp(self, context, host, port, name):
+        xmpp_open = ("<stream:stream xmlns='jabber:client' xmlns:stream='"
+                     "http://etherx.jabber.org/streams' xmlns:tls='http://www.ietf.org/rfc/"
+                     "rfc2595.txt' to='{0}' xml:lang='en' version='1.0'>" )
+        xmpp_starttls = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
+
+        connection = create_connection((host, port))
+        connection.sendall(xmpp_open.format(name).encode('utf-8'))
+        response = connection.recv(4096).decode('utf-8')
+
+        if not '</stream:features>' in response:
+            connection.recv(4096)
+
+        connection.sendall(xmpp_starttls.encode('utf-8'))
+        connection.recv(4096)
+
+        connection = context.wrap_socket(connection, server_hostname=name)
+        connection.do_handshake()
+
+        cert = connection.getpeercert()
+        return self.check_cert(cert, host, port, name)
 
 
-    if delta < crit:
-        print("CRIT (expires in %s) %s:%d" % (delta, host, port))
-        return 2
-    elif delta < warn:
-        print("WARN (expires in %s) %s:%d" % (delta, host, port))
-        return 1
-    
+    def remote_check_smtp(self, context, host, port, name):
+        smtp = SMTP(host, port)
+        try:
+            smtp.starttls(context=context)
+        except SSLError:
+            print("CRIT (invalid certificate) %s:%d" % (host, port))
+            return 2
+
+        cert = smtp.sock.getpeercert()
+        return self.check_cert(cert, host, port, name)
+
+    def remote_check_ssl(self, context, host, port, name):
+        connection = context.wrap_socket(socket(AF_INET6),
+                                     server_hostname=name)
+        try:
+            connection.connect((host, port))
+        except SSLError:
+            print("CRIT (invalid certificate) %s:%d" % (host, port))
+            return 2
+
+        cert = connection.getpeercert()
+        return self.check_cert(cert, host, port, name)
+
+    def check_cert(self, data, host, port, name):
+        expiretimestamp = cert_time_to_seconds(data['notAfter'])
+        delta = datetime.utcfromtimestamp(expiretimestamp) - datetime.utcnow()
+        deltastr = str(delta).split(",")
+
+        if delta < self.crit:
+            print("CRIT (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port))
+            return 2
+        elif delta < self.warn:
+            print("WARN (expires in %8s,%16s) %s:%d" % (deltastr[0], deltastr[1], name, port))
+            return 1
 
 def main():
     global VERBOSE
     parser = OptionParser()
 
 def main():
     global VERBOSE
     parser = OptionParser()
+    parser.add_option("--config", action="store", type="string", dest="config",
+                      help="configuration file to use")
     parser.add_option("-n", "--name",
     parser.add_option("-n", "--name",
-                      action="append", type="string", dest="hosts",
+                      action="append", type="string", dest="names",
                       help="hostname:port to check for expired certificates")
     parser.add_option("-w", "--warning-days",
                       help="hostname:port to check for expired certificates")
     parser.add_option("-w", "--warning-days",
-                      action="store", type=int, dest="warn", default=15,
+                      action="store", type=int, dest="warn",
                       help="minimum remaining validity in days before a warning is issued")
     parser.add_option("-c", "--critical-days",
                       help="minimum remaining validity in days before a warning is issued")
     parser.add_option("-c", "--critical-days",
-                      action="store", type=int, dest="crit", default=5,
+                      action="store", type=int, dest="crit",
                       help="minimum remaining validity in days before a warning is issued")
     parser.add_option("-v", action="store_true", dest="verbose", default=False)
     parser.add_option("-q", action="store_false", dest="verbose")
     parser.add_option("--ca", action="store", type="string", dest="ca",
                       help="minimum remaining validity in days before a warning is issued")
     parser.add_option("-v", action="store_true", dest="verbose", default=False)
     parser.add_option("-q", action="store_false", dest="verbose")
     parser.add_option("--ca", action="store", type="string", dest="ca",
-                      default="/etc/ssl/certs/ca-certificates.crt",
                       help="ca certificate bundle")
 
         
     opts, _args = parser.parse_args()
 
                       help="ca certificate bundle")
 
         
     opts, _args = parser.parse_args()
 
-    VERBOSE = opts.verbose
-    if not opts.hosts:
+    if opts.config:
+        configuration = yaml.load(open(opts.config))
+    else:
+        configuration = dict()
+
+    if opts.names:
+        configuration['names'] = opts.names
+    if opts.warn:
+        configuration['warn_days'] = opts.warn
+    if opts.crit:
+        configuration['crit_days'] = opts.crit
+    if opts.ca:
+        configuration['cacertificates'] = opts.ca
+    if opts.verbose:
+        configuration['verbose'] = opts.verbose
+
+    if 'verbose' in configuration:
+        VERBOSE = configuration['verbose']
+
+    if not 'names' in configuration:
         parser.error("needs at least one host")
 
         parser.error("needs at least one host")
 
+    verifier = Verifier(configuration['cacertificates'] if 'cacertificates' in configuration else '/etc/ssl/certs/ca-certificates.crt',
+                        timedelta(configuration['warn_days'] if 'warn_days' in configuration else 15),
+                        timedelta(configuration['crit_days'] if 'crit_days' in configuration else 5))
+
     try:
     try:
-        hosts = [ (i[0], int(i[1])) for i in [ j.split(':', 1) for j in opts.hosts ] ]
+        hosts = [ (i[0], i[1], int(i[2]), i[3] if len(i) == 4 else i[1]) for i in [ j.split(':', 3) for j in configuration['names'] ] ]
     except (ValueError, IndexError):
     except (ValueError, IndexError):
-        parser.error("names need to be in DNSNAME:PORT format")
+        parser.error("names need to be in PROTO:DNSNAME:PORT format")
         
         
-    for host, port in hosts:
-        check_cert(host, port, opts.ca, timedelta(opts.warn), timedelta(opts.crit))
+    for proto, host, port, name in hosts:
+        verifier.check(proto, host, port, name)
                       
 if __name__ == "__main__":
     main()
                       
 if __name__ == "__main__":
     main()