diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 3a430ef26..b2fe3d082 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -124,8 +124,8 @@ func TestRmBadTx(t *testing.T) { app.DeliverTx(txBytes) app.Commit() - ch := make(chan struct{}) - cbCh := make(chan struct{}) + emptyMempoolCh := make(chan struct{}) + checkTxRespCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code @@ -134,28 +134,24 @@ func TestRmBadTx(t *testing.T) { if r.GetCheckTx().Code != abci.CodeType_BadNonce { t.Fatalf("expected checktx to return bad nonce, got %v", r) } - cbCh <- struct{}{} + checkTxRespCh <- struct{}{} }) if err != nil { t.Fatal("Error after CheckTx: %v", err) } // check for the tx - for { - time.Sleep(time.Second) - txs := cs.mempool.Reap(1) - if len(txs) == 0 { - ch <- struct{}{} - return - } - + txs := cs.mempool.Reap(1) + if len(txs) == 0 { + emptyMempoolCh <- struct{}{} + return } }() // Wait until the tx returns ticker := time.After(time.Second * 5) select { - case <-cbCh: + case <-checkTxRespCh: // success case <-ticker: t.Fatalf("Timed out waiting for tx to return") @@ -164,7 +160,7 @@ func TestRmBadTx(t *testing.T) { // Wait until the tx is removed ticker = time.After(time.Second * 5) select { - case <-ch: + case <-emptyMempoolCh: // success case <-ticker: t.Fatalf("Timed out waiting for tx to be removed") diff --git a/node/node_test.go b/node/node_test.go index 641e606c3..f19d91639 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -20,19 +20,23 @@ func TestNodeStartStop(t *testing.T) { n.Start() t.Logf("Started node %v", n.sw.NodeInfo()) - // Wait a bit to initialize - // TODO remove time.Sleep(), make asynchronous. - time.Sleep(time.Second * 2) + ticker := time.NewTicker(10 * time.Millisecond) + select { + case <-ticker.C: + if n.IsRunning() { + return + } + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for start") + } - ch := make(chan struct{}, 1) go func() { n.Stop() - ch <- struct{}{} }() - ticker := time.NewTicker(time.Second * 5) + select { - case <-ch: - case <-ticker.C: + case <-n.Quit: + case <-time.After(5 * time.Second): t.Fatal("timed out waiting for shutdown") } } diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index b2c15ed89..dc0792658 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" "io/ioutil" "math/rand" "os" @@ -98,15 +99,7 @@ func TestPEXReactorRunning(t *testing.T) { require.Nil(err) } - time.Sleep(1 * time.Second) - - // check peers are connected after some time - for _, s := range switches { - outbound, inbound, _ := s.NumPeers() - if outbound+inbound == 0 { - t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr) - } - } + assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second) // stop them for _, s := range switches { @@ -114,6 +107,31 @@ func TestPEXReactorRunning(t *testing.T) { } } +func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) { + ticker := time.NewTicker(checkPeriod) + select { + case <-ticker.C: + // check peers are connected + allGood := true + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound == 0 { + allGood = false + } + } + if allGood { + return + } + case <-time.After(timeout): + numPeersStr := "" + for i, s := range switches { + outbound, inbound, _ := s.NumPeers() + numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound) + } + t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr) + } +} + func TestPEXReactorReceive(t *testing.T) { assert, require := assert.New(t), require.New(t) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 115811b04..93108b92a 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -131,41 +131,31 @@ func TestSwitches(t *testing.T) { s1.Broadcast(byte(0x01), ch1Msg) s1.Broadcast(byte(0x02), ch2Msg) - // Wait for things to settle... - time.Sleep(5000 * time.Millisecond) - - // Check message on ch0 - ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00)) - if len(ch0Msgs) != 1 { - t.Errorf("Expected to have received 1 message in ch0") - } - if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) { - t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) - } - - // Check message on ch1 - ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01)) - if len(ch1Msgs) != 1 { - t.Errorf("Expected to have received 1 message in ch1") - } - if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) { - t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes) - } + assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) + assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) + assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) +} - // Check message on ch2 - ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02)) - if len(ch2Msgs) != 1 { - t.Errorf("Expected to have received 1 message in ch2") - } - if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) { - t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes) +func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) { + ticker := time.NewTicker(checkPeriod) + select { + case <-ticker.C: + msgs := reactor.getMsgs(channel) + if len(msgs) > 0 { + if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) { + t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes) + } + } + case <-time.After(timeout): + t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel) } - } func TestConnAddrFilter(t *testing.T) { s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + defer s1.Stop() + defer s2.Stop() c1, c2 := net.Pipe() @@ -184,22 +174,27 @@ func TestConnAddrFilter(t *testing.T) { s2.addPeerWithConnection(c2) }() - // Wait for things to happen, peers to get added... - time.Sleep(100 * time.Millisecond * time.Duration(4)) + assertNoPeersWithTimeout(t, s1, 100*time.Millisecond, 400*time.Millisecond) + assertNoPeersWithTimeout(t, s2, 100*time.Millisecond, 400*time.Millisecond) +} - defer s1.Stop() - defer s2.Stop() - if s1.Peers().Size() != 0 { - t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size()) - } - if s2.Peers().Size() != 0 { - t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size()) +func assertNoPeersWithTimeout(t *testing.T, sw *Switch, checkPeriod, timeout time.Duration) { + ticker := time.NewTicker(checkPeriod) + select { + case <-ticker.C: + if sw.Peers().Size() != 0 { + t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size()) + } + case <-time.After(timeout): + return } } func TestConnPubKeyFilter(t *testing.T) { s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + defer s1.Stop() + defer s2.Stop() c1, c2 := net.Pipe() @@ -219,17 +214,8 @@ func TestConnPubKeyFilter(t *testing.T) { s2.addPeerWithConnection(c2) }() - // Wait for things to happen, peers to get added... - time.Sleep(100 * time.Millisecond * time.Duration(4)) - - defer s1.Stop() - defer s2.Stop() - if s1.Peers().Size() != 0 { - t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size()) - } - if s2.Peers().Size() != 0 { - t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size()) - } + assertNoPeersWithTimeout(t, s1, 100*time.Millisecond, 400*time.Millisecond) + assertNoPeersWithTimeout(t, s2, 100*time.Millisecond, 400*time.Millisecond) } func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { @@ -252,9 +238,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { // simulate failure by closing connection peer.CloseConn() - time.Sleep(100 * time.Millisecond) - - assert.Zero(sw.Peers().Size()) + assertNoPeersWithTimeout(t, sw, 100*time.Millisecond, 100*time.Millisecond) assert.False(peer.IsRunning()) } @@ -305,7 +289,7 @@ func BenchmarkSwitches(b *testing.B) { defer s2.Stop() // Allow time for goroutines to boot up - time.Sleep(1000 * time.Millisecond) + time.Sleep(1 * time.Second) b.StartTimer() numSuccess, numFailure := 0, 0 @@ -327,5 +311,4 @@ func BenchmarkSwitches(b *testing.B) { // Allow everything to flush before stopping switches & closing connections. b.StopTimer() - time.Sleep(1000 * time.Millisecond) }