X-Git-Url: https://git.siccegge.de//index.cgi?p=tooling%2Fletool.git;a=blobdiff_plain;f=sicceggetools%2Facme%2Fauthorize.py;fp=sicceggetools%2Facme%2Fauthorize.py;h=3fe4bd294b848b41cdac4f1163beff5c9a1b68f5;hp=0000000000000000000000000000000000000000;hb=51cfaa176a021af7f611f3ffe024bafc99b696d0;hpb=e1de0bea6b56b5245178cedf4610f3d19e20f894 diff --git a/sicceggetools/acme/authorize.py b/sicceggetools/acme/authorize.py new file mode 100644 index 0000000..3fe4bd2 --- /dev/null +++ b/sicceggetools/acme/authorize.py @@ -0,0 +1,85 @@ +#!/usr/bin/python + +from functools import partial +import logging +import os.path +import time + +import pexpect + +from acme import messages +from acme import challenges + + +def _authorize_dns01(san, validation): + logging.info("Using DNS-01 for %s", san) + ssh = pexpect.spawn("ssh _tls@ns1.siccegge.de acme") + ssh.expect("Hostname:") + ssh.sendline(san) + ssh.expect("Value:") + ssh.sendline(validation) + ssh.expect("OK") + + +def _authorize_http01(san, key_auth): + logging.info("Using HTTP-01 for %s", san) + with open(os.path.join('/srv/tls/http-01/', key_auth.split('.')[0]), 'w') as fd: + fd.write(key_auth) + + +def _authorize_challenge(san, thechallenges, client, settings=None): + _, acme_client, account_key = client + responsefun = None + + for challenge in thechallenges: + if settings.use_method("HTTP01", san, settings) and isinstance(challenge.chall, challenges.HTTP01): + def _response(challenge): + response = challenges.HTTP01Response(key_authorization=challenge.key_authorization(account_key)) + acme_client.answer_challenge(challenge, response) + + _authorize_http01(san, challenge.key_authorization(account_key)) + responsefun = partial(_response, challenge) + + elif settings.use_method("DNS01", san, settings) and isinstance(challenge.chall, challenges.DNS01): + def _response(challenge): + response = challenges.DNS01Response(key_authorization=challenge.key_authorization(account_key)) + acme_client.answer_challenge(challenge, response) + + _authorize_dns01(san, challenge.validation(account_key)) + responsefun = partial(_response, challenge) + + return responsefun + + +def authorize(sans, client, settings=None): + registration, acme_client, _ = client + authorizations = [] + responsefuns = [] + + for san in sans: + authzr = acme_client.request_challenges( + identifier=messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=san), + new_authzr_uri=registration.new_authzr_uri) + authorizations.append(authzr) + + result = _authorize_challenge(san, authzr.body.challenges, client, settings) + if result is None: + logging.warn("fallthrough") + else: + responsefuns.append(result) + + time.sleep(5) + for respfun in responsefuns: + respfun() + + while True: + logging.info("sleeping") + time.sleep(5) + new_authorizations = [] + for authorization in authorizations: + new_auth, _ = acme_client.poll(authorization) + new_authorizations.append(new_auth) + if new_auth.body.status != messages.Status("valid"): + break + else: + return new_authorizations