From 062c33c1096761f142db1eda66479591b65e1e15 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 24 Mar 2016 10:19:48 -0700 Subject: [PATCH] TMSP Client is an interface --- client/client.go | 371 +++------------------------------------- client/local_client.go | 193 +++++++++++++++++++++ client/remote_client.go | 370 +++++++++++++++++++++++++++++++++++++++ tests/test_counter.go | 17 +- 4 files changed, 599 insertions(+), 352 deletions(-) create mode 100644 client/local_client.go create mode 100644 client/remote_client.go diff --git a/client/client.go b/client/client.go index 19de9e000..acd99f60a 100644 --- a/client/client.go +++ b/client/client.go @@ -1,360 +1,43 @@ package tmspcli import ( - "bufio" - "container/list" - "errors" - "fmt" - "net" - "sync" - "time" - - . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" + "sync" ) -const reqQueueSize = 256 // TODO make configurable -const maxResponseSize = 1048576 // 1MB TODO make configurable -const flushThrottleMS = 20 // Don't wait longer than... - -type Callback func(*types.Request, *types.Response) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. -type Client struct { - QuitService - sync.Mutex // [EB]: is this even used? - - reqQueue chan *ReqRes - flushTimer *ThrottleTimer - mustConnect bool - - mtx sync.Mutex - addr string - conn net.Conn - err error - reqSent *list.List - resCb func(*types.Request, *types.Response) // listens to all callbacks -} - -func NewClient(addr string, mustConnect bool) (*Client, error) { - cli := &Client{ - reqQueue: make(chan *ReqRes, reqQueueSize), - flushTimer: NewThrottleTimer("Client", flushThrottleMS), - mustConnect: mustConnect, - - addr: addr, - reqSent: list.New(), - resCb: nil, - } - cli.QuitService = *NewQuitService(nil, "Client", cli) - _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. - if mustConnect { - return nil, err - } else { - return cli, nil - } -} - -func (cli *Client) OnStart() (err error) { - cli.QuitService.OnStart() - doneCh := make(chan struct{}) - go func() { - RETRY_LOOP: - for { - conn, err_ := Connect(cli.addr) - if err_ != nil { - if cli.mustConnect { - err = err_ // OnStart() will return this. - close(doneCh) - return - } else { - fmt.Printf("tmsp.Client failed to connect to %v. Retrying...\n", cli.addr) - time.Sleep(time.Second * 3) - continue RETRY_LOOP - } - } - go cli.sendRequestsRoutine(conn) - go cli.recvResponseRoutine(conn) - close(doneCh) // OnStart() will return no error. - return - } - }() - <-doneCh - return // err -} - -func (cli *Client) OnStop() { - cli.QuitService.OnStop() - if cli.conn != nil { - cli.conn.Close() - } -} - -// Set listener for all responses -// NOTE: callback may get internally generated flush responses. -func (cli *Client) SetResponseCallback(resCb Callback) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.resCb = resCb -} - -func (cli *Client) StopForError(err error) { - cli.mtx.Lock() - fmt.Printf("Stopping tmsp.Client for error: %v\n", err.Error()) - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - cli.Stop() -} - -func (cli *Client) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err +type Client interface { + SetResponseCallback(Callback) + Error() error + Stop() bool + + FlushAsync() *ReqRes + EchoAsync(msg string) *ReqRes + InfoAsync() *ReqRes + SetOptionAsync(key string, value string) *ReqRes + AppendTxAsync(tx []byte) *ReqRes + CheckTxAsync(tx []byte) *ReqRes + QueryAsync(tx []byte) *ReqRes + CommitAsync() *ReqRes + + FlushSync() error + EchoSync(msg string) (res types.Result) + InfoSync() (res types.Result) + SetOptionSync(key string, value string) (res types.Result) + AppendTxSync(tx []byte) (res types.Result) + CheckTxSync(tx []byte) (res types.Result) + QuerySync(tx []byte) (res types.Result) + CommitSync() (res types.Result) + + InitChainSync(validators []*types.Validator) (err error) + EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) } //---------------------------------------- -func (cli *Client) sendRequestsRoutine(conn net.Conn) { - w := bufio.NewWriter(conn) - for { - select { - case <-cli.flushTimer.Ch: - select { - case cli.reqQueue <- NewReqRes(types.RequestFlush()): - default: - // Probably will fill the buffer, or retry later. - } - case <-cli.QuitService.Quit: - return - case reqres := <-cli.reqQueue: - cli.willSendReq(reqres) - err := types.WriteMessage(reqres.Request, w) - if err != nil { - cli.StopForError(err) - return - } - // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if reqres.Request.Type == types.MessageType_Flush { - err = w.Flush() - if err != nil { - cli.StopForError(err) - return - } - } - } - } -} - -func (cli *Client) recvResponseRoutine(conn net.Conn) { - r := bufio.NewReader(conn) // Buffer reads - for { - var res = &types.Response{} - err := types.ReadMessage(r, res) - if err != nil { - cli.StopForError(err) - return - } - switch res.Type { - case types.MessageType_Exception: - // XXX After setting cli.err, release waiters (e.g. reqres.Done()) - cli.StopForError(errors.New(res.Error)) - default: - // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) - err := cli.didRecvResponse(res) - if err != nil { - cli.StopForError(err) - } - } - } -} - -func (cli *Client) willSendReq(reqres *ReqRes) { - cli.mtx.Lock() - defer cli.mtx.Unlock() - cli.reqSent.PushBack(reqres) -} - -func (cli *Client) didRecvResponse(res *types.Response) error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Get the first ReqRes - next := cli.reqSent.Front() - if next == nil { - return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type) - } - reqres := next.Value.(*ReqRes) - if !resMatchesReq(reqres.Request, res) { - return fmt.Errorf("Unexpected result type %v when response to %v expected", - res.Type, reqres.Request.Type) - } - - reqres.Response = res // Set response - reqres.Done() // Release waiters - cli.reqSent.Remove(next) // Pop first item from linked list - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - return nil -} - -//---------------------------------------- - -func (cli *Client) EchoAsync(msg string) *ReqRes { - return cli.queueRequest(types.RequestEcho(msg)) -} - -func (cli *Client) FlushAsync() *ReqRes { - return cli.queueRequest(types.RequestFlush()) -} - -func (cli *Client) SetOptionAsync(key string, value string) *ReqRes { - return cli.queueRequest(types.RequestSetOption(key, value)) -} - -func (cli *Client) AppendTxAsync(tx []byte) *ReqRes { - return cli.queueRequest(types.RequestAppendTx(tx)) -} - -func (cli *Client) CheckTxAsync(tx []byte) *ReqRes { - return cli.queueRequest(types.RequestCheckTx(tx)) -} - -func (cli *Client) CommitAsync() *ReqRes { - return cli.queueRequest(types.RequestCommit()) -} - -func (cli *Client) QueryAsync(query []byte) *ReqRes { - return cli.queueRequest(types.RequestQuery(query)) -} - -func (cli *Client) InitChainAsync(validators []*types.Validator) *ReqRes { - return cli.queueRequest(types.RequestInitChain(validators)) -} - -func (cli *Client) EndBlockAsync(height uint64) *ReqRes { - return cli.queueRequest(types.RequestEndBlock(height)) -} - -//---------------------------------------- - -func (cli *Client) FlushSync() error { - cli.queueRequest(types.RequestFlush()).Wait() - return cli.err -} - -func (cli *Client) InfoSync() (info string, err error) { - reqres := cli.queueRequest(types.RequestInfo()) - cli.FlushSync() - if cli.err != nil { - return "", cli.err - } - return string(reqres.Response.Data), nil -} - -func (cli *Client) SetOptionSync(key string, value string) (log string, err error) { - reqres := cli.queueRequest(types.RequestSetOption(key, value)) - cli.FlushSync() - if cli.err != nil { - return "", cli.err - } - return reqres.Response.Log, nil -} - -func (cli *Client) AppendTxSync(tx []byte) (res types.Result) { - reqres := cli.queueRequest(types.RequestAppendTx(tx)) - cli.FlushSync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) - } - resp := reqres.Response - return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} -} - -func (cli *Client) CheckTxSync(tx []byte) (res types.Result) { - reqres := cli.queueRequest(types.RequestCheckTx(tx)) - cli.FlushSync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) - } - resp := reqres.Response - return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} -} - -func (cli *Client) CommitSync() (res types.Result) { - reqres := cli.queueRequest(types.RequestCommit()) - cli.FlushSync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) - } - resp := reqres.Response - return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} -} - -func (cli *Client) QuerySync(query []byte) (res types.Result) { - reqres := cli.queueRequest(types.RequestQuery(query)) - cli.FlushSync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) - } - resp := reqres.Response - return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} -} - -func (cli *Client) InitChainSync(validators []*types.Validator) (err error) { - cli.queueRequest(types.RequestInitChain(validators)) - cli.FlushSync() - if cli.err != nil { - return cli.err - } - return nil -} - -func (cli *Client) EndBlockSync(height uint64) (validators []*types.Validator, err error) { - reqres := cli.queueRequest(types.RequestEndBlock(height)) - cli.FlushSync() - if cli.err != nil { - return nil, cli.err - } - return reqres.Response.Validators, nil -} - -//---------------------------------------- - -func (cli *Client) queueRequest(req *types.Request) *ReqRes { - reqres := NewReqRes(req) - // TODO: set cli.err if reqQueue times out - cli.reqQueue <- reqres - - // Maybe auto-flush, or unset auto-flush - switch req.Type { - case types.MessageType_Flush: - cli.flushTimer.Unset() - default: - cli.flushTimer.Set() - } - - return reqres -} +type Callback func(*types.Request, *types.Response) //---------------------------------------- -func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { - return req.Type == res.Type -} - type ReqRes struct { *types.Request *sync.WaitGroup diff --git a/client/local_client.go b/client/local_client.go new file mode 100644 index 000000000..b14164f89 --- /dev/null +++ b/client/local_client.go @@ -0,0 +1,193 @@ +package tmspcli + +import ( + types "github.com/tendermint/tmsp/types" + "sync" +) + +type localClient struct { + mtx *sync.Mutex + types.Application + Callback +} + +func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { + if mtx == nil { + mtx = new(sync.Mutex) + } + return &localClient{ + mtx: mtx, + Application: app, + } +} + +func (app *localClient) SetResponseCallback(cb Callback) { + app.mtx.Lock() + defer app.mtx.Unlock() + app.Callback = cb +} + +// TODO: change types.Application to include Error()? +func (app *localClient) Error() error { + return nil +} + +func (app *localClient) Stop() bool { + return true +} + +func (app *localClient) FlushAsync() *ReqRes { + // Do nothing + return newLocalReqRes(types.RequestFlush(), nil) +} + +func (app *localClient) EchoAsync(msg string) *ReqRes { + return app.callback( + types.RequestEcho(msg), + types.ResponseEcho(msg), + ) +} + +func (app *localClient) InfoAsync() *ReqRes { + app.mtx.Lock() + info := app.Application.Info() + app.mtx.Unlock() + return app.callback( + types.RequestInfo(), + types.ResponseInfo(info), + ) +} + +func (app *localClient) SetOptionAsync(key string, value string) *ReqRes { + app.mtx.Lock() + log := app.Application.SetOption(key, value) + app.mtx.Unlock() + return app.callback( + types.RequestSetOption(key, value), + types.ResponseSetOption(log), + ) +} + +func (app *localClient) AppendTxAsync(tx []byte) *ReqRes { + app.mtx.Lock() + res := app.Application.AppendTx(tx) + app.mtx.Unlock() + return app.callback( + types.RequestAppendTx(tx), + types.ResponseAppendTx(res.Code, res.Data, res.Log), + ) +} + +func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { + app.mtx.Lock() + res := app.Application.CheckTx(tx) + app.mtx.Unlock() + return app.callback( + types.RequestCheckTx(tx), + types.ResponseCheckTx(res.Code, res.Data, res.Log), + ) +} + +func (app *localClient) QueryAsync(tx []byte) *ReqRes { + app.mtx.Lock() + res := app.Application.Query(tx) + app.mtx.Unlock() + return app.callback( + types.RequestQuery(tx), + types.ResponseQuery(res.Code, res.Data, res.Log), + ) +} + +func (app *localClient) CommitAsync() *ReqRes { + app.mtx.Lock() + res := app.Application.Commit() + app.mtx.Unlock() + return app.callback( + types.RequestCommit(), + types.ResponseCommit(res.Code, res.Data, res.Log), + ) +} + +//------------------------------------------------------- + +func (app *localClient) FlushSync() error { + return nil +} + +func (app *localClient) EchoSync(msg string) (res types.Result) { + return types.OK.SetData([]byte(msg)) +} + +func (app *localClient) InfoSync() (res types.Result) { + app.mtx.Lock() + info := app.Application.Info() + app.mtx.Unlock() + return types.OK.SetData([]byte(info)) +} + +func (app *localClient) SetOptionSync(key string, value string) (res types.Result) { + app.mtx.Lock() + log := app.Application.SetOption(key, value) + app.mtx.Unlock() + return types.OK.SetLog(log) +} + +func (app *localClient) AppendTxSync(tx []byte) (res types.Result) { + app.mtx.Lock() + res = app.Application.AppendTx(tx) + app.mtx.Unlock() + return res +} + +func (app *localClient) CheckTxSync(tx []byte) (res types.Result) { + app.mtx.Lock() + res = app.Application.CheckTx(tx) + app.mtx.Unlock() + return res +} + +func (app *localClient) QuerySync(query []byte) (res types.Result) { + app.mtx.Lock() + res = app.Application.Query(query) + app.mtx.Unlock() + return res +} + +func (app *localClient) CommitSync() (res types.Result) { + app.mtx.Lock() + res = app.Application.Commit() + app.mtx.Unlock() + return res +} + +func (app *localClient) InitChainSync(validators []*types.Validator) (err error) { + app.mtx.Lock() + if bcApp, ok := app.Application.(types.BlockchainAware); ok { + bcApp.InitChain(validators) + } + app.mtx.Unlock() + return nil +} + +func (app *localClient) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) { + app.mtx.Lock() + if bcApp, ok := app.Application.(types.BlockchainAware); ok { + changedValidators = bcApp.EndBlock(height) + } + app.mtx.Unlock() + return changedValidators, nil +} + +//------------------------------------------------------- + +func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes { + app.Callback(req, res) + return newLocalReqRes(req, res) +} + +func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { + reqRes := NewReqRes(req) + reqRes.Response = res + reqRes.SetDone() + return reqRes +} diff --git a/client/remote_client.go b/client/remote_client.go new file mode 100644 index 000000000..fb8a5c8ea --- /dev/null +++ b/client/remote_client.go @@ -0,0 +1,370 @@ +package tmspcli + +import ( + "bufio" + "container/list" + "errors" + "fmt" + "net" + "sync" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/types" +) + +const reqQueueSize = 256 // TODO make configurable +const maxResponseSize = 1048576 // 1MB TODO make configurable +const flushThrottleMS = 20 // Don't wait longer than... + +// This is goroutine-safe, but users should beware that +// the application in general is not meant to be interfaced +// with concurrent callers. +type remoteClient struct { + QuitService + sync.Mutex // [EB]: is this even used? + + reqQueue chan *ReqRes + flushTimer *ThrottleTimer + mustConnect bool + + mtx sync.Mutex + addr string + conn net.Conn + err error + reqSent *list.List + resCb func(*types.Request, *types.Response) // listens to all callbacks +} + +func NewClient(addr string, mustConnect bool) (*remoteClient, error) { + cli := &remoteClient{ + reqQueue: make(chan *ReqRes, reqQueueSize), + flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS), + mustConnect: mustConnect, + + addr: addr, + reqSent: list.New(), + resCb: nil, + } + cli.QuitService = *NewQuitService(nil, "remoteClient", cli) + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. + if mustConnect { + return nil, err + } else { + return cli, nil + } +} + +func (cli *remoteClient) OnStart() (err error) { + cli.QuitService.OnStart() + doneCh := make(chan struct{}) + go func() { + RETRY_LOOP: + for { + conn, err_ := Connect(cli.addr) + if err_ != nil { + if cli.mustConnect { + err = err_ // OnStart() will return this. + close(doneCh) + return + } else { + fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr) + time.Sleep(time.Second * 3) + continue RETRY_LOOP + } + } + go cli.sendRequestsRoutine(conn) + go cli.recvResponseRoutine(conn) + close(doneCh) // OnStart() will return no error. + return + } + }() + <-doneCh + return // err +} + +func (cli *remoteClient) OnStop() { + cli.QuitService.OnStop() + if cli.conn != nil { + cli.conn.Close() + } +} + +// Set listener for all responses +// NOTE: callback may get internally generated flush responses. +func (cli *remoteClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + +func (cli *remoteClient) StopForError(err error) { + cli.mtx.Lock() + fmt.Printf("Stopping tmsp.remoteClient for error: %v\n", err.Error()) + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + cli.Stop() +} + +func (cli *remoteClient) Error() error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.err +} + +//---------------------------------------- + +func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) { + w := bufio.NewWriter(conn) + for { + select { + case <-cli.flushTimer.Ch: + select { + case cli.reqQueue <- NewReqRes(types.RequestFlush()): + default: + // Probably will fill the buffer, or retry later. + } + case <-cli.QuitService.Quit: + return + case reqres := <-cli.reqQueue: + cli.willSendReq(reqres) + err := types.WriteMessage(reqres.Request, w) + if err != nil { + cli.StopForError(err) + return + } + // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) + if reqres.Request.Type == types.MessageType_Flush { + err = w.Flush() + if err != nil { + cli.StopForError(err) + return + } + } + } + } +} + +func (cli *remoteClient) recvResponseRoutine(conn net.Conn) { + r := bufio.NewReader(conn) // Buffer reads + for { + var res = &types.Response{} + err := types.ReadMessage(r, res) + if err != nil { + cli.StopForError(err) + return + } + switch res.Type { + case types.MessageType_Exception: + // XXX After setting cli.err, release waiters (e.g. reqres.Done()) + cli.StopForError(errors.New(res.Error)) + default: + // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) + err := cli.didRecvResponse(res) + if err != nil { + cli.StopForError(err) + } + } + } +} + +func (cli *remoteClient) willSendReq(reqres *ReqRes) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.reqSent.PushBack(reqres) +} + +func (cli *remoteClient) didRecvResponse(res *types.Response) error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // Get the first ReqRes + next := cli.reqSent.Front() + if next == nil { + return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type) + } + reqres := next.Value.(*ReqRes) + if !resMatchesReq(reqres.Request, res) { + return fmt.Errorf("Unexpected result type %v when response to %v expected", + res.Type, reqres.Request.Type) + } + + reqres.Response = res // Set response + reqres.Done() // Release waiters + cli.reqSent.Remove(next) // Pop first item from linked list + + // Notify reqRes listener if set + if cb := reqres.GetCallback(); cb != nil { + cb(res) + } + + // Notify client listener if set + if cli.resCb != nil { + cli.resCb(reqres.Request, res) + } + + return nil +} + +//---------------------------------------- + +func (cli *remoteClient) EchoAsync(msg string) *ReqRes { + return cli.queueRequest(types.RequestEcho(msg)) +} + +func (cli *remoteClient) FlushAsync() *ReqRes { + return cli.queueRequest(types.RequestFlush()) +} + +func (cli *remoteClient) InfoAsync() *ReqRes { + return cli.queueRequest(types.RequestInfo()) +} + +func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes { + return cli.queueRequest(types.RequestSetOption(key, value)) +} + +func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes { + return cli.queueRequest(types.RequestAppendTx(tx)) +} + +func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes { + return cli.queueRequest(types.RequestCheckTx(tx)) +} + +func (cli *remoteClient) QueryAsync(query []byte) *ReqRes { + return cli.queueRequest(types.RequestQuery(query)) +} + +func (cli *remoteClient) CommitAsync() *ReqRes { + return cli.queueRequest(types.RequestCommit()) +} + +func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes { + return cli.queueRequest(types.RequestInitChain(validators)) +} + +func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes { + return cli.queueRequest(types.RequestEndBlock(height)) +} + +//---------------------------------------- + +func (cli *remoteClient) EchoSync(msg string) (res types.Result) { + reqres := cli.queueRequest(types.RequestEcho(msg)) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) FlushSync() error { + cli.queueRequest(types.RequestFlush()).Wait() + return cli.err +} + +func (cli *remoteClient) InfoSync() (res types.Result) { + reqres := cli.queueRequest(types.RequestInfo()) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) { + reqres := cli.queueRequest(types.RequestSetOption(key, value)) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) { + reqres := cli.queueRequest(types.RequestAppendTx(tx)) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) { + reqres := cli.queueRequest(types.RequestCheckTx(tx)) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) QuerySync(query []byte) (res types.Result) { + reqres := cli.queueRequest(types.RequestQuery(query)) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) CommitSync() (res types.Result) { + reqres := cli.queueRequest(types.RequestCommit()) + cli.FlushSync() + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) + } + resp := reqres.Response + return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} +} + +func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) { + cli.queueRequest(types.RequestInitChain(validators)) + cli.FlushSync() + if cli.err != nil { + return cli.err + } + return nil +} + +func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { + reqres := cli.queueRequest(types.RequestEndBlock(height)) + cli.FlushSync() + if cli.err != nil { + return nil, cli.err + } + return reqres.Response.Validators, nil +} + +//---------------------------------------- + +func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes { + reqres := NewReqRes(req) + // TODO: set cli.err if reqQueue times out + cli.reqQueue <- reqres + + // Maybe auto-flush, or unset auto-flush + switch req.Type { + case types.MessageType_Flush: + cli.flushTimer.Unset() + default: + cli.flushTimer.Set() + } + + return reqres +} + +//---------------------------------------- + +func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { + return req.Type == res.Type +} diff --git a/tests/test_counter.go b/tests/test_counter.go index 6ed8f4f41..44adc3207 100644 --- a/tests/test_counter.go +++ b/tests/test_counter.go @@ -68,7 +68,7 @@ func startApp() *process.Process { return proc } -func startClient() *tmspcli.Client { +func startClient() tmspcli.Client { // Start client client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true) if err != nil { @@ -77,14 +77,15 @@ func startClient() *tmspcli.Client { return client } -func setOption(client *tmspcli.Client, key, value string) { - log, err := client.SetOptionSync(key, value) - if err != nil { - panic(Fmt("setting %v=%v: %v\nlog: %v", key, value, err, log)) +func setOption(client tmspcli.Client, key, value string) { + res := client.SetOptionSync(key, value) + _, _, log := res.Code, res.Data, res.Log + if res.IsErr() { + panic(Fmt("setting %v=%v: \nlog: %v", key, value, log)) } } -func commit(client *tmspcli.Client, hashExp []byte) { +func commit(client tmspcli.Client, hashExp []byte) { res := client.CommitSync() _, data, log := res.Code, res.Data, res.Log if res.IsErr() { @@ -96,7 +97,7 @@ func commit(client *tmspcli.Client, hashExp []byte) { } } -func appendTx(client *tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) { +func appendTx(client tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) { res := client.AppendTxSync(txBytes) code, data, log := res.Code, res.Data, res.Log if res.IsErr() { @@ -112,7 +113,7 @@ func appendTx(client *tmspcli.Client, txBytes []byte, codeExp types.CodeType, da } } -func checkTx(client *tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) { +func checkTx(client tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) { res := client.CheckTxSync(txBytes) code, data, log := res.Code, res.Data, res.Log if res.IsErr() {