Browse Source

back out of consensus changes

pull/8127/head
tycho garen 3 years ago
committed by M. J. Fromberger
parent
commit
b9d25538ce
7 changed files with 48 additions and 66 deletions
  1. +5
    -5
      internal/consensus/byzantine_test.go
  2. +5
    -3
      internal/consensus/common_test.go
  3. +5
    -7
      internal/consensus/invalid_test.go
  4. +6
    -7
      internal/consensus/mempool_test.go
  5. +3
    -8
      internal/consensus/reactor.go
  6. +3
    -3
      internal/consensus/reactor_test.go
  7. +21
    -33
      internal/consensus/state.go

+ 5
- 5
internal/consensus/byzantine_test.go View File

@ -3,6 +3,7 @@ package consensus
import (
"context"
"fmt"
"os"
"path"
"sync"
"testing"
@ -62,6 +63,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
thisConfig, err := ResetConfig(t.TempDir(), fmt.Sprintf("%s_%d", testName, i))
require.NoError(t, err)
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := kvstore.NewApplication()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
@ -95,15 +98,12 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockExec := sm.NewBlockExecutor(stateStore, log.NewNopLogger(), proxyAppConnCon, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx, logger, thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool, eventBus)
require.NoError(t, err)
// set private validator
pv := privVals[i]
cs.SetPrivValidator(ctx, pv)
cs.SetTimeoutTicker(tickerFunc())
require.NoError(t, cs.Start(ctx))
states[i] = cs
}()
}
@ -232,8 +232,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
for _, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
}
// Evidence should be submitted and committed at the third height but


+ 5
- 3
internal/consensus/common_test.go View File

@ -227,6 +227,7 @@ func sortVValidatorStubsByPower(ctx context.Context, t *testing.T, vss []*valida
func startTestRound(ctx context.Context, cs *State, height int64, round int32) {
cs.enterNewRound(ctx, height, round)
cs.startRoutines(ctx, 0)
}
// Create proposal block from cs1 but sign it with vs.
@ -464,14 +465,15 @@ func newStateWithConfigAndBlockStore(
t.Helper()
// one for mempool, one for consensus
appConn := abciclient.NewLocalClient(logger, app)
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
// Make Mempool
mempool := mempool.NewTxMempool(
logger.With("module", "mempool"),
thisConfig.Mempool,
appConn,
proxyAppConnMem,
)
if thisConfig.Consensus.WaitForTxs() {
@ -488,7 +490,7 @@ func newStateWithConfigAndBlockStore(
eventBus := eventbus.NewDefault(logger.With("module", "events"))
require.NoError(t, eventBus.Start(ctx))
blockExec := sm.NewBlockExecutor(stateStore, logger, appConn, mempool, evpool, blockStore, eventBus)
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyAppConnCon, mempool, evpool, blockStore, eventBus)
cs, err := NewState(ctx,
logger.With("module", "consensus"),
thisConfig.Consensus,


+ 5
- 7
internal/consensus/invalid_test.go View File

@ -35,13 +35,13 @@ func TestReactorInvalidPrecommit(t *testing.T) {
for i := 0; i < 4; i++ {
ticker := NewTimeoutTicker(states[i].logger)
states[i].SetTimeoutTicker(ticker)
require.NoError(t, states[i].Start(ctx))
}
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
// this val sends a random precommit at each height
@ -91,11 +91,9 @@ func TestReactorInvalidPrecommit(t *testing.T) {
t.Fatal("test condition did not fire")
}
case <-ctx.Done():
select {
case <-signal:
// pass
default:
if _, ok := <-signal; !ok {
t.Fatal("test condition did not fire after timeout")
return
}
case <-signal:
// test passed


+ 6
- 7
internal/consensus/mempool_test.go View File

@ -138,7 +138,7 @@ func checkTxsRange(ctx context.Context, t *testing.T, cs *State, start, end int)
}
func TestMempoolTxConcurrentWithCommit(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := configSetup(t)
@ -148,28 +148,27 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
Power: 10})
stateStore := sm.NewStore(dbm.NewMemDB())
blockStore := store.NewBlockStore(dbm.NewMemDB())
require.NoError(t, stateStore.Save(state))
cs := newStateWithConfigAndBlockStore(
ctx,
t,
logger, config, state, privVals[0], NewCounterApplication(), blockStore)
require.NoError(t, cs.Start(ctx))
err := stateStore.Save(state)
require.NoError(t, err)
newBlockHeaderCh := subscribe(ctx, t, cs.eventBus, types.EventQueryNewBlockHeader)
const numTxs int64 = 3000
go checkTxsRange(ctx, t, cs, 0, int(numTxs))
startTestRound(ctx, cs, cs.Height, cs.Round)
startTestRound(ctx, cs, cs.Height, cs.Round)
for n := int64(0); n < numTxs; {
select {
case msg := <-newBlockHeaderCh:
headerEvent := msg.Data().(types.EventDataNewBlockHeader)
n += headerEvent.NumTxs
case <-ctx.Done():
t.Fatalf("Timed out waiting to commit blocks with transactions [%d of %d]", n, numTxs)
case <-time.After(30 * time.Second):
t.Fatal("Timed out waiting 30s to commit blocks with transactions")
}
}
}


+ 3
- 8
internal/consensus/reactor.go View File

@ -865,15 +865,10 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
return
}
// TODO create more reliable coppies of these
// structures so the following go routines don't race
rs := r.state.GetRoundState()
prs := ps.GetRoundState()
select {
case <-ctx.Done():
return
default:
}
// TODO create more reliable coppies of these
// structures so the following go routines don't race
wg := &sync.WaitGroup{}
@ -1032,7 +1027,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.WaitSync() {
_ = r.sendNewRoundStepMessage(ctx, ps.peerID)
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
}
}()


+ 3
- 3
internal/consensus/reactor_test.go View File

@ -509,7 +509,6 @@ func TestReactorWithEvidence(t *testing.T) {
cs, err := NewState(ctx, logger.With("validator", i, "module", "consensus"),
thisConfig.Consensus, stateStore, blockExec, blockStore, mempool, evpool2, eventBus)
require.NoError(t, err)
require.NoError(t, cs.Start(ctx))
cs.SetPrivValidator(ctx, pv)
cs.SetTimeoutTicker(tickerFunc())
@ -519,8 +518,9 @@ func TestReactorWithEvidence(t *testing.T) {
rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
for id, reactor := range rts.reactors {
reactor.SwitchToConsensus(ctx, rts.states[id].state, false)
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
}
var wg sync.WaitGroup


+ 21
- 33
internal/consensus/state.go View File

@ -223,6 +223,10 @@ func NewState(
cs.doPrevote = cs.defaultDoPrevote
cs.setProposal = cs.defaultSetProposal
if err := cs.updateStateFromStore(ctx); err != nil {
return nil, err
}
// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
cs.BaseService = *service.NewBaseService(logger, "State", cs)
for _, option := range options {
@ -286,9 +290,9 @@ func (cs *State) GetRoundState() *cstypes.RoundState {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
// NOTE: this is probably be dodgy, as RoundState isn't thread
// safe as it contains a number of pointers which make the
// resulting object shared anyway
// NOTE: this might be dodgy, as RoundState itself isn't thread
// safe as it contains a number of pointers and is explicitly
// not thread safe.
rs := cs.RoundState // copy
return &rs
}
@ -351,9 +355,8 @@ func (cs *State) SetPrivValidator(ctx context.Context, priv types.PrivValidator)
// testing.
func (cs *State) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.timeoutTicker = timeoutTicker
cs.mtx.Unlock()
}
// LoadCommit loads the commit for a given height.
@ -1116,6 +1119,7 @@ func (cs *State) handleTxsAvailable(ctx context.Context) {
// NOTE: cs.StartTime was already set for height.
func (cs *State) enterNewRound(ctx context.Context, height int64, round int32) {
// TODO: remove panics in this function and return an error
logger := cs.logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
@ -1345,8 +1349,6 @@ func (cs *State) defaultDecideProposal(ctx context.Context, height int64, round
// Returns true if the proposal block is complete &&
// (if POLRound was proposed, we have +2/3 prevotes from there).
func (cs *State) isProposalComplete() bool {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
if cs.Proposal == nil || cs.ProposalBlock == nil {
return false
}
@ -2056,27 +2058,19 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) {
func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time) error {
// Already have one
// TODO: possibly catch double proposals
if shouldReturn, err := func() (bool, error) {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
if cs.Proposal != nil || proposal == nil {
return true, nil
}
if cs.Proposal != nil || proposal == nil {
return nil
}
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return true, nil
}
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return nil
}
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return true, ErrInvalidProposalPOLRound
}
return false, nil
}(); shouldReturn {
return err
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return ErrInvalidProposalPOLRound
}
p := proposal.ToProto()
@ -2088,21 +2082,15 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal, recvTime time.Time
}
proposal.Signature = p.Signature
cs.mtx.Lock()
cs.Proposal = proposal
cs.ProposalReceiveTime = recvTime
cs.mtx.Unlock()
cs.calculateProposalTimestampDifferenceMetric()
// We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
if cs.ProposalBlockParts == nil {
cs.metrics.MarkBlockGossipStarted()
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}()
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}
cs.logger.Info("received proposal", "proposal", proposal)


Loading…
Cancel
Save