From 7c26be3242fc34bc3f2cc9feee7e570460093c86 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Wed, 22 Feb 2017 14:41:10 +0100 Subject: [PATCH] Begin implementation of local client --- node/node.go | 8 +- rpc/client/http/client.go | 57 +++++++---- rpc/client/http/rpc_test.go | 2 +- rpc/client/local/app_test.go | 45 +++++++++ rpc/client/local/client.go | 134 +++++++++++++++++++++++++ rpc/client/local/main_test.go | 21 ++++ rpc/client/local/rpc_test.go | 183 ++++++++++++++++++++++++++++++++++ rpc/test/helpers.go | 4 + 8 files changed, 429 insertions(+), 25 deletions(-) create mode 100644 rpc/client/local/app_test.go create mode 100644 rpc/client/local/client.go create mode 100644 rpc/client/local/main_test.go create mode 100644 rpc/client/local/rpc_test.go diff --git a/node/node.go b/node/node.go index deaa9e163..6ee027710 100644 --- a/node/node.go +++ b/node/node.go @@ -246,9 +246,10 @@ func (n *Node) AddListener(l p2p.Listener) { n.sw.AddListener(l) } -func (n *Node) startRPC() ([]net.Listener, error) { +// ConfigureRPC sets all variables in rpccore so they will serve +// rpc calls from this node +func (n *Node) ConfigureRPC() { rpccore.SetConfig(n.config) - rpccore.SetEventSwitch(n.evsw) rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) @@ -257,7 +258,10 @@ func (n *Node) startRPC() ([]net.Listener, error) { rpccore.SetPubKey(n.privValidator.PubKey) rpccore.SetGenesisDoc(n.genesisDoc) rpccore.SetProxyAppQuery(n.proxyApp.Query()) +} +func (n *Node) startRPC() ([]net.Listener, error) { + n.ConfigureRPC() listenAddrs := strings.Split(n.config.GetString("rpc_laddr"), ",") // we may expose the rpc over both a unix and tcp socket diff --git a/rpc/client/http/client.go b/rpc/client/http/client.go index 773b313a9..c0c8d00ed 100644 --- a/rpc/client/http/client.go +++ b/rpc/client/http/client.go @@ -1,3 +1,12 @@ +/* +package httpclient returns a Client implementation that communicates +with a tendermint node over json rpc and websockets. + +This is the main implementation you probably want to use in +production code. There are other implementations when calling +the tendermint node in-process (local), or when you want to mock +out the server for test code (mock). +*/ package httpclient import ( @@ -9,22 +18,26 @@ import ( "github.com/tendermint/tendermint/types" ) -type HTTPClient struct { +// Client is a Client implementation that communicates over +// JSONRPC +type Client struct { remote string endpoint string rpc *rpcclient.ClientJSONRPC ws *rpcclient.WSClient } -func New(remote, wsEndpoint string) *HTTPClient { - return &HTTPClient{ +// New takes a remote endpoint in the form tcp://: +// and the websocket path (which always seems to be "/websocket") +func New(remote, wsEndpoint string) *Client { + return &Client{ rpc: rpcclient.NewClientJSONRPC(remote), remote: remote, endpoint: wsEndpoint, } } -func (c *HTTPClient) Status() (*ctypes.ResultStatus, error) { +func (c *Client) Status() (*ctypes.ResultStatus, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("status", []interface{}{}, tmResult) if err != nil { @@ -34,7 +47,7 @@ func (c *HTTPClient) Status() (*ctypes.ResultStatus, error) { return (*tmResult).(*ctypes.ResultStatus), nil } -func (c *HTTPClient) ABCIInfo() (*ctypes.ResultABCIInfo, error) { +func (c *Client) ABCIInfo() (*ctypes.ResultABCIInfo, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("abci_info", []interface{}{}, tmResult) if err != nil { @@ -43,7 +56,7 @@ func (c *HTTPClient) ABCIInfo() (*ctypes.ResultABCIInfo, error) { return (*tmResult).(*ctypes.ResultABCIInfo), nil } -func (c *HTTPClient) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) { +func (c *Client) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("abci_query", []interface{}{path, data, prove}, tmResult) if err != nil { @@ -52,19 +65,19 @@ func (c *HTTPClient) ABCIQuery(path string, data []byte, prove bool) (*ctypes.Re return (*tmResult).(*ctypes.ResultABCIQuery), nil } -func (c *HTTPClient) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *Client) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { return c.broadcastTX("broadcast_tx_commit", tx) } -func (c *HTTPClient) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *Client) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { return c.broadcastTX("broadcast_tx_async", tx) } -func (c *HTTPClient) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *Client) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { return c.broadcastTX("broadcast_tx_sync", tx) } -func (c *HTTPClient) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func (c *Client) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call(route, []interface{}{tx}, tmResult) if err != nil { @@ -73,7 +86,7 @@ func (c *HTTPClient) broadcastTX(route string, tx types.Tx) (*ctypes.ResultBroad return (*tmResult).(*ctypes.ResultBroadcastTxCommit), nil } -func (c *HTTPClient) NetInfo() (*ctypes.ResultNetInfo, error) { +func (c *Client) NetInfo() (*ctypes.ResultNetInfo, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("net_info", nil, tmResult) if err != nil { @@ -82,7 +95,7 @@ func (c *HTTPClient) NetInfo() (*ctypes.ResultNetInfo, error) { return (*tmResult).(*ctypes.ResultNetInfo), nil } -func (c *HTTPClient) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { +func (c *Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { tmResult := new(ctypes.TMResult) // TODO: is this the correct way to marshall seeds? _, err := c.rpc.Call("dial_seeds", []interface{}{seeds}, tmResult) @@ -92,7 +105,7 @@ func (c *HTTPClient) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) return (*tmResult).(*ctypes.ResultDialSeeds), nil } -func (c *HTTPClient) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { +func (c *Client) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("blockchain", []interface{}{minHeight, maxHeight}, tmResult) if err != nil { @@ -101,7 +114,7 @@ func (c *HTTPClient) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlo return (*tmResult).(*ctypes.ResultBlockchainInfo), nil } -func (c *HTTPClient) Genesis() (*ctypes.ResultGenesis, error) { +func (c *Client) Genesis() (*ctypes.ResultGenesis, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("genesis", nil, tmResult) if err != nil { @@ -110,7 +123,7 @@ func (c *HTTPClient) Genesis() (*ctypes.ResultGenesis, error) { return (*tmResult).(*ctypes.ResultGenesis), nil } -func (c *HTTPClient) Block(height int) (*ctypes.ResultBlock, error) { +func (c *Client) Block(height int) (*ctypes.ResultBlock, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("block", []interface{}{height}, tmResult) if err != nil { @@ -119,7 +132,7 @@ func (c *HTTPClient) Block(height int) (*ctypes.ResultBlock, error) { return (*tmResult).(*ctypes.ResultBlock), nil } -func (c *HTTPClient) Commit(height int) (*ctypes.ResultCommit, error) { +func (c *Client) Commit(height int) (*ctypes.ResultCommit, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("commit", []interface{}{height}, tmResult) if err != nil { @@ -128,7 +141,7 @@ func (c *HTTPClient) Commit(height int) (*ctypes.ResultCommit, error) { return (*tmResult).(*ctypes.ResultCommit), nil } -func (c *HTTPClient) Validators() (*ctypes.ResultValidators, error) { +func (c *Client) Validators() (*ctypes.ResultValidators, error) { tmResult := new(ctypes.TMResult) _, err := c.rpc.Call("validators", nil, tmResult) if err != nil { @@ -141,7 +154,7 @@ func (c *HTTPClient) Validators() (*ctypes.ResultValidators, error) { // StartWebsocket starts up a websocket and a listener goroutine // if already started, do nothing -func (c *HTTPClient) StartWebsocket() error { +func (c *Client) StartWebsocket() error { var err error if c.ws == nil { ws := rpcclient.NewWSClient(c.remote, c.endpoint) @@ -154,7 +167,7 @@ func (c *HTTPClient) StartWebsocket() error { } // StopWebsocket stops the websocket connection -func (c *HTTPClient) StopWebsocket() { +func (c *Client) StopWebsocket() { if c.ws != nil { c.ws.Stop() c.ws = nil @@ -162,17 +175,17 @@ func (c *HTTPClient) StopWebsocket() { } // GetEventChannels returns the results and error channel from the websocket -func (c *HTTPClient) GetEventChannels() (chan json.RawMessage, chan error) { +func (c *Client) GetEventChannels() (chan json.RawMessage, chan error) { if c.ws == nil { return nil, nil } return c.ws.ResultsCh, c.ws.ErrorsCh } -func (c *HTTPClient) Subscribe(event string) error { +func (c *Client) Subscribe(event string) error { return errors.Wrap(c.ws.Subscribe(event), "Subscribe") } -func (c *HTTPClient) Unsubscribe(event string) error { +func (c *Client) Unsubscribe(event string) error { return errors.Wrap(c.ws.Unsubscribe(event), "Unsubscribe") } diff --git a/rpc/client/http/rpc_test.go b/rpc/client/http/rpc_test.go index 3a7f85402..09a260966 100644 --- a/rpc/client/http/rpc_test.go +++ b/rpc/client/http/rpc_test.go @@ -14,7 +14,7 @@ import ( ) // GetClient gets a rpc client pointing to the test tendermint rpc -func GetClient() *httpclient.HTTPClient { +func GetClient() *httpclient.Client { rpcAddr := rpctest.GetConfig().GetString("rpc_laddr") return httpclient.New(rpcAddr, "/websocket") } diff --git a/rpc/client/local/app_test.go b/rpc/client/local/app_test.go new file mode 100644 index 000000000..bd16b3751 --- /dev/null +++ b/rpc/client/local/app_test.go @@ -0,0 +1,45 @@ +package localclient_test + +import ( + "math/rand" + + meapp "github.com/tendermint/merkleeyes/app" + + wire "github.com/tendermint/go-wire" +) + +// MakeTxKV returns a text transaction, allong with expected key, value pair +func MakeTxKV() ([]byte, []byte, []byte) { + k := RandAsciiBytes(8) + v := RandAsciiBytes(8) + return k, v, makeSet(k, v) +} + +// blatently copied from merkleeyes/app/app_test.go +// constructs a "set" transaction +func makeSet(key, value []byte) []byte { + tx := make([]byte, 1+wire.ByteSliceSize(key)+wire.ByteSliceSize(value)) + buf := tx + buf[0] = meapp.WriteSet // Set TypeByte + buf = buf[1:] + n, err := wire.PutByteSlice(buf, key) + if err != nil { + panic(err) + } + buf = buf[n:] + n, err = wire.PutByteSlice(buf, value) + if err != nil { + panic(err) + } + return tx +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + +func RandAsciiBytes(n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return b +} diff --git a/rpc/client/local/client.go b/rpc/client/local/client.go new file mode 100644 index 000000000..3817e2df1 --- /dev/null +++ b/rpc/client/local/client.go @@ -0,0 +1,134 @@ +/* +package localclient returns a Client implementation that +directly executes the rpc functions on a given node. + +This implementation is useful for: + +* Running tests against a node in-process without the overhead +of going through an http server +* Communication between an ABCI app and tendermin core when they +are compiled in process. + +For real clients, you probably want the "http" package. For more +powerful control during testing, you probably want the "mock" package. +*/ +package localclient + +import ( + nm "github.com/tendermint/tendermint/node" + "github.com/tendermint/tendermint/rpc/core" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +type Client struct { + node *nm.Node +} + +// New configures this to call the Node directly. +// +// Note that given how rpc/core works with package singletons, that +// you can only have one node per process. So make sure test cases +// don't run in parallel, or try to simulate an entire network in +// one process... +func New(node *nm.Node) Client { + node.ConfigureRPC() + return Client{ + node: node, + } +} + +func (c Client) Status() (*ctypes.ResultStatus, error) { + return core.Status() +} + +func (c Client) ABCIInfo() (*ctypes.ResultABCIInfo, error) { + return core.ABCIInfo() +} + +func (c Client) ABCIQuery(path string, data []byte, prove bool) (*ctypes.ResultABCIQuery, error) { + return core.ABCIQuery(path, data, prove) +} + +func (c Client) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + return core.BroadcastTxCommit(tx) +} + +func (c Client) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + return core.BroadcastTxAsync(tx) +} + +func (c Client) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + return core.BroadcastTxSync(tx) +} + +func (c Client) NetInfo() (*ctypes.ResultNetInfo, error) { + return core.NetInfo() +} + +func (c Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { + return core.UnsafeDialSeeds(seeds) +} + +func (c Client) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) { + return core.BlockchainInfo(minHeight, maxHeight) +} + +func (c Client) Genesis() (*ctypes.ResultGenesis, error) { + return core.Genesis() +} + +func (c Client) Block(height int) (*ctypes.ResultBlock, error) { + return core.Block(height) +} + +func (c Client) Commit(height int) (*ctypes.ResultCommit, error) { + return core.Commit(height) +} + +func (c Client) Validators() (*ctypes.ResultValidators, error) { + return core.Validators() +} + +/** websocket event stuff here... **/ + +/* +// StartWebsocket starts up a websocket and a listener goroutine +// if already started, do nothing +func (c Client) StartWebsocket() error { + var err error + if c.ws == nil { + ws := rpcclient.NewWSClient(c.remote, c.endpoint) + _, err = ws.Start() + if err == nil { + c.ws = ws + } + } + return errors.Wrap(err, "StartWebsocket") +} + +// StopWebsocket stops the websocket connection +func (c Client) StopWebsocket() { + if c.ws != nil { + c.ws.Stop() + c.ws = nil + } +} + +// GetEventChannels returns the results and error channel from the websocket +func (c Client) GetEventChannels() (chan json.RawMessage, chan error) { + if c.ws == nil { + return nil, nil + } + return c.ws.ResultsCh, c.ws.ErrorsCh +} + +func (c Client) Subscribe(event string) error { + return errors.Wrap(c.ws.Subscribe(event), "Subscribe") +} + +func (c Client) Unsubscribe(event string) error { + return errors.Wrap(c.ws.Unsubscribe(event), "Unsubscribe") +} + +*/ diff --git a/rpc/client/local/main_test.go b/rpc/client/local/main_test.go new file mode 100644 index 000000000..5e0a0c237 --- /dev/null +++ b/rpc/client/local/main_test.go @@ -0,0 +1,21 @@ +package localclient_test + +import ( + "os" + "testing" + + meapp "github.com/tendermint/merkleeyes/app" + rpctest "github.com/tendermint/tendermint/rpc/test" +) + +func TestMain(m *testing.M) { + // start a tendermint node (and merkleeyes) in the background to test against + app := meapp.NewMerkleEyesApp("", 100) + node := rpctest.StartTendermint(app) + code := m.Run() + + // and shut down proper at the end + node.Stop() + node.Wait() + os.Exit(code) +} diff --git a/rpc/client/local/rpc_test.go b/rpc/client/local/rpc_test.go new file mode 100644 index 000000000..13fc55471 --- /dev/null +++ b/rpc/client/local/rpc_test.go @@ -0,0 +1,183 @@ +package localclient_test + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + merkle "github.com/tendermint/go-merkle" + localclient "github.com/tendermint/tendermint/rpc/client/local" + rpctest "github.com/tendermint/tendermint/rpc/test" +) + +// GetClient gets a rpc client pointing to the test tendermint rpc +func GetClient() localclient.Client { + node := rpctest.GetNode() + return localclient.New(node) +} + +// Make sure status is correct (we connect properly) +func TestStatus(t *testing.T) { + c := GetClient() + chainID := rpctest.GetConfig().GetString("chain_id") + status, err := c.Status() + require.Nil(t, err, "%+v", err) + assert.Equal(t, chainID, status.NodeInfo.Network) +} + +// Make sure info is correct (we connect properly) +func TestInfo(t *testing.T) { + c := GetClient() + status, err := c.Status() + require.Nil(t, err, "%+v", err) + info, err := c.ABCIInfo() + require.Nil(t, err, "%+v", err) + assert.EqualValues(t, status.LatestBlockHeight, info.Response.LastBlockHeight) + assert.True(t, strings.HasPrefix(info.Response.Data, "size:")) +} + +func TestNetInfo(t *testing.T) { + c := GetClient() + netinfo, err := c.NetInfo() + require.Nil(t, err, "%+v", err) + assert.True(t, netinfo.Listening) + assert.Equal(t, 0, len(netinfo.Peers)) +} + +func TestDialSeeds(t *testing.T) { + c := GetClient() + // FIXME: fix server so it doesn't panic on invalid input + _, err := c.DialSeeds([]string{"12.34.56.78:12345"}) + require.Nil(t, err, "%+v", err) +} + +func TestGenesisAndValidators(t *testing.T) { + c := GetClient() + chainID := rpctest.GetConfig().GetString("chain_id") + + // make sure this is the right genesis file + gen, err := c.Genesis() + require.Nil(t, err, "%+v", err) + assert.Equal(t, chainID, gen.Genesis.ChainID) + // get the genesis validator + require.Equal(t, 1, len(gen.Genesis.Validators)) + gval := gen.Genesis.Validators[0] + + // get the current validators + vals, err := c.Validators() + require.Nil(t, err, "%+v", err) + require.Equal(t, 1, len(vals.Validators)) + val := vals.Validators[0] + + // make sure the current set is also the genesis set + assert.Equal(t, gval.Amount, val.VotingPower) + assert.Equal(t, gval.PubKey, val.PubKey) +} + +// Make some app checks +func TestAppCalls(t *testing.T) { + assert, require := assert.New(t), require.New(t) + c := GetClient() + _, err := c.Block(1) + assert.NotNil(err) // no block yet + k, v, tx := MakeTxKV() + _, err = c.BroadcastTxCommit(tx) + require.Nil(err, "%+v", err) + // wait before querying + time.Sleep(time.Second * 1) + qres, err := c.ABCIQuery("/key", k, false) + if assert.Nil(err) && assert.True(qres.Response.Code.IsOK()) { + data := qres.Response + // assert.Equal(k, data.GetKey()) // only returned for proofs + assert.Equal(v, data.GetValue()) + } + // and we can even check the block is added + block, err := c.Block(3) + require.Nil(err, "%+v", err) + appHash := block.BlockMeta.Header.AppHash + assert.True(len(appHash) > 0) + assert.EqualValues(3, block.BlockMeta.Header.Height) + + // check blockchain info, now that we know there is info + // TODO: is this commented somewhere that they are returned + // in order of descending height??? + info, err := c.BlockchainInfo(1, 3) + require.Nil(err, "%+v", err) + assert.True(info.LastHeight > 2) + if assert.Equal(3, len(info.BlockMetas)) { + lastMeta := info.BlockMetas[0] + assert.EqualValues(3, lastMeta.Header.Height) + bMeta := block.BlockMeta + assert.Equal(bMeta.Header.AppHash, lastMeta.Header.AppHash) + assert.Equal(bMeta.BlockID, lastMeta.BlockID) + } + + // and get the corresponding commit with the same apphash + commit, err := c.Commit(3) + require.Nil(err, "%+v", err) + cappHash := commit.Header.AppHash + assert.Equal(appHash, cappHash) + assert.NotNil(commit.Commit) + + // compare the commits (note Commit(2) has commit from Block(3)) + commit2, err := c.Commit(2) + require.Nil(err, "%+v", err) + assert.Equal(block.Block.LastCommit, commit2.Commit) + + // and we got a proof that works! + pres, err := c.ABCIQuery("/key", k, true) + if assert.Nil(err) && assert.True(pres.Response.Code.IsOK()) { + proof, err := merkle.ReadProof(pres.Response.GetProof()) + if assert.Nil(err) { + key := pres.Response.GetKey() + value := pres.Response.GetValue() + assert.Equal(appHash, proof.RootHash) + valid := proof.Verify(key, value, appHash) + assert.True(valid) + } + } +} + +/* +func TestSubscriptions(t *testing.T) { + assert, require := assert.New(t), require.New(t) + c := GetClient() + err := c.StartWebsocket() + require.Nil(err) + defer c.StopWebsocket() + + // subscribe to a transaction event + _, _, tx := MakeTxKV() + // this causes a panic in tendermint core!!! + eventType := types.EventStringTx(types.Tx(tx)) + c.Subscribe(eventType) + read := 0 + + // set up a listener + r, e := c.GetEventChannels() + go func() { + // read one event in the background + select { + case <-r: + // TODO: actually parse this or something + read += 1 + case err := <-e: + panic(err) + } + }() + + // make sure nothing has happened yet. + assert.Equal(0, read) + + // send a tx and wait for it to propogate + _, err = c.BroadcastTxCommit(tx) + assert.Nil(err, string(tx)) + // wait before querying + time.Sleep(time.Second) + + // now make sure the event arrived + assert.Equal(1, read) +} +*/ diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index c028fa5a2..e835a3930 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -37,6 +37,10 @@ func GetConfig() cfg.Config { return config } +func GetNode() *nm.Node { + return node +} + // GetURIClient gets a uri client pointing to the test tendermint rpc func GetURIClient() *client.ClientURI { rpcAddr := GetConfig().GetString("rpc_laddr")