import json import logging import re from base64 import b32encode from getpass import getpass from urllib.parse import urljoin from uuid import uuid4 import requests from pyotp import TOTP from .appdata import AppData from .jwe_handler import JWEHandler from .utils import RequestFailure, hmac_sha256 logger = logging.getLogger(__name__) class NotInitializedPIN(Exception): pass class AuthError(Exception): pass class SCA: LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/" AUTH_URL = "https://posteid.poste.it/jod-login-schema/" SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web" "/public/app/") def __init__(self, profile_name): self.appdata = AppData(profile_name) self.s = requests.Session() self.s.headers = {'Accept-Encoding': "gzip", 'User-Agent': "okhttp/3.12.1", 'Connection': "keep-alive"} self.jwe_handler = JWEHandler(self.appdata) self.reg_token = None self.profile_token = None self.access_token = None def _send_req(self, request): prepped = self.s.prepare_request(request) return self.s.send(prepped) def _parse_jwe_response(self, ans): if ans.status_code != 200: raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) if not ans.headers.get('Content-Type').startswith("application/json"): raise RequestFailure("Response not JSON.", ans) ans_json = ans.json() if not ans_json.get('command-success'): raise RequestFailure(f"Command failed: {ans_json}", ans, error_code=ans_json.get('command-error-code')) if ans_json.get('command-result-type') not in ["JWE", "JSON"]: raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans) return ans_json.get('command-result') def _parse_v4(self, ans): if ans.status_code != 200: raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) if not ans.headers.get('Content-Type').startswith("application/json"): raise RequestFailure("Response not JSON.", ans) ans_json = ans.json() if (ans_json.get('status') not in ["v4_success", "v4_pending", "v4_signed"]): if ans_json.get('status') == "v4_error": reason = ans_json.get('reason') raise RequestFailure(f"Request v4 error: {reason}.", ans, error_code=reason) raise RequestFailure("Unknown v4 failure.", ans) return ans_json def _sign_challenge_v4(self, challenge, userpin): userpin = str(userpin) signature = {} key = self.appdata['sca_seed'] + userpin + challenge['randK'] message = challenge['transaction-challenge'] signature['jti'] = challenge['jti'] signature['signature'] = hmac_sha256(key, message) signature['userpin'] = userpin signature['authzTool'] = "POSTEID" return json.dumps(signature) def _enrol_stage_basiclogin(self, username, password): url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") data = {} data['password'] = password data['authLevel'] = "0" data['userid'] = username logger.debug("Enrol(basiclogin): sending username and password.") jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, auth=True) ans = self._send_req(jwe_req) if ans.status_code == 401: raise AuthError("Wrong username or password!") if ans.status_code != 200: raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) if 'X-PI' not in ans.headers: raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans) uid = ans.headers.get('X-PI') logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.") def _enrol_stage_req_sms_otp(self, username): url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") data = {} data['password'] = str(uuid4()) data['authLevel'] = "3" data['userid'] = username logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..") jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, auth=True) ans = self._send_req(jwe_req) if ans.status_code != 200: raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) if 'X-TEL' not in ans.headers: raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans) logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.") tel = ans.headers.get('X-TEL') logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.") return tel def _enrol_stage_send_sms_otp(self, otp): url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt") data = {} data['otp'] = str(otp) data['authLevel'] = "2" data['nonce'] = str(uuid4()) logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.") jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data, auth=True) ans = self._send_req(jwe_req) if ans.status_code == 401: raise AuthError("Wrong SMS-OTP code!") if ans.status_code != 200: raise RequestFailure(f"Wrong status code: {ans.status_code}", ans) if 'X-RESULT' not in ans.headers: raise RequestFailure( f"Malformed request, missing 'X-RESULT'.", ans) logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.") ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT')) logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}") if 'data' not in ans_json: raise RequestFailure("Malformed request, missing 'data'.", ans) if 'token' not in ans_json['data']: raise RequestFailure("Malformed request, missing 'token'.", ans) reg_token = ans_json['data']['token'] logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.") self.reg_token = reg_token def _enrol_stage_finalize(self, userpin=""): url = urljoin(self.SH_URL, "v1/registerApp") data = {} data['userPIN'] = userpin data['idpAccessToken'] = "" data['registerToken'] = self.reg_token logger.debug("Enrol(REG-APP): encrypting request.") print(data) jwe_req = self.jwe_handler.req_jwe_post( url, 'registerApp', data, auth=True) logger.debug("Enrol(REG-APP): sending request.") ans = self._send_req(jwe_req) try: ans_jwe = self._parse_jwe_response(ans) except RequestFailure as e: if e.error_code == "PIN-ERR-1": raise NotInitializedPIN() raise e ans_json = self.jwe_handler.decrypt(ans_jwe) if 'data' not in ans_json: raise RequestFailure("Malformed request, missing 'data'.", ans) if 'appRegisterID' not in ans_json['data']: raise RequestFailure( "Malformed request, missing 'appRegisterID'.", ans) if 'secretAPP' not in ans_json['data']: raise RequestFailure( "Malformed request, missing 'secretAPP'.", ans) self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID'] self.appdata['sca_seed'] = ans_json['data']['secretAPP'] logger.debug("Enrol(REG-APP): completed successfully!") def check_register(self): url = urljoin(self.SH_URL, "v1/checkRegisterApp") data = {'appRegisterID': self.appdata['sca_app_regid']} jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data, auth=True) ans = self._send_req(jwe_req) result = self._parse_jwe_response(ans) if 'valid' not in result: raise RequestFailure( "Malformed request, missing 'valid'.", ans) return result["valid"] def enrol_sms_start(self, username, password): self._enrol_stage_basiclogin(username, password) tel = self._enrol_stage_req_sms_otp(username) return tel def enrol_sms_finish(self, otp, force_userpin=None): self._enrol_stage_send_sms_otp(otp) try: self._enrol_stage_finalize() except NotInitializedPIN as e: if force_userpin: logger.info('Enrol(REG-APP): Setting the new provided PIN.') self._enrol_stage_finalize(force_userpin) return raise e def _pin_login(self, userpin): url_challenge = urljoin(self.LOGIN_URL, "secureholder/v4/native/challenge") url_authorize = urljoin(self.LOGIN_URL, "secureholder/v4/native/az") logger.debug("PINLogin: acquiring v4 challenge.") req = self.jwe_handler.req_jwe_post(url_challenge, "login", {}, auth=True) ans = self._send_req(req) ans = self._parse_v4(ans) logger.debug(f"PINLogin: got v4 challenge: {ans}.") logger.debug(f"PINLogin: preparing challenge response.") data = {} data['signature'] = self._sign_challenge_v4(ans, userpin) data['appRegisterID'] = self.appdata['sca_app_regid'] logger.debug(f"PINLogin: sending response {data}.") req = self.jwe_handler.req_jwe_post(url_authorize, "login", data, auth=True) ans = self._send_req(req) try: ans = self._parse_v4(ans) except RequestFailure as e: if e.error_code == "PIN-ERR-3": raise AuthError("Codice PosteID errato!") elif e.error_code == "CERT-ERR-2": raise AuthError("Codice PosteID bloccato per troppi errori!") raise(e) self.profile_token = ans['profile_token'] self.access_token = ans['access_token'] logger.debug(f"PINLogin: logged in, session token aquired.") def _unenrol(self): if not self.access_token: raise Exception( 'Need to acquire an access token (pin_login) before.') logger.debug("Unenrol: unenrolling device.") url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid") jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {}, auth=True) jwe_req.headers['Authorization'] = "Bearer " + self.access_token ans = self._send_req(jwe_req) ans = self._parse_v4(ans) logger.debug("Unenrol: device unenrolled.") def list_txs(self): if not self.access_token: raise Exception( "Need to acquire an access token (pin_login) before.") url = urljoin(self.LOGIN_URL, "secureholder/v4/native/list-transaction") jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True) jwe_req.headers['Authorization'] = "Bearer " + self.access_token ans = self._send_req(jwe_req) ans_json = self._parse_v4(ans) if 'transaction' not in ans_json: raise RequestFailure( f"Malformed response, missing 'transaction'.", ans) return ans_json['transaction'] def authorize_tx_start(self, tx_id): data = {} data['jti'] = tx_id data['appRegisterID']: self.appdata['sca_app_regid'] url = urljoin(self.AUTH_URL, "secureholder/v4/challenge") jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True) ans = self._send_req(jwe_req) ans_json = self._parse_v4(ans) if ans_json['status'] != "v4_pending": raise Exception( f"TX Status is '{ans_json['status']}', not pending.") if 'jti' not in ans_json: raise RequestFailure(f"Malformed response, missing 'jti'.", ans) if 'randK' not in ans_json: raise RequestFailure(f"Malformed response, missing 'randK'.", ans) if 'transaction-challenge' not in ans_json: raise RequestFailure( f"Malformed response, missing 'transaction-challenge'.", ans) return ans_json def authorize_tx_finish(self, challenge, userpin): signature = self._sign_challenge_v4(challenge, userpin) data = {} data['signature'] = signature data['appRegisterID'] = self.appdata['sca_app_regid'] url = urljoin(self.AUTH_URL, "secureholder/v4/az") jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True) ans = self._send_req(jwe_req) try: ans_json = self._parse_v4(ans) except RequestFailure as e: if e.error_code == "PIN-ERR-3": raise AuthError("Codice PosteID errato!") elif e.error_code == "CERT-ERR-2": raise AuthError("Codice PosteID bloccato per troppi errori!") raise(e) if ans_json['status'] != "v4_signed": raise Exception( f"TX Status is '{ans_json['status']}', not signed.") def unenrol(self, userpin): self._pin_login(userpin) self._unenrol() @property def totp(self): seed = self.appdata['sca_seed'] seed = b32encode(seed.encode("utf-8") + b'\0' * 32) seed = seed.decode("utf-8").replace('=', '') totp = TOTP(seed, interval=120) return totp