Easy CA management
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

282 lines
8.5 KiB

  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import cmd
  4. import sys
  5. from datetime import datetime
  6. from models.ssh import SSHAuthority
  7. from models.ssl import SSLAuthority
  8. from manager import sign_request
  9. __doc__ = """
  10. Class to make a shell and interact with the user
  11. """
  12. class CAManagerShell(cmd.Cmd):
  13. intro = """# LILiK CA Manager #
  14. Welcome to the certification authority shell.
  15. Type help or ? to list commands.
  16. """
  17. prompt = "(CA Manager)> "
  18. def __init__(self, ca_manager):
  19. super(CAManagerShell, self).__init__()
  20. self.ca_manager = ca_manager
  21. def do_ls_cas(self, l):
  22. 'List the available certification authorities: LS_CA'
  23. for i, authority in enumerate(self.ca_manager.ca):
  24. print(authority)
  25. def do_ls_certificates(self, l):
  26. 'List the issued certificates: LS_CERTIFICATES'
  27. for i, cert in enumerate(self.ca_manager.certificate):
  28. print(cert)
  29. def do_ls_requests(self, l):
  30. 'List the available certification requests: LS_REQUESTS'
  31. print_available_requests(self.ca_manager)
  32. def do_describe_ca(self, l):
  33. 'Show certification authority information: DESCRIBE_CA ca_id'
  34. argv = l.split()
  35. argc = len(argv)
  36. # argument number is too low
  37. if argc < 1:
  38. print("Usage: DESCRIBE_CA ca_id")
  39. return
  40. ca = self.ca_manager.ca[argv[0]]
  41. if ca:
  42. ca_description = """
  43. Certification authority: %s
  44. --------------------------------------------------
  45. CA type: %s
  46. CA name: %s
  47. Serial: %s
  48. """
  49. ca_info = (
  50. ca.ca_id,
  51. ca.__class__.__name__,
  52. ca.name,
  53. ca.serial,
  54. )
  55. print(ca_description % ca_info)
  56. else:
  57. print("No CA found for id: '%s'" % argv[0])
  58. def do_describe_certificate(self, l):
  59. 'Show certificate information: DESCRIBE_CERTIFICATE request_id'
  60. argv = l.split()
  61. argc = len(argv)
  62. # argument number is too low
  63. if argc < 1:
  64. print("Usage: DESCRIBE_CERTIFICATE request_id")
  65. return
  66. cert = self.ca_manager.certificate[argv[0]]
  67. if cert:
  68. cert_description = """
  69. Certificate %s
  70. --------------------------------------------------
  71. Signin authority: %s
  72. Signed on: %s
  73. Receiver: %s
  74. Certificate Serial: %s
  75. Validity Interval: %s
  76. """
  77. request_info = (
  78. cert.cert_id,
  79. cert.signed_by,
  80. cert.date_issued,
  81. cert.receiver,
  82. cert.serial_number,
  83. cert.validity_interval,
  84. )
  85. print(cert_description % cert_info)
  86. else:
  87. print('No certificate found for id: "%s"' % argv[0])
  88. pass
  89. def do_describe_request(self, l):
  90. 'Show sign request information: DESCRIBE_REQUEST request_id'
  91. argv = l.split()
  92. argc = len(argv)
  93. # argument number is too low
  94. if argc < 1:
  95. print("Usage: DESCRIBE_REQUEST request_id")
  96. return
  97. request = self.ca_manager.request[argv[0]]
  98. if request:
  99. request_description = """
  100. Request %s
  101. --------------------------------------------------
  102. Request type: %s
  103. %s
  104. Key %s
  105. """
  106. request_info = (
  107. request.req_id,
  108. request.__class__.__name__,
  109. request.fields,
  110. request.key_data,
  111. )
  112. print(request_description % request_info)
  113. else:
  114. print('No request found for id: "%s"' % argv[0])
  115. def do_drop_request(self, l):
  116. 'Delete a sign request: DROP_REQUEST request_id'
  117. argv = l.split()
  118. argc = len(argv)
  119. # argument number is too low
  120. if argc < 1:
  121. print("Usage: DROP_REQUEST request_id")
  122. return
  123. del self.ca_manager.request[argv[0]]
  124. def do_gen_ssh(self, l):
  125. 'Generate a SSH Certification authority: GEN_SSH ca_id ca_description'
  126. argv = l.split(maxsplit=1)
  127. argc = len(argv)
  128. # argument number is too low
  129. if argc < 2:
  130. print("Usage: GEN_SSH ca_id ca_description")
  131. return
  132. ca_id = argv[0]
  133. name = argv[1]
  134. new_auth = SSHAuthority(
  135. ca_id = ca_id,
  136. name = name,
  137. serial = 0,
  138. active = True,
  139. creation_date = datetime.now(),
  140. )
  141. new_auth.generate()
  142. new_auth.save()
  143. def do_gen_ssl(self, l):
  144. 'Generate a SSL Certification authority: GEN_SSL ca_id ca_description'
  145. argv = l.split(maxsplit=1)
  146. argc = len(argv)
  147. # argument number is too low
  148. if argc < 2:
  149. print("Usage: gen_ssl ca_id ca_description")
  150. return
  151. ca_id = argv[0]
  152. name = argv[1]
  153. new_auth = SSLAuthority(
  154. ca_id = ca_id,
  155. name = name,
  156. serial = 0,
  157. active = True,
  158. creation_date = datetime.now(),
  159. )
  160. new_auth.generate()
  161. new_auth.save()
  162. def do_sign_request(self, l):
  163. 'Sign a request using a CA: SIGN_REQUEST ca_id request_id'
  164. argv = l.split()
  165. argc = len(argv)
  166. # argument number is too low
  167. if argc < 2:
  168. if argc == 0:
  169. # print available ca
  170. print("Available authority")
  171. print_available_authorities(self.ca_manager)
  172. print("==================")
  173. # print available requests
  174. print("Available request")
  175. print_available_requests(self.ca_manager)
  176. else:
  177. authority_id, request_id = argv[0], argv[1]
  178. sign_request(self.ca_manager, request_id, authority_id)
  179. def common_complete_request(self, text, line, begidx, endidx):
  180. argc = len(("%send"%line).split())
  181. if argc == 2:
  182. return [request.req_id for i, request in enumerate(self.ca_manager.request) if request.req_id.startswith(text)]
  183. def common_complete_ca(self, text, line, begidx, endidx):
  184. argc = len(("%send"%line).split())
  185. if argc == 2:
  186. return [ca_item.ca_id for i, ca_item in enumerate(self.ca_manager.ca) if ca_item.ca_id.startswith(text)]
  187. def common_complete_certificate(self, text, line, begidx, endidx):
  188. argc = len(("%send"%line).split())
  189. if argc == 2:
  190. return [certificate.cert_id for i, certificate in enumerate(self.ca_manager.certificate) if certificate.cert_id.startswith(text)]
  191. def complete_drop_request(self, text, line, begidx, endidx):
  192. return self.common_complete_request(text, line, begidx, endidx)
  193. def complete_describe_certificate(self, text, line, begidx, endidx):
  194. return self.common_complete_certificate(text, line, begidx, endidx)
  195. def complete_describe_ca(self, text, line, begidx, endidx):
  196. return self.common_complete_ca(text, line, begidx, endidx)
  197. def complete_describe_request(self, text, line, begidx, endidx):
  198. return self.common_complete_request(text, line, begidx, endidx)
  199. def complete_sign_request(self, text, line, begidx, endidx):
  200. results = ''
  201. argc = len(("%send"%line).split())
  202. if argc == 2:
  203. results = [ca_item.ca_id for i, ca_item in enumerate(self.ca_manager.ca) if ca_item.ca_id.startswith(text)]
  204. elif argc == 3:
  205. try:
  206. ca = self.ca_manager.ca[line.split()[1]]
  207. except Exception as e:
  208. print ("Error: %s"%e)
  209. return
  210. results = [request.req_id for i, request in enumerate(self.ca_manager.request) if request.req_id.startswith(text) and request.__class__ in ca.request_allowed]
  211. return results
  212. def complete(self, text, state):
  213. results = super().complete(text, state)
  214. if results is not None:
  215. return "%s "%results
  216. return results
  217. def do_quit(self, l):
  218. 'Quit this shell'
  219. return True
  220. def print_available_authorities(ca_manager):
  221. for i, ca in enumerate(ca_manager.ca):
  222. print(ca)
  223. def print_available_requests(ca_manager):
  224. for i, request in enumerate(ca_manager.request):
  225. print(request)