Implementazione open-source, lato client, del protocollo di Strong Customer Authentication di Poste Italiane (https://posteid.poste.it).
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

323 lines
14 KiB

2 years ago
  1. import json
  2. import logging
  3. import re
  4. from base64 import b32encode
  5. from getpass import getpass
  6. from urllib.parse import urljoin
  7. from uuid import uuid4
  8. import requests
  9. from pyotp import TOTP
  10. from .appdata import AppData
  11. from .jwe_handler import JWEHandler
  12. from .utils import RequestFailure, hmac_sha256
# Module-level logger, named after this module; used by the SCA class for
# its debug/info traces.
logger = logging.getLogger(__name__)
  14. class NotInitializedPIN(Exception):
  15. pass
  16. class AuthError(Exception):
  17. pass
  18. class SCA:
  19. LOGIN_URL = "https://posteid.poste.it/jod-securelogin-schema/"
  20. AUTH_URL = "https://posteid.poste.it/jod-login-schema/"
  21. SH_URL = ("https://sh2-web-posteid.poste.it/jod-secure-holder2-web"
  22. "/public/app/")
  23. def __init__(self, profile_name):
  24. self.appdata = AppData(profile_name)
  25. self.s = requests.Session()
  26. self.s.headers = {'Accept-Encoding': "gzip",
  27. 'User-Agent': "okhttp/3.12.1",
  28. 'Connection': "keep-alive"}
  29. self.jwe_handler = JWEHandler(self.appdata)
  30. self.reg_token = None
  31. self.profile_token = None
  32. self.access_token = None
  33. def _send_req(self, request):
  34. prepped = self.s.prepare_request(request)
  35. return self.s.send(prepped)
  36. def _parse_jwe_response(self, ans):
  37. if ans.status_code != 200:
  38. raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
  39. if not ans.headers.get('Content-Type').startswith("application/json"):
  40. raise RequestFailure("Response not JSON.", ans)
  41. ans_json = ans.json()
  42. if not ans_json.get('command-success'):
  43. raise RequestFailure(f"Command failed: {ans_json}", ans,
  44. error_code=ans_json.get('command-error-code'))
  45. if ans_json.get('command-result-type') not in ["JWE", "JSON"]:
  46. raise RequestFailure(f"Result is not JWE/JSON: {ans_json}", ans)
  47. return ans_json.get('command-result')
  48. def _parse_v4(self, ans):
  49. if ans.status_code != 200:
  50. raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
  51. if not ans.headers.get('Content-Type').startswith("application/json"):
  52. raise RequestFailure("Response not JSON.", ans)
  53. ans_json = ans.json()
  54. if (ans_json.get('status')
  55. not in ["v4_success", "v4_pending", "v4_signed"]):
  56. if ans_json.get('status') == "v4_error":
  57. reason = ans_json.get('reason')
  58. raise RequestFailure(f"Request v4 error: {reason}.", ans,
  59. error_code=reason)
  60. raise RequestFailure("Unknown v4 failure.", ans)
  61. return ans_json
  62. def _sign_challenge_v4(self, challenge, userpin):
  63. userpin = str(userpin)
  64. signature = {}
  65. key = self.appdata['sca_seed'] + userpin + challenge['randK']
  66. message = challenge['transaction-challenge']
  67. signature['jti'] = challenge['jti']
  68. signature['signature'] = hmac_sha256(key, message)
  69. signature['userpin'] = userpin
  70. signature['authzTool'] = "POSTEID"
  71. return json.dumps(signature)
  72. def _enrol_stage_basiclogin(self, username, password):
  73. url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
  74. data = {}
  75. data['password'] = password
  76. data['authLevel'] = "0"
  77. data['userid'] = username
  78. logger.debug("Enrol(basiclogin): sending username and password.")
  79. jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
  80. auth=True)
  81. ans = self._send_req(jwe_req)
  82. if ans.status_code == 401:
  83. raise AuthError("Wrong username or password!")
  84. if ans.status_code != 200:
  85. raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
  86. if 'X-PI' not in ans.headers:
  87. raise RequestFailure(f"Malformed request, missing 'X-PI'.", ans)
  88. uid = ans.headers.get('X-PI')
  89. logger.debug(f"Enrol(basiclogin): completed, uid='{uid}'.")
  90. def _enrol_stage_req_sms_otp(self, username):
  91. url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
  92. data = {}
  93. data['password'] = str(uuid4())
  94. data['authLevel'] = "3"
  95. data['userid'] = username
  96. logger.debug("Enrol(SMS-OTP-REQ): requesting SMS-OTP verification..")
  97. jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
  98. auth=True)
  99. ans = self._send_req(jwe_req)
  100. if ans.status_code != 200:
  101. raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
  102. if 'X-TEL' not in ans.headers:
  103. raise RequestFailure(f"Malformed request, missing 'X-TEL'.", ans)
  104. logger.debug("Enrol(SMS-OTP-REQ): SMS-OTP verification requested.")
  105. tel = ans.headers.get('X-TEL')
  106. logger.debug(f"Enrol(SMS-OTP-REQ): SMS-OTP sent to ***{tel}.")
  107. return tel
  108. def _enrol_stage_send_sms_otp(self, otp):
  109. url = urljoin(self.LOGIN_URL, "v4/xmobileauthjwt")
  110. data = {}
  111. data['otp'] = str(otp)
  112. data['authLevel'] = "2"
  113. data['nonce'] = str(uuid4())
  114. logger.debug("Enrol(SMS-OTP-AUTH): authenticating SMS-OTP.")
  115. jwe_req = self.jwe_handler.req_jwe_bearer(url, 'login', data,
  116. auth=True)
  117. ans = self._send_req(jwe_req)
  118. if ans.status_code == 401:
  119. raise AuthError("Wrong SMS-OTP code!")
  120. if ans.status_code != 200:
  121. raise RequestFailure(f"Wrong status code: {ans.status_code}", ans)
  122. if 'X-RESULT' not in ans.headers:
  123. raise RequestFailure(
  124. f"Malformed request, missing 'X-RESULT'.", ans)
  125. logger.debug("Enrol(SMS-OTP-AUTH): decrypting response.")
  126. ans_json = self.jwe_handler.decrypt(ans.headers.get('X-RESULT'))
  127. logger.debug(f"Enrol(SMS-OTP-AUTH): decrypted response: {ans_json}")
  128. if 'data' not in ans_json:
  129. raise RequestFailure("Malformed request, missing 'data'.", ans)
  130. if 'token' not in ans_json['data']:
  131. raise RequestFailure("Malformed request, missing 'token'.", ans)
  132. reg_token = ans_json['data']['token']
  133. logger.debug(f"Enrol(SMS-OTP-AUTH): got reg token: {reg_token}.")
  134. self.reg_token = reg_token
  135. def _enrol_stage_finalize(self, userpin=""):
  136. url = urljoin(self.SH_URL, "v1/registerApp")
  137. data = {}
  138. data['userPIN'] = userpin
  139. data['idpAccessToken'] = ""
  140. data['registerToken'] = self.reg_token
  141. logger.debug("Enrol(REG-APP): encrypting request.")
  142. print(data)
  143. jwe_req = self.jwe_handler.req_jwe_post(
  144. url, 'registerApp', data, auth=True)
  145. logger.debug("Enrol(REG-APP): sending request.")
  146. ans = self._send_req(jwe_req)
  147. try:
  148. ans_jwe = self._parse_jwe_response(ans)
  149. except RequestFailure as e:
  150. if e.error_code == "PIN-ERR-1":
  151. raise NotInitializedPIN()
  152. raise e
  153. ans_json = self.jwe_handler.decrypt(ans_jwe)
  154. if 'data' not in ans_json:
  155. raise RequestFailure("Malformed request, missing 'data'.", ans)
  156. if 'appRegisterID' not in ans_json['data']:
  157. raise RequestFailure(
  158. "Malformed request, missing 'appRegisterID'.", ans)
  159. if 'secretAPP' not in ans_json['data']:
  160. raise RequestFailure(
  161. "Malformed request, missing 'secretAPP'.", ans)
  162. self.appdata['sca_app_regid'] = ans_json['data']['appRegisterID']
  163. self.appdata['sca_seed'] = ans_json['data']['secretAPP']
  164. logger.debug("Enrol(REG-APP): completed successfully!")
  165. def check_register(self):
  166. url = urljoin(self.SH_URL, "v1/checkRegisterApp")
  167. data = {'appRegisterID': self.appdata['sca_app_regid']}
  168. jwe_req = self.jwe_handler.req_jwe_post(url, "checkRegisterApp", data,
  169. auth=True)
  170. ans = self._send_req(jwe_req)
  171. result = self._parse_jwe_response(ans)
  172. if 'valid' not in result:
  173. raise RequestFailure(
  174. "Malformed request, missing 'valid'.", ans)
  175. return result["valid"]
  176. def enrol_sms_start(self, username, password):
  177. self._enrol_stage_basiclogin(username, password)
  178. tel = self._enrol_stage_req_sms_otp(username)
  179. return tel
  180. def enrol_sms_finish(self, otp, force_userpin=None):
  181. self._enrol_stage_send_sms_otp(otp)
  182. try:
  183. self._enrol_stage_finalize()
  184. except NotInitializedPIN as e:
  185. if force_userpin:
  186. logger.info('Enrol(REG-APP): Setting the new provided PIN.')
  187. self._enrol_stage_finalize(force_userpin)
  188. return
  189. raise e
  190. def _pin_login(self, userpin):
  191. url_challenge = urljoin(self.LOGIN_URL,
  192. "secureholder/v4/native/challenge")
  193. url_authorize = urljoin(self.LOGIN_URL,
  194. "secureholder/v4/native/az")
  195. logger.debug("PINLogin: acquiring v4 challenge.")
  196. req = self.jwe_handler.req_jwe_post(url_challenge, "login", {},
  197. auth=True)
  198. ans = self._send_req(req)
  199. ans = self._parse_v4(ans)
  200. logger.debug(f"PINLogin: got v4 challenge: {ans}.")
  201. logger.debug(f"PINLogin: preparing challenge response.")
  202. data = {}
  203. data['signature'] = self._sign_challenge_v4(ans, userpin)
  204. data['appRegisterID'] = self.appdata['sca_app_regid']
  205. logger.debug(f"PINLogin: sending response {data}.")
  206. req = self.jwe_handler.req_jwe_post(url_authorize, "login", data,
  207. auth=True)
  208. ans = self._send_req(req)
  209. try:
  210. ans = self._parse_v4(ans)
  211. except RequestFailure as e:
  212. if e.error_code == "PIN-ERR-3":
  213. raise AuthError("Codice PosteID errato!")
  214. elif e.error_code == "CERT-ERR-2":
  215. raise AuthError("Codice PosteID bloccato per troppi errori!")
  216. raise(e)
  217. self.profile_token = ans['profile_token']
  218. self.access_token = ans['access_token']
  219. logger.debug(f"PINLogin: logged in, session token aquired.")
  220. def _unenrol(self):
  221. if not self.access_token:
  222. raise Exception(
  223. 'Need to acquire an access token (pin_login) before.')
  224. logger.debug("Unenrol: unenrolling device.")
  225. url = urljoin(self.LOGIN_URL, "secureholder/v4/native/delete-posteid")
  226. jwe_req = self.jwe_handler.req_jwe_bearer(url, "delete_posteid", {},
  227. auth=True)
  228. jwe_req.headers['Authorization'] = "Bearer " + self.access_token
  229. ans = self._send_req(jwe_req)
  230. ans = self._parse_v4(ans)
  231. logger.debug("Unenrol: device unenrolled.")
  232. def list_txs(self):
  233. if not self.access_token:
  234. raise Exception(
  235. "Need to acquire an access token (pin_login) before.")
  236. url = urljoin(self.LOGIN_URL,
  237. "secureholder/v4/native/list-transaction")
  238. jwe_req = self.jwe_handler.req_jwe_post(url, "login", {}, auth=True)
  239. jwe_req.headers['Authorization'] = "Bearer " + self.access_token
  240. ans = self._send_req(jwe_req)
  241. ans_json = self._parse_v4(ans)
  242. if 'transaction' not in ans_json:
  243. raise RequestFailure(
  244. f"Malformed response, missing 'transaction'.", ans)
  245. return ans_json['transaction']
  246. def authorize_tx_start(self, tx_id):
  247. data = {}
  248. data['jti'] = tx_id
  249. data['appRegisterID']: self.appdata['sca_app_regid']
  250. url = urljoin(self.AUTH_URL, "secureholder/v4/challenge")
  251. jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
  252. ans = self._send_req(jwe_req)
  253. ans_json = self._parse_v4(ans)
  254. if ans_json['status'] != "v4_pending":
  255. raise Exception(
  256. f"TX Status is '{ans_json['status']}', not pending.")
  257. if 'jti' not in ans_json:
  258. raise RequestFailure(f"Malformed response, missing 'jti'.", ans)
  259. if 'randK' not in ans_json:
  260. raise RequestFailure(f"Malformed response, missing 'randK'.", ans)
  261. if 'transaction-challenge' not in ans_json:
  262. raise RequestFailure(
  263. f"Malformed response, missing 'transaction-challenge'.", ans)
  264. return ans_json
  265. def authorize_tx_finish(self, challenge, userpin):
  266. signature = self._sign_challenge_v4(challenge, userpin)
  267. data = {}
  268. data['signature'] = signature
  269. data['appRegisterID'] = self.appdata['sca_app_regid']
  270. url = urljoin(self.AUTH_URL, "secureholder/v4/az")
  271. jwe_req = self.jwe_handler.req_jwe_post(url, "login", data, auth=True)
  272. ans = self._send_req(jwe_req)
  273. try:
  274. ans_json = self._parse_v4(ans)
  275. except RequestFailure as e:
  276. if e.error_code == "PIN-ERR-3":
  277. raise AuthError("Codice PosteID errato!")
  278. elif e.error_code == "CERT-ERR-2":
  279. raise AuthError("Codice PosteID bloccato per troppi errori!")
  280. raise(e)
  281. if ans_json['status'] != "v4_signed":
  282. raise Exception(
  283. f"TX Status is '{ans_json['status']}', not signed.")
  284. def unenrol(self, userpin):
  285. self._pin_login(userpin)
  286. self._unenrol()
  287. @property
  288. def totp(self):
  289. seed = self.appdata['sca_seed']
  290. seed = b32encode(seed.encode("utf-8") + b'\0' * 32)
  291. seed = seed.decode("utf-8").replace('=', '')
  292. totp = TOTP(seed, interval=120)
  293. return totp