From 8ae0a0a4816529e5f84744eb29ef6cf6358ec05d Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 22 Jan 2016 15:50:11 -0800 Subject: [PATCH] Add Query; Add TMSPClient --- client/golang/client.go | 336 +++++++++++++++++++++++++++++++++++++++ cmd/tmsp-cli/tmsp-cli.go | 34 +++- 2 files changed, 368 insertions(+), 2 deletions(-) create mode 100644 client/golang/client.go diff --git a/client/golang/client.go b/client/golang/client.go new file mode 100644 index 000000000..2dee848ac --- /dev/null +++ b/client/golang/client.go @@ -0,0 +1,336 @@ +package tmspcli + +import ( + "bufio" + "container/list" + "errors" + "fmt" + "net" + "reflect" + "sync" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + tmsp "github.com/tendermint/tmsp/types" +) + +const maxResponseSize = 1048576 // 1MB TODO make configurable +const flushThrottleMS = 20 // Don't wait longer than... + +type Callback func(tmsp.Request, tmsp.Response) + +// This is goroutine-safe, but users should beware that +// the application in general is not meant to be interfaced +// with concurrent callers. +type TMSPClient struct { + QuitService + sync.Mutex // [EB]: is this even used? + + reqQueue chan *reqRes + flushTimer *ThrottleTimer + + mtx sync.Mutex + conn net.Conn + bufWriter *bufio.Writer + err error + reqSent *list.List + resCb func(tmsp.Request, tmsp.Response) +} + +func NewTMSPClient(conn net.Conn, bufferSize int) *TMSPClient { + cli := &TMSPClient{ + reqQueue: make(chan *reqRes, bufferSize), + flushTimer: NewThrottleTimer("TMSPClient", flushThrottleMS), + + conn: conn, + bufWriter: bufio.NewWriter(conn), + reqSent: list.New(), + resCb: nil, + } + cli.QuitService = *NewQuitService(nil, "TMSPClient", cli) + return cli +} + +func (cli *TMSPClient) OnStart() error { + cli.QuitService.OnStart() + go cli.sendRequestsRoutine() + go cli.recvResponseRoutine() + return nil +} + +func (cli *TMSPClient) OnStop() { + cli.QuitService.OnStop() + cli.conn.Close() +} + +// NOTE: callback may get internally generated flush responses. +func (cli *TMSPClient) SetResponseCallback(resCb Callback) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.resCb = resCb +} + +func (cli *TMSPClient) StopForError(err error) { + cli.mtx.Lock() + // log.Error("Stopping TMSPClient for error.", "error", err) + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + cli.Stop() +} + +func (cli *TMSPClient) Error() error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.err +} + +//---------------------------------------- + +func (cli *TMSPClient) sendRequestsRoutine() { + for { + var n int + var err error + select { + case <-cli.flushTimer.Ch: + select { + case cli.reqQueue <- newReqRes(tmsp.RequestFlush{}): + default: + // Probably will fill the buffer, or retry later. + } + case <-cli.QuitService.Quit: + return + case reqres := <-cli.reqQueue: + cli.willSendReq(reqres) + wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, cli.bufWriter, &n, &err) // Length prefix + if err != nil { + cli.StopForError(err) + return + } + // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) + if _, ok := reqres.Request.(tmsp.RequestFlush); ok { + err = cli.bufWriter.Flush() + if err != nil { + cli.StopForError(err) + return + } + } + } + } +} + +func (cli *TMSPClient) recvResponseRoutine() { + r := bufio.NewReader(cli.conn) // Buffer reads + for { + var res tmsp.Response + var n int + var err error + wire.ReadBinaryPtrLengthPrefixed(&res, r, maxResponseSize, &n, &err) + if err != nil { + cli.StopForError(err) + return + } + switch res := res.(type) { + case tmsp.ResponseException: + cli.StopForError(errors.New(res.Error)) + default: + // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) + err := cli.didRecvResponse(res) + if err != nil { + cli.StopForError(err) + } + } + } +} + +func (cli *TMSPClient) willSendReq(reqres *reqRes) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + cli.reqSent.PushBack(reqres) +} + +func (cli *TMSPClient) didRecvResponse(res tmsp.Response) error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // Special logic for events which have no corresponding requests. + if _, ok := res.(tmsp.ResponseEvent); ok && cli.resCb != nil { + cli.resCb(nil, res) + return nil + } + + // Get the first reqRes + next := cli.reqSent.Front() + if next == nil { + return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res)) + } + reqres := next.Value.(*reqRes) + if !resMatchesReq(reqres.Request, res) { + return fmt.Errorf("Unexpected result type %v when response to %v expected", + reflect.TypeOf(res), reflect.TypeOf(reqres.Request)) + } + + reqres.Response = res // Set response + reqres.Done() // Release waiters + cli.reqSent.Remove(next) // Pop first item from linked list + + // Callback if there is a listener + if cli.resCb != nil { + cli.resCb(reqres.Request, res) + } + + return nil +} + +//---------------------------------------- + +func (cli *TMSPClient) EchoAsync(msg string) { + cli.queueRequest(tmsp.RequestEcho{msg}) +} + +func (cli *TMSPClient) FlushAsync() { + cli.queueRequest(tmsp.RequestFlush{}) +} + +func (cli *TMSPClient) SetOptionAsync(key string, value string) { + cli.queueRequest(tmsp.RequestSetOption{key, value}) +} + +func (cli *TMSPClient) AppendTxAsync(tx []byte) { + cli.queueRequest(tmsp.RequestAppendTx{tx}) +} + +func (cli *TMSPClient) CheckTxAsync(tx []byte) { + cli.queueRequest(tmsp.RequestCheckTx{tx}) +} + +func (cli *TMSPClient) GetHashAsync() { + cli.queueRequest(tmsp.RequestGetHash{}) +} + +func (cli *TMSPClient) AddListenerAsync(key string) { + cli.queueRequest(tmsp.RequestAddListener{key}) +} + +func (cli *TMSPClient) RemListenerAsync(key string) { + cli.queueRequest(tmsp.RequestRemListener{key}) +} + +func (cli *TMSPClient) QueryAsync(query []byte) { + cli.queueRequest(tmsp.RequestQuery{query}) +} + +//---------------------------------------- + +func (cli *TMSPClient) InfoSync() (info []string, err error) { + reqres := cli.queueRequest(tmsp.RequestInfo{}) + cli.FlushSync() + if cli.err != nil { + return nil, cli.err + } + return reqres.Response.(tmsp.ResponseInfo).Data, nil +} + +func (cli *TMSPClient) FlushSync() error { + cli.queueRequest(tmsp.RequestFlush{}).Wait() + return cli.err +} + +func (cli *TMSPClient) AppendTxSync(tx []byte) error { + reqres := cli.queueRequest(tmsp.RequestAppendTx{tx}) + cli.FlushSync() + if cli.err != nil { + return cli.err + } + res := reqres.Response.(tmsp.ResponseAppendTx) + return res.RetCode.Error() +} + +func (cli *TMSPClient) GetHashSync() (hash []byte, err error) { + reqres := cli.queueRequest(tmsp.RequestGetHash{}) + cli.FlushSync() + if cli.err != nil { + return nil, cli.err + } + res := reqres.Response.(tmsp.ResponseGetHash) + return res.Hash, res.RetCode.Error() +} + +func (cli *TMSPClient) QuerySync(query []byte) (result []byte, err error) { + reqres := cli.queueRequest(tmsp.RequestQuery{query}) + cli.FlushSync() + if cli.err != nil { + return nil, cli.err + } + res := reqres.Response.(tmsp.ResponseQuery) + return res.Result, res.RetCode.Error() +} + +//---------------------------------------- + +func (cli *TMSPClient) queueRequest(req tmsp.Request) *reqRes { + reqres := newReqRes(req) + // TODO: set cli.err if reqQueue times out + cli.reqQueue <- reqres + + // Maybe auto-flush, or unset auto-flush + switch req.(type) { + case tmsp.RequestFlush: + cli.flushTimer.Unset() + default: + cli.flushTimer.Set() + } + + return reqres +} + +//---------------------------------------- + +func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) { + switch req.(type) { + case tmsp.RequestEcho: + _, ok = res.(tmsp.ResponseEcho) + case tmsp.RequestFlush: + _, ok = res.(tmsp.ResponseFlush) + case tmsp.RequestInfo: + _, ok = res.(tmsp.ResponseInfo) + case tmsp.RequestSetOption: + _, ok = res.(tmsp.ResponseSetOption) + case tmsp.RequestAppendTx: + _, ok = res.(tmsp.ResponseAppendTx) + case tmsp.RequestCheckTx: + _, ok = res.(tmsp.ResponseCheckTx) + case tmsp.RequestGetHash: + _, ok = res.(tmsp.ResponseGetHash) + case tmsp.RequestAddListener: + _, ok = res.(tmsp.ResponseAddListener) + case tmsp.RequestRemListener: + _, ok = res.(tmsp.ResponseRemListener) + case tmsp.RequestQuery: + _, ok = res.(tmsp.ResponseQuery) + default: + return false + } + return +} + +type reqRes struct { + tmsp.Request + *sync.WaitGroup + tmsp.Response // Not set atomically, so be sure to use WaitGroup. +} + +func newReqRes(req tmsp.Request) *reqRes { + return &reqRes{ + Request: req, + WaitGroup: waitGroup1(), + Response: nil, + } +} + +func waitGroup1() (wg *sync.WaitGroup) { + wg = &sync.WaitGroup{} + wg.Add(1) + return +} diff --git a/cmd/tmsp-cli/tmsp-cli.go b/cmd/tmsp-cli/tmsp-cli.go index a623a95a9..f27af7802 100644 --- a/cmd/tmsp-cli/tmsp-cli.go +++ b/cmd/tmsp-cli/tmsp-cli.go @@ -23,8 +23,8 @@ var conn net.Conn func main() { app := cli.NewApp() - app.Name = "cli" - app.Usage = "cli [command] [args...]" + app.Name = "tmsp-cli" + app.Usage = "tmsp-cli [command] [args...]" app.Flags = []cli.Flag{ cli.StringFlag{ Name: "address", @@ -89,6 +89,13 @@ func main() { cmdGetHash(c) }, }, + { + Name: "query", + Usage: "Query application state", + Action: func(c *cli.Context) { + cmdQuery(c) + }, + }, } app.Before = before app.Run(os.Args) @@ -234,6 +241,29 @@ func cmdGetHash(c *cli.Context) { fmt.Printf("%X\n", res.(types.ResponseGetHash).Hash) } +// Query application state +func cmdQuery(c *cli.Context) { + args := c.Args() + if len(args) != 1 { + Exit("append_tx takes 1 argument") + } + queryString := args[0] + query := []byte(queryString) + if len(queryString) > 2 && strings.HasPrefix(queryString, "0x") { + var err error + query, err = hex.DecodeString(queryString[2:]) + if err != nil { + Exit(err.Error()) + } + } + + res, err := makeRequest(conn, types.RequestQuery{query}) + if err != nil { + Exit(err.Error()) + } + fmt.Println("->", res) +} + //-------------------------------------------------------------------------------- func makeRequest(conn net.Conn, req types.Request) (types.Response, error) {