Browse Source

Initial commit.

main
Lorenzo Zolfanelli 2 years ago
commit
1d9ede9881
Signed by: zolfa GPG Key ID: E1A43B038C4D6616
13 changed files with 1112 additions and 0 deletions
  1. +160
    -0
      .gitignore
  2. +0
    -0
      README.md
  3. +0
    -0
      pyjod/__init__.py
  4. +128
    -0
      pyjod/appdata.py
  5. +228
    -0
      pyjod/cli.py
  6. +69
    -0
      pyjod/jwe_handler.py
  7. +323
    -0
      pyjod/sca.py
  8. +25
    -0
      pyjod/utils.py
  9. +5
    -0
      pyjod/version.py
  10. +126
    -0
      pyjod/xkey.py
  11. +10
    -0
      pyproject.toml
  12. +38
    -0
      setup.cfg
  13. BIN
      test.png

+ 160
- 0
.gitignore View File

@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

+ 0
- 0
README.md View File


+ 0
- 0
pyjod/__init__.py View File


+ 128
- 0
pyjod/appdata.py View File

@ -0,0 +1,128 @@
import json
from base64 import b64encode, b64decode
from pathlib import Path
import xdgappdirs
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from jwcrypto import jwk
from pathvalidate import sanitize_filename
data_dir = xdgappdirs.user_data_dir('pyjod', as_path=True)
class RSAPrivateKey:
def __init__(self, key):
self.key = key
@classmethod
def generate(cls):
key = rsa.generate_private_key(65537, 2048)
return cls(key)
@classmethod
def from_pem(cls, data):
key = serialization.load_pem_private_key(data, None)
return cls(key)
@property
def jwk(self):
return jwk.JWK.from_pyca(self.key)
@property
def pubkey_b64(self):
pubkey = self.key.public_key()
pubkey_bytes = pubkey.public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo
)
return b64encode(pubkey_bytes).decode('utf-8')
def to_pem(self):
return self.key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()
)
class RSAPublicKey:
def __init__(self, key):
self.key = key
@classmethod
def from_b64(cls, data):
key_bytes = b64decode(data)
key = serialization.load_der_public_key(key_bytes)
return cls(key)
@classmethod
def from_pem(cls, data):
key = serialization.load_pem_public_key(data)
return cls(key)
@property
def jwk(self):
return jwk.JWK.from_pyca(self.key)
def to_pem(self):
return self.key.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo
)
class AppData:
def __init__(self, profile_name):
self.profile_dir = data_dir / sanitize_filename(profile_name)
self.profile_dir.mkdir(exist_ok=True, parents=True)
self.values_file = self.profile_dir / "values.json"
@property
def app_privkey(self):
key_file = self.profile_dir / 'app_privkey.pem'
if key_file.is_file():
with key_file.open('rb') as f:
key_bytes = f.read()
return RSAPrivateKey.from_pem(key_bytes)
else:
key = RSAPrivateKey.generate()
with key_file.open('wb') as f:
f.write(key.to_pem())
return key
@property
def serv_pubkey(self):
key_file = self.profile_dir / 'serv_pubkey.pem'
if key_file.is_file():
with key_file.open('rb') as f:
key_bytes = f.read()
return RSAPublicKey.from_pem(key_bytes)
else:
return None
@serv_pubkey.setter
def serv_pubkey(self, key_b64):
key = RSAPublicKey.from_b64(key_b64)
key_file = self.profile_dir / 'serv_pubkey.pem'
with key_file.open('wb') as f:
f.write(key.to_pem())
def __getitem__(self, key):
if self.values_file.is_file():
with self.values_file.open() as f:
values = json.load(f)
if key in values:
return values[key]
return None
def __setitem__(self, key, value):
if self.values_file.is_file():
with self.values_file.open() as f:
values = json.load(f)
else:
values = {}
values[key] = value
with self.values_file.open('w') as f:
json.dump(values, f)

+ 228
- 0
pyjod/cli.py View File

