X-Git-Url: https://git.siccegge.de//index.cgi?p=dane-monitoring-plugins.git;a=blobdiff_plain;f=check_dane%2Fhttps.py;fp=check_dane%2Fhttps.py;h=c437e47f20e039569d4b720420e5e1aa7e8d90fe;hp=0000000000000000000000000000000000000000;hb=9d773872479fe33744a40753b53f974ea6aa0594;hpb=41f20efa5664b53e68296c332ad1502773f4baaa diff --git a/check_dane/https.py b/check_dane/https.py new file mode 100644 index 0000000..c437e47 --- /dev/null +++ b/check_dane/https.py @@ -0,0 +1,88 @@ +#!/usr/bin/python3 + +from __future__ import print_function + +import sys +import argparse +import logging + +from socket import socket + +from check_dane.tlsa import get_tlsa_records, match_tlsa_records +from check_dane.cert import verify_certificate, add_certificate_options +from check_dane.abstract import DaneChecker + + +from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED + + +class HttpsDaneChecker(DaneChecker): + def _init_connection(self, family, host, port): + connection = self._sslcontext.wrap_socket(socket(family), + server_hostname=host) + connection.connect((host, port)) + connection.send(b"HEAD / HTTP/1.1\r\nHost: %s\r\n\r\n" % host.encode()) + answer = connection.recv(512) + logging.debug(answer) + + return connection + + + @property + def port(self): + return 443 + + + def _close_connection(self, connection): + connection.close() + + + def __init__(self): + DaneChecker.__init__(self) + + + def set_args(self, args): + DaneChecker.set_args(self, args) + + sslcontext = SSLContext(PROTOCOL_TLSv1_2) + sslcontext.verify_mode = CERT_REQUIRED + sslcontext.load_verify_locations(args.castore) + + self._sslcontext = sslcontext + + + def generate_menu(self, argparser): + DaneChecker.generate_menu(self, argparser) + argparser.add_argument("-p", "--port", + action="store", type=int, default=443, + help="HTTPS port") + + + + +def main(): + logging.basicConfig(format='%(levelname)5s %(message)s') + checker = HttpsDaneChecker() + parser = argparse.ArgumentParser() + + parser.add_argument("--verbose", action="store_true") + parser.add_argument("--quiet", action="store_true") + + checker.generate_menu(parser) + add_certificate_options(parser) + + args = parser.parse_args() + checker.set_args(args) + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + elif args.quiet: + logging.getLogger().setLevel(logging.WARNING) + else: + logging.getLogger().setLevel(logging.INFO) + + return checker.check() + +if __name__ == '__main__': + import sys + sys.exit(main())