Browse Source

Fix fast sync stack with wrong block #2457 (#2731)

* fix fastsync may stuck by a wrong block

* fixes from updates

* fixes from review

* Align spec with the changes

* fmt
pull/2919/head
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
47a0669d12
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 358 additions and 190 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +24
    -13
      blockchain/pool.go
  3. +46
    -11
      blockchain/pool_test.go
  4. +13
    -2
      blockchain/reactor.go
  5. +215
    -123
      blockchain/reactor_test.go
  6. +20
    -3
      blockchain/store_test.go
  7. +4
    -4
      consensus/mempool_test.go
  8. +1
    -1
      consensus/state_test.go
  9. +28
    -25
      docs/spec/reactors/block_sync/reactor.md
  10. +1
    -1
      evidence/reactor.go
  11. +1
    -1
      libs/autofile/autofile_test.go
  12. +3
    -5
      node/node_test.go
  13. +1
    -1
      types/events.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -32,6 +32,7 @@ program](https://hackerone.com/tendermint).
### BUG FIXES: ### BUG FIXES:
- [blockchain] \#2731 Retry both blocks if either is bad to avoid getting stuck during fast sync (@goolAdapter)
- [log] \#2868 fix module=main setting overriding all others - [log] \#2868 fix module=main setting overriding all others
- [rpc] \#2808 RPC validators calls IncrementAccum if necessary - [rpc] \#2808 RPC validators calls IncrementAccum if necessary
- [rpc] \#2811 Allow integer IDs in JSON-RPC requests - [rpc] \#2811 Allow integer IDs in JSON-RPC requests

+ 24
- 13
blockchain/pool.go View File

@ -168,9 +168,12 @@ func (pool *BlockPool) IsCaughtUp() bool {
return false return false
} }
// some conditions to determine if we're caught up
receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second)
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height. Note we use maxPeerHeight - 1
// because to sync block H requires block H+1 to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp return isCaughtUp
} }
@ -252,7 +255,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
peer.decrPending(blockSize) peer.decrPending(blockSize)
} }
} else { } else {
// Bad peer?
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
} }
} }
@ -292,7 +296,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
func (pool *BlockPool) removePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters { for _, requester := range pool.requesters {
if requester.getPeerID() == peerID { if requester.getPeerID() == peerID {
requester.redo()
requester.redo(peerID)
} }
} }
delete(pool.peers, peerID) delete(pool.peers, peerID)
@ -326,8 +330,11 @@ func (pool *BlockPool) makeNextRequester() {
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
nextHeight := pool.height + pool.requestersLen() nextHeight := pool.height + pool.requestersLen()
if nextHeight > pool.maxPeerHeight {
return
}
request := newBPRequester(pool, nextHeight) request := newBPRequester(pool, nextHeight)
// request.SetLogger(pool.Logger.With("height", nextHeight))
pool.requesters[nextHeight] = request pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1) atomic.AddInt32(&pool.numPending, 1)
@ -453,7 +460,7 @@ type bpRequester struct {
pool *BlockPool pool *BlockPool
height int64 height int64
gotBlockCh chan struct{} gotBlockCh chan struct{}
redoCh chan struct{}
redoCh chan p2p.ID //redo may send multitime, add peerId to identify repeat
mtx sync.Mutex mtx sync.Mutex
peerID p2p.ID peerID p2p.ID
@ -465,7 +472,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
pool: pool, pool: pool,
height: height, height: height,
gotBlockCh: make(chan struct{}, 1), gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan struct{}, 1),
redoCh: make(chan p2p.ID, 1),
peerID: "", peerID: "",
block: nil, block: nil,
@ -524,9 +531,9 @@ func (bpr *bpRequester) reset() {
// Tells bpRequester to pick another peer and try again. // Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo // NOTE: Nonblocking, and does nothing if another redo
// was already requested. // was already requested.
func (bpr *bpRequester) redo() {
func (bpr *bpRequester) redo(peerId p2p.ID) {
select { select {
case bpr.redoCh <- struct{}{}:
case bpr.redoCh <- peerId:
default: default:
} }
} }
@ -565,9 +572,13 @@ OUTER_LOOP:
return return
case <-bpr.Quit(): case <-bpr.Quit():
return return
case <-bpr.redoCh:
bpr.reset()
continue OUTER_LOOP
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-bpr.gotBlockCh: case <-bpr.gotBlockCh:
// We got a block! // We got a block!
// Continue the for-loop and wait til Quit. // Continue the for-loop and wait til Quit.


+ 46
- 11
blockchain/pool_test.go View File

@ -16,16 +16,52 @@ func init() {
} }
type testPeer struct { type testPeer struct {
id p2p.ID
height int64
id p2p.ID
height int64
inputChan chan inputData //make sure each peer's data is sequential
} }
func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer {
peers := make(map[p2p.ID]testPeer, numPeers)
type inputData struct {
t *testing.T
pool *BlockPool
request BlockRequest
}
func (p testPeer) runInputRoutine() {
go func() {
for input := range p.inputChan {
p.simulateInput(input)
}
}()
}
// Request desired, pretend like we got the block immediately.
func (p testPeer) simulateInput(input inputData) {
block := &types.Block{Header: types.Header{Height: input.request.Height}}
input.pool.AddBlock(input.request.PeerID, block, 123)
input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height)
}
type testPeers map[p2p.ID]testPeer
func (ps testPeers) start() {
for _, v := range ps {
v.runInputRoutine()
}
}
func (ps testPeers) stop() {
for _, v := range ps {
close(v.inputChan)
}
}
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ { for i := 0; i < numPeers; i++ {
peerID := p2p.ID(cmn.RandStr(12)) peerID := p2p.ID(cmn.RandStr(12))
height := minHeight + cmn.RandInt63n(maxHeight-minHeight) height := minHeight + cmn.RandInt63n(maxHeight-minHeight)
peers[peerID] = testPeer{peerID, height}
peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)}
} }
return peers return peers
} }
@ -45,6 +81,9 @@ func TestBasic(t *testing.T) {
defer pool.Stop() defer pool.Stop()
peers.start()
defer peers.stop()
// Introduce each peer. // Introduce each peer.
go func() { go func() {
for _, peer := range peers { for _, peer := range peers {
@ -77,12 +116,8 @@ func TestBasic(t *testing.T) {
if request.Height == 300 { if request.Height == 300 {
return // Done! return // Done!
} }
// Request desired, pretend like we got the block immediately.
go func() {
block := &types.Block{Header: types.Header{Height: request.Height}}
pool.AddBlock(request.PeerID, block, 123)
t.Logf("Added block from peer %v (height: %v)", request.PeerID, request.Height)
}()
peers[request.PeerID].inputChan <- inputData{t, pool, request}
} }
} }
} }


