Browse Source

draft for new descriptor API

less_magic_more_descriptors
Edoardo Putti 8 years ago
parent
commit
abfbfe3174
2 changed files with 48 additions and 73 deletions
  1. +37
    -65
      ca_manager.py
  2. +11
    -8
      ca_shell.py

+ 37
- 65
ca_manager.py View File

@ -11,7 +11,8 @@ import sqlite3
import tempfile import tempfile
from certificate_requests import * from certificate_requests import *
from paths import *
#from paths import *
from local import *
__doc__= """ __doc__= """
Define classes to interact with certificate requests and Certification Authority Define classes to interact with certificate requests and Certification Authority
@ -21,9 +22,11 @@ class CAManager(object):
""" """
Middleware to interact with ssh-keygen Middleware to interact with ssh-keygen
""" """
def __init__(self, path): def __init__(self, path):
self.path = path self.path = path
self.ca = CALookup(self.ssh_ca_dir, self.ssl_ca_dir) self.ca = CALookup(self.ssh_ca_dir, self.ssl_ca_dir)
self.request = RequestLookup()
def __enter__(self): def __enter__(self):
""" """
@ -57,69 +60,6 @@ class CAManager(object):
def ssl_ca_dir(self): def ssl_ca_dir(self):
return os.path.join(self.path, 'ssl_cas') return os.path.join(self.path, 'ssl_cas')
def create_ssh_ca(self, ca_id, ca_name):
"""
Create a new ssh certification authority, insert
it into the database
"""
ca_path = self._get_ssh_ca_path(ca_id)
authority = SSHAuthority(ca_id, ca_name, ca_path)
authority.generate()
c = self.conn.cursor()
c.execute("""INSERT INTO cas VALUES (?, ?, 'ssh')""",
(ca_id, ca_name))
self.conn.commit()
def create_ssl_ca(self, ca_id, ca_name):
"""
Create a new ssl certification authority, insert
it into the database
"""
ca_path = self._get_ssl_ca_path(ca_id)
authority = SSLAuthority(ca_id, ca_name, ca_path)
authority.generate()
c = self.conn.cursor()
c.execute("""INSERT INTO cas VALUES (?, ?, 'ssl')""",
(ca_id, ca_name))
self.conn.commit()
def get_cas_list(self):
"""
Get all the certification authorities saved in
the database
"""
c = self.conn.cursor()
c.execute("""SELECT id, name, type FROM cas""")
return c.fetchall()
def get_ca(self, ca_id):
"""
Get a specific certification authority from the database
"""
c = self.conn.cursor()
c.execute("""SELECT name, type FROM cas WHERE id = ?""", (ca_id, ))
result = c.fetchone()
if not result:
raise ValueError('Unknown CA "%s"'%ca_id)
ca_name, ca_type = result
if ca_type == 'ssh':
ca_path = self._get_ssh_ca_path(ca_id)
return SSHAuthority(ca_id, ca_name, ca_path)
elif ca_type == 'ssl':
ca_path = self._get_ssl_ca_path(ca_id)
return SSLAuthority(ca_id, ca_name, ca_path)
def get_requests(self, ca_type=None): def get_requests(self, ca_type=None):
req_objs = [] req_objs = []
@ -231,6 +171,38 @@ class CALookup(object):
(ca_id, ca_name, ca_type.lower())) (ca_id, ca_name, ca_type.lower()))
self.conn.commit() self.conn.commit()
class RequestLookup(object):
"""
Proxy to interact with the requests
"""
def __init__(self):
self.request_dir = REQUESTS_PATH
self.output_dir = OUTPUT_PATH
def __iter__(self):
pass
#return iter(c.fetchall())
def __delitem__(self, request_id):
"""
Delete a specific certificate request
"""
os.unlink(os.path.join(self.request_dir, request_id))
def __getitem__(self, request_id):
"""
Get a specific certificate request
"""
request_path = os.path.join(self.request_dir, request_id)
@property
def ssh(self):
pass
@property
def ssl(self):
pass
def init_manager(paths): def init_manager(paths):
""" """
Initiate the manager by creating the Initiate the manager by creating the
@ -269,7 +241,7 @@ def sign_request(ca_manager, request_name, authority_name):
request = None request = None
try: try:
authority = ca_manager.get_ca(authority_name)
authority = ca_manager.ca[authority_name]
except IndexError: except IndexError:
print("Could not find CA '%d'" % choosen_ca) print("Could not find CA '%d'" % choosen_ca)
return return


+ 11
- 8
ca_shell.py View File

@ -19,7 +19,7 @@ class CAManagerShell(cmd.Cmd, object):
def __init__(self, ca_manager): def __init__(self, ca_manager):
super(CAManagerShell, self).__init__() super(CAManagerShell, self).__init__()
self.ca_manager= ca_manager
self.ca_manager = ca_manager
def do_ls_ca(self, l): def do_ls_ca(self, l):
'List the available certification authorities: LS_CA' 'List the available certification authorities: LS_CA'
@ -75,7 +75,7 @@ class CAManagerShell(cmd.Cmd, object):
return results return results
def do_sign_request(self, l): def do_sign_request(self, l):
'Sign a request using a CA: SIGN_REQUEST ca_name request_id'
'Sign a request using a CA: SIGN_REQUEST ca_id request_id'
argv = l.split() argv = l.split()
argc = len(argv) argc = len(argv)
@ -94,13 +94,14 @@ class CAManagerShell(cmd.Cmd, object):
elif argc == 1: elif argc == 1:
ca_type = None ca_type = None
ca_id = argv[0]
try: try:
ca_type = self.ca_manager.get_ca(argv[0]).ca_type
ca_type = self.ca_manager.ca[ca_id].ca_type
except Exception as e: except Exception as e:
print ("Error: %s"%e) print ("Error: %s"%e)
return return
# print available requests # print available requests
print("Available request for CA %s (type %s)"%(argv[0], ca_type))
print("Available request for CA %s (type %s)" % (ca_id, ca_type))
print_available_requests(self.ca_manager, ca_type) print_available_requests(self.ca_manager, ca_type)
print("==================") print("==================")
@ -113,19 +114,21 @@ class CAManagerShell(cmd.Cmd, object):
def complete_sign_request(self, text, line, begidx, endidx): def complete_sign_request(self, text, line, begidx, endidx):
results = '' results = ''
argc = len(("%send"%line).split())
#too much magic
argc = len(( "%send" % line ).split() )
if argc == 2: if argc == 2:
results = [a[0] for a in self.ca_manager.get_cas_list() if a[0].startswith(text)]
results = [a[0] for a in self.ca_manager.ca if a[0].startswith(text)]
elif argc == 3: elif argc == 3:
ca_type = None ca_type = None
try: try:
ca_type = self.ca_manager.get_ca(line.split()[1]).ca_type
ca_id = line.split()[1]
ca_type = self.ca_manager.ca[ca_id].ca_type
except Exception as e: except Exception as e:
print ("Error: %s"%e) print ("Error: %s"%e)
return return
results = [a for a in self.ca_manager.get_requests(ca_type) if str(a).startswith(text)]
results = [a for a in self.ca_manager.request[ca_type] if str(a).startswith(text)]
return results return results
def complete(self, text, state): def complete(self, text, state):


Loading…
Cancel
Save