#!/usr/bin/env python3
|
|
import os
import string
from random import SystemRandom

import ldap
import ldap.dn
import ldap.filter
import ldap.modlist
|
|
|
|
# LDAP Server Configuration
LDAP_URI = "ldap://ldap1.dmz.lilik.it"
# When true, every connection is upgraded with StartTLS before binding.
LDAP_STARTTLS = True
LDAP_BASEDN = "dc=lilik,dc=it"
# Domain appended to the uid to build the `mail` attribute of new users.
MAIL_DOMAIN = "lilik.it"
# Length of the random password generated by a password reset.
NEW_PW_LEN = 10

# Manager Account
# Required privileges:
# - New user creation in `ou=People`
# - User search in `ou=People`
# All other actions are performed as the logged-in user, and must be
# allowed by LDAP ACLs.
#
# The credentials may be supplied through environment variables of the same
# name so the secret does not have to be stored in this file; the literals
# below are the fallback used when a variable is unset.
LDAPMANAGER_BINDDN = os.environ.get("LDAPMANAGER_BINDDN",
                                    "cn=,ou=Server,dc=lilik,dc=it")
LDAPMANAGER_BINDPW = os.environ.get("LDAPMANAGER_BINDPW", "")
|
|
|
|
|
|
def ldapmanager_check_uid_exists(uid):
    """Return True when an entry with the given uid exists under `ou=People`.

    The lookup is performed with the manager account.

    Args:
        uid: User identifier to look for. It is escaped before being placed
            in the search filter, so filter metacharacters such as `*` or
            `)` are matched literally instead of changing the query.

    Returns:
        True if the search returned at least one result, False otherwise.

    Raises:
        ldap.LDAPError: If connecting, binding or searching fails.
    """
    conn = ldap.initialize(LDAP_URI)
    try:
        if LDAP_STARTTLS:
            conn.start_tls_s()
        conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW)
        results = conn.search_s(
            'ou=People,{}'.format(LDAP_BASEDN),
            ldap.SCOPE_SUBTREE,
            '(uid={})'.format(ldap.filter.escape_filter_chars(uid)),
            ['uid'],
        )
    finally:
        # Release the connection even when the bind or the search raised.
        conn.unbind_s()
    return bool(results)
|
|
|
|
def ldapmanager_create_uid(uid, group_cn, req_attrs):
    """Create a new user entry under `ou=People` with the manager account.

    Args:
        uid: Identifier of the new user. Used for the RDN of the entry, its
            `uid` attribute and the local part of its `mail` attribute.
        group_cn: Common name of the group under `ou=Group` whose DN is
            stored in the `manager` attribute of the new entry.
        req_attrs: Mapping that provides the `cn` and `sn` strings of the
            new user.

    Raises:
        KeyError: If `req_attrs` lacks `cn` or `sn`; raised before any
            connection is opened.
        ldap.LDAPError: If connecting, binding or adding the entry fails.
    """
    # Escape caller-supplied values so they cannot alter the DN structure.
    dn = "uid={},ou=People,{}".format(ldap.dn.escape_dn_chars(uid),
                                      LDAP_BASEDN)
    manager_dn = "cn={},ou=Group,{}".format(ldap.dn.escape_dn_chars(group_cn),
                                            LDAP_BASEDN)

    # New user template. Every value is a list of bytes, the form python-ldap
    # documents for modlists.
    attrs = {
        'objectClass': [b'top', b'inetOrgPerson', b'authorizedServiceObject'],
        'uid': [uid.encode('utf-8')],
        'mail': ["{}@{}".format(uid, MAIL_DOMAIN).encode('utf-8')],
        'manager': [manager_dn.encode('utf-8')],
        'cn': [req_attrs['cn'].encode('utf-8')],
        'sn': [req_attrs['sn'].encode('utf-8')],
        'authorizedService': [b'nextcloud', b'matrix', b'gitea'],
    }

    conn = ldap.initialize(LDAP_URI)
    try:
        if LDAP_STARTTLS:
            conn.start_tls_s()
        conn.simple_bind_s(LDAPMANAGER_BINDDN, LDAPMANAGER_BINDPW)
        conn.add_s(dn, ldap.modlist.addModlist(attrs))
    finally:
        # Release the connection even when the bind or the add raised.
        conn.unbind_s()
