diff --git a/ca_manager/models/ssl.py b/ca_manager/models/ssl.py index e4d93d8..bd6095c 100644 --- a/ca_manager/models/ssl.py +++ b/ca_manager/models/ssl.py @@ -4,8 +4,8 @@ from playhouse.gfk import * import os -from inspect import getsourcefile import subprocess +from tempfile import NamedTemporaryFile from .authority import Authority from .certificate import Certificate @@ -16,6 +16,14 @@ import json class HostSSLRequest(SignRequest): + v3_exts = { + 'subjectKeyIdentifier': 'hash', + 'authorityKeyIdentifier': 'keyid:always, issuer', + 'basicConstraints': 'critical, CA:FALSE', + 'keyUsage': 'critical, digitalSignature, keyEncipherment', + 'extendedKeyUsage': 'serverAuth', + } + def __init__(self, req_id, host_name, key_data): super().__init__(req_id) @@ -38,6 +46,14 @@ class HostSSLRequest(SignRequest): class UserSSLRequest(SignRequest): + v3_exts = { + 'subjectKeyIdentifier': 'hash', + 'authorityKeyIdentifier': 'keyid:always, issuer', + 'basicConstraints': 'critical, CA:FALSE', + 'keyUsage': 'critical, digitalSignature', + 'extendedKeyUsage': 'clientAuth', + } + def __init__(self, req_id, user_name, key_data): super().__init__(req_id) @@ -60,6 +76,15 @@ class UserSSLRequest(SignRequest): class CASSLRequest(SignRequest): + v3_exts = { + 'subjectKeyIdentifier': 'hash', + 'authorityKeyIdentifier': 'keyid:always, issuer', + 'basicConstraints': 'critical, CA:true, pathlen:0', + 'keyUsage': 'cRLSign, keyCertSign', + 'subjectAltName': 'email:copy', + 'issuerAltName': 'issuer:copy', + } + def __init__(self, req_id, ca_name, key_data): super().__init__(req_id) @@ -88,15 +113,32 @@ class SSLAuthority(Authority): CASSLRequest, ] - ca_key_algorithm = 'des3' - key_length = '4096' + key_encryption = 'aes256' + + key_format = 'ED25519' + key_format_extra = {} + + #key_format = 'RSA' + #key_format_extra = { + # 'rsa_keygen_bits': 4096, + # 'rsa_keygen_primes': 2, + # 'rsa_keygen_pubexp': 65537, + #} + + #key_format = 'EC' + #key_format_extra = { + # 'ec_paramgen_curve': 'P-256', + # 'ec_param_enc': 'named_curve', + #} - key_algorithm = 'sha256' root_ca_validity = '3650' ca_validity = '1825' cert_validity = '365' def generate(self): + """ + Generate a Root or non Root Certification Authority + """ if os.path.exists(self.path): raise ValueError('A CA with the same id and type already exists') confirm = input('Is a root CA? [y/N]> ') @@ -105,75 +147,105 @@ class SSLAuthority(Authority): else: self.isRoot = False - subprocess.check_output(['openssl', - 'genrsa', - '-%s' % self.ca_key_algorithm, - '-out', '%s' % (self.path), - self.key_length]) + # Create Private Key + cmd = [ + 'openssl', + 'genpkey', + '-{}'.format(self.key_encryption), + '-out', "{}.key".format(self.path), + '-algorithm', self.key_format, + ] + for k, v in self.key_format_extra.items(): + cmd += ['-pkeyopt', '{}:{}'.format(k, v)] + subprocess.check_output(cmd) + + # Create Certificate Request + cmd = [ + 'openssl', + 'req', + '-new', + '-key', "{}.key".format(self.path), + '-out', "{}.csr".format(self.path), + ] + subprocess.check_output(cmd) + if self.isRoot: - subprocess.check_output(['openssl', - 'req', - '-extensions', 'v3_root_ca', - '-config', os.path.join(os.path.dirname(os.path.abspath(getsourcefile(lambda:0))), '../openssl-config/openssl.cnf'), - '-new', - '-x509', - '-days', self.root_ca_validity, - '-key', self.path, - # '-extensions', 'v3_ca' - '-out', '%s.pub' % self.path, - # '-config', "%s.conf"%self.path - ]) + # If CA is Root, generate self signed certificate + v3_exts = { + 'subjectKeyIdentifier': 'hash', + 'authorityKeyIdentifier': 'keyid:always, issuer', + 'basicConstraints': 'critical, CA:true, pathlen:1', + 'keyUsage': 'cRLSign, keyCertSign', + 'subjectAltName': 'email:copy', + 'issuerAltName': 'issuer:copy', + } + with NamedTemporaryFile(mode='w') as extfile: + extfile.writelines( + ["{} = {}\n".format(k, v) + for k, v in v3_exts.items()]) + extfile.flush() + subprocess.check_output([ + 'openssl', + 'x509', + '-req', + '-days', self.root_ca_validity, + '-in', "{}.csr".format(self.path), + '-signkey', "{}.key".format(self.path), + '-out', "{}.crt".format(self.path), + '-extfile', extfile.name, + ]) + else: - subprocess.check_output(['openssl', - 'req', - '-new', - #'-x509', - # '-days', self.ca_validity, - '-key', self.path, - # '-extensions', 'v3_ca' - '-out', '%s.csr' % self.path, - # '-config', "%s.conf"%self.path - ]) + # If CA is not Root, format a JSON signing request result_dict = {} result_dict['keyType'] = 'ssl_ca' result_dict['caName'] = self.ca_id - with open("%s.csr" % self.path, 'r') as f: + with open("{}.csr".format(self.path), 'r') as f: result_dict['keyData'] = "".join(f.readlines()) - request = {'type': 'sign_request', 'request': result_dict} print('Please sign the following request:') print(json.dumps(request)) + # Init CA serial with open(self.path + '.serial', 'w') as stream: - stream.write(str(0)) + stream.write('01\n') def generate_certificate(self, request): """ Sign a *SSLRequest with this certification authority """ - if not os.path.exists('%s.pub' % self.path) and not self.isRoot: - raise ValueError("The CA certificate '%s.pub' doesn't exists yet" % self.path) + if not os.path.exists('%s.crt' % self.path) and not self.isRoot: + raise ValueError( + "The CA certificate '%s.crt' doesn't exists yet" % self.path) - pub_key_path = request.destination + csr_path = request.destination cert_path = request.cert_destination - with open(pub_key_path, 'w') as stream: + with open(csr_path, 'w') as stream: stream.write(request.key_data) - subprocess.check_output(['openssl', - 'x509', - '-req', - '-days', self.ca_validity, - '-in', pub_key_path, - '-CA', '%s.pub' % self.path, - '-CAkey', self.path, - '-CAcreateserial', - '-out', cert_path, - '-%s' % self.key_algorithm]) - + with NamedTemporaryFile(mode='w') as extfile: + extfile.writelines( + ["{} = {}\n".format(k, v) + for k, v in request.v3_exts.items()]) + extfile.flush() + subprocess.check_output([ + 'openssl', + 'x509', + '-req', + '-days', self.ca_validity, + '-in', csr_path, + '-CA', "{}.crt".format(self.path), + '-CAkey', "{}.key".format(self.path), + '-CAserial', "{}.serial".format(self.path), + '-out', cert_path, + '-extfile', extfile.name, + ]) + + # If it's not a RootCA append the full chain to the output cert if not self.isRoot: with open(cert_path, 'a') as cert_file: - with open('%s.pub' % self.path) as ca_cert_file: + with open("{}.crt".format(self.path), 'r') as ca_cert_file: cert_file.writelines(ca_cert_file.readlines()) return self.ca_validity