You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

180 lines
5.5 KiB

package consensus
import (
"fmt"
"sync"
"testing"
"time"
"github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/dummy"
)
// init assigns the package-level config from tendermint_test.ResetConfig,
// keyed by this test file's name, before any test in the package runs.
func init() {
	const testName = "consensus_reactor_test"
	config = tendermint_test.ResetConfig(testName)
}
//----------------------------------------------
// in-process testnets
// TestReactor ensures that an in-process testnet makes blocks: four consensus
// reactors are attached to connected p2p switches, and every node must publish
// a new-block event before timeoutWaitGroup gives up.
func TestReactor(t *testing.T) {
	nodeCount := 4
	states := randConsensusNet(nodeCount, "consensus_reactor_test", crankTimeoutPropose, newMockTickerFunc(true))

	reactors := make([]*ConsensusReactor, nodeCount)
	newBlockChans := make([]chan interface{}, nodeCount)
	for idx := range reactors {
		reactors[idx] = NewConsensusReactor(states[idx], false)

		// Each reactor gets its own event switch so its new-block events can
		// be observed independently of the other nodes.
		evsw := events.NewEventSwitch()
		if _, err := evsw.Start(); err != nil {
			t.Fatalf("Failed to start switch: %v", err)
		}
		reactors[idx].SetEventSwitch(evsw)
		newBlockChans[idx] = subscribeToEvent(evsw, "tester", types.EventStringNewBlock(), 1)
	}

	// Build the connected switches, registering each reactor on its switch;
	// this also starts all reactors.
	attachReactor := func(idx int, sw *p2p.Switch) *p2p.Switch {
		sw.AddReactor("CONSENSUS", reactors[idx])
		return sw
	}
	p2p.MakeConnectedSwitches(nodeCount, attachReactor, p2p.Connect2Switches)

	// Block until every node has reported its first new block.
	timeoutWaitGroup(t, nodeCount, func(wg *sync.WaitGroup, idx int) {
		defer wg.Done()
		<-newBlockChans[idx]
	})
}
//-------------------------------------------------------------
// TestValidatorSetChanges ensures blocks keep being made while the validator
// set changes. Eight peers are connected, the first four of which are tracked
// as active validators; a validator-set-change tx for the fifth peer is then
// submitted and the commits of the following blocks are checked against the
// expected set.
func TestValidatorSetChanges(t *testing.T) {
	peerCount := 8
	valCount := 4
	states := randConsensusNetWithPeers(valCount, peerCount, "consensus_val_set_changes_test", crankTimeoutPropose, newMockTickerFunc(true))

	reactors := make([]*ConsensusReactor, peerCount)
	blockChans := make([]chan interface{}, peerCount)
	for idx := range reactors {
		// Passing true so the consensus states are not started yet.
		reactors[idx] = NewConsensusReactor(states[idx], true)

		evsw := events.NewEventSwitch()
		if _, err := evsw.Start(); err != nil {
			t.Fatalf("Failed to start switch: %v", err)
		}
		reactors[idx].SetEventSwitch(evsw)
		blockChans[idx] = subscribeToEventRespond(evsw, "tester", types.EventStringNewBlock())
	}

	attachReactor := func(idx int, sw *p2p.Switch) *p2p.Switch {
		sw.AddReactor("CONSENSUS", reactors[idx])
		return sw
	}
	p2p.MakeConnectedSwitches(peerCount, attachReactor, p2p.Connect2Switches)

	// Only start the state machines once everyone is connected; otherwise we
	// could block forever firing a new block while a peer is trying to access
	// state info for AddPeer.
	for _, reactor := range reactors {
		reactor.SwitchToConsensus(reactor.conS.GetState())
	}

	// Set of addresses of the currently active validators.
	activeVals := make(map[string]struct{})
	for _, cs := range states[:valCount] {
		activeVals[string(cs.privValidator.GetAddress())] = struct{}{}
	}

	// Block 1: wait for everyone and acknowledge the event on the same channel.
	timeoutWaitGroup(t, peerCount, func(wg *sync.WaitGroup, idx int) {
		defer wg.Done()
		<-blockChans[idx]
		blockChans[idx] <- struct{}{}
	})

	// The first peer outside the initial validator set becomes the new validator.
	addedPubKey := states[valCount].privValidator.(*types.PrivValidator).PubKey
	addValTx := dummy.MakeValSetChangeTx(addedPubKey.Bytes(), uint64(testMinPower))

	// Block 2: its commit must include all validators. The val-set change tx
	// is submitted here so that it changes the validators in block 3.
	waitForAndValidateBlock(t, peerCount, activeVals, blockChans, states, addValTx)

	// Block 3: carries the commit for block 2, made by the original validator set.
	waitForAndValidateBlock(t, peerCount, activeVals, blockChans, states)

	// Block 4: carries the commit for block 3, made by the original validator set.
	waitForAndValidateBlock(t, peerCount, activeVals, blockChans, states)

	// From here on, commits (starting with the one for block 4) should come
	// from the updated validator set.
	activeVals[string(addedPubKey.Address())] = struct{}{}

	// Block 5: carries the commit for block 4, which should have the updated
	// validator set.
	waitForAndValidateBlock(t, peerCount, activeVals, blockChans, states)

	// TODO: test more changes!
}
// waitForAndValidateBlock waits, for each of the n nodes, for the next event on
// eventChans[j], checks the received block's last commit against activeVals
// with validateBlock, hands every tx in txs to that node's mempool via CheckTx,
// and finally sends an acknowledgement back on the same channel.
//
// The per-node work runs in goroutines started by timeoutWaitGroup. Failures
// are therefore collected and reported from the calling (test) goroutine:
// t.Fatal/t.FailNow must not be called from goroutines spawned by a test, and
// doing so previously left the WaitGroup unreleased so that a validation error
// only surfaced as a timeout.
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
	// Buffered to n so that every worker can report a failure without blocking.
	errs := make(chan error, n)

	timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
		defer wg.Done()

		newBlockI := <-eventChans[j]
		ev, ok := newBlockI.(types.EventDataNewBlock)
		if !ok {
			// A failed bare type assertion would panic inside this goroutine
			// and take the whole test binary down; report it instead.
			errs <- fmt.Errorf("validator %d: expected types.EventDataNewBlock, got %T", j, newBlockI)
			return
		}
		newBlock := ev.Block
		log.Info("Got block", "height", newBlock.Height, "validator", j)

		if err := validateBlock(newBlock, activeVals); err != nil {
			errs <- fmt.Errorf("validator %d: %v", j, err)
			return
		}

		for _, tx := range txs {
			css[j].mempool.CheckTx(tx, nil)
		}

		// Acknowledge the event on the same channel it arrived on.
		eventChans[j] <- struct{}{}
	})

	// timeoutWaitGroup only returns once every worker has called Done, so no
	// further sends can happen and closing the channel is safe.
	close(errs)
	failed := false
	for err := range errs {
		failed = true
		t.Error(err)
	}
	if failed {
		t.FailNow()
	}
}
// validateBlock checks block.LastCommit against the set of active validators:
// the commit's size must equal len(activeVals), and every precommit's
// validator address must be a key of activeVals. It returns a descriptive
// error on the first mismatch and nil otherwise.
//
// Expects high synchrony!
func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
	commit := block.LastCommit

	if got, want := commit.Size(), len(activeVals); got != want {
		return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", got, want)
	}

	for _, precommit := range commit.Precommits {
		if _, active := activeVals[string(precommit.ValidatorAddress)]; active {
			continue
		}
		return fmt.Errorf("Found vote for unactive validator %X", precommit.ValidatorAddress)
	}
	return nil
}
// timeoutWaitGroup starts n goroutines, calling f(wg, i) for i in [0, n), and
// waits until all of them have called wg.Done. Each invocation of f is
// responsible for calling wg.Done exactly once. If that does not happen within
// three seconds, the test is failed with t.Fatalf.
//
// It must be called from the goroutine running the test, since it may call
// t.Fatalf.
func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int)) {
	wg := new(sync.WaitGroup)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go f(wg, i)
	}

	// Turn wg.Wait into a channel so it can take part in the select below.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// A one-shot timer, stopped on return, replaces the never-stopped ticker
	// that was leaked on every call. The deferred Stop also runs when
	// t.Fatalf exits the goroutine.
	timeout := time.NewTimer(3 * time.Second)
	defer timeout.Stop()

	select {
	case <-done:
	case <-timeout.C:
		t.Fatalf("Timed out waiting for all validators to commit a block")
	}
}