Browse Source

rpc: add support for batched requests/responses (#3534)

Continues from #3280 in building support for batched requests/responses in the JSON RPC (as per issue #3213).

* Add JSON RPC batching for client and server

As per #3213, this adds support for [JSON RPC batch requests and
responses](https://www.jsonrpc.org/specification#batch).

* Add additional checks to ensure client responses are the same as results

* Fix case where a notification is sent and no response is expected

* Add test to check that JSON RPC notifications in a batch are left out in responses

* Update CHANGELOG_PENDING.md

* Update PR number now that PR has been created

* Make errors start with lowercase letter

* Refactor batch functionality to be standalone

This refactors the batching functionality to rather act in a standalone
way. In light of supporting concurrent goroutines making use of the same
client, it would make sense to have batching functionality where one
could create a batch of requests per goroutine and send that batch
without interfering with a batch from another goroutine.

* Add examples for simple and batch HTTP client usage

* Check errors from writer and remove nolinter directives

* Make error strings start with lowercase letter

* Refactor examples to make them testable

* Use safer deferred shutdown for example Tendermint test node

* Recompose rpcClient interface from pre-existing interface components

* Rename WaitGroup for brevity

* Replace empty ID string with request ID

* Remove extraneous test case

* Convert first letter of errors.Wrap() messages to lowercase

* Remove extraneous function parameter

* Make variable declaration terse

* Reorder WaitGroup.Done call to help prevent race conditions in the face of failure

* Swap mutex to value representation and remove initialization

* Restore empty JSONRPC string ID in response to prevent nil

* Make JSONRPCBufferedRequest private

* Revert PR hard link in CHANGELOG_PENDING

* Add client ID for JSONRPCClient

This adds code to automatically generate a randomized client ID for the
JSONRPCClient, and adds a check of the IDs in the responses (if one was
set in the requests).

* Extract response ID validation into separate function

* Remove extraneous comments

* Reorder fields to indicate clearly which are protected by the mutex

* Refactor for loop to remove indexing

* Restructure and combine loop

* Flatten conditional block for better readability

* Make multi-variable declaration slightly more readable

* Change for loop style

* Compress error check statements

* Make function description more generic to show that we support different protocols

* Preallocate memory for request and result objects
pull/3578/head
Thane Thomson 6 years ago
committed by Anton Kaliaev
parent
commit
90465f727f
11 changed files with 746 additions and 132 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +126
    -0
      rpc/client/examples_test.go
  3. +1
    -1
      rpc/client/helpers.go
  4. +135
    -52
      rpc/client/httpclient.go
  5. +100
    -0
      rpc/client/rpc_test.go
  6. +176
    -10
      rpc/lib/client/http_client.go
  7. +54
    -44
      rpc/lib/server/handlers.go
  8. +66
    -0
      rpc/lib/server/handlers_test.go
  9. +1
    -1
      rpc/lib/server/http_params.go
  10. +28
    -5
      rpc/lib/server/http_server.go
  11. +58
    -19
      rpc/test/helpers.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -17,6 +17,7 @@
### FEATURES:
### IMPROVEMENTS:
- [rpc] [\#3534](https://github.com/tendermint/tendermint/pull/3534) Add support for batched requests/responses in JSON RPC
### BUG FIXES:
- [state] [\#3537](https://github.com/tendermint/tendermint/pull/3537#issuecomment-482711833) LoadValidators: do not return an empty validator set


+ 126
- 0
rpc/client/examples_test.go View File

@ -0,0 +1,126 @@
package client_test
import (
"bytes"
"fmt"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctest "github.com/tendermint/tendermint/rpc/test"
)
func ExampleHTTP_simple() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
defer rpctest.StopTendermint(node)
// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c := client.NewHTTP(rpcAddr, "/websocket")
// Create a transaction
k := []byte("name")
v := []byte("satoshi")
tx := append(k, append([]byte("="), v...)...)
// Broadcast the transaction and wait for it to commit (rather use
// c.BroadcastTxSync though in production)
bres, err := c.BroadcastTxCommit(tx)
if err != nil {
panic(err)
}
if bres.CheckTx.IsErr() || bres.DeliverTx.IsErr() {
panic("BroadcastTxCommit transaction failed")
}
// Now try to fetch the value for the key
qres, err := c.ABCIQuery("/key", k)
if err != nil {
panic(err)
}
if qres.Response.IsErr() {
panic("ABCIQuery failed")
}
if !bytes.Equal(qres.Response.Key, k) {
panic("returned key does not match queried key")
}
if !bytes.Equal(qres.Response.Value, v) {
panic("returned value does not match sent value")
}
fmt.Println("Sent tx :", string(tx))
fmt.Println("Queried for :", string(qres.Response.Key))
fmt.Println("Got value :", string(qres.Response.Value))
// Output:
// Sent tx : name=satoshi
// Queried for : name
// Got value : satoshi
}
func ExampleHTTP_batching() {
// Start a tendermint node (and kvstore) in the background to test against
app := kvstore.NewKVStoreApplication()
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
defer rpctest.StopTendermint(node)
// Create our RPC client
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
c := client.NewHTTP(rpcAddr, "/websocket")
// Create our two transactions
k1 := []byte("firstName")
v1 := []byte("satoshi")
tx1 := append(k1, append([]byte("="), v1...)...)
k2 := []byte("lastName")
v2 := []byte("nakamoto")
tx2 := append(k2, append([]byte("="), v2...)...)
txs := [][]byte{tx1, tx2}
// Create a new batch
batch := c.NewBatch()
// Queue up our transactions
for _, tx := range txs {
if _, err := batch.BroadcastTxCommit(tx); err != nil {
panic(err)
}
}
// Send the batch of 2 transactions
if _, err := batch.Send(); err != nil {
panic(err)
}
// Now let's query for the original results as a batch
keys := [][]byte{k1, k2}
for _, key := range keys {
if _, err := batch.ABCIQuery("/key", key); err != nil {
panic(err)
}
}
// Send the 2 queries and keep the results
results, err := batch.Send()
if err != nil {
panic(err)
}
// Each result in the returned list is the deserialized result of each
// respective ABCIQuery response
for _, result := range results {
qr, ok := result.(*ctypes.ResultABCIQuery)
if !ok {
panic("invalid result type from ABCIQuery request")
}
fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value))
}
// Output:
// firstName = satoshi
// lastName = nakamoto
}

