-import pexpect
-
-from sicceggetools.acme import constants
-
-
-def get_client():
- logging.info("Loading account key")
- with open("data/account.key.pem", "rb") as keyfd:
- private_key = serialization.load_pem_private_key(
- keyfd.read(),
- password=None,
- backend=default_backend()
- )
-
- logging.info("Loading account registration")
- with open("data/registration.json", "rb") as regfd:
- registration = messages.RegistrationResource.json_loads(regfd.read())
-
- account_key = jose.JWKRSA(key=private_key)
- acme_client = client.Client(constants.DIRECTORY_URL, account_key)
-
- return registration, acme_client, account_key
-
-
-def authorize(sans):
- registration, acme_client, account_key = get_client()
- authorizations = []
- for san in sans:
- authzr = acme_client.request_challenges(
- identifier=messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=san),
- new_authzr_uri=registration.new_authzr_uri)
- authorizations.append(authzr)
- for challenge in authzr.body.challenges:
- if isinstance(challenge.chall, challenges.DNS01):
- print(san)
- print(challenge.validation(account_key))
- print(challenge.key_authorization(account_key))
- ssh = pexpect.spawn("ssh _tls@ns1.siccegge.de acme")
- ssh.expect("Hostname:")
- ssh.sendline(san)
- ssh.expect("Value:")
- ssh.sendline(challenge.validation(account_key))
- ssh.expect("OK")
-
- break
- else:
- print("fallthrough")
-
- time.sleep(5)
- for authorization in authorizations:
- for challenge in authorization.body.challenges:
- if isinstance(challenge.chall, challenges.DNS01):
- response = challenges.DNS01Response(key_authorization=challenge.key_authorization(account_key))
- acme_client.answer_challenge(challenge, response)
-
- while(True):
- print("sleeping")
- time.sleep(5)
- new_authorizations = []
- for authorization in authorizations:
- new_auth, response = acme_client.poll(authorization)
- new_authorizations.append(new_auth)
- if new_auth.body.status != messages.Status("valid"):
- break
- else:
- return new_authorizations
-
-
-def get_certificate(servicetype, cname, sans):
- registration, acme_client, account_key = get_client()
- authorizations = authorize(sans)
-
- with open(os.path.join("certs", servicetype, cname, "key.pem"), "rb") as keyfd:
- private_key = serialization.load_pem_private_key(
- keyfd.read(),
- password=None,
- backend=default_backend())
-
- builder = x509.CertificateSigningRequestBuilder()
- builder = builder.subject_name(x509.Name([
- x509.NameAttribute(NameOID.COMMON_NAME, cname.decode()),
- ]))
- builder = builder.add_extension(
- x509.SubjectAlternativeName([x509.DNSName(x.decode()) for x in sans]),
- critical=False)
-
- request = builder.sign(private_key, hashes.SHA512(), default_backend())
- orequest = OpenSSL.crypto.load_certificate_request(
- OpenSSL.crypto.FILETYPE_PEM,
- request.public_bytes(serialization.Encoding.PEM))
-
- jrequest = jose.util.ComparableX509(orequest)
- cert = acme_client.request_issuance(jrequest, authorizations)
- certs = acme_client.fetch_chain(cert)
-
- with open(os.path.join("certs", servicetype, cname, "cert.pem"), "wb") as certfd:
- certfd.write(cert.body._dump(OpenSSL.crypto.FILETYPE_PEM))
- for cert in certs:
- certfd.write(cert._dump(OpenSSL.crypto.FILETYPE_PEM))