@ -0,0 +1,228 @@
import argparse
import getpass
import json
import logging
import re
import time
from argparse import ArgumentParser
from .xkey import XKey
from .sca import SCA, NotInitializedPIN, AuthError
logger = logging.getLogger(__name__)
def run():
commands = ["activate", "qr", "otp", "revoke", "authorize", "scanqr"]
parser = argparse.ArgumentParser(description="Gestisci OTP PosteID.")
parser.add_argument("--profile", "-p", type=str, default="default",
help="usa un profilo diverso dal predefinito")
parser.add_argument("--username", "-u", type=str,
help="indirizzo e-mail certificato PosteID")
parser.add_argument("command", nargs="?", choices=commands)
args = parser.parse_args()
logger.debug("Inizializzazione modulo XKey.")
xkey = XKey(args.profile)
if not xkey.appdata['xkey_appuuid']:
xkey.register_xkey()
logger.debug("Inizializzazione modulo SCA.")
sca = SCA(args.profile)
sca_app_regid = sca.appdata['sca_app_regid']
cli_prefix = "posteid"
if args.profile != "default":
cli_prefix += " --profile " + args.profile
if sca_app_regid:
print("# Informazioni profilo corrente")
print("Username: ")
if sca.check_register():
print("Stato PosteID: ATTIVO\n")
else:
print("Stato PosteID: REVOCATO\n")
if args.command == "activate":
print("# Riattivazione credenziali PosteID.")
perform_2fa_auth(sca, args.username)
else:
print("Le tue credenziali non sono più attive.")
print("Esegui `" + cli_prefix + " activate` per riattivarle.")
else:
logger.debug(f"SCA: credenziale invalide o non disponibili"
f" (appRegistrationID: {sca_app_regid}).")
print("Attivazione credenziali PosteID.")
perform_2fa_auth(sca, args.username)
if args.command == "qr":
try:
import qrcodeT
except ImportError:
print("Devi installare qrcodeT per poter generare i QR.")
print("Prova con `pip install qrcodeT`.")
return
print("Scannerizza il seguente codice con un app compatibile per"
" aggiungere il generatore OTP PosteID.\n")
qrcodeT.qrcodeT(sca.totp.provisioning_uri())
if args.command == "otp":
totp = sca.totp
remaining = totp.interval - time.time() % totp.interval
print(f"Codice OTP corrente: {totp.now()}"
f" (tempo rimanente: {remaining:.0f}s).\n")
if args.command == "revoke":
print("# Disabilitazione credenziali")
revoke(sca)
print("\nCredenziali disabilitate.")
if args.command == "authorize":
pin_login(sca)
authorize(sca)
if args.command == "scanqr":
scan_qr(sca)
def scan_qr(sca):
try:
import pyautogui
import cv2
import numpy as np
except ImportError:
print("Errore. Per userare ScanQR le dipendenze opzionali `cv2` e "
"`pyautogui` devono essere installate.")
scr = pyautogui.screenshot()
detector = cv2.QRCodeDetector()
mm = re.compile(r"^https://secureholder\.mobile\.poste\.it"
r"/jod-secure-holder/qrcodeResolver/(\w+)")
qr = detector.detectAndDecode(np.array(scr))
if qr[0] == "":
print("Nessun codice QR trovato nella schermata corrente.")
return
match_url = mm.match(qr[0])
if not match_url:
print("Codice QR trovato ma non valido!")
return
tx_id = match_url.groups()[0]
ch = sca.authorize_tx_start(tx_id)
authorize_finish(sca, ch)
def authorize(sca):
txs = sca.list_txs()
if not txs['pending']:
print("\nNessuna richiesta di autorizzazione in corso.\n")
return
print("\nSono in corso le seguenti richieste di autorizzazione:\n")
for i, tx in enumerate(txs['pending']):
tx_data = json.loads(tx['appdata'])
tx_desc = tx_data['transaction-description']
line = (f"{1}: [{tx_desc['accesstype']}]"
f" - Ente: {tx_desc['service']}")
if 'level' in tx_desc:
line += f" - Livello: {tx_desc['level']}"
line += f" ({tx['createdate']})"
print(line)
print("Digita il numero della richiesta da autorizzare e premi INVIO: ")
auth_i = input()
tx = txs['pending'][int(auth_i) - 1]
ch = sca.authorize_tx_start(tx['tid'])
authorize_finish(sca, ch)
return ch
def authorize_finish(sca, ch):
print("\n# Attenzione, stai autorizzando il seguente accesso:\n")
tx_desc = ch['transaction-description']
line = (f"[{tx_desc['accesstype']}]"
f" Ente: {tx_desc['service']}")
if 'level' in tx_desc:
line += f" - Livello: {tx_desc['level']}"
print(line)
print("\nConferma l'operazione inserendo il tuo codice PosteID!\n")
userpin = getpass.getpass("Codice PosteID: ")
sca.authorize_tx_finish(ch, userpin)
print("Accesso autorizzato!")
def pin_login(sca, attempts=0):
userpin = getpass.getpass("Codice PosteID: ")
try:
sca._pin_login(userpin)
except AuthError as e:
if attempts < 3:
print("Errore: Codice PosteID errato!")
print("Attenzione, il codice sarà bloccato dopo 5 tentativi.")
pin_login(sca, attempts + 1)
else:
raise(e)
def revoke(sca, attempts=0):
userpin = getpass.getpass("Codice PosteID: ")
try:
sca.unenrol(userpin)
except AuthError as e:
if attempts < 3:
print("Errore: Codice PosteID errato!")
print("Attenzione, il codice sarà bloccato dopo 5 tentativi.")
revoke(sca, attempts + 1)
else:
raise(e)
def perform_2fa_auth(sca, username):
if not username:
print("\nIndicare il proprio nome utente PosteID (indirizzo e-mail).")
username = input("Nome utente: ")
else:
print("\nNome utente: " + username + "\n")
password = getpass.getpass("Password: ")
tel = sca.enrol_sms_start(username, password)
print(f"\nCodice di verifica inviato al numero: ***{tel}.\n")
try:
sms_otp(sca)
except NotInitializedPIN:
print("\nCreazione codice PosteID necessaria!")
print("Scegli un codice PIN numerico di 6 cifre.")
initialize_pin(sca)
def sms_otp(sca, attempts=0):
otp = getpass.getpass("Codice verifica SMS: ")
try:
sca.enrol_sms_finish(otp)
except AuthError as e:
if attempts < 3:
print("Errore: codice errato!\n")
sms_otp(sca, attempts + 1)
else:
raise(e)
def initialize_pin(sca):
pin1 = getpass.getpass("Nuovo codice PosteID: ")
if not re.match(r"^[0-9]{6}$", pin1):
print("Errore: il formato del PIN non è corrretto!")
initialize_pin(sca)
return
pin2 = getpass.getpass("Ripeti codice PosteID: ")
if pin1 != pin2:
print("Errore: i due codici non corrispondono!")
initialize_pin(sca)
return
sca._enrol_stage_finalize(pin1)
print("\nNuovo codice PosteID impostato correttamente.\n")
if __name__ == "__main__":
run()

