Browse Source

Fix rpc tests

pull/1347/head
Jae Kwon 6 years ago
parent
commit
3ca5292dc9
11 changed files with 168 additions and 129 deletions
  1. +3
    -3
      Gopkg.lock
  2. +1
    -1
      Gopkg.toml
  3. +5
    -2
      node/node.go
  4. +106
    -95
      rpc/client/event_test.go
  5. +13
    -5
      rpc/client/httpclient.go
  6. +1
    -14
      rpc/core/routes.go
  7. +13
    -0
      rpc/core/types/wire.go
  8. +9
    -0
      rpc/lib/client/http_client.go
  9. +4
    -0
      rpc/lib/client/ws_client.go
  10. +5
    -0
      rpc/test/helpers.go
  11. +8
    -9
      types/events.go

+ 3
- 3
Gopkg.lock View File

@ -254,8 +254,8 @@
[[projects]]
name = "github.com/tendermint/go-amino"
packages = ["."]
revision = "26718ab6738f938d4b33d593543cee7681f2a6a6"
version = "0.9.5"
revision = "42246108ff925a457fb709475070a03dfd3e2b5c"
version = "0.9.6"
[[projects]]
name = "github.com/tendermint/go-crypto"
@ -383,6 +383,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "0dacd2eb1550ca01e0c64f77b721eda1a381dde1d246a56bfe5a2746b78b7bad"
inputs-digest = "d14dbd59436d0ea3b322c42ce33c213b26cd2451ba023a466764b8002e0e649d"
solver-name = "gps-cdcl"
solver-version = 1

+ 1
- 1
Gopkg.toml View File

@ -79,7 +79,7 @@
[[constraint]]
name = "github.com/tendermint/go-amino"
version = "0.9.5"
version = "0.9.6"
[[constraint]]
name = "github.com/tendermint/tmlibs"


+ 5
- 2
node/node.go View File

@ -26,6 +26,7 @@ import (
"github.com/tendermint/tendermint/p2p/trust"
"github.com/tendermint/tendermint/proxy"
rpccore "github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpc "github.com/tendermint/tendermint/rpc/lib"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
@ -489,6 +490,8 @@ func (n *Node) ConfigureRPC() {
func (n *Node) startRPC() ([]net.Listener, error) {
n.ConfigureRPC()
listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",")
coreCodec := amino.NewCodec()
ctypes.RegisterAmino(coreCodec)
if n.config.RPC.Unsafe {
rpccore.AddUnsafeRoutes()
@ -499,10 +502,10 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpccore.RoutesCodec, rpcserver.EventSubscriber(n.eventBus))
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpccore.RoutesCodec, rpcLogger)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
if err != nil {
return nil, err


+ 106
- 95
rpc/client/event_test.go View File

@ -1,16 +1,16 @@
package client_test
import (
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
var waitForEventTimeout = 5 * time.Second
@ -23,116 +23,127 @@ func MakeTxKV() ([]byte, []byte, []byte) {
}
func TestHeaderEvents(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(err, "%d: %+v", i, err)
defer c.Stop()
}
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
// TODO: more checks...
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err, "%d: %+v", i, err)
defer c.Stop()
}
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(t, err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(t, ok, "%d: %#v", i, evt)
// TODO: more checks...
})
}
}
func TestBlockEvents(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(err, "%d: %+v", i, err)
defer c.Stop()
}
// listen for a new block; ensure height increases by 1
var firstBlockHeight int64
for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.(types.EventDataNewBlock)
require.True(ok, "%d: %#v", j, evt)
block := blockEvent.Block
if j == 0 {
firstBlockHeight = block.Header.Height
continue
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err, "%d: %+v", i, err)
defer c.Stop()
}
require.Equal(block.Header.Height, firstBlockHeight+int64(j))
}
// listen for a new block; ensure height increases by 1
var firstBlockHeight int64
for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(t, err, "%d: %+v", j, err)
blockEvent, ok := evt.(types.EventDataNewBlock)
require.True(t, ok, "%d: %#v", j, evt)
block := blockEvent.Block
if j == 0 {
firstBlockHeight = block.Header.Height
continue
}
require.Equal(t, block.Header.Height, firstBlockHeight+int64(j))
}
})
}
}
func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(err, "%d: %+v", i, err)
defer c.Stop()
}
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventTx
// send async
txres, err := c.BroadcastTxAsync(tx)
require.Nil(err, "%+v", err)
require.Equal(txres.Code, abci.CodeTypeOK) // FIXME
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)
require.True(txe.Result.IsOK())
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err, "%d: %+v", i, err)
defer c.Stop()
}
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventTx
// send async
txres, err := c.BroadcastTxAsync(tx)
require.Nil(t, err, "%+v", err)
require.Equal(t, txres.Code, abci.CodeTypeOK) // FIXME
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(t, err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(t, ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(t, tx, txe.Tx)
require.True(t, txe.Result.IsOK())
})
}
}
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(err, "%d: %+v", i, err)
defer c.Stop()
}
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventTx
// send sync
txres, err := c.BroadcastTxSync(tx)
require.Nil(err, "%+v", err)
require.Equal(txres.Code, abci.CodeTypeOK) // FIXME
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)
require.True(txe.Result.IsOK())
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err, "%d: %+v", i, err)
defer c.Stop()
}
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventTx
// send sync
txres, err := c.BroadcastTxSync(tx)
require.Nil(t, err, "%+v", err)
require.Equal(t, txres.Code, abci.CodeTypeOK) // FIXME
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(t, err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(t, ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(t, tx, txe.Tx)
require.True(t, txe.Result.IsOK())
})
}
}

