Easy CA management
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

132 lines
3.2 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import cmd
import hashlib
import json
import os
import os.path
import shutil
import sqlite3
import tempfile
from certificate_requests import *
from paths import *
# Module description. It is bound to __doc__ explicitly because a string
# literal placed after the imports is not picked up as the module docstring.
__doc__= """
Define classes
"""
class CALookup(object):
    """
    Proxy to interact with the database, get CA as element or as list.

    Rows are read from and written to the ``cas`` table as
    (id, name, type) triples.
    """

    def __init__(self, ssh_ca_dir, ssl_ca_dir):
        """
        Remember the directories handed to the authority objects.

        The connection attribute starts as None; it is set by the
        CAManager instance when used and must be assigned before any
        other method of this class is called.
        """
        self.conn = None
        self.ssh_ca_dir = ssh_ca_dir
        self.ssl_ca_dir = ssl_ca_dir

    def _build_authority(self, ca_id, ca_name, ca_type):
        """
        Return the authority object matching ca_type (case-insensitive).

        Raises ValueError when ca_type is neither 'ssh' nor 'ssl'.
        """
        kind = ca_type.lower()
        if kind == 'ssh':
            return SSHAuthority(ca_id, ca_name, self.ssh_ca_dir)
        if kind == 'ssl':
            return SSLAuthority(ca_id, ca_name, self.ssl_ca_dir)
        raise ValueError('CA type is not supported')

    def __iter__(self):
        """
        Iterate over the (id, name, type) rows of every stored CA.
        """
        c = self.conn.cursor()
        c.execute("""SELECT id, name, type FROM cas""")
        return iter(c.fetchall())

    def __delitem__(self, ca_id):
        """
        Delete a specific certification authority from the database.

        Deleting an id that is not stored is a no-op.
        """
        c = self.conn.cursor()
        c.execute("""DELETE FROM cas WHERE id = ?""", (ca_id, ))
        # Commit like __setitem__ does, otherwise the delete stays in an
        # open transaction and is lost if the connection is closed.
        self.conn.commit()

    def __getitem__(self, ca_id):
        """
        Get a specific certification authority from the database.

        Raises ValueError when ca_id is unknown, or when the stored type
        is not a supported one (instead of silently returning None).
        """
        c = self.conn.cursor()
        c.execute("""SELECT name, type FROM cas WHERE id = ?""", (ca_id, ))
        result = c.fetchone()
        if not result:
            raise ValueError('Unknown CA "%s"' % ca_id)
        ca_name, ca_type = result
        return self._build_authority(ca_id, ca_name, ca_type)

    def __setitem__(self, ca_id, ca_value):
        """
        Create a new certification authority, insert
        it into the database.

        ca_value is a (name, type) pair; the type is matched
        case-insensitively and stored lower-cased.
        Raises ValueError when the type is not supported.
        """
        ca_name, ca_type = ca_value
        authority = self._build_authority(ca_id, ca_name, ca_type)
        # Generate the authority first, then record it.
        authority.generate()
        c = self.conn.cursor()
        c.execute("""INSERT INTO cas VALUES (?, ?, ?)""",
                  (ca_id, ca_name, ca_type.lower()))
        self.conn.commit()
class RequestLookup(object):
    """
    Proxy to interact with the requests.

    Requests are stored as entries of ``request_dir``, one per request id.
    """

    def __init__(self):
        """
        Bind the lookup to the module-level REQUESTS_PATH and OUTPUT_PATH.
        """
        self.request_dir = REQUESTS_PATH
        self.output_dir = OUTPUT_PATH

    def __iter__(self):
        """
        Iterate over all certificate requests in REQUESTS_PATH.

        Every request is loaded up front; the returned iterator walks
        the already-loaded objects.
        """
        req_objs = []
        for request_id in os.listdir(self.request_dir):
            # request_id is formatted as uuid
            with RequestLoader(request_id) as request:
                req_objs.append(request)
        return iter(req_objs)

    def __delitem__(self, request_id):
        """
        Delete a specific certificate request.

        Raises OSError (from os.unlink) when the request cannot be removed.
        """
        os.unlink(os.path.join(self.request_dir, request_id))

    def __getitem__(self, request_id):
        """
        Get a specific certificate request.

        Raises ValueError when no request with this id is stored.
        """
        request_path = os.path.join(self.request_dir, request_id)
        if not os.path.exists(request_path):
            raise ValueError('Unknown request "%s"' % request_id)
        # Load the request the same way __iter__ does.
        with RequestLoader(request_id) as request:
            return request

    @property
    def ssh(self):
        """Placeholder, not implemented yet: always evaluates to None."""
        pass

    @property
    def ssl(self):
        """Placeholder, not implemented yet: always evaluates to None."""
        pass