Browse Source

Merge pull request #1641 from tendermint/release/v0.19.6

Release/v0.19.6
pull/1650/merge v0.19.6
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
846ca5ce56
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 29 deletions
  1. +8
    -0
      CHANGELOG.md
  2. +35
    -27
      blockchain/pool.go
  3. +2
    -2
      version/version.go

+ 8
- 0
CHANGELOG.md View File

@ -1,5 +1,13 @@
# Changelog # Changelog
## 0.19.6
*May 29th, 2018*
BUG FIXES
- [blockchain] Fix fast-sync deadlock during high peer turnover
## 0.19.5 ## 0.19.5
*May 20th, 2018* *May 20th, 2018*


+ 35
- 27
blockchain/pool.go View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math" "math"
"sync" "sync"
"sync/atomic"
"time" "time"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -66,11 +67,13 @@ type BlockPool struct {
// block requests // block requests
requesters map[int64]*bpRequester requesters map[int64]*bpRequester
height int64 // the lowest key in requesters. height int64 // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response
// peers // peers
peers map[p2p.ID]*bpPeer peers map[p2p.ID]*bpPeer
maxPeerHeight int64 maxPeerHeight int64
// atomic
numPending int32 // number of requests pending assignment or block response
requestsCh chan<- BlockRequest requestsCh chan<- BlockRequest
errorsCh chan<- peerError errorsCh chan<- peerError
} }
@ -151,7 +154,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
return pool.height, pool.numPending, len(pool.requesters)
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
} }
// TODO: relax conditions, prevent abuse. // TODO: relax conditions, prevent abuse.
@ -245,7 +248,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
} }
if requester.setBlock(block, peerID) { if requester.setBlock(block, peerID) {
pool.numPending--
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID] peer := pool.peers[peerID]
if peer != nil { if peer != nil {
peer.decrPending(blockSize) peer.decrPending(blockSize)
@ -291,10 +294,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
func (pool *BlockPool) removePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters { for _, requester := range pool.requesters {
if requester.getPeerID() == peerID { if requester.getPeerID() == peerID {
if requester.getBlock() != nil {
pool.numPending++
}
go requester.redo() // pick another peer and ...
requester.redo()
} }
} }
delete(pool.peers, peerID) delete(pool.peers, peerID)
@ -332,7 +332,7 @@ func (pool *BlockPool) makeNextRequester() {
// request.SetLogger(pool.Logger.With("height", nextHeight)) // request.SetLogger(pool.Logger.With("height", nextHeight))
pool.requesters[nextHeight] = request pool.requesters[nextHeight] = request
pool.numPending++
atomic.AddInt32(&pool.numPending, 1)
err := request.Start() err := request.Start()
if err != nil { if err != nil {
@ -360,7 +360,7 @@ func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
// unused by tendermint; left for debugging purposes // unused by tendermint; left for debugging purposes
func (pool *BlockPool) debug() string { func (pool *BlockPool) debug() string {
pool.mtx.Lock() // Lock
pool.mtx.Lock()
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
str := "" str := ""
@ -466,8 +466,8 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{ bpr := &bpRequester{
pool: pool, pool: pool,
height: height, height: height,
gotBlockCh: make(chan struct{}),
redoCh: make(chan struct{}),
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan struct{}, 1),
peerID: "", peerID: "",
block: nil, block: nil,
@ -481,7 +481,7 @@ func (bpr *bpRequester) OnStart() error {
return nil return nil
} }
// Returns true if the peer matches
// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.mtx.Lock() bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID { if bpr.block != nil || bpr.peerID != peerID {
@ -491,7 +491,10 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.block = block bpr.block = block
bpr.mtx.Unlock() bpr.mtx.Unlock()
bpr.gotBlockCh <- struct{}{}
select {
case bpr.gotBlockCh <- struct{}{}:
default:
}
return true return true
} }
@ -507,17 +510,27 @@ func (bpr *bpRequester) getPeerID() p2p.ID {
return bpr.peerID return bpr.peerID
} }
// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() { func (bpr *bpRequester) reset() {
bpr.mtx.Lock() bpr.mtx.Lock()
defer bpr.mtx.Unlock()
if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}
bpr.peerID = "" bpr.peerID = ""
bpr.block = nil bpr.block = nil
bpr.mtx.Unlock()
} }
// Tells bpRequester to pick another peer and try again. // Tells bpRequester to pick another peer and try again.
// NOTE: blocking
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo() { func (bpr *bpRequester) redo() {
bpr.redoCh <- struct{}{}
select {
case bpr.redoCh <- struct{}{}:
default:
}
} }
// Responsible for making more requests as necessary // Responsible for making more requests as necessary
@ -546,17 +559,8 @@ OUTER_LOOP:
// Send request and wait. // Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id) bpr.pool.sendRequest(bpr.height, peer.id)
select {
case <-bpr.pool.Quit():
bpr.Stop()
return
case <-bpr.Quit():
return
case <-bpr.redoCh:
bpr.reset()
continue OUTER_LOOP // When peer is removed
case <-bpr.gotBlockCh:
// We got the block, now see if it's good.
WAIT_LOOP:
for {
select { select {
case <-bpr.pool.Quit(): case <-bpr.pool.Quit():
bpr.Stop() bpr.Stop()
@ -566,6 +570,10 @@ OUTER_LOOP:
case <-bpr.redoCh: case <-bpr.redoCh:
bpr.reset() bpr.reset()
continue OUTER_LOOP continue OUTER_LOOP
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
} }
} }
} }


+ 2
- 2
version/version.go View File

@ -4,13 +4,13 @@ package version
const ( const (
Maj = "0" Maj = "0"
Min = "19" Min = "19"
Fix = "5"
Fix = "6"
) )
var ( var (
// Version is the current version of Tendermint // Version is the current version of Tendermint
// Must be a string because scripts like dist.sh read this file. // Must be a string because scripts like dist.sh read this file.
Version = "0.19.5"
Version = "0.19.6"
// GitCommit is the current HEAD set using ldflags. // GitCommit is the current HEAD set using ldflags.
GitCommit string GitCommit string


Loading…
Cancel
Save