From 057c8ef400089e23e7f2562ee314df80dfe07732 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 28 May 2015 03:18:13 -0700 Subject: [PATCH] waiting back to pending --- blockchain/pool.go | 42 +++++++++++++++--------------------------- blockchain/reactor.go | 21 +++++++++++---------- 2 files changed, 26 insertions(+), 37 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 1c38e876e..0503c8ebb 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -13,7 +13,7 @@ const ( maxTries = 3 inputsChannelCapacity = 200 requestIntervalMS = 500 - maxWaitingRequests = 200 + maxPendingRequests = 200 maxTotalRequests = 300 maxRequestsPerPeer = 300 ) @@ -39,7 +39,7 @@ type BlockPool struct { requests map[uint]*bpRequest height uint // the lowest key in requests. numUnassigned int32 // number of requests not yet assigned to a peer - numWaiting int32 // number of requests awaiting response from a peer + numPending int32 // number of requests pending assignment or block response // peers peersMtx sync.Mutex @@ -59,7 +59,7 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- requests: make(map[uint]*bpRequest), height: start, numUnassigned: 0, - numWaiting: 0, + numPending: 0, requestsCh: requestsCh, timeoutsCh: timeoutsCh, @@ -94,8 +94,8 @@ RUN_LOOP: if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - _, numWaiting := pool.GetStatus() - if numWaiting >= maxWaitingRequests { + _, numPending := pool.GetStatus() + if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) } else if len(pool.requests) >= maxTotalRequests { @@ -103,8 +103,7 @@ RUN_LOOP: time.Sleep(requestIntervalMS * time.Millisecond) } else { // request for more blocks. - height := pool.nextHeight() - pool.makeRequest(height) + pool.makeNextRequest() } } } @@ -113,7 +112,7 @@ func (pool *BlockPool) GetStatus() (uint, int32) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() - return pool.height, pool.numWaiting + return pool.height, pool.numPending } // We need to see the second block's Validation to validate the first block. @@ -160,7 +159,7 @@ func (pool *BlockPool) RedoRequest(height uint) { pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" - pool.numWaiting++ + pool.numPending++ pool.numUnassigned++ go requestRoutine(pool, height) @@ -213,7 +212,7 @@ func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { return } request.block = block - pool.numWaiting-- + pool.numPending-- } func (pool *BlockPool) getPeer(peerId string) *bpPeer { @@ -279,33 +278,22 @@ func (pool *BlockPool) decrPeer(peerId string) { peer.numRequests-- } -func (pool *BlockPool) nextHeight() uint { - pool.requestsMtx.Lock() // Lock - defer pool.requestsMtx.Unlock() - - // we make one request per height. - return pool.height + uint(len(pool.requests)) -} - -func (pool *BlockPool) makeRequest(height uint) { +func (pool *BlockPool) makeNextRequest() { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() + nextHeight := pool.height + uint(len(pool.requests)) + 1 request := &bpRequest{ - height: height, + height: nextHeight, peerId: "", block: nil, } - pool.requests[height] = request + pool.requests[nextHeight] = request pool.numUnassigned++ + pool.numPending++ - nextHeight := pool.height + uint(len(pool.requests)) - if nextHeight == height { - pool.numWaiting++ - } - - go requestRoutine(pool, height) + go requestRoutine(pool, nextHeight) } func (pool *BlockPool) sendRequest(height uint, peerId string) { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7d0d4b84e..793183375 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -197,18 +197,19 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - // not thread safe access for numUnassigned and numWaiting but should be fine + // not thread safe access for numUnassigned and numPending but should be fine // TODO make threadsafe and use exposed functions - log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numWaiting", bcR.pool.numWaiting, "total", len(bcR.pool.requests)) + outbound, inbound, _ := bcR.sw.NumPeers() + log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending, + "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) // NOTE: this condition is very strict right now. may need to weaken - // if the max amount of requests are waiting and numUnassigned - // and we have some peers (say > 5), then we're caught up - maxWaiting := bcR.pool.numWaiting == maxWaitingRequests - peersUnavailable := bcR.pool.numWaiting == bcR.pool.numUnassigned - o, i, _ := bcR.sw.NumPeers() - enoughPeers := o+i >= 5 - if maxWaiting && peersUnavailable && enoughPeers { - log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height) + // If all `maxPendingRequests` requests are unassigned + // and we have some peers (say >= 3), then we're caught up + maxPending := bcR.pool.numPending == maxPendingRequests + allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned + enoughPeers := outbound+inbound >= 3 + if maxPending && allUnassigned && enoughPeers { + log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height) bcR.pool.Stop() stateDB := dbm.GetDB("state") state := sm.LoadState(stateDB)