Browse Source

Merge pull request #845 from tendermint/fix/826-wait-for-rpc

rpc: wait for rpc servers to be available in tests
pull/842/merge
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
7670049a31
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 154 additions and 42 deletions
  1. +1
    -1
      mempool/reactor_test.go
  2. +9
    -3
      p2p/switch_test.go
  3. +6
    -4
      rpc/client/event_test.go
  4. +1
    -2
      rpc/core/events.go
  5. +1
    -1
      rpc/core/mempool.go
  6. +4
    -0
      rpc/core/pipe.go
  7. +6
    -1
      rpc/grpc/api.go
  8. +1
    -1
      rpc/grpc/grpc_test.go
  9. +74
    -18
      rpc/grpc/types.pb.go
  10. +8
    -1
      rpc/grpc/types.proto
  11. +3
    -1
      rpc/lib/client/ws_client_test.go
  12. +39
    -8
      rpc/test/helpers.go
  13. +1
    -1
      test/app/grpc_client.go

+ 1
- 1
mempool/reactor_test.go View File

@ -81,7 +81,7 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int
mempool := reactors[reactorIdx].Mempool mempool := reactors[reactorIdx].Mempool
for mempool.Size() != len(txs) { for mempool.Size() != len(txs) {
time.Sleep(time.Second)
time.Sleep(time.Millisecond * 100)
} }
reapedTxs := mempool.Reap(len(txs)) reapedTxs := mempool.Reap(len(txs))


+ 9
- 3
p2p/switch_test.go View File

@ -262,9 +262,15 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
peer.CloseConn() peer.CloseConn()
// TODO: actually detect the disconnection and wait for reconnect // TODO: actually detect the disconnection and wait for reconnect
time.Sleep(100 * time.Millisecond)
assert.NotZero(sw.Peers().Size())
npeers := sw.Peers().Size()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
npeers = sw.Peers().Size()
if npeers > 0 {
break
}
}
assert.NotZero(npeers)
assert.False(peer.IsRunning()) assert.False(peer.IsRunning())
} }


+ 6
- 4
rpc/client/event_test.go View File

