diff --git a/Makefile b/Makefile index d9b6fdf97..ac1956dde 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,9 @@ build_race: test: build go test `${NOVENDOR}` + +test_race: build + go test -race `${NOVENDOR}` test100: build for i in {1..100}; do make test; done diff --git a/blockchain/pool.go b/blockchain/pool.go index f3bf5d204..86f67296d 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -35,15 +35,13 @@ type BlockPool struct { QuitService startTime time.Time + mtx sync.Mutex // block requests - mtx sync.Mutex requesters map[int]*bpRequester height int // the lowest key in requesters. numPending int32 // number of requests pending assignment or block response - // peers - peersMtx sync.Mutex - peers map[string]*bpPeer + peers map[string]*bpPeer requestsCh chan<- BlockRequest timeoutsCh chan<- string @@ -81,13 +79,13 @@ func (pool *BlockPool) makeRequestersRoutine() { if !pool.IsRunning() { break } - _, numPending := pool.GetStatus() + _, numPending, lenRequesters := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers pool.removeTimedoutPeers() - } else if len(pool.requesters) >= maxTotalRequesters { + } else if lenRequesters >= maxTotalRequesters { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) // check for timed out peers @@ -100,6 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() { } func (pool *BlockPool) removeTimedoutPeers() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, peer := range pool.peers { if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate @@ -111,25 +112,24 @@ func (pool *BlockPool) removeTimedoutPeers() { } } if peer.didTimeout { - pool.peersMtx.Lock() // Lock pool.removePeer(peer.id) - pool.peersMtx.Unlock() } } } -func (pool *BlockPool) GetStatus() (height int, numPending int32) { - pool.mtx.Lock() // Lock +func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { + pool.mtx.Lock() defer pool.mtx.Unlock() - return pool.height, pool.numPending + return pool.height, pool.numPending, len(pool.requesters) } // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { pool.mtx.Lock() + defer pool.mtx.Unlock() + height := pool.height - pool.mtx.Unlock() // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { @@ -137,12 +137,10 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - pool.peersMtx.Lock() maxPeerHeight := 0 for _, peer := range pool.peers { maxPeerHeight = MaxInt(maxPeerHeight, peer.height) } - pool.peersMtx.Unlock() isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight) log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight) @@ -153,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool { // So we peek two blocks at a time. // The caller will verify the commit. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -168,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) // Pop the first block at pool.height // It must have been validated by 'second'.Commit from PeekTwoBlocks(). func (pool *BlockPool) PopRequest() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -188,21 +186,21 @@ func (pool *BlockPool) PopRequest() { // Invalidates the block at pool.height, // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { - pool.mtx.Lock() // Lock - defer pool.mtx.Unlock() - + pool.mtx.Lock() request := pool.requesters[height] + pool.mtx.Unlock() + if request.block == nil { PanicSanity("Expected block to be non-nil") } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) // Lock on peersMtx. + pool.RemovePeer(request.peerID) } // TODO: ensure that blocks come in order for each peer. func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() requester := pool.requesters[block.Height] @@ -212,7 +210,7 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- - peer := pool.getPeer(peerID) + peer := pool.peers[peerID] peer.decrPending(blockSize) } else { // Bad peer? @@ -221,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int // Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID string, height int) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() peer := pool.peers[peerID] if peer != nil { @@ -234,8 +232,8 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) { } func (pool *BlockPool) RemovePeer(peerID string) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() pool.removePeer(peerID) } @@ -250,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) { delete(pool.peers, peerID) } -func (pool *BlockPool) getPeer(peerID string) *bpPeer { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() - - peer := pool.peers[peerID] - return peer -} - // Pick an available peer with at least the given minHeight. // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() for _, peer := range pool.peers { - if peer.isBad() { + if peer.didTimeout { pool.removePeer(peer.id) continue } else { @@ -283,7 +273,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { } func (pool *BlockPool) makeNextRequester() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() nextHeight := pool.height + len(pool.requesters) @@ -330,11 +320,13 @@ func (pool *BlockPool) debug() string { type bpPeer struct { pool *BlockPool id string - height int - numPending int32 recvMonitor *flow.Monitor - timeout *time.Timer - didTimeout bool + + mtx sync.Mutex + height int + numPending int32 + timeout *time.Timer + didTimeout bool } func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { @@ -380,15 +372,14 @@ func (peer *bpPeer) decrPending(recvSize int) { } func (peer *bpPeer) onTimeout() { + peer.pool.mtx.Lock() + defer peer.pool.mtx.Unlock() + peer.pool.sendTimeout(peer.id) log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout") peer.didTimeout = true } -func (peer *bpPeer) isBad() bool { - return peer.didTimeout -} - //------------------------------------- type bpRequester struct { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index a668e5f10..53aa311cc 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -196,7 +196,7 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - height, numPending := bcR.pool.GetStatus() + height, numPending, _ := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters), "outbound", outbound, "inbound", inbound)