]> git.siccegge.de Git - dane-monitoring-plugins.git/blobdiff - check_dane/tlsa.py
Rework https checker
[dane-monitoring-plugins.git] / check_dane / tlsa.py
index 3f8b48919343ea8986d1453bd32a00004b8bd235..9d31b5d6c7f7eb8e5a309df64c2b13324a262f81 100644 (file)
@@ -14,6 +14,130 @@ try:
 except ImportError:
     RR_TYPE_TLSA = 52
 
+
+
+class TLSARecord:
+    """Class representing a TLSA record"""
+    def __init__(self, usage, selector, matching, payload):
+        self._usage = usage
+        self._selector = selector
+        self._matching = matching
+        self._payload = payload
+
+
+    def match(self, certificate):
+        """Returns true if the certificate is covered by this TLSA record"""
+        if self._selector == 0:
+            verifieddata = certificate
+        elif self._selector == 1:
+            verifieddata = get_spki(certificate)
+        else:
+            # currently only 0 and 1 are assigned
+            sys.stderr.write("Only selectors 0 and 1 supported\n")
+
+        if self._matching == 0:
+            if verifieddata == self._payload:
+                return True
+
+        elif self._matching == 1:
+            if hashlib.sha256(verifieddata).digest() == self._payload:
+                return True
+
+        elif self._matching == 2:
+            if hashlib.sha512(verifieddata).digest() == self._payload:
+                return True
+
+        else:
+            # currently only 0, 1 and 2 are assigned
+            logging.warning("Only matching types 0, 1 and 2 supported\n")
+
+        return False
+
+
+
+    @property
+    def usage(self):
+        """Usage for this TLSA record"""
+        return self._usage
+
+
+    @property
+    def selector(self):
+        """Selector for this record"""
+        return self._selector
+
+
+    @property
+    def matching(self):
+        """Way to match data against certificate"""
+        return self._matching
+
+
+    @property
+    def payload(self):
+        """Payload data of the TLSA record"""
+        return self._payload
+
+
+    def __repr__(self):
+        hexencoder = codecs.getencoder('hex')
+        return '<TLSA %d %d %d %s>' % (self._usage, self._selector, self._matching, hexencoder(self._payload)[0].decode())
+
+
+
+def get_tlsa_records(resolver, name):
+    """Extracts all TLSA records for a given name"""
+
+    logging.debug("searching for TLSA record on %s", name)
+    s, r = resolver.resolve(name, rrtype=RR_TYPE_TLSA)
+    if 0 != s:
+        ub_strerror(s)
+        return
+
+    if r.data is None:
+        logging.warn("No TLSA record returned")
+        return set()
+
+    result = set()
+    for record in r.data.data:
+        hexencoder = codecs.getencoder('hex')
+        usage = ord(record[0])
+        selector = ord(record[1])
+        matching = ord(record[2])
+        data = record[3:]
+        result.add(TLSARecord(usage, selector, matching, data))
+
+    return result
+
+
+def match_tlsa_records(records, certificates):
+    """Returns all TLSA records matching the certificate"""
+
+    usedrecords = set()
+    result = 0
+
+    for certificate in certificates:
+        recfound = False
+
+        for record in records:
+            if record.match(certificate):
+                logging.info("Matched record %s", record)
+                usedrecords.add(record)
+                recfound = True
+
+        if not recfound:
+            logging.error("No TLSA record returned")
+            result = 2
+
+    for record in records:
+        if not record in usedrecords:
+            logging.warn("Unused record %s", record)
+            if result == 0:
+                result = 1
+
+    return result
+
+
 def verify_tlsa_record(resolver, record, certificate):
     logging.debug("searching for TLSA record on %s", record)
     s, r = resolver.resolve(record, rrtype=RR_TYPE_TLSA)
@@ -27,9 +151,9 @@ def verify_tlsa_record(resolver, record, certificate):
 
     for record in r.data.data:
         hexencoder = codecs.getencoder('hex')
-        usage = record[0]
-        selector = record[1]
-        matching = record[2]
+        usage = ord(record[0])
+        selector = ord(record[1])
+        matching = ord(record[2])
         data = record[3:]
 
         if usage != 3: