From 04a07d4423916c9d8fb490bd3b8eea7fb63c4334 Mon Sep 17 00:00:00 2001
From: Edoardo Putti
Date: Sat, 20 Aug 2016 01:32:08 +0200
Subject: [PATCH] move out certificates and authority classes from ca_manager
---
 ca_manager.py | 206 +------------------------------------------------
 1 file changed, 1 insertion(+), 205 deletions(-)
diff --git a/ca_manager.py b/ca_manager.py
index d05d5f2..c69e52f 100755
--- a/ca_manager.py
+++ b/ca_manager.py
@@ -11,211 +11,7 @@ import sqlite3 import subprocess import tempfile - - -class SignRequest(object): - def __init__(self, req_id): - self.req_id = req_id - - def get_name(self): - raise NotImplementedError() - - def get_fields(self): - raise NotImplementedError() - - -class UserSSHRequest(SignRequest): - def __init__(self, req_id, user_name, root_requested, key_data): - super().__init__(req_id) - - self.user_name = user_name - self.root_requested = root_requested - self.key_data = key_data - - def get_name(self): - return "User: %s [R:%d]" % (self.user_name, int(self.root_requested)) - - def get_fields(self): - return [ - ("User name", self.user_name), - ("Root access requested", 'yes' if self.root_requested else 'no') - ] - -class HostSSLRequest(SignRequest): - def __init__(self, req_id, host_name, key_data): - super().__init__(req_id) - - self.host_name = host_name - self.key_data = key_data - - def get_name(self): - return "Hostname: %s" % self.host_name - - def get_fields(self): - return [ - ("Hostname", self.host_name) - ] - -class HostSSHRequest(SignRequest): - def __init__(self, req_id, host_name, key_data): - super().__init__(req_id) - - self.host_name = host_name - self.key_data = key_data - - def get_name(self): - return "Hostname: %s" % self.host_name - - def get_fields(self): - return [ - ("Hostname", self.host_name) - ] - - -class Authority(object): - ca_type = None - - def __init__(self, ca_id, name, path): - self.ca_id = ca_id - self.name = name - self.path = path - - def generate(self): - raise NotImplementedError() - - def sign(self, request): - raise NotImplementedError() - - -class SSHAuthority(Authority): - ca_type = 'ssh' - - key_algorithm = 'ed25519' - - user_validity = '+52w' - host_validity = '+52w' - - def generate(self): - if os.path.exists(self.path): - raise ValueError("A CA with the same id and type already exists") - - subprocess.call(['ssh-keygen', - '-f', self.path, - '-t', self.key_algorithm, - '-C', self.name]) - - with 
open(self.path + '.serial', 'w') as stream: - stream.write(str(0)) - - - def sign(self, request): - global OUTPUT_PATH - - assert type(request) in [UserSSHRequest, HostSSHRequest] - - pub_key_path = os.path.join(OUTPUT_PATH, request.req_id + '.pub') - cert_path = os.path.join(OUTPUT_PATH, request.req_id + '-cert.pub') - - with open(self.path + '.serial', 'r') as stream: - next_serial = int(stream.read()) - with open(self.path + '.serial', 'w') as stream: - stream.write(str(next_serial + 1)) - - with open(pub_key_path, 'w') as stream: - stream.write(request.key_data) - - ca_private_key = self.path - - if type(request) == UserSSHRequest: - login_names = [request.user_name] - if request.root_requested: - login_names.append('root') - - subprocess.call(['ssh-keygen', - '-s', ca_private_key, - '-I', 'user_%s' % request.user_name, - '-n', ','.join(login_names), - '-V', self.user_validity, - '-z', str(next_serial), - pub_key_path]) - elif type(request) == HostSSHRequest: - subprocess.call(['ssh-keygen', - '-s', ca_private_key, - '-I', 'host_%s' % request.host_name.replace('.', '_'), - '-h', - '-n', request.host_name, - '-V', self.host_validity, - '-z', str(next_serial), - pub_key_path]) - - return cert_path - -class SSLAuthority(Authority): - ca_type = 'ssl' - - ca_key_algorithm = 'des3' - key_length = '4096' - - key_algorithm = 'sha256' - ca_validity = '365' - cert_validity = '365' - - def generate(self): - if os.path.exists(self.path): - raise ValueError("A CA with the same id and type already exists") - - subprocess.call(['openssl', - 'genrsa', - '-%s'%self.ca_key_algorithm, - '-out', '%s'%(self.path), - self.key_length]) - - subprocess.call(['openssl', - 'req', - '-new', - '-x509', - '-days', self.ca_validity, - '-key', self.path, - # '-extensions', 'v3_ca' - '-out', "%s.pub"%self.path, - # '-config', "%s.conf"%self.path - ]) - - with open(self.path + '.serial', 'w') as stream: - stream.write(str(0)) - - - def sign(self, request): - global OUTPUT_PATH - - assert 
type(request) in [HostSSLRequest] - - pub_key_path = os.path.join(OUTPUT_PATH, request.req_id + '.pub') - cert_path = os.path.join(OUTPUT_PATH, request.req_id + '-cert.pub') - - with open(self.path + '.serial', 'r') as stream: - next_serial = int(stream.read()) - with open(self.path + '.serial', 'w') as stream: - stream.write(str(next_serial + 1)) - - with open(pub_key_path, 'w') as stream: - stream.write(request.key_data) - - ca_private_key = self.path - - # openssl x509 -req -days 360 -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt # print() - subprocess.check_output(['openssl', - 'x509', - '-req', - '-days', self.ca_validity, - '-in', pub_key_path, - '-CA', "%s.pub"%self.path, - '-CAkey', self.path, - '-CAcreateserial', - '-out', cert_path, - '-%s'%self.key_algorithm]) - - return cert_path +from certificate_requests import * from paths import *
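Review bot: The added `from certificate_requests import *` hides which of the moved classes this module still depends on and, combined with the existing `from paths import *`, lets any name defined in both modules be silently shadowed by whichever import runs last; prefer an explicit list such as `from certificate_requests import SSHAuthority, SSLAuthority, UserSSHRequest, HostSSHRequest, HostSSLRequest` (trimmed to the names the rest of ca_manager.py actually uses, which lies outside this hunk). The moved `sign()` bodies resolved the output directory through `global OUTPUT_PATH`, which binds to the globals of the module where the function is defined, so after the move they only find it if certificate_requests imports it from paths itself — the removed side shows ca_manager used to supply it and the new module is not in this diff, so this is worth confirming before merging. Since the code is being relocated anyway, the `assert type(request) in [...]` type guards (stripped under `python -O`) and the unchecked `subprocess.call` return codes visible on the removed side would be worth tightening in their new home (raise `TypeError`, use `subprocess.check_call`), so a failed ssh-keygen or openssl invocation cannot consume a serial number while leaving no certificate behind.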