diff --git a/cmd/tmsp-cli/tmsp_cli.go b/cmd/tmsp-cli/tmsp_cli.go new file mode 100644 index 000000000..d90dd10b7 --- /dev/null +++ b/cmd/tmsp-cli/tmsp_cli.go @@ -0,0 +1,270 @@ +package main + +import ( + "bufio" + "encoding/hex" + "fmt" + "io" + "net" + "os" + "strings" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tmsp/types" + + "github.com/codegangsta/cli" +) + +// connection is a global variable so it can be reused by the console +var conn net.Conn + +func main() { + app := cli.NewApp() + app.Name = "cli" + app.Usage = "cli [command] [args...]" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "address", + Value: "tcp://127.0.0.1:46658", + Usage: "address of application socket", + }, + } + app.Commands = []cli.Command{ + { + Name: "batch", + Usage: "Run a batch of tmsp commands against an application", + Action: func(c *cli.Context) { + cmdBatch(app, c) + }, + }, + { + Name: "console", + Usage: "Start an interactive tmsp console for multiple commands", + Action: func(c *cli.Context) { + cmdConsole(app, c) + }, + }, + { + Name: "echo", + Usage: "Have the application echo a message", + Action: func(c *cli.Context) { + cmdEcho(c) + }, + }, + { + Name: "info", + Usage: "Get some info about the application", + Action: func(c *cli.Context) { + cmdInfo(c) + }, + }, + { + Name: "set_option", + Usage: "Set an option on the application", + Action: func(c *cli.Context) { + cmdSetOption(c) + }, + }, + { + Name: "append_tx", + Usage: "Append a new tx to application", + Action: func(c *cli.Context) { + cmdAppendTx(c) + }, + }, + { + Name: "get_hash", + Usage: "Get application Merkle root hash", + Action: func(c *cli.Context) { + cmdGetHash(c) + }, + }, + { + Name: "commit", + Usage: "Commit the application state", + Action: func(c *cli.Context) { + cmdCommit(c) + }, + }, + { + Name: "rollback", + Usage: "Roll back the application state to the latest commit", + Action: func(c *cli.Context) { + cmdRollback(c) + }, + }, + } + app.Before = before + app.Run(os.Args) + +} + +func before(c *cli.Context) error { + if conn == nil { + var err error + conn, err = Connect(c.GlobalString("address")) + if err != nil { + Exit(err.Error()) + } + } + return nil +} + +//-------------------------------------------------------------------------------- + +func cmdBatch(app *cli.App, c *cli.Context) { + bufReader := bufio.NewReader(os.Stdin) + for { + line, more, err := bufReader.ReadLine() + if more { + Exit("input line is too long") + } else if err == io.EOF { + break + } else if len(line) == 0 { + continue + } else if err != nil { + Exit(err.Error()) + } + args := []string{"tmsp"} + args = append(args, strings.Split(string(line), " ")...) + app.Run(args) + } +} + +func cmdConsole(app *cli.App, c *cli.Context) { + for { + fmt.Printf("> ") + bufReader := bufio.NewReader(os.Stdin) + line, more, err := bufReader.ReadLine() + if more { + Exit("input is too long") + } else if err != nil { + Exit(err.Error()) + } + + args := []string{"tmsp"} + args = append(args, strings.Split(string(line), " ")...) + app.Run(args) + } +} + +// Have the application echo a message +func cmdEcho(c *cli.Context) { + args := c.Args() + if len(args) != 1 { + Exit("echo takes 1 argument") + } + res, err := makeRequest(conn, types.RequestEcho{args[0]}) + if err != nil { + Exit(err.Error()) + } + fmt.Println(res) +} + +// Get some info from the application +func cmdInfo(c *cli.Context) { + res, err := makeRequest(conn, types.RequestInfo{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println(res) +} + +// Set an option on the application +func cmdSetOption(c *cli.Context) { + args := c.Args() + if len(args) != 2 { + Exit("set_option takes 2 arguments (key, value)") + } + _, err := makeRequest(conn, types.RequestSetOption{args[0], args[1]}) + if err != nil { + Exit(err.Error()) + } + fmt.Printf("%s=%s\n", args[0], args[1]) +} + +// Append a new tx to application +func cmdAppendTx(c *cli.Context) { + args := c.Args() + if len(args) != 1 { + Exit("append_tx takes 1 argument") + } + txString := args[0] + tx := []byte(txString) + if len(txString) > 2 && strings.HasPrefix(txString, "0x") { + var err error + tx, err = hex.DecodeString(txString[2:]) + if err != nil { + Exit(err.Error()) + } + } + + res, err := makeRequest(conn, types.RequestAppendTx{tx}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Response:", res) +} + +// Get application Merkle root hash +func cmdGetHash(c *cli.Context) { + res, err := makeRequest(conn, types.RequestGetHash{}) + if err != nil { + Exit(err.Error()) + } + fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash) +} + +// Commit the application state +func cmdCommit(c *cli.Context) { + _, err := makeRequest(conn, types.RequestCommit{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Committed.") +} + +// Roll back the application state to the latest commit +func cmdRollback(c *cli.Context) { + _, err := makeRequest(conn, types.RequestRollback{}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("Rolled back.") +} + +//-------------------------------------------------------------------------------- + +func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { + var n int + var err error + + // Write desired request + wire.WriteBinaryLengthPrefixed(req, conn, &n, &err) + if err != nil { + return nil, err + } + + // Write flush request + wire.WriteBinaryLengthPrefixed(types.RequestFlush{}, conn, &n, &err) + if err != nil { + return nil, err + } + + // Read desired response + var res types.Response + wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err) + if err != nil { + return nil, err + } + + // Read flush response + var resFlush types.ResponseFlush + wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err) + if err != nil { + return nil, err + } + + return res, nil +} diff --git a/example/js/app.js b/example/js/app.js new file mode 100644 index 000000000..f7def6a02 --- /dev/null +++ b/example/js/app.js @@ -0,0 +1,87 @@ +server = require("./server") +wire = require("./wire") +util = require("util") + +function CounterApp(){ + this.hashCount = 0; + this.txCount = 0; + this.commitCount = 0; +}; + +CounterApp.prototype.open = function(){ + return new CounterAppContext(this); +} + +function CounterAppContext(app) { + this.hashCount = app.hashCount; + this.txCount = app.txCount; + this.commitCount = app.commitCount; + this.serial = false; +} + +CounterAppContext.prototype.echo = function(msg){ + return {"response": msg, "ret_code":0} +} + +CounterAppContext.prototype.info = function(){ + return {"response": [util.format("hash, tx, commit counts: %d, %d, %d", this.hashCount, this.txCount, this.commitCount)]} +} + +CounterAppContext.prototype.set_option = function(key, value){ + if (key == "serial" && value == "on"){ + this.serial = true; + } + return {"ret_code":0} +} + +CounterAppContext.prototype.append_tx = function(txBytes){ + if (this.serial) { + txByteArray = new Buffer(txBytes) + if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") { + txByteArray = wire.hex2bytes(txBytes.slice(2)); + } + r = new msg.buffer(txByteArray) + txValue = wire.decode_big_endian(r, txBytes.length) + if (txValue != this.txCount){ + return {"ret_code":1} + } + } + this.txCount += 1; + return {"ret_code":0} // TODO: return events +} + +CounterAppContext.prototype.get_hash = function(){ + this.hashCount += 1; + if (this.txCount == 0){ + return {"response": "", "ret_code":0} + } + h = wire.encode_big_endian(this.txCount, 8); + h = wire.reverse(h); // TODO + return {"response": h.toString(), "ret_code":0} +} + +CounterAppContext.prototype.commit = function(){ + this.commitCount += 1; + return {"ret_code":0} +} + +CounterAppContext.prototype.rollback = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.add_listener = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.rm_listener = function(){ + return {"ret_code":0} +} + +CounterAppContext.prototype.event = function(){ +} + +console.log("Counter app in Javascript") + +var app = new CounterApp(); +var appServer = new server.AppServer(app); +appServer.server.listen(46658) diff --git a/example/js/msgs.js b/example/js/msgs.js new file mode 100644 index 000000000..a8b90ecd8 --- /dev/null +++ b/example/js/msgs.js @@ -0,0 +1,64 @@ +wire = require("./wire") + +module.exports = { + types : { + 0x01 : "echo", + 0x02 : "flush", + 0x03 : "info", + 0x04 : "set_option", + 0x21 : "append_tx", + 0x22 : "get_hash", + 0x23 : "commit", + 0x24 : "rollback", + 0x25 : "add_listener", + 0x26 : "rm_listener", + }, + + decoder : RequestDecoder, + + buffer: BytesBuffer + +} + +function RequestDecoder(buf){ + this.buf= buf +} + +var decode_string = wire.decode_string + +// return nothing, one thing, or a list of things +RequestDecoder.prototype.echo = function(){ return decode_string(this.buf) }; +RequestDecoder.prototype.flush = function(){}; +RequestDecoder.prototype.info = function(){}; +RequestDecoder.prototype.set_option = function(){ return [decode_string(this.buf), decode_string(this.buf)] }; +RequestDecoder.prototype.append_tx = function(){ return decode_string(this.buf)}; +RequestDecoder.prototype.get_hash = function(){ }; +RequestDecoder.prototype.commit = function(){ }; +RequestDecoder.prototype.rollback = function(){ }; +RequestDecoder.prototype.add_listener = function(){ }; // TODO +RequestDecoder.prototype.rm_listener = function(){ }; // TODO + +// buffered reader with read(n) method +function BytesBuffer(buf){ + this.buf = buf +} + +BytesBuffer.prototype.read = function(n){ + b = this.buf.slice(0, n) + this.buf = this.buf.slice(n) + return b +}; + +BytesBuffer.prototype.write = function(buf){ + this.buf = Buffer.concat([this.buf, buf]); +}; + + +BytesBuffer.prototype.size = function(){ + return this.buf.length +} + +BytesBuffer.prototype.peek = function(){ + return this.buf[0] +} + diff --git a/example/js/server.js b/example/js/server.js new file mode 100644 index 000000000..4ca870d9d --- /dev/null +++ b/example/js/server.js @@ -0,0 +1,128 @@ + +// Load the TCP Library +net = require('net'); +msg = require('./msgs'); +wire = require("./wire") + +// Takes an application and handles tmsp connection +// which invoke methods on the app +function AppServer(app){ + // set the app for the socket handler + this.app = app; + + // create a server by providing callback for + // accepting new connection and callbacks for + // connection events ('data', 'end', etc.) + this.createServer() +} + +module.exports = { AppServer: AppServer }; + +AppServer.prototype.createServer = function(){ + app = this.app + conns = {} // map sockets to their state + + // define the socket handler + this.server = net.createServer(function(socket){ + socket.name = socket.remoteAddress + ":" + socket.remotePort + console.log("new connection from", socket.name) + + appCtx = app.open() + + var conn = { + recBuf: new msg.buffer(new Buffer(0)), + resBuf: new msg.buffer(new Buffer(0)), + msgLength: 0, + inProgress: false + } + conns[socket] = conn + + // Handle tmsp requests. + socket.on('data', function (data) { + + if (data.length == 0){ + // TODO err + console.log("empty data!") + return + } + conn = conns[socket] + + // we received data. append it + conn.recBuf.write(data) + + while ( conn.recBuf.size() > 0 ){ + + if (conn.msgLength == 0){ + ll = conn.recBuf.peek(); + if (conn.recBuf.size() < 1 + ll){ + // don't have enough bytes to read length yet + return + } + conn.msgLength = wire.decode_varint(conn.recBuf) + } + + if (conn.recBuf.size() < conn.msgLength) { + // don't have enough to decode the message + return + } + + // now we can decode + typeByte = conn.recBuf.read(1); + resTypeByte = typeByte[0] + 0x10 + reqType = msg.types[typeByte[0]]; + + if (reqType == "flush"){ + // msgs are length prefixed + conn.resBuf.write(wire.encode(1)); + conn.resBuf.write(new Buffer([resTypeByte])) + n = socket.write(conn.resBuf.buf); + conn.msgLength = 0; + conn.resBuf = new msg.buffer(new Buffer(0)); + return + } + + // decode args + decoder = new msg.decoder(conn.recBuf); + args = decoder[reqType](); + + // done decoding + conn.msgLength = 0 + + var res = function(){ + if (args == null){ + return appCtx[reqType](); + } else if (Array.isArray(args)){ + return appCtx[reqType].apply(appCtx, args); + } else { + return appCtx[reqType](args) + } + }() + + + var retCode = res["ret_code"] + var res = res["response"] + + if (retCode != null && retCode != 0){ + console.log("non-zero ret code", retCode) + } + + + if (reqType == "echo" || reqType == "info"){ + enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(res)]); + // length prefixed + conn.resBuf.write(wire.encode(enc.length)); + conn.resBuf.write(enc); + } else { + enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(retCode), wire.encode(res)]); + conn.resBuf.write(wire.encode(enc.length)); + conn.resBuf.write(enc); + } + } + }); + + socket.on('end', function () { + console.log("connection ended") + }); + }) +} + diff --git a/example/js/wire.js b/example/js/wire.js new file mode 100644 index 000000000..13ee55292 --- /dev/null +++ b/example/js/wire.js @@ -0,0 +1,113 @@ +math = require("math") + +module.exports = { + decode_string: decode_string, + decode_varint: decode_varint, + decode_big_endian: decode_big_endian, + encode_big_endian: encode_big_endian, + encode: encode, + reverse: reverse, +} + +function reverse(buf){ + for (var i = 0; i < buf.length/2; i++){ + a = buf[i]; + b = buf[buf.length-1 - i]; + buf[i] = b; + buf[buf.length-1 - i] = a; + } + return buf +} + +function uvarint_size(i){ + if (i == 0){ + return 0 + } + + for(var j = 1; j < 9; j++) { + if ( i < 1< 0xF0){ negate = true } + if (negate) { size = size - 0xF0 } + i = decode_big_endian(reader, size); + if (negate) { i = i * -1} + return i +} + +function encode_list(l){ + var l2 = l.map(encode); + var buf = new Buffer(encode_varint(l2.length)); + return Buffer.concat([buf, Buffer.concat(l2)]); +} + +function encode(b){ + if (b == null){ + return Buffer(0) + } else if (typeof b == "number"){ + return encode_varint(b) + } else if (typeof b == "string"){ + return encode_string(b) + } else if (Array.isArray(b)){ + return encode_list(b) + } else{ + console.log("UNSUPPORTED TYPE!", typeof b, b) + } +} + + + + + diff --git a/example/python/app.py b/example/python/app.py index a9e9d3b75..a439aa748 100644 --- a/example/python/app.py +++ b/example/python/app.py @@ -41,7 +41,7 @@ class CounterAppContext(): txByteArray = bytearray(txBytes) if len(txBytes) >= 2 and txBytes[:2] == "0x": txByteArray = hex2bytes(txBytes[2:]) - txValue = decode_big_endian(BytesReader(txByteArray), len(txBytes)) + txValue = decode_big_endian(BytesBuffer(txByteArray), len(txBytes)) if txValue != self.txCount: return None, 1 self.txCount += 1 @@ -56,6 +56,7 @@ class CounterAppContext(): return str(h), 0 def commit(self): + self.commitCount += 1 return 0 def rollback(self): diff --git a/example/python/tmsp/reader.py b/example/python/tmsp/reader.py index f1b3dfae4..3b1f87fcb 100644 --- a/example/python/tmsp/reader.py +++ b/example/python/tmsp/reader.py @@ -1,14 +1,32 @@ # Simple read() method around a bytearray -class BytesReader(): +class BytesBuffer(): def __init__(self, b): self.buf = b + self.readCount = 0 + + def count(self): + return self.readCount + + def reset_count(self): + self.readCount = 0 + + def size(self): + return len(self.buf) + + def peek(self): + return self.buf[0] + + def write(self, b): + # b should be castable to byte array + self.buf += bytearray(b) def read(self, n): if len(self.buf) < n: print "reader err: buf less than n" # TODO: exception return + self.readCount += n r = self.buf[:n] self.buf = self.buf[n:] return r diff --git a/example/python/tmsp/server.py b/example/python/tmsp/server.py index e25e4e1fd..0beb59d13 100644 --- a/example/python/tmsp/server.py +++ b/example/python/tmsp/server.py @@ -8,11 +8,29 @@ from wire import * from reader import * from msg import * +# hold the asyncronous state of a connection +# ie. we may not get enough bytes on one read to decode the message +class Connection(): + def __init__(self, fd, appCtx): + self.fd = fd + self.appCtx = appCtx + self.recBuf = BytesBuffer(bytearray()) + self.resBuf = BytesBuffer(bytearray()) + self.msgLength = 0 + self.decoder = RequestDecoder(self.recBuf) + self.inProgress = False # are we in the middle of a message + + def recv(this): + data = this.fd.recv(1024) + if not data: # what about len(data) == 0 + raise IOError("dead connection") + this.recBuf.write(data) + # TMSP server responds to messges by calling methods on the app class TMSPServer(): def __init__(self, app, port=5410): self.app = app - self.appMap = {} # map conn file descriptors to (appContext, msgDecoder) + self.appMap = {} # map conn file descriptors to (appContext, reqBuf, resBuf, msgDecoder) self.port = port self.listen_backlog = 10 @@ -31,12 +49,13 @@ class TMSPServer(): def handle_new_connection(self, r): new_fd, new_addr = r.accept() + new_fd.setblocking(0) # non-blocking self.read_list.append(new_fd) self.write_list.append(new_fd) print 'new connection to', new_addr appContext = self.app.open() - self.appMap[new_fd] = (appContext, RequestDecoder(ConnReader(new_fd))) + self.appMap[new_fd] = Connection(new_fd, appContext) def handle_conn_closed(self, r): self.read_list.remove(r) @@ -45,25 +64,63 @@ class TMSPServer(): print "connection closed" def handle_recv(self, r): - appCtx, conn = self.appMap[r] - response = bytearray() +# appCtx, recBuf, resBuf, conn + conn = self.appMap[r] while True: try: - # first read the request type and get the msg decoder - typeByte = conn.reader.read(1) + print "recv loop" + # check if we need more data first + if conn.inProgress: + if conn.msgLength == 0 or conn.recBuf.size() < conn.msgLength: + conn.recv() + else: + if conn.recBuf.size() == 0: + conn.recv() + + conn.inProgress = True + + # see if we have enough to get the message length + if conn.msgLength == 0: + ll = conn.recBuf.peek() + if conn.recBuf.size() < 1 + ll: + # we don't have enough bytes to read the length yet + return + print "decoding msg length" + conn.msgLength = decode_varint(conn.recBuf) + + # see if we have enough to decode the message + if conn.recBuf.size() < conn.msgLength: + return + + # now we can decode the message + + # first read the request type and get the particular msg decoder + typeByte = conn.recBuf.read(1) typeByte = int(typeByte[0]) resTypeByte = typeByte+0x10 req_type = message_types[typeByte] if req_type == "flush": - response += bytearray([resTypeByte]) - sent = r.send(str(response)) + # messages are length prefixed + conn.resBuf.write(encode(1)) + conn.resBuf.write([resTypeByte]) + sent = conn.fd.send(str(conn.resBuf.buf)) + conn.msgLength = 0 + conn.inProgress = False + conn.resBuf = BytesBuffer(bytearray()) return - decoder = getattr(conn, req_type) + decoder = getattr(conn.decoder, req_type) + print "decoding args" req_args = decoder() - req_f = getattr(appCtx, req_type) + print "got args", req_args + + # done decoding message + conn.msgLength = 0 + conn.inProgress = False + + req_f = getattr(conn.appCtx, req_type) if req_args == None: res = req_f() elif isinstance(req_args, tuple): @@ -82,9 +139,18 @@ class TMSPServer(): print "non-zero retcode:", ret_code if req_type in ("echo", "info"): # these dont return a ret code - response += bytearray([resTypeByte]) + encode(res) + enc = encode(res) + # messages are length prefixed + conn.resBuf.write(encode(len(enc) + 1)) + conn.resBuf.write([resTypeByte]) + conn.resBuf.write(enc) else: - response += bytearray([resTypeByte]) + encode(ret_code) + encode(res) + enc, encRet = encode(res), encode(ret_code) + # messages are length prefixed + conn.resBuf.write(encode(len(enc)+len(encRet)+1)) + conn.resBuf.write([resTypeByte]) + conn.resBuf.write(encRet) + conn.resBuf.write(enc) except TypeError as e: print "TypeError on reading from connection:", e self.handle_conn_closed(r) @@ -97,8 +163,8 @@ class TMSPServer(): print "IOError on reading from connection:", e self.handle_conn_closed(r) return - except: - print "error reading from connection", sys.exc_info()[0] # TODO better + except Exception as e: + print "error reading from connection", str(e) # sys.exc_info()[0] # TODO better self.handle_conn_closed(r) return @@ -112,7 +178,7 @@ class TMSPServer(): self.handle_new_connection(r) # undo adding to read list ... - except NameError as e: + except rameError as e: print "Could not connect due to NameError:", e except TypeError as e: print "Could not connect due to TypeError:", e diff --git a/tests/test.sh b/tests/test.sh index 848d14cbc..9ee9be135 100644 --- a/tests/test.sh +++ b/tests/test.sh @@ -11,3 +11,7 @@ bash tests/test_counter.sh # test python counter cd example/python COUNTER_APP="python app.py" bash $ROOT/tests/test_counter.sh + +# test js counter +cd ../js +COUNTER_APP="node app.js" bash $ROOT/tests/test_counter.sh diff --git a/tests/test_counter.sh b/tests/test_counter.sh index 41909dde8..8bbd8d376 100644 --- a/tests/test_counter.sh +++ b/tests/test_counter.sh @@ -11,7 +11,7 @@ $COUNTER_APP &> /dev/null & PID=`echo $!` if [[ "$?" != 0 ]]; then - echo "Error running tmsp command" + echo "Error running tmsp app" echo $OUTPUT exit 1 fi @@ -24,7 +24,7 @@ append_tx abc STDIN` if [[ "$?" != 0 ]]; then - echo "Error running tmsp command" + echo "Error running tmsp batch command" echo $OUTPUT exit 1 fi