|
|
|
|
class Group(object):
    """LDAP session bound as a group administrator.

    All operations run over a connection bound as
    `uid=<admin_uid>,ou=People,<LDAP_BASEDN>`, so what they may do is
    decided by the LDAP server's ACLs for that user.

    The instance can be used as a context manager; leaving the `with` block
    closes the connection. `close()` may also be called directly.
    """

    def __init__(self, admin_uid, admin_pass):
        """Open a connection and bind as `admin_uid` with `admin_pass`."""
        self.admin_uid = admin_uid
        self.conn = self.bind(admin_uid, admin_pass)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _user_dn(uid):
        """Return the DN of `uid` under `ou=People`, with the uid escaped."""
        return "uid={},ou=People,{}".format(ldap.dn.escape_dn_chars(uid),
                                            LDAP_BASEDN)

    @staticmethod
    def _group_dn(group_cn):
        """Return the DN of `group_cn` under `ou=Group`, with the cn escaped."""
        return "cn={},ou=Group,{}".format(ldap.dn.escape_dn_chars(group_cn),
                                          LDAP_BASEDN)

    def _owner_filter(self):
        """Return the search filter matching entries owned by the bound user."""
        return '(owner={})'.format(
            ldap.filter.escape_filter_chars(self._user_dn(self.admin_uid)))

    def close(self):
        """Unbind the connection; calling it more than once is harmless."""
        conn, self.conn = getattr(self, 'conn', None), None
        if conn is not None:
            conn.unbind_s()

    def bind(self, admin_uid, admin_pass):
        """Return a new connection bound as `admin_uid`.

        Raises:
            ldap.INVALID_CREDENTIALS: If the password is empty or rejected.
            ldap.LDAPError: If connecting or StartTLS fails.
        """
        if not admin_pass:
            # A simple bind with a DN and an empty password is an
            # "unauthenticated bind" (RFC 4513), which a server may accept
            # without checking any credential. Refuse it on the client side.
            raise ldap.INVALID_CREDENTIALS(
                {'desc': 'Invalid credentials', 'info': 'empty password'})
        conn = ldap.initialize(LDAP_URI)
        try:
            if LDAP_STARTTLS:
                conn.start_tls_s()
            conn.simple_bind_s(self._user_dn(admin_uid), admin_pass)
        except Exception:
            # Do not leave a half-open connection behind on failure.
            conn.unbind_s()
            raise
        return conn

    def group_list(self):
        """Return `(cn, description)` for every group owned by the bound user.

        A group without a `description` attribute is reported with an empty
        description.
        """
        results = self.conn.search_s('ou=Group,{}'.format(LDAP_BASEDN),
                                     ldap.SCOPE_SUBTREE,
                                     self._owner_filter(),
                                     ['cn', 'description'])
        groups = []
        for dn, group in results:
            if dn is None:
                # Search continuation references carry no entry data.
                continue
            description = group.get("description", [b""])[0]
            groups.append((group["cn"][0].decode('utf-8'),
                           description.decode('utf-8')))
        return groups

    def member_list(self, group_cn):
        """Return the first RDN value of every member of `group_cn`.

        Only a group owned by the bound user is matched.

        Raises:
            IndexError: If the group exists but is not owned by the bound
                user (the search then returns no entry).
            ldap.LDAPError: If the search itself fails.
        """
        results = self.conn.search_s(self._group_dn(group_cn),
                                     ldap.SCOPE_BASE,
                                     self._owner_filter(),
                                     ['member'])
        if not results:
            raise IndexError("group {!r} is not owned by {!r}".format(
                group_cn, self.admin_uid))

        members = []
        for member in results[0][1].get("member", []):
            if isinstance(member, bytes):
                member = member.decode('utf-8')
            if not member:
                # An empty `member` value has no RDN to report.
                continue
            members.append(ldap.dn.str2dn(member)[0][0][1])
        return members

    def add_member(self, group_cn, uid):
        """Add the DN of `uid` to the `member` attribute of `group_cn`."""
        self.conn.modify_s(
            self._group_dn(group_cn),
            [(ldap.MOD_ADD, 'member', [self._user_dn(uid).encode('utf-8')])])

    def reset_password(self, uid):
        """Set a random password of `NEW_PW_LEN` characters on `uid`.

        The password is drawn from ASCII letters and digits using the
        operating system's random source.

        Returns:
            The new clear-text password.
        """
        valid_chars = (string.ascii_uppercase
                       + string.ascii_lowercase
                       + string.digits)
        rng = SystemRandom()
        passwd = "".join(rng.choice(valid_chars) for _ in range(NEW_PW_LEN))

        # `None` as old password: no current password is supplied.
        self.conn.passwd_s(self._user_dn(uid), None, passwd)

        return passwd
