Implementazione open-source del protocollo di Strong Customer Authentication di Poste Italiane, (https://posteid.poste.it), lato client.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

323 lines
14 KiB

import json
import logging
import re
from base64 import b32encode
from getpass import getpass
from urllib.parse import urljoin
from uuid import uuid4
import requests
from pyotp import TOTP
from .appdata import AppData
from .jwe_handler import JWEHandler
from .utils import RequestFailure, hmac_sha256
logger = logging.getLogger(__name__)
class NotInitializedPIN(Exception):
pass
class AuthError(Exception):
pass
class SCA:
LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/"
AUTH_URL = "https://posteid.poste.it/jod-login-schema/"
SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web"
"/public/app/")
def __init__(self, profile_name):
self.appdata = AppData(profile_name)
self.s = requests.Session()
self.s.headers = {'Accept-Encoding': "gzip",
'User-Agent': "okhttp/3.12.1",
'Connection': "keep-alive"}
self.jwe_handler = JWEHandler(self.appdata)
self.reg_token = None
self.profile_token = None
self.access_token = None
def _send_req(self, request):
prepped = self.s.prepare_request(request)
return self.s.send(prepped)
def _parse_jwe_response(self, ans):
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if not ans.headers.get('Content-Type').startswith("application/json"):
raise RequestFailure("Response not JSON.", ans)
ans_json = ans.json()
if not ans_json.get('command-success'):
raise RequestFailure(f"Command failed: {ans_json}", ans,
error_code=ans_json.get('command-error-code'))
if ans_json.get('command-result-type') not in ["JWE", "JSON"]:
raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans)
return ans_json.get('command-result')
def _parse_v4(self, ans):
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if not ans.headers.get('Content-Type').startswith("application/json"):
raise RequestFailure("Response not JSON.", ans)
ans_json = ans.json()
if (ans_json.get('status')
not in ["v4_success", "v4_pending", "v4_signed"]):
if ans_json.get('status') == "v4_error":
reason = ans_json.get('reason')
raise RequestFailure(f"Request v4 error: {reason}.", ans,
error_code=reason)
raise RequestFailure("Unknown v4 failure.", ans)
return ans_json
def _sign_challenge_v4(self, challenge, userpin):
userpin = str(userpin)
signature = {}
key = self.appdata['sca_seed'] + userpin + challenge['randK']
message = challenge['transaction-challenge']
signature['jti'] = challenge['jti']
signature['signature'] = hmac_sha256(key, message)
signature['userpin'] = userpin
signature['authzTool'] = "POSTEID"
return json.dumps(signature)
def _enrol_stage_basiclogin(self, username, password):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['password'] = password
data['authLevel'] = "0"
data['userid'] = username
logger.debug("Enrol(basiclogin): sending username and password.")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code == 401:
raise AuthError("Wrong username or password!")
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-PI' not in ans.headers:
raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans)
uid = ans.headers.get('X-PI')
logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.")
def _enrol_stage_req_sms_otp(self, username):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['password'] = str(uuid4())
data['authLevel'] = "3"
data['userid'] = username
logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-TEL' not in ans.headers:
raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans)
logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.")
tel = ans.headers.get('X-TEL')
logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.")
return tel
def _enrol_stage_send_sms_otp(self, otp):
url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
data = {}
data['otp'] = str(otp)
data['authLevel'] = "2"
data['nonce'] = str(uuid4())
logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.")
jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
auth=True)
ans = self._send_req(jwe_req)
if ans.status_code == 401:
raise AuthError("Wrong SMS-OTP code!")
if ans.status_code != 200:
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
if 'X-RESULT' not in ans.headers:
raise RequestFailure(
f"Malformed request, missing 'X-RESULT'.", ans)
logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.")
ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT'))
logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}")
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'token' not in ans_json['data']:
raise RequestFailure("Malformed request, missing 'token'.", ans)
reg_token = ans_json['data']['token']
logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.")
self.reg_token = reg_token
def _enrol_stage_finalize(self, userpin=""):
url = urljoin(self.SH_URL, "v1/registerApp")
data = {}
data['userPIN'] = userpin
data['idpAccessToken'] = ""
data['registerToken'] = self.reg_token
logger.debug("Enrol(REG-APP): encrypting request.")
print(data)
jwe_req = self.jwe_handler.req_jwe_post(
url, 'registerApp', data, auth=True)
logger.debug("Enrol(REG-APP): sending request.")
ans = self._send_req(jwe_req)
try:
ans_jwe = self._parse_jwe_response(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-1":
raise NotInitializedPIN()
raise e
ans_json = self.jwe_handler.decrypt(ans_jwe)
if 'data' not in ans_json:
raise RequestFailure("Malformed request, missing 'data'.", ans)
if 'appRegisterID' not in ans_json['data']:
raise RequestFailure(
"Malformed request, missing 'appRegisterID'.", ans)
if 'secretAPP' not in ans_json['data']:
raise RequestFailure(
"Malformed request, missing 'secretAPP'.", ans)
self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID']
self.appdata['sca_seed'] = ans_json['data']['secretAPP']
logger.debug("Enrol(REG-APP): completed successfully!")
def check_register(self):
url = urljoin(self.SH_URL, "v1/checkRegisterApp")
data = {'appRegisterID': self.appdata['sca_app_regid']}
jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data,
auth=True)
ans = self._send_req(jwe_req)
result = self._parse_jwe_response(ans)
if 'valid' not in result:
raise RequestFailure(
"Malformed request, missing 'valid'.", ans)
return result["valid"]
def enrol_sms_start(self, username, password):
self._enrol_stage_basiclogin(username, password)
tel = self._enrol_stage_req_sms_otp(username)
return tel
def enrol_sms_finish(self, otp, force_userpin=None):
self._enrol_stage_send_sms_otp(otp)
try:
self._enrol_stage_finalize()
except NotInitializedPIN as e:
if force_userpin:
logger.info('Enrol(REG-APP): Setting the new provided PIN.')
self._enrol_stage_finalize(force_userpin)
return
raise e
def _pin_login(self, userpin):
url_challenge = urljoin(self.LOGIN_URL,
"secureholder/v4/native/challenge")
url_authorize = urljoin(self.LOGIN_URL,
"secureholder/v4/native/az")
logger.debug("PINLogin: acquiring v4 challenge.")
req = self.jwe_handler.req_jwe_post(url_challenge, "login", {},
auth=True)
ans = self._send_req(req)
ans = self._parse_v4(ans)
logger.debug(f"PINLogin: got v4 challenge: {ans}.")
logger.debug(f"PINLogin: preparing challenge response.")
data = {}
data['signature'] = self._sign_challenge_v4(ans, userpin)
data['appRegisterID'] = self.appdata['sca_app_regid']
logger.debug(f"PINLogin: sending response {data}.")
req = self.jwe_handler.req_jwe_post(url_authorize, "login", data,
auth=True)
ans = self._send_req(req)
try:
ans = self._parse_v4(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-3":
raise AuthError("Codice PosteID errato!")
elif e.error_code == "CERT-ERR-2":
raise AuthError("Codice PosteID bloccato per troppi errori!")
raise(e)
self.profile_token = ans['profile_token']
self.access_token = ans['access_token']
logger.debug(f"PINLogin: logged in, session token aquired.")
def _unenrol(self):
if not self.access_token:
raise Exception(
'Need to acquire an access token (pin_login) before.')
logger.debug("Unenrol: unenrolling device.")
url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid")
jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {},
auth=True)
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
ans = self._send_req(jwe_req)
ans = self._parse_v4(ans)
logger.debug("Unenrol: device unenrolled.")
def list_txs(self):
if not self.access_token:
raise Exception(
"Need to acquire an access token (pin_login) before.")
url = urljoin(self.LOGIN_URL,
"secureholder/v4/native/list-transaction")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True)
jwe_req.headers['Authorization'] = "Bearer " + self.access_token
ans = self._send_req(jwe_req)
ans_json = self._parse_v4(ans)
if 'transaction' not in ans_json:
raise RequestFailure(
f"Malformed response, missing 'transaction'.", ans)
return ans_json['transaction']
def authorize_tx_start(self, tx_id):
data = {}
data['jti'] = tx_id
data['appRegisterID']: self.appdata['sca_app_regid']
url = urljoin(self.AUTH_URL, "secureholder/v4/challenge")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
ans = self._send_req(jwe_req)
ans_json = self._parse_v4(ans)
if ans_json['status'] != "v4_pending":
raise Exception(
f"TX Status is '{ans_json['status']}', not pending.")
if 'jti' not in ans_json:
raise RequestFailure(f"Malformed response, missing 'jti'.", ans)
if 'randK' not in ans_json:
raise RequestFailure(f"Malformed response, missing 'randK'.", ans)
if 'transaction-challenge' not in ans_json:
raise RequestFailure(
f"Malformed response, missing 'transaction-challenge'.", ans)
return ans_json
def authorize_tx_finish(self, challenge, userpin):
signature = self._sign_challenge_v4(challenge, userpin)
data = {}
data['signature'] = signature
data['appRegisterID'] = self.appdata['sca_app_regid']
url = urljoin(self.AUTH_URL, "secureholder/v4/az")
jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
ans = self._send_req(jwe_req)
try:
ans_json = self._parse_v4(ans)
except RequestFailure as e:
if e.error_code == "PIN-ERR-3":
raise AuthError("Codice PosteID errato!")
elif e.error_code == "CERT-ERR-2":
raise AuthError("Codice PosteID bloccato per troppi errori!")
raise(e)
if ans_json['status'] != "v4_signed":
raise Exception(
f"TX Status is '{ans_json['status']}', not signed.")
def unenrol(self, userpin):
self._pin_login(userpin)
self._unenrol()
@property
def totp(self):
seed = self.appdata['sca_seed']
seed = b32encode(seed.encode("utf-8") + b'\0' * 32)
seed = seed.decode("utf-8").replace('=', '')
totp = TOTP(seed, interval=120)
return totp