diff --git a/blockchain/pool.go b/blockchain/pool.go index 7b4e2145a..5c8b280b9 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -18,15 +18,13 @@ const ( maxRequestsPerPeer = 300 ) -// numTotal = numPending + blocks in the pool we havnt synced yet - var ( requestTimeoutSeconds = time.Duration(3) ) /* Peers self report their heights when a new peer joins the block pool. - Starting from whatever we've got (pool.height), we request blocks + Starting from pool.height (inclusive), we request blocks in sequence from peers that reported higher heights than ours. Every so often we ask peers what height they're on so we can keep going. @@ -37,12 +35,11 @@ var ( type BlockPool struct { // block requests - requestsMtx sync.Mutex - requests map[uint]*bpRequest - peerless int32 // number of requests without peers - height uint // the lowest key in requests. - numPending int32 - numTotal int32 + requestsMtx sync.Mutex + requests map[uint]*bpRequest + height uint // the lowest key in requests. + numUnassigned int32 // number of requests not yet assigned to a peer + numPending int32 // number of requests pending assignment or block response // peers peersMtx sync.Mutex @@ -59,10 +56,10 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- return &BlockPool{ peers: make(map[string]*bpPeer), - requests: make(map[uint]*bpRequest), - height: start, - numPending: 0, - numTotal: 0, + requests: make(map[uint]*bpRequest), + height: start, + numUnassigned: 0, + numPending: 0, requestsCh: requestsCh, timeoutsCh: timeoutsCh, @@ -97,26 +94,25 @@ RUN_LOOP: if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - _, numPending, numTotal := pool.GetStatus() + _, numPending := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) - } else if numTotal >= maxTotalRequests { + } else if len(pool.requests) >= maxTotalRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) } else { // request for more blocks. - height := pool.nextHeight() - pool.makeRequest(height) + pool.makeNextRequest() } } } -func (pool *BlockPool) GetStatus() (uint, int32, int32) { +func (pool *BlockPool) GetStatus() (uint, int32) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() - return pool.height, pool.numPending, pool.numTotal + return pool.height, pool.numPending } // We need to see the second block's Validation to validate the first block. @@ -146,7 +142,6 @@ func (pool *BlockPool) PopRequest() { delete(pool.requests, pool.height) pool.height++ - pool.numTotal-- } // Invalidates the block at pool.height. @@ -165,7 +160,7 @@ func (pool *BlockPool) RedoRequest(height uint) { request.block = nil request.peerId = "" pool.numPending++ - pool.peerless++ + pool.numUnassigned++ go requestRoutine(pool, height) } @@ -186,7 +181,7 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { if request == nil { return } - pool.peerless-- + pool.numUnassigned-- request.peerId = peerId } @@ -198,7 +193,7 @@ func (pool *BlockPool) removePeerForRequest(height uint, peerId string) { if request == nil { return } - pool.peerless++ + pool.numUnassigned++ request.peerId = "" } @@ -283,34 +278,22 @@ func (pool *BlockPool) decrPeer(peerId string) { peer.numRequests-- } -func (pool *BlockPool) nextHeight() uint { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() - - // we make one request per height. - return pool.height + uint(pool.numTotal) -} - -func (pool *BlockPool) makeRequest(height uint) { +func (pool *BlockPool) makeNextRequest() { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() + nextHeight := pool.height + uint(len(pool.requests)) request := &bpRequest{ - height: height, + height: nextHeight, peerId: "", block: nil, } - pool.requests[height] = request - - pool.peerless++ - nextHeight := pool.height + uint(pool.numTotal) - if nextHeight == height { - pool.numTotal++ - pool.numPending++ - } + pool.requests[nextHeight] = request + pool.numUnassigned++ + pool.numPending++ - go requestRoutine(pool, height) + go requestRoutine(pool, nextHeight) } func (pool *BlockPool) sendRequest(height uint, peerId string) { @@ -332,7 +315,7 @@ func (pool *BlockPool) debug() string { defer pool.requestsMtx.Unlock() str := "" - for h := pool.height; h < pool.height+uint(pool.numTotal); h++ { + for h := pool.height; h < pool.height+uint(len(pool.requests)); h++ { if pool.requests[h] == nil { str += Fmt("H(%v):X ", h) } else { @@ -379,7 +362,7 @@ func requestRoutine(pool *BlockPool, height uint) { break PICK_LOOP } - // set the peer, decrement peerless + // set the peer, decrement numUnassigned pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { @@ -391,14 +374,14 @@ func requestRoutine(pool *BlockPool, height uint) { return } // or already processed and we've moved past it - bpHeight, _, _ := pool.GetStatus() + bpHeight, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) return } } - // unset the peer, increment peerless + // unset the peer, increment numUnassigned pool.removePeerForRequest(height, peer.id) // this peer failed us, try again diff --git a/blockchain/reactor.go b/blockchain/reactor.go index bc5bb38f0..e48da66f9 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,7 +10,6 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" - dbm "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" @@ -33,8 +32,9 @@ const ( ) type consensusReactor interface { - SetSyncing(bool) - ResetToState(*sm.State) + // for when we switch from blockchain reactor and fast sync to + // the consensus machine + SwitchToConsensus(*sm.State) } // BlockchainReactor handles long-term catchup syncing. @@ -163,6 +163,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) } // Handle messages from the poolReactor telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) @@ -175,14 +177,14 @@ FOR_LOOP: case request := <-bcR.requestsCh: // chan BlockRequest peer := bcR.sw.Peers().Get(request.PeerId) if peer == nil { - // We can't fulfill the request. + // We can't assign the request. continue FOR_LOOP } msg := &bcBlockRequestMessage{request.Height} queued := peer.TrySend(BlockchainChannel, msg) if !queued { - // We couldn't queue the request. - time.Sleep(defaultSleepIntervalMS * time.Millisecond) + // We couldn't make the request, send-queue full. + // The pool handles retries, so just let it go. continue FOR_LOOP } case peerId := <-bcR.timeoutsCh: // chan string @@ -195,28 +197,28 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - // not thread safe access for peerless and numPending but should be fine - log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) + // not thread safe access for numUnassigned and numPending but should be fine + // TODO make threadsafe and use exposed functions + outbound, inbound, _ := bcR.sw.NumPeers() + log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending, + "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) // NOTE: this condition is very strict right now. may need to weaken - // if the max amount of requests are pending and peerless - // and we have some peers (say > 5), then we're caught up + // If all `maxPendingRequests` requests are unassigned + // and we have some peers (say >= 3), then we're caught up maxPending := bcR.pool.numPending == maxPendingRequests - maxPeerless := bcR.pool.peerless == bcR.pool.numPending - o, i, _ := bcR.sw.NumPeers() - enoughPeers := o+i >= 5 - if maxPending && maxPeerless && enoughPeers { - log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height) + allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned + enoughPeers := outbound+inbound >= 3 + if maxPending && allUnassigned && enoughPeers { + log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height) bcR.pool.Stop() - stateDB := dbm.GetDB("state") - state := sm.LoadState(stateDB) - bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state) - bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false) + conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor) + conR.SwitchToConsensus(bcR.state) break FOR_LOOP } case _ = <-trySyncTicker.C: // chan time - //var lastValidatedBlock *types.Block + // This loop can be slow as long as it's doing syncing work. SYNC_LOOP: for i := 0; i < 10; i++ { // See if there are any blocks to sync. @@ -244,33 +246,8 @@ FOR_LOOP: } bcR.store.SaveBlock(first, firstParts, second.Validation) bcR.state.Save() - //lastValidatedBlock = first } } - /* - // We're done syncing for now (will do again shortly) - // See if we want to stop syncing and turn on the - // consensus reactor. - // TODO: use other heuristics too besides blocktime. - // It's not a security concern, as it only needs to happen - // upon node sync, and there's also a second (slower) - // this peer failed us - // method of syncing in the consensus reactor. - - if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { - go func() { - log.Info("Stopping blockpool syncing, turning on consensus...") - trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. - conR := bcR.sw.Reactor("CONSENSUS") - conR.(stateResetter).ResetToState(bcR.state) - conR.Start(bcR.sw) - for _, peer := range bcR.sw.Peers().List() { - conR.AddPeer(peer) - } - }() - break FOR_LOOP - } - */ continue FOR_LOOP case <-bcR.quit: break FOR_LOOP diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 51149ea2c..7f78a3c2c 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -71,7 +71,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") - mapConfig.SetDefault("fast_sync", true) + mapConfig.SetDefault("fast_sync", false) mapConfig.SetDefault("addrbook_file", rootDir+"/addrbook.json") mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json") mapConfig.SetDefault("db_backend", "memdb") @@ -94,7 +94,7 @@ network = "tendermint_test" moniker = "__MONIKER__" node_laddr = "0.0.0.0:36656" seeds = "" -fast_sync = true +fast_sync = false db_backend = "memdb" log_level = "debug" rpc_laddr = "0.0.0.0:36657" diff --git a/consensus/reactor.go b/consensus/reactor.go index 1bd0edba5..2cddd51ae 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -42,16 +42,17 @@ type ConsensusReactor struct { conS *ConsensusState // if fast sync is running we don't really do anything - syncing bool + sync bool evsw events.Fireable } -func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor { conR := &ConsensusReactor{ blockStore: blockStore, quit: make(chan struct{}), conS: consensusState, + sync: sync, } return conR } @@ -61,7 +62,9 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { log.Info("Starting ConsensusReactor") conR.sw = sw - conR.conS.Start() + if !conR.sync { + conR.conS.Start() + } go conR.broadcastNewRoundStepRoutine() } } @@ -129,7 +132,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { - if conR.syncing || !conR.IsRunning() { + if conR.sync || !conR.IsRunning() { return } @@ -235,20 +238,18 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } -// Sets whether or not we're using the blockchain reactor for syncing -func (conR *ConsensusReactor) SetSyncing(syncing bool) { - conR.syncing = syncing -} - // Sets our private validator account for signing votes. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } -// Reset to some state. -func (conR *ConsensusReactor) ResetToState(state *sm.State) { +// Switch from the fast sync to the consensus: +// reset the state, turn off fast sync, start the consensus-state-machine +func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.conS.updateToState(state, false) conR.conS.newStepCh <- conR.conS.getRoundState() + conR.sync = false + conR.conS.Start() } // implements events.Eventable diff --git a/node/node.go b/node/node.go index 35a165c41..ed0816a08 100644 --- a/node/node.go +++ b/node/node.go @@ -87,16 +87,11 @@ func NewNode() *Node { // Get ConsensusReactor consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor) - consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore) + consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync")) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) } - // so the consensus reactor won't do anything until we're synced - if config.GetBool("fast_sync") { - consensusReactor.SetSyncing(true) - } - sw := p2p.NewSwitch() sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor)