Browse Source

Potential fix for blockchain pool halting issue

pull/1641/head v0.19.6-rc2
Jae Kwon 6 years ago
committed by Ethan Buchman
parent
commit
fd6021876b
1 changed files with 35 additions and 27 deletions
  1. +35
    -27
      blockchain/pool.go

+ 35
- 27
blockchain/pool.go View File

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
cmn "github.com/tendermint/tmlibs/common"
@ -66,11 +67,13 @@ type BlockPool struct {
// block requests
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response
// peers
peers map[p2p.ID]*bpPeer
maxPeerHeight int64
// atomic
numPending int32 // number of requests pending assignment or block response
requestsCh chan<- BlockRequest
errorsCh chan<- peerError
}
@ -151,7 +154,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height, pool.numPending, len(pool.requesters)
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// TODO: relax conditions, prevent abuse.
@ -245,7 +248,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
}
if requester.setBlock(block, peerID) {
pool.numPending--
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
@ -291,10 +294,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
if requester.getBlock() != nil {
pool.numPending++
}
go requester.redo() // pick another peer and ...
requester.redo()
}
}
delete(pool.peers, peerID)
@ -332,7 +332,7 @@ func (pool *BlockPool) makeNextRequester() {
// request.SetLogger(pool.Logger.With("height", nextHeight))
pool.requesters[nextHeight] = request
pool.numPending++
atomic.AddInt32(&pool.numPending, 1)
err := request.Start()
if err != nil {
@ -360,7 +360,7 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
// unused by tendermint; left for debugging purposes
func (pool *BlockPool) debug() string {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock()
str := ""
@ -466,8 +466,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{
pool: pool,
height: height,
gotBlockCh: make(chan struct{}),
redoCh: make(chan struct{}),
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan struct{}, 1),
peerID: "",
block: nil,
@ -481,7 +481,7 @@ func (bpr *bpRequester) OnStart() error {
return nil
}
// Returns true if the peer matches
// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
@ -491,7 +491,10 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.block = block
bpr.mtx.Unlock()
bpr.gotBlockCh <- struct{}{}
select {
case bpr.gotBlockCh <- struct{}{}:
default:
}
return true
}
@ -507,17 +510,27 @@ func (bpr *bpRequester) getPeerID() p2p.ID {
return bpr.peerID
}
// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}
bpr.peerID = ""
bpr.block = nil
bpr.mtx.Unlock()
}
// Tells bpRequester to pick another peer and try again.
// NOTE: blocking
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo() {
bpr.redoCh <- struct{}{}
select {
case bpr.redoCh <- struct{}{}:
default:
}
}
// Responsible for making more requests as necessary
@ -546,17 +559,8 @@ OUTER_LOOP:
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
select {
case <-bpr.pool.Quit():
bpr.Stop()
return
case <-bpr.Quit():
return
case <-bpr.redoCh:
bpr.reset()
continue OUTER_LOOP // When peer is removed
case <-bpr.gotBlockCh:
// We got the block, now see if it's good.
WAIT_LOOP:
for {
select {
case <-bpr.pool.Quit():
bpr.Stop()
@ -566,6 +570,10 @@ OUTER_LOOP:
case <-bpr.redoCh:
bpr.reset()
continue OUTER_LOOP
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
}
}
}


Loading…
Cancel
Save