]> git.siccegge.de Git - dane-monitoring-plugins.git/blobdiff - check_dane/abstract.py
Refactor TLSA service checks
[dane-monitoring-plugins.git] / check_dane / abstract.py
index 1373ed0e12613ddfa73bedb92a64c589bf9a14cc..4a3265b120919c940148e34beffba9b3d690f34f 100644 (file)
@@ -1,10 +1,9 @@
-from abc import ABCMeta, abstractmethod
-from unbound import ub_ctx
-from socket import socket, AF_INET6, AF_INET
-from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
+from abc import ABC, abstractmethod
+from socket import AF_INET6, AF_INET
 
+from unbound import ub_ctx
 
-from check_dane.cert import verify_certificate, add_certificate_options
+from check_dane.cert import verify_certificate
 from check_dane.tlsa import get_tlsa_records, match_tlsa_records
 
 
@@ -15,18 +14,18 @@ class DaneError:
     pass
 
 
-class DaneChecker:
+class DaneChecker(ABC):
     def __init__(self):
         pass
 
 
     @abstractmethod
-    def _init_connection(self):
+    def _init_connection(self, family, host, port):
         pass
 
 
     @abstractmethod
-    def _close_connection(self):
+    def _close_connection(self, connection):
         pass
 
 
@@ -35,16 +34,12 @@ class DaneChecker:
     def port(self):
         pass
 
-    
+
     def _gather_certificates(self):
         retval = 0
         certificates = set()
         for afamily in self._afamilies:
-            try:
-                connection = self._init_connection(afamily, self._host, self.port)
-            except ConnectionRefusedError:
-                logging.error("Connection refused")
-                return 2
+            connection = self._init_connection(afamily, self._host, self.port)
 
             nretval = verify_certificate(connection.getpeercert(), self._args)
             retval = max(retval, nretval)
@@ -53,38 +48,38 @@ class DaneChecker:
             self._close_connection(connection)
 
         return certificates
-    
-    
+
+
     def _gather_records(self):
         return get_tlsa_records(self._resolver, "_%d._tcp.%s" % (self.port, self._host))
 
-        
+
     def generate_menu(self, argparser):
         argparser.add_argument("Host")
 
         argparser.add_argument("--check-dane",
-                            action="store_false",
-                            help="Verify presented certificate via DANE (default: enabled)")
+                               action="store_false",
+                               help="Verify presented certificate via DANE (default: enabled)")
         argparser.add_argument("--check-ca",
-                            action="store_false",
-                            help="Verify presented certificate via the CA system (default: enabled)")
+                               action="store_false",
+                               help="Verify presented certificate via the CA system (default: enabled)")
         argparser.add_argument("--check-expire",
-                            action="store_false",
-                            help="Verify presented certificate for expiration (default: enabled)")
+                               action="store_false",
+                               help="Verify presented certificate for expiration (default: enabled)")
 
         argparser.add_argument("-a", "--ancor",
-                            action="store", type=str, default="/usr/share/dns/root.key",
-                            help="DNSSEC root ancor")
+                               action="store", type=str, default="/usr/share/dns/root.key",
+                               help="DNSSEC root ancor")
         argparser.add_argument("--castore", action="store", type=str,
-                            default="/etc/ssl/certs/ca-certificates.crt",
-                            help="ca certificate bundle")
+                               default="/etc/ssl/certs/ca-certificates.crt",
+                               help="ca certificate bundle")
 
         group = argparser.add_mutually_exclusive_group()
         group.add_argument("-6", "--6", action="store_true", dest="use6", help="check via IPv6 only")
         group.add_argument("-4", "--4", action="store_true", dest="use4", help="check via IPv4 only")
 
 
-    def set_args(self, args):        
+    def set_args(self, args):
         self._args = args
         resolver = ub_ctx()
         resolver.add_ta_file(args.ancor)
@@ -98,7 +93,7 @@ class DaneChecker:
             self._afamilies = [AF_INET, AF_INET6]
 
         self._host = args.Host.encode('idna').decode()
-        
+
 
     def check(self):
         records = self._gather_records()