@ -0,0 +1,63 @@ | |||
# Tendermint Streaming Protocol (TMSP) | |||
**TMSP** is a socket protocol, which means applications can be written in any programming language. | |||
TMSP is an asynchronous streaming protocol: message responses are written back asynchronously to the platform. | |||
*Applications must be deterministic.* | |||
## Message types | |||
#### AppendTx | |||
* __Arguments__: | |||
* `TxBytes ([]byte)` | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Append and run a transaction. The transaction may or may not be final. | |||
#### GetHash | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* `Hash ([]byte)` | |||
* __Usage__:<br/> | |||
Return a Merkle root hash of the application state | |||
#### Commit | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Finalize all appended transactions | |||
#### Rollback | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Roll back to the last commit | |||
#### SetEventsMode | |||
* __Arguments__: | |||
* `EventsMode (int8)`: | |||
* `EventsModeOff (0)`: Events are not reported. Used for mempool. | |||
* `EventsModeCached (1)`: Events are cached. | |||
* `EventsModeOn (2)`: Flush cache and report events. | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Set event reporting mode for future transactions | |||
#### AddListener | |||
* __Arguments__: | |||
* `EventKey (string)` | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Add event listener callback for events with given key. | |||
#### RemoveListener | |||
* __Arguments__: | |||
* `EventKey (string)` | |||
* __Returns__: | |||
* `RetCode (int8)` | |||
* __Usage__:<br/> | |||
Remove event listener callback for events with given key. | |||
@ -0,0 +1,127 @@ | |||
package main | |||
import ( | |||
"fmt" | |||
"net" | |||
"os" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tmsp/types" | |||
"github.com/codegangsta/cli" | |||
) | |||
func main() { | |||
app := cli.NewApp() | |||
app.Name = "cli" | |||
app.Usage = "cli [command] [args...]" | |||
app.Flags = []cli.Flag{ | |||
cli.StringFlag{ | |||
Name: "address", | |||
Value: "tcp://127.0.0.1:8080", | |||
Usage: "address of application socket", | |||
}, | |||
} | |||
app.Commands = []cli.Command{ | |||
{ | |||
Name: "append_tx", | |||
Usage: "Append a new tx to application", | |||
Action: func(c *cli.Context) { | |||
cmdAppendTx(c) | |||
}, | |||
}, | |||
{ | |||
Name: "get_hash", | |||
Usage: "Get application Merkle root hash", | |||
Action: func(c *cli.Context) { | |||
cmdGetHash(c) | |||
}, | |||
}, | |||
{ | |||
Name: "commit", | |||
Usage: "Commit the application state", | |||
Action: func(c *cli.Context) { | |||
cmdCommit(c) | |||
}, | |||
}, | |||
{ | |||
Name: "rollback", | |||
Usage: "Roll back the application state to the latest commit", | |||
Action: func(c *cli.Context) { | |||
cmdRollback(c) | |||
}, | |||
}, | |||
} | |||
app.Run(os.Args) | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// Append a new tx to application | |||
func cmdAppendTx(c *cli.Context) { | |||
args := c.Args() // Args to AppendTx | |||
conn, err := Connect(c.GlobalString("address")) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
res, err := write(conn, types.RequestAppendTx{[]byte(args[0])}) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
fmt.Println("Sent tx:", args[0], "response:", res) | |||
} | |||
// Get application Merkle root hash | |||
func cmdGetHash(c *cli.Context) { | |||
conn, err := Connect(c.GlobalString("address")) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
res, err := write(conn, types.RequestGetHash{}) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
fmt.Println("Got hash:", Fmt("%X", res.(types.ResponseGetHash).Hash)) | |||
} | |||
// Commit the application state | |||
func cmdCommit(c *cli.Context) { | |||
conn, err := Connect(c.GlobalString("address")) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
_, err = write(conn, types.RequestCommit{}) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
fmt.Println("Committed.") | |||
} | |||
// Roll back the application state to the latest commit | |||
func cmdRollback(c *cli.Context) { | |||
conn, err := Connect(c.GlobalString("address")) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
_, err = write(conn, types.RequestRollback{}) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
fmt.Println("Rolled back.") | |||
} | |||
//-------------------------------------------------------------------------------- | |||
func write(conn net.Conn, req types.Request) (types.Response, error) { | |||
var n int64 | |||
var err error | |||
wire.WriteBinary(req, conn, &n, &err) | |||
if err != nil { | |||
return nil, err | |||
} | |||
var res types.Response | |||
wire.ReadBinaryPtr(&res, conn, &n, &err) | |||
return res, err | |||
} |
@ -0,0 +1,84 @@ | |||
package main | |||
import ( | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-merkle" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tmsp/server" | |||
"github.com/tendermint/tmsp/types" | |||
) | |||
func main() { | |||
// Start the listener | |||
_, err := server.StartListener("tcp://127.0.0.1:8080", &DummyApplication{}) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
// Wait forever | |||
TrapSignal(func() { | |||
// Cleanup | |||
}) | |||
} | |||
//-------------------------------------------------------------------------------- | |||
type DummyApplication struct { | |||
state merkle.Tree | |||
lastCommitState merkle.Tree | |||
} | |||
func NewDummyApplication() *DummyApplication { | |||
state := merkle.NewIAVLTree( | |||
wire.BasicCodec, | |||
wire.BasicCodec, | |||
0, | |||
nil, | |||
) | |||
return &DummyApplication{ | |||
state: state, | |||
lastCommitState: state, | |||
} | |||
} | |||
func (dapp *DummyApplication) Echo(message string) (types.RetCode, string) { | |||
return 0, message | |||
} | |||
func (dapp *DummyApplication) AppendTx(tx []byte) types.RetCode { | |||
dapp.state.Set(tx, tx) | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) GetHash() ([]byte, types.RetCode) { | |||
hash := dapp.state.Hash() | |||
return hash, 0 | |||
} | |||
func (dapp *DummyApplication) Commit() types.RetCode { | |||
dapp.lastCommitState = dapp.state.Copy() | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) Rollback() types.RetCode { | |||
dapp.state = dapp.lastCommitState.Copy() | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) SetEventsMode(mode types.EventsMode) types.RetCode { | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) AddListener(key string) types.RetCode { | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) RemListener(key string) types.RetCode { | |||
return 0 | |||
} | |||
func (dapp *DummyApplication) GetEvents() []types.Event { | |||
return nil | |||
} |
@ -0,0 +1,52 @@ | |||
package main | |||
import ( | |||
// "fmt" | |||
"testing" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tmsp/server" | |||
"github.com/tendermint/tmsp/types" | |||
) | |||
func TestStream(t *testing.T) { | |||
// Start the listener | |||
_, err := server.StartListener("tcp://127.0.0.1:8080", NewDummyApplication()) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
// Connect to the socket | |||
conn, err := Connect("tcp://127.0.0.1:8080") | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
// Read response data | |||
go func() { | |||
for { | |||
var n int64 | |||
var err error | |||
var res types.Response | |||
wire.ReadBinaryPtr(&res, conn, &n, &err) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
// fmt.Println("Read", n) | |||
} | |||
}() | |||
// Write requests | |||
for { | |||
var n int64 | |||
var err error | |||
var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")} | |||
wire.WriteBinary(req, conn, &n, &err) | |||
if err != nil { | |||
Exit(err.Error()) | |||
} | |||
// fmt.Println("Wrote", n) | |||
} | |||
} |
@ -0,0 +1,136 @@ | |||
package server | |||
import ( | |||
"fmt" | |||
"net" | |||
"reflect" | |||
"strings" | |||
. "github.com/tendermint/go-common" | |||
"github.com/tendermint/go-wire" | |||
"github.com/tendermint/tmsp/types" | |||
) | |||
func StartListener(protoAddr string, app types.Application) (net.Listener, error) { | |||
parts := strings.SplitN(protoAddr, "://", 2) | |||
proto, addr := parts[0], parts[1] | |||
ln, err := net.Listen(proto, addr) | |||
if err != nil { | |||
return nil, err | |||
} | |||
// A goroutine to accept a connection. | |||
go func() { | |||
for { | |||
// Accept a connection | |||
conn, err := ln.Accept() | |||
if err != nil { | |||
Exit("Failed to accept connection") | |||
} else { | |||
fmt.Println("Accepted a new connection") | |||
} | |||
connClosed := make(chan struct{}, 2) // Push to signal connection closed | |||
responses := make(chan types.Response, 1000) // A channel to buffer responses | |||
// Read requests from conn and deal with them | |||
go handleRequests(app, connClosed, conn, responses) | |||
// Pull responses from 'responses' and write them to conn. | |||
go handleResponses(connClosed, responses, conn) | |||
// Wait until connection is closed | |||
<-connClosed | |||
fmt.Println("Connection was closed. Waiting for new connection...") | |||
} | |||
}() | |||
return ln, nil | |||
} | |||
// Read requests from conn and deal with them | |||
func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) { | |||
var count int | |||
for { | |||
var n int64 | |||
var err error | |||
var req types.Request | |||
wire.ReadBinaryPtr(&req, conn, &n, &err) | |||
if err != nil { | |||
fmt.Println(err.Error()) | |||
connClosed <- struct{}{} | |||
return | |||
} | |||
count++ | |||
if count%1000 == 0 { | |||
fmt.Println("Received request", reflect.TypeOf(req), req, n, err, count) | |||
} | |||
handleRequest(app, req, responses) | |||
} | |||
} | |||
func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { | |||
switch req := req.(type) { | |||
case types.RequestEcho: | |||
retCode, msg := app.Echo(req.Message) | |||
responses <- types.ResponseEcho{retCode, msg} | |||
case types.RequestAppendTx: | |||
retCode := app.AppendTx(req.TxBytes) | |||
responses <- types.ResponseAppendTx{retCode} | |||
events := app.GetEvents() | |||
for _, event := range events { | |||
responses <- types.ResponseEvent{event} | |||
} | |||
case types.RequestGetHash: | |||
hash, retCode := app.GetHash() | |||
responses <- types.ResponseGetHash{retCode, hash} | |||
case types.RequestCommit: | |||
retCode := app.Commit() | |||
responses <- types.ResponseCommit{retCode} | |||
case types.RequestRollback: | |||
retCode := app.Rollback() | |||
responses <- types.ResponseRollback{retCode} | |||
case types.RequestSetEventsMode: | |||
retCode := app.SetEventsMode(req.EventsMode) | |||
responses <- types.ResponseSetEventsMode{retCode} | |||
if req.EventsMode == types.EventsModeOn { | |||
events := app.GetEvents() | |||
for _, event := range events { | |||
responses <- types.ResponseEvent{event} | |||
} | |||
} | |||
case types.RequestAddListener: | |||
retCode := app.AddListener(req.EventKey) | |||
responses <- types.ResponseAddListener{retCode} | |||
case types.RequestRemListener: | |||
retCode := app.RemListener(req.EventKey) | |||
responses <- types.ResponseRemListener{retCode} | |||
default: | |||
responses <- types.ResponseException{"Unknown request"} | |||
} | |||
} | |||
// Pull responses from 'responses' and write them to conn. | |||
func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) { | |||
var count int | |||
for { | |||
var res = <-responses | |||
var n int64 | |||
var err error | |||
wire.WriteBinary(res, conn, &n, &err) | |||
if err != nil { | |||
fmt.Println(err.Error()) | |||
connClosed <- struct{}{} | |||
return | |||
} | |||
count++ | |||
if count%1000 == 0 { | |||
fmt.Println("Sent response", reflect.TypeOf(res), res, n, err, count) | |||
} | |||
} | |||
} |
@ -0,0 +1,31 @@ | |||
package types | |||
type Application interface { | |||
// Echo a message | |||
Echo(message string) (RetCode, string) | |||
// Append a tx, which may or may not get committed | |||
AppendTx(tx []byte) RetCode | |||
// Return the application Merkle root hash | |||
GetHash() ([]byte, RetCode) | |||
// Set commit checkpoint | |||
Commit() RetCode | |||
// Rollback to the latest commit | |||
Rollback() RetCode | |||
// Set events reporting mode | |||
SetEventsMode(mode EventsMode) RetCode | |||
// Add event listener | |||
AddListener(key string) RetCode | |||
// Remove event listener | |||
RemListener(key string) RetCode | |||
// Get all events | |||
GetEvents() []Event | |||
} |
@ -0,0 +1,14 @@ | |||
package types | |||
type EventsMode int8 | |||
const ( | |||
EventsModeOff = EventsMode(0) | |||
EventsModeCached = EventsMode(1) | |||
EventsModeOn = EventsMode(2) | |||
) | |||
type Event struct { | |||
Key string | |||
TxBytes []byte | |||
} |
@ -0,0 +1,155 @@ | |||
package types | |||
import "github.com/tendermint/go-wire" | |||
const ( | |||
requestTypeEcho = byte(0x01) | |||
requestTypeAppendTx = byte(0x02) | |||
requestTypeGetHash = byte(0x03) | |||
requestTypeCommit = byte(0x04) | |||
requestTypeRollback = byte(0x05) | |||
requestTypeSetEventsMode = byte(0x06) | |||
requestTypeAddListener = byte(0x07) | |||
requestTypeRemListener = byte(0x08) | |||
responseTypeEcho = byte(0x11) | |||
responseTypeAppendTx = byte(0x12) | |||
responseTypeGetHash = byte(0x13) | |||
responseTypeCommit = byte(0x14) | |||
responseTypeRollback = byte(0x15) | |||
responseTypeSetEventsMode = byte(0x16) | |||
responseTypeAddListener = byte(0x17) | |||
responseTypeRemListener = byte(0x18) | |||
responseTypeException = byte(0x20) | |||
responseTypeEvent = byte(0x21) | |||
) | |||
//---------------------------------------- | |||
type RequestEcho struct { | |||
Message string | |||
} | |||
type RequestAppendTx struct { | |||
TxBytes []byte | |||
} | |||
type RequestGetHash struct { | |||
} | |||
type RequestCommit struct { | |||
} | |||
type RequestRollback struct { | |||
} | |||
type RequestSetEventsMode struct { | |||
EventsMode | |||
} | |||
type RequestAddListener struct { | |||
EventKey string | |||
} | |||
type RequestRemListener struct { | |||
EventKey string | |||
} | |||
type Request interface { | |||
AssertRequestType() | |||
} | |||
func (_ RequestEcho) AssertRequestType() {} | |||
func (_ RequestAppendTx) AssertRequestType() {} | |||
func (_ RequestGetHash) AssertRequestType() {} | |||
func (_ RequestCommit) AssertRequestType() {} | |||
func (_ RequestRollback) AssertRequestType() {} | |||
func (_ RequestSetEventsMode) AssertRequestType() {} | |||
func (_ RequestAddListener) AssertRequestType() {} | |||
func (_ RequestRemListener) AssertRequestType() {} | |||
var _ = wire.RegisterInterface( | |||
struct{ Request }{}, | |||
wire.ConcreteType{RequestEcho{}, requestTypeEcho}, | |||
wire.ConcreteType{RequestAppendTx{}, requestTypeAppendTx}, | |||
wire.ConcreteType{RequestGetHash{}, requestTypeGetHash}, | |||
wire.ConcreteType{RequestCommit{}, requestTypeCommit}, | |||
wire.ConcreteType{RequestRollback{}, requestTypeRollback}, | |||
wire.ConcreteType{RequestSetEventsMode{}, requestTypeSetEventsMode}, | |||
wire.ConcreteType{RequestAddListener{}, requestTypeAddListener}, | |||
wire.ConcreteType{RequestRemListener{}, requestTypeRemListener}, | |||
) | |||
//---------------------------------------- | |||
type ResponseEcho struct { | |||
RetCode | |||
Message string | |||
} | |||
type ResponseAppendTx struct { | |||
RetCode | |||
} | |||
type ResponseGetHash struct { | |||
RetCode | |||
Hash []byte | |||
} | |||
type ResponseCommit struct { | |||
RetCode | |||
} | |||
type ResponseRollback struct { | |||
RetCode | |||
} | |||
type ResponseSetEventsMode struct { | |||
RetCode | |||
} | |||
type ResponseAddListener struct { | |||
RetCode | |||
} | |||
type ResponseRemListener struct { | |||
RetCode | |||
} | |||
type ResponseException struct { | |||
Error string | |||
} | |||
type ResponseEvent struct { | |||
Event | |||
} | |||
type Response interface { | |||
AssertResponseType() | |||
} | |||
func (_ ResponseEcho) AssertResponseType() {} | |||
func (_ ResponseAppendTx) AssertResponseType() {} | |||
func (_ ResponseGetHash) AssertResponseType() {} | |||
func (_ ResponseCommit) AssertResponseType() {} | |||
func (_ ResponseRollback) AssertResponseType() {} | |||
func (_ ResponseSetEventsMode) AssertResponseType() {} | |||
func (_ ResponseAddListener) AssertResponseType() {} | |||
func (_ ResponseRemListener) AssertResponseType() {} | |||
func (_ ResponseException) AssertResponseType() {} | |||
func (_ ResponseEvent) AssertResponseType() {} | |||
var _ = wire.RegisterInterface( | |||
struct{ Response }{}, | |||
wire.ConcreteType{ResponseEcho{}, responseTypeEcho}, | |||
wire.ConcreteType{ResponseAppendTx{}, responseTypeAppendTx}, | |||
wire.ConcreteType{ResponseGetHash{}, responseTypeGetHash}, | |||
wire.ConcreteType{ResponseCommit{}, responseTypeCommit}, | |||
wire.ConcreteType{ResponseRollback{}, responseTypeRollback}, | |||
wire.ConcreteType{ResponseSetEventsMode{}, responseTypeSetEventsMode}, | |||
wire.ConcreteType{ResponseAddListener{}, responseTypeAddListener}, | |||
wire.ConcreteType{ResponseRemListener{}, responseTypeRemListener}, | |||
wire.ConcreteType{ResponseException{}, responseTypeException}, | |||
wire.ConcreteType{ResponseEvent{}, responseTypeEvent}, | |||
) |
@ -0,0 +1,12 @@ | |||
package types | |||
type RetCode int | |||
// Reserved return codes | |||
const ( | |||
RetCodeOK = RetCode(0) | |||
RetCodeInternalError = RetCode(1) | |||
RetCodeUnauthorized = RetCode(2) | |||
RetCodeInsufficientFees = RetCode(3) | |||
RetCodeUnknownRequest = RetCode(4) | |||
) |