+ 13
- 2
blockchain/reactor.go View File

@ -264,8 +264,12 @@ FOR_LOOP:
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
bcR.pool.Stop() bcR.pool.Stop()
conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
conR.SwitchToConsensus(state, blocksSynced)
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced)
} else {
// should only happen during testing
}
break FOR_LOOP break FOR_LOOP
} }
@ -314,6 +318,13 @@ FOR_LOOP:
// still need to clean up the rest. // still need to clean up the rest.
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
} }
peerID2 := bcR.pool.RedoRequest(second.Height)
peer2 := bcR.Switch.Peers().Get(peerID2)
if peer2 != nil && peer2 != peer {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err))
}
continue FOR_LOOP continue FOR_LOOP
} else { } else {
bcR.pool.PopRequest() bcR.pool.PopRequest()


+ 215
- 123
blockchain/reactor_test.go View File

@ -1,72 +1,151 @@
package blockchain package blockchain
import ( import (
"net"
"sort"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
) )
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
var config *cfg.Config
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote {
addr := privVal.GetAddress()
idx, _ := valset.GetByAddress(addr)
vote := &types.Vote{
ValidatorAddress: addr,
ValidatorIndex: idx,
Height: header.Height,
Round: 1,
Timestamp: tmtime.Now(),
Type: types.PrecommitType,
BlockID: blockID,
}
privVal.SignVote(header.ChainID, vote)
return vote
}
type BlockchainReactorPair struct {
reactor *BlockchainReactor
app proxy.AppConns
}
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(cmn.ErrorWrap(err, "error start app"))
}
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
blockStore := NewBlockStore(blockDB) blockStore := NewBlockStore(blockDB)
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil { if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
} }
return state, blockStore
}
func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor {
state, blockStore := makeStateAndBlockStore(logger)
// Make the blockchainReactor itself
// Make the BlockchainReactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true fastSync := true
var nilApp proxy.AppConnConsensus
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp,
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
sm.MockMempool{}, sm.MockEvidencePool{}) sm.MockMempool{}, sm.MockEvidencePool{})
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := &types.Commit{}
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
// Next: we need to set a switch in order for peers to be added in
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil)
vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0])
lastCommit = &types.Commit{Precommits: []*types.Vote{vote}, BlockID: lastBlockMeta.BlockID}
}
// Lastly: let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
firstBlock := makeBlock(blockHeight, state)
secondBlock := makeBlock(blockHeight+1, state)
firstParts := firstBlock.MakePartSet(types.BlockPartSizeBytes)
blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit)
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{thisBlock.Hash(), thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(cmn.ErrorWrap(err, "error apply block"))
}
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
} }
return bcReactor
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
return BlockchainReactorPair{bcReactor, proxyApp}
} }
func TestNoBlockResponse(t *testing.T) { func TestNoBlockResponse(t *testing.T) {
maxBlockHeight := int64(20)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(65)
reactorPairs := make([]BlockchainReactorPair, 2)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
bcr.Start()
defer bcr.Stop()
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
// Add some peers in
peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12)))
bcr.AddPeer(peer)
}, p2p.Connect2Switches)
chID := byte(0x01)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
}
}()
tests := []struct { tests := []struct {
height int64 height int64
@ -78,72 +157,100 @@ func TestNoBlockResponse(t *testing.T) {
{100, false}, {100, false},
} }
// receive a request message from peer,
// wait for our response to be received on the peer
for _, tt := range tests {
reqBlockMsg := &bcBlockRequestMessage{tt.height}
reqBlockBytes := cdc.MustMarshalBinaryBare(reqBlockMsg)
bcr.Receive(chID, peer, reqBlockBytes)
msg := peer.lastBlockchainMessage()
for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
if tt.existent { if tt.existent {
if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok {
t.Fatalf("Expected to receive a block response for height %d", tt.height)
} else if blockMsg.Block.Height != tt.height {
t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height)
}
assert.True(t, block != nil)
} else { } else {
if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok {
t.Fatalf("Expected to receive a no block response for height %d", tt.height)
} else if noBlockMsg.Height != tt.height {
t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height)
}
assert.True(t, block == nil)
} }
} }
} }
/*
// NOTE: This is too hard to test without // NOTE: This is too hard to test without
// an easy way to add test peer to switch // an easy way to add test peer to switch
// or without significant refactoring of the module. // or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but // Alternatively we could actually dial a TCP conn but
// that seems extreme. // that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) { func TestBadBlockStopsPeer(t *testing.T) {
maxBlockHeight := int64(20)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
otherChain.reactor.Stop()
otherChain.app.Stop()
}()
reactorPairs := make([]BlockchainReactorPair, 4)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
}
}()
for {
if reactorPairs[3].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] is the newest
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
bcr.Start()
defer bcr.Stop()
//mark reactorPairs[3] is an invalid peer
reactorPairs[3].reactor.store = otherChain.reactor.store
// Add some peers in
peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12)))
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
// XXX: This doesn't add the peer to anything,
// so it's hard to check that it's later removed
bcr.AddPeer(peer)
assert.True(t, bcr.Switch.Peers().Size() > 0)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
// send a bad block from the peer
// default blocks already dont have commits, so should fail
block := bcr.store.LoadBlock(3)
msg := &bcBlockResponseMessage{Block: block}
peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
}, p2p.Connect2Switches)...)
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
ticker := time.NewTicker(time.Millisecond * 10)
timer := time.NewTimer(time.Second * 2)
LOOP:
for { for {
select {
case <-ticker.C:
if bcr.Switch.Peers().Size() == 0 {
break LOOP
}
case <-timer.C:
t.Fatal("Timed out waiting to disconnect peer")
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
} }
time.Sleep(1 * time.Second)
} }
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
} }
*/
//---------------------------------------------- //----------------------------------------------
// utility funcs // utility funcs
@ -155,56 +262,41 @@ func makeTxs(height int64) (txs []types.Tx) {
return txs return txs
} }
func makeBlock(height int64, state sm.State) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit), nil, state.Validators.GetProposer().Address)
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block return block
} }
// The Test peer
type bcrTestPeer struct {
cmn.BaseService
id p2p.ID
ch chan interface{}
type testApp struct {
abci.BaseApplication
} }
var _ p2p.Peer = (*bcrTestPeer)(nil)
var _ abci.Application = (*testApp)(nil)
func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
bcr := &bcrTestPeer{
id: id,
ch: make(chan interface{}, 2),
}
bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr)
return bcr
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
} }
func (tp *bcrTestPeer) lastBlockchainMessage() interface{} { return <-tp.ch }
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return abci.ResponseBeginBlock{}
}
func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool {
var msg BlockchainMessage
err := cdc.UnmarshalBinaryBare(msgBytes, &msg)
if err != nil {
panic(cmn.ErrorWrap(err, "Error while trying to parse a BlockchainMessage"))
}
if _, ok := msg.(*bcStatusResponseMessage); ok {
// Discard status response messages since they skew our results
// We only want to deal with:
// + bcBlockResponseMessage
// + bcNoBlockResponseMessage
} else {
tp.ch <- msg
}
return true
}
func (tp *bcrTestPeer) FlushStop() {}
func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) }
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} }
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
func (tp *bcrTestPeer) ID() p2p.ID { return tp.id }
func (tp *bcrTestPeer) IsOutbound() bool { return false }
func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
func (tp *bcrTestPeer) Set(string, interface{}) {}
func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} }
func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil }
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{}
}
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
}
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}
func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
}

