From 90465f727f122367f4bf1f808e3e352fd1b5d11c Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Wed, 17 Apr 2019 11:10:12 -0400 Subject: [PATCH] rpc: add support for batched requests/responses (#3534) Continues from #3280 in building support for batched requests/responses in the JSON RPC (as per issue #3213). * Add JSON RPC batching for client and server As per #3213, this adds support for [JSON RPC batch requests and responses](https://www.jsonrpc.org/specification#batch). * Add additional checks to ensure client responses are the same as results * Fix case where a notification is sent and no response is expected * Add test to check that JSON RPC notifications in a batch are left out in responses * Update CHANGELOG_PENDING.md * Update PR number now that PR has been created * Make errors start with lowercase letter * Refactor batch functionality to be standalone This refactors the batching functionality to rather act in a standalone way. In light of supporting concurrent goroutines making use of the same client, it would make sense to have batching functionality where one could create a batch of requests per goroutine and send that batch without interfering with a batch from another goroutine. * Add examples for simple and batch HTTP client usage * Check errors from writer and remove nolinter directives * Make error strings start with lowercase letter * Refactor examples to make them testable * Use safer deferred shutdown for example Tendermint test node * Recompose rpcClient interface from pre-existing interface components * Rename WaitGroup for brevity * Replace empty ID string with request ID * Remove extraneous test case * Convert first letter of errors.Wrap() messages to lowercase * Remove extraneous function parameter * Make variable declaration terse * Reorder WaitGroup.Done call to help prevent race conditions in the face of failure * Swap mutex to value representation and remove initialization * Restore empty JSONRPC string ID in response to prevent nil * Make JSONRPCBufferedRequest private * Revert PR hard link in CHANGELOG_PENDING * Add client ID for JSONRPCClient This adds code to automatically generate a randomized client ID for the JSONRPCClient, and adds a check of the IDs in the responses (if one was set in the requests). * Extract response ID validation into separate function * Remove extraneous comments * Reorder fields to indicate clearly which are protected by the mutex * Refactor for loop to remove indexing * Restructure and combine loop * Flatten conditional block for better readability * Make multi-variable declaration slightly more readable * Change for loop style * Compress error check statements * Make function description more generic to show that we support different protocols * Preallocate memory for request and result objects --- CHANGELOG_PENDING.md | 1 + rpc/client/examples_test.go | 126 +++++++++++++++++++++ rpc/client/helpers.go | 2 +- rpc/client/httpclient.go | 187 +++++++++++++++++++++++--------- rpc/client/rpc_test.go | 100 +++++++++++++++++ rpc/lib/client/http_client.go | 186 +++++++++++++++++++++++++++++-- rpc/lib/server/handlers.go | 98 +++++++++-------- rpc/lib/server/handlers_test.go | 66 +++++++++++ rpc/lib/server/http_params.go | 2 +- rpc/lib/server/http_server.go | 33 +++++- rpc/test/helpers.go | 77 +++++++++---- 11 files changed, 746 insertions(+), 132 deletions(-) create mode 100644 rpc/client/examples_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index d628bc598..3f151f71a 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,6 +17,7 @@ ### FEATURES: ### IMPROVEMENTS: +- [rpc] [\#3534](https://github.com/tendermint/tendermint/pull/3534) Add support for batched requests/responses in JSON RPC ### BUG FIXES: - [state] [\#3537](https://github.com/tendermint/tendermint/pull/3537#issuecomment-482711833) LoadValidators: do not return an empty validator set diff --git a/rpc/client/examples_test.go b/rpc/client/examples_test.go new file mode 100644 index 000000000..720e48492 --- /dev/null +++ b/rpc/client/examples_test.go @@ -0,0 +1,126 @@ +package client_test + +import ( + "bytes" + "fmt" + + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctest "github.com/tendermint/tendermint/rpc/test" +) + +func ExampleHTTP_simple() { + // Start a tendermint node (and kvstore) in the background to test against + app := kvstore.NewKVStoreApplication() + node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig) + defer rpctest.StopTendermint(node) + + // Create our RPC client + rpcAddr := rpctest.GetConfig().RPC.ListenAddress + c := client.NewHTTP(rpcAddr, "/websocket") + + // Create a transaction + k := []byte("name") + v := []byte("satoshi") + tx := append(k, append([]byte("="), v...)...) + + // Broadcast the transaction and wait for it to commit (rather use + // c.BroadcastTxSync though in production) + bres, err := c.BroadcastTxCommit(tx) + if err != nil { + panic(err) + } + if bres.CheckTx.IsErr() || bres.DeliverTx.IsErr() { + panic("BroadcastTxCommit transaction failed") + } + + // Now try to fetch the value for the key + qres, err := c.ABCIQuery("/key", k) + if err != nil { + panic(err) + } + if qres.Response.IsErr() { + panic("ABCIQuery failed") + } + if !bytes.Equal(qres.Response.Key, k) { + panic("returned key does not match queried key") + } + if !bytes.Equal(qres.Response.Value, v) { + panic("returned value does not match sent value") + } + + fmt.Println("Sent tx :", string(tx)) + fmt.Println("Queried for :", string(qres.Response.Key)) + fmt.Println("Got value :", string(qres.Response.Value)) + + // Output: + // Sent tx : name=satoshi + // Queried for : name + // Got value : satoshi +} + +func ExampleHTTP_batching() { + // Start a tendermint node (and kvstore) in the background to test against + app := kvstore.NewKVStoreApplication() + node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig) + defer rpctest.StopTendermint(node) + + // Create our RPC client + rpcAddr := rpctest.GetConfig().RPC.ListenAddress + c := client.NewHTTP(rpcAddr, "/websocket") + + // Create our two transactions + k1 := []byte("firstName") + v1 := []byte("satoshi") + tx1 := append(k1, append([]byte("="), v1...)...) + + k2 := []byte("lastName") + v2 := []byte("nakamoto") + tx2 := append(k2, append([]byte("="), v2...)...) + + txs := [][]byte{tx1, tx2} + + // Create a new batch + batch := c.NewBatch() + + // Queue up our transactions + for _, tx := range txs { + if _, err := batch.BroadcastTxCommit(tx); err != nil { + panic(err) + } + } + + // Send the batch of 2 transactions + if _, err := batch.Send(); err != nil { + panic(err) + } + + // Now let's query for the original results as a batch + keys := [][]byte{k1, k2} + for _, key := range keys { + if _, err := batch.ABCIQuery("/key", key); err != nil { + panic(err) + } + } + + // Send the 2 queries and keep the results + results, err := batch.Send() + if err != nil { + panic(err) + } + + // Each result in the returned list is the deserialized result of each + // respective ABCIQuery response + for _, result := range results { + qr, ok := result.(*ctypes.ResultABCIQuery) + if !ok { + panic("invalid result type from ABCIQuery request") + } + fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value)) + } + + // Output: + // firstName = satoshi + // lastName = nakamoto +} diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 4889b0740..756ba2818 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -15,7 +15,7 @@ type Waiter func(delta int64) (abort error) // but you can plug in another one func DefaultWaitStrategy(delta int64) (abort error) { if delta > 10 { - return errors.Errorf("Waiting for %d blocks... aborting", delta) + return errors.Errorf("waiting for %d blocks... aborting", delta) } else if delta > 0 { // estimate of wait time.... // wait half a second for the next block (in progress) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 55c7b4f17..3fd13da37 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -18,27 +18,72 @@ import ( ) /* -HTTP is a Client implementation that communicates with a tendermint node over -json rpc and websockets. +HTTP is a Client implementation that communicates with a Tendermint node over +JSON RPC and WebSockets. This is the main implementation you probably want to use in production code. -There are other implementations when calling the tendermint node in-process +There are other implementations when calling the Tendermint node in-process (Local), or when you want to mock out the server for test code (mock). You can subscribe for any event published by Tendermint using Subscribe method. -Note delivery is best-effort. If you don't read events fast enough or network -is slow, Tendermint might cancel the subscription. The client will attempt to +Note delivery is best-effort. If you don't read events fast enough or network is +slow, Tendermint might cancel the subscription. The client will attempt to resubscribe (you don't need to do anything). It will keep trying every second indefinitely until successful. + +Request batching is available for JSON RPC requests over HTTP, which conforms to +the JSON RPC specification (https://www.jsonrpc.org/specification#batch). See +the example for more details. */ type HTTP struct { remote string rpc *rpcclient.JSONRPCClient + + *baseRPCClient *WSEvents } -// NewHTTP takes a remote endpoint in the form tcp://: -// and the websocket path (which always seems to be "/websocket") +// BatchHTTP provides the same interface as `HTTP`, but allows for batching of +// requests (as per https://www.jsonrpc.org/specification#batch). Do not +// instantiate directly - rather use the HTTP.NewBatch() method to create an +// instance of this struct. +// +// Batching of HTTP requests is thread-safe in the sense that multiple +// goroutines can each create their own batches and send them using the same +// HTTP client. Multiple goroutines could also enqueue transactions in a single +// batch, but ordering of transactions in the batch cannot be guaranteed in such +// an example. +type BatchHTTP struct { + rpcBatch *rpcclient.JSONRPCRequestBatch + *baseRPCClient +} + +// rpcClient is an internal interface to which our RPC clients (batch and +// non-batch) must conform. Acts as an additional code-level sanity check to +// make sure the implementations stay coherent. +type rpcClient interface { + ABCIClient + HistoryClient + NetworkClient + SignClient + StatusClient +} + +// baseRPCClient implements the basic RPC method logic without the actual +// underlying RPC call functionality, which is provided by `caller`. +type baseRPCClient struct { + caller rpcclient.JSONRPCCaller +} + +var _ rpcClient = (*HTTP)(nil) +var _ rpcClient = (*BatchHTTP)(nil) +var _ rpcClient = (*baseRPCClient)(nil) + +//----------------------------------------------------------------------------- +// HTTP + +// NewHTTP takes a remote endpoint in the form ://: and +// the websocket path (which always seems to be "/websocket") func NewHTTP(remote, wsEndpoint string) *HTTP { rc := rpcclient.NewJSONRPCClient(remote) cdc := rc.Codec() @@ -46,39 +91,76 @@ func NewHTTP(remote, wsEndpoint string) *HTTP { rc.SetCodec(cdc) return &HTTP{ - rpc: rc, - remote: remote, - WSEvents: newWSEvents(cdc, remote, wsEndpoint), + rpc: rc, + remote: remote, + baseRPCClient: &baseRPCClient{caller: rc}, + WSEvents: newWSEvents(cdc, remote, wsEndpoint), } } var _ Client = (*HTTP)(nil) -func (c *HTTP) Status() (*ctypes.ResultStatus, error) { +// NewBatch creates a new batch client for this HTTP client. +func (c *HTTP) NewBatch() *BatchHTTP { + rpcBatch := c.rpc.NewRequestBatch() + return &BatchHTTP{ + rpcBatch: rpcBatch, + baseRPCClient: &baseRPCClient{ + caller: rpcBatch, + }, + } +} + +//----------------------------------------------------------------------------- +// BatchHTTP + +// Send is a convenience function for an HTTP batch that will trigger the +// compilation of the batched requests and send them off using the client as a +// single request. On success, this returns a list of the deserialized results +// from each request in the sent batch. +func (b *BatchHTTP) Send() ([]interface{}, error) { + return b.rpcBatch.Send() +} + +// Clear will empty out this batch of requests and return the number of requests +// that were cleared out. +func (b *BatchHTTP) Clear() int { + return b.rpcBatch.Clear() +} + +// Count returns the number of enqueued requests waiting to be sent. +func (b *BatchHTTP) Count() int { + return b.rpcBatch.Count() +} + +//----------------------------------------------------------------------------- +// baseRPCClient + +func (c *baseRPCClient) Status() (*ctypes.ResultStatus, error) { result := new(ctypes.ResultStatus) - _, err := c.rpc.Call("status", map[string]interface{}{}, result) + _, err := c.caller.Call("status", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Status") } return result, nil } -func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) { +func (c *baseRPCClient) ABCIInfo() (*ctypes.ResultABCIInfo, error) { result := new(ctypes.ResultABCIInfo) - _, err := c.rpc.Call("abci_info", map[string]interface{}{}, result) + _, err := c.caller.Call("abci_info", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "ABCIInfo") } return result, nil } -func (c *HTTP) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) { +func (c *baseRPCClient) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) } -func (c *HTTP) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { +func (c *baseRPCClient) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { result := new(ctypes.ResultABCIQuery) - _, err := c.rpc.Call("abci_query", + _, err := c.caller.Call("abci_query", map[string]interface{}{"path": path, "data": data, "height": opts.Height, "prove": opts.Prove}, result) if err != nil { @@ -87,89 +169,89 @@ func (c *HTTP) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQue return result, nil } -func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *baseRPCClient) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { result := new(ctypes.ResultBroadcastTxCommit) - _, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) + _, err := c.caller.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) if err != nil { return nil, errors.Wrap(err, "broadcast_tx_commit") } return result, nil } -func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *baseRPCClient) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return c.broadcastTX("broadcast_tx_async", tx) } -func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *baseRPCClient) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { return c.broadcastTX("broadcast_tx_sync", tx) } -func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func (c *baseRPCClient) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { result := new(ctypes.ResultBroadcastTx) - _, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result) + _, err := c.caller.Call(route, map[string]interface{}{"tx": tx}, result) if err != nil { return nil, errors.Wrap(err, route) } return result, nil } -func (c *HTTP) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { +func (c *baseRPCClient) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { result := new(ctypes.ResultUnconfirmedTxs) - _, err := c.rpc.Call("unconfirmed_txs", map[string]interface{}{"limit": limit}, result) + _, err := c.caller.Call("unconfirmed_txs", map[string]interface{}{"limit": limit}, result) if err != nil { return nil, errors.Wrap(err, "unconfirmed_txs") } return result, nil } -func (c *HTTP) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { +func (c *baseRPCClient) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { result := new(ctypes.ResultUnconfirmedTxs) - _, err := c.rpc.Call("num_unconfirmed_txs", map[string]interface{}{}, result) + _, err := c.caller.Call("num_unconfirmed_txs", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "num_unconfirmed_txs") } return result, nil } -func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) { +func (c *baseRPCClient) NetInfo() (*ctypes.ResultNetInfo, error) { result := new(ctypes.ResultNetInfo) - _, err := c.rpc.Call("net_info", map[string]interface{}{}, result) + _, err := c.caller.Call("net_info", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "NetInfo") } return result, nil } -func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { +func (c *baseRPCClient) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { result := new(ctypes.ResultDumpConsensusState) - _, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result) + _, err := c.caller.Call("dump_consensus_state", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "DumpConsensusState") } return result, nil } -func (c *HTTP) ConsensusState() (*ctypes.ResultConsensusState, error) { +func (c *baseRPCClient) ConsensusState() (*ctypes.ResultConsensusState, error) { result := new(ctypes.ResultConsensusState) - _, err := c.rpc.Call("consensus_state", map[string]interface{}{}, result) + _, err := c.caller.Call("consensus_state", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "ConsensusState") } return result, nil } -func (c *HTTP) Health() (*ctypes.ResultHealth, error) { +func (c *baseRPCClient) Health() (*ctypes.ResultHealth, error) { result := new(ctypes.ResultHealth) - _, err := c.rpc.Call("health", map[string]interface{}{}, result) + _, err := c.caller.Call("health", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Health") } return result, nil } -func (c *HTTP) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { +func (c *baseRPCClient) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { result := new(ctypes.ResultBlockchainInfo) - _, err := c.rpc.Call("blockchain", + _, err := c.caller.Call("blockchain", map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight}, result) if err != nil { @@ -178,56 +260,56 @@ func (c *HTTP) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockch return result, nil } -func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) { +func (c *baseRPCClient) Genesis() (*ctypes.ResultGenesis, error) { result := new(ctypes.ResultGenesis) - _, err := c.rpc.Call("genesis", map[string]interface{}{}, result) + _, err := c.caller.Call("genesis", map[string]interface{}{}, result) if err != nil { return nil, errors.Wrap(err, "Genesis") } return result, nil } -func (c *HTTP) Block(height *int64) (*ctypes.ResultBlock, error) { +func (c *baseRPCClient) Block(height *int64) (*ctypes.ResultBlock, error) { result := new(ctypes.ResultBlock) - _, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result) + _, err := c.caller.Call("block", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Block") } return result, nil } -func (c *HTTP) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) { +func (c *baseRPCClient) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) { result := new(ctypes.ResultBlockResults) - _, err := c.rpc.Call("block_results", map[string]interface{}{"height": height}, result) + _, err := c.caller.Call("block_results", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Block Result") } return result, nil } -func (c *HTTP) Commit(height *int64) (*ctypes.ResultCommit, error) { +func (c *baseRPCClient) Commit(height *int64) (*ctypes.ResultCommit, error) { result := new(ctypes.ResultCommit) - _, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result) + _, err := c.caller.Call("commit", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Commit") } return result, nil } -func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { +func (c *baseRPCClient) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { result := new(ctypes.ResultTx) params := map[string]interface{}{ "hash": hash, "prove": prove, } - _, err := c.rpc.Call("tx", params, result) + _, err := c.caller.Call("tx", params, result) if err != nil { return nil, errors.Wrap(err, "Tx") } return result, nil } -func (c *HTTP) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { +func (c *baseRPCClient) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { result := new(ctypes.ResultTxSearch) params := map[string]interface{}{ "query": query, @@ -235,23 +317,24 @@ func (c *HTTP) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Re "page": page, "per_page": perPage, } - _, err := c.rpc.Call("tx_search", params, result) + _, err := c.caller.Call("tx_search", params, result) if err != nil { return nil, errors.Wrap(err, "TxSearch") } return result, nil } -func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) { +func (c *baseRPCClient) Validators(height *int64) (*ctypes.ResultValidators, error) { result := new(ctypes.ResultValidators) - _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result) + _, err := c.caller.Call("validators", map[string]interface{}{"height": height}, result) if err != nil { return nil, errors.Wrap(err, "Validators") } return result, nil } -/** websocket event stuff here... **/ +//----------------------------------------------------------------------------- +// WSEvents type WSEvents struct { cmn.BaseService diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index ba9bc3af7..1544a3d95 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -11,7 +12,9 @@ import ( abci "github.com/tendermint/tendermint/abci/types" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctest "github.com/tendermint/tendermint/rpc/test" "github.com/tendermint/tendermint/types" ) @@ -441,3 +444,100 @@ func TestTxSearch(t *testing.T) { require.Len(t, result.Txs, 0) } } + +func TestBatchedJSONRPCCalls(t *testing.T) { + c := getHTTPClient() + testBatchedJSONRPCCalls(t, c) +} + +func testBatchedJSONRPCCalls(t *testing.T, c *client.HTTP) { + k1, v1, tx1 := MakeTxKV() + k2, v2, tx2 := MakeTxKV() + + batch := c.NewBatch() + r1, err := batch.BroadcastTxCommit(tx1) + require.NoError(t, err) + r2, err := batch.BroadcastTxCommit(tx2) + require.NoError(t, err) + require.Equal(t, 2, batch.Count()) + bresults, err := batch.Send() + require.NoError(t, err) + require.Len(t, bresults, 2) + require.Equal(t, 0, batch.Count()) + + bresult1, ok := bresults[0].(*ctypes.ResultBroadcastTxCommit) + require.True(t, ok) + require.Equal(t, *bresult1, *r1) + bresult2, ok := bresults[1].(*ctypes.ResultBroadcastTxCommit) + require.True(t, ok) + require.Equal(t, *bresult2, *r2) + apph := cmn.MaxInt64(bresult1.Height, bresult2.Height) + 1 + + client.WaitForHeight(c, apph, nil) + + q1, err := batch.ABCIQuery("/key", k1) + require.NoError(t, err) + q2, err := batch.ABCIQuery("/key", k2) + require.NoError(t, err) + require.Equal(t, 2, batch.Count()) + qresults, err := batch.Send() + require.NoError(t, err) + require.Len(t, qresults, 2) + require.Equal(t, 0, batch.Count()) + + qresult1, ok := qresults[0].(*ctypes.ResultABCIQuery) + require.True(t, ok) + require.Equal(t, *qresult1, *q1) + qresult2, ok := qresults[1].(*ctypes.ResultABCIQuery) + require.True(t, ok) + require.Equal(t, *qresult2, *q2) + + require.Equal(t, qresult1.Response.Key, k1) + require.Equal(t, qresult2.Response.Key, k2) + require.Equal(t, qresult1.Response.Value, v1) + require.Equal(t, qresult2.Response.Value, v2) +} + +func TestBatchedJSONRPCCallsCancellation(t *testing.T) { + c := getHTTPClient() + _, _, tx1 := MakeTxKV() + _, _, tx2 := MakeTxKV() + + batch := c.NewBatch() + _, err := batch.BroadcastTxCommit(tx1) + require.NoError(t, err) + _, err = batch.BroadcastTxCommit(tx2) + require.NoError(t, err) + // we should have 2 requests waiting + require.Equal(t, 2, batch.Count()) + // we want to make sure we cleared 2 pending requests + require.Equal(t, 2, batch.Clear()) + // now there should be no batched requests + require.Equal(t, 0, batch.Count()) +} + +func TestSendingEmptyJSONRPCRequestBatch(t *testing.T) { + c := getHTTPClient() + batch := c.NewBatch() + _, err := batch.Send() + require.Error(t, err, "sending an empty batch of JSON RPC requests should result in an error") +} + +func TestClearingEmptyJSONRPCRequestBatch(t *testing.T) { + c := getHTTPClient() + batch := c.NewBatch() + require.Zero(t, batch.Clear(), "clearing an empty batch of JSON RPC requests should result in a 0 result") +} + +func TestConcurrentJSONRPCBatching(t *testing.T) { + var wg sync.WaitGroup + c := getHTTPClient() + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + testBatchedJSONRPCCalls(t, c) + }() + } + wg.Wait() +} diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index cfa26e89c..824820fab 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -10,9 +10,11 @@ import ( "net/url" "reflect" "strings" + "sync" "github.com/pkg/errors" amino "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tendermint/libs/common" types "github.com/tendermint/tendermint/rpc/lib/types" ) @@ -83,25 +85,56 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) { //------------------------------------------------------------------------------------ +// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its +// anticipated response structure. +type jsonRPCBufferedRequest struct { + request types.RPCRequest + result interface{} // The result will be deserialized into this object. +} + +// JSONRPCRequestBatch allows us to buffer multiple request/response structures +// into a single batch request. Note that this batch acts like a FIFO queue, and +// is thread-safe. +type JSONRPCRequestBatch struct { + client *JSONRPCClient + + mtx sync.Mutex + requests []*jsonRPCBufferedRequest +} + // JSONRPCClient takes params as a slice type JSONRPCClient struct { address string client *http.Client + id types.JSONRPCStringID cdc *amino.Codec } +// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint. +type JSONRPCCaller interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) +} + +// Both JSONRPCClient and JSONRPCRequestBatch can facilitate calls to the JSON +// RPC endpoint. +var _ JSONRPCCaller = (*JSONRPCClient)(nil) +var _ JSONRPCCaller = (*JSONRPCRequestBatch)(nil) + // NewJSONRPCClient returns a JSONRPCClient pointed at the given address. func NewJSONRPCClient(remote string) *JSONRPCClient { address, client := makeHTTPClient(remote) return &JSONRPCClient{ address: address, client: client, + id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)), cdc: amino.NewCodec(), } } +// Call will send the request for the given method through to the RPC endpoint +// immediately, without buffering of requests. func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("jsonrpc-client"), method, params) + request, err := types.MapToRequest(c.cdc, c.id, method, params) if err != nil { return nil, err } @@ -109,9 +142,7 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul if err != nil { return nil, err } - // log.Info(string(requestBytes)) requestBuf := bytes.NewBuffer(requestBytes) - // log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes))) httpResponse, err := c.client.Post(c.address, "text/json", requestBuf) if err != nil { return nil, err @@ -122,8 +153,40 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul if err != nil { return nil, err } - // log.Info(Fmt("RPC response: %v", string(responseBytes))) - return unmarshalResponseBytes(c.cdc, responseBytes, result) + return unmarshalResponseBytes(c.cdc, responseBytes, c.id, result) +} + +// NewRequestBatch starts a batch of requests for this client. +func (c *JSONRPCClient) NewRequestBatch() *JSONRPCRequestBatch { + return &JSONRPCRequestBatch{ + requests: make([]*jsonRPCBufferedRequest, 0), + client: c, + } +} + +func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interface{}, error) { + reqs := make([]types.RPCRequest, 0, len(requests)) + results := make([]interface{}, 0, len(requests)) + for _, req := range requests { + reqs = append(reqs, req.request) + results = append(results, req.result) + } + // serialize the array of requests into a single JSON object + requestBytes, err := json.Marshal(reqs) + if err != nil { + return nil, err + } + httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes)) + if err != nil { + return nil, err + } + defer httpResponse.Body.Close() // nolint: errcheck + + responseBytes, err := ioutil.ReadAll(httpResponse.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytesArray(c.cdc, responseBytes, c.id, results) } func (c *JSONRPCClient) Codec() *amino.Codec { @@ -136,6 +199,57 @@ func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { //------------------------------------------------------------- +// Count returns the number of enqueued requests waiting to be sent. +func (b *JSONRPCRequestBatch) Count() int { + b.mtx.Lock() + defer b.mtx.Unlock() + return len(b.requests) +} + +func (b *JSONRPCRequestBatch) enqueue(req *jsonRPCBufferedRequest) { + b.mtx.Lock() + defer b.mtx.Unlock() + b.requests = append(b.requests, req) +} + +// Clear empties out the request batch. +func (b *JSONRPCRequestBatch) Clear() int { + b.mtx.Lock() + defer b.mtx.Unlock() + return b.clear() +} + +func (b *JSONRPCRequestBatch) clear() int { + count := len(b.requests) + b.requests = make([]*jsonRPCBufferedRequest, 0) + return count +} + +// Send will attempt to send the current batch of enqueued requests, and then +// will clear out the requests once done. On success, this returns the +// deserialized list of results from each of the enqueued requests. +func (b *JSONRPCRequestBatch) Send() ([]interface{}, error) { + b.mtx.Lock() + defer func() { + b.clear() + b.mtx.Unlock() + }() + return b.client.sendBatch(b.requests) +} + +// Call enqueues a request to call the given RPC method with the specified +// parameters, in the same way that the `JSONRPCClient.Call` function would. +func (b *JSONRPCRequestBatch) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + request, err := types.MapToRequest(b.client.cdc, b.client.id, method, params) + if err != nil { + return nil, err + } + b.enqueue(&jsonRPCBufferedRequest{request: request, result: result}) + return result, nil +} + +//------------------------------------------------------------- + // URI takes params as a map type URIClient struct { address string @@ -168,7 +282,7 @@ func (c *URIClient) Call(method string, params map[string]interface{}, result in if err != nil { return nil, err } - return unmarshalResponseBytes(c.cdc, responseBytes, result) + return unmarshalResponseBytes(c.cdc, responseBytes, "", result) } func (c *URIClient) Codec() *amino.Codec { @@ -181,7 +295,7 @@ func (c *URIClient) SetCodec(cdc *amino.Codec) { //------------------------------------------------ -func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result interface{}) (interface{}, error) { +func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, expectedID types.JSONRPCStringID, result interface{}) (interface{}, error) { // Read response. If rpc/core/types is imported, the result will unmarshal // into the correct type. // log.Notice("response", "response", string(responseBytes)) @@ -189,19 +303,71 @@ func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result inter response := &types.RPCResponse{} err = json.Unmarshal(responseBytes, response) if err != nil { - return nil, errors.Errorf("Error unmarshalling rpc response: %v", err) + return nil, errors.Errorf("error unmarshalling rpc response: %v", err) } if response.Error != nil { - return nil, errors.Errorf("Response error: %v", response.Error) + return nil, errors.Errorf("response error: %v", response.Error) + } + // From the JSON-RPC 2.0 spec: + // id: It MUST be the same as the value of the id member in the Request Object. + if err := validateResponseID(response, expectedID); err != nil { + return nil, err } // Unmarshal the RawMessage into the result. err = cdc.UnmarshalJSON(response.Result, result) if err != nil { - return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err) + return nil, errors.Errorf("error unmarshalling rpc response result: %v", err) } return result, nil } +func unmarshalResponseBytesArray(cdc *amino.Codec, responseBytes []byte, expectedID types.JSONRPCStringID, results []interface{}) ([]interface{}, error) { + var ( + err error + responses []types.RPCResponse + ) + err = json.Unmarshal(responseBytes, &responses) + if err != nil { + return nil, errors.Errorf("error unmarshalling rpc response: %v", err) + } + // No response error checking here as there may be a mixture of successful + // and unsuccessful responses. + + if len(results) != len(responses) { + return nil, errors.Errorf("expected %d result objects into which to inject responses, but got %d", len(responses), len(results)) + } + + for i, response := range responses { + // From the JSON-RPC 2.0 spec: + // id: It MUST be the same as the value of the id member in the Request Object. + if err := validateResponseID(&response, expectedID); err != nil { + return nil, errors.Errorf("failed to validate response ID in response %d: %v", i, err) + } + if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil { + return nil, errors.Errorf("error unmarshalling rpc response result: %v", err) + } + } + return results, nil +} + +func validateResponseID(res *types.RPCResponse, expectedID types.JSONRPCStringID) error { + // we only validate a response ID if the expected ID is non-empty + if len(expectedID) == 0 { + return nil + } + if res.ID == nil { + return errors.Errorf("missing ID in response") + } + id, ok := res.ID.(types.JSONRPCStringID) + if !ok { + return errors.Errorf("expected ID string in response but got: %v", id) + } + if expectedID != id { + return errors.Errorf("response ID (%s) does not match request ID (%s)", id, expectedID) + } + return nil +} + func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) { values := make(url.Values) if len(args) == 0 { diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 6391b0090..c1c1ebf1a 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -103,7 +103,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo return func(w http.ResponseWriter, r *http.Request) { b, err := ioutil.ReadAll(r.Body) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "Error reading request body"))) + WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "error reading request body"))) return } // if its an empty request (like from a browser), @@ -113,49 +113,59 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo return } - var request types.RPCRequest - err = json.Unmarshal(b, &request) - if err != nil { - WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshalling request"))) - return - } - // A Notification is a Request object without an "id" member. - // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCStringID("") { - logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") - return - } - if len(r.URL.Path) > 1 { - WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path))) - return - } - - rpcFunc := funcMap[request.Method] - if rpcFunc == nil || rpcFunc.ws { - WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID)) - return + // first try to unmarshal the incoming request as an array of RPC requests + var ( + requests []types.RPCRequest + responses []types.RPCResponse + ) + if err := json.Unmarshal(b, &requests); err != nil { + // next, try to unmarshal as a single request + var request types.RPCRequest + if err := json.Unmarshal(b, &request); err != nil { + WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshalling request"))) + return + } + requests = []types.RPCRequest{request} } - ctx := &types.Context{JSONReq: &request, HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - if len(request.Params) > 0 { - fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) + for _, request := range requests { + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == types.JSONRPCStringID("") { + logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") + continue + } + if len(r.URL.Path) > 1 { + responses = append(responses, types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path))) + continue + } + rpcFunc, ok := funcMap[request.Method] + if !ok || rpcFunc.ws { + responses = append(responses, types.RPCMethodNotFoundError(request.ID)) + continue + } + ctx := &types.Context{JSONReq: &request, HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) + if err != nil { + responses = append(responses, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments"))) + continue + } + args = append(args, fnArgs...) + } + returns := rpcFunc.f.Call(args) + logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) - return + responses = append(responses, types.RPCInternalError(request.ID, err)) + continue } - args = append(args, fnArgs...) + responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result)) } - - returns := rpcFunc.f.Call(args) - - logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, types.RPCInternalError(request.ID, err)) - return + if len(responses) > 0 { + WriteRPCResponseArrayHTTP(w, responses) } - WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, request.ID, result)) } } @@ -194,7 +204,7 @@ func mapParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params map[string]json. func arrayParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params []json.RawMessage, argsOffset int) ([]reflect.Value, error) { if len(rpcFunc.argNames) != len(params) { - return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)", + return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)", len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) } @@ -236,7 +246,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect } // Otherwise, bad format, we cannot parse - return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err) + return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err) } // rpc.json @@ -261,7 +271,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) if err != nil { - WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments"))) + WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "error converting http params to arguments"))) return } args = append(args, fnArgs...) @@ -372,7 +382,7 @@ func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect if isHexString { if !expectingString && !expectingByteSlice { - err := errors.Errorf("Got a hex string arg, but expected '%s'", + err := errors.Errorf("got a hex string arg, but expected '%s'", rt.Kind().String()) return reflect.ValueOf(nil), err, false } @@ -631,7 +641,7 @@ func (wsc *wsConnection) readRoutine() { var request types.RPCRequest err = json.Unmarshal(in, &request) if err != nil { - wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshaling request"))) + wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshaling request"))) continue } @@ -654,7 +664,7 @@ func (wsc *wsConnection) readRoutine() { if len(request.Params) > 0 { fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments"))) continue } args = append(args, fnArgs...) diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index f8ad06107..9cded2953 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -154,6 +154,72 @@ func TestRPCNotification(t *testing.T) { require.Equal(t, len(blob), 0, "a notification SHOULD NOT be responded to by the server") } +func TestRPCNotificationInBatch(t *testing.T) { + mux := testMux() + tests := []struct { + payload string + expectCount int + }{ + { + `[ + {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} + ]`, + 1, + }, + { + `[ + {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]}, + {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} + ]`, + 2, + }, + } + for i, tt := range tests { + req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + res := rec.Result() + // Always expecting back a JSONRPCResponse + assert.True(t, statusOK(res.StatusCode), "#%d: should always return 2XX", i) + blob, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("#%d: err reading body: %v", i, err) + continue + } + + var responses []types.RPCResponse + // try to unmarshal an array first + err = json.Unmarshal(blob, &responses) + if err != nil { + // if we were actually expecting an array, but got an error + if tt.expectCount > 1 { + t.Errorf("#%d: expected an array, couldn't unmarshal it\nblob: %s", i, blob) + continue + } else { + // we were expecting an error here, so let's unmarshal a single response + var response types.RPCResponse + err = json.Unmarshal(blob, &response) + if err != nil { + t.Errorf("#%d: expected successful parsing of an RPCResponse\nblob: %s", i, blob) + continue + } + // have a single-element result + responses = []types.RPCResponse{response} + } + } + if tt.expectCount != len(responses) { + t.Errorf("#%d: expected %d response(s), but got %d\nblob: %s", i, tt.expectCount, len(responses), blob) + continue + } + for _, response := range responses { + assert.NotEqual(t, response, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) + } + } +} + func TestUnknownRPCPath(t *testing.T) { mux := testMux() req, _ := http.NewRequest("GET", "http://localhost/unknownrpcpath", nil) diff --git a/rpc/lib/server/http_params.go b/rpc/lib/server/http_params.go index 3c948c0ba..8ade41c79 100644 --- a/rpc/lib/server/http_params.go +++ b/rpc/lib/server/http_params.go @@ -76,7 +76,7 @@ func GetParamUint(r *http.Request, param string) (uint, error) { func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { s := GetParam(r, param) if !re.MatchString(s) { - return "", errors.Errorf(param, "Did not match regular expression %v", re.String()) + return "", errors.Errorf(param, "did not match regular expression %v", re.String()) } return s, nil } diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index c4bb6fa17..7825605eb 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -98,7 +98,9 @@ func WriteRPCResponseHTTPError( w.Header().Set("Content-Type", "application/json") w.WriteHeader(httpCode) - w.Write(jsonBytes) // nolint: errcheck, gas + if _, err := w.Write(jsonBytes); err != nil { + panic(err) + } } func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) { @@ -108,12 +110,33 @@ func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) { } w.Header().Set("Content-Type", "application/json") w.WriteHeader(200) - w.Write(jsonBytes) // nolint: errcheck, gas + if _, err := w.Write(jsonBytes); err != nil { + panic(err) + } +} + +// WriteRPCResponseArrayHTTP will do the same as WriteRPCResponseHTTP, except it +// can write arrays of responses for batched request/response interactions via +// the JSON RPC. +func WriteRPCResponseArrayHTTP(w http.ResponseWriter, res []types.RPCResponse) { + if len(res) == 1 { + WriteRPCResponseHTTP(w, res[0]) + } else { + jsonBytes, err := json.MarshalIndent(res, "", " ") + if err != nil { + panic(err) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + if _, err := w.Write(jsonBytes); err != nil { + panic(err) + } + } } //----------------------------------------------------------------------------- -// Wraps an HTTP handler, adding error logging. +// RecoverAndLogHandler wraps an HTTP handler, adding error logging. // If the inner function panics, the outer function recovers, logs, sends an // HTTP 500 error response. func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler { @@ -191,14 +214,14 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) { parts := strings.SplitN(addr, "://", 2) if len(parts) != 2 { return nil, errors.Errorf( - "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", + "invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", addr, ) } proto, addr := parts[0], parts[1] listener, err = net.Listen(proto, addr) if err != nil { - return nil, errors.Errorf("Failed to listen on %v: %v", addr, err) + return nil, errors.Errorf("failed to listen on %v: %v", addr, err) } if config.MaxOpenConnections > 0 { listener = netutil.LimitListener(listener, config.MaxOpenConnections) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 10d165625..cddc80b8e 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -23,7 +23,18 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/lib/client" ) +// Options helps with specifying some parameters for our RPC testing for greater +// control. +type Options struct { + suppressStdout bool + recreateConfig bool +} + var globalConfig *cfg.Config +var defaultOptions = Options{ + suppressStdout: false, + recreateConfig: false, +} func waitForRPC() { laddr := GetConfig().RPC.ListenAddress @@ -77,19 +88,24 @@ func makeAddrs() (string, string, string) { fmt.Sprintf("tcp://0.0.0.0:%d", randPort()) } +func createConfig() *cfg.Config { + pathname := makePathname() + c := cfg.ResetTestRoot(pathname) + + // and we use random ports to run in parallel + tm, rpc, grpc := makeAddrs() + c.P2P.ListenAddress = tm + c.RPC.ListenAddress = rpc + c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} + c.RPC.GRPCListenAddress = grpc + c.TxIndex.IndexTags = "app.creator,tx.height" // see kvstore application + return c +} + // GetConfig returns a config for the test cases as a singleton -func GetConfig() *cfg.Config { - if globalConfig == nil { - pathname := makePathname() - globalConfig = cfg.ResetTestRoot(pathname) - - // and we use random ports to run in parallel - tm, rpc, grpc := makeAddrs() - globalConfig.P2P.ListenAddress = tm - globalConfig.RPC.ListenAddress = rpc - globalConfig.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} - globalConfig.RPC.GRPCListenAddress = grpc - globalConfig.TxIndex.IndexTags = "app.creator,tx.height" // see kvstore application +func GetConfig(forceCreate ...bool) *cfg.Config { + if globalConfig == nil || (len(forceCreate) > 0 && forceCreate[0]) { + globalConfig = createConfig() } return globalConfig } @@ -100,8 +116,12 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient { } // StartTendermint starts a test tendermint server in a go routine and returns when it is initialized -func StartTendermint(app abci.Application) *nm.Node { - node := NewTendermint(app) +func StartTendermint(app abci.Application, opts ...func(*Options)) *nm.Node { + nodeOpts := defaultOptions + for _, opt := range opts { + opt(&nodeOpts) + } + node := NewTendermint(app, &nodeOpts) err := node.Start() if err != nil { panic(err) @@ -111,7 +131,9 @@ func StartTendermint(app abci.Application) *nm.Node { waitForRPC() waitForGRPC() - fmt.Println("Tendermint running!") + if !nodeOpts.suppressStdout { + fmt.Println("Tendermint running!") + } return node } @@ -125,11 +147,16 @@ func StopTendermint(node *nm.Node) { } // NewTendermint creates a new tendermint server and sleeps forever -func NewTendermint(app abci.Application) *nm.Node { +func NewTendermint(app abci.Application, opts *Options) *nm.Node { // Create & start node - config := GetConfig() - logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) - logger = log.NewFilter(logger, log.AllowError()) + config := GetConfig(opts.recreateConfig) + var logger log.Logger + if opts.suppressStdout { + logger = log.NewNopLogger() + } else { + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) + logger = log.NewFilter(logger, log.AllowError()) + } pvKeyFile := config.PrivValidatorKeyFile() pvKeyStateFile := config.PrivValidatorStateFile() pv := privval.LoadOrGenFilePV(pvKeyFile, pvKeyStateFile) @@ -148,3 +175,15 @@ func NewTendermint(app abci.Application) *nm.Node { } return node } + +// SuppressStdout is an option that tries to make sure the RPC test Tendermint +// node doesn't log anything to stdout. +func SuppressStdout(o *Options) { + o.suppressStdout = true +} + +// RecreateConfig instructs the RPC test to recreate the configuration each +// time, instead of treating it as a global singleton. +func RecreateConfig(o *Options) { + o.recreateConfig = true +}