@ -12,6 +12,8 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var waitForEventTimeout = 5 * time.Second
// MakeTxKV returns a text transaction, allong with expected key, value pair // MakeTxKV returns a text transaction, allong with expected key, value pair
func MakeTxKV() ([]byte, []byte, []byte) { func MakeTxKV() ([]byte, []byte, []byte) {
k := []byte(cmn.RandStr(8)) k := []byte(cmn.RandStr(8))
@ -32,7 +34,7 @@ func TestHeaderEvents(t *testing.T) {
} }
evtTyp := types.EventNewBlockHeader evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader) _, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
@ -56,7 +58,7 @@ func TestBlockEvents(t *testing.T) {
var firstBlockHeight int var firstBlockHeight int
for j := 0; j < 3; j++ { for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", j, err) require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock) blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock)
require.True(ok, "%d: %#v", j, evt) require.True(ok, "%d: %#v", j, evt)
@ -94,7 +96,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
require.True(txres.Code.IsOK()) require.True(txres.Code.IsOK())
// and wait for confirmation // and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info // and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx) txe, ok := evt.Unwrap().(types.EventDataTx)
@ -127,7 +129,7 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
require.True(txres.Code.IsOK()) require.True(txres.Code.IsOK())
// and wait for confirmation // and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info // and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx) txe, ok := evt.Unwrap().(types.EventDataTx)


+ 1
- 2
rpc/core/events.go View File

@ -2,7 +2,6 @@ package core
import ( import (
"context" "context"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -53,7 +52,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
return nil, errors.Wrap(err, "failed to add subscription") return nil, errors.Wrap(err, "failed to add subscription")
} }
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
ch := make(chan interface{}) ch := make(chan interface{})
err = eventBus.Subscribe(ctx, addr, q, ch) err = eventBus.Subscribe(ctx, addr, q, ch)


+ 1
- 1
rpc/core/mempool.go View File

@ -151,7 +151,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// | tx | Tx | nil | true | The transaction | // | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block // subscribe to tx being committed in block
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
deliverTxResCh := make(chan interface{}) deliverTxResCh := make(chan interface{})
q := types.EventQueryTx(tx) q := types.EventQueryTx(tx)


+ 4
- 0
rpc/core/pipe.go View File

@ -1,6 +1,8 @@
package core package core
import ( import (
"time"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
@ -12,6 +14,8 @@ import (
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
var subscribeTimeout = 5 * time.Second
//---------------------------------------------- //----------------------------------------------
// These interfaces are used by RPC and must be thread safe // These interfaces are used by RPC and must be thread safe


+ 6
- 1
rpc/grpc/api.go View File

@ -1,7 +1,7 @@
package core_grpc package core_grpc
import ( import (
context "golang.org/x/net/context"
"context"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
core "github.com/tendermint/tendermint/rpc/core" core "github.com/tendermint/tendermint/rpc/core"
@ -10,6 +10,11 @@ import (
type broadcastAPI struct { type broadcastAPI struct {
} }
func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*ResponsePing, error) {
// dummy so we can check if the server is up
return &ResponsePing{}, nil
}
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
res, err := core.BroadcastTxCommit(req.Tx) res, err := core.BroadcastTxCommit(req.Tx)
if err != nil { if err != nil {


+ 1
- 1
rpc/grpc/grpc_test.go View File

@ -1,11 +1,11 @@
package core_grpc_test package core_grpc_test
import ( import (
"context"
"os" "os"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/net/context"
"github.com/tendermint/abci/example/dummy" "github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/rpc/grpc"


+ 74
- 18
rpc/grpc/types.pb.go View File

@ -9,7 +9,9 @@ It is generated from these files:
types.proto types.proto
It has these top-level messages: It has these top-level messages:
RequestPing
RequestBroadcastTx RequestBroadcastTx
ResponsePing
ResponseBroadcastTx ResponseBroadcastTx
*/ */
package core_grpc package core_grpc
@ -20,7 +22,8 @@ import math "math"
import types "github.com/tendermint/abci/types" import types "github.com/tendermint/abci/types"
import ( import (
context "golang.org/x/net/context"
"context"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
) )
@ -35,6 +38,14 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type RequestPing struct {
}
func (m *RequestPing) Reset() { *m = RequestPing{} }
func (m *RequestPing) String() string { return proto.CompactTextString(m) }
func (*RequestPing) ProtoMessage() {}
func (*RequestPing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type RequestBroadcastTx struct { type RequestBroadcastTx struct {
Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"`
} }
@ -42,7 +53,7 @@ type RequestBroadcastTx struct {
func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} }
func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*RequestBroadcastTx) ProtoMessage() {} func (*RequestBroadcastTx) ProtoMessage() {}
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *RequestBroadcastTx) GetTx() []byte { func (m *RequestBroadcastTx) GetTx() []byte {
if m != nil { if m != nil {
@ -51,15 +62,23 @@ func (m *RequestBroadcastTx) GetTx() []byte {
return nil return nil
} }
type ResponsePing struct {
}
func (m *ResponsePing) Reset() { *m = ResponsePing{} }
func (m *ResponsePing) String() string { return proto.CompactTextString(m) }
func (*ResponsePing) ProtoMessage() {}
func (*ResponsePing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type ResponseBroadcastTx struct { type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
DeliverTx *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=deliver_tx,json=deliverTx" json:"deliver_tx,omitempty"` DeliverTx *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=deliver_tx,json=deliverTx" json:"deliver_tx,omitempty"`
} }
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*ResponseBroadcastTx) ProtoMessage() {} func (*ResponseBroadcastTx) ProtoMessage() {}
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx { func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx {
if m != nil { if m != nil {
@ -76,7 +95,9 @@ func (m *ResponseBroadcastTx) GetDeliverTx() *types.ResponseDeliverTx {
} }
func init() { func init() {
proto.RegisterType((*RequestPing)(nil), "core_grpc.RequestPing")
proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx")
proto.RegisterType((*ResponsePing)(nil), "core_grpc.ResponsePing")
proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx")
} }
@ -91,6 +112,7 @@ const _ = grpc.SupportPackageIsVersion4
// Client API for BroadcastAPI service // Client API for BroadcastAPI service
type BroadcastAPIClient interface { type BroadcastAPIClient interface {
Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error)
BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error)
} }
@ -102,6 +124,15 @@ func NewBroadcastAPIClient(cc *grpc.ClientConn) BroadcastAPIClient {
return &broadcastAPIClient{cc} return &broadcastAPIClient{cc}
} }
func (c *broadcastAPIClient) Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error) {
out := new(ResponsePing)
err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/Ping", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) { func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) {
out := new(ResponseBroadcastTx) out := new(ResponseBroadcastTx)
err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...)
@ -114,6 +145,7 @@ func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadca
// Server API for BroadcastAPI service // Server API for BroadcastAPI service
type BroadcastAPIServer interface { type BroadcastAPIServer interface {
Ping(context.Context, *RequestPing) (*ResponsePing, error)
BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error) BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error)
} }
@ -121,6 +153,24 @@ func RegisterBroadcastAPIServer(s *grpc.Server, srv BroadcastAPIServer) {
s.RegisterService(&_BroadcastAPI_serviceDesc, srv) s.RegisterService(&_BroadcastAPI_serviceDesc, srv)
} }
func _BroadcastAPI_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestPing)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BroadcastAPIServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/core_grpc.BroadcastAPI/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BroadcastAPIServer).Ping(ctx, req.(*RequestPing))
}
return interceptor(ctx, in, info, handler)
}
func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestBroadcastTx) in := new(RequestBroadcastTx)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -143,6 +193,10 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
ServiceName: "core_grpc.BroadcastAPI", ServiceName: "core_grpc.BroadcastAPI",
HandlerType: (*BroadcastAPIServer)(nil), HandlerType: (*BroadcastAPIServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _BroadcastAPI_Ping_Handler,
},
{ {
MethodName: "BroadcastTx", MethodName: "BroadcastTx",
Handler: _BroadcastAPI_BroadcastTx_Handler, Handler: _BroadcastAPI_BroadcastTx_Handler,
@ -155,20 +209,22 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("types.proto", fileDescriptor0) } func init() { proto.RegisterFile("types.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 226 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
// 264 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f,
0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2,
0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94,
0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39,
0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4,
0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01,
0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84,
0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb,
0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89,
0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01,
0x00, 0x00,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x4f, 0x4c, 0x4a, 0xce, 0xd4, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xc4, 0xcb, 0xc5, 0x1d, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x12,
0x90, 0x99, 0x97, 0xae, 0xa4, 0xc2, 0x25, 0x04, 0xe5, 0x3a, 0x15, 0xe5, 0x27, 0xa6, 0x24, 0x27,
0x16, 0x97, 0x84, 0x54, 0x08, 0xf1, 0x71, 0x31, 0x95, 0x54, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0xf0,
0x04, 0x31, 0x95, 0x54, 0x28, 0xf1, 0x71, 0xf1, 0x04, 0xa5, 0x16, 0x17, 0xe4, 0xe7, 0x15, 0xa7,
0x82, 0x75, 0x35, 0x32, 0x72, 0x09, 0xc3, 0x04, 0x90, 0xf5, 0x19, 0x72, 0x71, 0x24, 0x67, 0xa4,
0x26, 0x67, 0xc7, 0x43, 0x75, 0x73, 0x1b, 0x89, 0xe9, 0x41, 0x2c, 0x87, 0xa9, 0x76, 0x06, 0x49,
0x87, 0x54, 0x04, 0xb1, 0x27, 0x43, 0x18, 0x42, 0xe6, 0x5c, 0x5c, 0x29, 0xa9, 0x39, 0x99, 0x65,
0xa9, 0x45, 0x20, 0x4d, 0x4c, 0x60, 0x4d, 0x12, 0x68, 0x9a, 0x5c, 0x20, 0x0a, 0x42, 0x2a, 0x82,
0x38, 0x53, 0x60, 0x4c, 0xa3, 0xa9, 0x8c, 0x5c, 0x3c, 0x70, 0xbb, 0x1d, 0x03, 0x3c, 0x85, 0xcc,
0xb9, 0x58, 0x40, 0x8e, 0x13, 0x12, 0xd3, 0x83, 0x87, 0x8d, 0x1e, 0x92, 0x57, 0xa5, 0xc4, 0x51,
0xc4, 0x11, 0xbe, 0x11, 0xf2, 0xe1, 0xe2, 0x46, 0xf6, 0x84, 0x2c, 0xa6, 0x7e, 0x24, 0x69, 0x29,
0x39, 0x2c, 0xc6, 0x20, 0xc9, 0x27, 0xb1, 0x81, 0xc3, 0xd9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
0x92, 0x29, 0xd9, 0x42, 0xaf, 0x01, 0x00, 0x00,
} }

+ 8
- 1
rpc/grpc/types.proto View File

@ -1,7 +1,7 @@
syntax = "proto3"; syntax = "proto3";
package core_grpc; package core_grpc;
import "github.com/tendermint/abci/blob/master/types/types.proto";
import "github.com/tendermint/abci/types/types.proto";
//---------------------------------------- //----------------------------------------
// Message types // Message types
@ -9,6 +9,9 @@ import "github.com/tendermint/abci/blob/master/types/types.proto";
//---------------------------------------- //----------------------------------------
// Request types // Request types
message RequestPing {
}
message RequestBroadcastTx { message RequestBroadcastTx {
bytes tx = 1; bytes tx = 1;
} }
@ -16,6 +19,9 @@ message RequestBroadcastTx {
//---------------------------------------- //----------------------------------------
// Response types // Response types
message ResponsePing{
}
message ResponseBroadcastTx{ message ResponseBroadcastTx{
types.ResponseCheckTx check_tx = 1; types.ResponseCheckTx check_tx = 1;
types.ResponseDeliverTx deliver_tx = 2; types.ResponseDeliverTx deliver_tx = 2;
@ -25,5 +31,6 @@ message ResponseBroadcastTx{
// Service Definition // Service Definition
service BroadcastAPI { service BroadcastAPI {
rpc Ping(RequestPing) returns (ResponsePing) ;
rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ; rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ;
} }

+ 3
- 1
rpc/lib/client/ws_client_test.go View File

@ -17,6 +17,8 @@ import (
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
) )
var wsCallTimeout = 5 * time.Second
type myHandler struct { type myHandler struct {
closeConnAfterRead bool closeConnAfterRead bool
mtx sync.RWMutex mtx sync.RWMutex
@ -138,7 +140,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
// results in WS write error // results in WS write error
// provide timeout to avoid blocking // provide timeout to avoid blocking
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), wsCallTimeout)
defer cancel() defer cancel()
c.Call(ctx, "a", make(map[string]interface{})) c.Call(ctx, "a", make(map[string]interface{}))


+ 39
- 8
rpc/test/helpers.go View File

@ -1,6 +1,7 @@
package rpctest package rpctest
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
@ -13,11 +14,35 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
core_grpc "github.com/tendermint/tendermint/rpc/grpc" core_grpc "github.com/tendermint/tendermint/rpc/grpc"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var config *cfg.Config
var globalConfig *cfg.Config
func waitForRPC() {
laddr := GetConfig().RPC.ListenAddress
client := rpcclient.NewJSONRPCClient(laddr)
result := new(ctypes.ResultStatus)
for {
_, err := client.Call("status", map[string]interface{}{}, result)
if err == nil {
return
}
}
}
func waitForGRPC() {
client := GetGRPCClient()
for {
_, err := client.Ping(context.Background(), &core_grpc.RequestPing{})
if err == nil {
return
}
}
}
// f**ing long, but unique for each test // f**ing long, but unique for each test
func makePathname() string { func makePathname() string {
@ -46,21 +71,21 @@ func makeAddrs() (string, string, string) {
// GetConfig returns a config for the test cases as a singleton // GetConfig returns a config for the test cases as a singleton
func GetConfig() *cfg.Config { func GetConfig() *cfg.Config {
if config == nil {
if globalConfig == nil {
pathname := makePathname() pathname := makePathname()
config = cfg.ResetTestRoot(pathname)
globalConfig = cfg.ResetTestRoot(pathname)
// and we use random ports to run in parallel // and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs() tm, rpc, grpc := makeAddrs()
config.P2P.ListenAddress = tm
config.RPC.ListenAddress = rpc
config.RPC.GRPCListenAddress = grpc
globalConfig.P2P.ListenAddress = tm
globalConfig.RPC.ListenAddress = rpc
globalConfig.RPC.GRPCListenAddress = grpc
} }
return config
return globalConfig
} }
func GetGRPCClient() core_grpc.BroadcastAPIClient { func GetGRPCClient() core_grpc.BroadcastAPIClient {
grpcAddr := config.RPC.GRPCListenAddress
grpcAddr := globalConfig.RPC.GRPCListenAddress
return core_grpc.StartGRPCClient(grpcAddr) return core_grpc.StartGRPCClient(grpcAddr)
} }
@ -68,7 +93,13 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
func StartTendermint(app abci.Application) *nm.Node { func StartTendermint(app abci.Application) *nm.Node {
node := NewTendermint(app) node := NewTendermint(app)
node.Start() node.Start()
// wait for rpc
waitForRPC()
waitForGRPC()
fmt.Println("Tendermint running!") fmt.Println("Tendermint running!")
return node return node
} }


+ 1
- 1
test/app/grpc_client.go View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"golang.org/x/net/context"
"context"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/rpc/grpc"


Loading…
Cancel
Save