+ 13
- 5
rpc/client/httpclient.go View File

@ -2,11 +2,11 @@ package client
import (
"context"
"encoding/json"
"sync"
"github.com/pkg/errors"
amino "github.com/tendermint/go-amino"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
@ -32,10 +32,14 @@ type HTTP struct {
// New takes a remote endpoint in the form tcp://<host>:<port>
// and the websocket path (which always seems to be "/websocket")
func NewHTTP(remote, wsEndpoint string) *HTTP {
rc := rpcclient.NewJSONRPCClient(remote)
cdc := rc.Codec()
ctypes.RegisterAmino(cdc)
return &HTTP{
rpc: rpcclient.NewJSONRPCClient(remote),
rpc: rc,
remote: remote,
WSEvents: newWSEvents(remote, wsEndpoint),
WSEvents: newWSEvents(cdc, remote, wsEndpoint),
}
}
@ -208,6 +212,7 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
type WSEvents struct {
cmn.BaseService
cdc *amino.Codec
remote string
endpoint string
ws *rpcclient.WSClient
@ -216,8 +221,9 @@ type WSEvents struct {
subscriptions map[string]chan<- interface{}
}
func newWSEvents(remote, endpoint string) *WSEvents {
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
wsEvents := &WSEvents{
cdc: cdc,
endpoint: endpoint,
remote: remote,
subscriptions: make(map[string]chan<- interface{}),
@ -231,6 +237,8 @@ func (w *WSEvents) OnStart() error {
w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
w.ws.SetCodec(w.cdc)
err := w.ws.Start()
if err != nil {
return err
@ -326,7 +334,7 @@ func (w *WSEvents) eventListener() {
continue
}
result := new(ctypes.ResultEvent)
err := json.Unmarshal(resp.Result, result)
err := w.cdc.UnmarshalJSON(resp.Result, result)
if err != nil {
w.Logger.Error("failed to unmarshal response", "err", err)
continue


+ 1
- 14
rpc/core/routes.go View File

@ -1,13 +1,11 @@
package core
import (
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
rpc "github.com/tendermint/tendermint/rpc/lib/server"
"github.com/tendermint/tendermint/types"
)
// TODO: better system than "unsafe" prefix
// NOTE: Amino is registered in rpc/core/types/wire.go.
var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
@ -50,14 +48,3 @@ func AddUnsafeRoutes() {
Routes["unsafe_stop_cpu_profiler"] = rpc.NewRPCFunc(UnsafeStopCPUProfiler, "")
Routes["unsafe_write_heap_profile"] = rpc.NewRPCFunc(UnsafeWriteHeapProfile, "filename")
}
var RoutesCodec *amino.Codec
func init() {
cdc := amino.NewCodec()
RoutesCodec = cdc
types.RegisterEventDatas(cdc)
types.RegisterEvidences(cdc)
crypto.RegisterAmino(cdc)
}

+ 13
- 0
rpc/core/types/wire.go View File

@ -0,0 +1,13 @@
package core_types
import (
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/types"
)
func RegisterAmino(cdc *amino.Codec) {
types.RegisterEventDatas(cdc)
types.RegisterEvidences(cdc)
crypto.RegisterAmino(cdc)
}

+ 9
- 0
rpc/lib/client/http_client.go View File

@ -21,6 +21,7 @@ import (
type HTTPClient interface {
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
Codec() *amino.Codec
SetCodec(*amino.Codec)
}
// TODO: Deprecate support for IP:PORT or /path/to/socket
@ -111,6 +112,10 @@ func (c *JSONRPCClient) Codec() *amino.Codec {
return c.cdc
}
func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) {
c.cdc = cdc
}
//-------------------------------------------------------------
// URI takes params as a map
@ -152,6 +157,10 @@ func (c *URIClient) Codec() *amino.Codec {
return c.cdc
}
func (c *URIClient) SetCodec(cdc *amino.Codec) {
c.cdc = cdc
}
//------------------------------------------------
func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result interface{}) (interface{}, error) {


+ 4
- 0
rpc/lib/client/ws_client.go View File

@ -230,6 +230,10 @@ func (c *WSClient) Codec() *amino.Codec {
return c.cdc
}
func (c *WSClient) SetCodec(cdc *amino.Codec) {
c.cdc = cdc
}
///////////////////////////////////////////////////////////////////////////////
// Private methods


+ 5
- 0
rpc/test/helpers.go View File

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/tendermint/tmlibs/log"
@ -26,11 +27,15 @@ var globalConfig *cfg.Config
func waitForRPC() {
laddr := GetConfig().RPC.ListenAddress
client := rpcclient.NewJSONRPCClient(laddr)
ctypes.RegisterAmino(client.Codec())
result := new(ctypes.ResultStatus)
for {
_, err := client.Call("status", map[string]interface{}{}, result)
if err == nil {
return
} else {
fmt.Println("error", err)
time.Sleep(time.Millisecond)
}
}
}


+ 8
- 9
types/events.go View File

@ -35,20 +35,19 @@ const (
// ENCODING / DECODING
///////////////////////////////////////////////////////////////////////////////
var (
EventDataNameNewBlock = "new_block"
EventDataNameNewBlockHeader = "new_block_header"
EventDataNameTx = "tx"
EventDataNameRoundState = "round_state"
EventDataNameVote = "vote"
EventDataNameProposalHeartbeat = "proposal_heartbeat"
)
// implements events.EventData
type TMEventData interface {
AssertIsTMEventData()
// empty interface
}
func (_ EventDataNewBlock) AssertIsTMEventData() {}
func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterInterface((*TMEventData)(nil), nil)
cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/EventDataNameNewBlock", nil)


Loading…
Cancel
Save