import json
|
|
import logging
|
|
import re
|
|
from base64 import b32encode
|
|
from getpass import getpass
|
|
from urllib.parse import urljoin
|
|
from uuid import uuid4
|
|
|
|
import requests
|
|
from pyotp import TOTP
|
|
|
|
from .appdata import AppData
|
|
from .jwe_handler import JWEHandler
|
|
from .utils import RequestFailure, hmac_sha256
|
|
|
|
# Module-level logger, named after this module, used by every SCA method.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NotInitializedPIN(Exception):
    """Raised when app registration fails with error code ``PIN-ERR-1``."""
|
|
|
|
|
|
class AuthError(Exception):
    """Raised when the server rejects the supplied credentials, OTP or PIN."""
|
|
|
|
|
|
class SCA:
    """Client for the PosteID enrolment, PIN-login and authorization APIs.

    The instance keeps a ``requests`` session, the per-profile ``AppData``
    store and a ``JWEHandler`` used to build encrypted requests and decrypt
    responses.  Tokens obtained during enrolment / PIN login are cached on
    the instance (``reg_token``, ``profile_token``, ``access_token``).

    NOTE(review): "SCA" presumably stands for Strong Customer
    Authentication -- confirm against project docs.
    """

    LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/"
    AUTH_URL = "https://posteid.poste.it/jod-login-schema/"
    SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web"
              "/public/app/")

    # v4 'status' values that _parse_v4 accepts as non-failures.
    _V4_OK_STATUSES = ("v4_success", "v4_pending", "v4_signed")

    def __init__(self, profile_name):
        """Create a client bound to the application data of *profile_name*."""
        self.appdata = AppData(profile_name)
        self.s = requests.Session()
        # The session headers are replaced wholesale (not updated), so only
        # these three default headers are carried by the session.
        self.s.headers = {'Accept-Encoding': "gzip",
                          'User-Agent': "okhttp/3.12.1",
                          'Connection': "keep-alive"}
        self.jwe_handler = JWEHandler(self.appdata)
        self.reg_token = None
        self.profile_token = None
        self.access_token = None

    # ------------------------------------------------------------------
    # Low-level request / response helpers
    # ------------------------------------------------------------------

    def _send_req(self, request):
        """Prepare *request* with the session and send it; return the response."""
        prepped = self.s.prepare_request(request)
        return self.s.send(prepped)

    def _json_body(self, ans):
        """Return the decoded JSON body of *ans*.

        Raises:
            RequestFailure: if the status code is not 200 or the
                ``Content-Type`` header does not start with
                ``application/json`` (a missing header counts as not JSON).
        """
        if ans.status_code != 200:
            raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
        # A missing header yields "" so we report a RequestFailure instead
        # of crashing with AttributeError on None.
        content_type = ans.headers.get('Content-Type') or ""
        if not content_type.startswith("application/json"):
            raise RequestFailure("Response not JSON.", ans)
        return ans.json()

    def _parse_jwe_response(self, ans):
        """Validate a command-style response and return its ``command-result``.

        Raises:
            RequestFailure: on a bad status / content type, when
                ``command-success`` is falsy (the server's
                ``command-error-code`` is passed as ``error_code``), or when
                ``command-result-type`` is neither ``JWE`` nor ``JSON``.
        """
        ans_json = self._json_body(ans)
        if not ans_json.get('command-success'):
            raise RequestFailure(
                f"Command failed: {ans_json}", ans,
                error_code=ans_json.get('command-error-code'))
        if ans_json.get('command-result-type') not in ["JWE", "JSON"]:
            raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans)
        return ans_json.get('command-result')

    def _parse_v4(self, ans):
        """Validate a v4 response and return its decoded JSON body.

        Raises:
            RequestFailure: on a bad status / content type, on a
                ``v4_error`` status (the ``reason`` is passed as
                ``error_code``), or on any other unrecognised status.
        """
        ans_json = self._json_body(ans)
        status = ans_json.get('status')
        if status in self._V4_OK_STATUSES:
            return ans_json
        if status == "v4_error":
            reason = ans_json.get('reason')
            raise RequestFailure(f"Request v4 error: {reason}.", ans,
                                 error_code=reason)
        raise RequestFailure("Unknown v4 failure.", ans)

    def _parse_v4_pin_checked(self, ans):
        """Like ``_parse_v4`` but map PIN-related error codes to ``AuthError``.

        Raises:
            AuthError: for error codes ``PIN-ERR-3`` and ``CERT-ERR-2``.
            RequestFailure: for every other failure, re-raised unchanged.
        """
        try:
            return self._parse_v4(ans)
        except RequestFailure as e:
            if e.error_code == "PIN-ERR-3":
                raise AuthError("Codice PosteID errato!") from e
            if e.error_code == "CERT-ERR-2":
                raise AuthError(
                    "Codice PosteID bloccato per troppi errori!") from e
            raise

    def _authorization_header(self):
        """Return the ``Bearer`` header value for the cached access token.

        Raises:
            Exception: if no access token has been acquired yet.
        """
        if not self.access_token:
            raise Exception(
                "Need to acquire an access token (pin_login) before.")
        return "Bearer " + self.access_token

    def _post_login_bearer(self, data):
        """Send *data* to ``v4/xmobileauthjwt`` as a bearer JWE request."""
        url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
        jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
                                                  auth=True)
        return self._send_req(jwe_req)

    def _sign_challenge_v4(self, challenge, userpin):
        """Return the JSON-encoded signature answering a v4 *challenge*.

        The HMAC key is the stored ``sca_seed`` concatenated with the PIN and
        the challenge's ``randK``; the signed message is the challenge's
        ``transaction-challenge``.  The PIN itself is included in the result.
        """
        userpin = str(userpin)
        key = self.appdata['sca_seed'] + userpin + challenge['randK']
        message = challenge['transaction-challenge']
        signature = {
            'jti': challenge['jti'],
            'signature': hmac_sha256(key, message),
            'userpin': userpin,
            'authzTool': "POSTEID",
        }
        return json.dumps(signature)

    # ------------------------------------------------------------------
    # Enrolment stages
    # ------------------------------------------------------------------

    def _enrol_stage_basiclogin(self, username, password):
        """Enrolment step 1: authenticate with username and password.

        Raises:
            AuthError: on HTTP 401.
            RequestFailure: on any other non-200 status or when the
                ``X-PI`` response header is missing.
        """
        data = {'password': password, 'authLevel': "0", 'userid': username}
        logger.debug("Enrol(basiclogin): sending username and password.")
        ans = self._post_login_bearer(data)
        if ans.status_code == 401:
            raise AuthError("Wrong username or password!")
        if ans.status_code != 200:
            raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
        if 'X-PI' not in ans.headers:
            raise RequestFailure("Malformed request, missing 'X-PI'.", ans)
        uid = ans.headers['X-PI']
        logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.")

    def _enrol_stage_req_sms_otp(self, username):
        """Enrolment step 2: request an SMS-OTP for *username*.

        Returns:
            The value of the ``X-TEL`` response header.

        Raises:
            RequestFailure: on a non-200 status or a missing ``X-TEL``.
        """
        # The password field carries a fresh random UUID in this step.
        data = {'password': str(uuid4()), 'authLevel': "3",
                'userid': username}
        logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..")
        ans = self._post_login_bearer(data)
        if ans.status_code != 200:
            raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
        if 'X-TEL' not in ans.headers:
            raise RequestFailure("Malformed request, missing 'X-TEL'.", ans)
        logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.")
        tel = ans.headers['X-TEL']
        logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.")
        return tel

    def _enrol_stage_send_sms_otp(self, otp):
        """Enrolment step 3: submit the SMS-OTP and store the register token.

        On success ``self.reg_token`` is set from the decrypted ``X-RESULT``
        header (``data.token``).

        Raises:
            AuthError: on HTTP 401.
            RequestFailure: on any other non-200 status, or when
                ``X-RESULT``, ``data`` or ``token`` is missing.
        """
        data = {'otp': str(otp), 'authLevel': "2", 'nonce': str(uuid4())}
        logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.")
        ans = self._post_login_bearer(data)
        if ans.status_code == 401:
            raise AuthError("Wrong SMS-OTP code!")
        if ans.status_code != 200:
            raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
        if 'X-RESULT' not in ans.headers:
            raise RequestFailure(
                "Malformed request, missing 'X-RESULT'.", ans)
        logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.")
        ans_json = self.jwe_handler.decrypt(ans.headers['X-RESULT'])
        # NOTE(review): the two debug lines below log the registration
        # token in clear text -- confirm this is acceptable.
        logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}")
        if 'data' not in ans_json:
            raise RequestFailure("Malformed request, missing 'data'.", ans)
        if 'token' not in ans_json['data']:
            raise RequestFailure("Malformed request, missing 'token'.", ans)
        reg_token = ans_json['data']['token']
        logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.")
        self.reg_token = reg_token

    def _enrol_stage_finalize(self, userpin=""):
        """Enrolment step 4: register the app and persist its credentials.

        On success ``sca_app_regid`` and ``sca_seed`` are written to
        ``self.appdata`` from the decrypted response.

        Raises:
            NotInitializedPIN: when the server answers ``PIN-ERR-1``.
            RequestFailure: on any other failure or malformed response.
        """
        url = urljoin(self.SH_URL, "v1/registerApp")
        data = {'userPIN': userpin,
                'idpAccessToken': "",
                'registerToken': self.reg_token}
        logger.debug("Enrol(REG-APP): encrypting request.")
        # The payload holds the PIN and the register token: it must not be
        # echoed to stdout.
        jwe_req = self.jwe_handler.req_jwe_post(
            url, 'registerApp', data, auth=True)
        logger.debug("Enrol(REG-APP): sending request.")
        ans = self._send_req(jwe_req)

        try:
            ans_jwe = self._parse_jwe_response(ans)
        except RequestFailure as e:
            if e.error_code == "PIN-ERR-1":
                raise NotInitializedPIN() from e
            raise

        ans_json = self.jwe_handler.decrypt(ans_jwe)
        if 'data' not in ans_json:
            raise RequestFailure("Malformed request, missing 'data'.", ans)
        registration = ans_json['data']
        if 'appRegisterID' not in registration:
            raise RequestFailure(
                "Malformed request, missing 'appRegisterID'.", ans)
        if 'secretAPP' not in registration:
            raise RequestFailure(
                "Malformed request, missing 'secretAPP'.", ans)
        self.appdata['sca_app_regid'] = registration['appRegisterID']
        self.appdata['sca_seed'] = registration['secretAPP']
        logger.debug("Enrol(REG-APP): completed successfully!")

    # ------------------------------------------------------------------
    # Public enrolment API
    # ------------------------------------------------------------------

    def check_register(self):
        """Ask the server whether the stored app registration is valid.

        Returns:
            The ``valid`` entry of the command result.

        Raises:
            RequestFailure: on a failed command or a missing ``valid``.
        """
        url = urljoin(self.SH_URL, "v1/checkRegisterApp")
        data = {'appRegisterID': self.appdata['sca_app_regid']}
        jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data,
                                                auth=True)
        ans = self._send_req(jwe_req)
        result = self._parse_jwe_response(ans)
        if 'valid' not in result:
            raise RequestFailure(
                "Malformed request, missing 'valid'.", ans)
        return result["valid"]

    def enrol_sms_start(self, username, password):
        """Log in with credentials and request an SMS-OTP.

        Returns:
            The ``X-TEL`` header value returned by the OTP request.
        """
        self._enrol_stage_basiclogin(username, password)
        return self._enrol_stage_req_sms_otp(username)

    def enrol_sms_finish(self, otp, force_userpin=None):
        """Submit the SMS-OTP and complete the app registration.

        Registration is first attempted with an empty PIN.  If the server
        reports that the PIN is not initialised and *force_userpin* is
        truthy, registration is retried with that PIN.

        Raises:
            NotInitializedPIN: if the PIN is not initialised and no
                *force_userpin* was supplied.
        """
        self._enrol_stage_send_sms_otp(otp)
        try:
            self._enrol_stage_finalize()
        except NotInitializedPIN:
            if not force_userpin:
                raise
            logger.info('Enrol(REG-APP): Setting the new provided PIN.')
            self._enrol_stage_finalize(force_userpin)

    # ------------------------------------------------------------------
    # PIN login / unenrolment
    # ------------------------------------------------------------------

    def _pin_login(self, userpin):
        """Log in with the PIN and cache the profile and access tokens.

        Raises:
            AuthError: when the server reports a wrong or blocked PIN.
            RequestFailure: on any other failure.
        """
        url_challenge = urljoin(self.LOGIN_URL,
                                "secureholder/v4/native/challenge")
        url_authorize = urljoin(self.LOGIN_URL,
                                "secureholder/v4/native/az")
        logger.debug("PINLogin: acquiring v4 challenge.")
        req = self.jwe_handler.req_jwe_post(url_challenge, "login", {},
                                            auth=True)
        challenge = self._parse_v4(self._send_req(req))
        logger.debug(f"PINLogin: got v4 challenge: {challenge}.")
        logger.debug("PINLogin: preparing challenge response.")
        data = {'signature': self._sign_challenge_v4(challenge, userpin),
                'appRegisterID': self.appdata['sca_app_regid']}
        # NOTE(review): this debug line logs the signature payload, which
        # contains the PIN in clear text -- confirm this is acceptable.
        logger.debug(f"PINLogin: sending response {data}.")
        req = self.jwe_handler.req_jwe_post(url_authorize, "login", data,
                                            auth=True)
        session = self._parse_v4_pin_checked(self._send_req(req))
        self.profile_token = session['profile_token']
        self.access_token = session['access_token']
        logger.debug("PINLogin: logged in, session token aquired.")

    def _unenrol(self):
        """Delete the PosteID registration using the cached access token.

        Raises:
            Exception: if no access token has been acquired yet.
            RequestFailure: if the server response is not a v4 success.
        """
        authorization = self._authorization_header()
        logger.debug("Unenrol: unenrolling device.")
        url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid")
        jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {},
                                                  auth=True)
        jwe_req.headers['Authorization'] = authorization
        # Called for validation only; the parsed body is not needed.
        self._parse_v4(self._send_req(jwe_req))
        logger.debug("Unenrol: device unenrolled.")

    def unenrol(self, userpin):
        """Log in with *userpin* and then unenrol this device."""
        self._pin_login(userpin)
        self._unenrol()

    # ------------------------------------------------------------------
    # Transactions
    # ------------------------------------------------------------------

    def list_txs(self):
        """Return the ``transaction`` entry of the list-transaction response.

        Raises:
            Exception: if no access token has been acquired yet.
            RequestFailure: on failure or a missing ``transaction``.
        """
        authorization = self._authorization_header()
        url = urljoin(self.LOGIN_URL,
                      "secureholder/v4/native/list-transaction")
        jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True)
        jwe_req.headers['Authorization'] = authorization
        ans = self._send_req(jwe_req)
        ans_json = self._parse_v4(ans)
        if 'transaction' not in ans_json:
            raise RequestFailure(
                "Malformed response, missing 'transaction'.", ans)
        return ans_json['transaction']

    def authorize_tx_start(self, tx_id):
        """Request the signing challenge for transaction *tx_id*.

        Returns:
            The decoded challenge, guaranteed to contain ``jti``, ``randK``
            and ``transaction-challenge``.

        Raises:
            Exception: if the transaction status is not ``v4_pending``.
            RequestFailure: on failure or a missing challenge field.
        """
        # Both fields are sent, matching authorize_tx_finish.  The original
        # used ':' (an annotation) instead of '=', so appRegisterID was
        # silently left out of the request.
        data = {'jti': tx_id,
                'appRegisterID': self.appdata['sca_app_regid']}
        url = urljoin(self.AUTH_URL, "secureholder/v4/challenge")
        jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
        ans = self._send_req(jwe_req)
        ans_json = self._parse_v4(ans)
        if ans_json['status'] != "v4_pending":
            raise Exception(
                f"TX Status is '{ans_json['status']}', not pending.")
        for field in ('jti', 'randK', 'transaction-challenge'):
            if field not in ans_json:
                raise RequestFailure(
                    f"Malformed response, missing '{field}'.", ans)
        return ans_json

    def authorize_tx_finish(self, challenge, userpin):
        """Sign *challenge* with *userpin* and submit the authorization.

        Raises:
            AuthError: when the server reports a wrong or blocked PIN.
            RequestFailure: on any other request failure.
            Exception: if the resulting status is not ``v4_signed``.
        """
        data = {'signature': self._sign_challenge_v4(challenge, userpin),
                'appRegisterID': self.appdata['sca_app_regid']}
        url = urljoin(self.AUTH_URL, "secureholder/v4/az")
        jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
        ans_json = self._parse_v4_pin_checked(self._send_req(jwe_req))
        if ans_json['status'] != "v4_signed":
            raise Exception(
                f"TX Status is '{ans_json['status']}', not signed.")

    # ------------------------------------------------------------------
    # OTP generation
    # ------------------------------------------------------------------

    @property
    def totp(self):
        """Return a ``TOTP`` object (120 s interval) built from ``sca_seed``.

        The seed is UTF-8 encoded, extended with 32 NUL bytes, base32
        encoded, and stripped of ``=`` padding before being handed to
        ``TOTP``.
        """
        raw_seed = self.appdata['sca_seed'].encode("utf-8") + b'\0' * 32
        secret = b32encode(raw_seed).decode("utf-8").replace('=', '')
        return TOTP(secret, interval=120)
|