Browse Source

First commit.

main
Zolfa 2 years ago
commit
60b44f4488
Signed by: zolfa GPG Key ID: E1A43B038C4D6616
8 changed files with 442 additions and 0 deletions
  1. +161
    -0
      .gitignore
  2. +10
    -0
      pyproject.toml
  3. +31
    -0
      setup.cfg
  4. +0
    -0
      src/zolfa/zauth/__init__.py
  5. +122
    -0
      src/zolfa/zauth/pasteur.py
  6. +0
    -0
      src/zolfa/zauth/utils/__init__.py
  7. +34
    -0
      src/zolfa/zauth/utils/exceptions.py
  8. +84
    -0
      src/zolfa/zauth/utils/webclient.py

+ 161
- 0
.gitignore View File

@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
flask_session/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

+ 10
- 0
pyproject.toml View File

@ -0,0 +1,10 @@
[build-system]
requires = [
"setuptools>=42",
"wheel",
"setuptools_scm[toml]>3.4",
]
build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "src/zolfa/zauth/version.py"

+ 31
- 0
setup.cfg View File

@ -0,0 +1,31 @@
[metadata]
name = Zauth
version = attr:zolfa.zauth.version
author = Zolfa
author_email = zolfa@lilik.it
description = Helpers to authenticate and programmatically use external websites.
classifiers =
Development Status :: 1 - Planning
Programming Language :: Python
Programming Language :: Python :: 3.11
platforms = any
[options]
include_package_data = True
packages = find_namespace:
package_dir =
= src
install_requires =
requests
lxml
setup_requires =
setuptools_scm
[options.packages.find]
where = src
include = zolfa.*
[options.entry_points]
console_scripts =
serve_freebeer=stapa.freebeer.serve:serve

+ 0
- 0
src/zolfa/zauth/__init__.py View File


+ 122
- 0
src/zolfa/zauth/pasteur.py View File

@ -0,0 +1,122 @@
import logging
import random
import re
from http.cookiejar import Cookie
from urllib.parse import urljoin, urlparse, parse_qs
import requests
from .utils.webclient import WebClient
from .utils.exceptions import *
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class PasteurSSO:
HEADERS = {
'User-Agent': ("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:104.0)"
" Gecko/20100101 Firefox/104.0)")
}
SP_START = "https://connect.pasteur.fr"
IDP_ENTRY_POINT = "https://idp.pasteur.fr/idp/profile/SAML2/POST/SSO"
def __init__(self, username, password):
self.username = username
self.password = password
self.client = WebClient(headers=self.HEADERS)
def authenticate(self):
# Go to SP authentication start page and look for SAML form.
logger.info(f"SP: Starting authentication from {self.SP_START}")
self.client.get(self.SP_START)
if not self.client.find_forms(action=self.IDP_ENTRY_POINT):
logger.info("SP: SAML form not found: already logged in?")
self.verify_sp_auth()
return
# Send SAML form to the IDP entry point
logger.info(f"SP: SAML form found: submitting to IDP")
self.client.select_form(action=self.IDP_ENTRY_POINT)
self.client.submit_form()
if self.client.status_code == 400:
raise Exception("IDP: Error 400: SAML request probably expired.")
# Check if already authenticated to IDP
self.client.select_form()
if 'SAMLResponse' in self.client.form:
logger.info("IDP: Already authenticated")
else:
# Perform authentication with IDP
logger.info("IDP: Authentication required.")
self.client.select_form()
self.client.submit_form()
self.client.select_form()
self.client.form['j_username'] = self.username
self.client.form['j_password'] = self.password
self.client.form['_eventId_proceed'] = ""
# Make authentication persistent
self.client.form['donotcache'] = "0"
logger.info(f"IDP: Authenticating '{self.username}'.")
self.client.submit_form()
self.client.select_form()
if 'SAMLResponse' not in self.client.form:
raise Exception("IDP: Authentication failed.")
# Send IDP SAMLResponse back to SP
logger.info("IDP: Got SAMLResponse, sending to SP")
self.client.submit_form()
self.sp_after_saml()
def verify_sp_auth(self):
m = re.match(
r"https://connect.pasteur.fr/f5-w-[0-9a-f]+\$\$/connect/$",
sso.client.url
)
if not m:
raise Exception("SP: unlogged and not redirecting to IDP!")
def sp_after_saml(self):
pass
class PasteurEmail(PasteurSSO):
SP_START = "https://email.pasteur.fr"
def verify_sp_auth(self):
if self.client.url != 'https://email.pasteur.fr/owa/':
raise Exception("SP: unlogged and not redirecting to IDP!")
class PasteurSAP(PasteurSSO):
SP_START = "https://portailha.pasteur.fr"
def sp_after_saml(self):
self.client.select_form(action="/sap/bc/ui2/nwbc")
self.client.submit_form()
def verify_sp_auth(self):
pass
class PasteurEurofins(PasteurSAP):
def authenticate(self):
self.client.get("https://b2b.eurofinsgenomics.eu")
if self.client.url == "https://b2b.eurofinsgenomics.eu/":
return True
super().authenticate()
self.client.get(
f"https://portailha.pasteur.fr/sap/opu/odata/srmnxp"
f"/CATALOG_LAUNCH_DETAILS/PollDetails(LAUNCH_FROM='PUNCH_OUT',"
f"SERVICE_ID='PEUROFINS2',OBJECT_ID='442078',PRODUCTID='')/?="
f"&random={random.random()}&random={random.random()}",
headers={'Accept': "application/json"}
)
launch_data = self.client.res.json()['d']
self.client.post(
launch_data['SERVICE_URL'],
data=launch_data['FORM_DATA']
)

