From 3c589dac1932306c4e4d3ee8c19df41fcc533db2 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 12 Jan 2017 02:29:53 -0500 Subject: [PATCH] startConsensusNet and stopConsensusNet --- consensus/byzantine_test.go | 9 +++ consensus/reactor_test.go | 116 ++++++++++-------------------------- 2 files changed, 39 insertions(+), 86 deletions(-) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 989103cce..396c8c074 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -40,6 +40,15 @@ func TestByzantine(t *testing.T) { } reactors := make([]p2p.Reactor, N) + defer func() { + for _, r := range reactors { + if rr, ok := r.(*ByzantineReactor); ok { + rr.reactor.Switch.Stop() + } else { + r.(*ConsensusReactor).Switch.Stop() + } + } + }() eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { if i == 0 { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 6b52e6308..ae6a6fc49 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -21,10 +21,7 @@ func init() { //---------------------------------------------- // in-process testnets -// Ensure a testnet makes blocks -func TestReactor(t *testing.T) { - N := 4 - css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) +func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) { reactors := make([]*ConsensusReactor, N) eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { @@ -37,7 +34,11 @@ func TestReactor(t *testing.T) { } reactors[i].SetEventSwitch(eventSwitch) - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + if subscribeEventRespond { + eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) + } else { + eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + } } // make connected switches and start all reactors p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { @@ -45,12 +46,28 @@ func TestReactor(t *testing.T) { return s }, p2p.Connect2Switches) - // start the state machines + // now that everyone is connected, start the state machines + // If we started the state machines before everyone was connected, + // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors for i := 0; i < N; i++ { s := reactors[i].conS.GetState() reactors[i].SwitchToConsensus(s) } + return reactors, eventChans +} + +func stopConsensusNet(reactors []*ConsensusReactor) { + for _, r := range reactors { + r.Switch.Stop() + } +} +// Ensure a testnet makes blocks +func TestReactor(t *testing.T) { + N := 4 + css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) + reactors, eventChans := startConsensusNet(t, css, N, false) + defer stopConsensusNet(reactors) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { <-eventChans[j] @@ -64,32 +81,8 @@ func TestReactor(t *testing.T) { func TestVotingPowerChange(t *testing.T) { nVals := 4 css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy) - reactors := make([]*ConsensusReactor, nVals) - eventChans := make([]chan interface{}, nVals) - for i := 0; i < nVals; i++ { - reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states - - eventSwitch := events.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } - - reactors[i].SetEventSwitch(eventSwitch) - eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) - } - p2p.MakeConnectedSwitches(nVals, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("CONSENSUS", reactors[i]) - return s - }, p2p.Connect2Switches) - - // now that everyone is connected, start the state machines - // If we started the state machines before everyone was connected, - // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors - for i := 0; i < nVals; i++ { - s := reactors[i].conS.GetState() - reactors[i].SwitchToConsensus(s) - } + reactors, eventChans := startConsensusNet(t, css, nVals, true) + defer stopConsensusNet(reactors) // map of active validators activeVals := make(map[string]struct{}) @@ -146,36 +139,11 @@ func TestVotingPowerChange(t *testing.T) { } func TestValidatorSetChanges(t *testing.T) { - nPeers := 8 + nPeers := 7 nVals := 4 css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy) - - reactors := make([]*ConsensusReactor, nPeers) - eventChans := make([]chan interface{}, nPeers) - for i := 0; i < nPeers; i++ { - reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states - - eventSwitch := events.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } - - reactors[i].SetEventSwitch(eventSwitch) - eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) - } - p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("CONSENSUS", reactors[i]) - return s - }, p2p.Connect2Switches) - - // now that everyone is connected, start the state machines - // If we started the state machines before everyone was connected, - // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors - for i := 0; i < nPeers; i++ { - s := reactors[i].conS.GetState() - reactors[i].SwitchToConsensus(s) - } + reactors, eventChans := startConsensusNet(t, css, nPeers, true) + defer stopConsensusNet(reactors) // map of active validators activeVals := make(map[string]struct{}) @@ -266,37 +234,13 @@ func TestValidatorSetChanges(t *testing.T) { func TestReactorWithTimeoutCommit(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter) - // override default SkipTimeoutCommit == true for tests for i := 0; i < N; i++ { css[i].timeoutParams.SkipTimeoutCommit = false } - reactors := make([]*ConsensusReactor, N-1) - eventChans := make([]chan interface{}, N-1) - for i := 0; i < N-1; i++ { - reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states - - eventSwitch := events.NewEventSwitch() - _, err := eventSwitch.Start() - if err != nil { - t.Fatalf("Failed to start switch: %v", err) - } - - reactors[i].SetEventSwitch(eventSwitch) - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) - } - // make connected switches and start all reactors - p2p.MakeConnectedSwitches(N-1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("CONSENSUS", reactors[i]) - return s - }, p2p.Connect2Switches) - - // start the state machines - for i := 0; i < N-1; i++ { - s := reactors[i].conS.GetState() - reactors[i].SwitchToConsensus(s) - } + reactors, eventChans := startConsensusNet(t, css, N-1, false) + defer stopConsensusNet(reactors) // wait till everyone makes the first new block timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {