From e550e1592464737b9f42d45870b20dc20ff5bdd1 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 11 Mar 2022 09:33:15 -0500 Subject: [PATCH] shrink and fix --- internal/consensus/invalid_test.go | 2 +- internal/consensus/reactor_test.go | 61 ++++++++++++++++-------------- internal/consensus/state.go | 59 +++++++++++++++++++++-------- 3 files changed, 76 insertions(+), 46 deletions(-) diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 4fcda7274..3ed18e71c 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -26,7 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { config := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, config, n, "consensus_reactor_test", newMockTickerFunc(true)) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 1a6b40b1b..3fd8347e5 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -356,7 +356,7 @@ func TestReactorBasic(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", newMockTickerFunc(true)) @@ -369,7 +369,8 @@ func TestReactorBasic(t *testing.T) { } var wg sync.WaitGroup - errCh := make(chan error, len(rts.subs)) + bErrCh := make(chan error, len(rts.subs)) + bsErrCh := make(chan error, len(rts.blocksyncSubs)) for _, sub := range rts.subs { wg.Add(1) @@ -383,14 +384,16 @@ func TestReactorBasic(t *testing.T) { return case errors.Is(err, context.Canceled): return - case err != nil: + default: select { - case errCh <- err: + case bErrCh <- err: + if err != nil { + cancel() // terminate other workers + } + return case <-ctx.Done(): return } - cancel() // terminate other workers - return } }(sub) } @@ -402,20 +405,19 @@ func TestReactorBasic(t *testing.T) { select { case <-ctx.Done(): t.Fatal("encountered timeout") - case err := <-errCh: + case err := <-bErrCh: if err != nil { t.Fatal(err) } } case <-ctx.Done(): t.Fatal("encountered timeout") - case err := <-errCh: + case err := <-bErrCh: if err != nil { t.Fatal(err) } } - errCh = make(chan error, len(rts.blocksyncSubs)) for _, sub := range rts.blocksyncSubs { wg.Add(1) @@ -429,7 +431,7 @@ func TestReactorBasic(t *testing.T) { case errors.Is(err, context.Canceled): return case err != nil: - errCh <- err + bsErrCh <- err cancel() // terminate other workers return } @@ -443,7 +445,7 @@ func TestReactorBasic(t *testing.T) { } select { - case err := <-errCh: + case err := <-bsErrCh: if err != nil { t.Fatal(err) } @@ -457,7 +459,7 @@ func TestReactorWithEvidence(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 testName := "consensus_reactor_test" tickerFunc := newMockTickerFunc(true) @@ -578,7 +580,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, @@ -602,7 +604,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { // send a tx require.NoError( t, - assertMempool(t, states[3].txNotifier).CheckTx( + assertMempool(t, states[1].txNotifier).CheckTx( ctx, []byte{1, 2, 3}, nil, @@ -645,7 +647,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", newMockTickerFunc(true)) @@ -723,7 +725,7 @@ func TestReactorVotingPowerChange(t *testing.T) { cfg := configSetup(t) - n := 4 + n := 2 states, cleanup := makeConsensusState(ctx, t, cfg, @@ -798,11 +800,12 @@ func TestReactorVotingPowerChange(t *testing.T) { waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states) waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states) + newVotingPower := states[0].GetRoundState().LastValidators.TotalVotingPower() require.NotEqualf( - t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(), + t, previousTotalVotingPower, newVotingPower, "expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, - states[0].GetRoundState().LastValidators.TotalVotingPower(), + newVotingPower, ) updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2) @@ -836,13 +839,13 @@ func TestReactorVotingPowerChange(t *testing.T) { } func TestReactorValidatorSetChanges(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() cfg := configSetup(t) - nPeers := 7 - nVals := 4 + nPeers := 2 + nVals := 2 states, _, _, cleanup := randConsensusNetWithPeers( ctx, t, @@ -855,7 +858,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { ) t.Cleanup(cleanup) - rts := setup(ctx, t, nPeers, states, 100) // buffer must be large enough to not deadlock + rts := setup(ctx, t, nPeers, states, 512) // buffer must be large enough to not deadlock for _, reactor := range rts.reactors { state := reactor.state.GetState() @@ -911,7 +914,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { t.Fatal("subscription encountered unexpected error") } - newValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx) + newValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx) require.NoError(t, err) valPubKey1ABCI, err := encoding.PubKeyToProto(newValidatorPubKey1) @@ -944,14 +947,14 @@ func TestReactorValidatorSetChanges(t *testing.T) { // it includes the commit for block 4, which should have the updated validator set waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) - updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx) + updateValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx) require.NoError(t, err) updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1) require.NoError(t, err) updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower() + previousTotalVotingPower := states[nVals-1].GetRoundState().LastValidators.TotalVotingPower() waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) @@ -959,12 +962,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) require.NotEqualf( - t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, + t, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, "expected voting power to change (before: %d, after: %d)", - previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), + previousTotalVotingPower, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), ) - newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx) + newValidatorPubKey2, err := states[nVals-2].privValidator.GetPubKey(ctx) require.NoError(t, err) newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2) @@ -972,7 +975,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx) + newValidatorPubKey3, err := states[nVals-2].privValidator.GetPubKey(ctx) require.NoError(t, err) newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index fb1596b5a..d8fe20f64 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -514,13 +514,6 @@ func (cs *State) OnStop() { } } - func() { - cs.mtx.Lock() - defer cs.mtx.Unlock() - - close(cs.onStopCh) - }() - if cs.evsw.IsRunning() { cs.evsw.Stop() } @@ -655,6 +648,8 @@ func (cs *State) SetProposalAndBlock( func (cs *State) updateHeight(height int64) { cs.metrics.Height.Set(float64(height)) + cs.mtx.Lock() + defer cs.mtx.Unlock() cs.Height = height } @@ -776,7 +771,13 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) { switch { case state.LastBlockHeight == 0: // Very first commit should be empty. - cs.LastCommit = (*types.VoteSet)(nil) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.LastCommit = (*types.VoteSet)(nil) + }() + case cs.CommitRound > -1 && cs.Votes != nil: // Otherwise, use cs.Votes if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() { panic(fmt.Sprintf( @@ -785,7 +786,12 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) { )) } - cs.LastCommit = cs.Votes.Precommits(cs.CommitRound) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.LastCommit = cs.Votes.Precommits(cs.CommitRound) + }() case cs.LastCommit == nil: // NOTE: when Tendermint starts, it has no votes. reconstructLastCommit @@ -1790,8 +1796,13 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 // Done enterCommit: // keep cs.Round the same, commitRound points to the right Precommits set. cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit) - cs.CommitRound = commitRound - cs.CommitTime = tmtime.Now() + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.CommitRound = commitRound + cs.CommitTime = tmtime.Now() + }() cs.newStep(ctx) // Maybe finalize immediately. @@ -2215,9 +2226,15 @@ func (cs *State) addProposalBlockPart( "valid_block_hash", cs.ProposalBlock.Hash(), ) - cs.ValidRound = cs.Round - cs.ValidBlock = cs.ProposalBlock - cs.ValidBlockParts = cs.ProposalBlockParts + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ValidRound = cs.Round + cs.ValidBlock = cs.ProposalBlock + cs.ValidBlockParts = cs.ProposalBlockParts + + }() } // TODO: In case there is +2/3 majority in Prevotes set for some // block and cs.ProposalBlock contains different block, either @@ -2392,12 +2409,22 @@ func (cs *State) addVote( ) // we're getting the wrong block - cs.ProposalBlock = nil + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ProposalBlock = nil + }() } if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) { cs.metrics.MarkBlockGossipStarted() - cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) + func() { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) + }() } cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)