From c9ec9cf00e0ef370caa8258226e3399d6e3caac9 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 29 Feb 2016 16:15:23 -0500 Subject: [PATCH] config: block size, consensus timeouts, recheck tx --- blockchain/pool.go | 3 +- blockchain/pool_test.go | 5 +- config/tendermint/config.go | 14 +++++- config/tendermint_test/config.go | 14 +++++- consensus/state.go | 81 +++++++++++++++++++++++++------- consensus/state_test.go | 14 +++--- mempool/mempool.go | 12 ++--- mempool/mempool_test.go | 1 + 8 files changed, 107 insertions(+), 37 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 0409976f3..06556154f 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -15,10 +15,11 @@ const ( maxTotalRequesters = 300 maxPendingRequests = maxTotalRequesters maxPendingRequestsPerPeer = 75 - peerTimeoutSeconds = 15 minRecvRate = 10240 // 10Kb/s ) +var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests + /* Peers self report their heights when we join the block pool. Starting from our latest pool.height, we request blocks diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index c91f33680..8e6dbee94 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -1,4 +1,3 @@ - package blockchain import ( @@ -10,6 +9,10 @@ import ( "github.com/tendermint/tendermint/types" ) +func init() { + peerTimeoutSeconds = time.Duration(2) +} + type testPeer struct { id string height int diff --git a/config/tendermint/config.go b/config/tendermint/config.go index bdeb6add5..7e8e5b44c 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -62,12 +62,22 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json") mapConfig.SetDefault("db_backend", "leveldb") mapConfig.SetDefault("db_dir", rootDir+"/data") - mapConfig.SetDefault("vm_log", true) mapConfig.SetDefault("log_level", "info") mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/cswal") + mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + + mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("timeout_propose", 3000) + mapConfig.SetDefault("timeout_propose_delta", 500) + mapConfig.SetDefault("timeout_prevote", 1000) + mapConfig.SetDefault("timeout_prevote_delta", 500) + mapConfig.SetDefault("timeout_precommit", 1000) + mapConfig.SetDefault("timeout_precommit_delta", 500) + mapConfig.SetDefault("timeout_commit", 1000) + mapConfig.SetDefault("mempool_recheck", true) + return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 21dbe9332..a9153e50c 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -87,11 +87,21 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("db_backend", "memdb") mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "debug") - mapConfig.SetDefault("vm_log", true) mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/cswal") + mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + + mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("timeout_propose", 100) + mapConfig.SetDefault("timeout_propose_delta", 1) + mapConfig.SetDefault("timeout_prevote", 1) + mapConfig.SetDefault("timeout_prevote_delta", 1) + mapConfig.SetDefault("timeout_precommit", 1) + mapConfig.SetDefault("timeout_precommit_delta", 1) + mapConfig.SetDefault("timeout_commit", 1) + mapConfig.SetDefault("mempool_recheck", true) + return mapConfig } diff --git a/consensus/state.go b/consensus/state.go index 201586643..eb7109609 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -18,16 +18,55 @@ import ( "github.com/tendermint/tendermint/types" ) -var ( - timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal - timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N - timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers. - timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N - timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers. - timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N - timeoutCommit = 1000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight. +//----------------------------------------------------------------------------- +// Timeout Parameters + +// All in milliseconds +type TimeoutParams struct { + Propose0 int + ProposeDelta int + Prevote0 int + PrevoteDelta int + Precommit0 int + PrecommitDelta int + Commit0 int +} -) +// Wait this long for a proposal +func (tp *TimeoutParams) Propose(round int) time.Duration { + return time.Duration(tp.Propose0+tp.ProposeDelta*round) * time.Millisecond +} + +// After receiving any +2/3 prevote, wait this long for stragglers +func (tp *TimeoutParams) Prevote(round int) time.Duration { + return time.Duration(tp.Prevote0+tp.PrevoteDelta*round) * time.Millisecond +} + +// After receiving any +2/3 precommits, wait this long for stragglers +func (tp *TimeoutParams) Precommit(round int) time.Duration { + return time.Duration(tp.Precommit0+tp.PrecommitDelta*round) * time.Millisecond +} + +// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight +func (tp *TimeoutParams) Commit(t time.Time) time.Time { + return t.Add(time.Duration(tp.Commit0) * time.Millisecond) +} + +// Initialize parameters from config file +func InitTimeoutParamsFromConfig() *TimeoutParams { + return &TimeoutParams{ + Propose0: config.GetInt("timeout_propose"), + ProposeDelta: config.GetInt("timeout_propose_delta"), + Prevote0: config.GetInt("timeout_prevote"), + PrevoteDelta: config.GetInt("timeout_prevote_delta"), + Precommit0: config.GetInt("timeout_precommit"), + PrecommitDelta: config.GetInt("timeout_precommit_delta"), + Commit0: config.GetInt("timeout_commit"), + } +} + +//----------------------------------------------------------------------------- +// Errors var ( ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") @@ -188,6 +227,7 @@ type ConsensusState struct { timeoutTicker *time.Ticker // ticker for timeouts tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine + timeoutParams *TimeoutParams // parameters and functions for timeout intervals evsw *events.EventSwitch @@ -206,6 +246,7 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore * timeoutTicker: new(time.Ticker), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), + timeoutParams: InitTimeoutParamsFromConfig(), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -453,9 +494,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // to be gathered for the first block. // And alternative solution that relies on clocks: // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) - cs.StartTime = time.Now().Add(timeoutCommit) + cs.StartTime = cs.timeoutParams.Commit(time.Now()) } else { - cs.StartTime = cs.CommitTime.Add(timeoutCommit) + cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime) } cs.CommitTime = time.Time{} cs.Validators = validators @@ -726,8 +767,8 @@ func (cs *ConsensusState) enterPropose(height int, round int) { } }() - // This step times out after `timeoutPropose` - cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose) + // If we don't get the proposal and all block parts quick enough, enterPrevote + cs.scheduleTimeout(cs.timeoutParams.Propose(round), height, round, RoundStepPropose) // Nothing more to do if we're not a validator if cs.privValidator == nil { @@ -821,6 +862,12 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap() + // Cap the number of txs in a block + maxBlockSize := config.GetInt("block_size") + if maxBlockSize > 0 && maxBlockSize < len(txs) { + txs = txs[:maxBlockSize] + } + block = &types.Block{ Header: &types.Header{ ChainID: cs.state.ChainID, @@ -928,8 +975,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) { cs.newStep() }() - // After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit() - cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) + // Wait for some more prevotes; enterPrecommit + cs.scheduleTimeout(cs.timeoutParams.Prevote(round), height, round, RoundStepPrevoteWait) } // Enter: +2/3 precomits for block or nil. @@ -1049,8 +1096,8 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) { cs.newStep() }() - // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound() - cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) + // Wait for some more precommits; enterNewRound + cs.scheduleTimeout(cs.timeoutParams.Precommit(round), height, round, RoundStepPrecommitWait) } diff --git a/consensus/state_test.go b/consensus/state_test.go index 1def6ae8e..b6e55d6ca 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -11,6 +11,10 @@ import ( "github.com/tendermint/tendermint/types" ) +func (tp *TimeoutParams) ensureProposeTimeout() time.Duration { + return time.Duration(tp.Propose0*2) * time.Millisecond +} + /* ProposeSuite @@ -44,12 +48,6 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh //---------------------------------------------------------------------------------------------------- // ProposeSuite -func init() { - fmt.Println("") - timeoutPropose0 = 100 * time.Millisecond - timeoutProposeDelta = 1 * time.Millisecond -} - func TestProposerSelection0(t *testing.T) { cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round @@ -124,7 +122,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { startTestRound(cs, height, round) // if we're not a validator, EnterPropose should timeout - ticker := time.NewTicker(timeoutPropose0 * 2) + ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout()) select { case <-timeoutCh: case <-ticker.C: @@ -165,7 +163,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } // if we're a validator, enterPropose should not timeout - ticker := time.NewTicker(timeoutPropose0 * 2) + ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout()) select { case <-timeoutCh: t.Fatal("Expected EnterPropose not to timeout") diff --git a/mempool/mempool.go b/mempool/mempool.go index 2af9bc0e1..1d0a375d6 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -223,12 +223,12 @@ func (mem *Mempool) Update(height int, txs []types.Tx) { // Remove transactions that are already in txs. goodTxs := mem.filterTxs(txsMap) // Recheck mempool txs - // TODO: make optional - mem.recheckTxs(goodTxs) - - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + if config.GetBool("mempool_recheck") { + mem.recheckTxs(goodTxs) + // At this point, mem.txs are being rechecked. + // mem.recheckCursor re-scans mem.txs and possibly removes some txs. + // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + } } func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 1d93efc90..82738c180 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + _ "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/counter"