Browse Source

blockchain: update the maxHeight when a peer is removed (#3350)

* blockchain: update the maxHeight when a peer is removed

Refs #2699

* add a changelog entry

* make linter pass
pull/3453/head
Anton Kaliaev 6 years ago
committed by Ethan Buchman
parent
commit
926127c774
3 changed files with 86 additions and 14 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +41
    -11
      blockchain/pool.go
  3. +43
    -3
      blockchain/pool_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -20,3 +20,5 @@
### IMPROVEMENTS: ### IMPROVEMENTS:
### BUG FIXES: ### BUG FIXES:
- [blockchain] \#2699 update the maxHeight when a peer is removed

+ 41
- 11
blockchain/pool.go View File

@ -69,7 +69,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters. height int64 // the lowest key in requesters.
// peers // peers
peers map[p2p.ID]*bpPeer peers map[p2p.ID]*bpPeer
maxPeerHeight int64
maxPeerHeight int64 // the biggest reported height
// atomic // atomic
numPending int32 // number of requests pending assignment or block response numPending int32 // number of requests pending assignment or block response
@ -78,6 +78,8 @@ type BlockPool struct {
errorsCh chan<- peerError errorsCh chan<- peerError
} }
// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{ bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer), peers: make(map[p2p.ID]*bpPeer),
@ -93,15 +95,15 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
return bp return bp
} }
// OnStart implements cmn.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error { func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine() go pool.makeRequestersRoutine()
pool.startTime = time.Now() pool.startTime = time.Now()
return nil return nil
} }
func (pool *BlockPool) OnStop() {}
// Run spawns requesters as needed.
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() { func (pool *BlockPool) makeRequestersRoutine() {
for { for {
if !pool.IsRunning() { if !pool.IsRunning() {
@ -150,6 +152,8 @@ func (pool *BlockPool) removeTimedoutPeers() {
} }
} }
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) { func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
@ -157,6 +161,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
} }
// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse. // TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool { func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock() pool.mtx.Lock()
@ -170,8 +175,9 @@ func (pool *BlockPool) IsCaughtUp() bool {
// Some conditions to determine if we're caught up. // Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time, // Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height. Note we use maxPeerHeight - 1
// because to sync block H requires block H+1 to verify the LastCommit.
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
@ -260,14 +266,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
} }
} }
// MaxPeerHeight returns the highest height reported by a peer.
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 { func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
return pool.maxPeerHeight return pool.maxPeerHeight
} }
// Sets the peer's alleged blockchain height.
// SetPeerHeight sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
@ -286,6 +292,8 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
} }
} }
// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
@ -299,10 +307,32 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
requester.redo(peerID) requester.redo(peerID)
} }
} }
if p, exist := pool.peers[peerID]; exist && p.timeout != nil {
p.timeout.Stop()
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
peer.timeout.Stop()
}
delete(pool.peers, peerID)
// Find a new peer with the biggest height and update maxPeerHeight if the
// peer's height was the biggest.
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}
// If no peers are left, maxPeerHeight is set to 0.
func (pool *BlockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
} }
delete(pool.peers, peerID)
pool.maxPeerHeight = max
} }
// Pick an available peer with at least the given minHeight. // Pick an available peer with at least the given minHeight.


+ 43
- 3
blockchain/pool_test.go View File

@ -1,12 +1,15 @@
package blockchain package blockchain
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -66,7 +69,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
return peers return peers
} }
func TestBasic(t *testing.T) {
func TestBlockPoolBasic(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000) errorsCh := make(chan peerError, 1000)
@ -122,7 +125,7 @@ func TestBasic(t *testing.T) {
} }
} }
func TestTimeout(t *testing.T) {
func TestBlockPoolTimeout(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000) errorsCh := make(chan peerError, 1000)
@ -180,3 +183,40 @@ func TestTimeout(t *testing.T) {
} }
} }
} }
func TestBlockPoolRemovePeer(t *testing.T) {
peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
require.NoError(t, err)
defer pool.Stop()
// add peers
for peerID, peer := range peers {
pool.SetPeerHeight(peerID, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())
// remove not-existing peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })
// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())
// remove all peers
for peerID := range peers {
pool.RemovePeer(peerID)
}
assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

Loading…
Cancel
Save