Browse Source

startConsensusNet and stopConsensusNet

pull/362/head
Ethan Buchman 8 years ago
parent
commit
3c589dac19
2 changed files with 39 additions and 86 deletions
  1. +9
    -0
      consensus/byzantine_test.go
  2. +30
    -86
      consensus/reactor_test.go

+ 9
- 0
consensus/byzantine_test.go View File

@ -40,6 +40,15 @@ func TestByzantine(t *testing.T) {
} }
reactors := make([]p2p.Reactor, N) reactors := make([]p2p.Reactor, N)
defer func() {
for _, r := range reactors {
if rr, ok := r.(*ByzantineReactor); ok {
rr.reactor.Switch.Stop()
} else {
r.(*ConsensusReactor).Switch.Stop()
}
}
}()
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
if i == 0 { if i == 0 {


+ 30
- 86
consensus/reactor_test.go View File

@ -21,10 +21,7 @@ func init() {
//---------------------------------------------- //----------------------------------------------
// in-process testnets // in-process testnets
// Ensure a testnet makes blocks
func TestReactor(t *testing.T) {
N := 4
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) {
reactors := make([]*ConsensusReactor, N) reactors := make([]*ConsensusReactor, N)
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
@ -37,7 +34,11 @@ func TestReactor(t *testing.T) {
} }
reactors[i].SetEventSwitch(eventSwitch) reactors[i].SetEventSwitch(eventSwitch)
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
if subscribeEventRespond {
eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
} else {
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
}
} }
// make connected switches and start all reactors // make connected switches and start all reactors
p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
@ -45,12 +46,28 @@ func TestReactor(t *testing.T) {
return s return s
}, p2p.Connect2Switches) }, p2p.Connect2Switches)
// start the state machines
// now that everyone is connected, start the state machines
// If we started the state machines before everyone was connected,
// we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
s := reactors[i].conS.GetState() s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s) reactors[i].SwitchToConsensus(s)
} }
return reactors, eventChans
}
func stopConsensusNet(reactors []*ConsensusReactor) {
for _, r := range reactors {
r.Switch.Stop()
}
}
// Ensure a testnet makes blocks
func TestReactor(t *testing.T) {
N := 4
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
reactors, eventChans := startConsensusNet(t, css, N, false)
defer stopConsensusNet(reactors)
// wait till everyone makes the first new block // wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
<-eventChans[j] <-eventChans[j]
@ -64,32 +81,8 @@ func TestReactor(t *testing.T) {
func TestVotingPowerChange(t *testing.T) { func TestVotingPowerChange(t *testing.T) {
nVals := 4 nVals := 4
css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy) css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
reactors := make([]*ConsensusReactor, nVals)
eventChans := make([]chan interface{}, nVals)
for i := 0; i < nVals; i++ {
reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
t.Fatalf("Failed to start switch: %v", err)
}
reactors[i].SetEventSwitch(eventSwitch)
eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
}
p2p.MakeConnectedSwitches(nVals, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
return s
}, p2p.Connect2Switches)
// now that everyone is connected, start the state machines
// If we started the state machines before everyone was connected,
// we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
for i := 0; i < nVals; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s)
}
reactors, eventChans := startConsensusNet(t, css, nVals, true)
defer stopConsensusNet(reactors)
// map of active validators // map of active validators
activeVals := make(map[string]struct{}) activeVals := make(map[string]struct{})
@ -146,36 +139,11 @@ func TestVotingPowerChange(t *testing.T) {
} }
func TestValidatorSetChanges(t *testing.T) { func TestValidatorSetChanges(t *testing.T) {
nPeers := 8
nPeers := 7
nVals := 4 nVals := 4
css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy) css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
reactors := make([]*ConsensusReactor, nPeers)
eventChans := make([]chan interface{}, nPeers)
for i := 0; i < nPeers; i++ {
reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
t.Fatalf("Failed to start switch: %v", err)
}
reactors[i].SetEventSwitch(eventSwitch)
eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
}
p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
return s
}, p2p.Connect2Switches)
// now that everyone is connected, start the state machines
// If we started the state machines before everyone was connected,
// we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
for i := 0; i < nPeers; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s)
}
reactors, eventChans := startConsensusNet(t, css, nPeers, true)
defer stopConsensusNet(reactors)
// map of active validators // map of active validators
activeVals := make(map[string]struct{}) activeVals := make(map[string]struct{})
@ -266,37 +234,13 @@ func TestValidatorSetChanges(t *testing.T) {
func TestReactorWithTimeoutCommit(t *testing.T) { func TestReactorWithTimeoutCommit(t *testing.T) {
N := 4 N := 4
css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter) css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter)
// override default SkipTimeoutCommit == true for tests // override default SkipTimeoutCommit == true for tests
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
css[i].timeoutParams.SkipTimeoutCommit = false css[i].timeoutParams.SkipTimeoutCommit = false
} }
reactors := make([]*ConsensusReactor, N-1)
eventChans := make([]chan interface{}, N-1)
for i := 0; i < N-1; i++ {
reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
t.Fatalf("Failed to start switch: %v", err)
}
reactors[i].SetEventSwitch(eventSwitch)
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(N-1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
return s
}, p2p.Connect2Switches)
// start the state machines
for i := 0; i < N-1; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s)
}
reactors, eventChans := startConsensusNet(t, css, N-1, false)
defer stopConsensusNet(reactors)
// wait till everyone makes the first new block // wait till everyone makes the first new block
timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {


Loading…
Cancel
Save