#!/usr/bin/env python3
|
|
import lilikusers
|
|
import json
|
|
from flask import Flask, jsonify
|
|
from flask import request, Response
|
|
from functools import wraps
|
|
|
|
# Flask application object; the route decorators and logger calls further
# down in this module are all bound to it.
app = Flask(__name__)
|
|
|
|
# Single directory client created at import time and shared by the auth
# helper and every view in this module (presumably LDAP-backed, judging by
# the class name; its implementation lives in the lilikusers module).
lilik_ldap = lilikusers.LILiK_LDAP()
|
|
|
|
def check_auth(user_name, password):
    """Validate a user name / password pair against the directory.

    Args:
        user_name: Login name supplied by the client.
        password: Password supplied by the client.

    Returns:
        The object returned by ``lilik_ldap.get_user(user_name)`` for the
        authenticated user.

    Raises:
        ValueError: If either credential is empty, or if
            ``lilik_ldap.login`` reports a failed login.
    """
    # Refuse blank credentials before contacting the directory. RFC 4513
    # section 5.1.2 describes a simple bind with an empty password as an
    # "unauthenticated" bind that a server may accept, and recommends that
    # clients never forward one. login() is implemented outside this file,
    # so do not rely on it to perform this check.
    if not user_name or not password:
        raise ValueError("empty user name or password")
    if not lilik_ldap.login(user_name, password):
        raise ValueError("invalid credentials")
    return lilik_ldap.get_user(user_name)
|
|
|
|
def authenticate():
    """Build the plain-text 401 reply asking for proper credentials.

    NOTE(review): no ``WWW-Authenticate`` header is attached to the
    response, so this does not by itself trigger a browser Basic-auth
    prompt; presumably clients send the ``Authorization`` header on their
    own -- confirm this is intended.
    """
    message = (
        'Could not verify your access level for that URL.\n'
        'You have to login with proper credentials\n'
    )
    return Response(message, status=401)
|
|
|
|
def admin_required():
    """Build the plain-text 401 reply telling the caller admin rights are needed.

    NOTE(review): 403 is the conventional status for an authenticated
    caller lacking privileges; 401 is kept here because existing clients
    may depend on it.
    """
    message = (
        'Could not verify your access level for that URL.\n'
        'You have to login with admin rights\n'
    )
    return Response(message, status=401)
