Browse Source

fixes

pull/8091/head
tycho garen 3 years ago
parent
commit
082b166b59
6 changed files with 48 additions and 35 deletions
  1. +5
    -5
      internal/consensus/byzantine_test.go
  2. +3
    -4
      internal/consensus/common_test.go
  3. +3
    -3
      internal/consensus/invalid_test.go
  4. +7
    -6
      internal/consensus/mempool_test.go
  5. +3
    -3
      internal/consensus/reactor_test.go
  6. +27
    -14
      internal/consensus/state.go

+ 5
- 5
internal/consensus/byzantine_test.go View File

@ -3,7 +3,6 @@ package consensus
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"path" "path"
"sync" "sync"
"testing" "testing"
@ -63,8 +62,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i)) thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := kvstore.NewApplication() app := kvstore.NewApplication()
vals := types.TM2PB.ValidatorUpdates(state.Validators) vals := types.TM2PB.ValidatorUpdates(state.Validators)
@ -98,12 +95,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus) blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus) cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
require.NoError(t, err) require.NoError(t, err)
// set private validator // set private validator
pv := privVals[i] pv := privVals[i]
cs.SetPrivValidator(ctx, pv) cs.SetPrivValidator(ctx, pv)
cs.SetTimeoutTicker(tickerFunc()) cs.SetTimeoutTicker(tickerFunc())
require.NoError(t, cs.Start(ctx))
states[i] = cs states[i] = cs
}() }()
} }
@ -233,8 +233,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
} }
} }
for _, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
} }
// Evidence should be submitted and committed at the third height but // Evidence should be submitted and committed at the third height but


+ 3
- 4
internal/consensus/common_test.go View File

@ -460,15 +460,14 @@ func newStateWithConfigAndBlockStore(
t.Helper() t.Helper()
// one for mempool, one for consensus // one for mempool, one for consensus
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
appConn := abciclient.NewLocalClient(logger, app)
// Make Mempool // Make Mempool
mempool := mempool.NewTxMempool( mempool := mempool.NewTxMempool(
logger.With("module", "mempool"), logger.With("module", "mempool"),
thisConfig.Mempool, thisConfig.Mempool,
proxyAppConnMem,
appConn,
) )
if thisConfig.Consensus.WaitForTxs() { if thisConfig.Consensus.WaitForTxs() {
@ -485,7 +484,7 @@ func newStateWithConfigAndBlockStore(
eventBus := eventbus.NewDefault(logger.With("module", "events")) eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx)) require.NoError(t, eventBus.Start(ctx))
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus)
blockExec := sm.NewBlockExecutor(stateStore, logger, appConn, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx, cs, err := NewState(ctx,
logger.With("module", "consensus"), logger.With("module", "consensus"),
thisConfig.Consensus, thisConfig.Consensus,


+ 3
- 3
internal/consensus/invalid_test.go View File

@ -35,13 +35,13 @@ func TestReactorInvalidPrecommit(t *testing.T) {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
ticker := NewTimeoutTicker(states[i].logger) ticker := NewTimeoutTicker(states[i].logger)
states[i].SetTimeoutTicker(ticker) states[i].SetTimeoutTicker(ticker)
require.NoError(t, states[i].Start(ctx))
} }
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
} }
// this val sends a random precommit at each height // this val sends a random precommit at each height


+ 7
- 6
internal/consensus/mempool_test.go View File

