|
|
- #!/usr/bin/env python3
- import lilikusers
-
- import logging
- from logging.handlers import RotatingFileHandler
- import json
- from flask import Flask, jsonify
- from flask import request, Response, redirect, url_for
- from functools import wraps
-
- app = Flask(__name__)  # Flask application object; the routes and the log handler below attach to it
-
# Records are rendered as space-separated key=value pairs.  Every field needs
# its trailing conversion type: a bare `%(name)` makes the %-formatting of the
# record raise ValueError, so no message would ever reach the log.
logfmt = logging.Formatter('time=%(asctime)s level=%(levelname)s app=%(name)s msg=%(message)s')
-
# Size-based rotation.  RotatingFileHandler never rolls over while maxBytes is
# 0 (its default), which would leave backupCount without any effect and let
# the file grow without bound, so an explicit size cap is set here.
logrotate = RotatingFileHandler(
    '/var/log/login/server',
    maxBytes=10 * 1024 * 1024,
    backupCount=10,
)

logrotate.setFormatter(logfmt)

# The handler itself lets every level through; the level configured on
# app.logger still decides which records are handed to it.
logrotate.setLevel(logging.DEBUG)

app.logger.addHandler(logrotate)

# Directory client shared by the credential check and the route handlers.
lilik_ldap = lilikusers.LILiK_LDAP()
-
def check_auth(user_name, password):
    """Validate a username / password pair and return the matching user.

    Args:
        user_name: Login name supplied by the client.
        password: Password supplied by the client.

    Returns:
        The object returned by ``lilik_ldap.get_user(user_name)``.

    Raises:
        ValueError: If either credential is blank or ``lilik_ldap.login``
            rejects the pair.
    """
    # Blank credentials are refused before touching the directory: an LDAP
    # simple bind with an empty password is an "unauthenticated bind"
    # (RFC 4513, section 5.1.2) that some servers accept.  Whether
    # lilikusers already guards against this is not visible from here.
    if user_name and password and lilik_ldap.login(user_name, password):
        return lilik_ldap.get_user(user_name)
    app.logger.warning('Invalid login for <{}>'.format(user_name))
    raise ValueError('Invalid login for <{}>'.format(user_name))
-
def authenticate():
    """Build the 401 reply sent when credentials are missing or rejected.

    The response carries a plain-text explanation only.

    NOTE(review): no ``WWW-Authenticate`` header is attached, so a browser
    will not show its native login prompt -- presumably intentional for the
    bundled web client; confirm.
    """
    app.logger.debug('Request http basic auth')
    body = (
        'Could not verify your access level for that URL.\n'
        'You have to login with proper credentials\n'
    )
    return Response(body, 401)
-
def admin_required():
    """Build the 401 reply sent when the caller lacks the required rights.

    NOTE(review): 403 is the conventional status for an authenticated user
    without sufficient privileges; 401 is kept because clients may rely on it.
    """
    app.logger.debug('Request admin level')
    body = (
        'Could not verify your access level for that URL.\n'
        'You have to login with admin rights\n'
    )
    return Response(body, 401)