+ 1
- 1
rpc/client/helpers.go View File

@ -15,7 +15,7 @@ type Waiter func(delta int64) (abort error)
// but you can plug in another one
func DefaultWaitStrategy(delta int64) (abort error) {
if delta > 10 {
return errors.Errorf("Waiting for %d blocks... aborting", delta)
return errors.Errorf("waiting for %d blocks... aborting", delta)
} else if delta > 0 {
// estimate of wait time....
// wait half a second for the next block (in progress)


+ 135
- 52
rpc/client/httpclient.go View File

@ -18,27 +18,72 @@ import (
)
/*
HTTP is a Client implementation that communicates with a tendermint node over
json rpc and websockets.
HTTP is a Client implementation that communicates with a Tendermint node over
JSON RPC and WebSockets.
This is the main implementation you probably want to use in production code.
There are other implementations when calling the tendermint node in-process
There are other implementations when calling the Tendermint node in-process
(Local), or when you want to mock out the server for test code (mock).
You can subscribe for any event published by Tendermint using Subscribe method.
Note delivery is best-effort. If you don't read events fast enough or network
is slow, Tendermint might cancel the subscription. The client will attempt to
Note delivery is best-effort. If you don't read events fast enough or network is
slow, Tendermint might cancel the subscription. The client will attempt to
resubscribe (you don't need to do anything). It will keep trying every second
indefinitely until successful.
Request batching is available for JSON RPC requests over HTTP, which conforms to
the JSON RPC specification (https://www.jsonrpc.org/specification#batch). See
the example for more details.
*/
type HTTP struct {
remote string
rpc *rpcclient.JSONRPCClient
*baseRPCClient
*WSEvents
}
// NewHTTP takes a remote endpoint in the form tcp://<host>:<port>
// and the websocket path (which always seems to be "/websocket")
// BatchHTTP provides the same interface as `HTTP`, but allows for batching of
// requests (as per https://www.jsonrpc.org/specification#batch). Do not
// instantiate directly - rather use the HTTP.NewBatch() method to create an
// instance of this struct.
//
// Batching of HTTP requests is thread-safe in the sense that multiple
// goroutines can each create their own batches and send them using the same
// HTTP client. Multiple goroutines could also enqueue transactions in a single
// batch, but ordering of transactions in the batch cannot be guaranteed in such
// an example.
type BatchHTTP struct {
rpcBatch *rpcclient.JSONRPCRequestBatch
*baseRPCClient
}
// rpcClient is an internal interface to which our RPC clients (batch and
// non-batch) must conform. Acts as an additional code-level sanity check to
// make sure the implementations stay coherent.
type rpcClient interface {
ABCIClient
HistoryClient
NetworkClient
SignClient
StatusClient
}
// baseRPCClient implements the basic RPC method logic without the actual
// underlying RPC call functionality, which is provided by `caller`.
type baseRPCClient struct {
caller rpcclient.JSONRPCCaller
}
var _ rpcClient = (*HTTP)(nil)
var _ rpcClient = (*BatchHTTP)(nil)
var _ rpcClient = (*baseRPCClient)(nil)
//-----------------------------------------------------------------------------
// HTTP
// NewHTTP takes a remote endpoint in the form <protocol>://<host>:<port> and
// the websocket path (which always seems to be "/websocket")
func NewHTTP(remote, wsEndpoint string) *HTTP {
rc := rpcclient.NewJSONRPCClient(remote)
cdc := rc.Codec()
@ -46,39 +91,76 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
rc.SetCodec(cdc)
return &HTTP{
rpc: rc,
remote: remote,
WSEvents: newWSEvents(cdc, remote, wsEndpoint),
rpc: rc,
remote: remote,
baseRPCClient: &baseRPCClient{caller: rc},
WSEvents: newWSEvents(cdc, remote, wsEndpoint),
}
}
var _ Client = (*HTTP)(nil)
func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
// NewBatch creates a new batch client for this HTTP client.
func (c *HTTP) NewBatch() *BatchHTTP {
rpcBatch := c.rpc.NewRequestBatch()
return &BatchHTTP{
rpcBatch: rpcBatch,
baseRPCClient: &baseRPCClient{
caller: rpcBatch,
},
}
}
//-----------------------------------------------------------------------------
// BatchHTTP
// Send is a convenience function for an HTTP batch that will trigger the
// compilation of the batched requests and send them off using the client as a
// single request. On success, this returns a list of the deserialized results
// from each request in the sent batch.
func (b *BatchHTTP) Send() ([]interface{}, error) {
return b.rpcBatch.Send()
}
// Clear will empty out this batch of requests and return the number of requests
// that were cleared out.
func (b *BatchHTTP) Clear() int {
return b.rpcBatch.Clear()
}
// Count returns the number of enqueued requests waiting to be sent.
func (b *BatchHTTP) Count() int {
return b.rpcBatch.Count()
}
//-----------------------------------------------------------------------------
// baseRPCClient
func (c *baseRPCClient) Status() (*ctypes.ResultStatus, error) {
result := new(ctypes.ResultStatus)
_, err := c.rpc.Call("status", map[string]interface{}{}, result)
_, err := c.caller.Call("status", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Status")
}
return result, nil
}
func (c *HTTP) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
func (c *baseRPCClient) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
result := new(ctypes.ResultABCIInfo)
_, err := c.rpc.Call("abci_info", map[string]interface{}{}, result)
_, err := c.caller.Call("abci_info", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "ABCIInfo")
}
return result, nil
}
func (c *HTTP) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
}
func (c *HTTP) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
func (c *baseRPCClient) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
result := new(ctypes.ResultABCIQuery)
_, err := c.rpc.Call("abci_query",
_, err := c.caller.Call("abci_query",
map[string]interface{}{"path": path, "data": data, "height": opts.Height, "prove": opts.Prove},
result)
if err != nil {
@ -87,89 +169,89 @@ func (c *HTTP) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQue
return result, nil
}
func (c *HTTP) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
func (c *baseRPCClient) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
result := new(ctypes.ResultBroadcastTxCommit)
_, err := c.rpc.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
_, err := c.caller.Call("broadcast_tx_commit", map[string]interface{}{"tx": tx}, result)
if err != nil {
return nil, errors.Wrap(err, "broadcast_tx_commit")
}
return result, nil
}
func (c *HTTP) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return c.broadcastTX("broadcast_tx_async", tx)
}
func (c *HTTP) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return c.broadcastTX("broadcast_tx_sync", tx)
}
func (c *HTTP) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (c *baseRPCClient) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
result := new(ctypes.ResultBroadcastTx)
_, err := c.rpc.Call(route, map[string]interface{}{"tx": tx}, result)
_, err := c.caller.Call(route, map[string]interface{}{"tx": tx}, result)
if err != nil {
return nil, errors.Wrap(err, route)
}
return result, nil
}
func (c *HTTP) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
func (c *baseRPCClient) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
result := new(ctypes.ResultUnconfirmedTxs)
_, err := c.rpc.Call("unconfirmed_txs", map[string]interface{}{"limit": limit}, result)
_, err := c.caller.Call("unconfirmed_txs", map[string]interface{}{"limit": limit}, result)
if err != nil {
return nil, errors.Wrap(err, "unconfirmed_txs")
}
return result, nil
}
func (c *HTTP) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
func (c *baseRPCClient) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
result := new(ctypes.ResultUnconfirmedTxs)
_, err := c.rpc.Call("num_unconfirmed_txs", map[string]interface{}{}, result)
_, err := c.caller.Call("num_unconfirmed_txs", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "num_unconfirmed_txs")
}
return result, nil
}
func (c *HTTP) NetInfo() (*ctypes.ResultNetInfo, error) {
func (c *baseRPCClient) NetInfo() (*ctypes.ResultNetInfo, error) {
result := new(ctypes.ResultNetInfo)
_, err := c.rpc.Call("net_info", map[string]interface{}{}, result)
_, err := c.caller.Call("net_info", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "NetInfo")
}
return result, nil
}
func (c *HTTP) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
func (c *baseRPCClient) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
result := new(ctypes.ResultDumpConsensusState)
_, err := c.rpc.Call("dump_consensus_state", map[string]interface{}{}, result)
_, err := c.caller.Call("dump_consensus_state", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "DumpConsensusState")
}
return result, nil
}
func (c *HTTP) ConsensusState() (*ctypes.ResultConsensusState, error) {
func (c *baseRPCClient) ConsensusState() (*ctypes.ResultConsensusState, error) {
result := new(ctypes.ResultConsensusState)
_, err := c.rpc.Call("consensus_state", map[string]interface{}{}, result)
_, err := c.caller.Call("consensus_state", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "ConsensusState")
}
return result, nil
}
func (c *HTTP) Health() (*ctypes.ResultHealth, error) {
func (c *baseRPCClient) Health() (*ctypes.ResultHealth, error) {
result := new(ctypes.ResultHealth)
_, err := c.rpc.Call("health", map[string]interface{}{}, result)
_, err := c.caller.Call("health", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Health")
}
return result, nil
}
func (c *HTTP) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
func (c *baseRPCClient) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
result := new(ctypes.ResultBlockchainInfo)
_, err := c.rpc.Call("blockchain",
_, err := c.caller.Call("blockchain",
map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight},
result)
if err != nil {
@ -178,56 +260,56 @@ func (c *HTTP) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockch
return result, nil
}
func (c *HTTP) Genesis() (*ctypes.ResultGenesis, error) {
func (c *baseRPCClient) Genesis() (*ctypes.ResultGenesis, error) {
result := new(ctypes.ResultGenesis)
_, err := c.rpc.Call("genesis", map[string]interface{}{}, result)
_, err := c.caller.Call("genesis", map[string]interface{}{}, result)
if err != nil {
return nil, errors.Wrap(err, "Genesis")
}
return result, nil
}
func (c *HTTP) Block(height *int64) (*ctypes.ResultBlock, error) {
func (c *baseRPCClient) Block(height *int64) (*ctypes.ResultBlock, error) {
result := new(ctypes.ResultBlock)
_, err := c.rpc.Call("block", map[string]interface{}{"height": height}, result)
_, err := c.caller.Call("block", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Block")
}
return result, nil
}
func (c *HTTP) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) {
func (c *baseRPCClient) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
_, err := c.rpc.Call("block_results", map[string]interface{}{"height": height}, result)
_, err := c.caller.Call("block_results", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Block Result")
}
return result, nil
}
func (c *HTTP) Commit(height *int64) (*ctypes.ResultCommit, error) {
func (c *baseRPCClient) Commit(height *int64) (*ctypes.ResultCommit, error) {
result := new(ctypes.ResultCommit)
_, err := c.rpc.Call("commit", map[string]interface{}{"height": height}, result)
_, err := c.caller.Call("commit", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Commit")
}
return result, nil
}
func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
func (c *baseRPCClient) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
result := new(ctypes.ResultTx)
params := map[string]interface{}{
"hash": hash,
"prove": prove,
}
_, err := c.rpc.Call("tx", params, result)
_, err := c.caller.Call("tx", params, result)
if err != nil {
return nil, errors.Wrap(err, "Tx")
}
return result, nil
}
func (c *HTTP) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
func (c *baseRPCClient) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
result := new(ctypes.ResultTxSearch)
params := map[string]interface{}{
"query": query,
@ -235,23 +317,24 @@ func (c *HTTP) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Re
"page": page,
"per_page": perPage,
}
_, err := c.rpc.Call("tx_search", params, result)
_, err := c.caller.Call("tx_search", params, result)
if err != nil {
return nil, errors.Wrap(err, "TxSearch")
}
return result, nil
}
func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
func (c *baseRPCClient) Validators(height *int64) (*ctypes.ResultValidators, error) {
result := new(ctypes.ResultValidators)
_, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result)
_, err := c.caller.Call("validators", map[string]interface{}{"height": height}, result)
if err != nil {
return nil, errors.Wrap(err, "Validators")
}
return result, nil
}
/** websocket event stuff here... **/
//-----------------------------------------------------------------------------
// WSEvents
type WSEvents struct {
cmn.BaseService


+ 100
- 0
rpc/client/rpc_test.go View File

@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -11,7 +12,9 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctest "github.com/tendermint/tendermint/rpc/test"
"github.com/tendermint/tendermint/types"
)
@ -441,3 +444,100 @@ func TestTxSearch(t *testing.T) {
require.Len(t, result.Txs, 0)
}
}
func TestBatchedJSONRPCCalls(t *testing.T) {
c := getHTTPClient()
testBatchedJSONRPCCalls(t, c)
}
func testBatchedJSONRPCCalls(t *testing.T, c *client.HTTP) {
k1, v1, tx1 := MakeTxKV()
k2, v2, tx2 := MakeTxKV()
batch := c.NewBatch()
r1, err := batch.BroadcastTxCommit(tx1)
require.NoError(t, err)
r2, err := batch.BroadcastTxCommit(tx2)
require.NoError(t, err)
require.Equal(t, 2, batch.Count())
bresults, err := batch.Send()
require.NoError(t, err)
require.Len(t, bresults, 2)
require.Equal(t, 0, batch.Count())
bresult1, ok := bresults[0].(*ctypes.ResultBroadcastTxCommit)
require.True(t, ok)
require.Equal(t, *bresult1, *r1)
bresult2, ok := bresults[1].(*ctypes.ResultBroadcastTxCommit)
require.True(t, ok)
require.Equal(t, *bresult2, *r2)
apph := cmn.MaxInt64(bresult1.Height, bresult2.Height) + 1
client.WaitForHeight(c, apph, nil)
q1, err := batch.ABCIQuery("/key", k1)
require.NoError(t, err)
q2, err := batch.ABCIQuery("/key", k2)
require.NoError(t, err)
require.Equal(t, 2, batch.Count())
qresults, err := batch.Send()
require.NoError(t, err)
require.Len(t, qresults, 2)
require.Equal(t, 0, batch.Count())
qresult1, ok := qresults[0].(*ctypes.ResultABCIQuery)
require.True(t, ok)
require.Equal(t, *qresult1, *q1)
qresult2, ok := qresults[1].(*ctypes.ResultABCIQuery)
require.True(t, ok)
require.Equal(t, *qresult2, *q2)
require.Equal(t, qresult1.Response.Key, k1)
require.Equal(t, qresult2.Response.Key, k2)
require.Equal(t, qresult1.Response.Value, v1)
require.Equal(t, qresult2.Response.Value, v2)
}
func TestBatchedJSONRPCCallsCancellation(t *testing.T) {
c := getHTTPClient()
_, _, tx1 := MakeTxKV()
_, _, tx2 := MakeTxKV()
batch := c.NewBatch()
_, err := batch.BroadcastTxCommit(tx1)
require.NoError(t, err)
_, err = batch.BroadcastTxCommit(tx2)
require.NoError(t, err)
// we should have 2 requests waiting
require.Equal(t, 2, batch.Count())
// we want to make sure we cleared 2 pending requests
require.Equal(t, 2, batch.Clear())
// now there should be no batched requests
require.Equal(t, 0, batch.Count())
}
func TestSendingEmptyJSONRPCRequestBatch(t *testing.T) {
c := getHTTPClient()
batch := c.NewBatch()
_, err := batch.Send()
require.Error(t, err, "sending an empty batch of JSON RPC requests should result in an error")
}
func TestClearingEmptyJSONRPCRequestBatch(t *testing.T) {
c := getHTTPClient()
batch := c.NewBatch()
require.Zero(t, batch.Clear(), "clearing an empty batch of JSON RPC requests should result in a 0 result")
}
func TestConcurrentJSONRPCBatching(t *testing.T) {
var wg sync.WaitGroup
c := getHTTPClient()
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
testBatchedJSONRPCCalls(t, c)
}()
}
wg.Wait()
}

