Browse Source

Merge pull request #5 from tendermint/js

Length prefixed tmsp messages; counter app in javascript
pull/1780/head
Jae Kwon 9 years ago
parent
commit
13c50a5fdf
10 changed files with 770 additions and 19 deletions
  1. +270
    -0
      cmd/tmsp-cli/tmsp_cli.go
  2. +87
    -0
      example/js/app.js
  3. +64
    -0
      example/js/msgs.js
  4. +128
    -0
      example/js/server.js
  5. +113
    -0
      example/js/wire.js
  6. +2
    -1
      example/python/app.py
  7. +19
    -1
      example/python/tmsp/reader.py
  8. +81
    -15
      example/python/tmsp/server.py
  9. +4
    -0
      tests/test.sh
  10. +2
    -2
      tests/test_counter.sh

+ 270
- 0
cmd/tmsp-cli/tmsp_cli.go View File

@ -0,0 +1,270 @@
package main
import (
"bufio"
"encoding/hex"
"fmt"
"io"
"net"
"os"
"strings"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tmsp/types"
"github.com/codegangsta/cli"
)
// connection is a global variable so it can be reused by the console
var conn net.Conn
func main() {
app := cli.NewApp()
app.Name = "cli"
app.Usage = "cli [command] [args...]"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "address",
Value: "tcp://127.0.0.1:46658",
Usage: "address of application socket",
},
}
app.Commands = []cli.Command{
{
Name: "batch",
Usage: "Run a batch of tmsp commands against an application",
Action: func(c *cli.Context) {
cmdBatch(app, c)
},
},
{
Name: "console",
Usage: "Start an interactive tmsp console for multiple commands",
Action: func(c *cli.Context) {
cmdConsole(app, c)
},
},
{
Name: "echo",
Usage: "Have the application echo a message",
Action: func(c *cli.Context) {
cmdEcho(c)
},
},
{
Name: "info",
Usage: "Get some info about the application",
Action: func(c *cli.Context) {
cmdInfo(c)
},
},
{
Name: "set_option",
Usage: "Set an option on the application",
Action: func(c *cli.Context) {
cmdSetOption(c)
},
},
{
Name: "append_tx",
Usage: "Append a new tx to application",
Action: func(c *cli.Context) {
cmdAppendTx(c)
},
},
{
Name: "get_hash",
Usage: "Get application Merkle root hash",
Action: func(c *cli.Context) {
cmdGetHash(c)
},
},
{
Name: "commit",
Usage: "Commit the application state",
Action: func(c *cli.Context) {
cmdCommit(c)
},
},
{
Name: "rollback",
Usage: "Roll back the application state to the latest commit",
Action: func(c *cli.Context) {
cmdRollback(c)
},
},
}
app.Before = before
app.Run(os.Args)
}
func before(c *cli.Context) error {
if conn == nil {
var err error
conn, err = Connect(c.GlobalString("address"))
if err != nil {
Exit(err.Error())
}
}
return nil
}
//--------------------------------------------------------------------------------
func cmdBatch(app *cli.App, c *cli.Context) {
bufReader := bufio.NewReader(os.Stdin)
for {
line, more, err := bufReader.ReadLine()
if more {
Exit("input line is too long")
} else if err == io.EOF {
break
} else if len(line) == 0 {
continue
} else if err != nil {
Exit(err.Error())
}
args := []string{"tmsp"}
args = append(args, strings.Split(string(line), " ")...)
app.Run(args)
}
}
func cmdConsole(app *cli.App, c *cli.Context) {
for {
fmt.Printf("> ")
bufReader := bufio.NewReader(os.Stdin)
line, more, err := bufReader.ReadLine()
if more {
Exit("input is too long")
} else if err != nil {
Exit(err.Error())
}
args := []string{"tmsp"}
args = append(args, strings.Split(string(line), " ")...)
app.Run(args)
}
}
// Have the application echo a message
func cmdEcho(c *cli.Context) {
args := c.Args()
if len(args) != 1 {
Exit("echo takes 1 argument")
}
res, err := makeRequest(conn, types.RequestEcho{args[0]})
if err != nil {
Exit(err.Error())
}
fmt.Println(res)
}
// Get some info from the application
func cmdInfo(c *cli.Context) {
res, err := makeRequest(conn, types.RequestInfo{})
if err != nil {
Exit(err.Error())
}
fmt.Println(res)
}
// Set an option on the application
func cmdSetOption(c *cli.Context) {
args := c.Args()
if len(args) != 2 {
Exit("set_option takes 2 arguments (key, value)")
}
_, err := makeRequest(conn, types.RequestSetOption{args[0], args[1]})
if err != nil {
Exit(err.Error())
}
fmt.Printf("%s=%s\n", args[0], args[1])
}
// Append a new tx to application
func cmdAppendTx(c *cli.Context) {
args := c.Args()
if len(args) != 1 {
Exit("append_tx takes 1 argument")
}
txString := args[0]
tx := []byte(txString)
if len(txString) > 2 && strings.HasPrefix(txString, "0x") {
var err error
tx, err = hex.DecodeString(txString[2:])
if err != nil {
Exit(err.Error())
}
}
res, err := makeRequest(conn, types.RequestAppendTx{tx})
if err != nil {
Exit(err.Error())
}
fmt.Println("Response:", res)
}
// Get application Merkle root hash
func cmdGetHash(c *cli.Context) {
res, err := makeRequest(conn, types.RequestGetHash{})
if err != nil {
Exit(err.Error())
}
fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash)
}
// Commit the application state
func cmdCommit(c *cli.Context) {
_, err := makeRequest(conn, types.RequestCommit{})
if err != nil {
Exit(err.Error())
}
fmt.Println("Committed.")
}
// Roll back the application state to the latest commit
func cmdRollback(c *cli.Context) {
_, err := makeRequest(conn, types.RequestRollback{})
if err != nil {
Exit(err.Error())
}
fmt.Println("Rolled back.")
}
//--------------------------------------------------------------------------------
func makeRequest(conn net.Conn, req types.Request) (types.Response, error) {
var n int
var err error
// Write desired request
wire.WriteBinaryLengthPrefixed(req, conn, &n, &err)
if err != nil {
return nil, err
}
// Write flush request
wire.WriteBinaryLengthPrefixed(types.RequestFlush{}, conn, &n, &err)
if err != nil {
return nil, err
}
// Read desired response
var res types.Response
wire.ReadBinaryPtrLengthPrefixed(&res, conn, 0, &n, &err)
if err != nil {
return nil, err
}
// Read flush response
var resFlush types.ResponseFlush
wire.ReadBinaryPtrLengthPrefixed(&resFlush, conn, 0, &n, &err)
if err != nil {
return nil, err
}
return res, nil
}

