From 41918d619c5ccdafb6d57e5c7443ea78e4381a1c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 22 Aug 2016 16:00:48 -0400 Subject: [PATCH] expose query and info through rpc --- node/node.go | 1 + rpc/core/pipe.go | 6 +++ rpc/core/routes.go | 19 +++++++++ rpc/core/tmsp.go | 17 ++++++++ rpc/core/types/responses.go | 14 +++++++ rpc/test/client_test.go | 81 ++++++++++++++++++++++++++++++++----- 6 files changed, 128 insertions(+), 10 deletions(-) create mode 100644 rpc/core/tmsp.go diff --git a/node/node.go b/node/node.go index 220bf81b9..2c3ae565a 100644 --- a/node/node.go +++ b/node/node.go @@ -209,6 +209,7 @@ func (n *Node) StartRPC() ([]net.Listener, error) { rpccore.SetSwitch(n.sw) rpccore.SetPrivValidator(n.privValidator) rpccore.SetGenesisDoc(n.genesisDoc) + rpccore.SetProxyAppQuery(n.proxyApp.Query()) listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",") diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 464f7fda9..90febf0b0 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -8,6 +8,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -19,6 +20,7 @@ var mempoolReactor *mempl.MempoolReactor var p2pSwitch *p2p.Switch var privValidator *types.PrivValidator var genDoc *types.GenesisDoc // cache the genesis structure +var proxyAppQuery proxy.AppConnQuery var config cfg.Config = nil @@ -57,3 +59,7 @@ func SetPrivValidator(pv *types.PrivValidator) { func SetGenesisDoc(doc *types.GenesisDoc) { genDoc = doc } + +func SetProxyAppQuery(appConn proxy.AppConnQuery) { + proxyAppQuery = appConn +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 78a0b6187..97c013ab7 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -25,6 +25,9 @@ var Routes = map[string]*rpc.RPCFunc{ "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxsResult, ""), + "tmsp_query": rpc.NewRPCFunc(TMSPQueryResult, "query"), + "tmsp_info": rpc.NewRPCFunc(TMSPInfoResult, ""), + "unsafe_flush_mempool": rpc.NewRPCFunc(UnsafeFlushMempool, ""), "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), "unsafe_start_cpu_profiler": rpc.NewRPCFunc(UnsafeStartCPUProfilerResult, "filename"), @@ -152,6 +155,22 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { } } +func TMSPQueryResult(query []byte) (ctypes.TMResult, error) { + if r, err := TMSPQuery(query); err != nil { + return nil, err + } else { + return r, nil + } +} + +func TMSPInfoResult() (ctypes.TMResult, error) { + if r, err := TMSPInfo(); err != nil { + return nil, err + } else { + return r, nil + } +} + func UnsafeFlushMempoolResult() (ctypes.TMResult, error) { if r, err := UnsafeFlushMempool(); err != nil { return nil, err diff --git a/rpc/core/tmsp.go b/rpc/core/tmsp.go new file mode 100644 index 000000000..9a19e6eeb --- /dev/null +++ b/rpc/core/tmsp.go @@ -0,0 +1,17 @@ +package core + +import ( + ctypes "github.com/tendermint/tendermint/rpc/core/types" +) + +//----------------------------------------------------------------------------- + +func TMSPQuery(query []byte) (*ctypes.ResultTMSPQuery, error) { + res := proxyAppQuery.QuerySync(query) + return &ctypes.ResultTMSPQuery{res}, nil +} + +func TMSPInfo() (*ctypes.ResultTMSPInfo, error) { + res := proxyAppQuery.InfoSync() + return &ctypes.ResultTMSPInfo{res}, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index c1eebb6e1..cd68addd5 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,14 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultTMSPInfo struct { + Result tmsp.Result `json:"result"` +} + +type ResultTMSPQuery struct { + Result tmsp.Result `json:"result"` +} + type ResultUnsafeFlushMempool struct{} type ResultUnsafeSetConfig struct{} @@ -107,6 +115,10 @@ const ( ResultTypeBroadcastTx = byte(0x60) ResultTypeUnconfirmedTxs = byte(0x61) + // 0x7 bytes are for querying the application + ResultTypeTMSPQuery = byte(0x70) + ResultTypeTMSPInfo = byte(0x71) + // 0x8 bytes are for events ResultTypeSubscribe = byte(0x80) ResultTypeUnsubscribe = byte(0x81) @@ -145,4 +157,6 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeStopCPUProfiler}, wire.ConcreteType{&ResultUnsafeProfile{}, ResultTypeUnsafeWriteHeapProfile}, wire.ConcreteType{&ResultUnsafeFlushMempool{}, ResultTypeUnsafeFlushMempool}, + wire.ConcreteType{&ResultTMSPQuery{}, ResultTypeTMSPQuery}, + wire.ConcreteType{&ResultTMSPInfo{}, ResultTypeTMSPInfo}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 47d2ead10..345049d5a 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -2,9 +2,12 @@ package rpctest import ( "bytes" - "crypto/rand" + crand "crypto/rand" "fmt" + "math/rand" + "strings" "testing" + "time" . "github.com/tendermint/go-common" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -14,6 +17,7 @@ import ( //-------------------------------------------------------------------------------- // Test the HTTP client +// These tests assume the dummy app //-------------------------------------------------------------------------------- //-------------------------------------------------------------------------------- @@ -49,20 +53,22 @@ func testStatus(t *testing.T, statusI interface{}) { //-------------------------------------------------------------------------------- // broadcast tx sync -func testTx() []byte { - buf := make([]byte, 16) - _, err := rand.Read(buf) +// random bytes (excluding byte('=')) +func randBytes() []byte { + n := rand.Intn(10) + 2 + buf := make([]byte, n) + _, err := crand.Read(buf) if err != nil { panic(err) } - return buf + return bytes.Replace(buf, []byte("="), []byte{100}, -1) } func TestURIBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -74,7 +80,7 @@ func TestJSONBroadcastTxSync(t *testing.T) { config.Set("block_size", 0) defer config.Set("block_size", -1) tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) if err != nil { panic(err) @@ -95,18 +101,73 @@ func testBroadcastTxSync(t *testing.T, resI interface{}, tx []byte) { txs := mem.Reap(1) if !bytes.Equal(txs[0], tx) { - panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx)) + panic(Fmt("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], tx)) } mem.Flush() } +//-------------------------------------------------------------------------------- +// query + +func testTxKV() ([]byte, []byte, []byte) { + k := randBytes() + v := randBytes() + return k, v, []byte(Fmt("%s=%s", k, v)) +} + +func sendTx() ([]byte, []byte) { + tmResult := new(ctypes.TMResult) + k, v, tx := testTxKV() + _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) + if err != nil { + panic(err) + } + fmt.Println("SENT TX", tx) + fmt.Printf("SENT TX %X\n", tx) + fmt.Printf("k %X; v %X", k, v) + return k, v +} + +func TestURITMSPQuery(t *testing.T) { + k, v := sendTx() + time.Sleep(time.Second) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("tmsp_query", map[string]interface{}{"query": Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func TestJSONTMSPQuery(t *testing.T) { + k, v := sendTx() + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("tmsp_query", []interface{}{Fmt("%X", k)}, tmResult) + if err != nil { + panic(err) + } + testTMSPQuery(t, tmResult, v) +} + +func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) { + tmRes := statusI.(*ctypes.TMResult) + query := (*tmRes).(*ctypes.ResultTMSPQuery) + if query.Result.IsErr() { + panic(Fmt("Query returned an err: %v", query)) + } + // XXX: specific to value returned by the dummy + if !strings.Contains(string(query.Result.Data), "exists=true") { + panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data)) + } +} + //-------------------------------------------------------------------------------- // broadcast tx commit func TestURIBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, tmResult) if err != nil { panic(err) @@ -116,7 +177,7 @@ func TestURIBroadcastTxCommit(t *testing.T) { func TestJSONBroadcastTxCommit(t *testing.T) { tmResult := new(ctypes.TMResult) - tx := testTx() + tx := randBytes() _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{tx}, tmResult) if err != nil { panic(err)