Browse Source

Remove TMSP Commit/Rollback; Add CheckTx

pull/1780/head
Jae Kwon 9 years ago
parent
commit
f15476b157
17 changed files with 219 additions and 368 deletions
  1. +21
    -11
      README.md
  2. +26
    -28
      cmd/tmsp-cli/tmsp-cli.go
  3. +31
    -65
      example/golang/counter.go
  4. +13
    -54
      example/golang/dummy.go
  5. +25
    -30
      example/js/app.js
  6. +5
    -10
      example/js/msgs.js
  7. +3
    -5
      example/js/server.js
  8. +12
    -23
      example/python/app.py
  9. +7
    -12
      example/python/tmsp/msg.py
  10. +6
    -10
      example/python/tmsp/server.py
  11. +12
    -23
      example/python3/app.py
  12. +7
    -12
      example/python3/tmsp/msg.py
  13. +6
    -9
      example/python3/tmsp/server.py
  14. +18
    -24
      server/server.go
  15. +3
    -4
      tests/test_dummy.sh
  16. +4
    -16
      types/application.go
  17. +20
    -32
      types/messages.go

+ 21
- 11
README.md View File

@ -16,24 +16,20 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog
* __Usage__:<br/>
Append and run a transaction. The transaction may or may not be final.
#### GetHash
* __Returns__:
* `RetCode (int8)`
* `Hash ([]byte)`
* __Usage__:<br/>
Return a Merkle root hash of the application state
#### Commit
#### CheckTx
* __Arguments__:
* `TxBytes ([]byte)`
* __Returns__:
* `RetCode (int8)`
* __Usage__:<br/>
Finalize all appended transactions
Validate a transaction. This message should not mutate the state.
#### Rollback
#### GetHash
* __Returns__:
* `RetCode (int8)`
* `Hash ([]byte)`
* __Usage__:<br/>
Roll back to the last commit
Return a Merkle root hash of the application state
#### AddListener
* __Arguments__:
@ -70,3 +66,17 @@ For more information on TMSP, motivations, and tutorials, please visit [our blog
* __Usage__:<br/>
Set application options. E.g. Key="mode", Value="mempool" for a mempool connection, or Key="mode", Value="consensus" for a consensus connection.
Other options are application specific.
## Changelog
### Jan 8th, 2016
Tendermint/TMSP now comes to consensus on the order first before AppendTx.
This means that we no longer need the Commit/Rollback TMSP messages.
Instead, there’s a “CheckTx” message for mempool to check the validity of a message.
One consequence is that txs in blocks now may include invalid txs that are ignored.
In the future, we can include a bitarray or merkle structure in the block so anyone can see which txs were valid.
To prevent spam, applications can implement their “CheckTx” messages to deduct some balance, so at least spam txs will cost something. This isn’t any more work that what we already needed to do, so it’s not any worse.
You can see the new changes in the tendermint/tendermint “order_first” branch, and tendermint/tmsp “order_first” branch. If you your TMSP apps to me I can help with the transition.
Please take a look at how the examples in TMSP changed, e.g. how AppContext was removed, CheckTx was added, how the TMSP msg bytes changed, and how commit/rollback messages were removed.

+ 26
- 28
cmd/tmsp-cli/tmsp-cli.go View File

@ -76,24 +76,17 @@ func main() {
},
},
{
Name: "get_hash",
Usage: "Get application Merkle root hash",
Name: "check_tx",
Usage: "Validate a tx",
Action: func(c *cli.Context) {
cmdGetHash(c)
cmdCheckTx(c)
},
},
{
Name: "commit",
Usage: "Commit the application state",
Action: func(c *cli.Context) {
cmdCommit(c)
},
},
{
Name: "rollback",
Usage: "Roll back the application state to the latest commit",
Name: "get_hash",
Usage: "Get application Merkle root hash",
Action: func(c *cli.Context) {
cmdRollback(c)
cmdGetHash(c)
},
},
}
@ -209,31 +202,36 @@ func cmdAppendTx(c *cli.Context) {
fmt.Println("Response:", res)
}
// Get application Merkle root hash
func cmdGetHash(c *cli.Context) {
res, err := makeRequest(conn, types.RequestGetHash{})
if err != nil {
Exit(err.Error())
// Validate a tx
func cmdCheckTx(c *cli.Context) {
args := c.Args()
if len(args) != 1 {
Exit("append_tx takes 1 argument")
}
txString := args[0]
tx := []byte(txString)
if len(txString) > 2 && strings.HasPrefix(txString, "0x") {
var err error
tx, err = hex.DecodeString(txString[2:])
if err != nil {
Exit(err.Error())
}
}
fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash)
}
// Commit the application state
func cmdCommit(c *cli.Context) {
_, err := makeRequest(conn, types.RequestCommit{})
res, err := makeRequest(conn, types.RequestCheckTx{tx})
if err != nil {
Exit(err.Error())
}
fmt.Println("Committed.")
fmt.Println("Response:", res)
}
// Roll back the application state to the latest commit
func cmdRollback(c *cli.Context) {
_, err := makeRequest(conn, types.RequestRollback{})
// Get application Merkle root hash
func cmdGetHash(c *cli.Context) {
res, err := makeRequest(conn, types.RequestGetHash{})
if err != nil {
Exit(err.Error())
}
fmt.Println("Rolled back.")
fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash)
}
//--------------------------------------------------------------------------------


+ 31
- 65
example/golang/counter.go View File

@ -2,111 +2,77 @@ package example
import (
"encoding/binary"
"sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/types"
)
type CounterApplication struct {
mtx sync.Mutex
hashCount int
txCount int
commitCount int
serial bool
hashCount int
txCount int
serial bool
}
func NewCounterApplication(serial bool) *CounterApplication {
return &CounterApplication{serial: serial}
}
func (app *CounterApplication) Open() types.AppContext {
return &CounterAppContext{
app: app,
hashCount: app.hashCount,
txCount: app.txCount,
commitCount: app.commitCount,
serial: app.serial,
}
}
//--------------------------------------------------------------------------------
type CounterAppContext struct {
app *CounterApplication
hashCount int
txCount int
commitCount int
serial bool
}
func (appC *CounterAppContext) Echo(message string) string {
func (app *CounterApplication) Echo(message string) string {
return message
}
func (appC *CounterAppContext) Info() []string {
return []string{Fmt("hash, tx, commit counts:%d, %d, %d", appC.hashCount, appC.txCount, appC.commitCount)}
func (app *CounterApplication) Info() []string {
return []string{Fmt("hashes:%v, txs:%v", app.hashCount, app.txCount)}
}
func (appC *CounterAppContext) SetOption(key string, value string) types.RetCode {
func (app *CounterApplication) SetOption(key string, value string) types.RetCode {
if key == "serial" && value == "on" {
appC.serial = true
app.serial = true
}
return 0
}
func (appC *CounterAppContext) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
if appC.serial {
func (app *CounterApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
if app.serial {
tx8 := make([]byte, 8)
copy(tx8, tx)
txValue := binary.LittleEndian.Uint64(tx8)
if txValue != uint64(appC.txCount) {
if txValue != uint64(app.txCount) {
return nil, types.RetCodeInternalError
}
}
appC.txCount += 1
app.txCount += 1
return nil, 0
}
func (appC *CounterAppContext) GetHash() ([]byte, types.RetCode) {
appC.hashCount += 1
if appC.txCount == 0 {
func (app *CounterApplication) CheckTx(tx []byte) types.RetCode {
if app.serial {
tx8 := make([]byte, 8)
copy(tx8, tx)
txValue := binary.LittleEndian.Uint64(tx8)
if txValue < uint64(app.txCount) {
return types.RetCodeInternalError
}
}
return 0
}
func (app *CounterApplication) GetHash() ([]byte, types.RetCode) {
app.hashCount += 1
if app.txCount == 0 {
return nil, 0
} else {
hash := make([]byte, 32)
binary.LittleEndian.PutUint64(hash, uint64(appC.txCount))
binary.LittleEndian.PutUint64(hash, uint64(app.txCount))
return hash, 0
}
}
func (appC *CounterAppContext) Commit() types.RetCode {
appC.commitCount += 1
appC.app.mtx.Lock()
appC.app.hashCount = appC.hashCount
appC.app.txCount = appC.txCount
appC.app.commitCount = appC.commitCount
appC.app.mtx.Unlock()
return 0
}
func (appC *CounterAppContext) Rollback() types.RetCode {
appC.app.mtx.Lock()
appC.hashCount = appC.app.hashCount
appC.txCount = appC.app.txCount
appC.commitCount = appC.app.commitCount
appC.app.mtx.Unlock()
return 0
}
func (appC *CounterAppContext) AddListener(key string) types.RetCode {
func (app *CounterApplication) AddListener(key string) types.RetCode {
return 0
}
func (appC *CounterAppContext) RemListener(key string) types.RetCode {
func (app *CounterApplication) RemListener(key string) types.RetCode {
return 0
}
func (appC *CounterAppContext) Close() error {
return nil
}

+ 13
- 54
example/golang/dummy.go View File

@ -1,8 +1,6 @@
package example
import (
"sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-merkle"
"github.com/tendermint/go-wire"
@ -10,7 +8,6 @@ import (
)
type DummyApplication struct {
mtx sync.Mutex
state merkle.Tree
}
@ -24,74 +21,36 @@ func NewDummyApplication() *DummyApplication {
return &DummyApplication{state: state}
}
func (dapp *DummyApplication) Open() types.AppContext {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
return &DummyAppContext{
app: dapp,
state: dapp.state.Copy(),
}
}
func (dapp *DummyApplication) commitState(state merkle.Tree) {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
dapp.state = state.Copy()
}
func (dapp *DummyApplication) getState() merkle.Tree {
dapp.mtx.Lock()
defer dapp.mtx.Unlock()
return dapp.state.Copy()
}
//--------------------------------------------------------------------------------
type DummyAppContext struct {
app *DummyApplication
state merkle.Tree
}
func (dac *DummyAppContext) Echo(message string) string {
func (app *DummyApplication) Echo(message string) string {
return message
}
func (dac *DummyAppContext) Info() []string {
return []string{Fmt("size:%v", dac.state.Size())}
func (app *DummyApplication) Info() []string {
return []string{Fmt("size:%v", app.state.Size())}
}
func (dac *DummyAppContext) SetOption(key string, value string) types.RetCode {
func (app *DummyApplication) SetOption(key string, value string) types.RetCode {
return 0
}
func (dac *DummyAppContext) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
dac.state.Set(tx, tx)
func (app *DummyApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) {
app.state.Set(tx, tx)
return nil, 0
}
func (dac *DummyAppContext) GetHash() ([]byte, types.RetCode) {
hash := dac.state.Hash()
return hash, 0
}
func (dac *DummyAppContext) Commit() types.RetCode {
dac.app.commitState(dac.state)
return 0
func (app *DummyApplication) CheckTx(tx []byte) types.RetCode {
return 0 // all txs are valid
}
func (dac *DummyAppContext) Rollback() types.RetCode {
dac.state = dac.app.getState()
return 0
func (app *DummyApplication) GetHash() ([]byte, types.RetCode) {
hash := app.state.Hash()
return hash, 0
}
func (dac *DummyAppContext) AddListener(key string) types.RetCode {
func (app *DummyApplication) AddListener(key string) types.RetCode {
return 0
}
func (dac *DummyAppContext) RemListener(key string) types.RetCode {
func (app *DummyApplication) RemListener(key string) types.RetCode {
return 0
}
func (dac *DummyAppContext) Close() error {
return nil
}

+ 25
- 30
example/js/app.js View File

@ -5,36 +5,25 @@ util = require("util")
function CounterApp(){
this.hashCount = 0;
this.txCount = 0;
this.commitCount = 0;
this.serial = false;
};
CounterApp.prototype.open = function(){
return new CounterAppContext(this);
}
function CounterAppContext(app) {
this.hashCount = app.hashCount;
this.txCount = app.txCount;
this.commitCount = app.commitCount;
this.serial = false;
}
CounterAppContext.prototype.echo = function(msg){
CounterApp.prototype.echo = function(msg){
return {"response": msg, "ret_code":0}
}
CounterAppContext.prototype.info = function(){
return {"response": [util.format("hash, tx, commit counts: %d, %d, %d", this.hashCount, this.txCount, this.commitCount)]}
CounterApp.prototype.info = function(){
return {"response": [util.format("hashes:%d, txs:%d", this.hashCount, this.txCount)]}
}
CounterAppContext.prototype.set_option = function(key, value){
CounterApp.prototype.set_option = function(key, value){
if (key == "serial" && value == "on"){
this.serial = true;
}
return {"ret_code":0}
}
CounterAppContext.prototype.append_tx = function(txBytes){
CounterApp.prototype.append_tx = function(txBytes){
if (this.serial) {
txByteArray = new Buffer(txBytes)
if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") {
@ -50,7 +39,22 @@ CounterAppContext.prototype.append_tx = function(txBytes){
return {"ret_code":0} // TODO: return events
}
CounterAppContext.prototype.get_hash = function(){
CounterApp.prototype.check_tx = function(txBytes){
if (this.serial) {
txByteArray = new Buffer(txBytes)
if (txBytes.length >= 2 && txBytes.slice(0, 2) == "0x") {
txByteArray = wire.hex2bytes(txBytes.slice(2));
}
r = new msg.buffer(txByteArray)
txValue = wire.decode_big_endian(r, txBytes.length)
if (txValue < this.txCount){
return {"ret_code":1}
}
}
return {"ret_code":0}
}
CounterApp.prototype.get_hash = function(){
this.hashCount += 1;
if (this.txCount == 0){
return {"response": "", "ret_code":0}
@ -60,24 +64,15 @@ CounterAppContext.prototype.get_hash = function(){
return {"response": h.toString(), "ret_code":0}
}
CounterAppContext.prototype.commit = function(){
this.commitCount += 1;
return {"ret_code":0}
}
CounterAppContext.prototype.rollback = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.add_listener = function(){
CounterApp.prototype.add_listener = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.rm_listener = function(){
CounterApp.prototype.rm_listener = function(){
return {"ret_code":0}
}
CounterAppContext.prototype.event = function(){
CounterApp.prototype.event = function(){
}
console.log("Counter app in Javascript")


+ 5
- 10
example/js/msgs.js View File

@ -7,17 +7,13 @@ module.exports = {
0x03 : "info",
0x04 : "set_option",
0x21 : "append_tx",
0x22 : "get_hash",
0x23 : "commit",
0x24 : "rollback",
0x25 : "add_listener",
0x26 : "rm_listener",
0x22 : "check_tx",
0x23 : "get_hash",
0x24 : "add_listener",
0x25 : "rm_listener",
},
decoder : RequestDecoder,
buffer: BytesBuffer
}
function RequestDecoder(buf){
@ -32,9 +28,8 @@ RequestDecoder.prototype.flush = function(){};
RequestDecoder.prototype.info = function(){};
RequestDecoder.prototype.set_option = function(){ return [decode_string(this.buf), decode_string(this.buf)] };
RequestDecoder.prototype.append_tx = function(){ return decode_string(this.buf)};
RequestDecoder.prototype.check_tx = function(){ return decode_string(this.buf)};
RequestDecoder.prototype.get_hash = function(){ };
RequestDecoder.prototype.commit = function(){ };
RequestDecoder.prototype.rollback = function(){ };
RequestDecoder.prototype.add_listener = function(){ }; // TODO
RequestDecoder.prototype.rm_listener = function(){ }; // TODO


+ 3
- 5
example/js/server.js View File

@ -27,8 +27,6 @@ AppServer.prototype.createServer = function(){
socket.name = socket.remoteAddress + ":" + socket.remotePort
console.log("new connection from", socket.name)
appCtx = app.open()
var conn = {
recBuf: new msg.buffer(new Buffer(0)),
resBuf: new msg.buffer(new Buffer(0)),
@ -90,11 +88,11 @@ AppServer.prototype.createServer = function(){
var res = function(){
if (args == null){
return appCtx[reqType]();
return app[reqType]();
} else if (Array.isArray(args)){
return appCtx[reqType].apply(appCtx, args);
return app[reqType].apply(app, args);
} else {
return appCtx[reqType](args)
return app[reqType](args)
}
}()


+ 12
- 23
example/python/app.py View File

@ -10,28 +10,13 @@ class CounterApplication():
def __init__(self):
self.hashCount = 0
self.txCount = 0
self.commitCount = 0
def open(self):
return CounterAppContext(self)
class CounterAppContext():
def __init__(self, app):
self.app = app
self.hashCount = app.hashCount
self.txCount = app.txCount
self.commitCount = app.commitCount
self.serial = False
def echo(self, msg):
return msg, 0
def info(self):
return ["hash, tx, commit counts:%d, %d, %d" % (self.hashCount,
self.txCount,
self.commitCount)], 0
return ["hashes:%d, txs:%d" % (self.hashCount, self.txCount)], 0
def set_option(self, key, value):
if key == "serial" and value == "on":
@ -50,6 +35,17 @@ class CounterAppContext():
self.txCount += 1
return None, 0
def check_tx(self, txBytes):
if self.serial:
txByteArray = bytearray(txBytes)
if len(txBytes) >= 2 and txBytes[:2] == "0x":
txByteArray = hex2bytes(txBytes[2:])
txValue = decode_big_endian(
BytesBuffer(txByteArray), len(txBytes))
if txValue < self.txCount:
return 1
return 0
def get_hash(self):
self.hashCount += 1
if self.txCount == 0:
@ -58,13 +54,6 @@ class CounterAppContext():
h.reverse()
return str(h), 0
def commit(self):
self.commitCount += 1
return 0
def rollback(self):
return 0
def add_listener(self):
return 0


+ 7
- 12
example/python/tmsp/msg.py View File

@ -7,16 +7,14 @@ message_types = {
0x03: "info",
0x04: "set_option",
0x21: "append_tx",
0x22: "get_hash",
0x23: "commit",
0x24: "rollback",
0x25: "add_listener",
0x26: "rm_listener",
0x22: "check_tx",
0x23: "get_hash",
0x24: "add_listener",
0x25: "rm_listener",
}
# return the decoded arguments of tmsp messages
class RequestDecoder():
def __init__(self, reader):
@ -37,13 +35,10 @@ class RequestDecoder():
def append_tx(self):
return decode_string(self.reader)
def get_hash(self):
return
def commit(self):
return
def check_tx(self):
return decode_string(self.reader)
def rollback(self):
def get_hash(self):
return
def add_listener(self):


+ 6
- 10
example/python/tmsp/server.py View File

@ -2,7 +2,6 @@ import socket
import select
import sys
from wire import decode_varint, encode
from reader import BytesBuffer
from msg import RequestDecoder, message_types
@ -10,12 +9,11 @@ from msg import RequestDecoder, message_types
# hold the asyncronous state of a connection
# ie. we may not get enough bytes on one read to decode the message
class Connection():
def __init__(self, fd, appCtx):
def __init__(self, fd, app):
self.fd = fd
self.appCtx = appCtx
self.app = app
self.recBuf = BytesBuffer(bytearray())
self.resBuf = BytesBuffer(bytearray())
self.msgLength = 0
@ -30,12 +28,11 @@ class Connection():
# TMSP server responds to messges by calling methods on the app
class TMSPServer():
def __init__(self, app, port=5410):
self.app = app
# map conn file descriptors to (appContext, reqBuf, resBuf, msgDecoder)
# map conn file descriptors to (app, reqBuf, resBuf, msgDecoder)
self.appMap = {}
self.port = port
@ -60,8 +57,7 @@ class TMSPServer():
self.write_list.append(new_fd)
print 'new connection to', new_addr
appContext = self.app.open()
self.appMap[new_fd] = Connection(new_fd, appContext)
self.appMap[new_fd] = Connection(new_fd, self.app)
def handle_conn_closed(self, r):
self.read_list.remove(r)
@ -70,7 +66,7 @@ class TMSPServer():
print "connection closed"
def handle_recv(self, r):
# appCtx, recBuf, resBuf, conn
# app, recBuf, resBuf, conn
conn = self.appMap[r]
while True:
try:
@ -127,7 +123,7 @@ class TMSPServer():
conn.msgLength = 0
conn.inProgress = False
req_f = getattr(conn.appCtx, req_type)
req_f = getattr(conn.app, req_type)
if req_args is None:
res = req_f()
elif isinstance(req_args, tuple):


+ 12
- 23
example/python3/app.py View File

@ -10,28 +10,13 @@ class CounterApplication():
def __init__(self):
self.hashCount = 0
self.txCount = 0
self.commitCount = 0
def open(self):
return CounterAppContext(self)
class CounterAppContext():
def __init__(self, app):
self.app = app
self.hashCount = app.hashCount
self.txCount = app.txCount
self.commitCount = app.commitCount
self.serial = False
def echo(self, msg):
return msg, 0
def info(self):
return ["hash, tx, commit counts:%d, %d, %d" % (self.hashCount,
self.txCount,
self.commitCount)], 0
return ["hashes:%d, txs:%d" % (self.hashCount, self.txCount)], 0
def set_option(self, key, value):
if key == "serial" and value == "on":
@ -50,6 +35,17 @@ class CounterAppContext():
self.txCount += 1
return None, 0
def check_tx(self, txBytes):
if self.serial:
txByteArray = bytearray(txBytes)
if len(txBytes) >= 2 and txBytes[:2] == "0x":
txByteArray = hex2bytes(txBytes[2:])
txValue = decode_big_endian(
BytesBuffer(txByteArray), len(txBytes))
if txValue < self.txCount:
return 1
return 0
def get_hash(self):
self.hashCount += 1
if self.txCount == 0:
@ -58,13 +54,6 @@ class CounterAppContext():
h.reverse()
return h.decode(), 0
def commit(self):
self.commitCount += 1
return 0
def rollback(self):
return 0
def add_listener(self):
return 0


+ 7
- 12
example/python3/tmsp/msg.py View File

@ -7,16 +7,14 @@ message_types = {
0x03: "info",
0x04: "set_option",
0x21: "append_tx",
0x22: "get_hash",
0x23: "commit",
0x24: "rollback",
0x25: "add_listener",
0x26: "rm_listener",
0x22: "check_tx",
0x23: "get_hash",
0x24: "add_listener",
0x25: "rm_listener",
}
# return the decoded arguments of tmsp messages
class RequestDecoder():
def __init__(self, reader):
@ -37,13 +35,10 @@ class RequestDecoder():
def append_tx(self):
return decode_string(self.reader)
def get_hash(self):
return
def commit(self):
return
def check_tx(self):
return decode_string(self.reader)
def rollback(self):
def get_hash(self):
return
def add_listener(self):


+ 6
- 9
example/python3/tmsp/server.py View File

@ -12,12 +12,11 @@ from .msg import RequestDecoder, message_types
logger = logging.getLogger(__name__)
class Connection():
def __init__(self, fd, appCtx):
def __init__(self, fd, app):
self.fd = fd
self.appCtx = appCtx
self.app = app
self.recBuf = BytesBuffer(bytearray())
self.resBuf = BytesBuffer(bytearray())
self.msgLength = 0
@ -32,12 +31,11 @@ class Connection():
# TMSP server responds to messges by calling methods on the app
class TMSPServer():
def __init__(self, app, port=5410):
self.app = app
# map conn file descriptors to (appContext, reqBuf, resBuf, msgDecoder)
# map conn file descriptors to (app, reqBuf, resBuf, msgDecoder)
self.appMap = {}
self.port = port
@ -62,8 +60,7 @@ class TMSPServer():
self.write_list.append(new_fd)
print('new connection to', new_addr)
appContext = self.app.open()
self.appMap[new_fd] = Connection(new_fd, appContext)
self.appMap[new_fd] = Connection(new_fd, self.app)
def handle_conn_closed(self, r):
self.read_list.remove(r)
@ -72,7 +69,7 @@ class TMSPServer():
print("connection closed")
def handle_recv(self, r):
# appCtx, recBuf, resBuf, conn
# app, recBuf, resBuf, conn
conn = self.appMap[r]
while True:
try:
@ -129,7 +126,7 @@ class TMSPServer():
conn.msgLength = 0
conn.inProgress = False
req_f = getattr(conn.appCtx, req_type)
req_f = getattr(conn.app, req_type)
if req_args is None:
res = req_f()
elif isinstance(req_args, tuple):


+ 18
- 24
server/server.go View File

@ -6,6 +6,7 @@ import (
"io"
"net"
"strings"
"sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
@ -15,6 +16,7 @@ import (
// var maxNumberConnections = 2
func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
var mtx sync.Mutex // global mutex
parts := strings.SplitN(protoAddr, "://", 2)
proto, addr := parts[0], parts[1]
ln, err := net.Listen(proto, addr)
@ -38,12 +40,11 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
fmt.Println("Accepted a new connection")
}
appContext := app.Open()
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan types.Response, 1000) // A channel to buffer responses
// Read requests from conn and deal with them
go handleRequests(appContext, closeConn, conn, responses)
go handleRequests(&mtx, app, closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go handleResponses(closeConn, responses, conn)
@ -62,12 +63,6 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
fmt.Printf("Error in closing connection: %v\n", err)
}
// Close the AppContext
err = appContext.Close()
if err != nil {
fmt.Printf("Error in closing app context: %v\n", err)
}
// <-semaphore
}()
}
@ -78,7 +73,7 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error
}
// Read requests from conn and deal with them
func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
@ -94,44 +89,43 @@ func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn,
}
return
}
mtx.Lock()
count++
handleRequest(appC, req, responses)
handleRequest(app, req, responses)
mtx.Unlock()
}
}
func handleRequest(appC types.AppContext, req types.Request, responses chan<- types.Response) {
func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) {
switch req := req.(type) {
case types.RequestEcho:
msg := appC.Echo(req.Message)
msg := app.Echo(req.Message)
responses <- types.ResponseEcho{msg}
case types.RequestFlush:
responses <- types.ResponseFlush{}
case types.RequestInfo:
data := appC.Info()
data := app.Info()
responses <- types.ResponseInfo{data}
case types.RequestSetOption:
retCode := appC.SetOption(req.Key, req.Value)
retCode := app.SetOption(req.Key, req.Value)
responses <- types.ResponseSetOption{retCode}
case types.RequestAppendTx:
events, retCode := appC.AppendTx(req.TxBytes)
events, retCode := app.AppendTx(req.TxBytes)
responses <- types.ResponseAppendTx{retCode}
for _, event := range events {
responses <- types.ResponseEvent{event}
}
case types.RequestCheckTx:
retCode := app.CheckTx(req.TxBytes)
responses <- types.ResponseCheckTx{retCode}
case types.RequestGetHash:
hash, retCode := appC.GetHash()
hash, retCode := app.GetHash()
responses <- types.ResponseGetHash{retCode, hash}
case types.RequestCommit:
retCode := appC.Commit()
responses <- types.ResponseCommit{retCode}
case types.RequestRollback:
retCode := appC.Rollback()
responses <- types.ResponseRollback{retCode}
case types.RequestAddListener:
retCode := appC.AddListener(req.EventKey)
retCode := app.AddListener(req.EventKey)
responses <- types.ResponseAddListener{retCode}
case types.RequestRemListener:
retCode := appC.RemListener(req.EventKey)
retCode := app.RemListener(req.EventKey)
responses <- types.ResponseRemListener{retCode}
default:
responses <- types.ResponseException{"Unknown request"}


+ 3
- 4
tests/test_dummy.sh View File

@ -13,17 +13,16 @@ fi
echo "... Pass!"
echo ""
# Add a tx, get hash, commit, get hash
# Add a tx, get hash, get hash
# hashes should be non-empty and identical
echo "Dummy batch test ..."
OUTPUT=`(tmsp-cli batch) <<STDIN
append_tx abc
get_hash
commit
get_hash
STDIN`
HASH1=`echo "$OUTPUT" | tail -n 3 | head -n 1`
HASH1=`echo "$OUTPUT" | tail -n 2 | head -n 1`
HASH2=`echo "$OUTPUT" | tail -n 1`
if [[ "$HASH1" == "" ]]; then
@ -37,7 +36,7 @@ if [[ "$HASH1" == "EOF" ]]; then
fi
if [[ "$HASH1" != "$HASH2" ]]; then
echo "Expected hashes before and after commit to match: $HASH1, $HASH2"
echo "Expected first and second hashes to match: $HASH1, $HASH2"
exit 1
fi
echo "... Pass!"


+ 4
- 16
types/application.go View File

@ -2,12 +2,6 @@ package types
type Application interface {
// For new socket connections
Open() AppContext
}
type AppContext interface {
// Echo a message
Echo(message string) string
@ -17,24 +11,18 @@ type AppContext interface {
// Set application option (e.g. mode=mempool, mode=consensus)
SetOption(key string, value string) RetCode
// Append a tx, which may or may not get committed
// Append a tx
AppendTx(tx []byte) ([]Event, RetCode)
// Validate a tx for the mempool
CheckTx(tx []byte) RetCode
// Return the application Merkle root hash
GetHash() ([]byte, RetCode)
// Set commit checkpoint
Commit() RetCode
// Rollback to the latest commit
Rollback() RetCode
// Add event listener
AddListener(key string) RetCode
// Remove event listener
RemListener(key string) RetCode
// Close this AppContext
Close() error
}

+ 20
- 32
types/messages.go View File

@ -17,20 +17,18 @@ const (
// reserved for GetOption = byte(0x15)
requestTypeAppendTx = byte(0x21)
requestTypeGetHash = byte(0x22)
requestTypeCommit = byte(0x23)
requestTypeRollback = byte(0x24)
requestTypeAddListener = byte(0x25)
requestTypeRemListener = byte(0x26)
// reserved for responseTypeEvent 0x27
requestTypeCheckTx = byte(0x22)
requestTypeGetHash = byte(0x23)
requestTypeAddListener = byte(0x24)
requestTypeRemListener = byte(0x25)
// reserved for responseTypeEvent 0x26
responseTypeAppendTx = byte(0x31)
responseTypeGetHash = byte(0x32)
responseTypeCommit = byte(0x33)
responseTypeRollback = byte(0x34)
responseTypeAddListener = byte(0x35)
responseTypeRemListener = byte(0x36)
responseTypeEvent = byte(0x37)
responseTypeCheckTx = byte(0x32)
responseTypeGetHash = byte(0x33)
responseTypeAddListener = byte(0x34)
responseTypeRemListener = byte(0x35)
responseTypeEvent = byte(0x36)
)
//----------------------------------------
@ -54,13 +52,11 @@ type RequestAppendTx struct {
TxBytes []byte
}
type RequestGetHash struct {
}
type RequestCommit struct {
type RequestCheckTx struct {
TxBytes []byte
}
type RequestRollback struct {
type RequestGetHash struct {
}
type RequestAddListener struct {
@ -80,9 +76,8 @@ func (_ RequestFlush) AssertRequestType() {}
func (_ RequestInfo) AssertRequestType() {}
func (_ RequestSetOption) AssertRequestType() {}
func (_ RequestAppendTx) AssertRequestType() {}
func (_ RequestCheckTx) AssertRequestType() {}
func (_ RequestGetHash) AssertRequestType() {}
func (_ RequestCommit) AssertRequestType() {}
func (_ RequestRollback) AssertRequestType() {}
func (_ RequestAddListener) AssertRequestType() {}
func (_ RequestRemListener) AssertRequestType() {}
@ -93,9 +88,8 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{RequestInfo{}, requestTypeInfo},
wire.ConcreteType{RequestSetOption{}, requestTypeSetOption},
wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx},
wire.ConcreteType{RequestCheckTx{}, requestTypeCheckTx},
wire.ConcreteType{RequestGetHash{}, requestTypeGetHash},
wire.ConcreteType{RequestCommit{}, requestTypeCommit},
wire.ConcreteType{RequestRollback{}, requestTypeRollback},
wire.ConcreteType{RequestAddListener{}, requestTypeAddListener},
wire.ConcreteType{RequestRemListener{}, requestTypeRemListener},
)
@ -121,17 +115,13 @@ type ResponseAppendTx struct {
RetCode
}
type ResponseGetHash struct {
RetCode
Hash []byte
}
type ResponseCommit struct {
type ResponseCheckTx struct {
RetCode
}
type ResponseRollback struct {
type ResponseGetHash struct {
RetCode
Hash []byte
}
type ResponseAddListener struct {
@ -159,9 +149,8 @@ func (_ ResponseFlush) AssertResponseType() {}
func (_ ResponseInfo) AssertResponseType() {}
func (_ ResponseSetOption) AssertResponseType() {}
func (_ ResponseAppendTx) AssertResponseType() {}
func (_ ResponseCheckTx) AssertResponseType() {}
func (_ ResponseGetHash) AssertResponseType() {}
func (_ ResponseCommit) AssertResponseType() {}
func (_ ResponseRollback) AssertResponseType() {}
func (_ ResponseAddListener) AssertResponseType() {}
func (_ ResponseRemListener) AssertResponseType() {}
func (_ ResponseException) AssertResponseType() {}
@ -174,9 +163,8 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{ResponseInfo{}, responseTypeInfo},
wire.ConcreteType{ResponseSetOption{}, responseTypeSetOption},
wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx},
wire.ConcreteType{ResponseCheckTx{}, responseTypeCheckTx},
wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash},
wire.ConcreteType{ResponseCommit{}, responseTypeCommit},
wire.ConcreteType{ResponseRollback{}, responseTypeRollback},
wire.ConcreteType{ResponseAddListener{}, responseTypeAddListener},
wire.ConcreteType{ResponseRemListener{}, responseTypeRemListener},
wire.ConcreteType{ResponseException{}, responseTypeException},


Loading…
Cancel
Save