From 6738ef6f85affb39ea1d1ccddf034f36111b03aa Mon Sep 17 00:00:00 2001 From: tycho garen Date: Tue, 8 Mar 2022 15:55:37 -0500 Subject: [PATCH] fixes --- internal/consensus/byzantine_test.go | 10 +++---- internal/consensus/common_test.go | 7 ++--- internal/consensus/invalid_test.go | 6 ++-- internal/consensus/mempool_test.go | 13 +++++---- internal/consensus/reactor_test.go | 6 ++-- internal/consensus/state.go | 41 ++++++++++++++++++---------- 6 files changed, 48 insertions(+), 35 deletions(-) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index dfeb556fe..1f1247400 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -3,7 +3,6 @@ package consensus import ( "context" "fmt" - "os" "path" "sync" "testing" @@ -63,8 +62,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i)) require.NoError(t, err) - defer os.RemoveAll(thisConfig.RootDir) - ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal app := kvstore.NewApplication() vals := types.TM2PB.ValidatorUpdates(state.Validators) @@ -98,12 +95,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) require.NoError(t, err) + // set private validator pv := privVals[i] cs.SetPrivValidator(ctx, pv) cs.SetTimeoutTicker(tickerFunc()) + require.NoError(t, cs.Start(ctx)) + states[i] = cs }() } @@ -232,8 +232,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } } - for _, reactor := range rts.reactors { - reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false) + for id, reactor := range rts.reactors { + reactor.SwitchToConsensus(ctx, rts.states[id].state, false) } // Evidence should be submitted and committed at the third height but diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index 161594021..2174f45fc 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -465,15 +465,14 @@ func newStateWithConfigAndBlockStore( t.Helper() // one for mempool, one for consensus - proxyAppConnMem := abciclient.NewLocalClient(logger, app) - proxyAppConnCon := abciclient.NewLocalClient(logger, app) + appConn := abciclient.NewLocalClient(logger, app) // Make Mempool mempool := mempool.NewTxMempool( logger.With("module", "mempool"), thisConfig.Mempool, - proxyAppConnMem, + appConn, ) if thisConfig.Consensus.WaitForTxs() { @@ -490,7 +489,7 @@ func newStateWithConfigAndBlockStore( eventBus := eventbus.NewDefault(logger.With("module", "events")) require.NoError(t, eventBus.Start(ctx)) - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus) + blockExec := sm.NewBlockExecutor(stateStore, logger, appConn, mempool, evpool, blockStore, eventBus) cs, err := NewState(ctx, logger.With("module", "consensus"), thisConfig.Consensus, diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index 033b096ba..0b4fdf830 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -35,13 +35,13 @@ func TestReactorInvalidPrecommit(t *testing.T) { for i := 0; i < 4; i++ { ticker := NewTimeoutTicker(states[i].logger) states[i].SetTimeoutTicker(ticker) + require.NoError(t, states[i].Start(ctx)) } rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock - for _, reactor := range rts.reactors { - state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + for id, reactor := range rts.reactors { + reactor.SwitchToConsensus(ctx, rts.states[id].state, false) } // this val sends a random precommit at each height diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index 6d1d7e8e4..393555a61 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -138,7 +138,7 @@ func checkTxsRange(ctx context.Context, t *testing.T, cs *State, start, end int) } func TestMempoolTxConcurrentWithCommit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() config := configSetup(t) @@ -148,27 +148,28 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { Power: 10}) stateStore := sm.NewStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB()) + require.NoError(t, stateStore.Save(state)) cs := newStateWithConfigAndBlockStore( ctx, t, logger, config, state, privVals[0], NewCounterApplication(), blockStore) + require.NoError(t, cs.Start(ctx)) - err := stateStore.Save(state) - require.NoError(t, err) newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader) const numTxs int64 = 3000 - go checkTxsRange(ctx, t, cs, 0, int(numTxs)) + go checkTxsRange(ctx, t, cs, 0, int(numTxs)) startTestRound(ctx, cs, cs.Height, cs.Round) + for n := int64(0); n < numTxs; { select { case msg := <-newBlockHeaderCh: headerEvent := msg.Data().(types.EventDataNewBlockHeader) n += headerEvent.NumTxs - case <-time.After(30 * time.Second): - t.Fatal("Timed out waiting 30s to commit blocks with transactions") + case <-ctx.Done(): + t.Fatalf("Timed out waiting to commit blocks with transactions [%d of %d]", n, numTxs) } } } diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index b8d638976..a396ffc44 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -509,6 +509,7 @@ func TestReactorWithEvidence(t *testing.T) { cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus) require.NoError(t, err) + require.NoError(t, cs.Start(ctx)) cs.SetPrivValidator(ctx, pv) cs.SetTimeoutTicker(tickerFunc()) @@ -518,9 +519,8 @@ func TestReactorWithEvidence(t *testing.T) { rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock - for _, reactor := range rts.reactors { - state := reactor.state.GetState() - reactor.SwitchToConsensus(ctx, state, false) + for id, reactor := range rts.reactors { + reactor.SwitchToConsensus(ctx, rts.states[id].state, false) } var wg sync.WaitGroup diff --git a/internal/consensus/state.go b/internal/consensus/state.go index e4a6521a2..dadaf55a1 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1115,7 +1115,6 @@ func (cs *State) handleTxsAvailable(ctx context.Context) { // NOTE: cs.StartTime was already set for height. func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { // TODO: remove panics in this function and return an error - logger := cs.logger.With("height", height, "round", round) if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) { @@ -1345,6 +1344,8 @@ func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round // Returns true if the proposal block is complete && // (if POLRound was proposed, we have +2/3 prevotes from there). func (cs *State) isProposalComplete() bool { + cs.mtx.RLock() + defer cs.mtx.RUnlock() if cs.Proposal == nil || cs.ProposalBlock == nil { return false } @@ -2054,19 +2055,27 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) { func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error { // Already have one // TODO: possibly catch double proposals - if cs.Proposal != nil || proposal == nil { - return nil - } + if shouldReturn, err := func() (bool, error) { + cs.mtx.RLock() + defer cs.mtx.RUnlock() - // Does not apply - if proposal.Height != cs.Height || proposal.Round != cs.Round { - return nil - } + if cs.Proposal != nil || proposal == nil { + return true, nil + } - // Verify POLRound, which must be -1 or in range [0, proposal.Round). - if proposal.POLRound < -1 || - (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) { - return ErrInvalidProposalPOLRound + // Does not apply + if proposal.Height != cs.Height || proposal.Round != cs.Round { + return true, nil + } + + // Verify POLRound, which must be -1 or in range [0, proposal.Round). + if proposal.POLRound < -1 || + (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) { + return true, ErrInvalidProposalPOLRound + } + return false, nil + }(); shouldReturn { + return err } p := proposal.ToProto() @@ -2078,8 +2087,12 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time } proposal.Signature = p.Signature - cs.Proposal = proposal - cs.ProposalReceiveTime = recvTime + func() { + cs.mtx.Lock() + defer cs.mtx.RUnlock() + cs.Proposal = proposal + cs.ProposalReceiveTime = recvTime + }() cs.calculateProposalTimestampDifferenceMetric() // We don't update cs.ProposalBlockParts if it is already set. // This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.