+ 87
- 0
example/js/app.js View File

@ -0,0 +1,87 @@
server = require("./server")
wire = require("./wire")
util = require("util")
function CounterApp(){
this.hashCount = 0;
this.txCount = 0;
this.commitCount = 0;
};
CounterApp.prototype.open = function(){
return new CounterAppContext(this);
}
function CounterAppContext(app) {
this.hashCount = app.hashCount;
this.txCount = app.txCount;
this.commitCount = app.commitCount;
this.serial = false;
}
CounterAppContext.prototype.echo = function(msg){
return {"response": msg, "ret_code":0}
}
CounterAppContext.prototype.info = function(){
return {"response": [util.format("hash, tx, commit counts: %d, %d, %d", this.hashCount, this.txCount, this.commitCount)]}
}
CounterAppContext.prototype.set_option = function(key, value){
if (key == "serial" && value == "on"){
this.serial = true;
}
return {"ret_code":0}
}
CounterAppContext.prototype.append_tx = function(txBytes){
if (this.serial) {
txByteArray = new Buffer(txBytes)
if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") {
txByteArray = wire.hex2bytes(txBytes.slice(2));
}
r = new msg.buffer(txByteArray)
txValue = wire.decode_big_endian(r, txBytes.length)
if (txValue != this.txCount){
return {"ret_code":1}
}
}
this.txCount += 1;
return {"ret_code":0} // TODO: return events
}
CounterAppContext.prototype.get_hash = function(){
this.hashCount += 1;
if (this.txCount == 0){
return {"response": "", "ret_code":0}
}
h = wire.encode_big_endian(this.txCount, 8);
h = wire.reverse(h); // TODO
return {"response": h.toString(), "ret_code":0}
}
CounterAppContext.prototype.commit = function(){
this.commitCount += 1;
return {"ret_code":0}
}
CounterAppContext.prototype.rollback = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.add_listener = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.rm_listener = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.event = function(){
}
console.log("Counter app in Javascript")
var app = new CounterApp();
var appServer = new server.AppServer(app);
appServer.server.listen(46658)