|
|
|
|
from flask import Flask
|
|
from flask import request, jsonify
|
|
app = Flask(__name__)
|
|
|
|
@app.route('/get_list', methods=['POST'])
def display_list():
    """List the groups owned by the requesting user together with their members.

    Reads `username` and `password` from the form data and binds to LDAP
    with them. Responds with a JSON list of `{cn, description, members}`
    objects, or with `{"failed": true, "reason": ...}` on any error.
    """
    g = None
    try:
        g = Group(request.form['username'], request.form['password'])
        result = [
            {
                "cn": cn,
                "description": description,
                "members": g.member_list(cn),
            }
            for cn, description in g.group_list()
        ]
    except Exception as e:
        return jsonify({"failed": True, "reason": str(e)})
    finally:
        # Close the per-request LDAP connection instead of leaving it to
        # garbage collection.
        if g is not None:
            try:
                g.conn.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup; the response is already decided.
                pass
    return jsonify(result)
|
|
|
|
|
|
|
|
@app.route('/group/<group_cn>/create/<new_uid>', methods=['POST'])
def new_user(group_cn, new_uid):
    """Create user `new_uid`, add it to `group_cn` and give it a password.

    Reads `username`, `password` and `newEntry` from the JSON body. The
    requesting user must own `group_cn`. Responds with
    `{"failed": false, "newPasswd": ...}` on success, or with
    `{"failed": true, "reason": ...}` otherwise.
    """
    g = None
    try:
        g = Group(request.json['username'], request.json['password'])
        owned_groups = [group[0] for group in g.group_list()]
        if group_cn not in owned_groups:
            result = {
                "failed": True,
                "reason": "User {} is not an administrator for group {}".format(request.json['username'], group_cn),
            }
        elif ldapmanager_check_uid_exists(new_uid):
            result = {
                "failed": True,
                "reason": "User {} already exists, choose another name.".format(new_uid),
            }
        else:
            # NOTE: these three steps are not atomic. If adding the member
            # or setting the password fails, the entry created by the
            # manager account is left in place.
            ldapmanager_create_uid(new_uid, group_cn, request.json['newEntry'])
            g.add_member(group_cn, new_uid)
            new_passwd = g.reset_password(new_uid)
            result = {"failed": False, "newPasswd": new_passwd}
    except Exception as e:
        return jsonify({"failed": True, "reason": str(e)})
    finally:
        # Close the per-request LDAP connection instead of leaving it to
        # garbage collection.
        if g is not None:
            try:
                g.conn.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup; the response is already decided.
                pass

    return jsonify(result)
|
|
|
|
@app.route('/groups', methods=['POST'])
def group_list():
    """Return the groups owned by the requesting user.

    Reads `username` and `password` from the JSON body. Responds with
    `{"failed": false, "groups": {cn: description}}`, or with
    `{"failed": true, "reason": ...}` on any error.
    """
    g = None
    try:
        g = Group(request.json['username'], request.json['password'])
        result = {
            "failed": False,
            "groups": {cn: description for cn, description in g.group_list()},
        }
    except Exception as e:
        return jsonify({"failed": True, "reason": str(e)})
    finally:
        # Close the per-request LDAP connection instead of leaving it to
        # garbage collection.
        if g is not None:
            try:
                g.conn.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup; the response is already decided.
                pass
    return jsonify(result)
|
|
|
|
@app.route('/reset_password/<target_user>', methods=['POST'])
def reset_password(target_user):
    """Reset the password of `target_user` and return the new one.

    Reads `username` and `password` from the form data and performs the
    reset over a connection bound as that user. This handler does no
    authorization check of its own.
    NOTE(review): presumably the LDAP ACLs decide whether the reset is
    permitted, as the configuration comments state; confirm.

    Responds with `{"failed": false, "passwd": ...}`, or with
    `{"failed": true, "reason": ...}` on any error.
    """
    g = None
    try:
        g = Group(request.form['username'], request.form['password'])
        newpasswd = g.reset_password(target_user)
    except Exception as e:
        return jsonify({"failed": True, "reason": str(e)})
    finally:
        # Close the per-request LDAP connection instead of leaving it to
        # garbage collection.
        if g is not None:
            try:
                g.conn.unbind_s()
            except ldap.LDAPError:
                # Best-effort cleanup; the response is already decided.
                pass
    return jsonify({"failed": False, "passwd": newpasswd})
|
|
|
|
# Script entry point: when the file is executed directly, start Flask's
# built-in server listening on the loopback interface only.
if "__main__" == __name__:
    app.run(
        host='127.0.0.1',
    )
|