Browse Source

fix blockpool races. closes #188

pull/224/head
Ethan Buchman 9 years ago
parent
commit
ca674304c5
3 changed files with 42 additions and 23 deletions
  1. +3
    -0
      Makefile
  2. +38
    -22
      blockchain/pool.go
  3. +1
    -1
      blockchain/reactor.go

+ 3
- 0
Makefile View File

@ -20,6 +20,9 @@ build_race:
test: build
go test `${NOVENDOR}`
test_race: build
go test -race `${NOVENDOR}`
test100: build
for i in {1..100}; do make test; done


+ 38
- 22
blockchain/pool.go View File

@ -81,13 +81,13 @@ func (pool *BlockPool) makeRequestersRoutine() {
if !pool.IsRunning() {
break
}
_, numPending := pool.GetStatus()
_, numPending, lenRequesters := pool.GetStatus()
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else if len(pool.requesters) >= maxTotalRequesters {
} else if lenRequesters >= maxTotalRequesters {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
@ -100,6 +100,8 @@ func (pool *BlockPool) makeRequestersRoutine() {
}
func (pool *BlockPool) removeTimedoutPeers() {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
@ -111,25 +113,24 @@ func (pool *BlockPool) removeTimedoutPeers() {
}
}
if peer.didTimeout {
pool.peersMtx.Lock() // Lock
pool.removePeer(peer.id)
pool.peersMtx.Unlock()
}
}
}
func (pool *BlockPool) GetStatus() (height int, numPending int32) {
func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
pool.mtx.Lock() // Lock
defer pool.mtx.Unlock()
return pool.height, pool.numPending
return pool.height, pool.numPending, len(pool.requesters)
}
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
height := pool.height
pool.mtx.Unlock()
height, _, _ := pool.GetStatus()
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
@ -137,12 +138,10 @@ func (pool *BlockPool) IsCaughtUp() bool {
return false
}
pool.peersMtx.Lock()
maxPeerHeight := 0
for _, peer := range pool.peers {
maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
}
pool.peersMtx.Unlock()
isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight)
@ -189,15 +188,15 @@ func (pool *BlockPool) PopRequest() {
// Remove the peer and redo request from others.
func (pool *BlockPool) RedoRequest(height int) {
pool.mtx.Lock() // Lock
defer pool.mtx.Unlock()
request := pool.requesters[height]
pool.mtx.Unlock()
if request.block == nil {
PanicSanity("Expected block to be non-nil")
}
// RemovePeer will redo all requesters associated with this peer.
// TODO: record this malfeasance
pool.RemovePeer(request.peerID) // Lock on peersMtx.
pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx
}
// TODO: ensure that blocks come in order for each peer.
@ -213,7 +212,10 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
if requester.setBlock(block, peerID) {
pool.numPending--
peer := pool.getPeer(peerID)
peer.decrPending(blockSize)
pool.peersMtx.Lock()
peer.decrPending(blockSize, pool.onTimeout(peer))
pool.peersMtx.Unlock()
} else {
// Bad peer?
}
@ -241,6 +243,11 @@ func (pool *BlockPool) RemovePeer(peerID string) {
}
func (pool *BlockPool) removePeer(peerID string) {
// need to lock pool to access requesters and numPending.
// peersMtx should be locked by caller
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
pool.numPending++
@ -276,7 +283,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
if peer.height < minHeight {
continue
}
peer.incrPending()
peer.incrPending(pool.onTimeout(peer))
return peer
}
return nil
@ -309,6 +316,14 @@ func (pool *BlockPool) sendTimeout(peerID string) {
pool.timeoutsCh <- peerID
}
func (pool *BlockPool) onTimeout(peer *bpPeer) func() {
return func() {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
peer.onTimeout()
}
}
func (pool *BlockPool) debug() string {
pool.mtx.Lock() // Lock
defer pool.mtx.Unlock()
@ -353,29 +368,30 @@ func (peer *bpPeer) resetMonitor() {
peer.recvMonitor.SetREMA(initialValue)
}
func (peer *bpPeer) resetTimeout() {
// needs the closure so we can lock the peersMtx
func (peer *bpPeer) resetTimeout(callback func()) {
if peer.timeout == nil {
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback)
} else {
peer.timeout.Reset(time.Second * peerTimeoutSeconds)
}
}
func (peer *bpPeer) incrPending() {
func (peer *bpPeer) incrPending(onTimeout func()) {
if peer.numPending == 0 {
peer.resetMonitor()
peer.resetTimeout()
peer.resetTimeout(onTimeout)
}
peer.numPending++
}
func (peer *bpPeer) decrPending(recvSize int) {
func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) {
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else {
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
peer.resetTimeout(onTimeout)
}
}


+ 1
- 1
blockchain/reactor.go View File

@ -196,7 +196,7 @@ FOR_LOOP:
// ask for status updates
go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C:
height, numPending := bcR.pool.GetStatus()
height, numPending, _ := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
"outbound", outbound, "inbound", inbound)


Loading…
Cancel
Save