diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 4fcda7274..3ed18e71c 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -26,7 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { config := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, config, n, "consensus_reactor_test", newMockTickerFunc(true)) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 3daf09dfe..773b20bde 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -355,7 +355,7 @@ func TestReactorBasic(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", newMockTickerFunc(true)) @@ -368,7 +368,8 @@ func TestReactorBasic(t *testing.T) { } var wg sync.WaitGroup - errCh := make(chan error, len(rts.subs)) + bErrCh := make(chan error, len(rts.subs)) + bsErrCh := make(chan error, len(rts.blocksyncSubs)) for _, sub := range rts.subs { wg.Add(1) @@ -382,14 +383,16 @@ func TestReactorBasic(t *testing.T) { return case errors.Is(err, context.Canceled): return - case err != nil: + default: select { - case errCh <- err: + case bErrCh <- err: + if err != nil { + cancel() // terminate other workers + } + return case <-ctx.Done(): return } - cancel() // terminate other workers - return } }(sub) } @@ -401,20 +404,19 @@ func TestReactorBasic(t *testing.T) { select { case <-ctx.Done(): t.Fatal("encountered timeout") - case err := <-errCh: + case err := <-bErrCh: if err != nil { t.Fatal(err) } } case <-ctx.Done(): t.Fatal("encountered timeout") - case err := <-errCh: + case err := <-bErrCh: if err != nil { t.Fatal(err) } } - errCh = make(chan error, len(rts.blocksyncSubs)) for _, sub := range rts.blocksyncSubs { wg.Add(1) @@ -428,7 +430,7 @@ func TestReactorBasic(t *testing.T) { case errors.Is(err, context.Canceled): return case err != nil: - errCh <- err + bsErrCh <- err cancel() // terminate other workers return } @@ -442,7 +444,7 @@ func TestReactorBasic(t *testing.T) { } select { - case err := <-errCh: + case err := <-bsErrCh: if err != nil { t.Fatal(err) } @@ -456,7 +458,7 @@ func TestReactorWithEvidence(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 testName := "consensus_reactor_test" tickerFunc := newMockTickerFunc(true) @@ -576,7 +578,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, @@ -599,7 +601,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { // send a tx require.NoError( t, - assertMempool(t, states[3].txNotifier).CheckTx( + assertMempool(t, states[1].txNotifier).CheckTx( ctx, []byte{1, 2, 3}, nil, @@ -642,7 +644,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", newMockTickerFunc(true)) @@ -720,7 +722,7 @@ func TestReactorVotingPowerChange(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, @@ -795,11 +797,12 @@ func TestReactorVotingPowerChange(t *testing.T) { waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states) waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states) + newVotingPower := states[0].GetRoundState().LastValidators.TotalVotingPower() require.NotEqualf( - t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(), + t, previousTotalVotingPower, newVotingPower, "expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, - states[0].GetRoundState().LastValidators.TotalVotingPower(), + newVotingPower, ) updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2) @@ -833,13 +836,13 @@ func TestReactorVotingPowerChange(t *testing.T) { } func TestReactorValidatorSetChanges(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() cfg := configSetup(t) - nPeers := 7 - nVals := 4 + nPeers := 2 + nVals := 2 states, _, _, cleanup := randConsensusNetWithPeers( ctx, t, @@ -852,7 +855,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { ) t.Cleanup(cleanup) - rts := setup(ctx, t, nPeers, states, 100) // buffer must be large enough to not deadlock + rts := setup(ctx, t, nPeers, states, 512) // buffer must be large enough to not deadlock for _, reactor := range rts.reactors { state := reactor.state.GetState() @@ -908,7 +911,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { t.Fatal("subscription encountered unexpected error") } - newValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx) + newValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx) require.NoError(t, err) valPubKey1ABCI, err := encoding.PubKeyToProto(newValidatorPubKey1) @@ -941,14 +944,14 @@ func TestReactorValidatorSetChanges(t *testing.T) { // it includes the commit for block 4, which should have the updated validator set waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) - updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx) + updateValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx) require.NoError(t, err) updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1) require.NoError(t, err) updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower() + previousTotalVotingPower := states[nVals-1].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) @@ -956,12 +959,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) require.NotEqualf( - t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, + t, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, "expected voting power to change (before: %d, after: %d)", - previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), + previousTotalVotingPower, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), ) - newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx) + newValidatorPubKey2, err := states[nVals-2].privValidator.GetPubKey(ctx) require.NoError(t, err) newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2) @@ -969,7 +972,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx) + newValidatorPubKey3, err := states[nVals-2].privValidator.GetPubKey(ctx) require.NoError(t, err) newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 99df337a3..3d8ecfdef 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -514,6 +514,8 @@ func (cs *State) OnStop() { } } + close(cs.getOnStopCh()) + if cs.timeoutTicker.IsRunning() { cs.timeoutTicker.Stop() } @@ -636,6 +638,8 @@ func (cs *State) SetProposalAndBlock( func (cs *State) updateHeight(height int64) { cs.metrics.Height.Set(float64(height)) + cs.mtx.Lock() + defer cs.mtx.Unlock() cs.Height = height } @@ -757,7 +761,13 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) { switch { case state.LastBlockHeight == 0: // Very first commit should be empty. - cs.LastCommit = (*types.VoteSet)(nil) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.LastCommit = (*types.VoteSet)(nil) + }() + case cs.CommitRound > -1 && cs.Votes != nil: // Otherwise, use cs.Votes if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() { panic(fmt.Sprintf( @@ -766,7 +776,12 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) { )) } - cs.LastCommit = cs.Votes.Precommits(cs.CommitRound) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.LastCommit = cs.Votes.Precommits(cs.CommitRound) + }() case cs.LastCommit == nil: // NOTE: when Tendermint starts, it has no votes. reconstructLastCommit @@ -1802,8 +1817,13 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 // Done enterCommit: // keep cs.Round the same, commitRound points to the right Precommits set. cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit) - cs.CommitRound = commitRound - cs.CommitTime = tmtime.Now() + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.CommitRound = commitRound + cs.CommitTime = tmtime.Now() + }() cs.newStep(ctx) // Maybe finalize immediately. @@ -2215,6 +2235,47 @@ func (cs *State) addProposalBlockPart( if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil { cs.logger.Error("failed publishing event complete proposal", "err", err) } + + // Update Valid* if we can. + prevotes := cs.Votes.Prevotes(cs.Round) + blockID, hasTwoThirds := prevotes.TwoThirdsMajority() + if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) { + if cs.ProposalBlock.HashesTo(blockID.Hash) { + cs.logger.Debug( + "updating valid block to new proposal block", + "valid_round", cs.Round, + "valid_block_hash", cs.ProposalBlock.Hash(), + ) + + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts + + }() + } + // TODO: In case there is +2/3 majority in Prevotes set for some + // block and cs.ProposalBlock contains different block, either + // proposer is faulty or voting power of faulty processes is more + // than 1/3. We should trigger in the future accountability + // procedure at this point. + } + + if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { + // Move onto the next step + cs.enterPrevote(ctx, height, cs.Round) + if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added + cs.enterPrecommit(ctx, height, cs.Round) + } + } else if cs.Step == cstypes.RoundStepCommit { + // If we're waiting on the proposal block... + cs.tryFinalizeCommit(ctx, height) + } + + return added, nil } return added, nil @@ -2403,12 +2464,22 @@ func (cs *State) addVote( ) // we're getting the wrong block - cs.ProposalBlock = nil + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ProposalBlock = nil + }() } if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) { cs.metrics.MarkBlockGossipStarted() - cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) + }() } cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)