|
|
- #!/usr/bin/env python3
- import ldap3
- import utils
- import config
-
def connection_decorator(f):
    """Decorate a method so it receives a bound LDAP connection.

    The wrapped method is called as ``f(self, conn, *args, **kwargs)``:
    ``conn`` is inserted right after the first positional argument.
    The connection is built from ``config.SERVER`` with ``auto_bind=True``
    and no credentials.

    A new connection is opened for every call and unbound when the call
    finishes, so decorating a function performs no network I/O.
    """
    import functools  # function-scope import: the module does not import it

    @functools.wraps(f)
    def wrapped_f(*args, **kwargs):
        conn = ldap3.Connection(config.SERVER, auto_bind=True)
        try:
            return f(args[0], conn, *args[1:], **kwargs)
        finally:
            # Unbind explicitly so the connection is released whether f
            # returns normally or raises.
            conn.unbind()

    return wrapped_f
-
def admin_connection_decorator(f):
    """Decorate a method so it receives an admin-bound LDAP connection.

    The wrapped method is called as ``f(self, conn, *args, **kwargs)``:
    ``conn`` is inserted right after the first positional argument.
    The connection is built from ``config.SERVER`` and binds with
    ``config.ADMIN_CN`` / ``config.ADMIN_PASSWORD``.

    A new connection is opened for every call and unbound when the call
    finishes, so no admin session outlives the call that needed it and
    decorating a function performs no network I/O.
    """
    import functools  # function-scope import: the module does not import it

    @functools.wraps(f)
    def wrapped_f(*args, **kwargs):
        conn = ldap3.Connection(
            config.SERVER,
            user=config.ADMIN_CN,
            password=config.ADMIN_PASSWORD,
            auto_bind=True,
        )
        try:
            return f(args[0], conn, *args[1:], **kwargs)
        finally:
            # Unbind explicitly so the admin connection is released whether
            # f returns normally or raises.
            conn.unbind()

    return wrapped_f
-
class LILiK_USER(object):
    """Snapshot of one directory user and the services enabled for it.

    Instances expose one string attribute per key of ``_attributes`` plus a
    ``services`` dict mapping every known service name to a truth value.
    """

    # Instance attribute -> additional LDAP attributes that update() rewrites
    # with the same value whenever that attribute changes.
    _attributes = {'uid': [], 'cn': ['givenName']}
    # Service name -> LDAP attribute used as its flag on the user entry.
    _flags = {'mail': 'accountActive'}
    # Services represented by a value of the user's 'host' attribute.
    _hosts = ['ltsp',
              'users']
    # Services represented by membership of a group (user's 'memberOf').
    _groups = ['admin',
               'wiki',
               'lilik.it',
               'cloud',
               'projects',
               'teambox',
               'im']
    # Services represented by membership of a posix group ('memberUid').
    _posix_groups = ['users_sites']

    def __init__(self, entry, posix_groups):
        """Build the user from a search result.

        Args:
            entry: search result entry for the user; attributes are read
                with attribute access, ``in`` and item access.
            posix_groups: mapping of posix group name to its member uids.
        """
        for attribute in self._attributes:
            setattr(self, attribute, str(getattr(entry, attribute)))

        services = {}
        for service, flag in self._flags.items():
            # NOTE(review): bool() of the attribute object may reflect
            # "has a value" rather than the stored TRUE/FALSE - confirm.
            services[service] = flag in entry and bool(entry[flag])
        for host in self._hosts:
            services[host] = 'host' in entry and host in entry['host']
        for group in self._groups:
            services[group] = 'memberOf' in entry and utils.ldap_path("cn=%s" % utils.clean_value(group), config.GROUP, config.DOMAIN) in list(entry['memberOf'])
        for posix_group in self._posix_groups:
            services[posix_group] = posix_group in posix_groups and str(entry.uid) in list(posix_groups[posix_group])
        self.services = services

    def to_json(self):
        """Return the instance attributes serialized as a JSON string."""
        import json  # function-scope import: the module does not import it
        return json.dumps(self, default=lambda o: o.__dict__)

    def to_dict(self):
        """Return the instance attribute dict (the live ``__dict__``, not a copy)."""
        return self.__dict__

    @admin_connection_decorator
    def update(self, conn, new_lilik_user):
        """Apply the differences between this user and ``new_lilik_user``.

        Args:
            conn: LDAP connection supplied by ``admin_connection_decorator``.
            new_lilik_user: dict shaped like ``to_dict()``; it may also carry
                a ``'userPassword'`` key to set a new password.

        Returns:
            True when every modify operation reported result code 0.
            False as soon as one fails; operations already sent are not
            rolled back.

        Raises:
            Exception: a changed service name is not one of the known
                flags, hosts, groups or posix groups.
        """
        user_cn = utils.ldap_path('uid=%s' % self.uid, config.PEOPLE, config.DOMAIN)
        diff = utils.DictDiffer(new_lilik_user, self.__dict__)
        # Target DN -> {attribute: [(operation, [values]), ...]}
        modifiers = {user_cn: {}}
        user_changes = modifiers[user_cn]

        if 'userPassword' in diff.added():
            # TODO: add hash encryption?
            user_changes['userPassword'] = [(ldap3.MODIFY_REPLACE, [new_lilik_user['userPassword']])]

        for changed in diff.changed():
            if changed == 'services':
                new_services = new_lilik_user['services']
                services_diff = utils.DictDiffer(self.services, new_services)
                for service_changed in services_diff.changed():
                    enabled = new_services[service_changed]
                    action = ldap3.MODIFY_ADD if enabled else ldap3.MODIFY_DELETE

                    if service_changed in self._flags:
                        flag = self._flags[service_changed]
                        user_changes[flag] = [(ldap3.MODIFY_REPLACE, [enabled])]

                    elif service_changed in self._hosts:
                        # Accumulate: several hosts may change in one update
                        # and each needs its own add/delete on 'host'.
                        user_changes.setdefault('host', []).append((action, [service_changed]))

                    elif service_changed in self._groups:
                        group_cn = utils.ldap_path('cn=%s' % service_changed, config.GROUP, config.DOMAIN)
                        modifiers[group_cn] = {'member': [(action, [user_cn])]}

                    elif service_changed in self._posix_groups:
                        group_cn = utils.ldap_path('cn=%s' % service_changed, config.GROUP, config.DOMAIN)
                        modifiers[group_cn] = {'memberUid': [(action, [self.uid])]}

                    else:
                        raise Exception('Unknown user attribute')
            else:
                new_value = new_lilik_user[changed]
                # Keep the alias attributes in step with the primary one.
                for alias in self._attributes[changed]:
                    user_changes[alias] = [(ldap3.MODIFY_REPLACE, [new_value])]
                user_changes[changed] = [(ldap3.MODIFY_REPLACE, [new_value])]

        for entry_cn, modifier in modifiers.items():
            if modifier:
                conn.modify(entry_cn, modifier)
                if conn.result['result'] != 0:
                    return False
        return True
