Browse Source

blockchain pool race condition fix

pull/139/merge
Jae Kwon 9 years ago
parent
commit
47eee5ddd5
1 changed files with 36 additions and 28 deletions
  1. +36
    -28
      blockchain/pool.go

+ 36
- 28
blockchain/pool.go View File

@ -11,10 +11,10 @@ import (
) )
const ( const (
requestIntervalMS = 500
requestIntervalMS = 250
maxTotalRequests = 300 maxTotalRequests = 300
maxPendingRequests = maxTotalRequests maxPendingRequests = maxTotalRequests
maxPendingRequestsPerPeer = 50
maxPendingRequestsPerPeer = 75
peerTimeoutSeconds = 15 peerTimeoutSeconds = 15
minRecvRate = 10240 // 10Kb/s minRecvRate = 10240 // 10Kb/s
) )
@ -148,10 +148,10 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
if r := pool.requests[pool.height]; r != nil { if r := pool.requests[pool.height]; r != nil {
first = r.block
first = r.getBlock()
} }
if r := pool.requests[pool.height+1]; r != nil { if r := pool.requests[pool.height+1]; r != nil {
second = r.block
second = r.getBlock()
} }
return return
} }
@ -162,9 +162,11 @@ func (pool *BlockPool) PopRequest() {
pool.mtx.Lock() // Lock pool.mtx.Lock() // Lock
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
/* The block can disappear at any time, due to removePeer().
if r := pool.requests[pool.height]; r == nil || r.block == nil { if r := pool.requests[pool.height]; r == nil || r.block == nil {
PanicSanity("PopRequest() requires a valid block") PanicSanity("PopRequest() requires a valid block")
} }
*/
delete(pool.requests, pool.height) delete(pool.requests, pool.height)
pool.height++ pool.height++
@ -332,46 +334,46 @@ func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
return peer return peer
} }
func (bpp *bpPeer) resetMonitor() {
bpp.recvMonitor = flow.New(time.Second, time.Second*40)
func (peer *bpPeer) resetMonitor() {
peer.recvMonitor = flow.New(time.Second, time.Second*40)
var initialValue = float64(minRecvRate) * math.E var initialValue = float64(minRecvRate) * math.E
bpp.recvMonitor.SetREMA(initialValue)
peer.recvMonitor.SetREMA(initialValue)
} }
func (bpp *bpPeer) resetTimeout() {
if bpp.timeout == nil {
bpp.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, bpp.onTimeout)
func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil {
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
} else { } else {
bpp.timeout.Reset(time.Second * peerTimeoutSeconds)
peer.timeout.Reset(time.Second * peerTimeoutSeconds)
} }
} }
func (bpp *bpPeer) incrPending() {
if bpp.numPending == 0 {
bpp.resetMonitor()
bpp.resetTimeout()
func (peer *bpPeer) incrPending() {
if peer.numPending == 0 {
peer.resetMonitor()
peer.resetTimeout()
} }
bpp.numPending++
peer.numPending++
} }
func (bpp *bpPeer) decrPending(recvSize int) {
bpp.numPending--
if bpp.numPending == 0 {
bpp.timeout.Stop()
func (peer *bpPeer) decrPending(recvSize int) {
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else { } else {
bpp.recvMonitor.Update(recvSize)
bpp.resetTimeout()
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
} }
} }
func (bpp *bpPeer) onTimeout() {
bpp.pool.sendTimeout(bpp.id)
log.Warn("SendTimeout", "peer", bpp.id, "reason", "onTimeout")
bpp.didTimeout = true
func (peer *bpPeer) onTimeout() {
peer.pool.sendTimeout(peer.id)
log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
peer.didTimeout = true
} }
func (bpp *bpPeer) isBad() bool {
return bpp.didTimeout
func (peer *bpPeer) isBad() bool {
return peer.didTimeout
} }
//------------------------------------- //-------------------------------------
@ -422,6 +424,12 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
return true return true
} }
func (bpr *bpRequester) getBlock() *types.Block {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
}
func (bpr *bpRequester) getPeerID() string { func (bpr *bpRequester) getPeerID() string {
bpr.mtx.Lock() bpr.mtx.Lock()
defer bpr.mtx.Unlock() defer bpr.mtx.Unlock()


Loading…
Cancel
Save