
Merge pull request #820 from tendermint/790-use-tickers-instead-of-time-Sleep

prefer tickers to time.Sleep
Ethan Buchman, 7 years ago, committed by GitHub
commit dd47884661
4 changed files with 86 additions and 80 deletions:
  1. consensus/mempool_test.go  (+7, -9)
  2. node/node_test.go          (+16, -9)
  3. p2p/pex_reactor_test.go    (+29, -9)
  4. p2p/switch_test.go         (+34, -53)
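
All four files apply the same pattern: instead of sleeping for a fixed, worst-case duration and then checking a condition once, the tests poll the condition on a short ticker and give up via a timeout channel. As an illustration only (a minimal sketch; waitFor, cond, checkPeriod, and timeout are hypothetical names that do not appear in this diff), the pattern boils down to:

// Illustrative sketch only -- not part of this PR's diff. Shows the
// generic "ticker + timeout" polling pattern the changed tests adopt.
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond every checkPeriod until it reports true,
// giving up after timeout.
func waitFor(cond func() bool, checkPeriod, timeout time.Duration) error {
	ticker := time.NewTicker(checkPeriod)
	defer ticker.Stop() // release the ticker's resources when done

	deadline := time.After(timeout) // created once, so the deadline stays fixed
	for {
		select {
		case <-ticker.C:
			if cond() {
				return nil
			}
		case <-deadline:
			return errors.New("timed out waiting for condition")
		}
	}
}

func main() {
	start := time.Now()
	// Example: wait until ~50ms have elapsed, checking every 10ms.
	err := waitFor(func() bool { return time.Since(start) > 50*time.Millisecond },
		10*time.Millisecond, 1*time.Second)
	fmt.Println("waitFor returned:", err)
}

Note the sketch creates the time.After channel once, outside the loop, so the deadline is fixed across ticks, and it stops the ticker when done.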

consensus/mempool_test.go (+7, -9)

@@ -124,8 +124,8 @@ func TestRmBadTx(t *testing.T) {
 	app.DeliverTx(txBytes)
 	app.Commit()
 
-	ch := make(chan struct{})
-	cbCh := make(chan struct{})
+	emptyMempoolCh := make(chan struct{})
+	checkTxRespCh := make(chan struct{})
 	go func() {
 		// Try to send the tx through the mempool.
 		// CheckTx should not err, but the app should return a bad abci code
@@ -134,7 +134,7 @@ func TestRmBadTx(t *testing.T) {
 			if r.GetCheckTx().Code != abci.CodeType_BadNonce {
 				t.Fatalf("expected checktx to return bad nonce, got %v", r)
 			}
-			cbCh <- struct{}{}
+			checkTxRespCh <- struct{}{}
 		})
 		if err != nil {
 			t.Fatal("Error after CheckTx: %v", err)
@@ -142,20 +142,18 @@ func TestRmBadTx(t *testing.T) {
 		// check for the tx
 		for {
-			time.Sleep(time.Second)
 			txs := cs.mempool.Reap(1)
 			if len(txs) == 0 {
-				ch <- struct{}{}
+				emptyMempoolCh <- struct{}{}
 				return
 			}
+			time.Sleep(10 * time.Millisecond)
 		}
 	}()
 
 	// Wait until the tx returns
 	ticker := time.After(time.Second * 5)
 	select {
-	case <-cbCh:
+	case <-checkTxRespCh:
 		// success
 	case <-ticker:
 		t.Fatalf("Timed out waiting for tx to return")
@@ -164,7 +162,7 @@ func TestRmBadTx(t *testing.T) {
 	// Wait until the tx is removed
 	ticker = time.After(time.Second * 5)
 	select {
-	case <-ch:
+	case <-emptyMempoolCh:
 		// success
 	case <-ticker:
 		t.Fatalf("Timed out waiting for tx to be removed")


node/node_test.go (+16, -9)

@@ -9,30 +9,37 @@ import (
 	"github.com/tendermint/tmlibs/log"
 
 	cfg "github.com/tendermint/tendermint/config"
+	"github.com/tendermint/tendermint/types"
 )
 
 func TestNodeStartStop(t *testing.T) {
 	config := cfg.ResetTestRoot("node_node_test")
 
-	// Create & start node
+	// create & start node
 	n, err := DefaultNewNode(config, log.TestingLogger())
 	assert.NoError(t, err, "expected no err on DefaultNewNode")
 	n.Start()
 	t.Logf("Started node %v", n.sw.NodeInfo())
 
-	// Wait a bit to initialize
-	// TODO remove time.Sleep(), make asynchronous.
-	time.Sleep(time.Second * 2)
+	// wait for the node to produce a block
+	blockCh := make(chan struct{})
+	types.AddListenerForEvent(n.EventSwitch(), "node_test", types.EventStringNewBlock(), func(types.TMEventData) {
+		blockCh <- struct{}{}
+	})
+	select {
+	case <-blockCh:
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for the node to produce a block")
+	}
 
-	ch := make(chan struct{}, 1)
+	// stop the node
 	go func() {
 		n.Stop()
-		ch <- struct{}{}
 	}()
-	ticker := time.NewTicker(time.Second * 5)
+
 	select {
-	case <-ch:
-	case <-ticker.C:
+	case <-n.Quit:
+	case <-time.After(5 * time.Second):
 		t.Fatal("timed out waiting for shutdown")
 	}
 }

p2p/pex_reactor_test.go (+29, -9)

@@ -1,6 +1,7 @@
 package p2p
 
 import (
+	"fmt"
 	"io/ioutil"
 	"math/rand"
 	"os"
@@ -98,15 +99,7 @@ func TestPEXReactorRunning(t *testing.T) {
 		require.Nil(err)
 	}
 
-	time.Sleep(1 * time.Second)
-
-	// check peers are connected after some time
-	for _, s := range switches {
-		outbound, inbound, _ := s.NumPeers()
-		if outbound+inbound == 0 {
-			t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr)
-		}
-	}
+	assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second)
 
 	// stop them
 	for _, s := range switches {
@@ -114,6 +107,33 @@ func TestPEXReactorRunning(t *testing.T) {
 	}
 }
 
+func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) {
+	ticker := time.NewTicker(checkPeriod)
+	for {
+		select {
+		case <-ticker.C:
+			// check peers are connected
+			allGood := true
+			for _, s := range switches {
+				outbound, inbound, _ := s.NumPeers()
+				if outbound+inbound == 0 {
+					allGood = false
+				}
+			}
+			if allGood {
+				return
+			}
+		case <-time.After(timeout):
+			numPeersStr := ""
+			for i, s := range switches {
+				outbound, inbound, _ := s.NumPeers()
+				numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
+			}
+			t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
+		}
+	}
+}
+
 func TestPEXReactorReceive(t *testing.T) {
 	assert, require := assert.New(t), require.New(t)


p2p/switch_test.go (+34, -53)

@@ -131,41 +131,34 @@ func TestSwitches(t *testing.T) {
 	s1.Broadcast(byte(0x01), ch1Msg)
 	s1.Broadcast(byte(0x02), ch2Msg)
 
-	// Wait for things to settle...
-	time.Sleep(5000 * time.Millisecond)
-
-	// Check message on ch0
-	ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
-	if len(ch0Msgs) != 1 {
-		t.Errorf("Expected to have received 1 message in ch0")
-	}
-	if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
-		t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
-	}
-
-	// Check message on ch1
-	ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
-	if len(ch1Msgs) != 1 {
-		t.Errorf("Expected to have received 1 message in ch1")
-	}
-	if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
-		t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
-	}
-
-	// Check message on ch2
-	ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
-	if len(ch2Msgs) != 1 {
-		t.Errorf("Expected to have received 1 message in ch2")
-	}
-	if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
-		t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
+	assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
+	assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
+	assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
+}
+
+func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
+	ticker := time.NewTicker(checkPeriod)
+	for {
+		select {
+		case <-ticker.C:
+			msgs := reactor.getMsgs(channel)
+			if len(msgs) > 0 {
+				if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
+					t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
+				}
+				return
+			}
+		case <-time.After(timeout):
+			t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
+		}
 	}
 }
 
 func TestConnAddrFilter(t *testing.T) {
 	s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
 	s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
+	defer s1.Stop()
+	defer s2.Stop()
 
 	c1, c2 := net.Pipe()
@@ -184,22 +177,22 @@ func TestConnAddrFilter(t *testing.T) {
 		s2.addPeerWithConnection(c2)
 	}()
 
-	// Wait for things to happen, peers to get added...
-	time.Sleep(100 * time.Millisecond * time.Duration(4))
-
-	defer s1.Stop()
-	defer s2.Stop()
-	if s1.Peers().Size() != 0 {
-		t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
-	}
-	if s2.Peers().Size() != 0 {
-		t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
+	assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
+	assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
+}
+
+func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
+	time.Sleep(timeout)
+	if sw.Peers().Size() != 0 {
+		t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
 	}
 }
 
 func TestConnPubKeyFilter(t *testing.T) {
 	s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
 	s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
+	defer s1.Stop()
+	defer s2.Stop()
 
 	c1, c2 := net.Pipe()
@@ -219,17 +212,8 @@ func TestConnPubKeyFilter(t *testing.T) {
 		s2.addPeerWithConnection(c2)
 	}()
 
-	// Wait for things to happen, peers to get added...
-	time.Sleep(100 * time.Millisecond * time.Duration(4))
-
-	defer s1.Stop()
-	defer s2.Stop()
-	if s1.Peers().Size() != 0 {
-		t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
-	}
-	if s2.Peers().Size() != 0 {
-		t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
-	}
+	assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
+	assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
 }
 
 func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
@@ -252,9 +236,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
 	// simulate failure by closing connection
 	peer.CloseConn()
 
-	time.Sleep(100 * time.Millisecond)
-
-	assert.Zero(sw.Peers().Size())
+	assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
 	assert.False(peer.IsRunning())
 }
@@ -305,7 +287,7 @@ func BenchmarkSwitches(b *testing.B) {
 	defer s2.Stop()
 
 	// Allow time for goroutines to boot up
-	time.Sleep(1000 * time.Millisecond)
+	time.Sleep(1 * time.Second)
 	b.StartTimer()
 
 	numSuccess, numFailure := 0, 0
@@ -327,5 +309,4 @@ func BenchmarkSwitches(b *testing.B) {
 	// Allow everything to flush before stopping switches & closing connections.
 	b.StopTimer()
-	time.Sleep(1000 * time.Millisecond)
 }