-
class LILiK_LDAP(object):
    """Credential check and read-only queries against the LDAP directory."""

    def login(self, user_name, password):
        """Check a user's credentials by binding as that user.

        Args:
            user_name: login name; it is passed through
                ``utils.clean_user_name`` before the bind DN is built.
            password: the user's password.

        Returns:
            The result of the bind attempt; False for an empty password.
        """
        # A bind without a password is not a credential check (LDAP treats
        # it as an anonymous/unauthenticated bind, RFC 4513 section 5.1.2),
        # so it must never count as a successful login.
        if not password:
            return False
        bind_dn = utils.ldap_path('uid=%s' % utils.clean_user_name(user_name), config.PEOPLE, config.DOMAIN)
        c = ldap3.Connection(config.SERVER, user=bind_dn, password=password)
        try:
            return c.bind()
        finally:
            # The connection exists only to test the credentials.
            c.unbind()

    @connection_decorator
    def get_users(self, conn):
        """Return the uid of every posixAccount entry, as strings."""
        conn.search(utils.ldap_path(config.PEOPLE, config.DOMAIN), '(objectclass=posixAccount)', attributes=['uid'])
        return [str(a.uid) for a in conn.entries]

    @connection_decorator
    def get_user(self, conn, user_name):
        """Return the ``LILiK_USER`` for ``user_name``, or None if not found.

        When several entries match, the first one is used.
        """
        conn.search(utils.ldap_path(config.PEOPLE, config.DOMAIN), '(&(objectclass=posixAccount)(uid=%s))' % utils.clean_user_name(user_name), attributes=['*', 'memberOf'])
        if not conn.entries:
            return None
        entry = conn.entries[0]
        return LILiK_USER(entry, self.get_posix_groups())

    @connection_decorator
    def get_posix_groups(self, conn):
        """Return a dict mapping each posixGroup cn to its list of memberUid.

        Groups without a ``memberUid`` attribute map to an empty list.
        """
        conn.search(utils.ldap_path(config.GROUP, config.DOMAIN), '(objectclass=posixGroup)', attributes=['*'])
        return {
            str(group.cn): list(group.memberUid) if 'memberUid' in group else []
            for group in conn.entries
        }
|