+ 0
- 0
src/zolfa/zauth/utils/__init__.py View File


+ 34
- 0
src/zolfa/zauth/utils/exceptions.py View File

@ -0,0 +1,34 @@
class HTTPError(Exception):
def __init__(self, ans):
self.ans = ans
self.status_code = ans.status_code
super().__init__(f"Unexpected Status Code: {self.status_code}")
class TooManyFormsError(Exception):
def __init__(self, res, filters):
self.res = res
self.filters = filters
message = f"Too many forms matched the filters: {filters}."
super().__init__(message)
class FormNotFoundError(Exception):
def __init__(self, res, filters):
self.res = res
self.filters = filters
message = f"No forms matched the filters: {filters}."
super().__init__(message)
class NoFormSelectedError(Exception):
def __init__(self, function):
message = (f"Function '{function}' has been called before selecting "
f" any form.")
super().__init__(message)
class RemoteException(Exception):
def __init__(self, message, ans):
self.ans = ans
super().__init__(message)

+ 84
- 0
src/zolfa/zauth/utils/webclient.py View File

@ -0,0 +1,84 @@
from urllib.parse import urljoin
import requests
from lxml.etree import HTML
from .exceptions import *
class WebClientForm:
def __init__(self, form):
self.action = form.attrib['action']
self.method = form.attrib.get('method', "GET").upper()
self.data = {i.attrib['name']: i.attrib.get('value', "")
for i in form.xpath("//input")
if 'name' in i.attrib}
def update(self, items):
for k, v in items.items():
self[k] = v
def __getitem__(self, k):
return self.data[k]
def __setitem__(self, k, v):
self.data.update({k: v})
def __contains__(self, k):
return k in self.data
def __iter__(self):
return self.data.items()
def __repr__(self):
return str((self.method, self.action, self.data))
class WebClient(requests.Session):
def __init__(self, headers={}, **kwargs):
self.url = ""
self.res = None
self.form = None
super().__init__(**kwargs)
self.headers.update(headers)
def send(self, *args, **kwargs):
r = super().send(*args, **kwargs)
self.res = r
self.url = r.url
self.status_code = r.status_code
self.form = None
return r
def find_forms(self, **filters):
if not self.res:
return []
tree = HTML(self.res.content)
filters_xpath = [f"[@{k}='{v}']" for k, v in filters.items()]
filters_xpath = "".join(filters_xpath)
return tree.xpath("//form" + filters_xpath)
def select_form(self, **filters):
forms = self.find_forms(**filters)
if len(forms) != 1:
if forms:
raise TooManyFormsError(self.res, filters)
else:
raise FormNotFoundError(self.res, filters)
self.form = WebClientForm(forms[0])
def update_form(self, **arguments):
if not self.form:
raise NoFormSelectedError('update_form')
def submit_form(self):
if not self.form:
raise NoFormSelectedError('submit_form')
if self.form.method not in ["POST"]:
raise NotImplementedError(
f"Submit method '{self.form.method}' not supported.")
self.request(
self.form.method,
urljoin(self.url, self.form.action),
data=self.form.data
)

Loading…
Cancel
Save