+ 64
- 0
example/js/msgs.js View File

@ -0,0 +1,64 @@
wire = require("./wire")
module.exports = {
types : {
0x01 : "echo",
0x02 : "flush",
0x03 : "info",
0x04 : "set_option",
0x21 : "append_tx",
0x22 : "get_hash",
0x23 : "commit",
0x24 : "rollback",
0x25 : "add_listener",
0x26 : "rm_listener",
},
decoder : RequestDecoder,
buffer: BytesBuffer
}
function RequestDecoder(buf){
this.buf= buf
}
var decode_string = wire.decode_string
// return nothing, one thing, or a list of things
RequestDecoder.prototype.echo = function(){ return decode_string(this.buf) };
RequestDecoder.prototype.flush = function(){};
RequestDecoder.prototype.info = function(){};
RequestDecoder.prototype.set_option = function(){ return [decode_string(this.buf), decode_string(this.buf)] };
RequestDecoder.prototype.append_tx = function(){ return decode_string(this.buf)};
RequestDecoder.prototype.get_hash = function(){ };
RequestDecoder.prototype.commit = function(){ };
RequestDecoder.prototype.rollback = function(){ };
RequestDecoder.prototype.add_listener = function(){ }; // TODO
RequestDecoder.prototype.rm_listener = function(){ }; // TODO
// buffered reader with read(n) method
function BytesBuffer(buf){
this.buf = buf
}
BytesBuffer.prototype.read = function(n){
b = this.buf.slice(0, n)
this.buf = this.buf.slice(n)
return b
};
BytesBuffer.prototype.write = function(buf){
this.buf = Buffer.concat([this.buf, buf]);
};
BytesBuffer.prototype.size = function(){
return this.buf.length
}
BytesBuffer.prototype.peek = function(){
return this.buf[0]
}

+ 128
- 0
example/js/server.js View File