+ 69
- 0
pyjod/jwe_handler.py View File

@ -0,0 +1,69 @@
import json
from random import randint
from time import time
from uuid import uuid4
from jwcrypto.jwe import JWE
from pyotp import HOTP
from requests import Request
from .utils import sha256_base64, hmac_sha256
class JWEHandler:
def __init__(self, appdata):
self.appdata = appdata
def new_otp_specs(self):
generator = HOTP(self.appdata['xkey_seed'], 8, "SHA1")
counter = randint(0, 99999999)
otp_specs = {}
otp_specs['movingFactor'] = counter
otp_specs['otp'] = generator.generate_otp(counter)
otp_specs['type'] = "HMAC-SHA1"
return otp_specs
def encrypt(self, sub, data, auth=False):
now = int(time())
claims = {'data': data}
claims['sub'] = sub
claims['jti'] = str(uuid4())
claims['iat'] = now
claims['nbf'] = now
claims['exp'] = now + 60
claims['iss'] = "app-posteid-v3"
headers = {}
headers['cty'] = "JWE"
headers['typ'] = "JWT"
headers['alg'] = "RSA-OAEP-256"
headers['enc'] = "A256CBC-HS512"
if auth:
app_uuid = self.appdata['xkey_appuuid']
headers['kid'] = app_uuid
claims['kid-sha256'] = sha256_base64(app_uuid)
claims['otp-specs'] = self.new_otp_specs()
jwe_token = JWE(json.dumps(claims),
protected=headers,
recipient=self.appdata.serv_pubkey.jwk)
return jwe_token.serialize(compact=True)
def req_jwe_post(self, url, sub, data, auth=False):
jwe_token = self.encrypt(sub, data, auth)
headers = {'Content-Type': "application/json; charset=UTF-8"}
req = Request('POST', url, headers, data=jwe_token)
return req
def req_jwe_bearer(self, url, sub, data, auth=False):
jwe_token = self.encrypt(sub, data, auth)
headers = {'Content-Type': "",
'Authorization': "Bearer " + jwe_token}
req = Request('GET', url, headers)
return req
def decrypt(self, serialized_jwe_token):
if isinstance(serialized_jwe_token, bytes):
serialized_jwe_token = serialized_jwe_token.decode('utf-8')
jwe_token = JWE()
jwe_token.deserialize(serialized_jwe_token)
jwe_token.decrypt(self.appdata.app_privkey.jwk)
return json.loads(jwe_token.payload)

