|
|
- #!/usr/bin/env python3
- import ldap
- import ldap.modlist
- import string
- from random import SystemRandom
-
- # LDAP Server Configuration
- LDAP_URI = "ldap://ldap1.dmz.lilik.it"
- LDAP_STARTTLS = True
- LDAP_BASEDN = "dc=lilik,dc=it"
- MAIL_DOMAIN = "lilik.it"
- NEW_PW_LEN = 10
-
- # Manager Account
- # Required privileges:
- # - New user creation in `ou=People`
- # - User search in `ou=People`
- # All other action are performed as the logged-in user, and must be
- # allowed by LDAP ACLs
- LDAPMANAGER_BINDDN = "cn=,ou=Server,dc=lilik,dc=it"
- LDAPMANAGER_BINDPW = ""
-
-
- def ldapmanager_check_uid_exists(uid):
- conn = ldap.initialize(LDAP_URI)
- if(LDAP_STARTTLS):
- conn.start_tls_s()
- conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW)
- results = conn.search_s('ou=People,{}'.format(LDAP_BASEDN),
- ldap.SCOPE_SUBTREE,
- 'uid={}'.format(uid),
- ['uid'])
- conn.unbind_s()
- return len(results) > 0
-
- def ldapmanager_create_uid(uid, group_cn, req_attrs):
- dn="uid={},ou=People,{}".format(uid, LDAP_BASEDN)
-
- # New user template
- attrs = {
- 'objectClass': [b'top', b'inetOrgPerson', b'authorizedServiceObject'],
- 'uid': uid.encode('utf-8'),
- 'mail': "{}@{}".format(uid, MAIL_DOMAIN).encode('utf-8'),
- 'manager': "cn={},ou=Group,{}".format(group_cn, LDAP_BASEDN).encode('utf-8'),
- 'cn': req_attrs['cn'].encode('utf-8'),
- 'sn': req_attrs['sn'].encode('utf-8'),
- 'authorizedService': [b'nextcloud', b'matrix', b'gitea']
- }
-
- conn = ldap.initialize(LDAP_URI)
- if(LDAP_STARTTLS):
- conn.start_tls_s()
- conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW)
- conn.add_s(dn, ldap.modlist.addModlist(attrs))
- conn.unbind_s()
-
- return
-
- class Group(object):
-
- def __init__(self, admin_uid, admin_pass):
- self.admin_uid = admin_uid
- self.conn = self.bind(admin_uid, admin_pass)
-
- def bind(self, admin_uid, admin_pass):
- conn = ldap.initialize(LDAP_URI)
- if(LDAP_STARTTLS):
- conn.start_tls_s()
- conn.simple_bind_s("uid={},ou=People,{}".format(admin_uid, LDAP_BASEDN), admin_pass)
- return conn
-
- def group_list(self):
- results = self.conn.search_s('ou=Group,{}'.format(LDAP_BASEDN),
- ldap.SCOPE_SUBTREE,
- 'owner=uid={},ou=People,{}'.format(self.admin_uid, LDAP_BASEDN),
- ['cn', 'description'])
- return [ (group["cn"][0].decode('utf-8'),
- group["description"][0].decode('utf-8')) for key, group in results ]
-
- def member_list(self, group_cn):
- results = self.conn.search_s('cn={},ou=Group,{}'.format(group_cn, LDAP_BASEDN),
- ldap.SCOPE_BASE,
- 'owner=uid={},ou=People,{}'.format(self.admin_uid, LDAP_BASEDN),
- ['member'])
-
- return [ ldap.dn.str2dn(member)[0][0][1] for member in results[0][1]["member"] ]
-
- def add_member(self, group_cn, uid):
- self.conn.modify_s('cn={},ou=Group,{}'.format(group_cn, LDAP_BASEDN),
- [ (ldap.MOD_ADD, 'member', ["uid={},ou=People,{}".format(uid, LDAP_BASEDN).encode('utf-8')]) ])
- return
- def reset_password(self, uid):
- valid_chars = (string.ascii_uppercase
- + string.ascii_lowercase
- + string.digits)
- rng = SystemRandom()
- passwd = "".join([rng.choice(valid_chars) for _ in range(NEW_PW_LEN)])
-
- result = self.conn.passwd_s('uid={},ou=People,{}'.format(uid, LDAP_BASEDN),
- None,
- passwd)
-
- return passwd
-
- from flask import Flask
- from flask import request, jsonify
- app = Flask(__name__)
-
- @app.route('/get_list', methods=['POST'])
- def display_list():
- try:
- g = Group(request.form['username'], request.form['password'])
- result = [
- { "cn": group[0],
- "description": group[1],
- "members": g.member_list(group[0])
- }
- for group in g.group_list()
- ]
- except Exception as e:
- return jsonify({"failed": True, "reason": str(e)})
- return jsonify(result)
-
-
-
- @app.route('/group/<group_cn>/create/<new_uid>', methods=['POST'])
- def new_user(group_cn, new_uid):
- try:
- g = Group(request.json['username'], request.json['password'])
- if group_cn not in [ group[0] for group in g.group_list() ]:
- result = { "failed": True,
- "reason": "User {} is not an administrator for group {}".format(request.json['username'], group_cn) }
- elif ldapmanager_check_uid_exists(new_uid):
- result = { "failed": True,
- "reason": "User {} already exists, choose another name.".format(new_uid) }
- else:
- ldapmanager_create_uid(new_uid, group_cn, request.json['newEntry'])
- g.add_member(group_cn, new_uid)
- if 'memberOf' in request.json['newEntry']:
- for extra_group in request.json['newEntry']['memberOf']:
- g.add_member(extra_group, new_uid)
- new_passwd = g.reset_password(new_uid)
- result = { "failed": False,
- "newPasswd": new_passwd }
- except Exception as e:
- return jsonify({ "failed": True, "reason": str(e) })
-
- return jsonify(result)
-
- @app.route('/groups', methods=['POST'])
- def group_list():
- try:
- g = Group(request.json['username'], request.json['password'])
- result = {
- "failed": False,
- "groups": { group[0]: group[1] for group in g.group_list() }
- }
- except Exception as e:
- return jsonify({"failed": True, "reason": str(e)})
- return jsonify(result)
-
- @app.route('/reset_password/<target_user>', methods=['POST'])
- def reset_password(target_user):
- try:
- g = Group(request.form['username'], request.form['password'])
- newpasswd = g.reset_password(target_user)
- except Exception as e:
- return jsonify({"failed": True, "reason": str(e)})
- return jsonify({"failed": False, "passwd": newpasswd})
-
- if __name__ == "__main__":
- app.run(host='127.0.0.1')
|