@ -138,7 +138,7 @@ func checkTxsRange(ctx context.Context, t *testing.T, cs *State, start, end int)
} }
func TestMempoolTxConcurrentWithCommit(t *testing.T) { func TestMempoolTxConcurrentWithCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
config := configSetup(t) config := configSetup(t)
@ -148,27 +148,28 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
Power: 10}) Power: 10})
stateStore := sm.NewStore(dbm.NewMemDB()) stateStore := sm.NewStore(dbm.NewMemDB())
blockStore := store.NewBlockStore(dbm.NewMemDB()) blockStore := store.NewBlockStore(dbm.NewMemDB())
require.NoError(t, stateStore.Save(state))
cs := newStateWithConfigAndBlockStore( cs := newStateWithConfigAndBlockStore(
ctx, ctx,
t, t,
logger, config, state, privVals[0], NewCounterApplication(), blockStore) logger, config, state, privVals[0], NewCounterApplication(), blockStore)
require.NoError(t, cs.Start(ctx))
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader) newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader)
const numTxs int64 = 3000 const numTxs int64 = 3000
go checkTxsRange(ctx, t, cs, 0, int(numTxs))
go checkTxsRange(ctx, t, cs, 0, int(numTxs))
startTestRound(ctx, cs, cs.Height, cs.Round) startTestRound(ctx, cs, cs.Height, cs.Round)
for n := int64(0); n < numTxs; { for n := int64(0); n < numTxs; {
select { select {
case msg := <-newBlockHeaderCh: case msg := <-newBlockHeaderCh:
headerEvent := msg.Data().(types.EventDataNewBlockHeader) headerEvent := msg.Data().(types.EventDataNewBlockHeader)
n += headerEvent.NumTxs n += headerEvent.NumTxs
case <-time.After(30 * time.Second):
t.Fatal("Timed out waiting 30s to commit blocks with transactions")
case <-ctx.Done():
t.Fatalf("Timed out waiting to commit blocks with transactions [%d of %d]", n, numTxs)
} }
} }
} }


+ 3
- 3
internal/consensus/reactor_test.go View File

@ -511,6 +511,7 @@ func TestReactorWithEvidence(t *testing.T) {
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"), cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus) thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, cs.Start(ctx))
cs.SetPrivValidator(ctx, pv) cs.SetPrivValidator(ctx, pv)
cs.SetTimeoutTicker(tickerFunc()) cs.SetTimeoutTicker(tickerFunc())
@ -520,9 +521,8 @@ func TestReactorWithEvidence(t *testing.T) {
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
} }
var wg sync.WaitGroup var wg sync.WaitGroup


+ 27
- 14
internal/consensus/state.go View File

@ -1103,7 +1103,6 @@ func (cs *State) handleTxsAvailable(ctx context.Context) {
// NOTE: cs.StartTime was already set for height. // NOTE: cs.StartTime was already set for height.
func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) { func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
// TODO: remove panics in this function and return an error // TODO: remove panics in this function and return an error
logger := cs.logger.With("height", height, "round", round) logger := cs.logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
@ -1325,6 +1324,8 @@ func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round
// Returns true if the proposal block is complete && // Returns true if the proposal block is complete &&
// (if POLRound was proposed, we have +2/3 prevotes from there). // (if POLRound was proposed, we have +2/3 prevotes from there).
func (cs *State) isProposalComplete() bool { func (cs *State) isProposalComplete() bool {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
if cs.Proposal == nil || cs.ProposalBlock == nil { if cs.Proposal == nil || cs.ProposalBlock == nil {
return false return false
} }
@ -2036,19 +2037,27 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) {
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error { func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
// Already have one // Already have one
// TODO: possibly catch double proposals // TODO: possibly catch double proposals
if cs.Proposal != nil || proposal == nil {
return nil
}
if shouldReturn, err := func() (bool, error) {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return nil
}
if cs.Proposal != nil || proposal == nil {
return true, nil
}
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return true, nil
}
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return ErrInvalidProposalPOLRound
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return true, ErrInvalidProposalPOLRound
}
return false, nil
}(); shouldReturn {
return err
} }
p := proposal.ToProto() p := proposal.ToProto()
@ -2060,8 +2069,12 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time
} }
proposal.Signature = p.Signature proposal.Signature = p.Signature
cs.Proposal = proposal
cs.ProposalReceiveTime = recvTime
func() {
cs.mtx.Lock()
defer cs.mtx.RUnlock()
cs.Proposal = proposal
cs.ProposalReceiveTime = recvTime
}()
cs.calculateProposalTimestampDifferenceMetric() cs.calculateProposalTimestampDifferenceMetric()
// We don't update cs.ProposalBlockParts if it is already set. // We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round. // This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.


Loading…
Cancel
Save