Browse Source

config: block size, consensus timeouts, recheck tx

pull/192/head
Ethan Buchman 9 years ago
parent
commit
c9ec9cf00e
8 changed files with 107 additions and 37 deletions
  1. +2
    -1
      blockchain/pool.go
  2. +4
    -1
      blockchain/pool_test.go
  3. +12
    -2
      config/tendermint/config.go
  4. +12
    -2
      config/tendermint_test/config.go
  5. +64
    -17
      consensus/state.go
  6. +6
    -8
      consensus/state_test.go
  7. +6
    -6
      mempool/mempool.go
  8. +1
    -0
      mempool/mempool_test.go

+ 2
- 1
blockchain/pool.go View File

@ -15,10 +15,11 @@ const (
maxTotalRequesters = 300 maxTotalRequesters = 300
maxPendingRequests = maxTotalRequesters maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 75 maxPendingRequestsPerPeer = 75
peerTimeoutSeconds = 15
minRecvRate = 10240 // 10Kb/s minRecvRate = 10240 // 10Kb/s
) )
var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests
/* /*
Peers self report their heights when we join the block pool. Peers self report their heights when we join the block pool.
Starting from our latest pool.height, we request blocks Starting from our latest pool.height, we request blocks


+ 4
- 1
blockchain/pool_test.go View File

@ -1,4 +1,3 @@
package blockchain package blockchain
import ( import (
@ -10,6 +9,10 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func init() {
peerTimeoutSeconds = time.Duration(2)
}
type testPeer struct { type testPeer struct {
id string id string
height int height int


+ 12
- 2
config/tendermint/config.go View File

@ -62,12 +62,22 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json") mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json")
mapConfig.SetDefault("db_backend", "leveldb") mapConfig.SetDefault("db_backend", "leveldb")
mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("vm_log", true)
mapConfig.SetDefault("log_level", "info") mapConfig.SetDefault("log_level", "info")
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657")
mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/cswal")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("timeout_propose", 3000)
mapConfig.SetDefault("timeout_propose_delta", 500)
mapConfig.SetDefault("timeout_prevote", 1000)
mapConfig.SetDefault("timeout_prevote_delta", 500)
mapConfig.SetDefault("timeout_precommit", 1000)
mapConfig.SetDefault("timeout_precommit_delta", 500)
mapConfig.SetDefault("timeout_commit", 1000)
mapConfig.SetDefault("mempool_recheck", true)
return mapConfig return mapConfig
} }


+ 12
- 2
config/tendermint_test/config.go View File

@ -87,11 +87,21 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("db_backend", "memdb") mapConfig.SetDefault("db_backend", "memdb")
mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "debug") mapConfig.SetDefault("log_level", "debug")
mapConfig.SetDefault("vm_log", true)
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657")
mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision") mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/cswal")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("timeout_propose", 100)
mapConfig.SetDefault("timeout_propose_delta", 1)
mapConfig.SetDefault("timeout_prevote", 1)
mapConfig.SetDefault("timeout_prevote_delta", 1)
mapConfig.SetDefault("timeout_precommit", 1)
mapConfig.SetDefault("timeout_precommit_delta", 1)
mapConfig.SetDefault("timeout_commit", 1)
mapConfig.SetDefault("mempool_recheck", true)
return mapConfig return mapConfig
} }


+ 64
- 17
consensus/state.go View File

@ -18,16 +18,55 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var (
timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal
timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N
timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers.
timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N
timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers.
timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N
timeoutCommit = 1000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight.
//-----------------------------------------------------------------------------
// Timeout Parameters
// All in milliseconds
type TimeoutParams struct {
Propose0 int
ProposeDelta int
Prevote0 int
PrevoteDelta int
Precommit0 int
PrecommitDelta int
Commit0 int
}
)
// Wait this long for a proposal
func (tp *TimeoutParams) Propose(round int) time.Duration {
return time.Duration(tp.Propose0+tp.ProposeDelta*round) * time.Millisecond
}
// After receiving any +2/3 prevote, wait this long for stragglers
func (tp *TimeoutParams) Prevote(round int) time.Duration {
return time.Duration(tp.Prevote0+tp.PrevoteDelta*round) * time.Millisecond
}
// After receiving any +2/3 precommits, wait this long for stragglers
func (tp *TimeoutParams) Precommit(round int) time.Duration {
return time.Duration(tp.Precommit0+tp.PrecommitDelta*round) * time.Millisecond
}
// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight
func (tp *TimeoutParams) Commit(t time.Time) time.Time {
return t.Add(time.Duration(tp.Commit0) * time.Millisecond)
}
// Initialize parameters from config file
func InitTimeoutParamsFromConfig() *TimeoutParams {
return &TimeoutParams{
Propose0: config.GetInt("timeout_propose"),
ProposeDelta: config.GetInt("timeout_propose_delta"),
Prevote0: config.GetInt("timeout_prevote"),
PrevoteDelta: config.GetInt("timeout_prevote_delta"),
Precommit0: config.GetInt("timeout_precommit"),
PrecommitDelta: config.GetInt("timeout_precommit_delta"),
Commit0: config.GetInt("timeout_commit"),
}
}
//-----------------------------------------------------------------------------
// Errors
var ( var (
ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") ErrInvalidProposalSignature = errors.New("Error invalid proposal signature")
@ -188,6 +227,7 @@ type ConsensusState struct {
timeoutTicker *time.Ticker // ticker for timeouts timeoutTicker *time.Ticker // ticker for timeouts
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
evsw *events.EventSwitch evsw *events.EventSwitch
@ -206,6 +246,7 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore *
timeoutTicker: new(time.Ticker), timeoutTicker: new(time.Ticker),
tickChan: make(chan timeoutInfo, tickTockBufferSize), tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize),
timeoutParams: InitTimeoutParamsFromConfig(),
} }
cs.updateToState(state) cs.updateToState(state)
// Don't call scheduleRound0 yet. // Don't call scheduleRound0 yet.
@ -453,9 +494,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// to be gathered for the first block. // to be gathered for the first block.
// And alternative solution that relies on clocks: // And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit) // cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = time.Now().Add(timeoutCommit)
cs.StartTime = cs.timeoutParams.Commit(time.Now())
} else { } else {
cs.StartTime = cs.CommitTime.Add(timeoutCommit)
cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime)
} }
cs.CommitTime = time.Time{} cs.CommitTime = time.Time{}
cs.Validators = validators cs.Validators = validators
@ -726,8 +767,8 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
} }
}() }()
// This step times out after `timeoutPropose`
cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose)
// If we don't get the proposal and all block parts quick enough, enterPrevote
cs.scheduleTimeout(cs.timeoutParams.Propose(round), height, round, RoundStepPropose)
// Nothing more to do if we're not a validator // Nothing more to do if we're not a validator
if cs.privValidator == nil { if cs.privValidator == nil {
@ -821,6 +862,12 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Mempool validated transactions // Mempool validated transactions
txs := cs.mempool.Reap() txs := cs.mempool.Reap()
// Cap the number of txs in a block
maxBlockSize := config.GetInt("block_size")
if maxBlockSize > 0 && maxBlockSize < len(txs) {
txs = txs[:maxBlockSize]
}
block = &types.Block{ block = &types.Block{
Header: &types.Header{ Header: &types.Header{
ChainID: cs.state.ChainID, ChainID: cs.state.ChainID,
@ -928,8 +975,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
cs.newStep() cs.newStep()
}() }()
// After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit()
cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait)
// Wait for some more prevotes; enterPrecommit
cs.scheduleTimeout(cs.timeoutParams.Prevote(round), height, round, RoundStepPrevoteWait)
} }
// Enter: +2/3 precomits for block or nil. // Enter: +2/3 precomits for block or nil.
@ -1049,8 +1096,8 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) {
cs.newStep() cs.newStep()
}() }()
// After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound()
cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait)
// Wait for some more precommits; enterNewRound
cs.scheduleTimeout(cs.timeoutParams.Precommit(round), height, round, RoundStepPrecommitWait)
} }


+ 6
- 8
consensus/state_test.go View File

@ -11,6 +11,10 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func (tp *TimeoutParams) ensureProposeTimeout() time.Duration {
return time.Duration(tp.Propose0*2) * time.Millisecond
}
/* /*
ProposeSuite ProposeSuite
@ -44,12 +48,6 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
// ProposeSuite // ProposeSuite
func init() {
fmt.Println("")
timeoutPropose0 = 100 * time.Millisecond
timeoutProposeDelta = 1 * time.Millisecond
}
func TestProposerSelection0(t *testing.T) { func TestProposerSelection0(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
@ -124,7 +122,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
startTestRound(cs, height, round) startTestRound(cs, height, round)
// if we're not a validator, EnterPropose should timeout // if we're not a validator, EnterPropose should timeout
ticker := time.NewTicker(timeoutPropose0 * 2)
ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout())
select { select {
case <-timeoutCh: case <-timeoutCh:
case <-ticker.C: case <-ticker.C:
@ -165,7 +163,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
} }
// if we're a validator, enterPropose should not timeout // if we're a validator, enterPropose should not timeout
ticker := time.NewTicker(timeoutPropose0 * 2)
ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout())
select { select {
case <-timeoutCh: case <-timeoutCh:
t.Fatal("Expected EnterPropose not to timeout") t.Fatal("Expected EnterPropose not to timeout")


+ 6
- 6
mempool/mempool.go View File

@ -223,12 +223,12 @@ func (mem *Mempool) Update(height int, txs []types.Tx) {
// Remove transactions that are already in txs. // Remove transactions that are already in txs.
goodTxs := mem.filterTxs(txsMap) goodTxs := mem.filterTxs(txsMap)
// Recheck mempool txs // Recheck mempool txs
// TODO: make optional
mem.recheckTxs(goodTxs)
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
if config.GetBool("mempool_recheck") {
mem.recheckTxs(goodTxs)
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
}
} }
func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {


+ 1
- 0
mempool/mempool_test.go View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"testing" "testing"
_ "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/counter" "github.com/tendermint/tmsp/example/counter"


Loading…
Cancel
Save