]> git.siccegge.de Git - tooling/letool.git/blob - bin/newcert
6817b704518de9bb13519a83b2535018c9d8bb0c
[tooling/letool.git] / bin / newcert
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 # (C) Christoph Egger <christoph@christoph-egger.org>
4
5 from __future__ import print_function
6
7 from socket import getfqdn
8 import argparse
9 import logging
10 import os.path
11 import yaml
12 import time
13
14 from acme import client
15 from acme import jose
16 from acme import messages
17 from acme import challenges
18
19 from cryptography.hazmat.backends import default_backend
20 from cryptography.hazmat.primitives.asymmetric import rsa
21 from cryptography.hazmat.primitives import serialization
22 from cryptography.hazmat.primitives import hashes
23 from cryptography import x509
24 from cryptography.x509.oid import NameOID
25
26 import OpenSSL
27
28 import pexpect
29
30 from sicceggetools.acme import constants
31
32
33 def get_client():
34 logging.info("Loading account key")
35 with open("data/account.key.pem", "rb") as keyfd:
36 private_key = serialization.load_pem_private_key(
37 keyfd.read(),
38 password=None,
39 backend=default_backend()
40 )
41
42 logging.info("Loading account registration")
43 with open("data/registration.json", "rb") as regfd:
44 registration = messages.RegistrationResource.json_loads(regfd.read())
45
46 account_key = jose.JWKRSA(key=private_key)
47 acme_client = client.Client(constants.DIRECTORY_URL, account_key)
48
49 return registration, acme_client, account_key
50
51
52 def authorize(sans):
53 registration, acme_client, account_key = get_client()
54 authorizations = []
55 for san in sans:
56 authzr = acme_client.request_challenges(
57 identifier=messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=san),
58 new_authzr_uri=registration.new_authzr_uri)
59 authorizations.append(authzr)
60 for challenge in authzr.body.challenges:
61 if isinstance(challenge.chall, challenges.DNS01):
62 print(san)
63 print(challenge.validation(account_key))
64 print(challenge.key_authorization(account_key))
65 ssh = pexpect.spawn("ssh _tls@ns1.siccegge.de acme")
66 ssh.expect("Hostname:")
67 ssh.sendline(san)
68 ssh.expect("Value:")
69 ssh.sendline(challenge.validation(account_key))
70 ssh.expect("OK")
71
72 break
73 else:
74 print("fallthrough")
75
76 time.sleep(5)
77 for authorization in authorizations:
78 for challenge in authorization.body.challenges:
79 if isinstance(challenge.chall, challenges.DNS01):
80 response = challenges.DNS01Response(key_authorization=challenge.key_authorization(account_key))
81 acme_client.answer_challenge(challenge, response)
82
83 while(True):
84 print("sleeping")
85 time.sleep(5)
86 new_authorizations = []
87 for authorization in authorizations:
88 new_auth, response = acme_client.poll(authorization)
89 new_authorizations.append(new_auth)
90 if new_auth.body.status != messages.Status("valid"):
91 break
92 else:
93 return new_authorizations
94
95
96 def get_certificate(cname, sans):
97 registration, acme_client, account_key = get_client()
98 authorizations = authorize(sans)
99
100 with open(os.path.join("certs", cname, "key.pem"), "rb") as keyfd:
101 private_key = serialization.load_pem_private_key(
102 keyfd.read(),
103 password=None,
104 backend=default_backend())
105
106 builder = x509.CertificateSigningRequestBuilder()
107 builder = builder.subject_name(x509.Name([
108 x509.NameAttribute(NameOID.COMMON_NAME, cname.decode()),
109 ]))
110 builder = builder.add_extension(
111 x509.SubjectAlternativeName([x509.DNSName(x.decode()) for x in sans]),
112 critical=False)
113
114 request = builder.sign(private_key, hashes.SHA512(), default_backend())
115 orequest = OpenSSL.crypto.load_certificate_request(
116 OpenSSL.crypto.FILETYPE_PEM,
117 request.public_bytes(serialization.Encoding.PEM))
118
119 jrequest = jose.util.ComparableX509(orequest)
120 cert = acme_client.request_issuance(jrequest, authorizations)
121 certs = acme_client.fetch_chain(cert)
122
123 with open(os.path.join("certs", cname, "cert.pem"), "wb") as certfd:
124 certfd.write(cert.body._dump(OpenSSL.crypto.FILETYPE_PEM))
125 for cert in certs:
126 certfd.write(cert._dump(OpenSSL.crypto.FILETYPE_PEM))
127
128 print(cname)
129 print(sans)
130 print(cert)
131
132
133 def main():
134 parser = argparse.ArgumentParser()
135
136 parser.add_argument('--servicetype', '-s', type=str)
137 parser.add_argument('certificate', type=str)
138 args = parser.parse_args()
139
140 with open("config/inventory.yaml") as invfd:
141 inventory = yaml.load(invfd.read())
142
143 certificate_list = inventory[getfqdn()][args.servicetype]
144 if type(certificate_list) is list:
145 if args.certificate in certificate_list:
146 get_certificate(args.certificate, [args.certificate])
147 elif type(certificate_list) is dict:
148 if args.certificate in certificate_list.keys():
149 get_certificate(args.certificate, certificate_list[args.certificate])
150 else:
151 print("unexpected type: %s", type(certificate_list))
152
153
154 if __name__ == '__main__':
155 main()