diff --git a/blockchain/pool.go b/blockchain/pool.go index 603b4bf2a..8b964e81a 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" cmn "github.com/tendermint/tmlibs/common" @@ -66,11 +67,13 @@ type BlockPool struct { // block requests requesters map[int64]*bpRequester height int64 // the lowest key in requesters. - numPending int32 // number of requests pending assignment or block response // peers peers map[p2p.ID]*bpPeer maxPeerHeight int64 + // atomic + numPending int32 // number of requests pending assignment or block response + requestsCh chan<- BlockRequest errorsCh chan<- peerError } @@ -151,7 +154,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester pool.mtx.Lock() defer pool.mtx.Unlock() - return pool.height, pool.numPending, len(pool.requesters) + return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) } // TODO: relax conditions, prevent abuse. @@ -245,7 +248,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int } if requester.setBlock(block, peerID) { - pool.numPending-- + atomic.AddInt32(&pool.numPending, -1) peer := pool.peers[peerID] if peer != nil { peer.decrPending(blockSize) @@ -291,10 +294,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { if requester.getPeerID() == peerID { - if requester.getBlock() != nil { - pool.numPending++ - } - go requester.redo() // pick another peer and ... + requester.redo() } } delete(pool.peers, peerID) @@ -332,7 +332,7 @@ func (pool *BlockPool) makeNextRequester() { // request.SetLogger(pool.Logger.With("height", nextHeight)) pool.requesters[nextHeight] = request - pool.numPending++ + atomic.AddInt32(&pool.numPending, 1) err := request.Start() if err != nil { @@ -360,7 +360,7 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) { // unused by tendermint; left for debugging purposes func (pool *BlockPool) debug() string { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() str := "" @@ -466,8 +466,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester { bpr := &bpRequester{ pool: pool, height: height, - gotBlockCh: make(chan struct{}), - redoCh: make(chan struct{}), + gotBlockCh: make(chan struct{}, 1), + redoCh: make(chan struct{}, 1), peerID: "", block: nil, @@ -481,7 +481,7 @@ func (bpr *bpRequester) OnStart() error { return nil } -// Returns true if the peer matches +// Returns true if the peer matches and block doesn't already exist. func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { bpr.mtx.Lock() if bpr.block != nil || bpr.peerID != peerID { @@ -491,7 +491,10 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { bpr.block = block bpr.mtx.Unlock() - bpr.gotBlockCh <- struct{}{} + select { + case bpr.gotBlockCh <- struct{}{}: + default: + } return true } @@ -507,17 +510,27 @@ func (bpr *bpRequester) getPeerID() p2p.ID { return bpr.peerID } +// This is called from the requestRoutine, upon redo(). func (bpr *bpRequester) reset() { bpr.mtx.Lock() + defer bpr.mtx.Unlock() + + if bpr.block != nil { + atomic.AddInt32(&bpr.pool.numPending, 1) + } + bpr.peerID = "" bpr.block = nil - bpr.mtx.Unlock() } // Tells bpRequester to pick another peer and try again. -// NOTE: blocking +// NOTE: Nonblocking, and does nothing if another redo +// was already requested. func (bpr *bpRequester) redo() { - bpr.redoCh <- struct{}{} + select { + case bpr.redoCh <- struct{}{}: + default: + } } // Responsible for making more requests as necessary @@ -546,17 +559,8 @@ OUTER_LOOP: // Send request and wait. bpr.pool.sendRequest(bpr.height, peer.id) - select { - case <-bpr.pool.Quit(): - bpr.Stop() - return - case <-bpr.Quit(): - return - case <-bpr.redoCh: - bpr.reset() - continue OUTER_LOOP // When peer is removed - case <-bpr.gotBlockCh: - // We got the block, now see if it's good. + WAIT_LOOP: + for { select { case <-bpr.pool.Quit(): bpr.Stop() @@ -566,6 +570,10 @@ OUTER_LOOP: case <-bpr.redoCh: bpr.reset() continue OUTER_LOOP + case <-bpr.gotBlockCh: + // We got a block! + // Continue the for-loop and wait til Quit. + continue WAIT_LOOP } } }