From b7b41094138e47cdbae606543f0d0751848bd6be Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 19 Dec 2015 21:31:53 -0500 Subject: [PATCH 1/4] msg prefix python --- example/python/app.py | 2 +- example/python/tmsp/reader.py | 20 +++++++- example/python/tmsp/server.py | 96 +++++++++++++++++++++++++++++------ 3 files changed, 101 insertions(+), 17 deletions(-) diff --git a/example/python/app.py b/example/python/app.py index a9e9d3b75..2875fcd6c 100644 --- a/example/python/app.py +++ b/example/python/app.py @@ -41,7 +41,7 @@ class CounterAppContext(): txByteArray = bytearray(txBytes) if len(txBytes) >= 2 and txBytes[:2] == "0x": txByteArray = hex2bytes(txBytes[2:]) - txValue = decode_big_endian(BytesReader(txByteArray), len(txBytes)) + txValue = decode_big_endian(BytesBuffer(txByteArray), len(txBytes)) if txValue != self.txCount: return None, 1 self.txCount += 1 diff --git a/example/python/tmsp/reader.py b/example/python/tmsp/reader.py index f1b3dfae4..3b1f87fcb 100644 --- a/example/python/tmsp/reader.py +++ b/example/python/tmsp/reader.py @@ -1,14 +1,32 @@ # Simple read() method around a bytearray -class BytesReader(): +class BytesBuffer(): def __init__(self, b): self.buf = b + self.readCount = 0 + + def count(self): + return self.readCount + + def reset_count(self): + self.readCount = 0 + + def size(self): + return len(self.buf) + + def peek(self): + return self.buf[0] + + def write(self, b): + # b should be castable to byte array + self.buf += bytearray(b) def read(self, n): if len(self.buf) < n: print "reader err: buf less than n" # TODO: exception return + self.readCount += n r = self.buf[:n] self.buf = self.buf[n:] return r diff --git a/example/python/tmsp/server.py b/example/python/tmsp/server.py index e25e4e1fd..0beb59d13 100644 --- a/example/python/tmsp/server.py +++ b/example/python/tmsp/server.py @@ -8,11 +8,29 @@ from wire import * from reader import * from msg import * +# hold the asyncronous state of a connection +# ie. we may not get enough bytes on one read to decode the message +class Connection(): + def __init__(self, fd, appCtx): + self.fd = fd + self.appCtx = appCtx + self.recBuf = BytesBuffer(bytearray()) + self.resBuf = BytesBuffer(bytearray()) + self.msgLength = 0 + self.decoder = RequestDecoder(self.recBuf) + self.inProgress = False # are we in the middle of a message + + def recv(this): + data = this.fd.recv(1024) + if not data: # what about len(data) == 0 + raise IOError("dead connection") + this.recBuf.write(data) + # TMSP server responds to messges by calling methods on the app class TMSPServer(): def __init__(self, app, port=5410): self.app = app - self.appMap = {} # map conn file descriptors to (appContext, msgDecoder) + self.appMap = {} # map conn file descriptors to (appContext, reqBuf, resBuf, msgDecoder) self.port = port self.listen_backlog = 10 @@ -31,12 +49,13 @@ class TMSPServer(): def handle_new_connection(self, r): new_fd, new_addr = r.accept() + new_fd.setblocking(0) # non-blocking self.read_list.append(new_fd) self.write_list.append(new_fd) print 'new connection to', new_addr appContext = self.app.open() - self.appMap[new_fd] = (appContext, RequestDecoder(ConnReader(new_fd))) + self.appMap[new_fd] = Connection(new_fd, appContext) def handle_conn_closed(self, r): self.read_list.remove(r) @@ -45,25 +64,63 @@ class TMSPServer(): print "connection closed" def handle_recv(self, r): - appCtx, conn = self.appMap[r] - response = bytearray() +# appCtx, recBuf, resBuf, conn + conn = self.appMap[r] while True: try: - # first read the request type and get the msg decoder - typeByte = conn.reader.read(1) + print "recv loop" + # check if we need more data first + if conn.inProgress: + if conn.msgLength == 0 or conn.recBuf.size() < conn.msgLength: + conn.recv() + else: + if conn.recBuf.size() == 0: + conn.recv() + + conn.inProgress = True + + # see if we have enough to get the message length + if conn.msgLength == 0: + ll = conn.recBuf.peek() + if conn.recBuf.size() < 1 + ll: + # we don't have enough bytes to read the length yet + return + print "decoding msg length" + conn.msgLength = decode_varint(conn.recBuf) + + # see if we have enough to decode the message + if conn.recBuf.size() < conn.msgLength: + return + + # now we can decode the message + + # first read the request type and get the particular msg decoder + typeByte = conn.recBuf.read(1) typeByte = int(typeByte[0]) resTypeByte = typeByte+0x10 req_type = message_types[typeByte] if req_type == "flush": - response += bytearray([resTypeByte]) - sent = r.send(str(response)) + # messages are length prefixed + conn.resBuf.write(encode(1)) + conn.resBuf.write([resTypeByte]) + sent = conn.fd.send(str(conn.resBuf.buf)) + conn.msgLength = 0 + conn.inProgress = False + conn.resBuf = BytesBuffer(bytearray()) return - decoder = getattr(conn, req_type) + decoder = getattr(conn.decoder, req_type) + print "decoding args" req_args = decoder() - req_f = getattr(appCtx, req_type) + print "got args", req_args + + # done decoding message + conn.msgLength = 0 + conn.inProgress = False + + req_f = getattr(conn.appCtx, req_type) if req_args == None: res = req_f() elif isinstance(req_args, tuple): @@ -82,9 +139,18 @@ class TMSPServer(): print "non-zero retcode:", ret_code if req_type in ("echo", "info"): # these dont return a ret code - response += bytearray([resTypeByte]) + encode(res) + enc = encode(res) + # messages are length prefixed + conn.resBuf.write(encode(len(enc) + 1)) + conn.resBuf.write([resTypeByte]) + conn.resBuf.write(enc) else: - response += bytearray([resTypeByte]) + encode(ret_code) + encode(res) + enc, encRet = encode(res), encode(ret_code) + # messages are length prefixed + conn.resBuf.write(encode(len(enc)+len(encRet)+1)) + conn.resBuf.write([resTypeByte]) + conn.resBuf.write(encRet) + conn.resBuf.write(enc) except TypeError as e: print "TypeError on reading from connection:", e self.handle_conn_closed(r) @@ -97,8 +163,8 @@ class TMSPServer(): print "IOError on reading from connection:", e self.handle_conn_closed(r) return - except: - print "error reading from connection", sys.exc_info()[0] # TODO better + except Exception as e: + print "error reading from connection", str(e) # sys.exc_info()[0] # TODO better self.handle_conn_closed(r) return @@ -112,7 +178,7 @@ class TMSPServer(): self.handle_new_connection(r) # undo adding to read list ... - except NameError as e: + except rameError as e: print "Could not connect due to NameError:", e except TypeError as e: print "Could not connect due to TypeError:", e From d560c1d4552b49eff9a014f386783fb903683373 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 18 Dec 2015 20:44:48 -0500 Subject: [PATCH 2/4] example/js --- example/js/app.js | 87 ++++++++++++++++++++++++++++ example/js/msgs.js | 64 +++++++++++++++++++++ example/js/server.js | 130 ++++++++++++++++++++++++++++++++++++++++++ example/js/wire.js | 112 ++++++++++++++++++++++++++++++++++++ tests/test.sh | 4 ++ tests/test_counter.sh | 4 +- 6 files changed, 399 insertions(+), 2 deletions(-) create mode 100644 example/js/app.js create mode 100644 example/js/msgs.js create mode 100644 example/js/server.js create mode 100644 example/js/wire.js diff --git a/example/js/app.js b/example/js/app.js new file mode 100644 index 000000000..0ddd16ade --- /dev/null +++ b/example/js/app.js @@ -0,0 +1,87 @@ +server = require("./server") +wire = require("./wire") +util = require("util") + +function CounterApp(){ + this.hashCount = 0; + this.txCount = 0; + this.commitCount = 0; +}; + +CounterApp.prototype.open = function(){ + return new CounterAppContext(this); +} + +function CounterAppContext(app) { + this.hashCount = app.hashCount; + this.txCount = app.txCount; + this.commitCount = app.commitCount; + this.serial = false; +} + +CounterAppContext.prototype.echo = function(msg){ + return {"response": msg, "ret_code":0} +} + +CounterAppContext.prototype.info = function(){ + return {"response": [util.format("hash, tx, commit counts: %d, %d, %d", this.hashCount, this.txCount, this.commitCount)]} +} + +CounterAppContext.prototype.set_option = function(key, value){ + if (key == "serial" && value == "on"){ + this.serial = true; + } + return {"ret_code":0} +} + +CounterAppContext.prototype.append_tx = function(txBytes){ + if (this.serial) { + txByteArray = txBytes + if (txByte.length >= 2 && txBytes.slice(0, 2) == "0x") { + txByteArray = wire.hex2bytes(txBytes.slice(2)); + } + r = new wire.BytesReader(txByteArray) + txValue = decode_big_endian(r, txBytes.length) + if (txValue != this.txcount){ + return {"ret_code":1} + } + } + this.txCount += 1; + return {"ret_code":0} // TODO: return events +} + +CounterAppContext.prototype.get_hash = function(){ + this.hashCount += 1; + if (this.txCount == 0){ + return {"response": "", "ret_code":0} + } + h = wire.encode_big_endian(this.txCount, 8); + h = wire.reverse(h); // TODO + return {"response": h.toString(), "ret_code":0} +} + +CounterAppContext.prototype.commit = function(){ + this.commitCount += 1; + return {"ret_code":0} +} + +CounterAppContext.prototype.rollback = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.add_listener = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.rm_listener = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.event = function(){ +} + +console.log("Counter app in Javascript") + +var app = new CounterApp(); +var appServer = new server.AppServer(app); +appServer.server.listen(46658) diff --git a/example/js/msgs.js b/example/js/msgs.js new file mode 100644 index 000000000..a8b90ecd8 --- /dev/null +++ b/example/js/msgs.js @@ -0,0 +1,64 @@ +wire = require("./wire") + +module.exports = { + types : { + 0x01 : "echo", + 0x02 : "flush", + 0x03 : "info", + 0x04 : "set_option", + 0x21 : "append_tx", + 0x22 : "get_hash", + 0x23 : "commit", + 0x24 : "rollback", + 0x25 : "add_listener", + 0x26 : "rm_listener", + }, + + decoder : RequestDecoder, + + buffer: BytesBuffer + +} + +function RequestDecoder(buf){ + this.buf= buf +} + +var decode_string = wire.decode_string + +// return nothing, one thing, or a list of things +RequestDecoder.prototype.echo = function(){ return decode_string(this.buf) }; +RequestDecoder.prototype.flush = function(){}; +RequestDecoder.prototype.info = function(){}; +RequestDecoder.prototype.set_option = function(){ return [decode_string(this.buf), decode_string(this.buf)] }; +RequestDecoder.prototype.append_tx = function(){ return decode_string(this.buf)}; +RequestDecoder.prototype.get_hash = function(){ }; +RequestDecoder.prototype.commit = function(){ }; +RequestDecoder.prototype.rollback = function(){ }; +RequestDecoder.prototype.add_listener = function(){ }; // TODO +RequestDecoder.prototype.rm_listener = function(){ }; // TODO + +// buffered reader with read(n) method +function BytesBuffer(buf){ + this.buf = buf +} + +BytesBuffer.prototype.read = function(n){ + b = this.buf.slice(0, n) + this.buf = this.buf.slice(n) + return b +}; + +BytesBuffer.prototype.write = function(buf){ + this.buf = Buffer.concat([this.buf, buf]); +}; + + +BytesBuffer.prototype.size = function(){ + return this.buf.length +} + +BytesBuffer.prototype.peek = function(){ + return this.buf[0] +} + diff --git a/example/js/server.js b/example/js/server.js new file mode 100644 index 000000000..57af143e6 --- /dev/null +++ b/example/js/server.js @@ -0,0 +1,130 @@ + +// Load the TCP Library +net = require('net'); +msg = require('./msgs'); +wire = require("./wire") + +// Takes an application and handles tmsp connection +// which invoke methods on the app +function AppServer(app){ + // set the app for the socket handler + this.app = app; + + // create a server by providing callback for + // accepting new connection and callbacks for + // connection events ('data', 'end', etc.) + this.createServer() +} + +module.exports = { AppServer: AppServer }; + +AppServer.prototype.createServer = function(){ + app = this.app + conns = {} // map sockets to their state + + // define the socket handler + this.server = net.createServer(function(socket){ + socket.name = socket.remoteAddress + ":" + socket.remotePort + console.log("new connection from", socket.name) + + appCtx = app.open() + + var conn = { + recBuf: new msg.buffer(new Buffer(0)), + resBuf: new msg.buffer(new Buffer(0)), + msgLength: 0, + inProgress: false + } + conns[socket] = conn + + // Handle tmsp requests. + socket.on('data', function (data) { + + if (data.length == 0){ + // TODO err + console.log("empty data!") + return + } + conn = conns[socket] + + // we received data. append it + conn.recBuf.write(data) + + while ( conn.recBuf.size() > 0 ){ + + if (conn.msgLength == 0){ + ll = conn.recBuf.peek(); + if (conn.recBuf.size() < 1 + ll){ + // don't have enough bytes to read length yet + return + } + conn.msgLength = wire.decode_varint(conn.recBuf) + } + + if (conn.recBuf.size() < conn.msgLength) { + // don't have enough to decode the message + return + } + + // now we can decode + typeByte = conn.recBuf.read(1); + resTypeByte = typeByte[0] + 0x10 + reqType = msg.types[typeByte[0]]; + + if (reqType == "flush"){ + // msgs are length prefixed + conn.resBuf.write(wire.encode(1)); + conn.resBuf.write(new Buffer([resTypeByte])) + n = socket.write(conn.resBuf.buf); + conn.msgLength = 0; + conn.resBuf = new msg.buffer(new Buffer(0)); + return + } + + // decode args + decoder = new msg.decoder(conn.recBuf); + args = decoder[reqType](); + + // done decoding + conn.msgLength = 0 + + // NOTE: this throws of the "this"'s in app.js + //reqFunc = appCtx[reqType]; + var res = function(){ + if (args == null){ + return appCtx[reqType](); + } else if (Array.isArray(args)){ + return appCtx[reqType].apply(this, args); + } else { + return appCtx[reqType](args) + } + }() + + + var retCode = res["ret_code"] + var res = res["response"] + + if (retCode != null && retCode != 0){ + console.log("non-zero ret code", retCode) + } + + + if (reqType == "echo" || reqType == "info"){ + enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(res)]); + // length prefixed + conn.resBuf.write(wire.encode(enc.length)); + conn.resBuf.write(enc); + } else { + enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(retCode), wire.encode(res)]); + conn.resBuf.write(wire.encode(enc.length)); + conn.resBuf.write(enc); + } + } + }); + + socket.on('end', function () { + console.log("connection ended") + }); + }) +} + diff --git a/example/js/wire.js b/example/js/wire.js new file mode 100644 index 000000000..87cc9234e --- /dev/null +++ b/example/js/wire.js @@ -0,0 +1,112 @@ +math = require("math") + +module.exports = { + decode_string: decode_string, + decode_varint: decode_varint, + encode_big_endian: encode_big_endian, + encode: encode, + reverse: reverse, +} + +function reverse(buf){ + for (var i = 0; i < buf.length/2; i++){ + a = buf[i]; + b = buf[buf.length-1 - i]; + buf[i] = b; + buf[buf.length-1 - i] = a; + } + return buf +} + +function uvarint_size(i){ + if (i == 0){ + return 0 + } + + for(var j = 1; j < 9; j++) { + if ( i < 1< 0xF0){ negate = true } + if (negate) { size = size - 0xF0 } + i = decode_big_endian(reader, size); + if (negate) { i = i * -1} + return i +} + +function encode_list(l){ + var l2 = l.map(encode); + var buf = new Buffer(encode_varint(l2.length)); + return Buffer.concat([buf, Buffer.concat(l2)]); +} + +function encode(b){ + if (b == null){ + return Buffer(0) + } else if (typeof b == "number"){ + return encode_varint(b) + } else if (typeof b == "string"){ + return encode_string(b) + } else if (Array.isArray(b)){ + return encode_list(b) + } else{ + console.log("UNSUPPORTED TYPE!", typeof b, b) + } +} + + + + + diff --git a/tests/test.sh b/tests/test.sh index 848d14cbc..9ee9be135 100644 --- a/tests/test.sh +++ b/tests/test.sh @@ -11,3 +11,7 @@ bash tests/test_counter.sh # test python counter cd example/python COUNTER_APP="python app.py" bash $ROOT/tests/test_counter.sh + +# test js counter +cd ../js +COUNTER_APP="node app.js" bash $ROOT/tests/test_counter.sh diff --git a/tests/test_counter.sh b/tests/test_counter.sh index 41909dde8..8bbd8d376 100644 --- a/tests/test_counter.sh +++ b/tests/test_counter.sh @@ -11,7 +11,7 @@ $COUNTER_APP &> /dev/null & PID=`echo $!` if [[ "$?" != 0 ]]; then - echo "Error running tmsp command" + echo "Error running tmsp app" echo $OUTPUT exit 1 fi @@ -24,7 +24,7 @@ append_tx abc STDIN` if [[ "$?" != 0 ]]; then - echo "Error running tmsp command" + echo "Error running tmsp batch command" echo $OUTPUT exit 1 fi From dc75b71f55211349fc36d8fe0001a0183fb2ae7a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 21 Dec 2015 18:12:38 -0500 Subject: [PATCH 3/4] tmsp_cli -> tmsp-cli --- cmd/{tmsp_cli => tmsp-cli}/tmsp_cli.go | 0 example/python/app.py | 1 + 2 files changed, 1 insertion(+) rename cmd/{tmsp_cli => tmsp-cli}/tmsp_cli.go (100%) diff --git a/cmd/tmsp_cli/tmsp_cli.go b/cmd/tmsp-cli/tmsp_cli.go similarity index 100% rename from cmd/tmsp_cli/tmsp_cli.go rename to cmd/tmsp-cli/tmsp_cli.go diff --git a/example/python/app.py b/example/python/app.py index 2875fcd6c..a439aa748 100644 --- a/example/python/app.py +++ b/example/python/app.py @@ -56,6 +56,7 @@ class CounterAppContext(): return str(h), 0 def commit(self): + self.commitCount += 1 return 0 def rollback(self): From 3fb3a81b929c77a16a318d6167c460d8f9a4ba28 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 21 Dec 2015 18:47:14 -0500 Subject: [PATCH 4/4] fix js --- example/js/app.js | 10 +++++----- example/js/server.js | 4 +--- example/js/wire.js | 1 + 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/example/js/app.js b/example/js/app.js index 0ddd16ade..f7def6a02 100644 --- a/example/js/app.js +++ b/example/js/app.js @@ -36,13 +36,13 @@ CounterAppContext.prototype.set_option = function(key, value){ CounterAppContext.prototype.append_tx = function(txBytes){ if (this.serial) { - txByteArray = txBytes - if (txByte.length >= 2 && txBytes.slice(0, 2) == "0x") { + txByteArray = new Buffer(txBytes) + if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") { txByteArray = wire.hex2bytes(txBytes.slice(2)); } - r = new wire.BytesReader(txByteArray) - txValue = decode_big_endian(r, txBytes.length) - if (txValue != this.txcount){ + r = new msg.buffer(txByteArray) + txValue = wire.decode_big_endian(r, txBytes.length) + if (txValue != this.txCount){ return {"ret_code":1} } } diff --git a/example/js/server.js b/example/js/server.js index 57af143e6..4ca870d9d 100644 --- a/example/js/server.js +++ b/example/js/server.js @@ -88,13 +88,11 @@ AppServer.prototype.createServer = function(){ // done decoding conn.msgLength = 0 - // NOTE: this throws of the "this"'s in app.js - //reqFunc = appCtx[reqType]; var res = function(){ if (args == null){ return appCtx[reqType](); } else if (Array.isArray(args)){ - return appCtx[reqType].apply(this, args); + return appCtx[reqType].apply(appCtx, args); } else { return appCtx[reqType](args) } diff --git a/example/js/wire.js b/example/js/wire.js index 87cc9234e..13ee55292 100644 --- a/example/js/wire.js +++ b/example/js/wire.js @@ -3,6 +3,7 @@ math = require("math") module.exports = { decode_string: decode_string, decode_varint: decode_varint, + decode_big_endian: decode_big_endian, encode_big_endian: encode_big_endian, encode: encode, reverse: reverse,