From 2363d8897949919fafec4992047ef803bc5b8ee4 Mon Sep 17 00:00:00 2001 From: Zarko Milosevic Date: Fri, 12 Oct 2018 22:13:01 +0200 Subject: [PATCH] consensus: Wait for proposal or timeout before prevote (#2540) * Fix termination issues and improve tests * Improve formatting and tests based on reviewer feedback --- CHANGELOG_PENDING.md | 2 + config/config.go | 2 +- consensus/common_test.go | 155 +++++++++-- consensus/mempool_test.go | 36 ++- consensus/state.go | 52 ++-- consensus/state_test.go | 567 ++++++++++++++++++++------------------ node/node.go | 2 +- p2p/metrics.go | 1 - state/execution.go | 32 +-- state/metrics.go | 2 +- 10 files changed, 500 insertions(+), 351 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index f12684020..0f919bdaa 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -50,6 +50,8 @@ BUG FIXES: - [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time - [consensus] [\#1690](https://github.com/tendermint/tendermint/issues/1690) wait for timeoutPrecommit before starting next round +- [consensus] [\#1745](https://github.com/tendermint/tendermint/issues/1745) wait for +Proposal or timeoutProposal before entering prevote - [evidence] \#2515 fix db iter leak (@goolAdapter) - [common/bit_array] Fixed a bug in the `Or` function - [common/bit_array] Fixed a bug in the `Sub` function (@james-ray) diff --git a/config/config.go b/config/config.go index f2bac5c6f..ede57207c 100644 --- a/config/config.go +++ b/config/config.go @@ -565,7 +565,7 @@ func DefaultConsensusConfig() *ConsensusConfig { // TestConsensusConfig returns a configuration for testing the consensus service func TestConsensusConfig() *ConsensusConfig { cfg := DefaultConsensusConfig() - cfg.TimeoutPropose = 100 * time.Millisecond + cfg.TimeoutPropose = 40 * time.Millisecond cfg.TimeoutProposeDelta = 1 * time.Millisecond cfg.TimeoutPrevote = 10 * time.Millisecond cfg.TimeoutPrevoteDelta = 1 * time.Millisecond diff --git a/consensus/common_test.go b/consensus/common_test.go index 2a5cc8e79..cf76e924d 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -39,8 +39,8 @@ const ( ) // genesis, chain_id, priv_val -var config *cfg.Config // NOTE: must be reset for each _test.go file -var ensureTimeout = time.Second * 1 // must be in seconds because CreateEmptyBlocksInterval is +var config *cfg.Config // NOTE: must be reset for each _test.go file +var ensureTimeout = time.Millisecond * 100 func ensureDir(dir string, mode os.FileMode) { if err := cmn.EnsureDir(dir, mode); err != nil { @@ -317,67 +317,156 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration, } } -func ensureNoNewStep(stepCh <-chan interface{}) { - ensureNoNewEvent(stepCh, ensureTimeout, "We should be stuck waiting, "+ - "not moving to the next step") +func ensureNoNewEventOnChannel(ch <-chan interface{}) { + ensureNoNewEvent( + ch, + ensureTimeout, + "We should be stuck waiting, not receiving new event on the channel") +} + +func ensureNoNewRoundStep(stepCh <-chan interface{}) { + ensureNoNewEvent( + stepCh, + ensureTimeout, + "We should be stuck waiting, not receiving NewRoundStep event") +} + +func ensureNoNewUnlock(unlockCh <-chan interface{}) { + ensureNoNewEvent( + unlockCh, + ensureTimeout, + "We should be stuck waiting, not receiving Unlock event") } func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) { timeoutDuration := time.Duration(timeout*5) * time.Nanosecond - ensureNoNewEvent(stepCh, timeoutDuration, "We should be stuck waiting, "+ - "not moving to the next step") + ensureNoNewEvent( + stepCh, + timeoutDuration, + "We should be stuck waiting, not receiving NewTimeout event") } -func ensureNewEvent(ch <-chan interface{}, timeout time.Duration, errorMessage string) { +func ensureNewEvent( + ch <-chan interface{}, + height int64, + round int, + timeout time.Duration, + errorMessage string) { + select { case <-time.After(timeout): panic(errorMessage) - case <-ch: - break + case ev := <-ch: + rs, ok := ev.(types.EventDataRoundState) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataRoundState, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + // TODO: We could check also for a step at this point! } } -func ensureNewStep(stepCh <-chan interface{}) { - ensureNewEvent(stepCh, ensureTimeout, +func ensureNewRoundStep(stepCh <-chan interface{}, height int64, round int) { + ensureNewEvent( + stepCh, + height, + round, + ensureTimeout, "Timeout expired while waiting for NewStep event") } -func ensureNewRound(roundCh <-chan interface{}) { - ensureNewEvent(roundCh, ensureTimeout, +func ensureNewVote(voteCh <-chan interface{}, height int64, round int) { + select { + case <-time.After(ensureTimeout): + break + case v := <-voteCh: + edv, ok := v.(types.EventDataVote) + if !ok { + panic(fmt.Sprintf("expected a *types.Vote, "+ + "got %v. wrong subscription channel?", + reflect.TypeOf(v))) + } + vote := edv.Vote + if vote.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height)) + } + if vote.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round)) + } + } +} + +func ensureNewRound(roundCh <-chan interface{}, height int64, round int) { + ensureNewEvent(roundCh, height, round, ensureTimeout, "Timeout expired while waiting for NewRound event") } -func ensureNewTimeout(timeoutCh <-chan interface{}, timeout int64) { - timeoutDuration := time.Duration(timeout*5) * time.Nanosecond - ensureNewEvent(timeoutCh, timeoutDuration, +func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) { + timeoutDuration := time.Duration(timeout*3) * time.Nanosecond + ensureNewEvent(timeoutCh, height, round, timeoutDuration, "Timeout expired while waiting for NewTimeout event") } -func ensureNewProposal(proposalCh <-chan interface{}) { - ensureNewEvent(proposalCh, ensureTimeout, +func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) { + ensureNewEvent(proposalCh, height, round, ensureTimeout, "Timeout expired while waiting for NewProposal event") } -func ensureNewBlock(blockCh <-chan interface{}) { - ensureNewEvent(blockCh, ensureTimeout, - "Timeout expired while waiting for NewBlock event") +func ensureNewBlock(blockCh <-chan interface{}, height int64) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewBlock event") + case ev := <-blockCh: + block, ok := ev.(types.EventDataNewBlock) + if !ok { + panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+ + "got %v. wrong subscription channel?", + reflect.TypeOf(block))) + } + if block.Block.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height)) + } + } } -func ensureNewVote(voteCh <-chan interface{}) { - ensureNewEvent(voteCh, ensureTimeout, - "Timeout expired while waiting for NewVote event") +func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewBlockHeader event") + case ev := <-blockCh: + blockHeader, ok := ev.(types.EventDataNewBlockHeader) + if !ok { + panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+ + "got %v. wrong subscription channel?", + reflect.TypeOf(blockHeader))) + } + if blockHeader.Header.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height)) + } + if !bytes.Equal(blockHeader.Header.Hash(), blockHash) { + panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeader.Header.Hash())) + } + } } -func ensureNewUnlock(unlockCh <-chan interface{}) { - ensureNewEvent(unlockCh, ensureTimeout, +func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) { + ensureNewEvent(unlockCh, height, round, ensureTimeout, "Timeout expired while waiting for NewUnlock event") } -func ensureVote(voteCh chan interface{}, height int64, round int, +func ensureVote(voteCh <-chan interface{}, height int64, round int, voteType byte) { select { case <-time.After(ensureTimeout): - break + panic("Timeout expired while waiting for NewVote event") case v := <-voteCh: edv, ok := v.(types.EventDataVote) if !ok { @@ -398,6 +487,14 @@ func ensureVote(voteCh chan interface{}, height int64, round int, } } +func ensureNewEventOnChannel(ch <-chan interface{}) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for new activity on the channel") + case <-ch: + } +} + //------------------------------------------------------------------------------- // consensus nets diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 179766fd0..ed97ae681 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -28,12 +28,12 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) startTestRound(cs, height, round) - ensureNewStep(newBlockCh) // first block gets committed - ensureNoNewStep(newBlockCh) + ensureNewEventOnChannel(newBlockCh) // first block gets committed + ensureNoNewEventOnChannel(newBlockCh) deliverTxsRange(cs, 0, 1) - ensureNewStep(newBlockCh) // commit txs - ensureNewStep(newBlockCh) // commit updated app hash - ensureNoNewStep(newBlockCh) + ensureNewEventOnChannel(newBlockCh) // commit txs + ensureNewEventOnChannel(newBlockCh) // commit updated app hash + ensureNoNewEventOnChannel(newBlockCh) } func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { @@ -46,9 +46,9 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) { newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) startTestRound(cs, height, round) - ensureNewStep(newBlockCh) // first block gets committed - ensureNoNewStep(newBlockCh) // then we dont make a block ... - ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed + ensureNewEventOnChannel(newBlockCh) // first block gets committed + ensureNoNewEventOnChannel(newBlockCh) // then we dont make a block ... + ensureNewEventOnChannel(newBlockCh) // until the CreateEmptyBlocksInterval has passed } func TestMempoolProgressInHigherRound(t *testing.T) { @@ -72,13 +72,19 @@ func TestMempoolProgressInHigherRound(t *testing.T) { } startTestRound(cs, height, round) - ensureNewStep(newRoundCh) // first round at first height - ensureNewStep(newBlockCh) // first block gets committed - ensureNewStep(newRoundCh) // first round at next height - deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round - <-timeoutCh - ensureNewStep(newRoundCh) // wait for the next round - ensureNewStep(newBlockCh) // now we can commit the block + ensureNewRoundStep(newRoundCh, height, round) // first round at first height + ensureNewEventOnChannel(newBlockCh) // first block gets committed + + height = height + 1 // moving to the next height + round = 0 + + ensureNewRoundStep(newRoundCh, height, round) // first round at next height + deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round + ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) + + round = round + 1 // moving to the next round + ensureNewRoundStep(newRoundCh, height, round) // wait for the next round + ensureNewEventOnChannel(newBlockCh) // now we can commit the block } func deliverTxsRange(cs *ConsensusState, start, end int) { diff --git a/consensus/state.go b/consensus/state.go index 0100a1504..022023ae3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -83,7 +83,8 @@ type ConsensusState struct { // internal state mtx sync.RWMutex cstypes.RoundState - state sm.State // State until height-1. + triggeredTimeoutPrecommit bool + state sm.State // State until height-1. // state changes may be triggered by: msgs from peers, // msgs from ourself, or by timeouts @@ -711,6 +712,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { cs.enterPrecommit(ti.Height, ti.Round) case cstypes.RoundStepPrecommitWait: cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()) + cs.enterPrecommit(ti.Height, ti.Round) cs.enterNewRound(ti.Height, ti.Round+1) default: panic(fmt.Sprintf("Invalid timeout step: %v", ti.Step)) @@ -772,6 +774,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { cs.ProposalBlockParts = nil } cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping + cs.triggeredTimeoutPrecommit = false cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) cs.metrics.Rounds.Set(float64(round)) @@ -782,7 +785,8 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height) if waitForTxs { if cs.config.CreateEmptyBlocksInterval > 0 { - cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, cstypes.RoundStepNewRound) + cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, + cstypes.RoundStepNewRound) } go cs.proposalHeartbeat(height, round) } else { @@ -1013,6 +1017,7 @@ func (cs *ConsensusState) enterPrevote(height int64, round int) { func (cs *ConsensusState) defaultDoPrevote(height int64, round int) { logger := cs.Logger.With("height", height, "round", round) + // If a block is locked, prevote that. if cs.LockedBlock != nil { logger.Info("enterPrevote: Block was locked") @@ -1171,8 +1176,12 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) { func (cs *ConsensusState) enterPrecommitWait(height int64, round int) { logger := cs.Logger.With("height", height, "round", round) - if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommitWait <= cs.Step) { - logger.Debug(fmt.Sprintf("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + if cs.Height != height || round < cs.Round || (cs.Round == round && cs.triggeredTimeoutPrecommit) { + logger.Debug( + fmt.Sprintf( + "enterPrecommitWait(%v/%v): Invalid args. "+ + "Current state is Height/Round: %v/%v/, triggeredTimeoutPrecommit:%v", + height, round, cs.Height, cs.Round, cs.triggeredTimeoutPrecommit)) return } if !cs.Votes.Precommits(round).HasTwoThirdsAny() { @@ -1182,7 +1191,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) { defer func() { // Done enterPrecommitWait: - cs.updateRoundStep(round, cstypes.RoundStepPrecommitWait) + cs.triggeredTimeoutPrecommit = true cs.newStep() }() @@ -1495,6 +1504,9 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { // Move onto the next step cs.enterPrevote(height, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(height, cs.Round) + } } else if cs.Step == cstypes.RoundStepCommit { // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) @@ -1609,7 +1621,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, // Update Valid* if we can. // NOTE: our proposal block may be nil or not what received a polka.. // TODO: we may want to still update the ValidBlock and obtain it via gossipping - if !blockID.IsZero() && + if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round <= cs.Round) && cs.ProposalBlock.HashesTo(blockID.Hash) { @@ -1621,14 +1633,14 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, } } - // If +2/3 prevotes for *anything* for this or future round: - if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { - // Round-skip over to PrevoteWait or goto Precommit. - cs.enterNewRound(height, vote.Round) // if the vote is ahead of us + // If +2/3 prevotes for *anything* for future round: + if cs.Round < vote.Round && prevotes.HasTwoThirdsAny() { + // Round-skip if there is any 2/3+ of votes ahead of us + cs.enterNewRound(height, vote.Round) + } else if cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step { // current round if prevotes.HasTwoThirdsMajority() { cs.enterPrecommit(height, vote.Round) - } else { - cs.enterPrevote(height, vote.Round) // if the vote is ahead of us + } else if prevotes.HasTwoThirdsAny() { cs.enterPrevoteWait(height, vote.Round) } } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { @@ -1641,21 +1653,25 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, case types.VoteTypePrecommit: precommits := cs.Votes.Precommits(vote.Round) cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort()) + blockID, ok := precommits.TwoThirdsMajority() - if ok && len(blockID.Hash) != 0 { + if ok { // Executed as TwoThirdsMajority could be from a higher round cs.enterNewRound(height, vote.Round) cs.enterPrecommit(height, vote.Round) - cs.enterCommit(height, vote.Round) - - if cs.config.SkipTimeoutCommit && precommits.HasAll() { - cs.enterNewRound(cs.Height, 0) + if len(blockID.Hash) != 0 { + cs.enterCommit(height, vote.Round) + if cs.config.SkipTimeoutCommit && precommits.HasAll() { + cs.enterNewRound(cs.Height, 0) + } + } else { + cs.enterPrecommitWait(height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { cs.enterNewRound(height, vote.Round) - cs.enterPrecommit(height, vote.Round) cs.enterPrecommitWait(height, vote.Round) } + default: panic(fmt.Sprintf("Unexpected vote type %X", vote.Type)) // go-wire should prevent this. } diff --git a/consensus/state_test.go b/consensus/state_test.go index e7d4b4fab..d80b0c8ae 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" cstypes "github.com/tendermint/tendermint/consensus/types" @@ -68,7 +69,7 @@ func TestStateProposerSelection0(t *testing.T) { startTestRound(cs1, height, round) // Wait for new round so proposer is set. - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) // Commit a block and ensure proposer for the next height is correct. prop := cs1.GetRoundState().Validators.GetProposer() @@ -77,13 +78,13 @@ func TestStateProposerSelection0(t *testing.T) { } // Wait for complete proposal. - ensureNewProposal(proposalCh) + ensureNewProposal(proposalCh, height, round) rs := cs1.GetRoundState() signAddVotes(cs1, types.VoteTypePrecommit, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:]...) // Wait for new round so next validator is set. - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height+1, 0) prop = cs1.GetRoundState().Validators.GetProposer() if !bytes.Equal(prop.Address, vss[1].GetAddress()) { @@ -94,27 +95,29 @@ func TestStateProposerSelection0(t *testing.T) { // Now let's do it all again, but starting from round 2 instead of 0 func TestStateProposerSelection2(t *testing.T) { cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators - + height := cs1.Height newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) // this time we jump in at round 2 incrementRound(vss[1:]...) incrementRound(vss[1:]...) - startTestRound(cs1, cs1.Height, 2) - ensureNewRound(newRoundCh) // wait for the new round + round := 2 + startTestRound(cs1, height, round) + + ensureNewRound(newRoundCh, height, round) // wait for the new round // everyone just votes nil. we get a new proposer each round for i := 0; i < len(vss); i++ { prop := cs1.GetRoundState().Validators.GetProposer() - correctProposer := vss[(i+2)%len(vss)].GetAddress() + correctProposer := vss[(i+round)%len(vss)].GetAddress() if !bytes.Equal(prop.Address, correctProposer) { panic(fmt.Sprintf("expected RoundState.Validators.GetProposer() to be validator %d. Got %X", (i+2)%len(vss), prop.Address)) } rs := cs1.GetRoundState() signAddVotes(cs1, types.VoteTypePrecommit, nil, rs.ProposalBlockParts.Header(), vss[1:]...) - ensureNewRound(newRoundCh) // wait for the new round event each round + ensureNewRound(newRoundCh, height, i+round+1) // wait for the new round event each round incrementRound(vss[1:]...) } @@ -132,7 +135,7 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) { startTestRound(cs, height, round) // if we're not a validator, EnterPropose should timeout - ensureNewTimeout(timeoutCh, cs.config.TimeoutPropose.Nanoseconds()) + ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) if cs.GetRoundState().Proposal != nil { t.Error("Expected to make no proposal, since no privValidator") @@ -152,7 +155,7 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) { cs.enterNewRound(height, round) cs.startRoutines(3) - ensureNewProposal(proposalCh) + ensureNewProposal(proposalCh, height, round) // Check that Proposal, ProposalBlock, ProposalBlockParts are set. rs := cs.GetRoundState() @@ -208,22 +211,19 @@ func TestStateBadProposal(t *testing.T) { startTestRound(cs1, height, round) // wait for proposal - ensureNewProposal(proposalCh) + ensureNewProposal(proposalCh, height, round) // wait for prevote - ensureNewVote(voteCh) - + ensureVote(voteCh, height, round, types.VoteTypePrevote) validatePrevote(t, cs1, round, vss[0], nil) // add bad prevote from vs2 and wait for it signAddVotes(cs1, types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrevote) // wait for precommit - ensureNewVote(voteCh) - + ensureVote(voteCh, height, round, types.VoteTypePrecommit) validatePrecommit(t, cs1, round, 0, vss[0], nil, nil) - signAddVotes(cs1, types.VoteTypePrecommit, propBlock.Hash(), propBlock.MakePartSet(partSize).Header(), vs2) } //---------------------------------------------------------------------------------------------------- @@ -246,21 +246,21 @@ func TestStateFullRound1(t *testing.T) { propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) + // Maybe it would be better to call explicitly startRoutines(4) startTestRound(cs, height, round) - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) - // grab proposal - re := <-propCh - propBlockHash := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash() + ensureNewProposal(propCh, height, round) + propBlockHash := cs.GetRoundState().ProposalBlock.Hash() - ensureNewVote(voteCh) // wait for prevote + ensureVote(voteCh, height, round, types.VoteTypePrevote) // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) - ensureNewVote(voteCh) // wait for precommit + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // wait for precommit // we're going to roll right into new height - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height+1, 0) validateLastPrecommit(t, cs, vss[0], propBlockHash) } @@ -275,8 +275,8 @@ func TestStateFullRoundNil(t *testing.T) { cs.enterPrevote(height, round) cs.startRoutines(4) - ensureNewVote(voteCh) // prevote - ensureNewVote(voteCh) // precommit + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // precommit // should prevote and precommit nil validatePrevoteAndPrecommit(t, cs, round, 0, vss[0], nil, nil) @@ -295,7 +295,7 @@ func TestStateFullRound2(t *testing.T) { // start round and wait for propose and prevote startTestRound(cs1, height, round) - ensureNewVote(voteCh) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote // we should be stuck in limbo waiting for more prevotes rs := cs1.GetRoundState() @@ -303,10 +303,9 @@ func TestStateFullRound2(t *testing.T) { // prevote arrives from vs2: signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propPartsHeader, vs2) - ensureNewVote(voteCh) - - ensureNewVote(voteCh) //precommit + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrecommit) //precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], propBlockHash, propBlockHash) @@ -314,10 +313,10 @@ func TestStateFullRound2(t *testing.T) { // precommit arrives from vs2: signAddVotes(cs1, types.VoteTypePrecommit, propBlockHash, propPartsHeader, vs2) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // wait to finish commit, propose in next height - ensureNewBlock(newBlockCh) + ensureNewBlock(newBlockCh, height) } //------------------------------------------------------------------------------------------ @@ -328,7 +327,7 @@ func TestStateFullRound2(t *testing.T) { func TestStateLockNoPOL(t *testing.T) { cs1, vss := randConsensusState(2) vs2 := vss[1] - height := cs1.Height + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes @@ -343,41 +342,43 @@ func TestStateLockNoPOL(t *testing.T) { */ // start round and wait for prevote - cs1.enterNewRound(height, 0) + cs1.enterNewRound(height, round) cs1.startRoutines(0) - re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) - theBlockHash := rs.ProposalBlock.Hash() + ensureNewRound(newRoundCh, height, round) + + ensureNewProposal(proposalCh, height, round) + roundState := cs1.GetRoundState() + theBlockHash := roundState.ProposalBlock.Hash() + thePartSetHeader := roundState.ProposalBlockParts.Header() - ensureNewVote(voteCh) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote // we should now be stuck in limbo forever, waiting for more prevotes // prevote arrives from vs2: - signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2) - ensureNewVote(voteCh) // prevote - - ensureNewVote(voteCh) // precommit + signAddVotes(cs1, types.VoteTypePrevote, theBlockHash, thePartSetHeader, vs2) + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // precommit // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + validatePrecommit(t, cs1, round, round, vss[0], theBlockHash, theBlockHash) // we should now be stuck in limbo forever, waiting for more precommits // lets add one for a different block - // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round hash := make([]byte, len(theBlockHash)) copy(hash, theBlockHash) hash[0] = byte((hash[0] + 1) % 255) - signAddVotes(cs1, types.VoteTypePrecommit, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) // precommit + signAddVotes(cs1, types.VoteTypePrecommit, hash, thePartSetHeader, vs2) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // precommit // (note we're entering precommit for a second time this round) // but with invalid args. then we enterPrecommitWait, and the timeout to new round - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) /// - ensureNewRound(newRoundCh) + round = round + 1 // moving to the next round + ensureNewRound(newRoundCh, height, round) t.Log("#### ONTO ROUND 1") /* Round2 (cs1, B) // B B2 @@ -386,43 +387,42 @@ func TestStateLockNoPOL(t *testing.T) { incrementRound(vs2) // now we're on a new round and not the proposer, so wait for timeout - re = <-timeoutProposeCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.TimeoutPropose.Nanoseconds()) + + rs := cs1.GetRoundState() if rs.ProposalBlock != nil { panic("Expected proposal block to be nil") } // wait to finish prevote - ensureNewVote(voteCh) - + ensureVote(voteCh, height, round, types.VoteTypePrevote) // we should have prevoted our locked block - validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash()) + validatePrevote(t, cs1, round, vss[0], rs.LockedBlock.Hash()) // add a conflicting prevote from the other validator signAddVotes(cs1, types.VoteTypePrevote, hash, rs.LockedBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrevote) // now we're going to enter prevote again, but with invalid args // and then prevote wait, which should timeout. then wait for precommit - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrevote.Nanoseconds()) - - ensureNewVote(voteCh) // precommit + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrevote.Nanoseconds()) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // precommit // the proposed block should still be locked and our precommit added // we should precommit nil and be locked on the proposal - validatePrecommit(t, cs1, 1, 0, vss[0], nil, theBlockHash) + validatePrecommit(t, cs1, round, 0, vss[0], nil, theBlockHash) // add conflicting precommit from vs2 - // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round signAddVotes(cs1, types.VoteTypePrecommit, hash, rs.LockedBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // (note we're entering precommit for a second time this round, but with invalid args // then we enterPrecommitWait and timeout into NewRound - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) - ensureNewRound(newRoundCh) + round = round + 1 // entering new round + ensureNewRound(newRoundCh, height, round) t.Log("#### ONTO ROUND 2") /* Round3 (vs2, _) // B, B2 @@ -430,30 +430,29 @@ func TestStateLockNoPOL(t *testing.T) { incrementRound(vs2) - re = <-proposalCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewProposal(proposalCh, height, round) + rs = cs1.GetRoundState() // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { panic(fmt.Sprintf("Expected proposal block to be locked block. Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock)) } - ensureNewVote(voteCh) // prevote - - validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash()) + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote + validatePrevote(t, cs1, round, vss[0], rs.LockedBlock.Hash()) signAddVotes(cs1, types.VoteTypePrevote, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrevote) - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrevote.Nanoseconds()) - ensureNewVote(voteCh) // precommit + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrevote.Nanoseconds()) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // precommit - validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal + validatePrecommit(t, cs1, round, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal signAddVotes(cs1, types.VoteTypePrecommit, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2) // NOTE: conflicting precommits at same height - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) cs2, _ := randConsensusState(2) // needed so generated block is different than locked block // before we time out into new round, set next proposal block @@ -464,7 +463,8 @@ func TestStateLockNoPOL(t *testing.T) { incrementRound(vs2) - ensureNewRound(newRoundCh) + round = round + 1 // entering new round + ensureNewRound(newRoundCh, height, round) t.Log("#### ONTO ROUND 3") /* Round4 (vs2, C) // B C // B C @@ -476,35 +476,34 @@ func TestStateLockNoPOL(t *testing.T) { t.Fatal(err) } - ensureNewProposal(proposalCh) - ensureNewVote(voteCh) // prevote - + ensureNewProposal(proposalCh, height, round) + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote // prevote for locked block (not proposal) validatePrevote(t, cs1, 3, vss[0], cs1.LockedBlock.Hash()) + // prevote for proposed block signAddVotes(cs1, types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet(partSize).Header(), vs2) - ensureNewVote(voteCh) - - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrevote.Nanoseconds()) - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrevote) - validatePrecommit(t, cs1, 3, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrevote.Nanoseconds()) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) + validatePrecommit(t, cs1, round, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal signAddVotes(cs1, types.VoteTypePrecommit, propBlock.Hash(), propBlock.MakePartSet(partSize).Header(), vs2) // NOTE: conflicting precommits at same height - ensureNewVote(voteCh) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) } // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestStateLockPOLRelock(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes - timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) - voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlockHeader) @@ -517,28 +516,25 @@ func TestStateLockPOLRelock(t *testing.T) { */ // start round and wait for propose and prevote - startTestRound(cs1, cs1.Height, 0) + startTestRound(cs1, height, round) - ensureNewRound(newRoundCh) - re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewRound(newRoundCh, height, round) + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() theBlockHash := rs.ProposalBlock.Hash() + theBlockParts := rs.ProposalBlockParts.Header() - ensureNewVote(voteCh) // prevote + ensureVote(voteCh, height, round, types.VoteTypePrevote) // prevote - signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2, vs3, vs4) - // prevotes - discardFromChan(voteCh, 3) + signAddVotes(cs1, types.VoteTypePrevote, theBlockHash, theBlockParts, vs2, vs3, vs4) - ensureNewVote(voteCh) // our precommit + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // our precommit // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + validatePrecommit(t, cs1, round, round, vss[0], theBlockHash, theBlockHash) // add precommits from the rest signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4) - signAddVotes(cs1, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs3) - // precommites - discardFromChan(voteCh, 3) + signAddVotes(cs1, types.VoteTypePrecommit, theBlockHash, theBlockParts, vs3) // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1) @@ -548,14 +544,15 @@ func TestStateLockPOLRelock(t *testing.T) { incrementRound(vs2, vs3, vs4) // timeout to new round - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + round = round + 1 // moving to the next round //XXX: this isnt guaranteed to get there before the timeoutPropose ... if err := cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer"); err != nil { t.Fatal(err) } - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) t.Log("### ONTO ROUND 1") /* @@ -566,60 +563,34 @@ func TestStateLockPOLRelock(t *testing.T) { // now we're on a new round and not the proposer // but we should receive the proposal - select { - case <-proposalCh: - case <-timeoutProposeCh: - <-proposalCh - } + ensureNewProposal(proposalCh, height, round) // go to prevote, prevote for locked block (not proposal), move on - ensureNewVote(voteCh) - validatePrevote(t, cs1, 0, vss[0], theBlockHash) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], theBlockHash) // now lets add prevotes from everyone else for the new block signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propBlockParts.Header(), vs2, vs3, vs4) - // prevotes - discardFromChan(voteCh, 3) - - // now either we go to PrevoteWait or Precommit - select { - case <-timeoutWaitCh: // we're in PrevoteWait, go to Precommit - // XXX: there's no guarantee we see the polka, this might be a precommit for nil, - // in which case the test fails! - <-voteCh - case <-voteCh: // we went straight to Precommit - } + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // we should have unlocked and locked on the new block - validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) + validatePrecommit(t, cs1, round, round, vss[0], propBlockHash, propBlockHash) signAddVotes(cs1, types.VoteTypePrecommit, propBlockHash, propBlockParts.Header(), vs2, vs3) - discardFromChan(voteCh, 2) - - be := <-newBlockCh - b := be.(types.EventDataNewBlockHeader) - re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) - if rs.Height != 2 { - panic("Expected height to increment") - } + ensureNewBlockHeader(newBlockCh, height, propBlockHash) - if !bytes.Equal(b.Header.Hash(), propBlockHash) { - panic("Expected new block to be proposal block") - } + ensureNewRound(newRoundCh, height+1, 0) } // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestStateLockPOLUnlock(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - h := cs1.GetRoundState().Height - r := cs1.GetRoundState().Round + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) - timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock) @@ -634,75 +605,72 @@ func TestStateLockPOLUnlock(t *testing.T) { */ // start round and wait for propose and prevote - startTestRound(cs1, h, r) - ensureNewRound(newRoundCh) - re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) - theBlockHash := rs.ProposalBlock.Hash() + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) - ensureVote(voteCh, h, r, types.VoteTypePrevote) + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() + theBlockHash := rs.ProposalBlock.Hash() + theBlockParts := rs.ProposalBlockParts.Header() - signAddVotes(cs1, types.VoteTypePrevote, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs2, vs3, vs4) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], theBlockHash) - ensureVote(voteCh, h, r, types.VoteTypePrecommit) + signAddVotes(cs1, types.VoteTypePrevote, theBlockHash, theBlockParts, vs2, vs3, vs4) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, r, 0, vss[0], theBlockHash, theBlockHash) + validatePrecommit(t, cs1, round, round, vss[0], theBlockHash, theBlockHash) rs = cs1.GetRoundState() // add precommits from the rest signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4) - signAddVotes(cs1, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), vs3) + signAddVotes(cs1, types.VoteTypePrecommit, theBlockHash, theBlockParts, vs3) // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1) propBlockParts := propBlock.MakePartSet(partSize) - incrementRound(vs2, vs3, vs4) - // timeout to new round - re = <-timeoutWaitCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + rs = cs1.GetRoundState() lockedBlockHash := rs.LockedBlock.Hash() - //XXX: this isnt guaranteed to get there before the timeoutPropose ... - if err := cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer"); err != nil { - t.Fatal(err) - } + incrementRound(vs2, vs3, vs4) + round = round + 1 // moving to the next round - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) t.Log("#### ONTO ROUND 1") /* Round2 (vs2, C) // B nil nil nil // nil nil nil _ cs1 unlocks! */ - - // now we're on a new round and not the proposer, - // but we should receive the proposal - select { - case <-proposalCh: - case <-timeoutProposeCh: - <-proposalCh + //XXX: this isnt guaranteed to get there before the timeoutPropose ... + if err := cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) } + ensureNewProposal(proposalCh, height, round) + // go to prevote, prevote for locked block (not proposal) - ensureVote(voteCh, h, r+1, types.VoteTypePrevote) - validatePrevote(t, cs1, 0, vss[0], lockedBlockHash) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], lockedBlockHash) + // now lets add prevotes from everyone else for nil (a polka!) signAddVotes(cs1, types.VoteTypePrevote, nil, types.PartSetHeader{}, vs2, vs3, vs4) // the polka makes us unlock and precommit nil - ensureNewUnlock(unlockCh) - ensureVote(voteCh, h, r+1, types.VoteTypePrecommit) + ensureNewUnlock(unlockCh, height, round) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // we should have unlocked and committed nil // NOTE: since we don't relock on nil, the lock round is 0 - validatePrecommit(t, cs1, r+1, 0, vss[0], nil, nil) + validatePrecommit(t, cs1, round, 0, vss[0], nil, nil) signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3) - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round+1) } // 4 vals @@ -712,8 +680,7 @@ func TestStateLockPOLUnlock(t *testing.T) { func TestStateLockPOLSafety1(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - h := cs1.GetRoundState().Height - r := cs1.GetRoundState().Round + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes @@ -724,38 +691,28 @@ func TestStateLockPOLSafety1(t *testing.T) { voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote - startTestRound(cs1, cs1.Height, 0) - ensureNewRound(newRoundCh) - re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) - propBlock := rs.ProposalBlock + startTestRound(cs1, cs1.Height, round) + ensureNewRound(newRoundCh, height, round) - ensureVote(voteCh, h, r, types.VoteTypePrevote) + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() + propBlock := rs.ProposalBlock - validatePrevote(t, cs1, 0, vss[0], propBlock.Hash()) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], propBlock.Hash()) // the others sign a polka but we don't see it prevotes := signVotes(types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet(partSize).Header(), vs2, vs3, vs4) - // before we time out into new round, set next proposer - // and next proposal block - - //TODO: Should we remove this? - /* - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - panic("failed to update validator") - }*/ - t.Logf("old prop hash %v", fmt.Sprintf("%X", propBlock.Hash())) // we do see them precommit nil signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3, vs4) - ensureVote(voteCh, h, r, types.VoteTypePrecommit) + // cs1 precommit nil + ensureVote(voteCh, height, round, types.VoteTypePrecommit) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) - ensureNewRound(newRoundCh) t.Log("### ONTO ROUND 1") prop, propBlock := decideProposal(cs1, vs2, vs2.Height, vs2.Round+1) @@ -764,6 +721,9 @@ func TestStateLockPOLSafety1(t *testing.T) { incrementRound(vs2, vs3, vs4) + round = round + 1 // moving to the next round + ensureNewRound(newRoundCh, height, round) + //XXX: this isnt guaranteed to get there before the timeoutPropose ... if err := cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer"); err != nil { t.Fatal(err) @@ -773,39 +733,34 @@ func TestStateLockPOLSafety1(t *testing.T) { // a polka happened but we didn't see it! */ - // now we're on a new round and not the proposer, - // but we should receive the proposal - select { - case re = <-proposalCh: - case <-timeoutProposeCh: - re = <-proposalCh - } + ensureNewProposal(proposalCh, height, round) - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = cs1.GetRoundState() if rs.LockedBlock != nil { panic("we should not be locked!") } t.Logf("new prop hash %v", fmt.Sprintf("%X", propBlockHash)) + // go to prevote, prevote for proposal block - ensureVote(voteCh, h, r+1, types.VoteTypePrevote) - validatePrevote(t, cs1, 1, vss[0], propBlockHash) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], propBlockHash) // now we see the others prevote for it, so we should lock on it signAddVotes(cs1, types.VoteTypePrevote, propBlockHash, propBlockParts.Header(), vs2, vs3, vs4) - ensureVote(voteCh, h, r+1, types.VoteTypePrecommit) - + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // we should have precommitted - validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) + validatePrecommit(t, cs1, round, round, vss[0], propBlockHash, propBlockHash) - signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3) + signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3, vs4) - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) incrementRound(vs2, vs3, vs4) + round = round + 1 // moving to the next round - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) t.Log("### ONTO ROUND 2") /*Round3 @@ -813,22 +768,22 @@ func TestStateLockPOLSafety1(t *testing.T) { */ // timeout of propose - ensureNewTimeout(timeoutProposeCh, cs1.config.TimeoutPropose.Nanoseconds()) + ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.TimeoutPropose.Nanoseconds()) // finish prevote - ensureVote(voteCh, h, r+2, types.VoteTypePrevote) - + ensureVote(voteCh, height, round, types.VoteTypePrevote) // we should prevote what we're locked on - validatePrevote(t, cs1, 2, vss[0], propBlockHash) + validatePrevote(t, cs1, round, vss[0], propBlockHash) newStepCh := subscribe(cs1.eventBus, types.EventQueryNewRoundStep) + // before prevotes from the previous round are added // add prevotes from the earlier round addVotes(cs1, prevotes...) t.Log("Done adding prevotes!") - ensureNoNewStep(newStepCh) + ensureNoNewRoundStep(newStepCh) } // 4 vals. @@ -841,13 +796,11 @@ func TestStateLockPOLSafety1(t *testing.T) { func TestStateLockPOLSafety2(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - h := cs1.GetRoundState().Height - r := cs1.GetRoundState().Round + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) - timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) unlockCh := subscribe(cs1.eventBus, types.EventQueryUnlock) @@ -855,7 +808,7 @@ func TestStateLockPOLSafety2(t *testing.T) { // the block for R0: gets polkad but we miss it // (even though we signed it, shhh) - _, propBlock0 := decideProposal(cs1, vss[0], cs1.Height, cs1.Round) + _, propBlock0 := decideProposal(cs1, vss[0], height, round) propBlockHash0 := propBlock0.Hash() propBlockParts0 := propBlock0.MakePartSet(partSize) @@ -870,25 +823,25 @@ func TestStateLockPOLSafety2(t *testing.T) { incrementRound(vs2, vs3, vs4) - cs1.updateRoundStep(0, cstypes.RoundStepPrecommitWait) - + round = round + 1 // moving to the next round t.Log("### ONTO Round 1") // jump in at round 1 - startTestRound(cs1, h, r+1) - ensureNewRound(newRoundCh) + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) if err := cs1.SetProposalAndBlock(prop1, propBlock1, propBlockParts1, "some peer"); err != nil { t.Fatal(err) } - ensureNewProposal(proposalCh) + ensureNewProposal(proposalCh, height, round) - ensureVote(voteCh, h, r+1, types.VoteTypePrevote) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], propBlockHash1) signAddVotes(cs1, types.VoteTypePrevote, propBlockHash1, propBlockParts1.Header(), vs2, vs3, vs4) - ensureVote(voteCh, h, r+1, types.VoteTypePrecommit) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash1, propBlockHash1) + validatePrecommit(t, cs1, round, round, vss[0], propBlockHash1, propBlockHash1) // add precommits from the rest signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs4) @@ -897,10 +850,11 @@ func TestStateLockPOLSafety2(t *testing.T) { incrementRound(vs2, vs3, vs4) // timeout of precommit wait to new round - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + round = round + 1 // moving to the next round // in round 2 we see the polkad block from round 0 - newProp := types.NewProposal(h, 2, propBlockParts0.Header(), 0, propBlockID1) + newProp := types.NewProposal(height, round, propBlockParts0.Header(), 0, propBlockID1) if err := vs3.SignProposal(config.ChainID(), newProp); err != nil { t.Fatal(err) } @@ -911,25 +865,16 @@ func TestStateLockPOLSafety2(t *testing.T) { // Add the pol votes addVotes(cs1, prevotes...) - ensureNewRound(newRoundCh) + ensureNewRound(newRoundCh, height, round) t.Log("### ONTO Round 2") /*Round2 // now we see the polka from round 1, but we shouldnt unlock */ + ensureNewProposal(proposalCh, height, round) - select { - case <-timeoutProposeCh: - <-proposalCh - case <-proposalCh: - } - - select { - case <-unlockCh: - panic("validator unlocked using an old polka") - case <-voteCh: - // prevote our locked block - } - validatePrevote(t, cs1, 2, vss[0], propBlockHash1) + ensureNoNewUnlock(unlockCh) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], propBlockHash1) } @@ -939,18 +884,110 @@ func TestStateLockPOLSafety2(t *testing.T) { func TestWaitingTimeoutOnNilPolka(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, cs1.Round timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) // start round - startTestRound(cs1, cs1.Height, 0) - ensureNewRound(newRoundCh) + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3, vs4) - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) - ensureNewRound(newRoundCh) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + ensureNewRound(newRoundCh, height, round+1) +} + +// 4 vals, 3 Prevotes for nil from the higher round. +// What we want: +// P0 waits for timeoutPropose in the next round before entering prevote +func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { + cs1, vss := randConsensusState(4) + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, cs1.Round + + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) + + // start round + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) + + ensureVote(voteCh, height, round, types.VoteTypePrevote) + + incrementRound(vss[1:]...) + signAddVotes(cs1, types.VoteTypePrevote, nil, types.PartSetHeader{}, vs2, vs3, vs4) + + round = round + 1 // moving to the next round + ensureNewRound(newRoundCh, height, round) + + rs := cs1.GetRoundState() + assert.True(t, rs.Step == cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires + + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPropose.Nanoseconds()) + + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], nil) +} + +// 4 vals, 3 Precommits for nil from the higher round. +// What we want: +// P0 jump to higher round, precommit and start precommit wait +func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) { + cs1, vss := randConsensusState(4) + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, cs1.Round + + timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) + + // start round + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) + + ensureVote(voteCh, height, round, types.VoteTypePrevote) + + incrementRound(vss[1:]...) + signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2, vs3, vs4) + + round = round + 1 // moving to the next round + ensureNewRound(newRoundCh, height, round) + + ensureVote(voteCh, height, round, types.VoteTypePrecommit) + validatePrecommit(t, cs1, round, 0, vss[0], nil, nil) + + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + + round = round + 1 // moving to the next round + ensureNewRound(newRoundCh, height, round) +} + +// 4 vals, 3 Prevotes for nil in the current round. +// What we want: +// P0 wait for timeoutPropose to expire before sending prevote. +func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) { + cs1, vss := randConsensusState(4) + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, 1 + + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) + + // start round in which PO is not proposer + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) + + incrementRound(vss[1:]...) + signAddVotes(cs1, types.VoteTypePrevote, nil, types.PartSetHeader{}, vs2, vs3, vs4) + + ensureNewTimeout(timeoutProposeCh, height, round, cs1.config.TimeoutPropose.Nanoseconds()) + + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], nil) } //------------------------------------------------------------------------------------------ @@ -1041,8 +1078,7 @@ func TestStateSlashingPrecommits(t *testing.T) { func TestStateHalt1(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - h := cs1.GetRoundState().Height - r := cs1.GetRoundState().Round + height, round := cs1.Height, cs1.Round partSize := types.BlockPartSizeBytes proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) @@ -1052,20 +1088,21 @@ func TestStateHalt1(t *testing.T) { voteCh := subscribeToVoter(cs1, cs1.privValidator.GetAddress()) // start round and wait for propose and prevote - startTestRound(cs1, cs1.Height, 0) - ensureNewRound(newRoundCh) - re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) + + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet(partSize) - ensureVote(voteCh, h, r, types.VoteTypePrevote) + ensureVote(voteCh, height, round, types.VoteTypePrevote) - signAddVotes(cs1, types.VoteTypePrevote, propBlock.Hash(), propBlockParts.Header(), vs3, vs4) - ensureVote(voteCh, h, r, types.VoteTypePrecommit) + signAddVotes(cs1, types.VoteTypePrevote, propBlock.Hash(), propBlockParts.Header(), vs2, vs3, vs4) + ensureVote(voteCh, height, round, types.VoteTypePrecommit) // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], propBlock.Hash(), propBlock.Hash()) + validatePrecommit(t, cs1, round, round, vss[0], propBlock.Hash(), propBlock.Hash()) // add precommits from the rest signAddVotes(cs1, types.VoteTypePrecommit, nil, types.PartSetHeader{}, vs2) // didnt receive proposal @@ -1076,9 +1113,12 @@ func TestStateHalt1(t *testing.T) { incrementRound(vs2, vs3, vs4) // timeout to new round - ensureNewTimeout(timeoutWaitCh, cs1.config.TimeoutPrecommit.Nanoseconds()) - re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewTimeout(timeoutWaitCh, height, round, cs1.config.TimeoutPrecommit.Nanoseconds()) + + round = round + 1 // moving to the next round + + ensureNewRound(newRoundCh, height, round) + rs = cs1.GetRoundState() t.Log("### ONTO ROUND 1") /*Round2 @@ -1087,20 +1127,16 @@ func TestStateHalt1(t *testing.T) { */ // go to prevote, prevote for locked block - ensureVote(voteCh, h, r+1, types.VoteTypePrevote) - validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) + ensureVote(voteCh, height, round, types.VoteTypePrevote) + validatePrevote(t, cs1, round, vss[0], rs.LockedBlock.Hash()) // now we receive the precommit from the previous round addVotes(cs1, precommit4) // receiving that precommit should take us straight to commit - ensureNewBlock(newBlockCh) - re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + ensureNewBlock(newBlockCh, height) - if rs.Height != 2 { - panic("expected height to increment") - } + ensureNewRound(newRoundCh, height+1, 0) } func TestStateOutputsBlockPartsStats(t *testing.T) { @@ -1186,10 +1222,3 @@ func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} { } return out } - -// discardFromChan reads n values from the channel. -func discardFromChan(ch <-chan interface{}, n int) { - for i := 0; i < n; i++ { - <-ch - } -} diff --git a/node/node.go b/node/node.go index d1ab0f86a..9c409787d 100644 --- a/node/node.go +++ b/node/node.go @@ -106,7 +106,7 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) { if config.Prometheus { return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace), - mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace) + mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace) } return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics() } diff --git a/p2p/metrics.go b/p2p/metrics.go index 86a205056..b066fb317 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -56,7 +56,6 @@ func PrometheusMetrics(namespace string) *Metrics { Name: "num_txs", Help: "Number of transactions submitted by each peer.", }, []string{"peer_id"}), - } } diff --git a/state/execution.go b/state/execution.go index d5a1a1617..611efa516 100644 --- a/state/execution.go +++ b/state/execution.go @@ -49,7 +49,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { // NewBlockExecutor returns a new BlockExecutor with a NopEventBus. // Call SetEventBus to provide one. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, - mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor { + mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor { res := &BlockExecutor{ db: db, proxyApp: proxyApp, @@ -95,7 +95,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b startTime := time.Now().UnixNano() abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db) endTime := time.Now().UnixNano() - blockExec.metrics.BlockProcessingTime.Observe(float64(endTime - startTime) / 1000000) + blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) if err != nil { return state, ErrProxyAppConn(err) } @@ -198,11 +198,11 @@ func (blockExec *BlockExecutor) Commit( // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set func execBlockOnProxyApp( - logger log.Logger, - proxyAppConn proxy.AppConnConsensus, - block *types.Block, - lastValSet *types.ValidatorSet, - stateDB dbm.DB, + logger log.Logger, + proxyAppConn proxy.AppConnConsensus, + block *types.Block, + lastValSet *types.ValidatorSet, + stateDB dbm.DB, ) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 @@ -360,10 +360,10 @@ func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.Validat // updateState returns a new State updated according to the header and responses. func updateState( - state State, - blockID types.BlockID, - header *types.Header, - abciResponses *ABCIResponses, + state State, + blockID types.BlockID, + header *types.Header, + abciResponses *ABCIResponses, ) (State, error) { // Copy the valset so we can apply changes from EndBlock @@ -448,11 +448,11 @@ func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *ty // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock( - appConnConsensus proxy.AppConnConsensus, - block *types.Block, - logger log.Logger, - lastValSet *types.ValidatorSet, - stateDB dbm.DB, + appConnConsensus proxy.AppConnConsensus, + block *types.Block, + logger log.Logger, + lastValSet *types.ValidatorSet, + stateDB dbm.DB, ) ([]byte, error) { _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB) if err != nil { diff --git a/state/metrics.go b/state/metrics.go index 7acbafa30..4e99753f0 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -2,9 +2,9 @@ package state import ( "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" - "github.com/go-kit/kit/metrics/discard" ) const MetricsSubsystem = "state"