@ -0,0 +1,128 @@
// Load the TCP Library
net = require('net');
msg = require('./msgs');
wire = require("./wire")
// Takes an application and handles tmsp connection
// which invoke methods on the app
function AppServer(app){
// set the app for the socket handler
this.app = app;
// create a server by providing callback for
// accepting new connection and callbacks for
// connection events ('data', 'end', etc.)
this.createServer()
}
module.exports = { AppServer: AppServer };
AppServer.prototype.createServer = function(){
app = this.app
conns = {} // map sockets to their state
// define the socket handler
this.server = net.createServer(function(socket){
socket.name = socket.remoteAddress + ":" + socket.remotePort
console.log("new connection from", socket.name)
appCtx = app.open()
var conn = {
recBuf: new msg.buffer(new Buffer(0)),
resBuf: new msg.buffer(new Buffer(0)),
msgLength: 0,
inProgress: false
}
conns[socket] = conn
// Handle tmsp requests.
socket.on('data', function (data) {
if (data.length == 0){
// TODO err
console.log("empty data!")
return
}
conn = conns[socket]
// we received data. append it
conn.recBuf.write(data)
while ( conn.recBuf.size() > 0 ){
if (conn.msgLength == 0){
ll = conn.recBuf.peek();
if (conn.recBuf.size() < 1 + ll){
// don't have enough bytes to read length yet
return
}
conn.msgLength = wire.decode_varint(conn.recBuf)
}
if (conn.recBuf.size() < conn.msgLength) {
// don't have enough to decode the message
return
}
// now we can decode
typeByte = conn.recBuf.read(1);
resTypeByte = typeByte[0] + 0x10
reqType = msg.types[typeByte[0]];
if (reqType == "flush"){
// msgs are length prefixed
conn.resBuf.write(wire.encode(1));
conn.resBuf.write(new Buffer([resTypeByte]))
n = socket.write(conn.resBuf.buf);
conn.msgLength = 0;
conn.resBuf = new msg.buffer(new Buffer(0));
return
}
// decode args
decoder = new msg.decoder(conn.recBuf);
args = decoder[reqType]();
// done decoding
conn.msgLength = 0
var res = function(){
if (args == null){
return appCtx[reqType]();
} else if (Array.isArray(args)){
return appCtx[reqType].apply(appCtx, args);
} else {
return appCtx[reqType](args)
}
}()
var retCode = res["ret_code"]
var res = res["response"]
if (retCode != null && retCode != 0){
console.log("non-zero ret code", retCode)
}
if (reqType == "echo" || reqType == "info"){
enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(res)]);
// length prefixed
conn.resBuf.write(wire.encode(enc.length));
conn.resBuf.write(enc);
} else {
enc = Buffer.concat([new Buffer([resTypeByte]), wire.encode(retCode), wire.encode(res)]);
conn.resBuf.write(wire.encode(enc.length));
conn.resBuf.write(enc);
}
}
});
socket.on('end', function () {
console.log("connection ended")
});
})
}

+ 113
- 0
example/js/wire.js View File

@ -0,0 +1,113 @@
math = require("math")
module.exports = {
decode_string: decode_string,
decode_varint: decode_varint,
decode_big_endian: decode_big_endian,
encode_big_endian: encode_big_endian,
encode: encode,
reverse: reverse,
}
function reverse(buf){
for (var i = 0; i < buf.length/2; i++){
a = buf[i];
b = buf[buf.length-1 - i];
buf[i] = b;
buf[buf.length-1 - i] = a;
}
return buf
}
function uvarint_size(i){
if (i == 0){
return 0
}
for(var j = 1; j < 9; j++) {
if ( i < 1<<j*8 ) {
return j
}
}
return 8
}
function encode_big_endian(i, size){
if (size == 0){
return new Buffer(0);
}
b = encode_big_endian(math.floor(i/256), size-1);
return Buffer.concat([b, new Buffer([i%256])]);
}
function decode_big_endian(reader, size){
if (size == 0){ return 0 }
firstByte = reader.read(1)[0];
return firstByte*(math.pow(256, size-1)) + decode_big_endian(reader, size-1)
}
function encode_string(s){
size = encode_varint(s.length);
return Buffer.concat([size, new Buffer(s)])
}
function decode_string(reader){
length = decode_varint(reader);
return reader.read(length).toString()
}
function encode_varint(i){
var negate = false;
if (i < 0){
negate = true;
i = -i;
}
size = uvarint_size(i);
if (size == 0){
return new Buffer([0])
}
big_end = encode_big_endian(i, size);
if (negate){ size += 0xF0 }
var buf = new Buffer([1]);
return Buffer.concat([buf, big_end])
}
function decode_varint(reader){
size = reader.read(1)[0];
if (size == 0 ){
return 0
}
var negate = false;
if (size > 0xF0){ negate = true }
if (negate) { size = size - 0xF0 }
i = decode_big_endian(reader, size);
if (negate) { i = i * -1}
return i
}
function encode_list(l){
var l2 = l.map(encode);
var buf = new Buffer(encode_varint(l2.length));
return Buffer.concat([buf, Buffer.concat(l2)]);
}
function encode(b){
if (b == null){
return Buffer(0)
} else if (typeof b == "number"){
return encode_varint(b)
} else if (typeof b == "string"){
return encode_string(b)
} else if (Array.isArray(b)){
return encode_list(b)
} else{
console.log("UNSUPPORTED TYPE!", typeof b, b)
}
}