+ 323
- 0
pyjod/sca.py View File

@ -0,0 +1,323 @@
import json
import logging
import re
from base64 import b32encode
from getpass import getpass
from urllib.parse import urljoin
from uuid import uuid4
import requests
from pyotp import TOTP
from .appdata import AppData
from .jwe_handler import JWEHandler
from .utils import RequestFailure, hmac_sha256
logger = logging.getLogger(__name__)
class NotInitializedPIN(Exception):
pass
class AuthError(Exception):
pass
class SCA:
LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/"
AUTH_URL = "https://posteid.poste.it/jod-login-schema/"
SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web"
"/public/app/")
def __init__(self, profile_name):
self.appdata = AppData(profile_name)
self.s = requests.Session()
self.s.headers = {'Accept-Encoding': "gzip",
'User-Agent': "okhttp/3.12.1",
'Connection': "keep-alive"}
self.jwe_handler = JWEHandler(self.appdata)
self.reg_token = None
self.profile_token = None
self.access_token = None
def _send_req(self, request):
prepped = self.s.prepare_request(request)
return self.s.send(prepped)
def _parse_jwe_response(self, ans):
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if not ans.headers.get('Content-Type').startswith("application/json"):
raise RequestFailure("Response not JSON.", ans)
ans_json = ans.json()
if not ans_json.get('command-success'):
raise RequestFailure(f"Command failed: {ans_json}", ans,
error_code=ans_json.get('command-error-code'))
if ans_json.get('command-result-type') not in ["JWE", "JSON"]:
raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans)
return ans_json.get('command-result')
def _parse_v4(self, ans):
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if not ans.headers.get('Content-Type').startswith("application/json"):
raise RequestFailure("Response not JSON.", ans)
ans_json = ans.json()
if (ans_json.get('status')
not in ["v4_success", "v4_pending", "v4_signed"]):
if ans_json.get('status') == "v4_error":
reason = ans_json.get('reason')
raise RequestFailure(f"Request v4 error: {reason}.", ans,
error_code=reason)
raise RequestFailure("Unknown v4 failure.", ans)
return ans_json
def _sign_challenge_v4(self, challenge, userpin):
userpin = str(userpin)
signature = {}
key = self.appdata['sca_seed'] + userpin + challenge['randK']
message = challenge['transaction-challenge']
signature['jti'] = challenge['jti']
signature['signature'] = hmac_sha256(key, message)
signature['userpin'] = userpin
signature['authzTool'] = "POSTEID"
return json.dumps(signature)
def _enrol_stage_basiclogin(self, username, password):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['password'] = password
data['authLevel'] = "0"
data['userid'] = username
logger.debug("Enrol(basiclogin): sending username and password.")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code == 401:
raise AuthError("Wrong username or password!")
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-PI' not in ans.headers:
raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans)
uid = ans.headers.get('X-PI')
logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.")
def _enrol_stage_req_sms_otp(self, username):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['password'] = str(uuid4())
data['authLevel'] = "3"
data['userid'] = username
logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-TEL' not in ans.headers:
raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans)
logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.")
tel = ans.headers.get('X-TEL')
logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.")
return tel
def _enrol_stage_send_sms_otp(self, otp):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['otp'] = str(otp)
data['authLevel'] = "2"
data['nonce'] = str(uuid4())
logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code == 401:
raise AuthError("Wrong SMS-OTP code!")
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-RESULT' not in ans.headers:
raise RequestFailure(
f"Malformed request, missing 'X-RESULT'.", ans)
logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.")
ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT'))
logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}")
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'token' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'token'.", ans)
reg_token = ans_json['data']['token']
logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.")
self.reg_token = reg_token
def _enrol_stage_finalize(self, userpin=""):
url = urljoin(self.SH_URL, "v1/registerApp")
data = {}
data['userPIN'] = userpin
data['idpAccessToken'] = ""
data['registerToken'] = self.reg_token
logger.debug("Enrol(REG-APP): encrypting request.")
print(data)
jwe_req = self.jwe_handler.req_jwe_post(
url, 'registerApp', data, auth=True)
logger.debug("Enrol(REG-APP): sending request.")
ans = self._send_req(jwe_req)
try:
ans_jwe = self._parse_jwe_response(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-1":
raise NotInitializedPIN()
raise e
ans_json = self.jwe_handler.decrypt(ans_jwe)
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'appRegisterID' not in ans_json['data']:
raise RequestFailure(
"Malformed request, missing 'appRegisterID'.", ans)
if 'secretAPP' not in ans_json['data']:
raise RequestFailure(
"Malformed request, missing 'secretAPP'.", ans)
self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID']
self.appdata['sca_seed'] = ans_json['data']['secretAPP']
logger.debug("Enrol(REG-APP): completed successfully!")
def check_register(self):
url = urljoin(self.SH_URL, "v1/checkRegisterApp")
data = {'appRegisterID': self.appdata['sca_app_regid']}
jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data,
auth=True)
ans = self._send_req(jwe_req)
result = self._parse_jwe_response(ans)
if 'valid' not in result:
raise RequestFailure(
"Malformed request, missing 'valid'.", ans)
return result["valid"]
def enrol_sms_start(self, username, password):
self._enrol_stage_basiclogin(username, password)
tel = self._enrol_stage_req_sms_otp(username)
return tel
def enrol_sms_finish(self, otp, force_userpin=None):
self._enrol_stage_send_sms_otp(otp)
try:
self._enrol_stage_finalize()
except NotInitializedPIN as e:
if force_userpin:
logger.info('Enrol(REG-APP): Setting the new provided PIN.')
self._enrol_stage_finalize(force_userpin)
return
raise e
def _pin_login(self, userpin):
url_challenge = urljoin(self.LOGIN_URL,
"secureholder/v4/native/challenge")
url_authorize = urljoin(self.LOGIN_URL,
"secureholder/v4/native/az")
logger.debug("PINLogin: acquiring v4 challenge.")
req = self.jwe_handler.req_jwe_post(url_challenge, "login", {},
auth=True)
ans = self._send_req(req)
ans = self._parse_v4(ans)
logger.debug(f"PINLogin: got v4 challenge: {ans}.")
logger.debug(f"PINLogin: preparing challenge response.")
data = {}
data['signature'] = self._sign_challenge_v4(ans, userpin)
data['appRegisterID'] = self.appdata['sca_app_regid']
logger.debug(f"PINLogin: sending response {data}.")
req = self.jwe_handler.req_jwe_post(url_authorize, "login", data,
auth=True)
ans = self._send_req(req)
try:
ans = self._parse_v4(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-3":
raise AuthError("Codice PosteID errato!")
elif e.error_code == "CERT-ERR-2":
raise AuthError("Codice PosteID bloccato per troppi errori!")
raise(e)
self.profile_token = ans['profile_token']
self.access_token = ans['access_token']
logger.debug(f"PINLogin: logged in, session token aquired.")
def _unenrol(self):
if not self.access_token:
raise Exception(
'Need to acquire an access token (pin_login) before.')
logger.debug("Unenrol: unenrolling device.")
url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid")
jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {},
auth=True)
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
ans = self._send_req(jwe_req)
ans = self._parse_v4(ans)
logger.debug("Unenrol: device unenrolled.")
def list_txs(self):
if not self.access_token:
raise Exception(
"Need to acquire an access token (pin_login) before.")
url = urljoin(self.LOGIN_URL,
"secureholder/v4/native/list-transaction")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True)
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
ans = self._send_req(jwe_req)
ans_json = self._parse_v4(ans)
if 'transaction' not in ans_json:
raise RequestFailure(
f"Malformed response, missing 'transaction'.", ans)
return ans_json['transaction']
def authorize_tx_start(self, tx_id):
data = {}
data['jti'] = tx_id
data['appRegisterID']: self.appdata['sca_app_regid']
url = urljoin(self.AUTH_URL, "secureholder/v4/challenge")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
ans = self._send_req(jwe_req)
ans_json = self._parse_v4(ans)
if ans_json['status'] != "v4_pending":
raise Exception(
f"TX Status is '{ans_json['status']}', not pending.")
if 'jti' not in ans_json:
raise RequestFailure(f"Malformed response, missing 'jti'.", ans)
if 'randK' not in ans_json:
raise RequestFailure(f"Malformed response, missing 'randK'.", ans)
if 'transaction-challenge' not in ans_json:
raise RequestFailure(
f"Malformed response, missing 'transaction-challenge'.", ans)
return ans_json
def authorize_tx_finish(self, challenge, userpin):
signature = self._sign_challenge_v4(challenge, userpin)
data = {}
data['signature'] = signature
data['appRegisterID'] = self.appdata['sca_app_regid']
url = urljoin(self.AUTH_URL, "secureholder/v4/az")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
ans = self._send_req(jwe_req)
try:
ans_json = self._parse_v4(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-3":
raise AuthError("Codice PosteID errato!")
elif e.error_code == "CERT-ERR-2":
raise AuthError("Codice PosteID bloccato per troppi errori!")
raise(e)
if ans_json['status'] != "v4_signed":
raise Exception(
f"TX Status is '{ans_json['status']}', not signed.")
def unenrol(self, userpin):
self._pin_login(userpin)
self._unenrol()
@property
def totp(self):
seed = self.appdata['sca_seed']
seed = b32encode(seed.encode("utf-8") + b'\0' * 32)
seed = seed.decode("utf-8").replace('=', '')
totp = TOTP(seed, interval=120)
return totp

+ 25
- 0
pyjod/utils.py View File

@ -0,0 +1,25 @@
import hmac
import json
from base64 import b64encode, urlsafe_b64encode
from hashlib import sha256
def sha256_base64(payload):
out = payload.encode('utf-8')
out = sha256(out).digest()
out = b64encode(out)
return out.decode('utf-8')
def hmac_sha256(key, message):
hm = hmac.new(key.encode('utf-8'), digestmod=sha256)
hm.update(message.encode('utf-8'))
digest = hm.digest()
return urlsafe_b64encode(digest).decode('utf-8').replace('=', '')
class RequestFailure(Exception):
def __init__(self, message, ans, error_code=None):
super().__init__(message)
self.ans = ans
self.error_code = error_code

+ 5
- 0
pyjod/version.py View File

@ -0,0 +1,5 @@
# coding: utf-8
# file generated by setuptools_scm
# don't change, don't track in version control
__version__ = version = '0.1.dev0'
__version_tuple__ = version_tuple = (0, 1, 'dev0')

+ 126
- 0
pyjod/xkey.py View File

@ -0,0 +1,126 @@
import logging
from urllib.parse import urljoin
from uuid import uuid4
import requests
from .appdata import AppData
from .jwe_handler import JWEHandler
from .utils import sha256_base64, RequestFailure
logger = logging.getLogger(__name__)
class XKey:
REGISTRY_URL = ("https://appregistry-posteid.mobile.poste.it"
"/jod-app-registry/")
APP_NAME = "app-posteid-v3"
ACTIVITY_ID = "C6050AC80E8B5288A01237"
DEVICE_SPECS = ("Android", "11",
"sdk_gphone_x86_64_arm64", "4.5.204",
"true")
def __init__(self, profile_name):
self.appdata = AppData(profile_name)
self.s = requests.Session()
self.s.headers = {'Accept-Encoding': "gzip",
'User-Agent': "okhttp/3.12.1",
'Connection': "keep-alive"}
self.jwe_handler = JWEHandler(self.appdata)
def _send_req(self, request):
prepped = self.s.prepare_request(request)
return self.s.send(prepped)
def _register_stage_init(self, register_nonce):
url = urljoin(self.REGISTRY_URL, "v2/registerInit")
headers = {'Content-Type': "application/json; charset=UTF-8"}
data = {}
data['appName'] = "app-posteid-v3"
data['initCodeChallenge'] = sha256_base64(register_nonce)
logger.debug(f"Registration(INIT): sending challenge: {data}")
ans = self.s.post(url, headers=headers, json=data)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if not ans.headers.get('Content-Type').startswith("application/json"):
raise RequestFailure("Response not JSON.", ans)
ans_json = ans.json()
if 'pubServerKey' not in ans_json:
raise RequetFailure("Response does not contain 'pubServerKey'",
ans)
pubkey = ans_json['pubServerKey']
self.appdata.serv_pubkey = pubkey
logger.debug(f"Registration(INIT): got server pubkey: {pubkey}.")
def _register_stage_register(self, register_nonce):
url = urljoin(self.REGISTRY_URL, "v2/register")
data = {}
data['initCodeVerifier'] = register_nonce
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
logger.debug("Registration(REG): encrypting app data.")
jwe_req = self.jwe_handler.req_jwe_post(url, "register", data)
logger.debug("Registration(REG): sending app data.")
ans = self._send_req(jwe_req)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
logger.debug("Registration(REG): decrypting response.")
ans_json = self.jwe_handler.decrypt(ans.content)
logger.debug(f"Registration(REG): decrypted response: {ans_json}")
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'app-uuid' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
if 'otpSecretKey' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
ans)
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
def _register_stage_activate(self):
url = urljoin(self.REGISTRY_URL, "v2/activation")
logger.debug("Registration(ACTIVATE): encrypting request.")
jwe_req = self.jwe_handler.req_jwe_post(url, "activation", {},
auth=True)
logger.debug("Registration(ACTIVATE): sending request.")
ans = self._send_req(jwe_req)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
self.appdata['activated'] = True
logger.debug("Registration(ACTIVATE): activated.")
def _register_stage_update(self, register_nonce):
url = urljoin(self.REGISTRY_URL, "v2/register")
data = {}
data['initCodeVerifier'] = register_nonce
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
logger.debug("Registration(UPDATE): encrypting app data.")
jwe_req = self.jwe_handler.req_jwe_post(url, "registerUpdate", data,
auth=True)
logger.debug("Registration(UPDATE): sending app data.")
ans = self._send_req(jwe_req)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
logger.debug("Registration(UPDATE): decrypting response.")
ans_json = self.jwe_handler.decrypt(ans.content)
logger.debug(f"Registration(UPDATE): decrypted response: {ans_json}")
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'app-uuid' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
if 'otpSecretKey' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
ans)
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
def register_xkey(self):
register_nonce = str(uuid4())
logger.debug(f"Starting registration (nonce: {register_nonce}).")
self._register_stage_init(register_nonce)
if self.appdata['xkey_seed']:
self._register_stage_update(register_nonce)
else:
self._register_stage_register(register_nonce)
self._register_stage_activate()

+ 10
- 0
pyproject.toml View File

@ -0,0 +1,10 @@
[build-system]
requires = [
"setuptools>=42",
"wheel",
"setuptools_scm[toml]>3.4",
]
build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "pyjod/version.py"

+ 38
- 0
setup.cfg View File

@ -0,0 +1,38 @@
[metadata]
name = pyjod
version = attr:pyjod.version
description = Python implementation of PosteID.
long_description = file:README.md
long_description_content_type = text/markdown
author = Vendetta
author_email = aaron@guerrilla.open
classifiers =
Development Status :: 1 - Planning
Programming Language :: Python
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
platforms = any
[options]
packages =
pyjod
install_requires =
xdgappdirs>=1.4
cryptography>=3.3
jwcrypto>=1.3
pathvalidate>=2.5
pyotp>=2.6
requests
setup_requires =
setuptools_scm
[options.extras_require]
qr =
numpy
qrcodeT
[options.entry_points]
console_scripts =
posteid = pyjod.cli:run

BIN
test.png View File

Before After
Width: 1920  |  Height: 1080  |  Size: 313 KiB

Loading…
Cancel
Save