diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index ad9a2f617..93dd5c5d4 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -32,6 +32,7 @@ program](https://hackerone.com/tendermint). ### BUG FIXES: +- [blockchain] \#2731 Retry both blocks if either is bad to avoid getting stuck during fast sync (@goolAdapter) - [log] \#2868 fix module=main setting overriding all others - [rpc] \#2808 RPC validators calls IncrementAccum if necessary - [rpc] \#2811 Allow integer IDs in JSON-RPC requests diff --git a/blockchain/pool.go b/blockchain/pool.go index c7864a646..e6be36012 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -168,9 +168,12 @@ func (pool *BlockPool) IsCaughtUp() bool { return false } - // some conditions to determine if we're caught up - receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second) - ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight + // Some conditions to determine if we're caught up. + // Ensures we've either received a block or waited some amount of time, + // and that we're synced to the highest known height. Note we use maxPeerHeight - 1 + // because to sync block H requires block H+1 to verify the LastCommit. + receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second + ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers return isCaughtUp } @@ -252,7 +255,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int peer.decrPending(blockSize) } } else { - // Bad peer? + pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height) + pool.sendError(errors.New("invalid peer"), peerID) } } @@ -292,7 +296,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { if requester.getPeerID() == peerID { - requester.redo() + requester.redo(peerID) } } delete(pool.peers, peerID) @@ -326,8 +330,11 @@ func (pool *BlockPool) makeNextRequester() { defer pool.mtx.Unlock() nextHeight := pool.height + pool.requestersLen() + if nextHeight > pool.maxPeerHeight { + return + } + request := newBPRequester(pool, nextHeight) - // request.SetLogger(pool.Logger.With("height", nextHeight)) pool.requesters[nextHeight] = request atomic.AddInt32(&pool.numPending, 1) @@ -453,7 +460,7 @@ type bpRequester struct { pool *BlockPool height int64 gotBlockCh chan struct{} - redoCh chan struct{} + redoCh chan p2p.ID //redo may send multitime, add peerId to identify repeat mtx sync.Mutex peerID p2p.ID @@ -465,7 +472,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester { pool: pool, height: height, gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), peerID: "", block: nil, @@ -524,9 +531,9 @@ func (bpr *bpRequester) reset() { // Tells bpRequester to pick another peer and try again. // NOTE: Nonblocking, and does nothing if another redo // was already requested. -func (bpr *bpRequester) redo() { +func (bpr *bpRequester) redo(peerId p2p.ID) { select { - case bpr.redoCh <- struct{}{}: + case bpr.redoCh <- peerId: default: } } @@ -565,9 +572,13 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-bpr.redoCh: - bpr.reset() - continue OUTER_LOOP + case peerID := <-bpr.redoCh: + if peerID == bpr.peerID { + bpr.reset() + continue OUTER_LOOP + } else { + continue WAIT_LOOP + } case <-bpr.gotBlockCh: // We got a block! // Continue the for-loop and wait til Quit. diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 01187bcfe..75a03f631 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -16,16 +16,52 @@ func init() { } type testPeer struct { - id p2p.ID - height int64 + id p2p.ID + height int64 + inputChan chan inputData //make sure each peer's data is sequential } -func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer { - peers := make(map[p2p.ID]testPeer, numPeers) +type inputData struct { + t *testing.T + pool *BlockPool + request BlockRequest +} + +func (p testPeer) runInputRoutine() { + go func() { + for input := range p.inputChan { + p.simulateInput(input) + } + }() +} + +// Request desired, pretend like we got the block immediately. +func (p testPeer) simulateInput(input inputData) { + block := &types.Block{Header: types.Header{Height: input.request.Height}} + input.pool.AddBlock(input.request.PeerID, block, 123) + input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height) +} + +type testPeers map[p2p.ID]testPeer + +func (ps testPeers) start() { + for _, v := range ps { + v.runInputRoutine() + } +} + +func (ps testPeers) stop() { + for _, v := range ps { + close(v.inputChan) + } +} + +func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { + peers := make(testPeers, numPeers) for i := 0; i < numPeers; i++ { peerID := p2p.ID(cmn.RandStr(12)) height := minHeight + cmn.RandInt63n(maxHeight-minHeight) - peers[peerID] = testPeer{peerID, height} + peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)} } return peers } @@ -45,6 +81,9 @@ func TestBasic(t *testing.T) { defer pool.Stop() + peers.start() + defer peers.stop() + // Introduce each peer. go func() { for _, peer := range peers { @@ -77,12 +116,8 @@ func TestBasic(t *testing.T) { if request.Height == 300 { return // Done! } - // Request desired, pretend like we got the block immediately. - go func() { - block := &types.Block{Header: types.Header{Height: request.Height}} - pool.AddBlock(request.PeerID, block, 123) - t.Logf("Added block from peer %v (height: %v)", request.PeerID, request.Height) - }() + + peers[request.PeerID].inputChan <- inputData{t, pool, request} } } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 59318dcc5..e62a9e4fe 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -264,8 +264,12 @@ FOR_LOOP: bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() - conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - conR.SwitchToConsensus(state, blocksSynced) + conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(state, blocksSynced) + } else { + // should only happen during testing + } break FOR_LOOP } @@ -314,6 +318,13 @@ FOR_LOOP: // still need to clean up the rest. bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) } + peerID2 := bcR.pool.RedoRequest(second.Height) + peer2 := bcR.Switch.Peers().Get(peerID2) + if peer2 != nil && peer2 != peer { + // NOTE: we've already removed the peer's request, but we + // still need to clean up the rest. + bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err)) + } continue FOR_LOOP } else { bcR.pool.PopRequest() diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 9b26f919a..ac499efa6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -1,72 +1,151 @@ package blockchain import ( - "net" + "sort" "testing" + "time" + "github.com/stretchr/testify/assert" + + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" - - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + tmtime "github.com/tendermint/tendermint/types/time" ) -func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { - config := cfg.ResetTestRoot("blockchain_reactor_test") - // blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB()) - // stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB()) +var config *cfg.Config + +func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, + } + privValidators[i] = privVal + } + sort.Sort(types.PrivValidatorsByAddress(privValidators)) + + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: config.ChainID(), + Validators: validators, + }, privValidators +} + +func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote { + addr := privVal.GetAddress() + idx, _ := valset.GetByAddress(addr) + vote := &types.Vote{ + ValidatorAddress: addr, + ValidatorIndex: idx, + Height: header.Height, + Round: 1, + Timestamp: tmtime.Now(), + Type: types.PrecommitType, + BlockID: blockID, + } + + privVal.SignVote(header.ChainID, vote) + + return vote +} + +type BlockchainReactorPair struct { + reactor *BlockchainReactor + app proxy.AppConns +} + +func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { + if len(privVals) != 1 { + panic("only support one validator") + } + + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + if err != nil { + panic(cmn.ErrorWrap(err, "error start app")) + } + blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() blockStore := NewBlockStore(blockDB) - state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) + + state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) if err != nil { panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) } - return state, blockStore -} - -func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor { - state, blockStore := makeStateAndBlockStore(logger) - // Make the blockchainReactor itself + // Make the BlockchainReactor itself. + // NOTE we have to create and commit the blocks first because + // pool.height is determined from the store. fastSync := true - var nilApp proxy.AppConnConsensus - blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, + blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), sm.MockMempool{}, sm.MockEvidencePool{}) - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) + // let's add some blocks in + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { + lastCommit := &types.Commit{} + if blockHeight > 1 { + lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) + lastBlock := blockStore.LoadBlock(blockHeight - 1) - // Next: we need to set a switch in order for peers to be added in - bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil) + vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]) + lastCommit = &types.Commit{Precommits: []*types.Vote{vote}, BlockID: lastBlockMeta.BlockID} + } - // Lastly: let's add some blocks in - for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { - firstBlock := makeBlock(blockHeight, state) - secondBlock := makeBlock(blockHeight+1, state) - firstParts := firstBlock.MakePartSet(types.BlockPartSizeBytes) - blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit) + thisBlock := makeBlock(blockHeight, state, lastCommit) + + thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) + blockID := types.BlockID{thisBlock.Hash(), thisParts.Header()} + + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) + if err != nil { + panic(cmn.ErrorWrap(err, "error apply block")) + } + + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - return bcReactor + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) + + return BlockchainReactorPair{bcReactor, proxyApp} } func TestNoBlockResponse(t *testing.T) { - maxBlockHeight := int64(20) + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(65) + + reactorPairs := make([]BlockchainReactorPair, 2) + + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) - bcr.Start() - defer bcr.Stop() + p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s - // Add some peers in - peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12))) - bcr.AddPeer(peer) + }, p2p.Connect2Switches) - chID := byte(0x01) + defer func() { + for _, r := range reactorPairs { + r.reactor.Stop() + r.app.Stop() + } + }() tests := []struct { height int64 @@ -78,72 +157,100 @@ func TestNoBlockResponse(t *testing.T) { {100, false}, } - // receive a request message from peer, - // wait for our response to be received on the peer - for _, tt := range tests { - reqBlockMsg := &bcBlockRequestMessage{tt.height} - reqBlockBytes := cdc.MustMarshalBinaryBare(reqBlockMsg) - bcr.Receive(chID, peer, reqBlockBytes) - msg := peer.lastBlockchainMessage() + for { + if reactorPairs[1].reactor.pool.IsCaughtUp() { + break + } + + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) + for _, tt := range tests { + block := reactorPairs[1].reactor.store.LoadBlock(tt.height) if tt.existent { - if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok { - t.Fatalf("Expected to receive a block response for height %d", tt.height) - } else if blockMsg.Block.Height != tt.height { - t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height) - } + assert.True(t, block != nil) } else { - if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok { - t.Fatalf("Expected to receive a no block response for height %d", tt.height) - } else if noBlockMsg.Height != tt.height { - t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height) - } + assert.True(t, block == nil) } } } -/* // NOTE: This is too hard to test without // an easy way to add test peer to switch // or without significant refactoring of the module. // Alternatively we could actually dial a TCP conn but // that seems extreme. func TestBadBlockStopsPeer(t *testing.T) { - maxBlockHeight := int64(20) + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(148) + + otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + defer func() { + otherChain.reactor.Stop() + otherChain.app.Stop() + }() + + reactorPairs := make([]BlockchainReactorPair, 4) + + reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + + switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + defer func() { + for _, r := range reactorPairs { + r.reactor.Stop() + r.app.Stop() + } + }() + + for { + if reactorPairs[3].reactor.pool.IsCaughtUp() { + break + } + + time.Sleep(1 * time.Second) + } + + //at this time, reactors[0-3] is the newest + assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) - bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) - bcr.Start() - defer bcr.Stop() + //mark reactorPairs[3] is an invalid peer + reactorPairs[3].reactor.store = otherChain.reactor.store - // Add some peers in - peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12))) + lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + reactorPairs = append(reactorPairs, lastReactorPair) - // XXX: This doesn't add the peer to anything, - // so it's hard to check that it's later removed - bcr.AddPeer(peer) - assert.True(t, bcr.Switch.Peers().Size() > 0) + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + return s - // send a bad block from the peer - // default blocks already dont have commits, so should fail - block := bcr.store.LoadBlock(3) - msg := &bcBlockResponseMessage{Block: block} - peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}) + }, p2p.Connect2Switches)...) + + for i := 0; i < len(reactorPairs)-1; i++ { + p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + } - ticker := time.NewTicker(time.Millisecond * 10) - timer := time.NewTimer(time.Second * 2) -LOOP: for { - select { - case <-ticker.C: - if bcr.Switch.Peers().Size() == 0 { - break LOOP - } - case <-timer.C: - t.Fatal("Timed out waiting to disconnect peer") + if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + break } + + time.Sleep(1 * time.Second) } + + assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) } -*/ //---------------------------------------------- // utility funcs @@ -155,56 +262,41 @@ func makeTxs(height int64) (txs []types.Tx) { return txs } -func makeBlock(height int64, state sm.State) *types.Block { - block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit), nil, state.Validators.GetProposer().Address) +func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { + block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) return block } -// The Test peer -type bcrTestPeer struct { - cmn.BaseService - id p2p.ID - ch chan interface{} +type testApp struct { + abci.BaseApplication } -var _ p2p.Peer = (*bcrTestPeer)(nil) +var _ abci.Application = (*testApp)(nil) -func newbcrTestPeer(id p2p.ID) *bcrTestPeer { - bcr := &bcrTestPeer{ - id: id, - ch: make(chan interface{}, 2), - } - bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr) - return bcr +func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { + return abci.ResponseInfo{} } -func (tp *bcrTestPeer) lastBlockchainMessage() interface{} { return <-tp.ch } +func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock { + return abci.ResponseBeginBlock{} +} -func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { - var msg BlockchainMessage - err := cdc.UnmarshalBinaryBare(msgBytes, &msg) - if err != nil { - panic(cmn.ErrorWrap(err, "Error while trying to parse a BlockchainMessage")) - } - if _, ok := msg.(*bcStatusResponseMessage); ok { - // Discard status response messages since they skew our results - // We only want to deal with: - // + bcBlockResponseMessage - // + bcNoBlockResponseMessage - } else { - tp.ch <- msg - } - return true -} - -func (tp *bcrTestPeer) FlushStop() {} -func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } -func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } -func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } -func (tp *bcrTestPeer) ID() p2p.ID { return tp.id } -func (tp *bcrTestPeer) IsOutbound() bool { return false } -func (tp *bcrTestPeer) IsPersistent() bool { return true } -func (tp *bcrTestPeer) Get(s string) interface{} { return s } -func (tp *bcrTestPeer) Set(string, interface{}) {} -func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} } -func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil } +func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { + return abci.ResponseEndBlock{} +} + +func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { + return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}} +} + +func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { + return abci.ResponseCheckTx{} +} + +func (app *testApp) Commit() abci.ResponseCommit { + return abci.ResponseCommit{} +} + +func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { + return +} diff --git a/blockchain/store_test.go b/blockchain/store_test.go index 9c8fdb23c..a52039fa4 100644 --- a/blockchain/store_test.go +++ b/blockchain/store_test.go @@ -9,13 +9,30 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cfg "github.com/tendermint/tendermint/config" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/db" + dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" ) +func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + // blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB()) + // stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB()) + blockDB := dbm.NewMemDB() + stateDB := dbm.NewMemDB() + state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) + if err != nil { + panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) + } + return state, NewBlockStore(blockDB) +} + func TestLoadBlockStoreStateJSON(t *testing.T) { db := db.NewMemDB() @@ -65,7 +82,7 @@ func freshBlockStore() (*BlockStore, db.DB) { var ( state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) - block = makeBlock(1, state) + block = makeBlock(1, state, new(types.Commit)) partSet = block.MakePartSet(2) part1 = partSet.GetPart(0) part2 = partSet.GetPart(1) @@ -88,7 +105,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { } // save a block - block := makeBlock(bs.Height()+1, state) + block := makeBlock(bs.Height()+1, state, new(types.Commit)) validPartSet := block.MakePartSet(2) seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, Timestamp: tmtime.Now()}}} @@ -331,7 +348,7 @@ func TestLoadBlockMeta(t *testing.T) { func TestBlockFetchAtHeight(t *testing.T) { state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - block := makeBlock(bs.Height()+1, state) + block := makeBlock(bs.Height()+1, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10, diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 6d36d1e74..49ba74fe5 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -72,18 +72,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) { startTestRound(cs, height, round) ensureNewRound(newRoundCh, height, round) // first round at first height - ensureNewEventOnChannel(newBlockCh) // first block gets committed + ensureNewEventOnChannel(newBlockCh) // first block gets committed height = height + 1 // moving to the next height round = 0 ensureNewRound(newRoundCh, height, round) // first round at next height - deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round + deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) - round = round + 1 // moving to the next round + round = round + 1 // moving to the next round ensureNewRound(newRoundCh, height, round) // wait for the next round - ensureNewEventOnChannel(newBlockCh) // now we can commit the block + ensureNewEventOnChannel(newBlockCh) // now we can commit the block } func deliverTxsRange(cs *ConsensusState, start, end int) { diff --git a/consensus/state_test.go b/consensus/state_test.go index 19dde0532..941a99cda 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1043,7 +1043,7 @@ func TestNoHearbeatWhenNotValidator(t *testing.T) { cs.Stop() // if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method - time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second) + time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second) } // regression for #2518 diff --git a/docs/spec/reactors/block_sync/reactor.md b/docs/spec/reactors/block_sync/reactor.md index 045bbd400..91fd79b0b 100644 --- a/docs/spec/reactors/block_sync/reactor.md +++ b/docs/spec/reactors/block_sync/reactor.md @@ -65,24 +65,24 @@ type Requester { mtx Mutex block Block height int64 - 
peerID p2p.ID - redoChannel chan struct{} + 
 peerID p2p.ID + redoChannel chan p2p.ID //redo may send multi-time; peerId is used to identify repeat } ``` -Pool is core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc. +Pool is a core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc. ```go type Pool { - mtx Mutex - requesters map[int64]*Requester - height int64 - peers map[p2p.ID]*Peer - maxPeerHeight int64 - numPending int32 - store BlockStore - requestsChannel chan<- BlockRequest - errorsChannel chan<- peerError + mtx Mutex + requesters map[int64]*Requester + height int64 + peers map[p2p.ID]*Peer + maxPeerHeight int64 + numPending int32 + store BlockStore + requestsChannel chan<- BlockRequest + errorsChannel chan<- peerError } ``` @@ -90,11 +90,11 @@ Peer data structure stores for each peer current `height` and number of pending ```go type Peer struct { - id p2p.ID - height int64 - numPending int32 - timeout *time.Timer - didTimeout bool + id p2p.ID + height int64 + numPending int32 + timeout *time.Timer + didTimeout bool } ``` @@ -169,11 +169,11 @@ Requester task is responsible for fetching a single block at position `height`. ```go fetchBlock(height, pool): - while true do + while true do { peerID = nil block = nil peer = pickAvailablePeer(height) - peerId = peer.id + peerID = peer.id enqueue BlockRequest(height, peerID) to pool.requestsChannel redo = false @@ -181,12 +181,15 @@ fetchBlock(height, pool): select { upon receiving Quit message do return - upon receiving message on redoChannel do - mtx.Lock() - pool.numPending++ - redo = true - mtx.UnLock() + upon receiving redo message with id on redoChannel do + if peerID == id { + mtx.Lock() + pool.numPending++ + redo = true + mtx.UnLock() + } } + } pickAvailablePeer(height): selectedPeer = nil @@ -244,7 +247,7 @@ createRequesters(pool): main(pool): create trySyncTicker with interval trySyncIntervalMS create statusUpdateTicker with interval statusUpdateIntervalSeconds - create switchToConsensusTicker with interbal switchToConsensusIntervalSeconds + create switchToConsensusTicker with interval switchToConsensusIntervalSeconds while true do select { diff --git a/evidence/reactor.go b/evidence/reactor.go index 48092fdff..6bb45e689 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -163,7 +163,7 @@ func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evid // make sure the peer is up to date evHeight := ev.Height() peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { + if !ok { // Peer does not have a state yet. We set it in the consensus reactor, but // when we add peer in Switch, the order we call reactors#AddPeer is // different every time due to us using a map. Sometimes other reactors diff --git a/libs/autofile/autofile_test.go b/libs/autofile/autofile_test.go index 9903f1e68..d9c90309e 100644 --- a/libs/autofile/autofile_test.go +++ b/libs/autofile/autofile_test.go @@ -119,4 +119,4 @@ func TestAutoFileSize(t *testing.T) { // Cleanup _ = os.Remove(f.Name()) -} \ No newline at end of file +} diff --git a/node/node_test.go b/node/node_test.go index 4d0019eac..e675eb9a8 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -13,18 +13,16 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/privval" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/version" - - cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/types" - - "github.com/tendermint/tendermint/privval" tmtime "github.com/tendermint/tendermint/types/time" + "github.com/tendermint/tendermint/version" ) func TestNodeStartStop(t *testing.T) { diff --git a/types/events.go b/types/events.go index 2f9dc76ee..c33b5978f 100644 --- a/types/events.go +++ b/types/events.go @@ -100,7 +100,7 @@ type EventDataCompleteProposal struct { Round int `json:"round"` Step string `json:"step"` - BlockID BlockID `json:"block_id"` + BlockID BlockID `json:"block_id"` } type EventDataVote struct {