|
|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
-
- from peewee import *
-
- from datetime import datetime
- import os.path
- import subprocess
-
- from models.authority import Authority
- from models.certificate import Certificate
- from models.request import SignRequest
- from paths import *
-
-
- class UserSSHRequest(SignRequest):
- def __init__(self, req_id, user_name, root_requested, key_data):
- super(UserSSHRequest, self).__init__(req_id)
-
- self.user_name = user_name
- self.root_requested = root_requested
- self.key_data = key_data
-
- @property
- def name(self):
- return "User: %s [R:%d]" % (self.user_name, int(self.root_requested))
-
- @property
- def fields(self):
- return [
- ("User name", self.user_name),
- ("Root access requested", 'yes' if self.root_requested else 'no')
- ]
-
- @property
- def receiver(self):
- return self.user_name
-
-
- class HostSSHRequest(SignRequest):
- def __init__(self, req_id, host_name, key_data):
- super(HostSSHRequest, self).__init__(req_id)
-
- self.host_name = host_name
- self.key_data = key_data
-
- @property
- def name(self):
- return "Hostname: %s" % self.host_name
-
- @property
- def fields(self):
- return [
- ("Hostname", self.host_name)
- ]
-
- @property
- def receiver(self):
- return self.host_name
-
-
- class SSHAuthority(Authority):
-
- request_allowed = [ UserSSHRequest, HostSSHRequest, ]
-
- key_algorithm = 'ed25519'
-
- cert_validity = '+52w'
-
- def __bool__(self):
- """
- Check if key pair already exists
- """
- keys_pair_exist = os.path.exists(self.path) and os.path.exists(self.path + '.pub')
-
- return keys_pair_exist
-
- def generate(self):
- """
- Generate a SSHAuthority if the files associated
- do not exists
- """
- # check if the public key exists
- if not self:
- # let ssh-keygen do its job
- subprocess.check_output(['ssh-keygen',
- '-f', self.path,
- '-t', self.key_algorithm,
- '-C', self.name])
-
- else:
- raise ValueError('A CA with the same id already exists')
-
-
- def sign(self, request):
- """
- Sign a *SSHRequest with this certification authority
- """
-
- assert type(request) in self.request_allowed
-
- pub_key_path = request.destination
-
- cert = Certificate(
- signed_by = self,
- cert_id = request.req_id,
- date_issued = datetime.now(),
- receiver = self.receiver,
- serial_number = self.serial,
- validity_interval = self.user_validity,
- )
-
- # write the key data from the request into
- # the output folder
- with open(request.destination, 'w') as stream:
- stream.write(request.key_data)
-
- ca_private_key = self.path
-
- if type(request) == UserSSHRequest:
- login_names = [ request.user_name, ]
- if request.root_requested:
- login_names.append('root')
-
- subprocess.check_output(['ssh-keygen',
- '-s', ca_private_key,
- '-I', 'user_%s' % request.user_name,
- '-n', ','.join(login_names),
- '-V', self.user_validity,
- '-z', str(self.serial),
- pub_key_path])
-
-
- elif type(request) == HostSSHRequest:
- subprocess.check_output(['ssh-keygen',
- '-s', ca_private_key,
- '-I', 'host_%s' % request.host_name.replace('.', '_'),
- '-h',
- '-n', request.host_name,
- '-V', self.host_validity,
- '-z', str(self.serial),
- pub_key_path])
-
- self.serial += 1
-
- cert.save()
-
- return cert.path
-
-
|