import json
from random import randint
from time import time
from uuid import uuid4

from jwcrypto.jwe import JWE
from pyotp import HOTP
from requests import Request

from .utils import sha256_base64, hmac_sha256


class JWEHandler:
    """Build, wrap and decrypt compact-serialized JWE tokens.

    The handler keeps a reference to an ``appdata`` object and reads from it
    both by key (``appdata['xkey_seed']``, ``appdata['xkey_appuuid']``) and by
    attribute (``appdata.serv_pubkey.jwk``, ``appdata.app_privkey.jwk``), so
    the object passed in must support both access styles.
    """

    def __init__(self, appdata):
        """Store ``appdata`` for later key and seed lookups."""
        self.appdata = appdata

    def new_otp_specs(self):
        """Return a dict describing a freshly generated HOTP value.

        The result has three keys, in this order: ``movingFactor`` (the
        counter used), ``otp`` (the value generated for that counter) and
        ``type`` (always ``"HMAC-SHA1"``).
        """
        # HOTP receives the seed plus two positional arguments; in pyotp's
        # signature these are the digit count and the digest.
        hotp = HOTP(self.appdata['xkey_seed'], 8, "SHA1")
        # randint is inclusive on both ends.
        moving_factor = randint(0, 99999999)
        return {
            'movingFactor': moving_factor,
            'otp': hotp.generate_otp(moving_factor),
            'type': "HMAC-SHA1",
        }

    def encrypt(self, sub, data, auth=False):
        """Serialize ``data`` into a compact JWE addressed to the server key.

        Args:
            sub: Value stored in the ``sub`` claim.
            data: Value stored in the ``data`` claim; it must be
                JSON-serializable because the claims go through
                ``json.dumps``.
            auth: When true, the protected header gains a ``kid`` entry (the
                app UUID) and the claims gain ``kid-sha256`` and
                ``otp-specs``.

        Returns:
            The result of ``JWE.serialize(compact=True)``.
        """
        issued_at = int(time())
        # Key order is kept stable because json.dumps emits keys in
        # insertion order.
        claims = {
            'data': data,
            'sub': sub,
            'jti': str(uuid4()),
            'iat': issued_at,
            'nbf': issued_at,
            'exp': issued_at + 60,  # expires 60 seconds after issuance
            'iss': "app-posteid-v3",
        }
        protected = {
            'cty': "JWE",
            'typ': "JWT",
            'alg': "RSA-OAEP-256",
            'enc': "A256CBC-HS512",
        }

        if auth:
            app_uuid = self.appdata['xkey_appuuid']
            protected['kid'] = app_uuid
            claims.update({
                'kid-sha256': sha256_base64(app_uuid),
                'otp-specs': self.new_otp_specs(),
            })

        token = JWE(
            json.dumps(claims),
            protected=protected,
            recipient=self.appdata.serv_pubkey.jwk,
        )
        return token.serialize(compact=True)

    def req_jwe_post(self, url, sub, data, auth=False):
        """Return an unprepared POST ``Request`` whose body is the JWE token.

        ``sub``, ``data`` and ``auth`` are forwarded unchanged to
        :meth:`encrypt`. The request is only constructed, not sent.
        """
        token = self.encrypt(sub, data, auth)
        return Request(
            'POST',
            url,
            headers={'Content-Type': "application/json; charset=UTF-8"},
            data=token,
        )

    def req_jwe_bearer(self, url, sub, data, auth=False):
        """Return an unprepared GET ``Request`` carrying the JWE as a bearer.

        ``sub``, ``data`` and ``auth`` are forwarded unchanged to
        :meth:`encrypt`. The token is placed in the ``Authorization`` header
        and ``Content-Type`` is set to an empty string. The request is only
        constructed, not sent.
        """
        token = self.encrypt(sub, data, auth)
        return Request(
            'GET',
            url,
            headers={
                'Content-Type': "",
                'Authorization': "Bearer " + token,
            },
        )

    def decrypt(self, serialized_jwe_token):
        """Decrypt a serialized JWE with the app private key and parse it.

        Args:
            serialized_jwe_token: The serialized token; ``bytes`` input is
                decoded as UTF-8 before deserialization.

        Returns:
            The decrypted payload parsed with ``json.loads``.
        """
        raw = serialized_jwe_token
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')

        token = JWE()
        token.deserialize(raw)
        token.decrypt(self.appdata.app_privkey.jwk)
        return json.loads(token.payload)