import sys
import codecs
import hashlib
+import logging
from .cert import get_spki
from unbound import RR_TYPE_A, RR_TYPE_AAAA
from unbound import idn2dname, ub_strerror
+try:
+ from unbound import RR_TYPE_TLSA
+except ImportError:
+ RR_TYPE_TLSA=52
+
def verify_tlsa_record(resolver, record, certificate):
- print(record)
- print(hashlib.sha256(certificate).hexdigest())
- s, r = resolver.resolve(record, rrtype=52)
+ s, r = resolver.resolve(record, rrtype=RR_TYPE_TLSA)
if 0 != s:
ub_strerror(s)
return
+ if r.data is None:
+ logging.error("No TLSA record returned")
+ return 2
+
for record in r.data.data:
hexencoder = codecs.getencoder('hex')
usage = record[0]
data = record[3:]
if usage != 3:
- sys.stderr.write("Only 'Domain-issued certificate' records supported\n")
+ logging.warning("Only 'Domain-issued certificate' records supported\n")
if selector == 0:
verifieddata = certificate
if matching == 0:
if verifieddata == data:
- print("success")
+ logging.info("Found matching record: `TLSA %d %d %d %s`", usage, selector, matching, hexencoder(data)[0])
return 0
elif matching == 1:
if hashlib.sha256(verifieddata).digest() == data:
- print("success")
+ logging.info("Found matching record: `TLSA %d %d %d %s`", usage, selector, matching, hexencoder(data)[0].decode())
return 0
elif matching == 2:
if hashlib.sha512(verifieddata).digest() == data:
- print("success")
+ logging.info("Found matching record: `TLSA %d %d %d %s`", usage, selector, matching, hexencoder(data)[0].decode())
return 0
else:
# currently only 0, 1 and 2 are assigned
- sys.stderr.write("Only matching types 0, 1 and 2 supported\n")
+ logging.warning("Only matching types 0, 1 and 2 supported\n")
- sys.stderr.write("could not verify any tlsa record\n")
- return -1
+ logging.error("could not verify any tlsa record\n")
+ return 2
import sys
import argparse
+import logging
from socket import socket, AF_INET6, AF_INET, create_connection
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, cert_time_to_seconds, SSLError, CertificateError, create_default_context
if args.ssl:
port = 465 if args.port == 0 else args.port
- connection = context.wrap_socket(socket(AF_INET),
- server_hostname=host)
- connection.connect(host, port)
+ connection = sslcontext.wrap_socket(socket(AF_INET),
+ server_hostname=host)
+ connection.connect((host, port))
else:
port = 25 if args.port == 0 else args.port
+
connection = create_connection((host, port))
- print(connection.recv(512))
+ answer = connection.recv(512)
+ logging.debug(answer)
+
connection.send(b"EHLO localhost\r\n")
- print(connection.recv(512))
+ answer = connection.recv(512)
+ logging.debug(answer)
+
connection.send(b"STARTTLS\r\n")
- print(connection.recv(512))
+ answer = connection.recv(512)
+ logging.debug(answer)
+
connection = sslcontext.wrap_socket(connection, server_hostname=host)
connection.do_handshake()
def close_connection(connection):
connection.send(b"QUIT\r\n")
- print(connection.recv(512))
+ answer = connection.recv(512)
+ logging.debug(answer)
def init(args):
def main():
+ logging.basicConfig(format='%(levelname)5s %(message)s')
parser = argparse.ArgumentParser()
parser.add_argument("Host")
+ parser.add_argument("--verbose", action="store_true")
+ parser.add_argument("--quiet", action="store_true")
parser.add_argument("-p", "--port",
action="store", type=int, default=0,
help="SMTP port")
group.add_argument("--64", action="store_false", help="check via IPv4 and IPv6 (default)")
args = parser.parse_args()
- sslcontext, resolver = init(args)
- print(args)
- connection = init_connection(sslcontext, args)
+ if args.verbose:
+ logging.getLogger().setLevel(logging.DEBUG)
+ elif args.quiet:
+ logging.getLogger().setLevel(logging.WARNING)
+ else:
+ logging.getLogger().setLevel(logging.INFO)
+
+ port = args.port
+ if port == 0:
+ port = 465 if args.ssl else 25
+ host = args.Host.encode('idna').decode()
+
+ sslcontext, resolver = init(args)
+ try:
+ connection = init_connection(sslcontext, args)
+ except ConnectionRefusedError:
+ logging.error("Connection refused")
+ return 2
- verify_tlsa_record(resolver, "_25._tcp.%s" % args.Host, connection.getpeercert(binary_form=True))
+ retval = verify_tlsa_record(resolver, "_%d._tcp.%s" % (port, host), connection.getpeercert(binary_form=True))
close_connection(connection)
+ return retval
if __name__ == '__main__':