import json
|
|
from random import randint
|
|
from time import time
|
|
from uuid import uuid4
|
|
|
|
from jwcrypto.jwe import JWE
|
|
from pyotp import HOTP
|
|
from requests import Request
|
|
|
|
from .utils import sha256_base64, hmac_sha256
|
|
|
|
|
|
class JWEHandler:
    """Create and open compact-serialized JWE tokens using ``appdata``.

    ``appdata`` is accessed both as a mapping (keys ``'xkey_seed'`` and
    ``'xkey_appuuid'``) and as an object exposing ``serv_pubkey.jwk`` and
    ``app_privkey.jwk``.
    """

    def __init__(self, appdata):
        """Keep a reference to ``appdata`` for later key/identifier lookups."""
        self.appdata = appdata

    def new_otp_specs(self):
        """Return a fresh ``otp-specs`` dict.

        The dict holds a random counter in ``0..99999999`` (inclusive) under
        ``'movingFactor'``, the value produced by the HOTP generator for that
        counter under ``'otp'``, and the fixed label ``"HMAC-SHA1"`` under
        ``'type'``. The generator is built from ``appdata['xkey_seed']`` with
        the arguments ``8`` and ``"SHA1"``.
        """
        hotp = HOTP(self.appdata['xkey_seed'], 8, "SHA1")
        moving_factor = randint(0, 99999999)
        return {
            'movingFactor': moving_factor,
            'otp': hotp.generate_otp(moving_factor),
            'type': "HMAC-SHA1",
        }

    def encrypt(self, sub, data, auth=False):
        """Wrap ``data`` in a claims set and return it as a compact JWE string.

        The claims carry ``data``, ``sub``, a random ``jti``, and
        ``iat``/``nbf`` set to the current time with ``exp`` 60 seconds later.
        The token is addressed to ``appdata.serv_pubkey.jwk``.

        When ``auth`` is truthy, ``appdata['xkey_appuuid']`` is added as the
        ``kid`` header, its ``sha256_base64`` digest as the ``kid-sha256``
        claim, and a new ``otp-specs`` claim is attached.
        """
        issued_at = int(time())
        # Key order is kept as-is: the dict is dumped straight to JSON below.
        payload = {
            'data': data,
            'sub': sub,
            'jti': str(uuid4()),
            'iat': issued_at,
            'nbf': issued_at,
            'exp': issued_at + 60,
            'iss': "app-posteid-v3",
        }
        protected_header = {
            'cty': "JWE",
            'typ': "JWT",
            'alg': "RSA-OAEP-256",
            'enc': "A256CBC-HS512",
        }

        if auth:
            key_id = self.appdata['xkey_appuuid']
            protected_header['kid'] = key_id
            payload['kid-sha256'] = sha256_base64(key_id)
            payload['otp-specs'] = self.new_otp_specs()

        token = JWE(json.dumps(payload),
                    protected=protected_header,
                    recipient=self.appdata.serv_pubkey.jwk)
        return token.serialize(compact=True)

    def req_jwe_post(self, url, sub, data, auth=False):
        """Build (without sending) a POST ``Request`` whose body is the JWE.

        The arguments ``sub``, ``data`` and ``auth`` are forwarded to
        :meth:`encrypt`.
        """
        body = self.encrypt(sub, data, auth)
        return Request('POST', url,
                       {'Content-Type': "application/json; charset=UTF-8"},
                       data=body)

    def req_jwe_bearer(self, url, sub, data, auth=False):
        """Build (without sending) a GET ``Request`` carrying the JWE as a
        ``Bearer`` value in the ``Authorization`` header.

        The ``Content-Type`` header is set to an empty string.
        """
        bearer = "Bearer " + self.encrypt(sub, data, auth)
        return Request('GET', url,
                       {'Content-Type': "",
                        'Authorization': bearer})

    def decrypt(self, serialized_jwe_token):
        """Decrypt a serialized JWE and return its JSON-decoded payload.

        ``bytes`` input is decoded as UTF-8 first. Decryption uses
        ``appdata.app_privkey.jwk``.
        """
        raw = serialized_jwe_token
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')

        token = JWE()
        token.deserialize(raw)
        token.decrypt(self.appdata.app_privkey.jwk)
        return json.loads(token.payload)
|