You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

933 lines
25 KiB

package consensus
import (
"context"
"errors"
"fmt"
"os"
"path"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
sm "github.com/tendermint/tendermint/internal/state"
statemocks "github.com/tendermint/tendermint/internal/state/mocks"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
"github.com/tendermint/tendermint/types"
)
var (
defaultTestTime = time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
)
type reactorTestSuite struct {
network *p2ptest.Network
states map[types.NodeID]*State
reactors map[types.NodeID]*Reactor
subs map[types.NodeID]eventbus.Subscription
blocksyncSubs map[types.NodeID]eventbus.Subscription
stateChannels map[types.NodeID]*p2p.Channel
dataChannels map[types.NodeID]*p2p.Channel
voteChannels map[types.NodeID]*p2p.Channel
voteSetBitsChannels map[types.NodeID]*p2p.Channel
}
func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
MessageType: new(tmcons.Message),
RecvBufferCapacity: size,
}
}
func setup(
ctx context.Context,
t *testing.T,
numNodes int,
states []*State,
size int,
) *reactorTestSuite {
t.Helper()
rts := &reactorTestSuite{
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
states: make(map[types.NodeID]*State),
reactors: make(map[types.NodeID]*Reactor, numNodes),
subs: make(map[types.NodeID]eventbus.Subscription, numNodes),
blocksyncSubs: make(map[types.NodeID]eventbus.Subscription, numNodes),
}
rts.stateChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc(VoteSetBitsChannel, size))
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
switch desc.ID {
case StateChannel:
return rts.stateChannels[nodeID], nil
case DataChannel:
return rts.dataChannels[nodeID], nil
case VoteChannel:
return rts.voteChannels[nodeID], nil
case VoteSetBitsChannel:
return rts.voteSetBitsChannels[nodeID], nil
default:
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
}
}
}
i := 0
for nodeID, node := range rts.network.Nodes {
state := states[i]
reactor, err := NewReactor(ctx,
state.logger.With("node", nodeID),
state,
chCreator(nodeID),
node.MakePeerUpdates(ctx, t),
true,
NopMetrics(),
)
require.NoError(t, err)
reactor.SetEventBus(state.eventBus)
blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,
Query: types.EventQueryNewBlock,
Limit: size,
})
require.NoError(t, err)
fsSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,
Query: types.EventQueryBlockSyncStatus,
Limit: size,
})
require.NoError(t, err)
rts.states[nodeID] = state
rts.subs[nodeID] = blocksSub
rts.reactors[nodeID] = reactor
rts.blocksyncSubs[nodeID] = fsSub
// simulate handle initChain in handshake
if state.state.LastBlockHeight == 0 {
require.NoError(t, state.blockExec.Store().Save(state.state))
}
require.NoError(t, reactor.Start(ctx))
require.True(t, reactor.IsRunning())
t.Cleanup(reactor.Wait)
i++
}
require.Len(t, rts.reactors, numNodes)
// start the in-memory network and connect all peers with each other
rts.network.Start(ctx, t)
t.Cleanup(leaktest.Check(t))
return rts
}
func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
if block.LastCommit.Size() != len(activeVals) {
return fmt.Errorf(
"commit size doesn't match number of active validators. Got %d, expected %d",
block.LastCommit.Size(), len(activeVals),
)
}
for _, commitSig := range block.LastCommit.Signatures {
if _, ok := activeVals[string(commitSig.ValidatorAddress)]; !ok {
return fmt.Errorf("found vote for inactive validator %X", commitSig.ValidatorAddress)
}
}
return nil
}
func waitForAndValidateBlock(
bctx context.Context,
t *testing.T,
n int,
activeVals map[string]struct{},
blocksSubs []eventbus.Subscription,
states []*State,
txs ...[]byte,
) {
t.Helper()
ctx, cancel := context.WithCancel(bctx)
defer cancel()
fn := func(j int) {
msg, err := blocksSubs[j].Next(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return
case errors.Is(err, context.Canceled):
return
case err != nil:
cancel() // terminate other workers
require.NoError(t, err)
return
}
newBlock := msg.Data().(types.EventDataNewBlock).Block
require.NoError(t, validateBlock(newBlock, activeVals))
for _, tx := range txs {
err := assertMempool(t, states[j].txNotifier).CheckTx(ctx, tx, nil, mempool.TxInfo{})
if errors.Is(err, types.ErrTxInCache) {
continue
}
require.NoError(t, err)
}
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
fn(j)
}(i)
}
wg.Wait()
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
}
}
func waitForAndValidateBlockWithTx(
bctx context.Context,
t *testing.T,
n int,
activeVals map[string]struct{},
blocksSubs []eventbus.Subscription,
states []*State,
txs ...[]byte,
) {
t.Helper()
ctx, cancel := context.WithCancel(bctx)
defer cancel()
fn := func(j int) {
ntxs := 0
for {
msg, err := blocksSubs[j].Next(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return
case errors.Is(err, context.Canceled):
return
case err != nil:
cancel() // terminate other workers
t.Fatalf("problem waiting for %d subscription: %v", j, err)
return
}
newBlock := msg.Data().(types.EventDataNewBlock).Block
require.NoError(t, validateBlock(newBlock, activeVals))
// check that txs match the txs we're waiting for.
// note they could be spread over multiple blocks,
// but they should be in order.
for _, tx := range newBlock.Data.Txs {
require.EqualValues(t, txs[ntxs], tx)
ntxs++
}
if ntxs == len(txs) {
break
}
}
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
fn(j)
}(i)
}
wg.Wait()
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
}
}
func waitForBlockWithUpdatedValsAndValidateIt(
bctx context.Context,
t *testing.T,
n int,
updatedVals map[string]struct{},
blocksSubs []eventbus.Subscription,
css []*State,
) {
t.Helper()
ctx, cancel := context.WithCancel(bctx)
defer cancel()
fn := func(j int) {
var newBlock *types.Block
for {
msg, err := blocksSubs[j].Next(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return
case errors.Is(err, context.Canceled):
return
case err != nil:
cancel() // terminate other workers
t.Fatalf("problem waiting for %d subscription: %v", j, err)
return
}
newBlock = msg.Data().(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) {
break
}
}
require.NoError(t, validateBlock(newBlock, updatedVals))
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
fn(j)
}(i)
}
wg.Wait()
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
}
}
func ensureBlockSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) {
t.Helper()
status, ok := msg.Data().(types.EventDataBlockSyncStatus)
require.True(t, ok)
require.Equal(t, complete, status.Complete)
require.Equal(t, height, status.Height)
}
func TestReactorBasic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
n := 4
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
t.Cleanup(cleanup)
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
var wg sync.WaitGroup
errCh := make(chan error, len(rts.subs))
for _, sub := range rts.subs {
wg.Add(1)
// wait till everyone makes the first new block
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return
case errors.Is(err, context.Canceled):
return
case err != nil:
errCh <- err
cancel() // terminate other workers
return
}
}(sub)
}
wg.Wait()
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
}
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
default:
}
errCh = make(chan error, len(rts.blocksyncSubs))
for _, sub := range rts.blocksyncSubs {
wg.Add(1)
// wait till everyone makes the consensus switch
go func(s eventbus.Subscription) {
defer wg.Done()
msg, err := s.Next(ctx)
switch {
case errors.Is(err, context.DeadlineExceeded):
return
case errors.Is(err, context.Canceled):
return
case err != nil:
errCh <- err
cancel() // terminate other workers
return
}
ensureBlockSyncStatus(t, msg, true, 0)
}(sub)
}
wg.Wait()
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
}
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
default:
}
}
func TestReactorWithEvidence(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
n := 4
testName := "consensus_reactor_test"
tickerFunc := newMockTickerFunc(true)
valSet, privVals := factory.ValidatorSet(ctx, t, n, 30)
genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, nil)
states := make([]*State, n)
logger := consensusLogger()
for i := 0; i < n; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
require.NoError(t, err)
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := kvstore.NewApplication()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})
pv := privVals[i]
blockDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
mempool := mempool.NewTxMempool(
log.TestingLogger().With("module", "mempool"),
thisConfig.Mempool,
proxyAppConnMem,
0,
)
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
// mock the evidence pool
// everyone includes evidence of another double signing
vIdx := (i + 1) % n
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(ctx, 1, defaultTestTime, privVals[vIdx], cfg.ChainID())
require.NoError(t, err)
evpool := &statemocks.EvidencePool{}
evpool.On("CheckEvidence", ctx, mock.AnythingOfType("types.EvidenceList")).Return(nil)
evpool.On("PendingEvidence", mock.AnythingOfType("int64")).Return([]types.Evidence{
ev}, int64(len(ev.Bytes())))
evpool.On("Update", ctx, mock.AnythingOfType("state.State"), mock.AnythingOfType("types.EvidenceList")).Return()
evpool2 := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore)
cs := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs.SetPrivValidator(ctx, pv)
eventBus := eventbus.NewDefault(log.TestingLogger().With("module", "events"))
require.NoError(t, eventBus.Start(ctx))
cs.SetEventBus(eventBus)
cs.SetTimeoutTicker(tickerFunc())
states[i] = cs
}
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
var wg sync.WaitGroup
for _, sub := range rts.subs {
wg.Add(1)
// We expect for each validator that is the proposer to propose one piece of
// evidence.
go func(s eventbus.Subscription) {
defer wg.Done()
msg, err := s.Next(ctx)
if !assert.NoError(t, err) {
cancel()
return
}
block := msg.Data().(types.EventDataNewBlock).Block
require.Len(t, block.Evidence, 1)
}(sub)
}
wg.Wait()
}
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
n := 4
states, cleanup := makeConsensusState(ctx,
t,
cfg,
n,
"consensus_reactor_test",
newMockTickerFunc(true),
func(c *config.Config) {
c.Consensus.CreateEmptyBlocks = false
},
)
t.Cleanup(cleanup)
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
// send a tx
require.NoError(
t,
assertMempool(t, states[3].txNotifier).CheckTx(
ctx,
[]byte{1, 2, 3},
nil,
mempool.TxInfo{},
),
)
var wg sync.WaitGroup
for _, sub := range rts.subs {
wg.Add(1)
// wait till everyone makes the first new block
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
wg.Wait()
}
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
n := 4
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
t.Cleanup(cleanup)
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
var wg sync.WaitGroup
for _, sub := range rts.subs {
wg.Add(1)
// wait till everyone makes the first new block
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
wg.Wait()
// Require at least one node to have sent block parts, but we can't know which
// peer sent it.
require.Eventually(
t,
func() bool {
for _, reactor := range rts.reactors {
for _, ps := range reactor.peers {
if ps.BlockPartsSent() > 0 {
return true
}
}
}
return false
},
time.Second,
10*time.Millisecond,
"number of block parts sent should've increased",
)
nodeID := rts.network.RandomNode().NodeID
reactor := rts.reactors[nodeID]
peers := rts.network.Peers(nodeID)
ps, ok := reactor.GetPeerState(peers[0].NodeID)
require.True(t, ok)
require.NotNil(t, ps)
require.Greater(t, ps.VotesSent(), 0, "number of votes sent should've increased")
}
func TestReactorVotingPowerChange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
n := 4
states, cleanup := makeConsensusState(ctx,
t,
cfg,
n,
"consensus_voting_power_changes_test",
newMockTickerFunc(true),
)
t.Cleanup(cleanup)
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
// map of active validators
activeVals := make(map[string]struct{})
for i := 0; i < n; i++ {
pubKey, err := states[i].privValidator.GetPubKey(ctx)
require.NoError(t, err)
addr := pubKey.Address()
activeVals[string(addr)] = struct{}{}
}
var wg sync.WaitGroup
for _, sub := range rts.subs {
wg.Add(1)
// wait till everyone makes the first new block
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
wg.Wait()
blocksSubs := []eventbus.Subscription{}
for _, sub := range rts.subs {
blocksSubs = append(blocksSubs, sub)
}
val1PubKey, err := states[0].privValidator.GetPubKey(ctx)
require.NoError(t, err)
val1PubKeyABCI, err := encoding.PubKeyToProto(val1PubKey)
require.NoError(t, err)
updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25)
previousTotalVotingPower := states[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlockWithTx(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
require.NotEqualf(
t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(),
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower,
states[0].GetRoundState().LastValidators.TotalVotingPower(),
)
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
previousTotalVotingPower = states[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlockWithTx(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
require.NotEqualf(
t, states[0].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(),
)
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26)
previousTotalVotingPower = states[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlockWithTx(ctx, t, n, activeVals, blocksSubs, states, updateValidatorTx)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
require.NotEqualf(
t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(),
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower,
states[0].GetRoundState().LastValidators.TotalVotingPower(),
)
}
func TestReactorValidatorSetChanges(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
cfg := configSetup(t)
nPeers := 7
nVals := 4
states, _, _, cleanup := randConsensusNetWithPeers(
ctx,
t,
cfg,
nVals,
nPeers,
"consensus_val_set_changes_test",
newMockTickerFunc(true),
newEpehemeralKVStore,
)
t.Cleanup(cleanup)
rts := setup(ctx, t, nPeers, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
// map of active validators
activeVals := make(map[string]struct{})
for i := 0; i < nVals; i++ {
pubKey, err := states[i].privValidator.GetPubKey(ctx)
require.NoError(t, err)
activeVals[string(pubKey.Address())] = struct{}{}
}
var wg sync.WaitGroup
for _, sub := range rts.subs {
wg.Add(1)
// wait till everyone makes the first new block
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
switch {
case err == nil:
case errors.Is(err, context.DeadlineExceeded):
default:
t.Log(err)
cancel()
}
}(sub)
}
wg.Wait()
// after the wait returns, either there was an error with a
// subscription (very unlikely, and causes the context to be
// canceled manually), there was a timeout and the test's root context
// was canceled (somewhat likely,) or the test can proceed
// (common.)
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
t.Fatal("encountered timeout")
} else if errors.Is(err, context.Canceled) {
t.Fatal("subscription encountered unexpected error")
}
newValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
require.NoError(t, err)
valPubKey1ABCI, err := encoding.PubKeyToProto(newValidatorPubKey1)
require.NoError(t, err)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
blocksSubs := []eventbus.Subscription{}
for _, sub := range rts.subs {
blocksSubs = append(blocksSubs, sub)
}
// wait till everyone makes block 2
// ensure the commit includes all validators
// send newValTx to change vals in block 3
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx1)
// wait till everyone makes block 3.
// it includes the commit for block 2, which is by the original validator set
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx1)
// wait till everyone makes block 4.
// it includes the commit for block 3, which is by the original validator set
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states)
// the commits for block 4 should be with the updated validator set
activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
// wait till everyone makes block 5
// it includes the commit for block 4, which should have the updated validator set
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
require.NoError(t, err)
updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states)
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
require.NotEqualf(
t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(),
)
newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2)
require.NoError(t, err)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states)
activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states)
delete(activeVals, string(newValidatorPubKey2.Address()))
delete(activeVals, string(newValidatorPubKey3.Address()))
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
}