|
|
|
|
def requires_auth(f):
    """Decorate a view so that it only runs for authenticated requests.

    The credentials found in ``request.authorization`` are checked with
    ``check_auth``. On success the wrapped view is called with the
    authenticated user prepended to its positional arguments; when the
    credentials are missing or rejected, the response built by
    ``authenticate`` is returned instead.

    Args:
        f: View callable that takes the authenticated user as its first
            positional argument.

    Returns:
        The wrapping callable (metadata copied from ``f`` via ``wraps``).
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth:
            return authenticate()
        try:
            user = check_auth(auth.username, auth.password)
        except ValueError:
            # Hand the user name to the logger as an argument so the
            # message is only interpolated if the record is emitted.
            app.logger.warning("Authentication failed for %s", auth.username)
            return authenticate()
        return f(user, *args, **kwargs)
    return decorated
|
|
|
|
def requires_admin_auth(f):
    """Decorate a view so that it only runs for users flagged as admin.

    The wrapped callable expects the authenticated user as its first
    positional argument and reads ``user.services['admin']``. When that
    value is falsy a warning is logged and the response built by
    ``admin_required`` is returned; otherwise ``f`` is called unchanged.

    Args:
        f: View callable taking the authenticated user first.

    Returns:
        The wrapping callable (metadata copied from ``f`` via ``wraps``).
    """
    @wraps(f)
    def decorated(user, *args, **kwargs):
        if not user.services['admin']:
            # Lazy logging arguments: interpolation happens only if the
            # record is actually emitted.
            app.logger.warning("Admin privilege required for user %s", user.uid)
            return admin_required()
        return f(user, *args, **kwargs)
    return decorated
|
|
|
|
def requires_same_user_or_admin_auth(f):
    """Decorate a view so it runs only for admins or for the user it targets.

    The wrapped callable expects the authenticated user first and the
    target ``user_name`` second. The call goes through when
    ``user.services['admin']`` is truthy or when ``user.uid`` equals
    ``user_name``; otherwise a warning is logged and the response built by
    ``admin_required`` is returned.

    Args:
        f: View callable taking ``(user, user_name, ...)``.

    Returns:
        The wrapping callable (metadata copied from ``f`` via ``wraps``).
    """
    @wraps(f)
    def decorated(user, user_name, *args, **kwargs):
        if not user.services['admin'] and user.uid != user_name:
            # Lazy logging arguments: interpolation happens only if the
            # record is actually emitted.
            app.logger.warning("Admin privilege required for user %s", user.uid)
            return admin_required()
        return f(user, user_name, *args, **kwargs)
    return decorated
|
|
|
|
@app.route('/api/users', methods=['GET'])
@requires_auth
@requires_admin_auth
def get_users(user):
    """Return the result of ``lilik_ldap.get_users()`` as a JSON response.

    Restricted to admins by the decorators above. ``user`` is the
    authenticated caller passed in by ``requires_auth``; it is not used in
    the body.
    """
    all_users = lilik_ldap.get_users()
    return jsonify(all_users)
|
|
|
|
@app.route('/api/users/<user_name>', methods=['GET'])
@requires_auth
@requires_same_user_or_admin_auth
def get_user(user, user_name):
    """Return the single user ``user_name`` as a JSON response.

    Allowed for admins and for the user being requested (enforced by the
    decorators above). ``user`` is the authenticated caller; ``user_name``
    comes from the URL.
    """
    requested = lilik_ldap.get_user(user_name)
    return jsonify(requested.to_dict())
|
|
|
|
@app.route('/api/users/<user_name>', methods=['PUT'])
@requires_auth
@requires_admin_auth
def update_user(user, user_name):
    """Apply the JSON request body to ``user_name`` and return the user as JSON.

    Restricted to admins by the decorators above. ``user`` is the
    authenticated caller; ``user_name`` comes from the URL.
    """
    # NOTE(review): depending on the Flask version, get_json() may yield
    # None for a non-JSON body -- confirm update() copes with that.
    changes = request.get_json()
    target = lilik_ldap.get_user(user_name)
    target.update(changes)
    # The user is fetched a second time after the update instead of
    # reusing the object that was just modified.
    refreshed = lilik_ldap.get_user(user_name)
    return jsonify(refreshed.to_dict())
|
|
|
|
@app.route('/api/users', methods=['POST'])
@requires_auth
@requires_admin_auth
def new_user(user):
    """Create a user from the JSON request body and return it as JSON.

    Restricted to admins by the decorators above. ``user`` is the
    authenticated caller; it is not used in the body.

    Returns:
        The JSON form of the user looked up by the submitted ``uid`` after
        creation, or a plain-text 400 response when the body is not a JSON
        object containing a ``uid`` field.
    """
    new_lilik_user = request.get_json()
    # Validate before touching the directory: without a dict carrying
    # 'uid', the lookup at the end would raise and surface as a 500,
    # possibly after the directory call had already been made.
    if not isinstance(new_lilik_user, dict) or 'uid' not in new_lilik_user:
        return Response(
            'Request body must be a JSON object with a "uid" field\n', 400)
    result = lilik_ldap.new_user(new_lilik_user)
    # Replaces a leftover print(); goes through the application logger.
    app.logger.debug("new_user returned %s", result)
    return jsonify(lilik_ldap.get_user(new_lilik_user['uid']).to_dict())
|
|
|
|
if __name__ == '__main__':
    import os

    # Debug mode turns on the Werkzeug interactive debugger, which lets
    # anyone who can reach the server execute arbitrary code. It stays on
    # by default for local development (the previous behaviour) but can be
    # switched off without editing the file, e.g. FLASK_DEBUG=0.
    debug_enabled = (
        os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    )
    app.run(debug=debug_enabled)
|