Browse Source

crank context timeouts

pull/845/head
Ethan Buchman 7 years ago
parent
commit
a969e24177
5 changed files with 15 additions and 8 deletions
  1. +6
    -4
      rpc/client/event_test.go
  2. +1
    -2
      rpc/core/events.go
  3. +1
    -1
      rpc/core/mempool.go
  4. +4
    -0
      rpc/core/pipe.go
  5. +3
    -1
      rpc/lib/client/ws_client_test.go

+ 6
- 4
rpc/client/event_test.go View File

@ -12,6 +12,8 @@ import (
"github.com/tendermint/tendermint/types"
)
var waitForEventTimeout = 5 * time.Second
// MakeTxKV returns a text transaction, allong with expected key, value pair
func MakeTxKV() ([]byte, []byte, []byte) {
k := []byte(cmn.RandStr(8))
@ -32,7 +34,7 @@ func TestHeaderEvents(t *testing.T) {
}
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
@ -56,7 +58,7 @@ func TestBlockEvents(t *testing.T) {
var firstBlockHeight int
for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock)
require.True(ok, "%d: %#v", j, evt)
@ -94,7 +96,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
require.True(txres.Code.IsOK())
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx)
@ -127,7 +129,7 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
require.True(txres.Code.IsOK())
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.Unwrap().(types.EventDataTx)


+ 1
- 2
rpc/core/events.go View File

@ -2,7 +2,6 @@ package core
import (
"context"
"time"
"github.com/pkg/errors"
@ -53,7 +52,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
return nil, errors.Wrap(err, "failed to add subscription")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
ch := make(chan interface{})
err = eventBus.Subscribe(ctx, addr, q, ch)


+ 1
- 1
rpc/core/mempool.go View File

@ -151,7 +151,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
deliverTxResCh := make(chan interface{})
q := types.EventQueryTx(tx)


+ 4
- 0
rpc/core/pipe.go View File

@ -1,6 +1,8 @@
package core
import (
"time"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
@ -12,6 +14,8 @@ import (
"github.com/tendermint/tmlibs/log"
)
var subscribeTimeout = 5 * time.Second
//----------------------------------------------
// These interfaces are used by RPC and must be thread safe


+ 3
- 1
rpc/lib/client/ws_client_test.go View File

@ -17,6 +17,8 @@ import (
types "github.com/tendermint/tendermint/rpc/lib/types"
)
var wsCallTimeout = 5 * time.Second
type myHandler struct {
closeConnAfterRead bool
mtx sync.RWMutex
@ -138,7 +140,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
// results in WS write error
// provide timeout to avoid blocking
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), wsCallTimeout)
defer cancel()
c.Call(ctx, "a", make(map[string]interface{}))


Loading…
Cancel
Save