diff --git a/blockchain/pool.go b/blockchain/pool.go index 7b4e2145a..3067b8041 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -18,8 +18,6 @@ const ( maxRequestsPerPeer = 300 ) -// numTotal = numPending + blocks in the pool we havnt synced yet - var ( requestTimeoutSeconds = time.Duration(3) ) @@ -37,12 +35,11 @@ var ( type BlockPool struct { // block requests - requestsMtx sync.Mutex - requests map[uint]*bpRequest - peerless int32 // number of requests without peers - height uint // the lowest key in requests. - numPending int32 - numTotal int32 + requestsMtx sync.Mutex + requests map[uint]*bpRequest + height uint // the lowest key in requests. + numUnassigned int32 // number of requests not yet assigned to a peer + numWaiting int32 // number of requests awaiting response from a peer // peers peersMtx sync.Mutex @@ -59,10 +56,10 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- return &BlockPool{ peers: make(map[string]*bpPeer), - requests: make(map[uint]*bpRequest), - height: start, - numPending: 0, - numTotal: 0, + requests: make(map[uint]*bpRequest), + height: start, + numUnassigned: 0, + numWaiting: 0, requestsCh: requestsCh, timeoutsCh: timeoutsCh, @@ -97,11 +94,11 @@ RUN_LOOP: if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - _, numPending, numTotal := pool.GetStatus() - if numPending >= maxPendingRequests { + _, numWaiting := pool.GetStatus() + if numWaiting >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) - } else if numTotal >= maxTotalRequests { + } else if len(pool.requests) >= maxTotalRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) } else { @@ -112,11 +109,11 @@ RUN_LOOP: } } -func (pool *BlockPool) GetStatus() (uint, int32, int32) { +func (pool *BlockPool) GetStatus() (uint, int32) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() - return pool.height, pool.numPending, pool.numTotal + return pool.height, pool.numWaiting } // We need to see the second block's Validation to validate the first block. @@ -146,7 +143,6 @@ func (pool *BlockPool) PopRequest() { delete(pool.requests, pool.height) pool.height++ - pool.numTotal-- } // Invalidates the block at pool.height. @@ -164,8 +160,8 @@ func (pool *BlockPool) RedoRequest(height uint) { pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" - pool.numPending++ - pool.peerless++ + pool.numWaiting++ + pool.numUnassigned++ go requestRoutine(pool, height) } @@ -186,7 +182,7 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { if request == nil { return } - pool.peerless-- + pool.numUnassigned-- request.peerId = peerId } @@ -198,7 +194,7 @@ func (pool *BlockPool) removePeerForRequest(height uint, peerId string) { if request == nil { return } - pool.peerless++ + pool.numUnassigned++ request.peerId = "" } @@ -217,7 +213,7 @@ func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { return } request.block = block - pool.numPending-- + pool.numWaiting-- } func (pool *BlockPool) getPeer(peerId string) *bpPeer { @@ -288,7 +284,7 @@ func (pool *BlockPool) nextHeight() uint { defer pool.requestsMtx.Unlock() // we make one request per height. - return pool.height + uint(pool.numTotal) + return pool.height + uint(len(pool.requests)) } func (pool *BlockPool) makeRequest(height uint) { @@ -302,12 +298,11 @@ func (pool *BlockPool) makeRequest(height uint) { } pool.requests[height] = request - pool.peerless++ + pool.numUnassigned++ - nextHeight := pool.height + uint(pool.numTotal) + nextHeight := pool.height + uint(len(pool.requests)) if nextHeight == height { - pool.numTotal++ - pool.numPending++ + pool.numWaiting++ } go requestRoutine(pool, height) @@ -332,7 +327,7 @@ func (pool *BlockPool) debug() string { defer pool.requestsMtx.Unlock() str := "" - for h := pool.height; h < pool.height+uint(pool.numTotal); h++ { + for h := pool.height; h < pool.height+uint(len(pool.requests)); h++ { if pool.requests[h] == nil { str += Fmt("H(%v):X ", h) } else { @@ -379,7 +374,7 @@ func requestRoutine(pool *BlockPool, height uint) { break PICK_LOOP } - // set the peer, decrement peerless + // set the peer, decrement numUnassigned pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { @@ -391,14 +386,14 @@ func requestRoutine(pool *BlockPool, height uint) { return } // or already processed and we've moved past it - bpHeight, _, _ := pool.GetStatus() + bpHeight, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) return } } - // unset the peer, increment peerless + // unset the peer, increment numUnassigned pool.removePeerForRequest(height, peer.id) // this peer failed us, try again diff --git a/blockchain/reactor.go b/blockchain/reactor.go index bc5bb38f0..be00e7db4 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -195,13 +195,14 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - // not thread safe access for peerless and numPending but should be fine - log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) + // not thread safe access for numUnassigned and numWaiting but should be fine + // TODO make threadsafe and use exposed functions + log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numWaiting", bcR.pool.numWaiting, "total", len(bcR.pool.requests)) // NOTE: this condition is very strict right now. may need to weaken - // if the max amount of requests are pending and peerless + // if the max amount of requests are waiting and numUnassigned // and we have some peers (say > 5), then we're caught up - maxPending := bcR.pool.numPending == maxPendingRequests - maxPeerless := bcR.pool.peerless == bcR.pool.numPending + maxPending := bcR.pool.numWaiting == maxPendingRequests + maxPeerless := bcR.pool.numUnassigned == bcR.pool.numWaiting o, i, _ := bcR.sw.NumPeers() enoughPeers := o+i >= 5 if maxPending && maxPeerless && enoughPeers {