From ca674304c5316785fc9cdc24c73f66163f7f7966 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 24 Jun 2016 20:21:44 -0400 Subject: [PATCH 1/2] fix blockpool races. closes #188 --- Makefile | 3 +++ blockchain/pool.go | 60 +++++++++++++++++++++++++++---------------- blockchain/reactor.go | 2 +- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index d9b6fdf97..ac1956dde 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,9 @@ build_race: test: build go test `${NOVENDOR}` + +test_race: build + go test -race `${NOVENDOR}` test100: build for i in {1..100}; do make test; done diff --git a/blockchain/pool.go b/blockchain/pool.go index f3bf5d204..3c06d669b 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -81,13 +81,13 @@ func (pool *BlockPool) makeRequestersRoutine() { if !pool.IsRunning() { break } - _, numPending := pool.GetStatus() + _, numPending, lenRequesters := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers pool.removeTimedoutPeers() - } else if len(pool.requesters) >= maxTotalRequesters { + } else if lenRequesters >= maxTotalRequesters { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers @@ -100,6 +100,8 @@ func (pool *BlockPool) makeRequestersRoutine() { } func (pool *BlockPool) removeTimedoutPeers() { + pool.peersMtx.Lock() // Lock + defer pool.peersMtx.Unlock() for _, peer := range pool.peers { if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate @@ -111,25 +113,24 @@ func (pool *BlockPool) removeTimedoutPeers() { } } if peer.didTimeout { - pool.peersMtx.Lock() // Lock pool.removePeer(peer.id) - pool.peersMtx.Unlock() } } } -func (pool *BlockPool) GetStatus() (height int, numPending int32) { +func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() - return pool.height, pool.numPending + return pool.height, pool.numPending, len(pool.requesters) } // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { - pool.mtx.Lock() - height := pool.height - pool.mtx.Unlock() + height, _, _ := pool.GetStatus() + + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { @@ -137,12 +138,10 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - pool.peersMtx.Lock() maxPeerHeight := 0 for _, peer := range pool.peers { maxPeerHeight = MaxInt(maxPeerHeight, peer.height) } - pool.peersMtx.Unlock() isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight) log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight) @@ -189,15 +188,15 @@ func (pool *BlockPool) PopRequest() { // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { pool.mtx.Lock() // Lock - defer pool.mtx.Unlock() - request := pool.requesters[height] + pool.mtx.Unlock() + if request.block == nil { PanicSanity("Expected block to be non-nil") } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) // Lock on peersMtx. + pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx } // TODO: ensure that blocks come in order for each peer. @@ -213,7 +212,10 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- peer := pool.getPeer(peerID) - peer.decrPending(blockSize) + + pool.peersMtx.Lock() + peer.decrPending(blockSize, pool.onTimeout(peer)) + pool.peersMtx.Unlock() } else { // Bad peer? } @@ -241,6 +243,11 @@ func (pool *BlockPool) RemovePeer(peerID string) { } func (pool *BlockPool) removePeer(peerID string) { + // need to lock pool to access requesters and numPending. + // peersMtx should be locked by caller + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, requester := range pool.requesters { if requester.getPeerID() == peerID { pool.numPending++ @@ -276,7 +283,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { if peer.height < minHeight { continue } - peer.incrPending() + peer.incrPending(pool.onTimeout(peer)) return peer } return nil @@ -309,6 +316,14 @@ func (pool *BlockPool) sendTimeout(peerID string) { pool.timeoutsCh <- peerID } +func (pool *BlockPool) onTimeout(peer *bpPeer) func() { + return func() { + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() + peer.onTimeout() + } +} + func (pool *BlockPool) debug() string { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() @@ -353,29 +368,30 @@ func (peer *bpPeer) resetMonitor() { peer.recvMonitor.SetREMA(initialValue) } -func (peer *bpPeer) resetTimeout() { +// needs the closure so we can lock the peersMtx +func (peer *bpPeer) resetTimeout(callback func()) { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) + peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback) } else { peer.timeout.Reset(time.Second * peerTimeoutSeconds) } } -func (peer *bpPeer) incrPending() { +func (peer *bpPeer) incrPending(onTimeout func()) { if peer.numPending == 0 { peer.resetMonitor() - peer.resetTimeout() + peer.resetTimeout(onTimeout) } peer.numPending++ } -func (peer *bpPeer) decrPending(recvSize int) { +func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) { peer.numPending-- if peer.numPending == 0 { peer.timeout.Stop() } else { peer.recvMonitor.Update(recvSize) - peer.resetTimeout() + peer.resetTimeout(onTimeout) } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index a668e5f10..53aa311cc 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -196,7 +196,7 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - height, numPending := bcR.pool.GetStatus() + height, numPending, _ := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters), "outbound", outbound, "inbound", inbound) From b25cfb0e0bae591e00c1c4a3d5a869fd16f5dee1 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 28 Jun 2016 18:02:27 -0700 Subject: [PATCH 2/2] Unify blockpool mtxs --- blockchain/pool.go | 105 +++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 65 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 3c06d669b..86f67296d 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -35,15 +35,13 @@ type BlockPool struct { QuitService startTime time.Time + mtx sync.Mutex // block requests - mtx sync.Mutex requesters map[int]*bpRequester height int // the lowest key in requesters. numPending int32 // number of requests pending assignment or block response - // peers - peersMtx sync.Mutex - peers map[string]*bpPeer + peers map[string]*bpPeer requestsCh chan<- BlockRequest timeoutsCh chan<- string @@ -100,8 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() { } func (pool *BlockPool) removeTimedoutPeers() { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, peer := range pool.peers { if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate @@ -119,7 +118,7 @@ func (pool *BlockPool) removeTimedoutPeers() { } func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() return pool.height, pool.numPending, len(pool.requesters) @@ -127,10 +126,10 @@ func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { - height, _, _ := pool.GetStatus() + pool.mtx.Lock() + defer pool.mtx.Unlock() - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() + height := pool.height // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { @@ -152,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool { // So we peek two blocks at a time. // The caller will verify the commit. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -167,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) // Pop the first block at pool.height // It must have been validated by 'second'.Commit from PeekTwoBlocks(). func (pool *BlockPool) PopRequest() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -187,7 +186,7 @@ func (pool *BlockPool) PopRequest() { // Invalidates the block at pool.height, // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() request := pool.requesters[height] pool.mtx.Unlock() @@ -196,12 +195,12 @@ func (pool *BlockPool) RedoRequest(height int) { } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx + pool.RemovePeer(request.peerID) } // TODO: ensure that blocks come in order for each peer. func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() requester := pool.requesters[block.Height] @@ -211,11 +210,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- - peer := pool.getPeer(peerID) - - pool.peersMtx.Lock() - peer.decrPending(blockSize, pool.onTimeout(peer)) - pool.peersMtx.Unlock() + peer := pool.peers[peerID] + peer.decrPending(blockSize) } else { // Bad peer? } @@ -223,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int // Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID string, height int) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() peer := pool.peers[peerID] if peer != nil { @@ -236,18 +232,13 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) { } func (pool *BlockPool) RemovePeer(peerID string) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() pool.removePeer(peerID) } func (pool *BlockPool) removePeer(peerID string) { - // need to lock pool to access requesters and numPending. - // peersMtx should be locked by caller - pool.mtx.Lock() - defer pool.mtx.Unlock() - for _, requester := range pool.requesters { if requester.getPeerID() == peerID { pool.numPending++ @@ -257,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) { delete(pool.peers, peerID) } -func (pool *BlockPool) getPeer(peerID string) *bpPeer { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() - - peer := pool.peers[peerID] - return peer -} - // Pick an available peer with at least the given minHeight. // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() for _, peer := range pool.peers { - if peer.isBad() { + if peer.didTimeout { pool.removePeer(peer.id) continue } else { @@ -283,14 +266,14 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { if peer.height < minHeight { continue } - peer.incrPending(pool.onTimeout(peer)) + peer.incrPending() return peer } return nil } func (pool *BlockPool) makeNextRequester() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() nextHeight := pool.height + len(pool.requesters) @@ -316,14 +299,6 @@ func (pool *BlockPool) sendTimeout(peerID string) { pool.timeoutsCh <- peerID } -func (pool *BlockPool) onTimeout(peer *bpPeer) func() { - return func() { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() - peer.onTimeout() - } -} - func (pool *BlockPool) debug() string { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() @@ -345,11 +320,13 @@ func (pool *BlockPool) debug() string { type bpPeer struct { pool *BlockPool id string - height int - numPending int32 recvMonitor *flow.Monitor - timeout *time.Timer - didTimeout bool + + mtx sync.Mutex + height int + numPending int32 + timeout *time.Timer + didTimeout bool } func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { @@ -368,43 +345,41 @@ func (peer *bpPeer) resetMonitor() { peer.recvMonitor.SetREMA(initialValue) } -// needs the closure so we can lock the peersMtx -func (peer *bpPeer) resetTimeout(callback func()) { +func (peer *bpPeer) resetTimeout() { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback) + peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) } else { peer.timeout.Reset(time.Second * peerTimeoutSeconds) } } -func (peer *bpPeer) incrPending(onTimeout func()) { +func (peer *bpPeer) incrPending() { if peer.numPending == 0 { peer.resetMonitor() - peer.resetTimeout(onTimeout) + peer.resetTimeout() } peer.numPending++ } -func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) { +func (peer *bpPeer) decrPending(recvSize int) { peer.numPending-- if peer.numPending == 0 { peer.timeout.Stop() } else { peer.recvMonitor.Update(recvSize) - peer.resetTimeout(onTimeout) + peer.resetTimeout() } } func (peer *bpPeer) onTimeout() { + peer.pool.mtx.Lock() + defer peer.pool.mtx.Unlock() + peer.pool.sendTimeout(peer.id) log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout") peer.didTimeout = true } -func (peer *bpPeer) isBad() bool { - return peer.didTimeout -} - //------------------------------------- type bpRequester struct {