#1 add default extensions for host and user role as per RFC 5280

Open
zolfa wants to merge 4 commits from rfc5280 into master
  1. +122
    -50
      ca_manager/models/ssl.py

+ 122
- 50
ca_manager/models/ssl.py View File

@ -4,8 +4,8 @@
from playhouse.gfk import * from playhouse.gfk import *
import os import os
from inspect import getsourcefile
import subprocess import subprocess
from tempfile import NamedTemporaryFile
from .authority import Authority from .authority import Authority
from .certificate import Certificate from .certificate import Certificate
@ -16,6 +16,14 @@ import json
class HostSSLRequest(SignRequest): class HostSSLRequest(SignRequest):
v3_exts = {
'subjectKeyIdentifier': 'hash',
'authorityKeyIdentifier': 'keyid:always, issuer',
'basicConstraints': 'critical, CA:FALSE',
'keyUsage': 'critical, digitalSignature, keyEncipherment',
'extendedKeyUsage': 'serverAuth',
}
def __init__(self, req_id, host_name, key_data): def __init__(self, req_id, host_name, key_data):
super().__init__(req_id) super().__init__(req_id)
@ -38,6 +46,14 @@ class HostSSLRequest(SignRequest):
class UserSSLRequest(SignRequest): class UserSSLRequest(SignRequest):
v3_exts = {
'subjectKeyIdentifier': 'hash',
'authorityKeyIdentifier': 'keyid:always, issuer',
'basicConstraints': 'critical, CA:FALSE',
'keyUsage': 'critical, digitalSignature',
'extendedKeyUsage': 'clientAuth',
}
def __init__(self, req_id, user_name, key_data): def __init__(self, req_id, user_name, key_data):
super().__init__(req_id) super().__init__(req_id)
@ -60,6 +76,15 @@ class UserSSLRequest(SignRequest):
class CASSLRequest(SignRequest): class CASSLRequest(SignRequest):
v3_exts = {
'subjectKeyIdentifier': 'hash',
'authorityKeyIdentifier': 'keyid:always, issuer',
'basicConstraints': 'critical, CA:true, pathlen:0',
'keyUsage': 'cRLSign, keyCertSign',
'subjectAltName': 'email:copy',
'issuerAltName': 'issuer:copy',
}
def __init__(self, req_id, ca_name, key_data): def __init__(self, req_id, ca_name, key_data):
super().__init__(req_id) super().__init__(req_id)
@ -88,15 +113,32 @@ class SSLAuthority(Authority):
CASSLRequest, CASSLRequest,
] ]
ca_key_algorithm = 'des3'
key_length = '4096'
key_encryption = 'aes256'
key_format = 'ED25519'
key_format_extra = {}
#key_format = 'RSA'
#key_format_extra = {
# 'rsa_keygen_bits': 4096,
# 'rsa_keygen_primes': 2,
# 'rsa_keygen_pubexp': 65537,
#}
#key_format = 'EC'
#key_format_extra = {
# 'ec_paramgen_curve': 'P-256',
# 'ec_param_enc': 'named_curve',
#}
key_algorithm = 'sha256'
root_ca_validity = '3650' root_ca_validity = '3650'
ca_validity = '1825' ca_validity = '1825'
cert_validity = '365' cert_validity = '365'
def generate(self): def generate(self):
"""
Generate a Root or non Root Certification Authority
"""
if os.path.exists(self.path): if os.path.exists(self.path):
raise ValueError('A CA with the same id and type already exists') raise ValueError('A CA with the same id and type already exists')
confirm = input('Is a root CA? [y/N]> ') confirm = input('Is a root CA? [y/N]> ')
@ -105,75 +147,105 @@ class SSLAuthority(Authority):
else: else:
self.isRoot = False self.isRoot = False
subprocess.check_output(['openssl',
'genrsa',
'-%s' % self.ca_key_algorithm,
'-out', '%s' % (self.path),
self.key_length])
# Create Private Key
cmd = [
'openssl',
'genpkey',
'-{}'.format(self.key_encryption),
'-out', "{}.key".format(self.path),
'-algorithm', self.key_format,
]
for k, v in self.key_format_extra.items():
cmd += ['-pkeyopt', '{}:{}'.format(k, v)]
subprocess.check_output(cmd)
# Create Certificate Request
cmd = [
'openssl',
'req',
'-new',
'-key', "{}.key".format(self.path),
'-out', "{}.csr".format(self.path),
]
subprocess.check_output(cmd)
if self.isRoot: if self.isRoot:
subprocess.check_output(['openssl',
'req',
'-extensions', 'v3_root_ca',
'-config', os.path.join(os.path.dirname(os.path.abspath(getsourcefile(lambda:0))), '../openssl-config/openssl.cnf'),
'-new',
'-x509',
'-days', self.root_ca_validity,
'-key', self.path,
# '-extensions', 'v3_ca'
'-out', '%s.pub' % self.path,
# '-config', "%s.conf"%self.path
])
# If CA is Root, generate self signed certificate
v3_exts = {
'subjectKeyIdentifier': 'hash',
'authorityKeyIdentifier': 'keyid:always, issuer',
'basicConstraints': 'critical, CA:true, pathlen:1',
'keyUsage': 'cRLSign, keyCertSign',
'subjectAltName': 'email:copy',
'issuerAltName': 'issuer:copy',
}
with NamedTemporaryFile(mode='w') as extfile:
extfile.writelines(
["{} = {}\n".format(k, v)
for k, v in v3_exts.items()])
extfile.flush()
subprocess.check_output([
'openssl',
'x509',
'-req',
'-days', self.root_ca_validity,
'-in', "{}.csr".format(self.path),
'-signkey', "{}.key".format(self.path),
'-out', "{}.crt".format(self.path),
'-extfile', extfile.name,
])
else: else:
subprocess.check_output(['openssl',
'req',
'-new',
#'-x509',
# '-days', self.ca_validity,
'-key', self.path,
# '-extensions', 'v3_ca'
'-out', '%s.csr' % self.path,
# '-config', "%s.conf"%self.path
])
# If CA is not Root, format a JSON signing request
result_dict = {} result_dict = {}
result_dict['keyType'] = 'ssl_ca' result_dict['keyType'] = 'ssl_ca'
result_dict['caName'] = self.ca_id result_dict['caName'] = self.ca_id
with open("%s.csr" % self.path, 'r') as f:
with open("{}.csr".format(self.path), 'r') as f:
result_dict['keyData'] = "".join(f.readlines()) result_dict['keyData'] = "".join(f.readlines())
request = {'type': 'sign_request', 'request': result_dict} request = {'type': 'sign_request', 'request': result_dict}
print('Please sign the following request:') print('Please sign the following request:')
print(json.dumps(request)) print(json.dumps(request))
# Init CA serial
with open(self.path + '.serial', 'w') as stream: with open(self.path + '.serial', 'w') as stream:
stream.write(str(0))
stream.write('01\n')
def generate_certificate(self, request): def generate_certificate(self, request):
""" """
Sign a *SSLRequest with this certification authority Sign a *SSLRequest with this certification authority
""" """
if not os.path.exists('%s.pub' % self.path) and not self.isRoot:
raise ValueError("The CA certificate '%s.pub' doesn't exists yet" % self.path)
if not os.path.exists('%s.crt' % self.path) and not self.isRoot:
raise ValueError(
"The CA certificate '%s.crt' doesn't exists yet" % self.path)
pub_key_path = request.destination
csr_path = request.destination
cert_path = request.cert_destination cert_path = request.cert_destination
with open(pub_key_path, 'w') as stream:
with open(csr_path, 'w') as stream:
stream.write(request.key_data) stream.write(request.key_data)
subprocess.check_output(['openssl',
'x509',
'-req',
'-days', self.ca_validity,
'-in', pub_key_path,
'-CA', '%s.pub' % self.path,
'-CAkey', self.path,
'-CAcreateserial',
'-out', cert_path,
'-%s' % self.key_algorithm])
with NamedTemporaryFile(mode='w') as extfile:
extfile.writelines(
["{} = {}\n".format(k, v)
for k, v in request.v3_exts.items()])
extfile.flush()
subprocess.check_output([
'openssl',
'x509',
'-req',
'-days', self.ca_validity,
'-in', csr_path,
'-CA', "{}.crt".format(self.path),
'-CAkey', "{}.key".format(self.path),
'-CAserial', "{}.serial".format(self.path),
'-out', cert_path,
'-extfile', extfile.name,
])
# If it's not a RootCA append the full chain to the output cert
if not self.isRoot: if not self.isRoot:
with open(cert_path, 'a') as cert_file: with open(cert_path, 'a') as cert_file:
with open('%s.pub' % self.path) as ca_cert_file:
with open("{}.crt".format(self.path), 'r') as ca_cert_file:
cert_file.writelines(ca_cert_file.readlines()) cert_file.writelines(ca_cert_file.readlines())
return self.ca_validity return self.ca_validity

Loading…
Cancel
Save