+ 20
- 3
blockchain/store_test.go View File

@ -9,13 +9,30 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/db"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time" tmtime "github.com/tendermint/tendermint/types/time"
) )
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
}
return state, NewBlockStore(blockDB)
}
func TestLoadBlockStoreStateJSON(t *testing.T) { func TestLoadBlockStoreStateJSON(t *testing.T) {
db := db.NewMemDB() db := db.NewMemDB()
@ -65,7 +82,7 @@ func freshBlockStore() (*BlockStore, db.DB) {
var ( var (
state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
block = makeBlock(1, state)
block = makeBlock(1, state, new(types.Commit))
partSet = block.MakePartSet(2) partSet = block.MakePartSet(2)
part1 = partSet.GetPart(0) part1 = partSet.GetPart(0)
part2 = partSet.GetPart(1) part2 = partSet.GetPart(1)
@ -88,7 +105,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
} }
// save a block // save a block
block := makeBlock(bs.Height()+1, state)
block := makeBlock(bs.Height()+1, state, new(types.Commit))
validPartSet := block.MakePartSet(2) validPartSet := block.MakePartSet(2)
seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10,
Timestamp: tmtime.Now()}}} Timestamp: tmtime.Now()}}}
@ -331,7 +348,7 @@ func TestLoadBlockMeta(t *testing.T) {
func TestBlockFetchAtHeight(t *testing.T) { func TestBlockFetchAtHeight(t *testing.T) {
state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
block := makeBlock(bs.Height()+1, state)
block := makeBlock(bs.Height()+1, state, new(types.Commit))
partSet := block.MakePartSet(2) partSet := block.MakePartSet(2)
seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10,


+ 4
- 4
consensus/mempool_test.go View File

@ -72,18 +72,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
startTestRound(cs, height, round) startTestRound(cs, height, round)
ensureNewRound(newRoundCh, height, round) // first round at first height ensureNewRound(newRoundCh, height, round) // first round at first height
ensureNewEventOnChannel(newBlockCh) // first block gets committed
ensureNewEventOnChannel(newBlockCh) // first block gets committed
height = height + 1 // moving to the next height height = height + 1 // moving to the next height
round = 0 round = 0
ensureNewRound(newRoundCh, height, round) // first round at next height ensureNewRound(newRoundCh, height, round) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())
round = round + 1 // moving to the next round
round = round + 1 // moving to the next round
ensureNewRound(newRoundCh, height, round) // wait for the next round ensureNewRound(newRoundCh, height, round) // wait for the next round
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
} }
func deliverTxsRange(cs *ConsensusState, start, end int) { func deliverTxsRange(cs *ConsensusState, start, end int) {


+ 1
- 1
consensus/state_test.go View File

@ -1043,7 +1043,7 @@ func TestNoHearbeatWhenNotValidator(t *testing.T) {
cs.Stop() cs.Stop()
// if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method // if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method
time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second)
time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second)
} }
// regression for #2518 // regression for #2518


