-from abc import ABCMeta, abstractmethod
-from unbound import ub_ctx
-from socket import socket, AF_INET6, AF_INET
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
+from abc import ABC, abstractmethod
+from socket import AF_INET6, AF_INET
+from unbound import ub_ctx
-from check_dane.cert import verify_certificate, add_certificate_options
+from check_dane.cert import verify_certificate
from check_dane.tlsa import get_tlsa_records, match_tlsa_records
pass
-class DaneChecker:
+class DaneChecker(ABC):
def __init__(self):
pass
@abstractmethod
- def _init_connection(self):
+ def _init_connection(self, family, host, port):
pass
@abstractmethod
- def _close_connection(self):
+ def _close_connection(self, connection):
pass
def port(self):
pass
-
+
def _gather_certificates(self):
retval = 0
certificates = set()
for afamily in self._afamilies:
- try:
- connection = self._init_connection(afamily, self._host, self.port)
- except ConnectionRefusedError:
- logging.error("Connection refused")
- return 2
+ connection = self._init_connection(afamily, self._host, self.port)
nretval = verify_certificate(connection.getpeercert(), self._args)
retval = max(retval, nretval)
self._close_connection(connection)
return certificates
-
-
+
+
def _gather_records(self):
return get_tlsa_records(self._resolver, "_%d._tcp.%s" % (self.port, self._host))
-
+
def generate_menu(self, argparser):
argparser.add_argument("Host")
argparser.add_argument("--check-dane",
- action="store_false",
- help="Verify presented certificate via DANE (default: enabled)")
+ action="store_false",
+ help="Verify presented certificate via DANE (default: enabled)")
argparser.add_argument("--check-ca",
- action="store_false",
- help="Verify presented certificate via the CA system (default: enabled)")
+ action="store_false",
+ help="Verify presented certificate via the CA system (default: enabled)")
argparser.add_argument("--check-expire",
- action="store_false",
- help="Verify presented certificate for expiration (default: enabled)")
+ action="store_false",
+ help="Verify presented certificate for expiration (default: enabled)")
argparser.add_argument("-a", "--ancor",
- action="store", type=str, default="/usr/share/dns/root.key",
- help="DNSSEC root ancor")
+ action="store", type=str, default="/usr/share/dns/root.key",
+ help="DNSSEC root ancor")
argparser.add_argument("--castore", action="store", type=str,
- default="/etc/ssl/certs/ca-certificates.crt",
- help="ca certificate bundle")
+ default="/etc/ssl/certs/ca-certificates.crt",
+ help="ca certificate bundle")
group = argparser.add_mutually_exclusive_group()
group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
- def set_args(self, args):
+ def set_args(self, args):
self._args = args
resolver = ub_ctx()
resolver.add_ta_file(args.ancor)
self._afamilies = [AF_INET, AF_INET6]
self._host = args.Host.encode('idna').decode()
-
+
def check(self):
records = self._gather_records()
from __future__ import print_function
-import sys
import argparse
import logging
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
from socket import socket
-from check_dane.tlsa import get_tlsa_records, match_tlsa_records
-from check_dane.cert import verify_certificate, add_certificate_options
+from check_dane.cert import add_certificate_options
from check_dane.abstract import DaneChecker
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
-
-
class HttpsDaneChecker(DaneChecker):
def _init_connection(self, family, host, port):
connection = self._sslcontext.wrap_socket(socket(family),
@property
def port(self):
- return 443
+ return self._port
+
-
def _close_connection(self, connection):
connection.close()
-
+
def __init__(self):
DaneChecker.__init__(self)
def set_args(self, args):
DaneChecker.set_args(self, args)
-
+
+ self._port = args.port
+
sslcontext = SSLContext(PROTOCOL_TLSv1_2)
sslcontext.verify_mode = CERT_REQUIRED
sslcontext.load_verify_locations(args.castore)
self._sslcontext = sslcontext
-
+
def generate_menu(self, argparser):
DaneChecker.generate_menu(self, argparser)
argparser.add_argument("-p", "--port",
action="store", type=int, default=443,
help="HTTPS port")
-
+
def main():
logging.getLogger().setLevel(logging.WARNING)
else:
logging.getLogger().setLevel(logging.INFO)
-
+
return checker.check()
+
if __name__ == '__main__':
import sys
sys.exit(main())
--- /dev/null
+#!/usr/bin/python3
+
+#!/usr/bin/python3
+
+from __future__ import print_function
+
+import argparse
+import logging
+
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
+from socket import socket
+
+from check_dane.cert import add_certificate_options
+from check_dane.abstract import DaneChecker
+
+
+class SmtpDaneChecker(DaneChecker):
+ def _init_connection(self, family, host, port):
+
+ if self.ssl:
+ connection = self._sslcontext.wrap_socket(socket(family),
+ server_hostname=host)
+ connection.connect((host, port))
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ connection.send(b"EHLO localhost\r\n")
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ else:
+ connection = socket(family=family)
+ connection.connect((host, port))
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ connection.send(b"EHLO localhost\r\n")
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ connection.send(b"STARTTLS\r\n")
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ connection = self._sslcontext.wrap_socket(connection, server_hostname=host)
+ connection.do_handshake()
+
+ connection.send(b"EHLO localhost\r\n")
+ answer = connection.recv(512)
+ logging.debug(answer)
+
+ return connection
+
+
+ @property
+ def port(self):
+ return self._port
+
+
+ @property
+ def ssl(self):
+ return self._ssl
+
+
+ def _close_connection(self, connection):
+ connection.send(b"QUIT\r\n")
+ answer = connection.recv(512)
+ logging.debug(answer)
+ connection.close()
+
+
+ def __init__(self):
+ self._port = None
+ self._ssl = None
+ DaneChecker.__init__(self)
+
+
+ def set_args(self, args):
+ DaneChecker.set_args(self, args)
+
+ self._ssl = args.ssl
+ if args.port == 0:
+ self._port = 465 if args.ssl else 25
+ else:
+ self._port = args.port
+
+ sslcontext = SSLContext(PROTOCOL_TLSv1_2)
+ sslcontext.verify_mode = CERT_REQUIRED
+ sslcontext.load_verify_locations(args.castore)
+
+ self._sslcontext = sslcontext
+
+
+ def generate_menu(self, argparser):
+ DaneChecker.generate_menu(self, argparser)
+ argparser.add_argument("-p", "--port",
+ action="store", type=int, default=0,
+ help="SMTP port")
+ argparser.add_argument("--ssl",
+ action="store_true",
+ help="Use direct TLS connection instead of starttls (default: disabled)")
+
+
+
+
+
+def main():
+ logging.basicConfig(format='%(levelname)5s %(message)s')
+ checker = SmtpDaneChecker()
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("--quiet", action="store_true")
+
+ checker.generate_menu(parser)
+ add_certificate_options(parser)
+
+ args = parser.parse_args()
+ checker.set_args(args)
+
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ elif args.quiet:
+ logging.getLogger().setLevel(logging.WARNING)
+ else:
+ logging.getLogger().setLevel(logging.INFO)
+
+ return checker.check()
+
+
+if __name__ == '__main__':
+ import sys
+ sys.exit(main())
return
if r.data is None:
- logging.warn("No TLSA record returned")
+ logging.warning("No TLSA record returned")
return set()
result = set()
for record in r.data.data:
- hexencoder = codecs.getencoder('hex')
- usage = ord(record[0])
- selector = ord(record[1])
- matching = ord(record[2])
+ usage = record[0]
+ selector = record[1]
+ matching = record[2]
data = record[3:]
result.add(TLSARecord(usage, selector, matching, data))
-
+
return result
for record in records:
if not record in usedrecords:
- logging.warn("Unused record %s", record)
+ logging.warning("Unused record %s", record)
if result == 0:
result = 1
--- /dev/null
+#!/usr/bin/python3
+
+#!/usr/bin/python3
+
+from __future__ import print_function
+
+import argparse
+import logging
+
+from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
+from socket import socket
+
+from check_dane.tlsa import get_tlsa_records
+from check_dane.cert import add_certificate_options
+from check_dane.abstract import DaneChecker
+from check_dane.resolve import Resolver, srv_lookup
+
+XMPP_OPEN = ("<stream:stream xmlns='jabber:{0}' xmlns:stream='"
+ "http://etherx.jabber.org/streams' xmlns:tls='http://www.ietf.org/rfc/"
+ "rfc2595.txt' to='{1}' xml:lang='en' version='1.0'>")
+XMPP_CLOSE = "</stream:stream>"
+XMPP_STARTTLS = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
+
+class XmppDaneChecker(DaneChecker):
+ def _init_connection(self, family, host, port):
+
+ logging.debug("Connecting to %s:%d", host, port)
+
+ connection = socket(family=family)
+ connection.connect((host, port))
+
+ connection.sendall(XMPP_OPEN.format(self.servicetype, self._hostname).encode())
+ answer = connection.recv(4096)
+ logging.debug(answer)
+
+ if not b'</stream:features>' in answer:
+ answer = connection.recv(4096)
+ logging.debug(answer)
+
+ connection.sendall(XMPP_STARTTLS.encode())
+ answer = connection.recv(4096)
+ logging.debug(answer)
+
+ print(host, self._hostname)
+ connection = self._sslcontext.wrap_socket(connection, server_hostname=self._hostname)
+ connection.do_handshake()
+
+ connection.sendall(XMPP_OPEN.format(self.servicetype, self._hostname).encode())
+ answer = connection.recv(4096)
+ logging.debug(answer)
+
+ if not b'</stream:features>' in answer:
+ answer = connection.recv(4096)
+ logging.debug(answer)
+
+ return connection
+
+
+ @property
+ def port(self):
+ return self._port
+
+
+ @property
+ def servicetype(self):
+ return self._type
+
+
+ def _gather_certificates(self):
+ result = set()
+ for (host, port), meta in self._endpoints:
+ self._host = host
+ self._port = port
+ self._type = meta['type']
+ result.update(DaneChecker._gather_certificates(self))
+
+ return result
+
+
+ def _gather_records(self):
+ result = set()
+ for (host, port), _ in self._endpoints:
+ print(repr((host, port)))
+ result.update(get_tlsa_records(self._resolver, "_%d._tcp.%s" % (port, host)))
+
+ return result
+
+
+ def _close_connection(self, connection):
+ connection.send(XMPP_CLOSE.encode())
+ answer = connection.recv(512)
+ logging.debug(answer)
+ connection.close()
+
+
+ def __init__(self):
+ self._port = None
+ self._ssl = None
+ self._host = None
+ self._hostname = None
+ DaneChecker.__init__(self)
+
+
+ def set_args(self, args):
+ DaneChecker.set_args(self, args)
+
+ sslcontext = SSLContext(PROTOCOL_TLSv1_2)
+ sslcontext.verify_mode = CERT_REQUIRED
+ sslcontext.load_verify_locations(args.castore)
+
+ cresolver = Resolver(args.ancor)
+ self._sslcontext = sslcontext
+
+ self._hostname = args.Host.encode('idna').decode()
+ endpoints = []
+ if not args.s2s:
+ for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" %
+ self._hostname,
+ cresolver):
+ meta['type'] = 'client'
+ endpoints.append((endpoint, meta))
+ if not args.c2s:
+ for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" %
+ self._hostname,
+ cresolver):
+ meta['type'] = 'server'
+ endpoints.append((endpoint, meta))
+
+ self._endpoints = endpoints
+
+
+ def generate_menu(self, argparser):
+ DaneChecker.generate_menu(self, argparser)
+ group = argparser.add_mutually_exclusive_group()
+ group.add_argument("--s2s", action="store_true",
+ help="Only check server-to-server connections")
+ group.add_argument("--c2s", action="store_true",
+ help="Only check client-to-server connections")
+ argparser.add_argument("-p", "--port",
+ action="store", type=int, default=0,
+ help="SMTP port")
+
+
+
+
+def main():
+ logging.basicConfig(format='%(levelname)5s %(message)s')
+ checker = XmppDaneChecker()
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("--quiet", action="store_true")
+
+ checker.generate_menu(parser)
+ add_certificate_options(parser)
+
+ args = parser.parse_args()
+ checker.set_args(args)
+
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ elif args.quiet:
+ logging.getLogger().setLevel(logging.WARNING)
+ else:
+ logging.getLogger().setLevel(logging.INFO)
+
+ return checker.check()
+
+
+if __name__ == '__main__':
+ import sys
+ sys.exit(main())
+++ /dev/null
-#!/usr/bin/python3
-
-from __future__ import print_function
-
-import sys
-import argparse
-import logging
-
-from socket import socket, AF_INET6, AF_INET
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
-from unbound import ub_ctx
-
-from check_dane.tlsa import verify_tlsa_record
-from check_dane.cert import verify_certificate, add_certificate_options
-
-def init_connection(sslcontext, family, host, port):
- connection = sslcontext.wrap_socket(socket(family),
- server_hostname=host)
- connection.connect((host, port))
- connection.send(b"HEAD / HTTP/1.1\r\nHost: %s\r\n\r\n" % host.encode())
- answer = connection.recv(512)
- logging.debug(answer)
-
- return connection
-
-
-def close_connection(connection):
- connection.close()
-
-
-def init(args):
- sslcontext = SSLContext(PROTOCOL_TLSv1_2)
- sslcontext.verify_mode = CERT_REQUIRED
- sslcontext.load_verify_locations(args.castore)
-
- resolver = ub_ctx()
- resolver.add_ta_file(args.ancor)
-
- return sslcontext, resolver
-
-
-def main():
- logging.basicConfig(format='%(levelname)5s %(message)s')
- parser = argparse.ArgumentParser()
- parser.add_argument("Host")
-
- parser.add_argument("--verbose", action="store_true")
- parser.add_argument("--quiet", action="store_true")
- parser.add_argument("-p", "--port",
- action="store", type=int, default=443,
- help="HTTPS port")
- parser.add_argument("--check-dane",
- action="store_false",
- help="Verify presented certificate via DANE (default: enabled)")
- parser.add_argument("--check-ca",
- action="store_false",
- help="Verify presented certificate via the CA system (default: enabled)")
- parser.add_argument("--check-expire",
- action="store_false",
- help="Verify presented certificate for expiration (default: enabled)")
-
- parser.add_argument("-a", "--ancor",
- action="store", type=str, default="/etc/unbound/root.key",
- help="DNSSEC root ancor")
- parser.add_argument("--castore", action="store", type=str,
- default="/etc/ssl/certs/ca-certificates.crt",
- help="ca certificate bundle")
-
- group = parser.add_mutually_exclusive_group()
- group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
- group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
- group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)")
-
- add_certificate_options(parser)
-
- args = parser.parse_args()
-
- if args.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- elif args.quiet:
- logging.getLogger().setLevel(logging.WARNING)
- else:
- logging.getLogger().setLevel(logging.INFO)
-
- host = args.Host.encode('idna').decode()
- sslcontext, resolver = init(args)
-
- if args.use6:
- afamilies = [AF_INET6]
- elif args.use4:
- afamilies = [AF_INET6]
- else:
- afamilies = [AF_INET, AF_INET6]
-
- retval = 0
- for afamily in afamilies:
- try:
- connection = init_connection(sslcontext, afamily, host, args.port)
- except ConnectionRefusedError:
- logging.error("Connection refused")
- return 2
-
- nretval = verify_certificate(connection.getpeercert(), args)
- retval = max(retval, nretval)
- nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (args.port, host),
- connection.getpeercert(binary_form=True))
- retval = max(retval, nretval)
-
- close_connection(connection)
-
- return retval
-
-
-if __name__ == '__main__':
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/python3
-#
-#
-
-from __future__ import print_function
-
-import sys
-import argparse
-import logging
-
-from socket import socket, AF_INET6, AF_INET, create_connection
-from ssl import SSLError, CertificateError, SSLContext
-from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED
-from unbound import ub_ctx
-
-from check_dane.tlsa import verify_tlsa_record
-from check_dane.cert import verify_certificate, add_certificate_options
-
-def init_connection(sslcontext, args, family):
- host = args.Host
-
- if args.ssl:
- port = 465 if args.port == 0 else args.port
- connection = sslcontext.wrap_socket(socket(family),
- server_hostname=host)
- connection.connect((host, port))
- answer = connection.recv(512)
- logging.debug(answer)
-
- connection.send(b"EHLO localhost\r\n")
- answer = connection.recv(512)
- logging.debug(answer)
-
- else:
- port = 25 if args.port == 0 else args.port
-
- connection = socket(family=family)
- connection.connect((host, port))
- answer = connection.recv(512)
- logging.debug(answer)
-
- connection.send(b"EHLO localhost\r\n")
- answer = connection.recv(512)
- logging.debug(answer)
-
- connection.send(b"STARTTLS\r\n")
- answer = connection.recv(512)
- logging.debug(answer)
-
- connection = sslcontext.wrap_socket(connection, server_hostname=host)
- connection.do_handshake()
-
- connection.send(b"EHLO localhost\r\n")
- answer = connection.recv(512)
- logging.debug(answer)
-
- return connection
-
-
-def close_connection(connection):
- connection.send(b"QUIT\r\n")
- answer = connection.recv(512)
- logging.debug(answer)
-
-
-def init(args):
- sslcontext = SSLContext(PROTOCOL_TLSv1_2)
- sslcontext.verify_mode = CERT_REQUIRED
- sslcontext.load_verify_locations(args.castore)
-
- resolver = ub_ctx()
- resolver.add_ta_file(args.ancor)
-
- return sslcontext, resolver
-
-
-def main():
- logging.basicConfig(format='%(levelname)5s %(message)s')
- parser = argparse.ArgumentParser()
- parser.add_argument("Host")
-
- parser.add_argument("--verbose", action="store_true")
- parser.add_argument("--quiet", action="store_true")
- parser.add_argument("-p", "--port",
- action="store", type=int, default=0,
- help="SMTP port")
- parser.add_argument("--ssl",
- action="store_true",
- help="Use direct TLS connection instead of starttls (default: disabled)")
- parser.add_argument("--check-dane",
- action="store_false",
- help="Verify presented certificate via DANE (default: enabled)")
- parser.add_argument("--check-ca",
- action="store_false",
- help="Verify presented certificate via the CA system (default: enabled)")
- parser.add_argument("--check-expire",
- action="store_false",
- help="Verify presented certificate for expiration (default: enabled)")
-
- parser.add_argument("-a", "--ancor",
- action="store", type=str, default="/etc/unbound/root.key",
- help="DNSSEC root ancor")
- parser.add_argument("--castore", action="store", type=str,
- default="/etc/ssl/certs/ca-certificates.crt",
- help="ca certificate bundle")
-
- group = parser.add_mutually_exclusive_group()
- group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
- group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
- group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)")
-
- add_certificate_options(parser)
-
- args = parser.parse_args()
-
- if args.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- elif args.quiet:
- logging.getLogger().setLevel(logging.WARNING)
- else:
- logging.getLogger().setLevel(logging.INFO)
-
- port = args.port
- if port == 0:
- port = 465 if args.ssl else 25
- host = args.Host.encode('idna').decode()
-
- sslcontext, resolver = init(args)
-
- if args.use6:
- afamilies = [AF_INET6]
- elif args.use4:
- afamilies = [AF_INET6]
- else:
- afamilies = [AF_INET, AF_INET6]
-
- retval = 0
- for afamily in afamilies:
- try:
- connection = init_connection(sslcontext, args, afamily)
- except ConnectionRefusedError:
- logging.error("Connection refused")
- return 2
-
- nretval = verify_certificate(connection.getpeercert(), args)
- retval = max(retval, nretval)
- nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host),
- connection.getpeercert(binary_form=True))
- retval = max(retval, nretval)
-
- close_connection(connection)
-
- return retval
-
-
-if __name__ == '__main__':
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/python3
-#
-#
-
-from __future__ import print_function
-
-import sys
-import argparse
-import logging
-
-from socket import socket, AF_INET6, AF_INET, create_connection
-from ssl import SSLError, CertificateError, SSLContext
-from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED
-
-from unbound import ub_ctx
-
-from check_dane.resolve import Resolver, ResolverException, srv_lookup
-from check_dane.tlsa import verify_tlsa_record
-from check_dane.cert import verify_certificate, add_certificate_options
-
-XMPP_OPEN = ("<stream:stream xmlns='jabber:{0}' xmlns:stream='"
- "http://etherx.jabber.org/streams' xmlns:tls='http://www.ietf.org/rfc/"
- "rfc2595.txt' to='{1}' xml:lang='en' version='1.0'>")
-XMPP_CLOSE = "</stream:stream>"
-XMPP_STARTTLS = "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"
-
-
-def init_connection(sslcontext, args, family, endpoint, metadata):
- host, _ = endpoint
- logging.debug("Connecting to %s", endpoint)
-
- connection = socket(family=family)
- connection.connect(endpoint)
-
- connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode())
- answer = connection.recv(4096)
- logging.debug(answer)
-
- if not '</stream:features>' in answer:
- answer = connection.recv(4096)
- logging.debug(answer)
-
- connection.sendall(XMPP_STARTTLS.encode())
- answer = connection.recv(4096)
- logging.debug(answer)
-
- connection = sslcontext.wrap_socket(connection, server_hostname=host)
- connection.do_handshake()
-
- connection.sendall(XMPP_OPEN.format(metadata['type'], args.Host).encode())
- answer = connection.recv(4096)
- logging.debug(answer)
-
- if not '</stream:features>' in answer:
- answer = connection.recv(4096)
- logging.debug(answer)
-
- return connection
-
-
-def close_connection(connection):
- connection.send(XMPP_CLOSE.encode())
- answer = connection.recv(512)
- logging.debug(answer)
-
-
-def init(args):
- sslcontext = SSLContext(PROTOCOL_TLSv1_2)
- sslcontext.verify_mode = CERT_REQUIRED
- sslcontext.load_verify_locations(args.castore)
-
- resolver = ub_ctx()
- resolver.add_ta_file(args.ancor)
-
- return sslcontext, resolver
-
-
-def main():
- logging.basicConfig(format='%(levelname)5s %(message)s')
- parser = argparse.ArgumentParser()
- parser.add_argument("Host")
-
- parser.add_argument("--verbose", action="store_true")
- parser.add_argument("--quiet", action="store_true")
- # parser.add_argument("-p", "--port",
- # action="store", type=int, default=0,
- # help="XMPP port")
- group = parser.add_mutually_exclusive_group()
- group.add_argument("--s2s", action="store_true",
- help="Only check server-to-server connections")
- group.add_argument("--c2s", action="store_true",
- help="Only check client-to-server connections")
- # parser.add_argument("--ssl",
- # action="store_true",
- # help="Use direct TLS connection instead of starttls (default: disabled)")
- parser.add_argument("--check-dane",
- action="store_false",
- help="Verify presented certificate via DANE (default: enabled)")
- parser.add_argument("--check-ca",
- action="store_false",
- help="Verify presented certificate via the CA system (default: enabled)")
- parser.add_argument("--check-expire",
- action="store_false",
- help="Verify presented certificate for expiration (default: enabled)")
-
- parser.add_argument("-a", "--ancor",
- action="store", type=str, default="/etc/unbound/root.key",
- help="DNSSEC root ancor")
- parser.add_argument("--castore", action="store", type=str,
- default="/etc/ssl/certs/ca-certificates.crt",
- help="ca certificate bundle")
-
- group = parser.add_mutually_exclusive_group()
- group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
- group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
- group.add_argument("--64", action="store_false", dest="use64", help="check via IPv4 and IPv6 (default)")
-
- add_certificate_options(parser)
-
- args = parser.parse_args()
-
- if args.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
- elif args.quiet:
- logging.getLogger().setLevel(logging.WARNING)
- else:
- logging.getLogger().setLevel(logging.INFO)
-
- host = args.Host.encode('idna').decode()
-
- sslcontext, resolver = init(args)
-
- if args.use6:
- afamilies = [AF_INET6]
- elif args.use4:
- afamilies = [AF_INET6]
- else:
- afamilies = [AF_INET, AF_INET6]
-
- xresolver = Resolver(args.ancor)
- endpoints = []
- if not args.s2s:
- for endpoint, meta in srv_lookup("_xmpp-client._tcp.%s" % host, xresolver):
- meta['type'] = 'client'
- endpoints.append((endpoint, meta))
- if not args.c2s:
- for endpoint, meta in srv_lookup("_xmpp-server._tcp.%s" % host, xresolver):
- meta['type'] = 'server'
- endpoints.append((endpoint, meta))
-
- retval = 0
- for afamily in afamilies:
- for endpoint, metadata in endpoints:
- host, port = endpoint
- try:
- connection = init_connection(sslcontext, args, afamily, endpoint, metadata)
- except:
- logging.exception("Connection refused")
- return 2
-
- nretval = verify_certificate(connection.getpeercert(), args)
- retval = max(retval, nretval)
- nretval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host),
- connection.getpeercert(binary_form=True))
- retval = max(retval, nretval)
-
- close_connection(connection)
-
- return retval
-
-
-if __name__ == '__main__':
- sys.exit(main())
from setuptools import setup
setup(name='DANE monitoring plugins',
- version='0.1-alpha1',
+ version='0.1',
description='DANE aware monitoring plugins',
author='Christoph Egger',
author_email='christoph@christoph-egger.org',
entry_points={
'console_scripts': [
'check_dane_https = check_dane.https:main',
+ 'check_dane_smtp = check_dane.smtp:main',
+ 'check_dane_xmpp = check_dane.xmpp:main',
],
}
)