|
import logging
|
|
from urllib.parse import urljoin
|
|
from uuid import uuid4
|
|
|
|
import requests
|
|
|
|
from .appdata import AppData
|
|
from .jwe_handler import JWEHandler
|
|
from .utils import sha256_base64, RequestFailure
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class XKey:
|
|
REGISTRY_URL = ("https://appregistry-posteid.mobile.poste.it"
|
|
"/jod-app-registry/")
|
|
APP_NAME = "app-posteid-v3"
|
|
ACTIVITY_ID = "C6050AC80E8B5288A01237"
|
|
DEVICE_SPECS = ("Android", "11",
|
|
"sdk_gphone_x86_64_arm64", "4.5.204",
|
|
"true")
|
|
|
|
def __init__(self, profile_name):
|
|
self.appdata = AppData(profile_name)
|
|
self.s = requests.Session()
|
|
self.s.headers = {'Accept-Encoding': "gzip",
|
|
'User-Agent': "okhttp/3.12.1",
|
|
'Connection': "keep-alive"}
|
|
self.jwe_handler = JWEHandler(self.appdata)
|
|
|
|
def _send_req(self, request):
|
|
prepped = self.s.prepare_request(request)
|
|
return self.s.send(prepped)
|
|
|
|
def _register_stage_init(self, register_nonce):
|
|
url = urljoin(self.REGISTRY_URL, "v2/registerInit")
|
|
headers = {'Content-Type': "application/json; charset=UTF-8"}
|
|
data = {}
|
|
data['appName'] = "app-posteid-v3"
|
|
data['initCodeChallenge'] = sha256_base64(register_nonce)
|
|
logger.debug(f"Registration(INIT): sending challenge: {data}")
|
|
ans = self.s.post(url, headers=headers, json=data)
|
|
if ans.status_code != 200:
|
|
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
|
if not ans.headers.get('Content-Type').startswith("application/json"):
|
|
raise RequestFailure("Response not JSON.", ans)
|
|
ans_json = ans.json()
|
|
if 'pubServerKey' not in ans_json:
|
|
raise RequetFailure("Response does not contain 'pubServerKey'",
|
|
ans)
|
|
pubkey = ans_json['pubServerKey']
|
|
self.appdata.serv_pubkey = pubkey
|
|
logger.debug(f"Registration(INIT): got server pubkey: {pubkey}.")
|
|
|
|
def _register_stage_register(self, register_nonce):
|
|
url = urljoin(self.REGISTRY_URL, "v2/register")
|
|
data = {}
|
|
data['initCodeVerifier'] = register_nonce
|
|
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
|
|
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
|
|
logger.debug("Registration(REG): encrypting app data.")
|
|
jwe_req = self.jwe_handler.req_jwe_post(url, "register", data)
|
|
logger.debug("Registration(REG): sending app data.")
|
|
ans = self._send_req(jwe_req)
|
|
if ans.status_code != 200:
|
|
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
|
logger.debug("Registration(REG): decrypting response.")
|
|
ans_json = self.jwe_handler.decrypt(ans.content)
|
|
logger.debug(f"Registration(REG): decrypted response: {ans_json}")
|
|
if 'data' not in ans_json:
|
|
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
|
if 'app-uuid' not in ans_json['data']:
|
|
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
|
|
if 'otpSecretKey' not in ans_json['data']:
|
|
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
|
|
ans)
|
|
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
|
|
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
|
|
|
|
def _register_stage_activate(self):
|
|
url = urljoin(self.REGISTRY_URL, "v2/activation")
|
|
logger.debug("Registration(ACTIVATE): encrypting request.")
|
|
jwe_req = self.jwe_handler.req_jwe_post(url, "activation", {},
|
|
auth=True)
|
|
logger.debug("Registration(ACTIVATE): sending request.")
|
|
ans = self._send_req(jwe_req)
|
|
if ans.status_code != 200:
|
|
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
|
self.appdata['activated'] = True
|
|
logger.debug("Registration(ACTIVATE): activated.")
|
|
|
|
def _register_stage_update(self, register_nonce):
|
|
url = urljoin(self.REGISTRY_URL, "v2/register")
|
|
data = {}
|
|
data['initCodeVerifier'] = register_nonce
|
|
data['xdevice'] = self.ACTIVITY_ID + "::" + ":".join(self.DEVICE_SPECS)
|
|
data['pubAppKey'] = self.appdata.app_privkey.pubkey_b64
|
|
logger.debug("Registration(UPDATE): encrypting app data.")
|
|
jwe_req = self.jwe_handler.req_jwe_post(url, "registerUpdate", data,
|
|
auth=True)
|
|
logger.debug("Registration(UPDATE): sending app data.")
|
|
ans = self._send_req(jwe_req)
|
|
if ans.status_code != 200:
|
|
raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
|
|
logger.debug("Registration(UPDATE): decrypting response.")
|
|
ans_json = self.jwe_handler.decrypt(ans.content)
|
|
logger.debug(f"Registration(UPDATE): decrypted response: {ans_json}")
|
|
if 'data' not in ans_json:
|
|
raise RequestFailure("Malformed request, missing 'data'.", ans)
|
|
if 'app-uuid' not in ans_json['data']:
|
|
raise RequestFailure("Malformed request, missing 'app-uuid'.", ans)
|
|
if 'otpSecretKey' not in ans_json['data']:
|
|
raise RequestFailure("Malformed request, missing 'otpSecretKey'.",
|
|
ans)
|
|
self.appdata['xkey_appuuid'] = ans_json['data']['app-uuid']
|
|
self.appdata['xkey_seed'] = ans_json['data']['otpSecretKey']
|
|
|
|
def register_xkey(self):
|
|
register_nonce = str(uuid4())
|
|
logger.debug(f"Starting registration (nonce: {register_nonce}).")
|
|
self._register_stage_init(register_nonce)
|
|
if self.appdata['xkey_seed']:
|
|
self._register_stage_update(register_nonce)
|
|
else:
|
|
self._register_stage_register(register_nonce)
|
|
self._register_stage_activate()
|