#!/usr/bin/env python3 import ldap import ldap.modlist import string from random import SystemRandom # LDAP Server Configuration LDAP_URI = "ldap://ldap1.dmz.lilik.it" LDAP_STARTTLS = True LDAP_BASEDN = "dc=lilik,dc=it" MAIL_DOMAIN = "lilik.it" NEW_PW_LEN = 10 # Manager Account # Required privileges: # - New user creation in `ou=People` # - User search in `ou=People` # All other action are performed as the logged-in user, and must be # allowed by LDAP ACLs LDAPMANAGER_BINDDN = "cn=,ou=Server,dc=lilik,dc=it" LDAPMANAGER_BINDPW = "" def ldapmanager_check_uid_exists(uid): conn = ldap.initialize(LDAP_URI) if(LDAP_STARTTLS): conn.start_tls_s() conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW) results = conn.search_s('ou=People,{}'.format(LDAP_BASEDN), ldap.SCOPE_SUBTREE, 'uid={}'.format(uid), ['uid']) conn.unbind_s() return len(results) > 0 def ldapmanager_create_uid(uid, group_cn, req_attrs): dn="uid={},ou=People,{}".format(uid, LDAP_BASEDN) # New user template attrs = { 'objectClass': [b'top', b'inetOrgPerson', b'authorizedServiceObject'], 'uid': uid.encode('utf-8'), 'mail': "{}@{}".format(uid, MAIL_DOMAIN).encode('utf-8'), 'manager': "cn={},ou=Group,{}".format(group_cn, LDAP_BASEDN).encode('utf-8'), 'cn': req_attrs['cn'].encode('utf-8'), 'sn': req_attrs['sn'].encode('utf-8'), 'authorizedService': [b'nextcloud', b'matrix', b'gitea'] } conn = ldap.initialize(LDAP_URI) if(LDAP_STARTTLS): conn.start_tls_s() conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW) conn.add_s(dn, ldap.modlist.addModlist(attrs)) conn.unbind_s() return class Group(object): def __init__(self, admin_uid, admin_pass): self.admin_uid = admin_uid self.conn = self.bind(admin_uid, admin_pass) def bind(self, admin_uid, admin_pass): conn = ldap.initialize(LDAP_URI) if(LDAP_STARTTLS): conn.start_tls_s() conn.simple_bind_s("uid={},ou=People,{}".format(admin_uid, LDAP_BASEDN), admin_pass) return conn def group_list(self): results = self.conn.search_s('ou=Group,{}'.format(LDAP_BASEDN), ldap.SCOPE_SUBTREE, 'owner=uid={},ou=People,{}'.format(self.admin_uid, LDAP_BASEDN), ['cn', 'description']) return [ (group["cn"][0].decode('utf-8'), group["description"][0].decode('utf-8')) for key, group in results ] def member_list(self, group_cn): results = self.conn.search_s('cn={},ou=Group,{}'.format(group_cn, LDAP_BASEDN), ldap.SCOPE_BASE, 'owner=uid={},ou=People,{}'.format(self.admin_uid, LDAP_BASEDN), ['member']) return [ ldap.dn.str2dn(member)[0][0][1] for member in results[0][1]["member"] ] def add_member(self, group_cn, uid): self.conn.modify_s('cn={},ou=Group,{}'.format(group_cn, LDAP_BASEDN), [ (ldap.MOD_ADD, 'member', ["uid={},ou=People,{}".format(uid, LDAP_BASEDN).encode('utf-8')]) ]) return def reset_password(self, uid): valid_chars = (string.ascii_uppercase + string.ascii_lowercase + string.digits) rng = SystemRandom() passwd = "".join([rng.choice(valid_chars) for _ in range(NEW_PW_LEN)]) result = self.conn.passwd_s('uid={},ou=People,{}'.format(uid, LDAP_BASEDN), None, passwd) return passwd from flask import Flask from flask import request, jsonify app = Flask(__name__) @app.route('/get_list', methods=['POST']) def display_list(): try: g = Group(request.form['username'], request.form['password']) result = [ { "cn": group[0], "description": group[1], "members": g.member_list(group[0]) } for group in g.group_list() ] except Exception as e: return jsonify({"failed": True, "reason": str(e)}) return jsonify(result) @app.route('/group//create/', methods=['POST']) def new_user(group_cn, new_uid): try: g = Group(request.json['username'], request.json['password']) if group_cn not in [ group[0] for group in g.group_list() ]: result = { "failed": True, "reason": "User {} is not an administrator for group {}".format(request.json['username'], group_cn) } elif ldapmanager_check_uid_exists(new_uid): result = { "failed": True, "reason": "User {} already exists, choose another name.".format(new_uid) } else: ldapmanager_create_uid(new_uid, group_cn, request.json['newEntry']) g.add_member(group_cn, new_uid) new_passwd = g.reset_password(new_uid) result = { "failed": False, "newPasswd": new_passwd } except Exception as e: return jsonify({ "failed": True, "reason": str(e) }) return jsonify(result) @app.route('/groups', methods=['POST']) def group_list(): try: g = Group(request.json['username'], request.json['password']) result = { "failed": False, "groups": { group[0]: group[1] for group in g.group_list() } } except Exception as e: return jsonify({"failed": True, "reason": str(e)}) return jsonify(result) @app.route('/reset_password/', methods=['POST']) def reset_password(target_user): try: g = Group(request.form['username'], request.form['password']) newpasswd = g.reset_password(target_user) except Exception as e: return jsonify({"failed": True, "reason": str(e)}) return jsonify({"failed": False, "passwd": newpasswd}) if __name__ == "__main__": app.run(host='127.0.0.1')