@ -5,6 +5,7 @@ from playhouse.gfk import *
import os
import subprocess
from tempfile import NamedTemporaryFile
from .authority import Authority
from .certificate import Certificate
@ -15,8 +16,11 @@ import json
class HostSSLRequest ( SignRequest ) :
x509_extensions = {
' keyUsage ' : ' digitalSignature,keyEncipherment ' ,
v3_exts = {
' subjectKeyIdentifier ' : ' hash ' ,
' authorityKeyIdentifier ' : ' keyid:always, issuer ' ,
' basicConstraints ' : ' critical, CA:FALSE ' ,
' keyUsage ' : ' critical, digitalSignature, keyEncipherment ' ,
' extendedKeyUsage ' : ' serverAuth ' ,
}
@ -42,8 +46,11 @@ class HostSSLRequest(SignRequest):
class UserSSLRequest ( SignRequest ) :
x509_extensions = {
' keyUsage ' : ' digitalSignature ' ,
v3_exts = {
' subjectKeyIdentifier ' : ' hash ' ,
' authorityKeyIdentifier ' : ' keyid:always, issuer ' ,
' basicConstraints ' : ' critical, CA:FALSE ' ,
' keyUsage ' : ' critical, digitalSignature ' ,
' extendedKeyUsage ' : ' clientAuth ' ,
}
@ -69,10 +76,10 @@ class UserSSLRequest(SignRequest):
class CASSLRequest ( SignRequest ) :
x509_extension s = {
v3_ext s = {
' subjectKeyIdentifier ' : ' hash ' ,
' authorityKeyIdentifier ' : ' keyid:always, issuer ' ,
' basicConstraints ' : ' critical, CA:true ' ,
' basicConstraints ' : ' critical, CA:true, pathlen:0 ' ,
' keyUsage ' : ' cRLSign, keyCertSign ' ,
' subjectAltName ' : ' email:copy ' ,
' issuerAltName ' : ' issuer:copy ' ,
@ -106,10 +113,24 @@ class SSLAuthority(Authority):
CASSLRequest ,
]
ca_key_algorithm = ' des3 '
key_length = ' 4096 '
key_encryption = ' aes256 '
key_format = ' ED25519 '
key_format_extra = { }
#key_format = 'RSA'
#key_format_extra = {
# 'rsa_keygen_bits': 4096,
# 'rsa_keygen_primes': 2,
# 'rsa_keygen_pubexp': 65537,
#}
#key_format = 'EC'
#key_format_extra = {
# 'ec_paramgen_curve': 'P-256',
# 'ec_param_enc': 'named_curve',
#}
key_algorithm = ' sha256 '
root_ca_validity = ' 3650 '
ca_validity = ' 1825 '
cert_validity = ' 365 '
@ -126,25 +147,31 @@ class SSLAuthority(Authority):
else :
self . isRoot = False
# Create Private Key
cmd = [
' openssl ' ,
' genpkey ' ,
' -aes256 ' ,
' -algorithm ' , ' ED25519 ' ,
' -{} ' . format ( self . key_encryption ) ,
' -out ' , " {}.key " . format ( self . path ) ,
' -algorithm ' , self . key_format ,
]
for k , v in self . key_format_extra . items ( ) :
cmd + = [ ' -pkeyopt ' , ' {}:{} ' . format ( k , v ) ]
subprocess . check_output ( cmd )
# Create Certificate Request
cmd = [
' openssl ' ,
' req ' ,
' -new ' ,
' -key ' , " {}.key " . format ( self . path ) ,
' -out ' , " {}.csr " . format ( self . path ) ,
]
subprocess . check_output ( cmd )
if self . isRoot :
x509_ext = {
# If CA is Root, generate self signed certificate
v3_exts = {
' subjectKeyIdentifier ' : ' hash ' ,
' authorityKeyIdentifier ' : ' keyid:always, issuer ' ,
' basicConstraints ' : ' critical, CA:true, pathlen:1 ' ,
@ -152,44 +179,45 @@ class SSLAuthority(Authority):
' subjectAltName ' : ' email:copy ' ,
' issuerAltName ' : ' issuer:copy ' ,
}
cmd + = [
' -x509 ' ,
' -days ' , self . root_ca_validity ,
' -out ' , " {}.crt " . format ( self . path ) ,
]
for k , v in x509_ext . items ( ) :
cmd + = [ ' -addext ' , " {}={} " . format ( k , v ) ]
subprocess . check_output ( cmd )
with NamedTemporaryFile ( mode = ' w ' ) as extfile :
extfile . writelines (
[ " {} = {} \n " . format ( k , v )
for k , v in v3_exts . items ( ) ] )
extfile . flush ( )
subprocess . check_output ( [
' openssl ' ,
' x509 ' ,
' -req ' ,
' -days ' , self . root_ca_validity ,
' -in ' , " {}.csr " . format ( self . path ) ,
' -signkey ' , " {}.key " . format ( self . path ) ,
' -out ' , " {}.crt " . format ( self . path ) ,
' -extfile ' , extfile . name ,
] )
else :
cmd + = [
' -out ' , " {}.csr " . format ( self . path ) ,
]
subprocess . check_output ( cmd )
# If CA is not Root, format a JSON signing request
result_dict = { }
result_dict [ ' keyType ' ] = ' ssl_ca '
result_dict [ ' caName ' ] = self . ca_id
with open ( " {}.csr " . format ( self . path ) , ' r ' ) as f :
result_dict [ ' keyData ' ] = " " . join ( f . readlines ( ) )
request = { ' type ' : ' sign_request ' , ' request ' : result_dict }
print ( ' Please sign the following request: ' )
print ( json . dumps ( request ) )
# Init CA serial
with open ( self . path + ' .serial ' , ' w ' ) as stream :
stream . write ( str ( 0 ) )
stream . write ( ' 01 \n ' )
def generate_certificate ( self , request ) :
"""
Sign a * SSLRequest with this certification authority
"""
if not os . path . exists ( ' %s .pub ' % self . path ) and not self . isRoot :
raise ValueError ( " The CA certificate ' %s .pub ' doesn ' t exists yet " % self . path )
if not os . path . exists ( ' %s .crt ' % self . path ) and not self . isRoot :
raise ValueError (
" The CA certificate ' %s .crt ' doesn ' t exists yet " % self . path )
csr_path = request . destination
cert_path = request . cert_destination
@ -197,24 +225,23 @@ class SSLAuthority(Authority):
with open ( csr_path , ' w ' ) as stream :
stream . write ( request . key_data )
cmd = [
' openssl ' ,
' x509 ' ,
' -req ' ,
' -days ' , self . ca_validity ,
' -in ' , csr_path ,
' -CA ' , " {}.crt " . format ( self . path ) ,
' -CAkey ' , " {}.key " . format ( self . path ) ,
' -CAcreateserial ' ,
' -out ' , cert_path ,
' -extfile ' , ' - ' ,
]
ext_string = ' \n ' . join (
" {} = {} " . format ( k , v ) for k , v in request . x509_extensions . items ( )
)
subprocess . check_output ( cmd , input = ext_string . encode ( ' utf-8 ' ) )
with NamedTemporaryFile ( mode = ' w ' ) as extfile :
extfile . writelines (
[ " {} = {} \n " . format ( k , v )
for k , v in request . v3_exts . items ( ) ] )
extfile . flush ( )
subprocess . check_output ( [
' openssl ' ,
' x509 ' ,
' -req ' ,
' -days ' , self . ca_validity ,
' -in ' , csr_path ,
' -CA ' , " {}.crt " . format ( self . path ) ,
' -CAkey ' , " {}.key " . format ( self . path ) ,
' -CAserial ' , " {}.serial " . format ( self . path ) ,
' -out ' , cert_path ,
' -extfile ' , extfile . name ,
] )
# If it's not a RootCA append the full chain to the output cert
if not self . isRoot :