-
def requires_auth(f):
    """Decorator: authenticate the request before running the view.

    The credentials are read from ``request.authorization`` and verified with
    ``check_auth``.  On success the wrapped view is called with the
    authenticated user prepended to its positional arguments; otherwise the
    reply built by ``authenticate()`` is returned and the view is not run.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        credentials = request.authorization
        if credentials:
            try:
                user = check_auth(credentials.username, credentials.password)
            except ValueError:
                app.logger.warning('Authentication failed for <{}>'.format(credentials.username))
            else:
                # Called outside the try body so that a ValueError raised by
                # the view itself is not mistaken for a failed login.
                return f(user, *args, **kwargs)
        return authenticate()
    return wrapper
-
def requires_admin_auth(f):
    """Decorator: only let users whose ``services['admin']`` is truthy through.

    The wrapped view receives the authenticated user as its first positional
    argument, so this decorator has to sit beneath ``requires_auth``.  Other
    callers get the reply built by ``admin_required()``.
    """
    @wraps(f)
    def wrapper(user, *args, **kwargs):
        if user.services['admin']:
            return f(user, *args, **kwargs)
        app.logger.warning('Admin privilege required for user <{}>'.format(user.uid))
        return admin_required()
    return wrapper
-
def requires_same_user_or_admin_auth(f):
    """Decorator: allow admins, or a user acting on their own account.

    The wrapped view receives the authenticated user followed by the
    ``user_name`` the request targets.  Access is granted when
    ``user.services['admin']`` is truthy or ``user.uid`` equals
    ``user_name``; anyone else gets the reply built by ``admin_required()``.
    """
    @wraps(f)
    def wrapper(user, user_name, *args, **kwargs):
        if user.services['admin'] or user.uid == user_name:
            return f(user, user_name, *args, **kwargs)
        app.logger.warning('Admin privilege required for user <{}>'.format(user.uid))
        return admin_required()
    return wrapper
-
@app.route('/api/users', methods=['GET'])
@requires_auth
@requires_admin_auth
def get_users(user):
    """Return the result of ``lilik_ldap.get_users()`` as JSON (admin only)."""
    all_users = lilik_ldap.get_users()
    return jsonify(all_users)
-
@app.route('/api/users/<user_name>', methods=['GET'])
@requires_auth
@requires_same_user_or_admin_auth
def get_user(user, user_name):
    """Return the single user ``user_name`` as JSON.

    Reachable by admins and by the user the record belongs to.
    """
    requested = lilik_ldap.get_user(user_name)
    return jsonify(requested.to_dict())
-
@app.route('/api/users/<user_name>', methods=['PUT'])
@requires_auth
@requires_same_user_or_admin_auth
def update_user(user, user_name):
    """Apply the JSON body of the request to the user ``user_name``.

    Admins may change anything.  A user editing their own record may only
    change ``cn`` and add ``userPassword``; nothing may be removed.

    Returns:
        The stored user as JSON after the update, a 400 response when the
        request carries no JSON body, or the ``admin_required()`` response
        when the requested changes are not permitted.
    """
    new_lilik_user = request.get_json()
    if new_lilik_user is None:
        # Nothing to diff against; answer explicitly instead of failing
        # further down.
        return Response('Request body must be JSON\n', 400)
    app.logger.info('User <{}> editing user <{}>'.format(user.uid, user_name))
    user_to_edit = lilik_ldap.get_user(user_name)
    diff = user_to_edit.diff(new_lilik_user)
    is_permitted_self_changes = (
        diff.changed() <= {'cn'}
        and diff.removed() == set()
        and diff.added() <= {'userPassword'}
    )
    if not (user.services['admin'] or is_permitted_self_changes):
        # Refuse loudly: silently skipping the update would hand back a 200
        # that the client cannot tell apart from a successful edit.
        app.logger.warning('User <{}> attempted a forbidden edit of user <{}>'.format(user.uid, user_name))
        return admin_required()
    user_to_edit.update(new_lilik_user)
    return jsonify(lilik_ldap.get_user(user_name).to_dict())
-
@app.route('/api/users', methods=['POST'])
@requires_auth
@requires_admin_auth
def new_user(user):
    """Create a user from the JSON object in the request body (admin only).

    Returns:
        The newly stored user as JSON, looked up through the ``uid`` key of
        the submitted object, or a 400 response when the body is not a JSON
        object.
    """
    app.logger.info('User <{}> create a new user'.format(user.uid))
    new_lilik_user = request.get_json()
    if not isinstance(new_lilik_user, dict):
        # The 'uid' lookup below needs a mapping; reject anything else before
        # the directory is touched.
        return Response('Request body must be a JSON object\n', 400)
    outcome = lilik_ldap.new_user(new_lilik_user)
    app.logger.info(outcome)
    return jsonify(lilik_ldap.get_user(new_lilik_user['uid']).to_dict())
-
@app.route('/')
def home():
    """Redirect the site root to the static ``index.html`` page."""
    landing_page = url_for('static', filename='index.html')
    return redirect(landing_page)
-
if __name__ == '__main__':
    import os

    # SECURITY: debug mode enables the Werkzeug interactive debugger, which
    # lets anyone who can reach the server execute arbitrary code.  It stays
    # on by default, as before, but can now be switched off by exporting
    # FLASK_DEBUG=0 (or "false" / "no") instead of editing this file.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    app.run(debug=debug_enabled)
|