+ 2
- 1
example/python/app.py View File

@ -41,7 +41,7 @@ class CounterAppContext():
txByteArray = bytearray(txBytes)
if len(txBytes) >= 2 and txBytes[:2] == "0x":
txByteArray = hex2bytes(txBytes[2:])
txValue = decode_big_endian(BytesReader(txByteArray), len(txBytes))
txValue = decode_big_endian(BytesBuffer(txByteArray), len(txBytes))
if txValue != self.txCount:
return None, 1
self.txCount += 1
@ -56,6 +56,7 @@ class CounterAppContext():
return str(h), 0
def commit(self):
self.commitCount += 1
return 0
def rollback(self):


+ 19
- 1
example/python/tmsp/reader.py View File

@ -1,14 +1,32 @@
# Simple read() method around a bytearray
class BytesReader():
class BytesBuffer():
def __init__(self, b):
self.buf = b
self.readCount = 0
def count(self):
return self.readCount
def reset_count(self):
self.readCount = 0
def size(self):
return len(self.buf)
def peek(self):
return self.buf[0]
def write(self, b):
# b should be castable to byte array
self.buf += bytearray(b)
def read(self, n):
if len(self.buf) < n:
print "reader err: buf less than n"
# TODO: exception
return
self.readCount += n
r = self.buf[:n]
self.buf = self.buf[n:]
return r


+ 81
- 15
example/python/tmsp/server.py View File

