From ca674304c5316785fc9cdc24c73f66163f7f7966 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 24 Jun 2016 20:21:44 -0400 Subject: [PATCH] fix blockpool races. closes #188 --- Makefile | 3 +++ blockchain/pool.go | 60 +++++++++++++++++++++++++++---------------- blockchain/reactor.go | 2 +- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index d9b6fdf97..ac1956dde 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,9 @@ build_race: test: build go test `${NOVENDOR}` + +test_race: build + go test -race `${NOVENDOR}` test100: build for i in {1..100}; do make test; done diff --git a/blockchain/pool.go b/blockchain/pool.go index f3bf5d204..3c06d669b 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -81,13 +81,13 @@ func (pool *BlockPool) makeRequestersRoutine() { if !pool.IsRunning() { break } - _, numPending := pool.GetStatus() + _, numPending, lenRequesters := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers pool.removeTimedoutPeers() - } else if len(pool.requesters) >= maxTotalRequesters { + } else if lenRequesters >= maxTotalRequesters { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers @@ -100,6 +100,8 @@ func (pool *BlockPool) makeRequestersRoutine() { } func (pool *BlockPool) removeTimedoutPeers() { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() for _, peer := range pool.peers { if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate @@ -111,25 +113,24 @@ func (pool *BlockPool) removeTimedoutPeers() { } } if peer.didTimeout { - pool.peersMtx.Lock() // Lock pool.removePeer(peer.id) - pool.peersMtx.Unlock() } } } -func (pool *BlockPool) GetStatus() (height int, numPending int32) { +func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() - return pool.height, pool.numPending + return pool.height, pool.numPending, len(pool.requesters) } // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { - pool.mtx.Lock() - height := pool.height - pool.mtx.Unlock() + height, _, _ := pool.GetStatus() + + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { @@ -137,12 +138,10 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - pool.peersMtx.Lock() maxPeerHeight := 0 for _, peer := range pool.peers { maxPeerHeight = MaxInt(maxPeerHeight, peer.height) } - pool.peersMtx.Unlock() isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight) log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight) @@ -189,15 +188,15 @@ func (pool *BlockPool) PopRequest() { // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { pool.mtx.Lock() // Lock - defer pool.mtx.Unlock() - request := pool.requesters[height] + pool.mtx.Unlock() + if request.block == nil { PanicSanity("Expected block to be non-nil") } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) // Lock on peersMtx. + pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx } // TODO: ensure that blocks come in order for each peer. @@ -213,7 +212,10 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- peer := pool.getPeer(peerID) - peer.decrPending(blockSize) + + pool.peersMtx.Lock() + peer.decrPending(blockSize, pool.onTimeout(peer)) + pool.peersMtx.Unlock() } else { // Bad peer? } @@ -241,6 +243,11 @@ func (pool *BlockPool) RemovePeer(peerID string) { } func (pool *BlockPool) removePeer(peerID string) { + // need to lock pool to access requesters and numPending. + // peersMtx should be locked by caller + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, requester := range pool.requesters { if requester.getPeerID() == peerID { pool.numPending++ @@ -276,7 +283,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { if peer.height < minHeight { continue } - peer.incrPending() + peer.incrPending(pool.onTimeout(peer)) return peer } return nil @@ -309,6 +316,14 @@ func (pool *BlockPool) sendTimeout(peerID string) { pool.timeoutsCh <- peerID } +func (pool *BlockPool) onTimeout(peer *bpPeer) func() { + return func() { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() + peer.onTimeout() + } +} + func (pool *BlockPool) debug() string { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() @@ -353,29 +368,30 @@ func (peer *bpPeer) resetMonitor() { peer.recvMonitor.SetREMA(initialValue) } -func (peer *bpPeer) resetTimeout() { +// needs the closure so we can lock the peersMtx +func (peer *bpPeer) resetTimeout(callback func()) { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) + peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback) } else { peer.timeout.Reset(time.Second * peerTimeoutSeconds) } } -func (peer *bpPeer) incrPending() { +func (peer *bpPeer) incrPending(onTimeout func()) { if peer.numPending == 0 { peer.resetMonitor() - peer.resetTimeout() + peer.resetTimeout(onTimeout) } peer.numPending++ } -func (peer *bpPeer) decrPending(recvSize int) { +func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) { peer.numPending-- if peer.numPending == 0 { peer.timeout.Stop() } else { peer.recvMonitor.Update(recvSize) - peer.resetTimeout() + peer.resetTimeout(onTimeout) } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index a668e5f10..53aa311cc 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -196,7 +196,7 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - height, numPending := bcR.pool.GetStatus() + height, numPending, _ := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters), "outbound", outbound, "inbound", inbound)