Merge pull request #791 from tendermint/740-no-wal-after-fast-sync

dont catchupReplay on wal if we fast synced
Ethan Buchman committed via GitHub, 7 years ago
commit 3319ad03b8
6 changed files with 58 additions and 29 deletions
  1. blockchain/pool.go (+20 / -12)
  2. blockchain/reactor.go (+20 / -7)
  3. consensus/byzantine_test.go (+2 / -2)
  4. consensus/reactor.go (+5 / -1)
  5. consensus/reactor_test.go (+1 / -1)
  6. consensus/state.go (+10 / -6)

blockchain/pool.go (+20 / -12)

@@ -26,10 +26,10 @@ eg, L = latency = 0.1s
 */
 const (
-	requestIntervalMS         = 250
-	maxTotalRequesters        = 300
+	requestIntervalMS         = 100
+	maxTotalRequesters        = 1000
 	maxPendingRequests        = maxTotalRequesters
-	maxPendingRequestsPerPeer = 10
+	maxPendingRequestsPerPeer = 50
 	minRecvRate               = 10240 // 10Kb/s
 )
@@ -56,7 +56,8 @@ type BlockPool struct {
 	height     int   // the lowest key in requesters.
 	numPending int32 // number of requests pending assignment or block response
 	// peers
-	peers map[string]*bpPeer
+	peers         map[string]*bpPeer
+	maxPeerHeight int
 	requestsCh chan<- BlockRequest
 	timeoutsCh chan<- string
@@ -87,10 +88,12 @@ func (pool *BlockPool) OnStop() {}
 // Run spawns requesters as needed.
 func (pool *BlockPool) makeRequestersRoutine() {
 	for {
 		if !pool.IsRunning() {
 			break
 		}
 		_, numPending, lenRequesters := pool.GetStatus()
 		if numPending >= maxPendingRequests {
 			// sleep for a bit.
@@ -147,16 +150,10 @@ func (pool *BlockPool) IsCaughtUp() bool {
 		return false
 	}
-	maxPeerHeight := 0
-	for _, peer := range pool.peers {
-		maxPeerHeight = cmn.MaxInt(maxPeerHeight, peer.height)
-	}
 	// some conditions to determine if we're caught up
 	receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second)
-	ourChainIsLongestAmongPeers := maxPeerHeight == 0 || pool.height >= maxPeerHeight
+	ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
 	isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
-	pool.Logger.Info(cmn.Fmt("IsCaughtUp: %v", isCaughtUp), "height", pool.height, "maxPeerHeight", maxPeerHeight)
 	return isCaughtUp
 }
@@ -235,6 +232,13 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
 	}
 }

+// MaxPeerHeight returns the highest height reported by a peer
+func (pool *BlockPool) MaxPeerHeight() int {
+	pool.mtx.Lock()
+	defer pool.mtx.Unlock()
+	return pool.maxPeerHeight
+}
+
 // Sets the peer's alleged blockchain height.
 func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
 	pool.mtx.Lock()

@@ -248,6 +252,10 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
 		peer.setLogger(pool.Logger.With("peer", peerID))
 		pool.peers[peerID] = peer
 	}
+
+	if height > pool.maxPeerHeight {
+		pool.maxPeerHeight = height
+	}
 }

 func (pool *BlockPool) RemovePeer(peerID string) {
@@ -298,7 +306,7 @@ func (pool *BlockPool) makeNextRequester() {
 	nextHeight := pool.height + len(pool.requesters)
 	request := newBPRequester(pool, nextHeight)
-	request.SetLogger(pool.Logger.With("height", nextHeight))
+	// request.SetLogger(pool.Logger.With("height", nextHeight))
 	pool.requesters[nextHeight] = request
 	pool.numPending++
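
The pool now maintains the maximum peer height incrementally under its mutex, rather than recomputing it with a scan over all peers on every IsCaughtUp call. A minimal, self-contained sketch of the same pattern (peerTracker and its fields are illustrative names, not the pool's actual types):

```go
package main

import (
	"fmt"
	"sync"
)

// peerTracker caches the highest height any peer has reported,
// so readers get it in O(1) instead of scanning every peer.
type peerTracker struct {
	mtx           sync.Mutex
	heights       map[string]int
	maxPeerHeight int
}

// SetPeerHeight records a peer's reported height and bumps the
// cached maximum if it grew.
func (t *peerTracker) SetPeerHeight(peerID string, height int) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.heights[peerID] = height
	if height > t.maxPeerHeight {
		t.maxPeerHeight = height
	}
}

// MaxPeerHeight returns the cached maximum under the lock.
func (t *peerTracker) MaxPeerHeight() int {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	return t.maxPeerHeight
}

func main() {
	t := &peerTracker{heights: make(map[string]int)}
	t.SetPeerHeight("peer1", 10)
	t.SetPeerHeight("peer2", 42)
	fmt.Println(t.MaxPeerHeight()) // 42
}
```

One trade-off worth noting: the cache only ever grows. RemovePeer is untouched in this diff, so if the tallest peer disconnects, maxPeerHeight keeps its old value and IsCaughtUp stays false until the pool actually reaches that height.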


blockchain/reactor.go (+20 / -7)

@@ -19,8 +19,8 @@ const (
 	// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
 	BlockchainChannel = byte(0x40)
-	defaultChannelCapacity = 100
-	trySyncIntervalMS      = 100
+	defaultChannelCapacity = 1000
+	trySyncIntervalMS      = 50
 	// stop syncing when last block's time is
 	// within this much of the system time.
 	// stopSyncingDurationMinutes = 10
@@ -34,7 +34,7 @@ const (
 type consensusReactor interface {
 	// for when we switch from blockchain reactor and fast sync to
 	// the consensus machine
-	SwitchToConsensus(*sm.State)
+	SwitchToConsensus(*sm.State, int)
 }

 // BlockchainReactor handles long-term catchup syncing.
@@ -110,8 +110,8 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
 	return []*p2p.ChannelDescriptor{
 		&p2p.ChannelDescriptor{
 			ID:                BlockchainChannel,
-			Priority:          5,
-			SendQueueCapacity: 100,
+			Priority:          10,
+			SendQueueCapacity: 1000,
 		},
 	}
 }
@@ -194,8 +194,13 @@ func (bcR *BlockchainReactor) poolRoutine() {
 	statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
 	switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
+
+	blocksSynced := 0
 	chainID := bcR.state.ChainID
+	lastHundred := time.Now()
+	lastRate := 0.0
+
 FOR_LOOP:
 	for {
 		select {
@@ -223,14 +228,14 @@ FOR_LOOP:
 		case <-switchToConsensusTicker.C:
 			height, numPending, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()
-			bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", lenRequesters,
+			bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
 				"outbound", outbound, "inbound", inbound)
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
 				bcR.pool.Stop()
 				conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
-				conR.SwitchToConsensus(bcR.state)
+				conR.SwitchToConsensus(bcR.state, blocksSynced)
 				break FOR_LOOP
 			}
@@ -271,6 +276,14 @@ FOR_LOOP:
 					// TODO This is bad, are we zombie?
 					cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
 				}
+				blocksSynced += 1
+
+				if blocksSynced%100 == 0 {
+					lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
+					bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
+						"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
+					lastHundred = time.Now()
+				}
 			}
 		}
 		continue FOR_LOOP
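
The new logging in poolRoutine reports a smoothed blocks-per-second figure: every 100 blocks, the instantaneous rate for that window is folded into an exponential moving average weighted 90/10 toward history. A runnable sketch of the same arithmetic (rateMeter is an illustrative name, not part of the reactor):

```go
package main

import (
	"fmt"
	"time"
)

const windowSize = 100

// rateMeter mirrors the patch's bookkeeping: count blocks, and at
// each full window fold that window's rate into a moving average.
type rateMeter struct {
	blocksSynced int
	lastWindow   time.Time
	lastRate     float64 // smoothed blocks/s
}

// OnBlock is called once per synced block. It returns a rate > 0
// whenever a window of windowSize blocks completes.
func (m *rateMeter) OnBlock() float64 {
	m.blocksSynced++
	if m.blocksSynced%windowSize != 0 {
		return 0
	}
	instant := float64(windowSize) / time.Since(m.lastWindow).Seconds()
	m.lastRate = 0.9*m.lastRate + 0.1*instant
	m.lastWindow = time.Now()
	return m.lastRate
}

func main() {
	m := &rateMeter{lastWindow: time.Now()}
	for i := 0; i < 250; i++ {
		time.Sleep(time.Millisecond) // stand-in for block processing
		if rate := m.OnBlock(); rate > 0 {
			fmt.Printf("fast sync rate: %.1f blocks/s\n", rate)
		}
	}
}
```

Because lastRate starts at zero, the first window reports only a tenth of the true rate and the average converges over later windows; the patch shares that start-up bias.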


consensus/byzantine_test.go (+2 / -2)

@@ -101,10 +101,10 @@ func TestByzantine(t *testing.T) {
 	// start the state machines
 	byzR := reactors[0].(*ByzantineReactor)
 	s := byzR.reactor.conS.GetState()
-	byzR.reactor.SwitchToConsensus(s)
+	byzR.reactor.SwitchToConsensus(s, 0)
 	for i := 1; i < N; i++ {
 		cr := reactors[i].(*ConsensusReactor)
-		cr.SwitchToConsensus(cr.conS.GetState())
+		cr.SwitchToConsensus(cr.conS.GetState(), 0)
 	}

 	// byz proposer sends one block to peers[0]


consensus/reactor.go (+5 / -1)

@@ -76,7 +76,7 @@ func (conR *ConsensusReactor) OnStop() {
 // SwitchToConsensus switches from fast_sync mode to consensus mode.
 // It resets the state, turns off fast_sync, and starts the consensus state-machine
-func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
+func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
 	conR.Logger.Info("SwitchToConsensus")
 	conR.conS.reconstructLastCommit(state)
 	// NOTE: The line below causes broadcastNewRoundStepRoutine() to

@@ -87,6 +87,10 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
 	conR.fastSync = false
 	conR.mtx.Unlock()
+	if blocksSynced > 0 {
+		// dont bother with the WAL if we fast synced
+		conR.conS.doWALCatchup = false
+	}
 	conR.conS.Start()
 }
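
This hunk is the heart of the PR: the blockchain reactor now reports how many blocks it fast synced, and a nonzero count tells the consensus state to skip WAL catchup, since anything in the WAL predates the state just replayed from peers. A condensed sketch of the handshake, with the reactor and state collapsed into toy types:

```go
package main

import "fmt"

// consensusState stands in for ConsensusState: it owns the flag
// that startup consults before replaying the write-ahead log.
type consensusState struct {
	doWALCatchup bool // defaults to true, as in NewConsensusState
}

func (cs *consensusState) Start() {
	if cs.doWALCatchup {
		fmt.Println("replaying WAL to recover in-flight consensus messages")
	} else {
		fmt.Println("fast synced; WAL contents are stale, skipping replay")
	}
}

// switchToConsensus mirrors the hand-off: blocksSynced > 0 means
// the state was rebuilt from peers, so the WAL cannot help.
func switchToConsensus(cs *consensusState, blocksSynced int) {
	if blocksSynced > 0 {
		cs.doWALCatchup = false
	}
	cs.Start()
}

func main() {
	switchToConsensus(&consensusState{doWALCatchup: true}, 500) // fast synced
	switchToConsensus(&consensusState{doWALCatchup: true}, 0)   // already caught up
}
```

The blocksSynced > 0 guard is what makes this safe across restarts: a node that comes up already caught up fast syncs zero blocks, and its WAL may hold votes it has already signed, so replay must still run to avoid conflicting signatures. That is also why the updated test call sites pass 0.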


consensus/reactor_test.go (+1 / -1)

@@ -54,7 +54,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEven
 	// we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
 	for i := 0; i < N; i++ {
 		s := reactors[i].conS.GetState()
-		reactors[i].SwitchToConsensus(s)
+		reactors[i].SwitchToConsensus(s, 0)
 	}
 	return reactors, eventChans
 }


consensus/state.go (+10 / -6)

@@ -97,8 +97,9 @@ type ConsensusState struct {
 	// a Write-Ahead Log ensures we can recover from any kind of crash
 	// and helps us avoid signing conflicting votes
-	wal        *WAL
-	replayMode bool // so we don't log signing errors during replay
+	wal          *WAL
+	replayMode   bool // so we don't log signing errors during replay
+	doWALCatchup bool // determines if we even try to do the catchup
 	// for tests where we want to limit the number of transitions the state makes
 	nSteps int
@@ -123,6 +124,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon
 		internalMsgQueue: make(chan msgInfo, msgQueueSize),
 		timeoutTicker:    NewTimeoutTicker(),
 		done:             make(chan struct{}),
+		doWALCatchup:     true,
 	}
 	// set function defaults (may be overwritten before calling Start)
 	cs.decideProposal = cs.defaultDecideProposal
@@ -226,10 +228,12 @@ func (cs *ConsensusState) OnStart() error {
 	// we may have lost some votes if the process crashed
 	// reload from consensus log to catchup
-	if err := cs.catchupReplay(cs.Height); err != nil {
-		cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
-		// NOTE: if we ever do return an error here,
-		// make sure to stop the timeoutTicker
+	if cs.doWALCatchup {
+		if err := cs.catchupReplay(cs.Height); err != nil {
+			cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
+			// NOTE: if we ever do return an error here,
+			// make sure to stop the timeoutTicker
+		}
 	}

 	// now start the receiveRoutine
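
On the consuming side, OnStart simply wraps the existing catchupReplay call in the new flag; replay failure remains non-fatal. A minimal sketch of the resulting control flow, with catchupReplay abstracted as a callback (the real method takes cs.Height and lives on ConsensusState):

```go
package main

import (
	"errors"
	"fmt"
)

// onStart sketches the patched flow: WAL catchup is optional, and
// a replay error is logged rather than returned, because consensus
// can still make progress from the current state.
func onStart(doWALCatchup bool, catchupReplay func() error) error {
	if doWALCatchup {
		if err := catchupReplay(); err != nil {
			// Mirrors the patch's NOTE: if this ever returned the
			// error, the timeoutTicker started earlier in OnStart
			// would have to be stopped first.
			fmt.Println("catchup replay failed, starting anyway:", err)
		}
	}
	fmt.Println("starting receiveRoutine")
	return nil
}

func main() {
	// Default path (doWALCatchup is set to true in the constructor):
	_ = onStart(true, func() error { return errors.New("corrupt WAL tail") })
	// Fast-synced path: replay is skipped entirely.
	_ = onStart(false, func() error { return nil })
}
```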

