#!/usr/bin/python # -*- coding: utf-8 -*- # (C) Christoph Egger from __future__ import print_function from socket import getfqdn import argparse import logging import os.path import yaml import time from acme import client from acme import jose from acme import messages from acme import challenges from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import hashes from cryptography import x509 from cryptography.x509.oid import NameOID import OpenSSL import pexpect from sicceggetools.acme import constants def get_client(): logging.info("Loading account key") with open("data/account.key.pem", "rb") as keyfd: private_key = serialization.load_pem_private_key( keyfd.read(), password=None, backend=default_backend() ) logging.info("Loading account registration") with open("data/registration.json", "rb") as regfd: registration = messages.RegistrationResource.json_loads(regfd.read()) account_key = jose.JWKRSA(key=private_key) acme_client = client.Client(constants.DIRECTORY_URL, account_key) return registration, acme_client, account_key def authorize(sans): registration, acme_client, account_key = get_client() authorizations = [] for san in sans: authzr = acme_client.request_challenges( identifier=messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=san), new_authzr_uri=registration.new_authzr_uri) authorizations.append(authzr) for challenge in authzr.body.challenges: if isinstance(challenge.chall, challenges.DNS01): print(san) print(challenge.validation(account_key)) print(challenge.key_authorization(account_key)) ssh = pexpect.spawn("ssh _tls@ns1.siccegge.de acme") ssh.expect("Hostname:") ssh.sendline(san) ssh.expect("Value:") ssh.sendline(challenge.validation(account_key)) ssh.expect("OK") break else: print("fallthrough") time.sleep(5) for authorization in authorizations: for challenge in authorization.body.challenges: if isinstance(challenge.chall, challenges.DNS01): response = challenges.DNS01Response(key_authorization=challenge.key_authorization(account_key)) acme_client.answer_challenge(challenge, response) while(True): print("sleeping") time.sleep(5) new_authorizations = [] for authorization in authorizations: new_auth, response = acme_client.poll(authorization) new_authorizations.append(new_auth) if new_auth.body.status != messages.Status("valid"): break else: return new_authorizations def get_certificate(cname, sans): registration, acme_client, account_key = get_client() authorizations = authorize(sans) with open(os.path.join("certs", cname, "key.pem"), "rb") as keyfd: private_key = serialization.load_pem_private_key( keyfd.read(), password=None, backend=default_backend()) builder = x509.CertificateSigningRequestBuilder() builder = builder.subject_name(x509.Name([ x509.NameAttribute(NameOID.COMMON_NAME, cname.decode()), ])) builder = builder.add_extension( x509.SubjectAlternativeName([x509.DNSName(x.decode()) for x in sans]), critical=False) request = builder.sign(private_key, hashes.SHA512(), default_backend()) orequest = OpenSSL.crypto.load_certificate_request( OpenSSL.crypto.FILETYPE_PEM, request.public_bytes(serialization.Encoding.PEM)) jrequest = jose.util.ComparableX509(orequest) cert = acme_client.request_issuance(jrequest, authorizations) certs = acme_client.fetch_chain(cert) with open(os.path.join("certs", cname, "cert.pem"), "wb") as certfd: certfd.write(cert.body._dump(OpenSSL.crypto.FILETYPE_PEM)) for cert in certs: certfd.write(cert._dump(OpenSSL.crypto.FILETYPE_PEM)) print(cname) print(sans) print(cert) def main(): parser = argparse.ArgumentParser() parser.add_argument('--servicetype', '-s', type=str) parser.add_argument('certificate', type=str) args = parser.parse_args() with open("config/inventory.yaml") as invfd: inventory = yaml.load(invfd.read()) certificate_list = inventory[getfqdn()][args.servicetype] if type(certificate_list) is list: if args.certificate in certificate_list: get_certificate(args.certificate, [args.certificate]) elif type(certificate_list) is dict: if args.certificate in certificate_list.keys(): get_certificate(args.certificate, certificate_list[args.certificate]) else: print("unexpected type: %s", type(certificate_list)) if __name__ == '__main__': main()