diff --git a/cmd/tmsp/cli.go b/cmd/tmsp/cli.go index 1d1359c22..1eab662e1 100644 --- a/cmd/tmsp/cli.go +++ b/cmd/tmsp/cli.go @@ -66,7 +66,7 @@ func cmdAppendTx(c *cli.Context) { if err != nil { Exit(err.Error()) } - res, err := write(conn, types.RequestAppendTx{[]byte(args[0])}) + res, err := makeRequest(conn, types.RequestAppendTx{[]byte(args[0])}) if err != nil { Exit(err.Error()) } @@ -79,7 +79,7 @@ func cmdGetHash(c *cli.Context) { if err != nil { Exit(err.Error()) } - res, err := write(conn, types.RequestGetHash{}) + res, err := makeRequest(conn, types.RequestGetHash{}) if err != nil { Exit(err.Error()) } @@ -92,7 +92,7 @@ func cmdCommit(c *cli.Context) { if err != nil { Exit(err.Error()) } - _, err = write(conn, types.RequestCommit{}) + _, err = makeRequest(conn, types.RequestCommit{}) if err != nil { Exit(err.Error()) } @@ -105,7 +105,7 @@ func cmdRollback(c *cli.Context) { if err != nil { Exit(err.Error()) } - _, err = write(conn, types.RequestRollback{}) + _, err = makeRequest(conn, types.RequestRollback{}) if err != nil { Exit(err.Error()) } @@ -114,21 +114,35 @@ func cmdRollback(c *cli.Context) { //-------------------------------------------------------------------------------- -func write(conn net.Conn, req types.Request) (types.Response, error) { +func makeRequest(conn net.Conn, req types.Request) (types.Response, error) { var n int var err error + + // Write desired request wire.WriteBinary(req, conn, &n, &err) if err != nil { return nil, err } - // flush! + // Write flush request wire.WriteBinary(types.RequestFlush{}, conn, &n, &err) if err != nil { return nil, err } + // Read desired response var res types.Response wire.ReadBinaryPtr(&res, conn, 0, &n, &err) - return res, err + if err != nil { + return nil, err + } + + // Read flush response + var resFlush types.ResponseFlush + wire.ReadBinaryPtr(&resFlush, conn, 0, &n, &err) + if err != nil { + return nil, err + } + + return res, nil } diff --git a/example/dummy.go b/example/dummy.go index b3e9ae43c..7f8db3cde 100644 --- a/example/dummy.go +++ b/example/dummy.go @@ -1,6 +1,8 @@ package example import ( + "sync" + . "github.com/tendermint/go-common" "github.com/tendermint/go-merkle" "github.com/tendermint/go-wire" @@ -8,8 +10,8 @@ import ( ) type DummyApplication struct { - state merkle.Tree - lastCommitState merkle.Tree + mtx sync.Mutex + state merkle.Tree } func NewDummyApplication() *DummyApplication { @@ -19,48 +21,77 @@ func NewDummyApplication() *DummyApplication { 0, nil, ) - return &DummyApplication{ - state: state, - lastCommitState: state, + return &DummyApplication{state: state} +} + +func (dapp *DummyApplication) Open() types.AppContext { + dapp.mtx.Lock() + defer dapp.mtx.Unlock() + return &DummyAppContext{ + app: dapp, + state: dapp.state.Copy(), } } -func (dapp *DummyApplication) Echo(message string) string { +func (dapp *DummyApplication) commitState(state merkle.Tree) { + dapp.mtx.Lock() + defer dapp.mtx.Unlock() + dapp.state = state.Copy() +} + +func (dapp *DummyApplication) getState() merkle.Tree { + dapp.mtx.Lock() + defer dapp.mtx.Unlock() + return dapp.state.Copy() +} + +//-------------------------------------------------------------------------------- + +type DummyAppContext struct { + app *DummyApplication + state merkle.Tree +} + +func (dac *DummyAppContext) Echo(message string) string { return message } -func (dapp *DummyApplication) Info() []string { - return []string{Fmt("size:%v", dapp.state.Size())} +func (dac *DummyAppContext) Info() []string { + return []string{Fmt("size:%v", dac.state.Size())} } -func (dapp *DummyApplication) SetOption(key string, value string) types.RetCode { +func (dac *DummyAppContext) SetOption(key string, value string) types.RetCode { return 0 } -func (dapp *DummyApplication) AppendTx(tx []byte) ([]types.Event, types.RetCode) { - dapp.state.Set(tx, tx) +func (dac *DummyAppContext) AppendTx(tx []byte) ([]types.Event, types.RetCode) { + dac.state.Set(tx, tx) return nil, 0 } -func (dapp *DummyApplication) GetHash() ([]byte, types.RetCode) { - hash := dapp.state.Hash() +func (dac *DummyAppContext) GetHash() ([]byte, types.RetCode) { + hash := dac.state.Hash() return hash, 0 } -func (dapp *DummyApplication) Commit() types.RetCode { - dapp.lastCommitState = dapp.state.Copy() +func (dac *DummyAppContext) Commit() types.RetCode { + dac.app.commitState(dac.state) return 0 } -func (dapp *DummyApplication) Rollback() types.RetCode { - dapp.state = dapp.lastCommitState.Copy() +func (dac *DummyAppContext) Rollback() types.RetCode { + dac.state = dac.app.getState() return 0 } -func (dapp *DummyApplication) AddListener(key string) types.RetCode { +func (dac *DummyAppContext) AddListener(key string) types.RetCode { return 0 } -func (dapp *DummyApplication) RemListener(key string) types.RetCode { +func (dac *DummyAppContext) RemListener(key string) types.RetCode { return 0 } + +func (dac *DummyAppContext) Close() error { + return nil +} diff --git a/example/dummy_test.go b/example/dummy_test.go index d0a40d878..d6a4ad194 100644 --- a/example/dummy_test.go +++ b/example/dummy_test.go @@ -1,8 +1,9 @@ package example import ( - // "fmt" + "reflect" "testing" + "time" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" @@ -12,6 +13,8 @@ import ( func TestStream(t *testing.T) { + numAppendTxs := 200000 + // Start the listener _, err := server.StartListener("tcp://127.0.0.1:8080", NewDummyApplication()) if err != nil { @@ -25,7 +28,9 @@ func TestStream(t *testing.T) { } // Read response data + done := make(chan struct{}) go func() { + counter := 0 for { var n int var err error @@ -34,19 +39,59 @@ func TestStream(t *testing.T) { if err != nil { Exit(err.Error()) } - // fmt.Println("Read", n) + + // Process response + switch res := res.(type) { + case types.ResponseAppendTx: + counter += 1 + if res.RetCode != types.RetCodeOK { + t.Error("AppendTx failed with ret_code", res.RetCode) + } + if counter > numAppendTxs { + t.Fatal("Too many AppendTx responses") + } + t.Log("response", counter) + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + close(done) + }() + } + case types.ResponseFlush: + // ignore + default: + t.Error("Unexpected response type", reflect.TypeOf(res)) + } } }() // Write requests - for { + for counter := 0; counter < numAppendTxs; counter++ { + // Send request var n int var err error var req types.Request = types.RequestAppendTx{TxBytes: []byte("test")} wire.WriteBinary(req, conn, &n, &err) if err != nil { - Exit(err.Error()) + t.Fatal(err.Error()) + } + + // Sometimes send flush messages + if counter%123 == 0 { + t.Log("flush") + wire.WriteBinary(types.RequestFlush{}, conn, &n, &err) + if err != nil { + t.Fatal(err.Error()) + } } - // fmt.Println("Wrote", n) } + + // Send final flush message + var n int + wire.WriteBinary(types.RequestFlush{}, conn, &n, &err) + if err != nil { + t.Fatal(err.Error()) + } + + <-done } diff --git a/server/server.go b/server/server.go index 075e3e66c..1ed2dec4f 100644 --- a/server/server.go +++ b/server/server.go @@ -11,7 +11,7 @@ import ( "github.com/tendermint/tmsp/types" ) -var maxNumberConnections = 2 +// var maxNumberConnections = 2 func StartListener(protoAddr string, app types.Application) (net.Listener, error) { parts := strings.SplitN(protoAddr, "://", 2) @@ -23,33 +23,51 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error // A goroutine to accept a connection. go func() { - - semaphore := make(chan struct{}, maxNumberConnections) + // semaphore := make(chan struct{}, maxNumberConnections) for { - semaphore <- struct{}{} + // semaphore <- struct{}{} // Accept a connection + fmt.Println("Waiting for new connection...") conn, err := ln.Accept() if err != nil { Exit("Failed to accept connection") } else { fmt.Println("Accepted a new connection") } - connClosed := make(chan struct{}, 2) // Push to signal connection closed + + appContext := app.Open() + closeConn := make(chan error, 2) // Push to signal connection closed responses := make(chan types.Response, 1000) // A channel to buffer responses // Read requests from conn and deal with them - go handleRequests(app, connClosed, conn, responses) + go handleRequests(appContext, closeConn, conn, responses) // Pull responses from 'responses' and write them to conn. - go handleResponses(connClosed, responses, conn) + go handleResponses(closeConn, responses, conn) go func() { - // Wait until connection is closed - <-connClosed - fmt.Println("Connection was closed. Waiting for new connection...") - - <-semaphore + // Wait until signal to close connection + errClose := <-closeConn + if errClose != nil { + fmt.Printf("Connection error: %v\n", errClose) + } else { + fmt.Println("Connection was closed.") + } + + // Close the connection + err := conn.Close() + if err != nil { + fmt.Printf("Error in closing connection: %v\n", err) + } + + // Close the AppContext + err = appContext.Close() + if err != nil { + fmt.Printf("Error in closing app context: %v\n", err) + } + + // <-semaphore }() } @@ -59,7 +77,7 @@ func StartListener(protoAddr string, app types.Application) (net.Listener, error } // Read requests from conn and deal with them -func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) { +func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn, responses chan<- types.Response) { var count int var bufReader = bufio.NewReader(conn) for { @@ -68,48 +86,47 @@ func handleRequests(app types.Application, connClosed chan struct{}, conn net.Co var req types.Request wire.ReadBinaryPtr(&req, bufReader, 0, &n, &err) if err != nil { - fmt.Println("Error in handleRequests:", err.Error()) - connClosed <- struct{}{} + closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error()) return } count++ - handleRequest(app, req, responses) + handleRequest(appC, req, responses) } } -func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) { +func handleRequest(appC types.AppContext, req types.Request, responses chan<- types.Response) { switch req := req.(type) { case types.RequestEcho: - msg := app.Echo(req.Message) + msg := appC.Echo(req.Message) responses <- types.ResponseEcho{msg} case types.RequestFlush: responses <- types.ResponseFlush{} case types.RequestInfo: - data := app.Info() + data := appC.Info() responses <- types.ResponseInfo{data} case types.RequestSetOption: - retCode := app.SetOption(req.Key, req.Value) + retCode := appC.SetOption(req.Key, req.Value) responses <- types.ResponseSetOption{retCode} case types.RequestAppendTx: - events, retCode := app.AppendTx(req.TxBytes) + events, retCode := appC.AppendTx(req.TxBytes) responses <- types.ResponseAppendTx{retCode} for _, event := range events { responses <- types.ResponseEvent{event} } case types.RequestGetHash: - hash, retCode := app.GetHash() + hash, retCode := appC.GetHash() responses <- types.ResponseGetHash{retCode, hash} case types.RequestCommit: - retCode := app.Commit() + retCode := appC.Commit() responses <- types.ResponseCommit{retCode} case types.RequestRollback: - retCode := app.Rollback() + retCode := appC.Rollback() responses <- types.ResponseRollback{retCode} case types.RequestAddListener: - retCode := app.AddListener(req.EventKey) + retCode := appC.AddListener(req.EventKey) responses <- types.ResponseAddListener{retCode} case types.RequestRemListener: - retCode := app.RemListener(req.EventKey) + retCode := appC.RemListener(req.EventKey) responses <- types.ResponseRemListener{retCode} default: responses <- types.ResponseException{"Unknown request"} @@ -117,7 +134,7 @@ func handleRequest(app types.Application, req types.Request, responses chan<- ty } // Pull responses from 'responses' and write them to conn. -func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) { +func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) { var count int var bufWriter = bufio.NewWriter(conn) for { @@ -126,15 +143,13 @@ func handleResponses(connClosed chan struct{}, responses <-chan types.Response, var err error wire.WriteBinary(res, bufWriter, &n, &err) if err != nil { - fmt.Println(err.Error()) - connClosed <- struct{}{} + closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) return } if _, ok := res.(types.ResponseFlush); ok { err = bufWriter.Flush() if err != nil { - fmt.Println(err.Error()) - connClosed <- struct{}{} + closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error()) return } } diff --git a/types/application.go b/types/application.go index d3a46cb7b..555f97e73 100644 --- a/types/application.go +++ b/types/application.go @@ -2,6 +2,12 @@ package types type Application interface { + // For new socket connections + Open() AppContext +} + +type AppContext interface { + // Echo a message Echo(message string) string @@ -28,4 +34,7 @@ type Application interface { // Remove event listener RemListener(key string) RetCode + + // Close this AppContext + Close() error }