@ -8,11 +8,29 @@ from wire import *
from reader import *
from msg import *
# hold the asyncronous state of a connection
# ie. we may not get enough bytes on one read to decode the message
class Connection():
def __init__(self, fd, appCtx):
self.fd = fd
self.appCtx = appCtx
self.recBuf = BytesBuffer(bytearray())
self.resBuf = BytesBuffer(bytearray())
self.msgLength = 0
self.decoder = RequestDecoder(self.recBuf)
self.inProgress = False # are we in the middle of a message
def recv(this):
data = this.fd.recv(1024)
if not data: # what about len(data) == 0
raise IOError("dead connection")
this.recBuf.write(data)
# TMSP server responds to messges by calling methods on the app
class TMSPServer():
def __init__(self, app, port=5410):
self.app = app
self.appMap = {} # map conn file descriptors to (appContext, msgDecoder)
self.appMap = {} # map conn file descriptors to (appContext, reqBuf, resBuf, msgDecoder)
self.port = port
self.listen_backlog = 10
@ -31,12 +49,13 @@ class TMSPServer():
def handle_new_connection(self, r):
new_fd, new_addr = r.accept()
new_fd.setblocking(0) # non-blocking
self.read_list.append(new_fd)
self.write_list.append(new_fd)
print 'new connection to', new_addr
appContext = self.app.open()
self.appMap[new_fd] = (appContext, RequestDecoder(ConnReader(new_fd)))
self.appMap[new_fd] = Connection(new_fd, appContext)
def handle_conn_closed(self, r):
self.read_list.remove(r)
@ -45,25 +64,63 @@ class TMSPServer():
print "connection closed"
def handle_recv(self, r):
appCtx, conn = self.appMap[r]
response = bytearray()
# appCtx, recBuf, resBuf, conn
conn = self.appMap[r]
while True:
try:
# first read the request type and get the msg decoder
typeByte = conn.reader.read(1)
print "recv loop"
# check if we need more data first
if conn.inProgress:
if conn.msgLength == 0 or conn.recBuf.size() < conn.msgLength:
conn.recv()
else:
if conn.recBuf.size() == 0:
conn.recv()
conn.inProgress = True
# see if we have enough to get the message length
if conn.msgLength == 0:
ll = conn.recBuf.peek()
if conn.recBuf.size() < 1 + ll:
# we don't have enough bytes to read the length yet
return
print "decoding msg length"
conn.msgLength = decode_varint(conn.recBuf)
# see if we have enough to decode the message
if conn.recBuf.size() < conn.msgLength:
return
# now we can decode the message
# first read the request type and get the particular msg decoder
typeByte = conn.recBuf.read(1)
typeByte = int(typeByte[0])
resTypeByte = typeByte+0x10
req_type = message_types[typeByte]
if req_type == "flush":
response += bytearray([resTypeByte])
sent = r.send(str(response))
# messages are length prefixed
conn.resBuf.write(encode(1))
conn.resBuf.write([resTypeByte])
sent = conn.fd.send(str(conn.resBuf.buf))
conn.msgLength = 0
conn.inProgress = False
conn.resBuf = BytesBuffer(bytearray())
return
decoder = getattr(conn, req_type)
decoder = getattr(conn.decoder, req_type)
print "decoding args"
req_args = decoder()
req_f = getattr(appCtx, req_type)
print "got args", req_args
# done decoding message
conn.msgLength = 0
conn.inProgress = False
req_f = getattr(conn.appCtx, req_type)
if req_args == None:
res = req_f()
elif isinstance(req_args, tuple):
@ -82,9 +139,18 @@ class TMSPServer():
print "non-zero retcode:", ret_code
if req_type in ("echo", "info"): # these dont return a ret code
response += bytearray([resTypeByte]) + encode(res)
enc = encode(res)
# messages are length prefixed
conn.resBuf.write(encode(len(enc) + 1))
conn.resBuf.write([resTypeByte])
conn.resBuf.write(enc)
else:
response += bytearray([resTypeByte]) + encode(ret_code) + encode(res)
enc, encRet = encode(res), encode(ret_code)
# messages are length prefixed
conn.resBuf.write(encode(len(enc)+len(encRet)+1))
conn.resBuf.write([resTypeByte])
conn.resBuf.write(encRet)
conn.resBuf.write(enc)
except TypeError as e:
print "TypeError on reading from connection:", e
self.handle_conn_closed(r)
@ -97,8 +163,8 @@ class TMSPServer():
print "IOError on reading from connection:", e
self.handle_conn_closed(r)
return
except:
print "error reading from connection", sys.exc_info()[0] # TODO better
except Exception as e:
print "error reading from connection", str(e) # sys.exc_info()[0] # TODO better
self.handle_conn_closed(r)
return
@ -112,7 +178,7 @@ class TMSPServer():
self.handle_new_connection(r)
# undo adding to read list ...
except NameError as e:
except rameError as e:
print "Could not connect due to NameError:", e
except TypeError as e:
print "Could not connect due to TypeError:", e


+ 4
- 0
tests/test.sh View File

@ -11,3 +11,7 @@ bash tests/test_counter.sh
# test python counter
cd example/python
COUNTER_APP="python app.py" bash $ROOT/tests/test_counter.sh
# test js counter
cd ../js
COUNTER_APP="node app.js" bash $ROOT/tests/test_counter.sh

+ 2
- 2
tests/test_counter.sh View File

@ -11,7 +11,7 @@ $COUNTER_APP &> /dev/null &
PID=`echo $!`
if [[ "$?" != 0 ]]; then
echo "Error running tmsp command"
echo "Error running tmsp app"
echo $OUTPUT
exit 1
fi
@ -24,7 +24,7 @@ append_tx abc
STDIN`
if [[ "$?" != 0 ]]; then
echo "Error running tmsp command"
echo "Error running tmsp batch command"
echo $OUTPUT
exit 1
fi


Loading…
Cancel
Save