+ 176
- 10
rpc/lib/client/http_client.go View File

@ -10,9 +10,11 @@ import (
"net/url"
"reflect"
"strings"
"sync"
"github.com/pkg/errors"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
types "github.com/tendermint/tendermint/rpc/lib/types"
)
@ -83,25 +85,56 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) {
//------------------------------------------------------------------------------------
// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its
// anticipated response structure.
type jsonRPCBufferedRequest struct {
request types.RPCRequest
result interface{} // The result will be deserialized into this object.
}
// JSONRPCRequestBatch allows us to buffer multiple request/response structures
// into a single batch request. Note that this batch acts like a FIFO queue, and
// is thread-safe.
type JSONRPCRequestBatch struct {
client *JSONRPCClient
mtx sync.Mutex
requests []*jsonRPCBufferedRequest
}
// JSONRPCClient takes params as a slice
type JSONRPCClient struct {
address string
client *http.Client
id types.JSONRPCStringID
cdc *amino.Codec
}
// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint.
type JSONRPCCaller interface {
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
}
// Both JSONRPCClient and JSONRPCRequestBatch can facilitate calls to the JSON
// RPC endpoint.
var _ JSONRPCCaller = (*JSONRPCClient)(nil)
var _ JSONRPCCaller = (*JSONRPCRequestBatch)(nil)
// NewJSONRPCClient returns a JSONRPCClient pointed at the given address.
func NewJSONRPCClient(remote string) *JSONRPCClient {
address, client := makeHTTPClient(remote)
return &JSONRPCClient{
address: address,
client: client,
id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)),
cdc: amino.NewCodec(),
}
}
// Call will send the request for the given method through to the RPC endpoint
// immediately, without buffering of requests.
func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("jsonrpc-client"), method, params)
request, err := types.MapToRequest(c.cdc, c.id, method, params)
if err != nil {
return nil, err
}
@ -109,9 +142,7 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul
if err != nil {
return nil, err
}
// log.Info(string(requestBytes))
requestBuf := bytes.NewBuffer(requestBytes)
// log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes)))
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
if err != nil {
return nil, err
@ -122,8 +153,40 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul
if err != nil {
return nil, err
}
// log.Info(Fmt("RPC response: %v", string(responseBytes)))
return unmarshalResponseBytes(c.cdc, responseBytes, result)
return unmarshalResponseBytes(c.cdc, responseBytes, c.id, result)
}
// NewRequestBatch starts a batch of requests for this client.
func (c *JSONRPCClient) NewRequestBatch() *JSONRPCRequestBatch {
return &JSONRPCRequestBatch{
requests: make([]*jsonRPCBufferedRequest, 0),
client: c,
}
}
func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interface{}, error) {
reqs := make([]types.RPCRequest, 0, len(requests))
results := make([]interface{}, 0, len(requests))
for _, req := range requests {
reqs = append(reqs, req.request)
results = append(results, req.result)
}
// serialize the array of requests into a single JSON object
requestBytes, err := json.Marshal(reqs)
if err != nil {
return nil, err
}
httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes))
if err != nil {
return nil, err
}
defer httpResponse.Body.Close() // nolint: errcheck
responseBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return nil, err
}
return unmarshalResponseBytesArray(c.cdc, responseBytes, c.id, results)
}
func (c *JSONRPCClient) Codec() *amino.Codec {
@ -136,6 +199,57 @@ func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) {
//-------------------------------------------------------------
// Count returns the number of enqueued requests waiting to be sent.
func (b *JSONRPCRequestBatch) Count() int {
b.mtx.Lock()
defer b.mtx.Unlock()
return len(b.requests)
}
func (b *JSONRPCRequestBatch) enqueue(req *jsonRPCBufferedRequest) {
b.mtx.Lock()
defer b.mtx.Unlock()
b.requests = append(b.requests, req)
}
// Clear empties out the request batch.
func (b *JSONRPCRequestBatch) Clear() int {
b.mtx.Lock()
defer b.mtx.Unlock()
return b.clear()
}
func (b *JSONRPCRequestBatch) clear() int {
count := len(b.requests)
b.requests = make([]*jsonRPCBufferedRequest, 0)
return count
}
// Send will attempt to send the current batch of enqueued requests, and then
// will clear out the requests once done. On success, this returns the
// deserialized list of results from each of the enqueued requests.
func (b *JSONRPCRequestBatch) Send() ([]interface{}, error) {
b.mtx.Lock()
defer func() {
b.clear()
b.mtx.Unlock()
}()
return b.client.sendBatch(b.requests)
}
// Call enqueues a request to call the given RPC method with the specified
// parameters, in the same way that the `JSONRPCClient.Call` function would.
func (b *JSONRPCRequestBatch) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
request, err := types.MapToRequest(b.client.cdc, b.client.id, method, params)
if err != nil {
return nil, err
}
b.enqueue(&jsonRPCBufferedRequest{request: request, result: result})
return result, nil
}
//-------------------------------------------------------------
// URI takes params as a map
type URIClient struct {
address string
@ -168,7 +282,7 @@ func (c *URIClient) Call(method string, params map[string]interface{}, result in
if err != nil {
return nil, err
}
return unmarshalResponseBytes(c.cdc, responseBytes, result)
return unmarshalResponseBytes(c.cdc, responseBytes, "", result)
}
func (c *URIClient) Codec() *amino.Codec {
@ -181,7 +295,7 @@ func (c *URIClient) SetCodec(cdc *amino.Codec) {
//------------------------------------------------
func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result interface{}) (interface{}, error) {
func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, expectedID types.JSONRPCStringID, result interface{}) (interface{}, error) {
// Read response. If rpc/core/types is imported, the result will unmarshal
// into the correct type.
// log.Notice("response", "response", string(responseBytes))
@ -189,19 +303,71 @@ func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result inter
response := &types.RPCResponse{}
err = json.Unmarshal(responseBytes, response)
if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response: %v", err)
return nil, errors.Errorf("error unmarshalling rpc response: %v", err)
}
if response.Error != nil {
return nil, errors.Errorf("Response error: %v", response.Error)
return nil, errors.Errorf("response error: %v", response.Error)
}
// From the JSON-RPC 2.0 spec:
// id: It MUST be the same as the value of the id member in the Request Object.
if err := validateResponseID(response, expectedID); err != nil {
return nil, err
}
// Unmarshal the RawMessage into the result.
err = cdc.UnmarshalJSON(response.Result, result)
if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err)
return nil, errors.Errorf("error unmarshalling rpc response result: %v", err)
}
return result, nil
}
func unmarshalResponseBytesArray(cdc *amino.Codec, responseBytes []byte, expectedID types.JSONRPCStringID, results []interface{}) ([]interface{}, error) {
var (
err error
responses []types.RPCResponse
)
err = json.Unmarshal(responseBytes, &responses)
if err != nil {
return nil, errors.Errorf("error unmarshalling rpc response: %v", err)
}
// No response error checking here as there may be a mixture of successful
// and unsuccessful responses.
if len(results) != len(responses) {
return nil, errors.Errorf("expected %d result objects into which to inject responses, but got %d", len(responses), len(results))
}
for i, response := range responses {
// From the JSON-RPC 2.0 spec:
// id: It MUST be the same as the value of the id member in the Request Object.
if err := validateResponseID(&response, expectedID); err != nil {
return nil, errors.Errorf("failed to validate response ID in response %d: %v", i, err)
}
if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil {
return nil, errors.Errorf("error unmarshalling rpc response result: %v", err)
}
}
return results, nil
}
func validateResponseID(res *types.RPCResponse, expectedID types.JSONRPCStringID) error {
// we only validate a response ID if the expected ID is non-empty
if len(expectedID) == 0 {
return nil
}
if res.ID == nil {
return errors.Errorf("missing ID in response")
}
id, ok := res.ID.(types.JSONRPCStringID)
if !ok {
return errors.Errorf("expected ID string in response but got: %v", id)
}
if expectedID != id {
return errors.Errorf("response ID (%s) does not match request ID (%s)", id, expectedID)
}
return nil
}
func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) {
values := make(url.Values)
if len(args) == 0 {


+ 54
- 44
rpc/lib/server/handlers.go View File

@ -103,7 +103,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
return func(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "Error reading request body")))
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "error reading request body")))
return
}
// if its an empty request (like from a browser),
@ -113,49 +113,59 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
return
}
var request types.RPCRequest
err = json.Unmarshal(b, &request)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshalling request")))
return
}
// A Notification is a Request object without an "id" member.
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
if request.ID == types.JSONRPCStringID("") {
logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
return
}
if len(r.URL.Path) > 1 {
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path)))
return
}
rpcFunc := funcMap[request.Method]
if rpcFunc == nil || rpcFunc.ws {
WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID))
return
// first try to unmarshal the incoming request as an array of RPC requests
var (
requests []types.RPCRequest
responses []types.RPCResponse
)
if err := json.Unmarshal(b, &requests); err != nil {
// next, try to unmarshal as a single request
var request types.RPCRequest
if err := json.Unmarshal(b, &request); err != nil {
WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshalling request")))
return
}
requests = []types.RPCRequest{request}
}
ctx := &types.Context{JSONReq: &request, HTTPReq: r}
args := []reflect.Value{reflect.ValueOf(ctx)}
if len(request.Params) > 0 {
fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params)
for _, request := range requests {
// A Notification is a Request object without an "id" member.
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
if request.ID == types.JSONRPCStringID("") {
logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
continue
}
if len(r.URL.Path) > 1 {
responses = append(responses, types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path)))
continue
}
rpcFunc, ok := funcMap[request.Method]
if !ok || rpcFunc.ws {
responses = append(responses, types.RPCMethodNotFoundError(request.ID))
continue
}
ctx := &types.Context{JSONReq: &request, HTTPReq: r}
args := []reflect.Value{reflect.ValueOf(ctx)}
if len(request.Params) > 0 {
fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params)
if err != nil {
responses = append(responses, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments")))
continue
}
args = append(args, fnArgs...)
}
returns := rpcFunc.f.Call(args)
logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
return
responses = append(responses, types.RPCInternalError(request.ID, err))
continue
}
args = append(args, fnArgs...)
responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result))
}
returns := rpcFunc.f.Call(args)
logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInternalError(request.ID, err))
return
if len(responses) > 0 {
WriteRPCResponseArrayHTTP(w, responses)
}
WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, request.ID, result))
}
}
@ -194,7 +204,7 @@ func mapParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params map[string]json.
func arrayParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params []json.RawMessage, argsOffset int) ([]reflect.Value, error) {
if len(rpcFunc.argNames) != len(params) {
return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)",
return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)
}
@ -236,7 +246,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect
}
// Otherwise, bad format, we cannot parse
return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err)
return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err)
}
// rpc.json
@ -261,7 +271,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func
fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments")))
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "error converting http params to arguments")))
return
}
args = append(args, fnArgs...)
@ -372,7 +382,7 @@ func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect
if isHexString {
if !expectingString && !expectingByteSlice {
err := errors.Errorf("Got a hex string arg, but expected '%s'",
err := errors.Errorf("got a hex string arg, but expected '%s'",
rt.Kind().String())
return reflect.ValueOf(nil), err, false
}
@ -631,7 +641,7 @@ func (wsc *wsConnection) readRoutine() {
var request types.RPCRequest
err = json.Unmarshal(in, &request)
if err != nil {
wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshaling request")))
wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshaling request")))
continue
}
@ -654,7 +664,7 @@ func (wsc *wsConnection) readRoutine() {
if len(request.Params) > 0 {
fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params)
if err != nil {
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments")))
continue
}
args = append(args, fnArgs...)


+ 66
- 0
rpc/lib/server/handlers_test.go View File

@ -154,6 +154,72 @@ func TestRPCNotification(t *testing.T) {
require.Equal(t, len(blob), 0, "a notification SHOULD NOT be responded to by the server")
}
func TestRPCNotificationInBatch(t *testing.T) {
mux := testMux()
tests := []struct {
payload string
expectCount int
}{
{
`[
{"jsonrpc": "2.0","id": ""},
{"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]}
]`,
1,
},
{
`[
{"jsonrpc": "2.0","id": ""},
{"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]},
{"jsonrpc": "2.0","id": ""},
{"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]}
]`,
2,
},
}
for i, tt := range tests {
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
res := rec.Result()
// Always expecting back a JSONRPCResponse
assert.True(t, statusOK(res.StatusCode), "#%d: should always return 2XX", i)
blob, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Errorf("#%d: err reading body: %v", i, err)
continue
}
var responses []types.RPCResponse
// try to unmarshal an array first
err = json.Unmarshal(blob, &responses)
if err != nil {
// if we were actually expecting an array, but got an error
if tt.expectCount > 1 {
t.Errorf("#%d: expected an array, couldn't unmarshal it\nblob: %s", i, blob)
continue
} else {
// we were expecting an error here, so let's unmarshal a single response
var response types.RPCResponse
err = json.Unmarshal(blob, &response)
if err != nil {
t.Errorf("#%d: expected successful parsing of an RPCResponse\nblob: %s", i, blob)
continue
}
// have a single-element result
responses = []types.RPCResponse{response}
}
}
if tt.expectCount != len(responses) {
t.Errorf("#%d: expected %d response(s), but got %d\nblob: %s", i, tt.expectCount, len(responses), blob)
continue
}
for _, response := range responses {
assert.NotEqual(t, response, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i)
}
}
}
func TestUnknownRPCPath(t *testing.T) {
mux := testMux()
req, _ := http.NewRequest("GET", "http://localhost/unknownrpcpath", nil)


+ 1
- 1
rpc/lib/server/http_params.go View File

@ -76,7 +76,7 @@ func GetParamUint(r *http.Request, param string) (uint, error) {
func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) {
s := GetParam(r, param)
if !re.MatchString(s) {
return "", errors.Errorf(param, "Did not match regular expression %v", re.String())
return "", errors.Errorf(param, "did not match regular expression %v", re.String())
}
return s, nil
}


+ 28
- 5
rpc/lib/server/http_server.go View File

@ -98,7 +98,9 @@ func WriteRPCResponseHTTPError(
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpCode)
w.Write(jsonBytes) // nolint: errcheck, gas
if _, err := w.Write(jsonBytes); err != nil {
panic(err)
}
}
func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) {
@ -108,12 +110,33 @@ func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) {
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
w.Write(jsonBytes) // nolint: errcheck, gas
if _, err := w.Write(jsonBytes); err != nil {
panic(err)
}
}
// WriteRPCResponseArrayHTTP will do the same as WriteRPCResponseHTTP, except it
// can write arrays of responses for batched request/response interactions via
// the JSON RPC.
func WriteRPCResponseArrayHTTP(w http.ResponseWriter, res []types.RPCResponse) {
if len(res) == 1 {
WriteRPCResponseHTTP(w, res[0])
} else {
jsonBytes, err := json.MarshalIndent(res, "", " ")
if err != nil {
panic(err)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
if _, err := w.Write(jsonBytes); err != nil {
panic(err)
}
}
}
//-----------------------------------------------------------------------------
// Wraps an HTTP handler, adding error logging.
// RecoverAndLogHandler wraps an HTTP handler, adding error logging.
// If the inner function panics, the outer function recovers, logs, sends an
// HTTP 500 error response.
func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler {
@ -191,14 +214,14 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
return nil, errors.Errorf(
"Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
"invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)",
addr,
)
}
proto, addr := parts[0], parts[1]
listener, err = net.Listen(proto, addr)
if err != nil {
return nil, errors.Errorf("Failed to listen on %v: %v", addr, err)
return nil, errors.Errorf("failed to listen on %v: %v", addr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)


+ 58
- 19
rpc/test/helpers.go View File

@ -23,7 +23,18 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
)
// Options helps with specifying some parameters for our RPC testing for greater
// control.
type Options struct {
suppressStdout bool
recreateConfig bool
}
var globalConfig *cfg.Config
var defaultOptions = Options{
suppressStdout: false,
recreateConfig: false,
}
func waitForRPC() {
laddr := GetConfig().RPC.ListenAddress
@ -77,19 +88,24 @@ func makeAddrs() (string, string, string) {
fmt.Sprintf("tcp://0.0.0.0:%d", randPort())
}
func createConfig() *cfg.Config {
pathname := makePathname()
c := cfg.ResetTestRoot(pathname)
// and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs()
c.P2P.ListenAddress = tm
c.RPC.ListenAddress = rpc
c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
c.RPC.GRPCListenAddress = grpc
c.TxIndex.IndexTags = "app.creator,tx.height" // see kvstore application
return c
}
// GetConfig returns a config for the test cases as a singleton
func GetConfig() *cfg.Config {
if globalConfig == nil {
pathname := makePathname()
globalConfig = cfg.ResetTestRoot(pathname)
// and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs()
globalConfig.P2P.ListenAddress = tm
globalConfig.RPC.ListenAddress = rpc
globalConfig.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
globalConfig.RPC.GRPCListenAddress = grpc
globalConfig.TxIndex.IndexTags = "app.creator,tx.height" // see kvstore application
func GetConfig(forceCreate ...bool) *cfg.Config {
if globalConfig == nil || (len(forceCreate) > 0 && forceCreate[0]) {
globalConfig = createConfig()
}
return globalConfig
}
@ -100,8 +116,12 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
}
// StartTendermint starts a test tendermint server in a go routine and returns when it is initialized
func StartTendermint(app abci.Application) *nm.Node {
node := NewTendermint(app)
func StartTendermint(app abci.Application, opts ...func(*Options)) *nm.Node {
nodeOpts := defaultOptions
for _, opt := range opts {
opt(&nodeOpts)
}
node := NewTendermint(app, &nodeOpts)
err := node.Start()
if err != nil {
panic(err)
@ -111,7 +131,9 @@ func StartTendermint(app abci.Application) *nm.Node {
waitForRPC()
waitForGRPC()
fmt.Println("Tendermint running!")
if !nodeOpts.suppressStdout {
fmt.Println("Tendermint running!")
}
return node
}
@ -125,11 +147,16 @@ func StopTendermint(node *nm.Node) {
}
// NewTendermint creates a new tendermint server and sleeps forever
func NewTendermint(app abci.Application) *nm.Node {
func NewTendermint(app abci.Application, opts *Options) *nm.Node {
// Create & start node
config := GetConfig()
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
logger = log.NewFilter(logger, log.AllowError())
config := GetConfig(opts.recreateConfig)
var logger log.Logger
if opts.suppressStdout {
logger = log.NewNopLogger()
} else {
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
logger = log.NewFilter(logger, log.AllowError())
}
pvKeyFile := config.PrivValidatorKeyFile()
pvKeyStateFile := config.PrivValidatorStateFile()
pv := privval.LoadOrGenFilePV(pvKeyFile, pvKeyStateFile)
@ -148,3 +175,15 @@ func NewTendermint(app abci.Application) *nm.Node {
}
return node
}
// SuppressStdout is an option that tries to make sure the RPC test Tendermint
// node doesn't log anything to stdout.
func SuppressStdout(o *Options) {
o.suppressStdout = true
}
// RecreateConfig instructs the RPC test to recreate the configuration each
// time, instead of treating it as a global singleton.
func RecreateConfig(o *Options) {
o.recreateConfig = true
}

Loading…
Cancel
Save