+ 28
- 25
docs/spec/reactors/block_sync/reactor.md View File

@ -65,24 +65,24 @@ type Requester {
mtx Mutex mtx Mutex
block Block block Block
height int64 height int64

peerID p2p.ID
redoChannel chan struct{}
peerID p2p.ID
redoChannel chan p2p.ID //redo may send multi-time; peerId is used to identify repeat
} }
``` ```
Pool is core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc.
Pool is a core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc.
```go ```go
type Pool { type Pool {
mtx Mutex
requesters map[int64]*Requester
height int64
peers map[p2p.ID]*Peer
maxPeerHeight int64
numPending int32
store BlockStore
requestsChannel chan<- BlockRequest
errorsChannel chan<- peerError
mtx Mutex
requesters map[int64]*Requester
height int64
peers map[p2p.ID]*Peer
maxPeerHeight int64
numPending int32
store BlockStore
requestsChannel chan<- BlockRequest
errorsChannel chan<- peerError
} }
``` ```
@ -90,11 +90,11 @@ Peer data structure stores for each peer current `height` and number of pending
```go ```go
type Peer struct { type Peer struct {
id p2p.ID
height int64
numPending int32
timeout *time.Timer
didTimeout bool
id p2p.ID
height int64
numPending int32
timeout *time.Timer
didTimeout bool
} }
``` ```
@ -169,11 +169,11 @@ Requester task is responsible for fetching a single block at position `height`.
```go ```go
fetchBlock(height, pool): fetchBlock(height, pool):
while true do
while true do {
peerID = nil peerID = nil
block = nil block = nil
peer = pickAvailablePeer(height) peer = pickAvailablePeer(height)
peerId = peer.id
peerID = peer.id
enqueue BlockRequest(height, peerID) to pool.requestsChannel enqueue BlockRequest(height, peerID) to pool.requestsChannel
redo = false redo = false
@ -181,12 +181,15 @@ fetchBlock(height, pool):
select { select {
upon receiving Quit message do upon receiving Quit message do
return return
upon receiving message on redoChannel do
mtx.Lock()
pool.numPending++
redo = true
mtx.UnLock()
upon receiving redo message with id on redoChannel do
if peerID == id {
mtx.Lock()
pool.numPending++
redo = true
mtx.UnLock()
}
} }
}
pickAvailablePeer(height): pickAvailablePeer(height):
selectedPeer = nil selectedPeer = nil
@ -244,7 +247,7 @@ createRequesters(pool):
main(pool): main(pool):
create trySyncTicker with interval trySyncIntervalMS create trySyncTicker with interval trySyncIntervalMS
create statusUpdateTicker with interval statusUpdateIntervalSeconds create statusUpdateTicker with interval statusUpdateIntervalSeconds
create switchToConsensusTicker with interbal switchToConsensusIntervalSeconds
create switchToConsensusTicker with interval switchToConsensusIntervalSeconds
while true do while true do
select { select {


+ 1
- 1
evidence/reactor.go View File

@ -163,7 +163,7 @@ func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evid
// make sure the peer is up to date // make sure the peer is up to date
evHeight := ev.Height() evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState) peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but // Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is // when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors // different every time due to us using a map. Sometimes other reactors


+ 1
- 1
libs/autofile/autofile_test.go View File

@ -119,4 +119,4 @@ func TestAutoFileSize(t *testing.T) {
// Cleanup // Cleanup
_ = os.Remove(f.Name()) _ = os.Remove(f.Name())
}
}

+ 3
- 5
node/node_test.go View File

@ -13,18 +13,16 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/version"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/privval"
tmtime "github.com/tendermint/tendermint/types/time" tmtime "github.com/tendermint/tendermint/types/time"
"github.com/tendermint/tendermint/version"
) )
func TestNodeStartStop(t *testing.T) { func TestNodeStartStop(t *testing.T) {


+ 1
- 1
types/events.go View File

@ -100,7 +100,7 @@ type EventDataCompleteProposal struct {
Round int `json:"round"` Round int `json:"round"`
Step string `json:"step"` Step string `json:"step"`
BlockID BlockID `json:"block_id"`
BlockID BlockID `json:"block_id"`
} }
type EventDataVote struct { type EventDataVote struct {


Loading…
Cancel
Save