Easy CA management
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

148 lines
3.7 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from itertools import chain
import json
import os
import os.path
from .models.ssh import SSHAuthority, UserSSHRequest, HostSSHRequest
from .models.ssl import SSLAuthority, HostSSLRequest, CASSLRequest
from .models.certificate import Certificate
from .models.request import SignRequest
from .paths import *
class CALookup:
"""
Proxy to interact with authorities
"""
allowed_auth = [
SSHAuthority,
SSLAuthority,
]
def __init__(self):
self.path = MANAGER_PATH
def __iter__(self):
all_the_authorities = [auth.select().iterator() for auth in self.allowed_auth]
return chain.from_iterable(all_the_authorities)
def __getitem__(self, ca_id):
for authority_type in self.allowed_auth:
try:
ca = authority_type.get(authority_type.ca_id == ca_id)
return ca
except authority_type.DoesNotExist:
continue
class RequestLookup:
"""
Proxy to interact with the requests
"""
def __init__(self):
self.request_dir = REQUESTS_PATH
self.output_dir = OUTPUT_PATH
def __iter__(self):
"""
Iterate over all certificate request in REQUEST_PATH
"""
for request_id in os.listdir(self.request_dir):
"""
request_id is formatted as uuid
"""
yield self[request_id]
def __delitem__(self, request_id):
"""
Delete a specific certificate request
"""
os.unlink(SignRequest(request_id).path)
def __getitem__(self, request_id):
"""
Get a specific certificate request
"""
with open(SignRequest(request_id).path, 'r') as stream:
request_data = json.load(
stream,
)
requester = request_data.get('userName', None) or request_data.get('hostName', None) or request_data.get('caName', None)
root_requested = request_data.get('rootRequested', False)
key_data = request_data.get('keyData', None)
values = request_data.values()
if 'ssh_user' in values:
return UserSSHRequest(
request_id,
requester,
root_requested,
key_data,
)
elif 'ssh_host' in values:
return HostSSHRequest(
request_id,
requester,
key_data,
)
elif 'ssl_host' in values:
return HostSSLRequest(
request_id,
requester,
key_data,
)
elif 'ssl_ca' in values:
return CASSLRequest(
request_id,
requester,
key_data,
)
else:
return SignRequest(request_id)
@property
def ssh(self):
pass
@property
def ssl(self):
pass
class CertificateLookup:
"""
Proxy to interact with certificates
"""
def __iter__(self):
self.cert_dir = OUTPUT_PATH
def __getitem__(self, certificate_id):
"""
Get a specific certificate from disk
"""
try:
return Certificate.get(Certificate.cert_id == certificate_id)
except Certificate.DoesNotExist:
raise IndexError()
def __iter__(self):
"""
Iterate over all certificate request in OUTPUT_PATH
"""
return Certificate.select().iterator()