|
|
# the decoder works off a reader
|
|
# the encoder returns bytearray
|
|
|
|
|
|
def hex2bytes(h):
|
|
return bytearray(h.decode('hex'))
|
|
|
|
|
|
def bytes2hex(b):
|
|
if type(b) in (str, str):
|
|
return "".join([hex(ord(c))[2:].zfill(2) for c in b])
|
|
else:
|
|
return bytes2hex(b.decode())
|
|
|
|
|
|
# expects uvarint64 (no crazy big nums!)
|
|
def uvarint_size(i):
|
|
if i == 0:
|
|
return 0
|
|
for j in range(1, 8):
|
|
if i < 1 << j * 8:
|
|
return j
|
|
return 8
|
|
|
|
# expects i < 2**size
|
|
|
|
|
|
def encode_big_endian(i, size):
|
|
if size == 0:
|
|
return bytearray()
|
|
return encode_big_endian(i // 256, size - 1) + bytearray([i % 256])
|
|
|
|
|
|
def decode_big_endian(reader, size):
|
|
if size == 0:
|
|
return 0
|
|
firstByte = reader.read(1)[0]
|
|
return firstByte * (256 ** (size - 1)) + decode_big_endian(reader, size - 1)
|
|
|
|
# ints are max 16 bytes long
|
|
|
|
|
|
def encode_varint(i):
|
|
negate = False
|
|
if i < 0:
|
|
negate = True
|
|
i = -i
|
|
size = uvarint_size(i)
|
|
if size == 0:
|
|
return bytearray([0])
|
|
big_end = encode_big_endian(i, size)
|
|
if negate:
|
|
size += 0xF0
|
|
return bytearray([size]) + big_end
|
|
|
|
# returns the int and whats left of the byte array
|
|
|
|
|
|
def decode_varint(reader):
|
|
size = reader.read(1)[0]
|
|
if size == 0:
|
|
return 0
|
|
|
|
negate = True if size > int(0xF0) else False
|
|
if negate:
|
|
size = size - 0xF0
|
|
i = decode_big_endian(reader, size)
|
|
if negate:
|
|
i = i * (-1)
|
|
return i
|
|
|
|
|
|
def encode_string(s):
|
|
size = encode_varint(len(s))
|
|
return size + bytearray(s, 'utf8')
|
|
|
|
|
|
def decode_string(reader):
|
|
length = decode_varint(reader)
|
|
raw_data = reader.read(length)
|
|
return raw_data.decode()
|
|
|
|
|
|
def encode_list(s):
|
|
b = bytearray()
|
|
list(map(b.extend, list(map(encode, s))))
|
|
return encode_varint(len(s)) + b
|
|
|
|
|
|
def encode(s):
|
|
print('encoding', repr(s))
|
|
if s is None:
|
|
return bytearray()
|
|
if isinstance(s, int):
|
|
return encode_varint(s)
|
|
elif isinstance(s, str):
|
|
return encode_string(s)
|
|
elif isinstance(s, list):
|
|
return encode_list(s)
|
|
elif isinstance(s, bytearray):
|
|
return encode_string(s)
|
|
else:
|
|
print("UNSUPPORTED TYPE!", type(s), s)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
ns = [100, 100, 1000, 256]
|
|
ss = [2, 5, 5, 2]
|
|
bs = list(map(encode_big_endian, ns, ss))
|
|
ds = list(map(decode_big_endian, bs, ss))
|
|
print(ns)
|
|
print([i[0] for i in ds])
|
|
|
|
ss = ["abc", "hi there jim", "ok now what"]
|
|
e = list(map(encode_string, ss))
|
|
d = list(map(decode_string, e))
|
|
print(ss)
|
|
print([i[0] for i in d])
|