From 926127c774a2c9110c4284938411818918ffecac Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 20 Mar 2019 03:59:33 +0300 Subject: [PATCH] blockchain: update the maxHeight when a peer is removed (#3350) * blockchain: update the maxHeight when a peer is removed Refs #2699 * add a changelog entry * make linter pass --- CHANGELOG_PENDING.md | 2 ++ blockchain/pool.go | 52 ++++++++++++++++++++++++++++++++--------- blockchain/pool_test.go | 46 +++++++++++++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3cbc63b7b..de16fcc26 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,3 +20,5 @@ ### IMPROVEMENTS: ### BUG FIXES: + +- [blockchain] \#2699 update the maxHeight when a peer is removed diff --git a/blockchain/pool.go b/blockchain/pool.go index 2cb7dda96..c842c0d13 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -69,7 +69,7 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 + maxPeerHeight int64 // the biggest reported height // atomic numPending int32 // number of requests pending assignment or block response @@ -78,6 +78,8 @@ type BlockPool struct { errorsCh chan<- peerError } +// NewBlockPool returns a new BlockPool with the height equal to start. Block +// requests and errors will be sent to requestsCh and errorsCh accordingly. func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), @@ -93,15 +95,15 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p return bp } +// OnStart implements cmn.Service by spawning requesters routine and recording +// pool's start time. func (pool *BlockPool) OnStart() error { go pool.makeRequestersRoutine() pool.startTime = time.Now() return nil } -func (pool *BlockPool) OnStop() {} - -// Run spawns requesters as needed. +// spawns requesters as needed func (pool *BlockPool) makeRequestersRoutine() { for { if !pool.IsRunning() { @@ -150,6 +152,8 @@ func (pool *BlockPool) removeTimedoutPeers() { } } +// GetStatus returns pool's height, numPending requests and the number of +// requesters. func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -157,6 +161,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) } +// IsCaughtUp returns true if this node is caught up, false - otherwise. // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { pool.mtx.Lock() @@ -170,8 +175,9 @@ func (pool *BlockPool) IsCaughtUp() bool { // Some conditions to determine if we're caught up. // Ensures we've either received a block or waited some amount of time, - // and that we're synced to the highest known height. Note we use maxPeerHeight - 1 - // because to sync block H requires block H+1 to verify the LastCommit. + // and that we're synced to the highest known height. + // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 + // to verify the LastCommit. receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers @@ -260,14 +266,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int } } -// MaxPeerHeight returns the highest height reported by a peer. +// MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { pool.mtx.Lock() defer pool.mtx.Unlock() return pool.maxPeerHeight } -// Sets the peer's alleged blockchain height. +// SetPeerHeight sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -286,6 +292,8 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { } } +// RemovePeer removes the peer with peerID from the pool. If there's no peer +// with peerID, function is a no-op. func (pool *BlockPool) RemovePeer(peerID p2p.ID) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -299,10 +307,32 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { requester.redo(peerID) } } - if p, exist := pool.peers[peerID]; exist && p.timeout != nil { - p.timeout.Stop() + + peer, ok := pool.peers[peerID] + if ok { + if peer.timeout != nil { + peer.timeout.Stop() + } + + delete(pool.peers, peerID) + + // Find a new peer with the biggest height and update maxPeerHeight if the + // peer's height was the biggest. + if peer.height == pool.maxPeerHeight { + pool.updateMaxPeerHeight() + } + } +} + +// If no peers are left, maxPeerHeight is set to 0. +func (pool *BlockPool) updateMaxPeerHeight() { + var max int64 + for _, peer := range pool.peers { + if peer.height > max { + max = peer.height + } } - delete(pool.peers, peerID) + pool.maxPeerHeight = max } // Pick an available peer with at least the given minHeight. diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 75a03f631..e24f6131e 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -1,12 +1,15 @@ package blockchain import ( + "fmt" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -66,7 +69,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { return peers } -func TestBasic(t *testing.T) { +func TestBlockPoolBasic(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) @@ -122,7 +125,7 @@ func TestBasic(t *testing.T) { } } -func TestTimeout(t *testing.T) { +func TestBlockPoolTimeout(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) @@ -180,3 +183,40 @@ func TestTimeout(t *testing.T) { } } } + +func TestBlockPoolRemovePeer(t *testing.T) { + peers := make(testPeers, 10) + for i := 0; i < 10; i++ { + peerID := p2p.ID(fmt.Sprintf("%d", i+1)) + height := int64(i + 1) + peers[peerID] = testPeer{peerID, height, make(chan inputData)} + } + requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError) + + pool := NewBlockPool(1, requestsCh, errorsCh) + pool.SetLogger(log.TestingLogger()) + err := pool.Start() + require.NoError(t, err) + defer pool.Stop() + + // add peers + for peerID, peer := range peers { + pool.SetPeerHeight(peerID, peer.height) + } + assert.EqualValues(t, 10, pool.MaxPeerHeight()) + + // remove not-existing peer + assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) }) + + // remove peer with biggest height + pool.RemovePeer(p2p.ID("10")) + assert.EqualValues(t, 9, pool.MaxPeerHeight()) + + // remove all peers + for peerID := range peers { + pool.RemovePeer(peerID) + } + + assert.EqualValues(t, 0, pool.MaxPeerHeight()) +}