diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index c4720e494..8dac42b0c 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -10,6 +10,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - CLI/RPC/Config - [config] \#5598 The `test_fuzz` and `test_fuzz_config` P2P settings have been removed. (@erikgrinaker) + - [config] \#5728 `fast_sync = "v1"` is no longer supported (@melekes) - Apps @@ -33,6 +34,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [abci/client] \#5673 `Async` requests return an error if queue is full (@melekes) - [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes) - [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778) +- [blockchain/v1] \#5728 Remove in favor of v2 (@melekes) ### BUG FIXES diff --git a/UPGRADING.md b/UPGRADING.md index 6f48af15a..d6e66276f 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -8,6 +8,10 @@ This guide provides instructions for upgrading to specific versions of Tendermin * Added `AbciVersion` to `RequestInfo`. Applications should check that the ABCI version they expect is being used in order to avoid unimplemented changes errors. +### Config Changes + +* `fast_sync = "v1"` is no longer supported. Please use `v2` instead. + ## v0.34.0 **Upgrading to Tendermint 0.34 requires a blockchain restart.** @@ -117,7 +121,7 @@ Tendermint 0.34 includes new and updated consensus parameters. #### Evidence Parameters -* `MaxBytes`, which caps the total amount of evidence. The default is 1048576 (1 MB). +* `MaxBytes`, which caps the total amount of evidence. The default is 1048576 (1 MB). ### Crypto @@ -194,7 +198,7 @@ blockchains, we recommend that you check the chain ID. ### Version -Version is now set through Go linker flags `ld_flags`. Applications that are using tendermint as a library should set this at compile time. +Version is now set through Go linker flags `ld_flags`. Applications that are using tendermint as a library should set this at compile time. Example: @@ -202,7 +206,7 @@ Example: go install -mod=readonly -ldflags "-X github.com/tendermint/tendermint/version.TMCoreSemVer=$(go list -m github.com/tendermint/tendermint | sed 's/ /\@/g') -s -w " -trimpath ./cmd ``` -Additionally, the exported constant `version.Version` is now `version.TMCoreSemVer`. +Additionally, the exported constant `version.Version` is now `version.TMCoreSemVer`. ## v0.33.4 diff --git a/blockchain/v1/peer.go b/blockchain/v1/peer.go deleted file mode 100644 index a9b5e0379..000000000 --- a/blockchain/v1/peer.go +++ /dev/null @@ -1,226 +0,0 @@ -package v1 - -import ( - "fmt" - "math" - "time" - - flow "github.com/tendermint/tendermint/libs/flowrate" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -//-------- -// Peer - -// BpPeerParams stores the peer parameters that are used when creating a peer. -type BpPeerParams struct { - timeout time.Duration - minRecvRate int64 - sampleRate time.Duration - windowSize time.Duration -} - -// BpPeer is the datastructure associated with a fast sync peer. -type BpPeer struct { - logger log.Logger - ID p2p.ID - - Base int64 // the peer reported base - Height int64 // the peer reported height - NumPendingBlockRequests int // number of requests still waiting for block responses - blocks map[int64]*types.Block // blocks received or expected to be received from this peer - noBlocks map[int64]struct{} // heights for which the peer does not have blocks - blockResponseTimer *time.Timer - recvMonitor *flow.Monitor - params *BpPeerParams // parameters for timer and monitor - - onErr func(err error, peerID p2p.ID) // function to call on error -} - -// NewBpPeer creates a new peer. -func NewBpPeer(peerID p2p.ID, base int64, height int64, - onErr func(err error, peerID p2p.ID), params *BpPeerParams) *BpPeer { - - if params == nil { - params = BpPeerDefaultParams() - } - return &BpPeer{ - ID: peerID, - Base: base, - Height: height, - blocks: make(map[int64]*types.Block, maxRequestsPerPeer), - noBlocks: make(map[int64]struct{}), - logger: log.NewNopLogger(), - onErr: onErr, - params: params, - } -} - -// String returns a string representation of a peer. -func (peer *BpPeer) String() string { - return fmt.Sprintf("peer: %v height: %v pending: %v", peer.ID, peer.Height, peer.NumPendingBlockRequests) -} - -// SetLogger sets the logger of the peer. -func (peer *BpPeer) SetLogger(l log.Logger) { - peer.logger = l -} - -// Cleanup performs cleanup of the peer, removes blocks, requests, stops timer and monitor. -func (peer *BpPeer) Cleanup() { - if peer.blockResponseTimer != nil { - peer.blockResponseTimer.Stop() - } - if peer.NumPendingBlockRequests != 0 { - peer.logger.Info("peer with pending requests is being cleaned", "peer", peer.ID) - } - if len(peer.blocks)-peer.NumPendingBlockRequests != 0 { - peer.logger.Info("peer with pending blocks is being cleaned", "peer", peer.ID) - } - for h := range peer.blocks { - delete(peer.blocks, h) - } - peer.NumPendingBlockRequests = 0 - peer.recvMonitor = nil -} - -// BlockAtHeight returns the block at a given height if available and errMissingBlock otherwise. -func (peer *BpPeer) BlockAtHeight(height int64) (*types.Block, error) { - block, ok := peer.blocks[height] - if !ok { - return nil, errMissingBlock - } - if block == nil { - return nil, errMissingBlock - } - return peer.blocks[height], nil -} - -// AddBlock adds a block at peer level. Block must be non-nil and recvSize a positive integer -// The peer must have a pending request for this block. -func (peer *BpPeer) AddBlock(block *types.Block, recvSize int) error { - if block == nil || recvSize < 0 { - panic("bad parameters") - } - existingBlock, ok := peer.blocks[block.Height] - if !ok { - peer.logger.Error("unsolicited block", "blockHeight", block.Height, "peer", peer.ID) - return errMissingBlock - } - if existingBlock != nil { - peer.logger.Error("already have a block for height", "height", block.Height) - return errDuplicateBlock - } - if peer.NumPendingBlockRequests == 0 { - panic("peer does not have pending requests") - } - peer.blocks[block.Height] = block - peer.NumPendingBlockRequests-- - if peer.NumPendingBlockRequests == 0 { - peer.stopMonitor() - peer.stopBlockResponseTimer() - } else { - peer.recvMonitor.Update(recvSize) - peer.resetBlockResponseTimer() - } - return nil -} - -// RemoveBlock removes the block of given height -func (peer *BpPeer) RemoveBlock(height int64) { - delete(peer.blocks, height) -} - -// SetNoBlock records that the peer does not have a block for height. -func (peer *BpPeer) SetNoBlock(height int64) { - peer.noBlocks[height] = struct{}{} -} - -// NoBlock returns true if the peer does not have a block for height. -func (peer *BpPeer) NoBlock(height int64) bool { - if _, ok := peer.noBlocks[height]; ok { - return true - } - return false -} - -// RequestSent records that a request was sent, and starts the peer timer and monitor if needed. -func (peer *BpPeer) RequestSent(height int64) { - peer.blocks[height] = nil - - if peer.NumPendingBlockRequests == 0 { - peer.startMonitor() - peer.resetBlockResponseTimer() - } - peer.NumPendingBlockRequests++ -} - -// CheckRate verifies that the response rate of the peer is acceptable (higher than the minimum allowed). -func (peer *BpPeer) CheckRate() error { - if peer.NumPendingBlockRequests == 0 { - return nil - } - curRate := peer.recvMonitor.Status().CurRate - // curRate can be 0 on start - if curRate != 0 && curRate < peer.params.minRecvRate { - err := errSlowPeer - peer.logger.Error("SendTimeout", "peer", peer, - "reason", err, - "curRate", fmt.Sprintf("%d KB/s", curRate/1024), - "minRate", fmt.Sprintf("%d KB/s", peer.params.minRecvRate/1024)) - return err - } - return nil -} - -func (peer *BpPeer) onTimeout() { - peer.onErr(errNoPeerResponse, peer.ID) -} - -func (peer *BpPeer) stopMonitor() { - peer.recvMonitor.Done() - peer.recvMonitor = nil -} - -func (peer *BpPeer) startMonitor() { - peer.recvMonitor = flow.New(peer.params.sampleRate, peer.params.windowSize) - initialValue := float64(peer.params.minRecvRate) * math.E - peer.recvMonitor.SetREMA(initialValue) -} - -func (peer *BpPeer) resetBlockResponseTimer() { - if peer.blockResponseTimer == nil { - peer.blockResponseTimer = time.AfterFunc(peer.params.timeout, peer.onTimeout) - } else { - peer.blockResponseTimer.Reset(peer.params.timeout) - } -} - -func (peer *BpPeer) stopBlockResponseTimer() bool { - if peer.blockResponseTimer == nil { - return false - } - return peer.blockResponseTimer.Stop() -} - -// BpPeerDefaultParams returns the default peer parameters. -func BpPeerDefaultParams() *BpPeerParams { - return &BpPeerParams{ - // Timeout for a peer to respond to a block request. - timeout: 15 * time.Second, - - // Minimum recv rate to ensure we're receiving blocks from a peer fast - // enough. If a peer is not sending data at at least that rate, we - // consider them to have timedout and we disconnect. - // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate: int64(7680), - - // Monitor parameters - sampleRate: time.Second, - windowSize: 40 * time.Second, - } -} diff --git a/blockchain/v1/peer_test.go b/blockchain/v1/peer_test.go deleted file mode 100644 index fd9e9f14b..000000000 --- a/blockchain/v1/peer_test.go +++ /dev/null @@ -1,280 +0,0 @@ -package v1 - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/libs/log" - tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -func TestPeerMonitor(t *testing.T) { - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) {}, - nil) - peer.SetLogger(log.TestingLogger()) - peer.startMonitor() - assert.NotNil(t, peer.recvMonitor) - peer.stopMonitor() - assert.Nil(t, peer.recvMonitor) -} - -func TestPeerResetBlockResponseTimer(t *testing.T) { - var ( - numErrFuncCalls int // number of calls to the errFunc - lastErr error // last generated error - peerTestMtx sync.Mutex // modifications of ^^ variables are also done from timer handler goroutine - ) - params := &BpPeerParams{timeout: 20 * time.Millisecond} - - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) { - peerTestMtx.Lock() - defer peerTestMtx.Unlock() - lastErr = err - numErrFuncCalls++ - }, - params) - - peer.SetLogger(log.TestingLogger()) - checkByStoppingPeerTimer(t, peer, false) - - // initial reset call with peer having a nil timer - peer.resetBlockResponseTimer() - assert.NotNil(t, peer.blockResponseTimer) - // make sure timer is running and stop it - checkByStoppingPeerTimer(t, peer, true) - - // reset with running timer - peer.resetBlockResponseTimer() - time.Sleep(5 * time.Millisecond) - peer.resetBlockResponseTimer() - assert.NotNil(t, peer.blockResponseTimer) - - // let the timer expire and ... - time.Sleep(50 * time.Millisecond) - // ... check timer is not running - checkByStoppingPeerTimer(t, peer, false) - - peerTestMtx.Lock() - // ... check errNoPeerResponse has been sent - assert.Equal(t, 1, numErrFuncCalls) - assert.Equal(t, lastErr, errNoPeerResponse) - peerTestMtx.Unlock() -} - -func TestPeerRequestSent(t *testing.T) { - params := &BpPeerParams{timeout: 2 * time.Millisecond} - - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) {}, - params) - - peer.SetLogger(log.TestingLogger()) - - peer.RequestSent(1) - assert.NotNil(t, peer.recvMonitor) - assert.NotNil(t, peer.blockResponseTimer) - assert.Equal(t, 1, peer.NumPendingBlockRequests) - - peer.RequestSent(1) - assert.NotNil(t, peer.recvMonitor) - assert.NotNil(t, peer.blockResponseTimer) - assert.Equal(t, 2, peer.NumPendingBlockRequests) -} - -func TestPeerGetAndRemoveBlock(t *testing.T) { - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 100, - func(err error, _ p2p.ID) {}, - nil) - - // Change peer height - peer.Height = int64(10) - assert.Equal(t, int64(10), peer.Height) - - // request some blocks and receive few of them - for i := 1; i <= 10; i++ { - peer.RequestSent(int64(i)) - if i > 5 { - // only receive blocks 1..5 - continue - } - _ = peer.AddBlock(makeSmallBlock(i), 10) - } - - tests := []struct { - name string - height int64 - wantErr error - blockPresent bool - }{ - {"no request", 100, errMissingBlock, false}, - {"no block", 6, errMissingBlock, false}, - {"block 1 present", 1, nil, true}, - {"block max present", 5, nil, true}, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - // try to get the block - b, err := peer.BlockAtHeight(tt.height) - assert.Equal(t, tt.wantErr, err) - assert.Equal(t, tt.blockPresent, b != nil) - - // remove the block - peer.RemoveBlock(tt.height) - _, err = peer.BlockAtHeight(tt.height) - assert.Equal(t, errMissingBlock, err) - }) - } -} - -func TestPeerAddBlock(t *testing.T) { - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 100, - func(err error, _ p2p.ID) {}, - nil) - - // request some blocks, receive one - for i := 1; i <= 10; i++ { - peer.RequestSent(int64(i)) - if i == 5 { - // receive block 5 - _ = peer.AddBlock(makeSmallBlock(i), 10) - } - } - - tests := []struct { - name string - height int64 - wantErr error - blockPresent bool - }{ - {"no request", 50, errMissingBlock, false}, - {"duplicate block", 5, errDuplicateBlock, true}, - {"block 1 successfully received", 1, nil, true}, - {"block max successfully received", 10, nil, true}, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - // try to get the block - err := peer.AddBlock(makeSmallBlock(int(tt.height)), 10) - assert.Equal(t, tt.wantErr, err) - _, err = peer.BlockAtHeight(tt.height) - assert.Equal(t, tt.blockPresent, err == nil) - }) - } -} - -func TestPeerOnErrFuncCalledDueToExpiration(t *testing.T) { - - params := &BpPeerParams{timeout: 10 * time.Millisecond} - var ( - numErrFuncCalls int // number of calls to the onErr function - lastErr error // last generated error - peerTestMtx sync.Mutex // modifications of ^^ variables are also done from timer handler goroutine - ) - - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) { - peerTestMtx.Lock() - defer peerTestMtx.Unlock() - lastErr = err - numErrFuncCalls++ - }, - params) - - peer.SetLogger(log.TestingLogger()) - - peer.RequestSent(1) - time.Sleep(50 * time.Millisecond) - // timer should have expired by now, check that the on error function was called - peerTestMtx.Lock() - assert.Equal(t, 1, numErrFuncCalls) - assert.Equal(t, errNoPeerResponse, lastErr) - peerTestMtx.Unlock() -} - -func TestPeerCheckRate(t *testing.T) { - params := &BpPeerParams{ - timeout: time.Second, - minRecvRate: int64(100), // 100 bytes/sec exponential moving average - } - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) {}, - params) - peer.SetLogger(log.TestingLogger()) - - require.Nil(t, peer.CheckRate()) - - for i := 0; i < 40; i++ { - peer.RequestSent(int64(i)) - } - - // monitor starts with a higher rEMA (~ 2*minRecvRate), wait for it to go down - time.Sleep(900 * time.Millisecond) - - // normal peer - send a bit more than 100 bytes/sec, > 10 bytes/100msec, check peer is not considered slow - for i := 0; i < 10; i++ { - _ = peer.AddBlock(makeSmallBlock(i), 11) - time.Sleep(100 * time.Millisecond) - require.Nil(t, peer.CheckRate()) - } - - // slow peer - send a bit less than 10 bytes/100msec - for i := 10; i < 20; i++ { - _ = peer.AddBlock(makeSmallBlock(i), 9) - time.Sleep(100 * time.Millisecond) - } - // check peer is considered slow - assert.Equal(t, errSlowPeer, peer.CheckRate()) -} - -func TestPeerCleanup(t *testing.T) { - params := &BpPeerParams{timeout: 2 * time.Millisecond} - - peer := NewBpPeer( - p2p.ID(tmrand.Str(12)), 0, 10, - func(err error, _ p2p.ID) {}, - params) - peer.SetLogger(log.TestingLogger()) - - assert.Nil(t, peer.blockResponseTimer) - peer.RequestSent(1) - assert.NotNil(t, peer.blockResponseTimer) - - peer.Cleanup() - checkByStoppingPeerTimer(t, peer, false) -} - -// Check if peer timer is running or not (a running timer can be successfully stopped). -// Note: stops the timer. -func checkByStoppingPeerTimer(t *testing.T, peer *BpPeer, running bool) { - assert.NotPanics(t, func() { - stopped := peer.stopBlockResponseTimer() - if running { - assert.True(t, stopped) - } else { - assert.False(t, stopped) - } - }) -} - -func makeSmallBlock(height int) *types.Block { - return types.MakeBlock(int64(height), []types.Tx{types.Tx("foo")}, nil, nil) -} diff --git a/blockchain/v1/pool.go b/blockchain/v1/pool.go deleted file mode 100644 index 9c037183d..000000000 --- a/blockchain/v1/pool.go +++ /dev/null @@ -1,382 +0,0 @@ -package v1 - -import ( - "sort" - - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -// BlockPool keeps track of the fast sync peers, block requests and block responses. -type BlockPool struct { - logger log.Logger - // Set of peers that have sent status responses, with height bigger than pool.Height - peers map[p2p.ID]*BpPeer - // Set of block heights and the corresponding peers from where a block response is expected or has been received. - blocks map[int64]p2p.ID - - plannedRequests map[int64]struct{} // list of blocks to be assigned peers for blockRequest - nextRequestHeight int64 // next height to be added to plannedRequests - - Height int64 // height of next block to execute - MaxPeerHeight int64 // maximum height of all peers - toBcR bcReactor -} - -// NewBlockPool creates a new BlockPool. -func NewBlockPool(height int64, toBcR bcReactor) *BlockPool { - return &BlockPool{ - Height: height, - MaxPeerHeight: 0, - peers: make(map[p2p.ID]*BpPeer), - blocks: make(map[int64]p2p.ID), - plannedRequests: make(map[int64]struct{}), - nextRequestHeight: height, - toBcR: toBcR, - } -} - -// SetLogger sets the logger of the pool. -func (pool *BlockPool) SetLogger(l log.Logger) { - pool.logger = l -} - -// ReachedMaxHeight check if the pool has reached the maximum peer height. -func (pool *BlockPool) ReachedMaxHeight() bool { - return pool.Height >= pool.MaxPeerHeight -} - -func (pool *BlockPool) rescheduleRequest(peerID p2p.ID, height int64) { - pool.logger.Info("reschedule requests made to peer for height ", "peerID", peerID, "height", height) - pool.plannedRequests[height] = struct{}{} - delete(pool.blocks, height) - pool.peers[peerID].RemoveBlock(height) -} - -// Updates the pool's max height. If no peers are left MaxPeerHeight is set to 0. -func (pool *BlockPool) updateMaxPeerHeight() { - var newMax int64 - for _, peer := range pool.peers { - peerHeight := peer.Height - if peerHeight > newMax { - newMax = peerHeight - } - } - pool.MaxPeerHeight = newMax -} - -// UpdatePeer adds a new peer or updates an existing peer with a new base and height. -// If a peer is short it is not added. -func (pool *BlockPool) UpdatePeer(peerID p2p.ID, base int64, height int64) error { - - peer := pool.peers[peerID] - - if peer == nil { - if height < pool.Height { - pool.logger.Info("Peer height too small", - "peer", peerID, "height", height, "fsm_height", pool.Height) - return errPeerTooShort - } - // Add new peer. - peer = NewBpPeer(peerID, base, height, pool.toBcR.sendPeerError, nil) - peer.SetLogger(pool.logger.With("peer", peerID)) - pool.peers[peerID] = peer - pool.logger.Info("added peer", "peerID", peerID, "base", base, "height", height, "num_peers", len(pool.peers)) - } else { - // Check if peer is lowering its height. This is not allowed. - if height < peer.Height { - pool.RemovePeer(peerID, errPeerLowersItsHeight) - return errPeerLowersItsHeight - } - // Update existing peer. - peer.Base = base - peer.Height = height - } - - // Update the pool's MaxPeerHeight if needed. - pool.updateMaxPeerHeight() - - return nil -} - -// SetNoBlock records that the peer does not have a block for height and -// schedules a new request for that height from another peer. -func (pool *BlockPool) SetNoBlock(peerID p2p.ID, height int64) { - peer := pool.peers[peerID] - if peer == nil { - return - } - peer.SetNoBlock(height) - - pool.rescheduleRequest(peerID, height) -} - -// Cleans and deletes the peer. Recomputes the max peer height. -func (pool *BlockPool) deletePeer(peer *BpPeer) { - if peer == nil { - return - } - peer.Cleanup() - delete(pool.peers, peer.ID) - - if peer.Height == pool.MaxPeerHeight { - pool.updateMaxPeerHeight() - } -} - -// RemovePeer removes the blocks and requests from the peer, reschedules them and deletes the peer. -func (pool *BlockPool) RemovePeer(peerID p2p.ID, err error) { - peer := pool.peers[peerID] - if peer == nil { - return - } - pool.logger.Info("removing peer", "peerID", peerID, "error", err) - - // Reschedule the block requests made to the peer, or received and not processed yet. - // Note that some of the requests may be removed further down. - for h := range pool.peers[peerID].blocks { - pool.rescheduleRequest(peerID, h) - } - - oldMaxPeerHeight := pool.MaxPeerHeight - // Delete the peer. This operation may result in the pool's MaxPeerHeight being lowered. - pool.deletePeer(peer) - - // Check if the pool's MaxPeerHeight has been lowered. - // This may happen if the tallest peer has been removed. - if oldMaxPeerHeight > pool.MaxPeerHeight { - // Remove any planned requests for heights over the new MaxPeerHeight. - for h := range pool.plannedRequests { - if h > pool.MaxPeerHeight { - delete(pool.plannedRequests, h) - } - } - // Adjust the nextRequestHeight to the new max plus one. - if pool.nextRequestHeight > pool.MaxPeerHeight { - pool.nextRequestHeight = pool.MaxPeerHeight + 1 - } - } -} - -func (pool *BlockPool) removeShortPeers() { - for _, peer := range pool.peers { - if peer.Height < pool.Height { - pool.RemovePeer(peer.ID, nil) - } - } -} - -func (pool *BlockPool) removeBadPeers() { - pool.removeShortPeers() - for _, peer := range pool.peers { - if err := peer.CheckRate(); err != nil { - pool.RemovePeer(peer.ID, err) - pool.toBcR.sendPeerError(err, peer.ID) - } - } -} - -// MakeNextRequests creates more requests if the block pool is running low. -func (pool *BlockPool) MakeNextRequests(maxNumRequests int) { - heights := pool.makeRequestBatch(maxNumRequests) - if len(heights) != 0 { - pool.logger.Info("makeNextRequests will make following requests", - "number", len(heights), "heights", heights) - } - - for _, height := range heights { - h := int64(height) - if !pool.sendRequest(h) { - // If a good peer was not found for sending the request at height h then return, - // as it shouldn't be possible to find a peer for h+1. - return - } - delete(pool.plannedRequests, h) - } -} - -// Makes a batch of requests sorted by height such that the block pool has up to maxNumRequests entries. -func (pool *BlockPool) makeRequestBatch(maxNumRequests int) []int { - pool.removeBadPeers() - // At this point pool.requests may include heights for requests to be redone due to removal of peers: - // - peers timed out or were removed by switch - // - FSM timed out on waiting to advance the block execution due to missing blocks at h or h+1 - // Determine the number of requests needed by subtracting the number of requests already made from the maximum - // allowed - numNeeded := maxNumRequests - len(pool.blocks) - for len(pool.plannedRequests) < numNeeded { - if pool.nextRequestHeight > pool.MaxPeerHeight { - break - } - pool.plannedRequests[pool.nextRequestHeight] = struct{}{} - pool.nextRequestHeight++ - } - - heights := make([]int, 0, len(pool.plannedRequests)) - for k := range pool.plannedRequests { - heights = append(heights, int(k)) - } - sort.Ints(heights) - return heights -} - -func (pool *BlockPool) sendRequest(height int64) bool { - for _, peer := range pool.peers { - if peer.NumPendingBlockRequests >= maxRequestsPerPeer { - continue - } - if peer.Base > height || peer.Height < height || peer.NoBlock(height) { - continue - } - - err := pool.toBcR.sendBlockRequest(peer.ID, height) - if err == errNilPeerForBlockRequest { - // Switch does not have this peer, remove it and continue to look for another peer. - pool.logger.Error("switch does not have peer..removing peer selected for height", "peer", - peer.ID, "height", height) - pool.RemovePeer(peer.ID, err) - continue - } - - if err == errSendQueueFull { - pool.logger.Error("peer queue is full", "peer", peer.ID, "height", height) - continue - } - - pool.logger.Info("assigned request to peer", "peer", peer.ID, "height", height) - - pool.blocks[height] = peer.ID - peer.RequestSent(height) - - return true - } - pool.logger.Error("could not find peer to send request for block at height", "height", height) - return false -} - -// AddBlock validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. -func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) error { - peer, ok := pool.peers[peerID] - if !ok { - pool.logger.Error("block from unknown peer", "height", block.Height, "peer", peerID) - return errBadDataFromPeer - } - if wantPeerID, ok := pool.blocks[block.Height]; ok && wantPeerID != peerID { - pool.logger.Error("block received from wrong peer", "height", block.Height, - "peer", peerID, "expected_peer", wantPeerID) - return errBadDataFromPeer - } - - return peer.AddBlock(block, blockSize) -} - -// BlockData stores the peer responsible to deliver a block and the actual block if delivered. -type BlockData struct { - block *types.Block - peer *BpPeer -} - -// BlockAndPeerAtHeight retrieves the block and delivery peer at specified height. -// Returns errMissingBlock if a block was not found -func (pool *BlockPool) BlockAndPeerAtHeight(height int64) (bData *BlockData, err error) { - peerID := pool.blocks[height] - peer := pool.peers[peerID] - if peer == nil { - return nil, errMissingBlock - } - - block, err := peer.BlockAtHeight(height) - if err != nil { - return nil, err - } - - return &BlockData{peer: peer, block: block}, nil - -} - -// FirstTwoBlocksAndPeers returns the blocks and the delivery peers at pool's height H and H+1. -func (pool *BlockPool) FirstTwoBlocksAndPeers() (first, second *BlockData, err error) { - first, err = pool.BlockAndPeerAtHeight(pool.Height) - second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1) - if err == nil { - err = err2 - } - return -} - -// InvalidateFirstTwoBlocks removes the peers that sent us the first two blocks, blocks are removed by RemovePeer(). -func (pool *BlockPool) InvalidateFirstTwoBlocks(err error) { - first, err1 := pool.BlockAndPeerAtHeight(pool.Height) - second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1) - - if err1 == nil { - pool.RemovePeer(first.peer.ID, err) - } - if err2 == nil { - pool.RemovePeer(second.peer.ID, err) - } -} - -// ProcessedCurrentHeightBlock performs cleanup after a block is processed. It removes block at pool height and -// the peers that are now short. -func (pool *BlockPool) ProcessedCurrentHeightBlock() { - peerID, peerOk := pool.blocks[pool.Height] - if peerOk { - pool.peers[peerID].RemoveBlock(pool.Height) - } - delete(pool.blocks, pool.Height) - pool.logger.Debug("removed block at height", "height", pool.Height) - pool.Height++ - pool.removeShortPeers() -} - -// RemovePeerAtCurrentHeights checks if a block at pool's height H exists and if not, it removes the -// delivery peer and returns. If a block at height H exists then the check and peer removal is done for H+1. -// This function is called when the FSM is not able to make progress for some time. -// This happens if either the block H or H+1 have not been delivered. -func (pool *BlockPool) RemovePeerAtCurrentHeights(err error) { - peerID := pool.blocks[pool.Height] - peer, ok := pool.peers[peerID] - if ok { - if _, err := peer.BlockAtHeight(pool.Height); err != nil { - pool.logger.Info("remove peer that hasn't sent block at pool.Height", - "peer", peerID, "height", pool.Height) - pool.RemovePeer(peerID, err) - return - } - } - peerID = pool.blocks[pool.Height+1] - peer, ok = pool.peers[peerID] - if ok { - if _, err := peer.BlockAtHeight(pool.Height + 1); err != nil { - pool.logger.Info("remove peer that hasn't sent block at pool.Height+1", - "peer", peerID, "height", pool.Height+1) - pool.RemovePeer(peerID, err) - return - } - } -} - -// Cleanup performs pool and peer cleanup -func (pool *BlockPool) Cleanup() { - for id, peer := range pool.peers { - peer.Cleanup() - delete(pool.peers, id) - } - pool.plannedRequests = make(map[int64]struct{}) - pool.blocks = make(map[int64]p2p.ID) - pool.nextRequestHeight = 0 - pool.Height = 0 - pool.MaxPeerHeight = 0 -} - -// NumPeers returns the number of peers in the pool -func (pool *BlockPool) NumPeers() int { - return len(pool.peers) -} - -// NeedsBlocks returns true if more blocks are required. -func (pool *BlockPool) NeedsBlocks() bool { - return len(pool.blocks) < maxNumRequests -} diff --git a/blockchain/v1/pool_test.go b/blockchain/v1/pool_test.go deleted file mode 100644 index 31b9d09f7..000000000 --- a/blockchain/v1/pool_test.go +++ /dev/null @@ -1,691 +0,0 @@ -package v1 - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -type testPeer struct { - id p2p.ID - base int64 - height int64 -} - -type testBcR struct { - logger log.Logger -} - -type testValues struct { - numRequestsSent int -} - -var testResults testValues - -func resetPoolTestResults() { - testResults.numRequestsSent = 0 -} - -func (testR *testBcR) sendPeerError(err error, peerID p2p.ID) { -} - -func (testR *testBcR) sendStatusRequest() { -} - -func (testR *testBcR) sendBlockRequest(peerID p2p.ID, height int64) error { - testResults.numRequestsSent++ - return nil -} - -func (testR *testBcR) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { -} - -func (testR *testBcR) switchToConsensus() { - -} - -func newTestBcR() *testBcR { - testBcR := &testBcR{logger: log.TestingLogger()} - return testBcR -} - -type tPBlocks struct { - id p2p.ID - create bool -} - -// Makes a block pool with specified current height, list of peers, block requests and block responses -func makeBlockPool(bcr *testBcR, height int64, peers []BpPeer, blocks map[int64]tPBlocks) *BlockPool { - bPool := NewBlockPool(height, bcr) - bPool.SetLogger(bcr.logger) - - txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} - - var maxH int64 - for _, p := range peers { - if p.Height > maxH { - maxH = p.Height - } - bPool.peers[p.ID] = NewBpPeer(p.ID, p.Base, p.Height, bcr.sendPeerError, nil) - bPool.peers[p.ID].SetLogger(bcr.logger) - - } - bPool.MaxPeerHeight = maxH - for h, p := range blocks { - bPool.blocks[h] = p.id - bPool.peers[p.id].RequestSent(h) - if p.create { - // simulate that a block at height h has been received - _ = bPool.peers[p.id].AddBlock(types.MakeBlock(h, txs, nil, nil), 100) - } - } - return bPool -} - -func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*BpPeer, set2 map[p2p.ID]*BpPeer) { - assert.Equal(t, len(set1), len(set2)) - for peerID, peer1 := range set1 { - peer2 := set2[peerID] - assert.NotNil(t, peer2) - assert.Equal(t, peer1.NumPendingBlockRequests, peer2.NumPendingBlockRequests) - assert.Equal(t, peer1.Height, peer2.Height) - assert.Equal(t, peer1.Base, peer2.Base) - assert.Equal(t, len(peer1.blocks), len(peer2.blocks)) - for h, block1 := range peer1.blocks { - block2 := peer2.blocks[h] - // block1 and block2 could be nil if a request was made but no block was received - assert.Equal(t, block1, block2) - } - } -} - -func assertBlockPoolEquivalent(t *testing.T, poolWanted, pool *BlockPool) { - assert.Equal(t, poolWanted.blocks, pool.blocks) - assertPeerSetsEquivalent(t, poolWanted.peers, pool.peers) - assert.Equal(t, poolWanted.MaxPeerHeight, pool.MaxPeerHeight) - assert.Equal(t, poolWanted.Height, pool.Height) - -} - -func TestBlockPoolUpdatePeer(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - args testPeer - poolWanted *BlockPool - errWanted error - }{ - { - name: "add a first short peer", - pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - args: testPeer{"P1", 0, 50}, - errWanted: errPeerTooShort, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "add a first good peer", - pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - args: testPeer{"P1", 0, 101}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 101}}, map[int64]tPBlocks{}), - }, - { - name: "add a first good peer with base", - pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - args: testPeer{"P1", 10, 101}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Base: 10, Height: 101}}, map[int64]tPBlocks{}), - }, - { - name: "increase the height of P1 from 120 to 123", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}), - args: testPeer{"P1", 0, 123}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 123}}, map[int64]tPBlocks{}), - }, - { - name: "decrease the height of P1 from 120 to 110", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}), - args: testPeer{"P1", 0, 110}, - errWanted: errPeerLowersItsHeight, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "decrease the height of P1 from 105 to 102 with blocks", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 105}}, - map[int64]tPBlocks{ - 100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}), - args: testPeer{"P1", 0, 102}, - errWanted: errPeerLowersItsHeight, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, - map[int64]tPBlocks{}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - pool := tt.pool - err := pool.UpdatePeer(tt.args.id, tt.args.base, tt.args.height) - assert.Equal(t, tt.errWanted, err) - assert.Equal(t, tt.poolWanted.blocks, tt.pool.blocks) - assertPeerSetsEquivalent(t, tt.poolWanted.peers, tt.pool.peers) - assert.Equal(t, tt.poolWanted.MaxPeerHeight, tt.pool.MaxPeerHeight) - }) - } -} - -func TestBlockPoolRemovePeer(t *testing.T) { - testBcR := newTestBcR() - - type args struct { - peerID p2p.ID - err error - } - - tests := []struct { - name string - pool *BlockPool - args args - poolWanted *BlockPool - }{ - { - name: "attempt to delete non-existing peer", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}), - args: args{"P99", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}), - }, - { - name: "delete the only peer without blocks", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}), - args: args{"P1", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "delete the shortest of two peers without blocks", - pool: makeBlockPool( - testBcR, - 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, - map[int64]tPBlocks{}), - args: args{"P1", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 120}}, map[int64]tPBlocks{}), - }, - { - name: "delete the tallest of two peers without blocks", - pool: makeBlockPool( - testBcR, - 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, - map[int64]tPBlocks{}), - args: args{"P2", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}), - }, - { - name: "delete the only peer with block requests sent and blocks received", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, - map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), - args: args{"P1", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "delete the shortest of two peers with block requests sent and blocks received", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 200}}, - map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), - args: args{"P1", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 200}}, map[int64]tPBlocks{}), - }, - { - name: "delete the tallest of two peers with block requests sent and blocks received", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 110}}, - map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), - args: args{"P1", nil}, - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 110}}, map[int64]tPBlocks{}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - tt.pool.RemovePeer(tt.args.peerID, tt.args.err) - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} - -func TestBlockPoolRemoveShortPeers(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - poolWanted *BlockPool - }{ - { - name: "no short peers", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}), - poolWanted: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}), - }, - - { - name: "one short peer", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 90}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}), - poolWanted: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}), - }, - - { - name: "all short peers", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 90}, {ID: "P2", Height: 91}, {ID: "P3", Height: 92}}, map[int64]tPBlocks{}), - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - pool := tt.pool - pool.removeShortPeers() - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} - -func TestBlockPoolSendRequestBatch(t *testing.T) { - type testPeerResult struct { - id p2p.ID - numPendingBlockRequests int - } - - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - maxRequestsPerPeer int - expRequests map[int64]bool - expRequestsSent int - expPeerResults []testPeerResult - }{ - { - name: "one peer - send up to maxRequestsPerPeer block requests", - pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}), - maxRequestsPerPeer: 2, - expRequests: map[int64]bool{10: true, 11: true}, - expRequestsSent: 2, - expPeerResults: []testPeerResult{{id: "P1", numPendingBlockRequests: 2}}, - }, - { - name: "multiple peers - stops at gap between height and base", - pool: makeBlockPool(testBcR, 10, []BpPeer{ - {ID: "P1", Base: 1, Height: 12}, - {ID: "P2", Base: 15, Height: 100}, - }, map[int64]tPBlocks{}), - maxRequestsPerPeer: 10, - expRequests: map[int64]bool{10: true, 11: true, 12: true}, - expRequestsSent: 3, - expPeerResults: []testPeerResult{ - {id: "P1", numPendingBlockRequests: 3}, - {id: "P2", numPendingBlockRequests: 0}, - }, - }, - { - name: "n peers - send n*maxRequestsPerPeer block requests", - pool: makeBlockPool( - testBcR, - 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{}), - maxRequestsPerPeer: 2, - expRequests: map[int64]bool{10: true, 11: true}, - expRequestsSent: 4, - expPeerResults: []testPeerResult{ - {id: "P1", numPendingBlockRequests: 2}, - {id: "P2", numPendingBlockRequests: 2}}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - resetPoolTestResults() - - var pool = tt.pool - maxRequestsPerPeer = tt.maxRequestsPerPeer - pool.MakeNextRequests(10) - - assert.Equal(t, tt.expRequestsSent, testResults.numRequestsSent) - for _, tPeer := range tt.expPeerResults { - var peer = pool.peers[tPeer.id] - assert.NotNil(t, peer) - assert.Equal(t, tPeer.numPendingBlockRequests, peer.NumPendingBlockRequests) - } - }) - } -} - -func TestBlockPoolAddBlock(t *testing.T) { - testBcR := newTestBcR() - txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} - - type args struct { - peerID p2p.ID - block *types.Block - blockSize int - } - tests := []struct { - name string - pool *BlockPool - args args - poolWanted *BlockPool - errWanted error - }{ - {name: "block from unknown peer", - pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}), - args: args{ - peerID: "P2", - block: types.MakeBlock(int64(10), txs, nil, nil), - blockSize: 100, - }, - poolWanted: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}), - errWanted: errBadDataFromPeer, - }, - {name: "unexpected block 11 from known peer - waiting for 10", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", false}}), - args: args{ - peerID: "P1", - block: types.MakeBlock(int64(11), txs, nil, nil), - blockSize: 100, - }, - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", false}}), - errWanted: errMissingBlock, - }, - {name: "unexpected block 10 from known peer - already have 10", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", true}, 11: {"P1", false}}), - args: args{ - peerID: "P1", - block: types.MakeBlock(int64(10), txs, nil, nil), - blockSize: 100, - }, - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", true}, 11: {"P1", false}}), - errWanted: errDuplicateBlock, - }, - {name: "unexpected block 10 from known peer P2 - expected 10 to come from P1", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{10: {"P1", false}}), - args: args{ - peerID: "P2", - block: types.MakeBlock(int64(10), txs, nil, nil), - blockSize: 100, - }, - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{10: {"P1", false}}), - errWanted: errBadDataFromPeer, - }, - {name: "expected block from known peer", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", false}}), - args: args{ - peerID: "P1", - block: types.MakeBlock(int64(10), txs, nil, nil), - blockSize: 100, - }, - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{10: {"P1", true}}), - errWanted: nil, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - err := tt.pool.AddBlock(tt.args.peerID, tt.args.block, tt.args.blockSize) - assert.Equal(t, tt.errWanted, err) - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} - -func TestBlockPoolFirstTwoBlocksAndPeers(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - firstWanted int64 - secondWanted int64 - errWanted error - }{ - { - name: "both blocks missing", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), - errWanted: errMissingBlock, - }, - { - name: "second block missing", - pool: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), - firstWanted: 15, - errWanted: errMissingBlock, - }, - { - name: "first block missing", - pool: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{16: {"P2", true}, 18: {"P2", true}}), - secondWanted: 16, - errWanted: errMissingBlock, - }, - { - name: "both blocks present", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), - firstWanted: 10, - secondWanted: 11, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - pool := tt.pool - gotFirst, gotSecond, err := pool.FirstTwoBlocksAndPeers() - assert.Equal(t, tt.errWanted, err) - - if tt.firstWanted != 0 { - peer := pool.blocks[tt.firstWanted] - block := pool.peers[peer].blocks[tt.firstWanted] - assert.Equal(t, block, gotFirst.block, - "BlockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v", - tt.firstWanted, gotFirst.block.Height) - } - - if tt.secondWanted != 0 { - peer := pool.blocks[tt.secondWanted] - block := pool.peers[peer].blocks[tt.secondWanted] - assert.Equal(t, block, gotSecond.block, - "BlockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v", - tt.secondWanted, gotSecond.block.Height) - } - }) - } -} - -func TestBlockPoolInvalidateFirstTwoBlocks(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - poolWanted *BlockPool - }{ - { - name: "both blocks missing", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), - }, - { - name: "second block missing", - pool: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), - poolWanted: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P2", Height: 100}}, - map[int64]tPBlocks{18: {"P2", true}}), - }, - { - name: "first block missing", - pool: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}), - poolWanted: makeBlockPool(testBcR, 15, - []BpPeer{{ID: "P1", Height: 100}}, - map[int64]tPBlocks{18: {"P1", true}}), - }, - { - name: "both blocks present", - pool: makeBlockPool(testBcR, 10, - []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, - map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), - poolWanted: makeBlockPool(testBcR, 10, - []BpPeer{}, - map[int64]tPBlocks{}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - tt.pool.InvalidateFirstTwoBlocks(errNoPeerResponse) - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} - -func TestProcessedCurrentHeightBlock(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - poolWanted *BlockPool - }{ - { - name: "one peer", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, - map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}), - poolWanted: makeBlockPool(testBcR, 101, []BpPeer{{ID: "P1", Height: 120}}, - map[int64]tPBlocks{101: {"P1", true}}), - }, - { - name: "multiple peers", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false}, - 101: {"P2", true}, 103: {"P2", false}, - 102: {"P3", true}, 106: {"P3", true}}), - poolWanted: makeBlockPool(testBcR, 101, - []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 104: {"P1", true}, 105: {"P1", false}, - 101: {"P2", true}, 103: {"P2", false}, - 102: {"P3", true}, 106: {"P3", true}}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - tt.pool.ProcessedCurrentHeightBlock() - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} - -func TestRemovePeerAtCurrentHeight(t *testing.T) { - testBcR := newTestBcR() - - tests := []struct { - name string - pool *BlockPool - poolWanted *BlockPool - }{ - { - name: "one peer, remove peer for block at H", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, - map[int64]tPBlocks{100: {"P1", false}, 101: {"P1", true}}), - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "one peer, remove peer for block at H+1", - pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, - map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), - poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}), - }, - { - name: "multiple peers, remove peer for block at H", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 100: {"P1", false}, 104: {"P1", true}, 105: {"P1", false}, - 101: {"P2", true}, 103: {"P2", false}, - 102: {"P3", true}, 106: {"P3", true}}), - poolWanted: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P2", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 101: {"P2", true}, 103: {"P2", false}, - 102: {"P3", true}, 106: {"P3", true}}), - }, - { - name: "multiple peers, remove peer for block at H+1", - pool: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false}, - 101: {"P2", false}, 103: {"P2", false}, - 102: {"P3", true}, 106: {"P3", true}}), - poolWanted: makeBlockPool(testBcR, 100, - []BpPeer{{ID: "P1", Height: 120}, {ID: "P3", Height: 130}}, - map[int64]tPBlocks{ - 100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false}, - 102: {"P3", true}, 106: {"P3", true}}), - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - tt.pool.RemovePeerAtCurrentHeights(errNoPeerResponse) - assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) - }) - } -} diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go deleted file mode 100644 index ddfcb3650..000000000 --- a/blockchain/v1/reactor.go +++ /dev/null @@ -1,596 +0,0 @@ -package v1 - -import ( - "fmt" - "reflect" - "sync/atomic" - "time" - - "github.com/tendermint/tendermint/behaviour" - bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/store" - "github.com/tendermint/tendermint/types" -) - -const ( - // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) - BlockchainChannel = byte(0x40) - trySyncIntervalMS = 10 - trySendIntervalMS = 10 - - // ask for best height every 10s - statusUpdateIntervalSeconds = 10 -) - -var ( - // Maximum number of requests that can be pending per peer, i.e. for which requests have been sent but blocks - // have not been received. - maxRequestsPerPeer = 20 - // Maximum number of block requests for the reactor, pending or for which blocks have been received. - maxNumRequests = 64 -) - -type consensusReactor interface { - // for when we switch from blockchain reactor and fast sync to - // the consensus machine - SwitchToConsensus(state sm.State, skipWAL bool) -} - -// BlockchainReactor handles long-term catchup syncing. -type BlockchainReactor struct { - p2p.BaseReactor - - initialState sm.State // immutable - state sm.State - - blockExec *sm.BlockExecutor - store *store.BlockStore - - fastSync bool - stateSynced bool - - fsm *BcReactorFSM - blocksSynced uint64 - - // Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine. - messagesForFSMCh chan bcReactorMessage - - // Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed - // to this channel to be processed in the context of the poolRoutine. - errorsForFSMCh chan bcReactorMessage - - // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and - // the switch. - eventsFromFSMCh chan bcFsmMessage - - swReporter *behaviour.SwitchReporter - - // Atomic integer (0 - sync in progress, 1 - finished syncing) - syncEnded int32 -} - -// NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - fastSync bool) *BlockchainReactor { - - if state.LastBlockHeight != store.Height() { - panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, - store.Height())) - } - - const capacity = 1000 - eventsFromFSMCh := make(chan bcFsmMessage, capacity) - messagesForFSMCh := make(chan bcReactorMessage, capacity) - errorsForFSMCh := make(chan bcReactorMessage, capacity) - - startHeight := store.Height() + 1 - if startHeight == 1 { - startHeight = state.InitialHeight - } - bcR := &BlockchainReactor{ - initialState: state, - state: state, - blockExec: blockExec, - fastSync: fastSync, - store: store, - messagesForFSMCh: messagesForFSMCh, - eventsFromFSMCh: eventsFromFSMCh, - errorsForFSMCh: errorsForFSMCh, - } - fsm := NewFSM(startHeight, bcR) - bcR.fsm = fsm - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - // bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) - - return bcR -} - -// bcReactorMessage is used by the reactor to send messages to the FSM. -type bcReactorMessage struct { - event bReactorEvent - data bReactorEventData -} - -type bFsmEvent uint - -const ( - // message type events - peerErrorEv = iota + 1 - syncFinishedEv -) - -type bFsmEventData struct { - peerID p2p.ID - err error -} - -// bcFsmMessage is used by the FSM to send messages to the reactor -type bcFsmMessage struct { - event bFsmEvent - data bFsmEventData -} - -// SetLogger implements service.Service by setting the logger on reactor and pool. -func (bcR *BlockchainReactor) SetLogger(l log.Logger) { - bcR.BaseService.Logger = l - bcR.fsm.SetLogger(l) -} - -// OnStart implements service.Service. -func (bcR *BlockchainReactor) OnStart() error { - bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) - if bcR.fastSync { - go bcR.poolRoutine() - } else { // if we're not fast syncing, mark it as finished - bcR.setSyncEnded() - } - return nil -} - -// OnStop implements service.Service. -func (bcR *BlockchainReactor) OnStop() { -} - -func (bcR *BlockchainReactor) isSyncEnded() bool { - return atomic.LoadInt32(&(bcR.syncEnded)) != 0 -} - -func (bcR *BlockchainReactor) setSyncEnded() { - atomic.StoreInt32(&(bcR.syncEnded), 1) -} - -// SwitchToFastSync is called by the state sync reactor when switching to fast sync. -func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { - bcR.fastSync = true - bcR.initialState = state - bcR.state = state - bcR.stateSynced = true - - bcR.fsm = NewFSM(state.LastBlockHeight+1, bcR) - bcR.fsm.SetLogger(bcR.Logger) - go bcR.poolRoutine() - return nil -} - -// GetChannels implements Reactor -func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { - ID: BlockchainChannel, - Priority: 10, - SendQueueCapacity: 2000, - RecvBufferCapacity: 50 * 4096, - RecvMessageCapacity: bc.MaxMsgSize, - }, - } -} - -// AddPeer implements Reactor by sending our state to peer. -func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{ - Base: bcR.store.Base(), - Height: bcR.store.Height(), - }) - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return - } - peer.Send(BlockchainChannel, msgBytes) - // it's OK if send fails. will try later in poolRoutine - - // peer is added to the pool once we receive the first - // bcStatusResponseMessage from the peer and call pool.updatePeer() -} - -// sendBlockToPeer loads a block and sends it to the requesting peer. -// If the block doesn't exist a bcNoBlockResponseMessage is sent. -// If all nodes are honest, no node should be requesting for a block that doesn't exist. -func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest, - src p2p.Peer) (queued bool) { - - block := bcR.store.LoadBlock(msg.Height) - if block != nil { - pbbi, err := block.ToProto() - if err != nil { - bcR.Logger.Error("Could not send block message to peer", "err", err) - return false - } - msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: pbbi}) - if err != nil { - bcR.Logger.Error("unable to marshal msg", "err", err) - return false - } - return src.TrySend(BlockchainChannel, msgBytes) - } - - bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height) - - msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height}) - if err != nil { - bcR.Logger.Error("unable to marshal msg", "err", err) - return false - } - return src.TrySend(BlockchainChannel, msgBytes) -} - -func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusRequest, src p2p.Peer) (queued bool) { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{ - Base: bcR.store.Base(), - Height: bcR.store.Height(), - }) - if err != nil { - bcR.Logger.Error("unable to marshal msg", "err", err) - return false - } - - return src.TrySend(BlockchainChannel, msgBytes) -} - -// RemovePeer implements Reactor by removing peer from the pool. -func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - if bcR.isSyncEnded() { - return - } - - msgData := bcReactorMessage{ - event: peerRemoveEv, - data: bReactorEventData{ - peerID: peer.ID(), - err: errSwitchRemovesPeer, - }, - } - bcR.errorsForFSMCh <- msgData -} - -// Receive implements Reactor by handling 4 types of messages (look below). -// XXX: do not call any methods that can block or incur heavy processing. -// https://github.com/tendermint/tendermint/issues/2888 -func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := bc.DecodeMsg(msgBytes) - if err != nil { - bcR.Logger.Error("error decoding message", - "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) - _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error())) - return - } - - if err = bc.ValidateMsg(msg); err != nil { - bcR.Logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error())) - return - } - - bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) - - switch msg := msg.(type) { - case *bcproto.BlockRequest: - if queued := bcR.sendBlockToPeer(msg, src); !queued { - // Unfortunately not queued since the queue is full. - bcR.Logger.Error("Could not send block message to peer", "src", src, "height", msg.Height) - } - - case *bcproto.StatusRequest: - // Send peer our state. - if queued := bcR.sendStatusResponseToPeer(msg, src); !queued { - // Unfortunately not queued since the queue is full. - bcR.Logger.Error("Could not send status message to peer", "src", src) - } - - case *bcproto.BlockResponse: - if bcR.isSyncEnded() { - return - } - - bi, err := types.BlockFromProto(msg.Block) - if err != nil { - bcR.Logger.Error("error transition block from protobuf", "err", err) - _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error())) - return - } - msgForFSM := bcReactorMessage{ - event: blockResponseEv, - data: bReactorEventData{ - peerID: src.ID(), - height: bi.Height, - block: bi, - length: len(msgBytes), - }, - } - bcR.Logger.Info("Received", "src", src, "height", bi.Height) - bcR.messagesForFSMCh <- msgForFSM - case *bcproto.NoBlockResponse: - if bcR.isSyncEnded() { - return - } - - msgForFSM := bcReactorMessage{ - event: noBlockResponseEv, - data: bReactorEventData{ - peerID: src.ID(), - height: msg.Height, - }, - } - bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height) - bcR.messagesForFSMCh <- msgForFSM - case *bcproto.StatusResponse: - if bcR.isSyncEnded() { - return - } - - // Got a peer status. Unverified. - msgForFSM := bcReactorMessage{ - event: statusResponseEv, - data: bReactorEventData{ - peerID: src.ID(), - height: msg.Height, - length: len(msgBytes), - }, - } - bcR.messagesForFSMCh <- msgForFSM - default: - bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) - } -} - -// processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel -func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) { - processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - defer processReceivedBlockTicker.Stop() - - var ( - doProcessBlockCh = make(chan struct{}, 1) - lastHundred = time.Now() - lastRate = 0.0 - ) - -ForLoop: - for { - select { - case <-bcR.Quit(): - break ForLoop - case <-stopProcessing: - bcR.Logger.Info("finishing block execution") - break ForLoop - case <-processReceivedBlockTicker.C: // try to execute blocks - select { - case doProcessBlockCh <- struct{}{}: - default: - } - case <-doProcessBlockCh: - for { - err := bcR.processBlock() - if err == errMissingBlock { - break - } - // Notify FSM of block processing result. - msgForFSM := bcReactorMessage{ - event: processedBlockEv, - data: bReactorEventData{ - err: err, - }, - } - _ = bcR.fsm.Handle(&msgForFSM) - - if err != nil { - break - } - - bcR.blocksSynced++ - if bcR.blocksSynced%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - height, maxPeerHeight := bcR.fsm.Status() - bcR.Logger.Info("Fast Sync Rate", "height", height, - "max_peer_height", maxPeerHeight, "blocks/s", lastRate) - lastHundred = time.Now() - } - } - } - } -} - -// poolRoutine receives and handles messages from the Receive() routine and from the FSM. -func (bcR *BlockchainReactor) poolRoutine() { - bcR.fsm.Start() - - sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) - statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) - - defer sendBlockRequestTicker.Stop() - // NOTE: statusUpdateTicker can continue to run - - stopProcessing := make(chan struct{}, 1) - go bcR.processBlocksRoutine(stopProcessing) - -ForLoop: - for { - select { - - case <-sendBlockRequestTicker.C: - if !bcR.fsm.NeedsBlocks() { - continue - } - _ = bcR.fsm.Handle(&bcReactorMessage{ - event: makeRequestsEv, - data: bReactorEventData{ - maxNumRequests: maxNumRequests}}) - - case <-statusUpdateTicker.C: - // Ask for status updates. - go bcR.sendStatusRequest() - - case msg := <-bcR.messagesForFSMCh: - // Sent from the Receive() routine when status (statusResponseEv) and - // block (blockResponseEv) response events are received - _ = bcR.fsm.Handle(&msg) - - case msg := <-bcR.errorsForFSMCh: - // Sent from the switch.RemovePeer() routine (RemovePeerEv) and - // FSM state timer expiry routine (stateTimeoutEv). - _ = bcR.fsm.Handle(&msg) - - case msg := <-bcR.eventsFromFSMCh: - switch msg.event { - case syncFinishedEv: // Sent from the FSM when it enters finished state. - stopProcessing <- struct{}{} - bcR.setSyncEnded() - case peerErrorEv: // Sent from the FSM when it detects peer error - bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) - if msg.data.err == errNoPeerResponse { - // Sent from the peer timeout handler routine - _ = bcR.fsm.Handle(&bcReactorMessage{ - event: peerRemoveEv, - data: bReactorEventData{ - peerID: msg.data.peerID, - err: msg.data.err, - }, - }) - } - // else { - // For slow peers, or errors due to blocks received from wrong peer - // the FSM had already removed the peers - // } - default: - bcR.Logger.Error("Event from FSM not supported", "type", msg.event) - } - - case <-bcR.Quit(): - break ForLoop - } - } -} - -func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) { - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - _ = bcR.swReporter.Report(behaviour.BadMessage(peerID, err.Error())) - } -} - -func (bcR *BlockchainReactor) processBlock() error { - - first, second, err := bcR.fsm.FirstTwoBlocks() - if err != nil { - // We need both to sync the first block. - return err - } - - chainID := bcR.initialState.ChainID - - firstParts := first.MakePartSet(types.BlockPartSizeBytes) - firstPartSetHeader := firstParts.Header() - firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} - // Finally, verify the first block using the second's commit - // NOTE: we can probably make this more efficient, but note that calling - // first.Hash() doesn't verify the tx contents, so MakePartSet() is - // currently necessary. - err = bcR.state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit) - if err != nil { - bcR.Logger.Error("error during commit verification", "err", err, - "first", first.Height, "second", second.Height) - return errBlockVerificationFailure - } - - bcR.store.SaveBlock(first, firstParts, second.LastCommit) - - bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) - if err != nil { - panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) - } - - return nil -} - -// sendStatusRequest broadcasts `BlockStore` height. -func (bcR *BlockchainReactor) sendStatusRequest() { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) - if err != nil { - panic(err) - } - bcR.Switch.Broadcast(BlockchainChannel, msgBytes) -} - -// BlockRequest sends `BlockRequest` height. -func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { - peer := bcR.Switch.Peers().Get(peerID) - if peer == nil { - return errNilPeerForBlockRequest - } - - msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height}) - if err != nil { - return err - } - queued := peer.TrySend(BlockchainChannel, msgBytes) - if !queued { - return errSendQueueFull - } - return nil -} - -func (bcR *BlockchainReactor) switchToConsensus() { - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - if ok { - conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced) - bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv} - } -} - -// Called by FSM and pool: -// - pool calls when it detects slow peer or when peer times out -// - FSM calls when: -// - adding a block (addBlock) fails -// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks -func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { - bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err) - msgData := bcFsmMessage{ - event: peerErrorEv, - data: bFsmEventData{ - peerID: peerID, - err: err, - }, - } - bcR.eventsFromFSMCh <- msgData -} - -func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { - if timer == nil { - panic("nil timer pointer parameter") - } - if *timer == nil { - *timer = time.AfterFunc(timeout, func() { - msg := bcReactorMessage{ - event: stateTimeoutEv, - data: bReactorEventData{ - stateName: name, - }, - } - bcR.errorsForFSMCh <- msg - }) - } else { - (*timer).Reset(timeout) - } -} diff --git a/blockchain/v1/reactor_fsm.go b/blockchain/v1/reactor_fsm.go deleted file mode 100644 index 384ea5c28..000000000 --- a/blockchain/v1/reactor_fsm.go +++ /dev/null @@ -1,463 +0,0 @@ -package v1 - -import ( - "errors" - "fmt" - "sync" - "time" - - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -// Blockchain Reactor State -type bcReactorFSMState struct { - name string - - // called when transitioning out of current state - handle func(*BcReactorFSM, bReactorEvent, bReactorEventData) (next *bcReactorFSMState, err error) - // called when entering the state - enter func(fsm *BcReactorFSM) - - // timeout to ensure FSM is not stuck in a state forever - // the timer is owned and run by the fsm instance - timeout time.Duration -} - -func (s *bcReactorFSMState) String() string { - return s.name -} - -// BcReactorFSM is the datastructure for the Blockchain Reactor State Machine -type BcReactorFSM struct { - logger log.Logger - mtx sync.Mutex - - startTime time.Time - - state *bcReactorFSMState - stateTimer *time.Timer - pool *BlockPool - - // interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc. - toBcR bcReactor -} - -// NewFSM creates a new reactor FSM. -func NewFSM(height int64, toBcR bcReactor) *BcReactorFSM { - return &BcReactorFSM{ - state: unknown, - startTime: time.Now(), - pool: NewBlockPool(height, toBcR), - toBcR: toBcR, - } -} - -// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers. -type bReactorEventData struct { - peerID p2p.ID - err error // for peer error: timeout, slow; for processed block event if error occurred - base int64 // for status response - height int64 // for status response; for processed block event - block *types.Block // for block response - stateName string // for state timeout events - length int // for block response event, length of received block, used to detect slow peers - maxNumRequests int // for request needed event, maximum number of pending requests -} - -// Blockchain Reactor Events (the input to the state machine) -type bReactorEvent uint - -const ( - // message type events - startFSMEv = iota + 1 - statusResponseEv - blockResponseEv - noBlockResponseEv - processedBlockEv - makeRequestsEv - stopFSMEv - - // other events - peerRemoveEv = iota + 256 - stateTimeoutEv -) - -func (msg *bcReactorMessage) String() string { - var dataStr string - - switch msg.event { - case startFSMEv: - dataStr = "" - case statusResponseEv: - dataStr = fmt.Sprintf("peer=%v base=%v height=%v", msg.data.peerID, msg.data.base, msg.data.height) - case blockResponseEv: - dataStr = fmt.Sprintf("peer=%v block.height=%v length=%v", - msg.data.peerID, msg.data.block.Height, msg.data.length) - case noBlockResponseEv: - dataStr = fmt.Sprintf("peer=%v requested height=%v", - msg.data.peerID, msg.data.height) - case processedBlockEv: - dataStr = fmt.Sprintf("error=%v", msg.data.err) - case makeRequestsEv: - dataStr = "" - case stopFSMEv: - dataStr = "" - case peerRemoveEv: - dataStr = fmt.Sprintf("peer: %v is being removed by the switch", msg.data.peerID) - case stateTimeoutEv: - dataStr = fmt.Sprintf("state=%v", msg.data.stateName) - default: - dataStr = "cannot interpret message data" - } - - return fmt.Sprintf("%v: %v", msg.event, dataStr) -} - -func (ev bReactorEvent) String() string { - switch ev { - case startFSMEv: - return "startFSMEv" - case statusResponseEv: - return "statusResponseEv" - case blockResponseEv: - return "blockResponseEv" - case noBlockResponseEv: - return "noBlockResponseEv" - case processedBlockEv: - return "processedBlockEv" - case makeRequestsEv: - return "makeRequestsEv" - case stopFSMEv: - return "stopFSMEv" - case peerRemoveEv: - return "peerRemoveEv" - case stateTimeoutEv: - return "stateTimeoutEv" - default: - return "event unknown" - } - -} - -// states -var ( - unknown *bcReactorFSMState - waitForPeer *bcReactorFSMState - waitForBlock *bcReactorFSMState - finished *bcReactorFSMState -) - -// timeouts for state timers -const ( - waitForPeerTimeout = 3 * time.Second - waitForBlockAtCurrentHeightTimeout = 10 * time.Second -) - -// errors -var ( - // internal to the package - errNoErrorFinished = errors.New("fast sync is finished") - errInvalidEvent = errors.New("invalid event in current state") - errMissingBlock = errors.New("missing blocks") - errNilPeerForBlockRequest = errors.New("peer for block request does not exist in the switch") - errSendQueueFull = errors.New("block request not made, send-queue is full") - errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") - errSwitchRemovesPeer = errors.New("switch is removing peer") - errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one") - errNoTallerPeer = errors.New("fast sync timed out on waiting for a peer taller than this node") - - // reported eventually to the switch - // handle return - errPeerLowersItsHeight = errors.New("fast sync peer reports a height lower than previous") - // handle return - errNoPeerResponseForCurrentHeights = errors.New("fast sync timed out on peer block response for current heights") - errNoPeerResponse = errors.New("fast sync timed out on peer block response") // xx - errBadDataFromPeer = errors.New("fast sync received block from wrong peer or block is bad") // xx - errDuplicateBlock = errors.New("fast sync received duplicate block from peer") - errBlockVerificationFailure = errors.New("fast sync block verification failure") // xx - errSlowPeer = errors.New("fast sync peer is not sending us data fast enough") // xx - -) - -func init() { - unknown = &bcReactorFSMState{ - name: "unknown", - handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) { - switch ev { - case startFSMEv: - // Broadcast Status message. Currently doesn't return non-nil error. - fsm.toBcR.sendStatusRequest() - return waitForPeer, nil - - case stopFSMEv: - return finished, errNoErrorFinished - - default: - return unknown, errInvalidEvent - } - }, - } - - waitForPeer = &bcReactorFSMState{ - name: "waitForPeer", - timeout: waitForPeerTimeout, - enter: func(fsm *BcReactorFSM) { - // Stop when leaving the state. - fsm.resetStateTimer() - }, - handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) { - switch ev { - case stateTimeoutEv: - if data.stateName != "waitForPeer" { - fsm.logger.Error("received a state timeout event for different state", - "state", data.stateName) - return waitForPeer, errTimeoutEventWrongState - } - // There was no statusResponse received from any peer. - // Should we send status request again? - return finished, errNoTallerPeer - - case statusResponseEv: - if err := fsm.pool.UpdatePeer(data.peerID, data.base, data.height); err != nil { - if fsm.pool.NumPeers() == 0 { - return waitForPeer, err - } - } - if fsm.stateTimer != nil { - fsm.stateTimer.Stop() - } - return waitForBlock, nil - - case stopFSMEv: - if fsm.stateTimer != nil { - fsm.stateTimer.Stop() - } - return finished, errNoErrorFinished - - default: - return waitForPeer, errInvalidEvent - } - }, - } - - waitForBlock = &bcReactorFSMState{ - name: "waitForBlock", - timeout: waitForBlockAtCurrentHeightTimeout, - enter: func(fsm *BcReactorFSM) { - // Stop when leaving the state. - fsm.resetStateTimer() - }, - handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) { - switch ev { - - case statusResponseEv: - err := fsm.pool.UpdatePeer(data.peerID, data.base, data.height) - if fsm.pool.NumPeers() == 0 { - return waitForPeer, err - } - if fsm.pool.ReachedMaxHeight() { - return finished, err - } - return waitForBlock, err - - case blockResponseEv: - fsm.logger.Debug("blockResponseEv", "H", data.block.Height) - err := fsm.pool.AddBlock(data.peerID, data.block, data.length) - if err != nil { - // A block was received that was unsolicited, from unexpected peer, or that we already have it. - // Ignore block, remove peer and send error to switch. - fsm.pool.RemovePeer(data.peerID, err) - fsm.toBcR.sendPeerError(err, data.peerID) - } - if fsm.pool.NumPeers() == 0 { - return waitForPeer, err - } - return waitForBlock, err - case noBlockResponseEv: - fsm.logger.Error("peer does not have requested block", "peer", data.peerID) - fsm.pool.SetNoBlock(data.peerID, data.height) - - return waitForBlock, nil - case processedBlockEv: - if data.err != nil { - first, second, _ := fsm.pool.FirstTwoBlocksAndPeers() - fsm.logger.Error("error processing block", "err", data.err, - "first", first.block.Height, "second", second.block.Height) - fsm.logger.Error("send peer error for", "peer", first.peer.ID) - fsm.toBcR.sendPeerError(data.err, first.peer.ID) - fsm.logger.Error("send peer error for", "peer", second.peer.ID) - fsm.toBcR.sendPeerError(data.err, second.peer.ID) - // Remove the first two blocks. This will also remove the peers - fsm.pool.InvalidateFirstTwoBlocks(data.err) - } else { - fsm.pool.ProcessedCurrentHeightBlock() - // Since we advanced one block reset the state timer - fsm.resetStateTimer() - } - - // Both cases above may result in achieving maximum height. - if fsm.pool.ReachedMaxHeight() { - return finished, nil - } - - return waitForBlock, data.err - - case peerRemoveEv: - // This event is sent by the switch to remove disconnected and errored peers. - fsm.pool.RemovePeer(data.peerID, data.err) - if fsm.pool.NumPeers() == 0 { - return waitForPeer, nil - } - if fsm.pool.ReachedMaxHeight() { - return finished, nil - } - return waitForBlock, nil - - case makeRequestsEv: - fsm.makeNextRequests(data.maxNumRequests) - return waitForBlock, nil - - case stateTimeoutEv: - if data.stateName != "waitForBlock" { - fsm.logger.Error("received a state timeout event for different state", - "state", data.stateName) - return waitForBlock, errTimeoutEventWrongState - } - // We haven't received the block at current height or height+1. Remove peer. - fsm.pool.RemovePeerAtCurrentHeights(errNoPeerResponseForCurrentHeights) - fsm.resetStateTimer() - if fsm.pool.NumPeers() == 0 { - return waitForPeer, errNoPeerResponseForCurrentHeights - } - if fsm.pool.ReachedMaxHeight() { - return finished, nil - } - return waitForBlock, errNoPeerResponseForCurrentHeights - - case stopFSMEv: - if fsm.stateTimer != nil { - fsm.stateTimer.Stop() - } - return finished, errNoErrorFinished - - default: - return waitForBlock, errInvalidEvent - } - }, - } - - finished = &bcReactorFSMState{ - name: "finished", - enter: func(fsm *BcReactorFSM) { - fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.Height) - fsm.toBcR.switchToConsensus() - fsm.cleanup() - }, - handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) { - return finished, nil - }, - } -} - -// Interface used by FSM for sending Block and Status requests, -// informing of peer errors and state timeouts -// Implemented by BlockchainReactor and tests -type bcReactor interface { - sendStatusRequest() - sendBlockRequest(peerID p2p.ID, height int64) error - sendPeerError(err error, peerID p2p.ID) - resetStateTimer(name string, timer **time.Timer, timeout time.Duration) - switchToConsensus() -} - -// SetLogger sets the FSM logger. -func (fsm *BcReactorFSM) SetLogger(l log.Logger) { - fsm.logger = l - fsm.pool.SetLogger(l) -} - -// Start starts the FSM. -func (fsm *BcReactorFSM) Start() { - _ = fsm.Handle(&bcReactorMessage{event: startFSMEv}) -} - -// Handle processes messages and events sent to the FSM. -func (fsm *BcReactorFSM) Handle(msg *bcReactorMessage) error { - fsm.mtx.Lock() - defer fsm.mtx.Unlock() - fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state) - - if fsm.state == nil { - fsm.state = unknown - } - next, err := fsm.state.handle(fsm, msg.event, msg.data) - if err != nil { - fsm.logger.Error("FSM event handler returned", "err", err, - "state", fsm.state, "event", msg.event) - } - - oldState := fsm.state.name - fsm.transition(next) - if oldState != fsm.state.name { - fsm.logger.Info("FSM changed state", "new_state", fsm.state) - } - return err -} - -func (fsm *BcReactorFSM) transition(next *bcReactorFSMState) { - if next == nil { - return - } - if fsm.state != next { - fsm.state = next - if next.enter != nil { - next.enter(fsm) - } - } -} - -// Called when entering an FSM state in order to detect lack of progress in the state machine. -// Note the use of the 'bcr' interface to facilitate testing without timer expiring. -func (fsm *BcReactorFSM) resetStateTimer() { - fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout) -} - -func (fsm *BcReactorFSM) isCaughtUp() bool { - return fsm.state == finished -} - -func (fsm *BcReactorFSM) makeNextRequests(maxNumRequests int) { - fsm.pool.MakeNextRequests(maxNumRequests) -} - -func (fsm *BcReactorFSM) cleanup() { - fsm.pool.Cleanup() -} - -// NeedsBlocks checks if more block requests are required. -func (fsm *BcReactorFSM) NeedsBlocks() bool { - fsm.mtx.Lock() - defer fsm.mtx.Unlock() - return fsm.state.name == "waitForBlock" && fsm.pool.NeedsBlocks() -} - -// FirstTwoBlocks returns the two blocks at pool height and height+1 -func (fsm *BcReactorFSM) FirstTwoBlocks() (first, second *types.Block, err error) { - fsm.mtx.Lock() - defer fsm.mtx.Unlock() - firstBP, secondBP, err := fsm.pool.FirstTwoBlocksAndPeers() - if err == nil { - first = firstBP.block - second = secondBP.block - } - return -} - -// Status returns the pool's height and the maximum peer height. -func (fsm *BcReactorFSM) Status() (height, maxPeerHeight int64) { - fsm.mtx.Lock() - defer fsm.mtx.Unlock() - return fsm.pool.Height, fsm.pool.MaxPeerHeight -} diff --git a/blockchain/v1/reactor_fsm_test.go b/blockchain/v1/reactor_fsm_test.go deleted file mode 100644 index 9fdfe9c9b..000000000 --- a/blockchain/v1/reactor_fsm_test.go +++ /dev/null @@ -1,997 +0,0 @@ -package v1 - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" - tmrand "github.com/tendermint/tendermint/libs/rand" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -type lastBlockRequestT struct { - peerID p2p.ID - height int64 -} - -type lastPeerErrorT struct { - peerID p2p.ID - err error -} - -// reactor for FSM testing -type testReactor struct { - logger log.Logger - fsm *BcReactorFSM - numStatusRequests int - numBlockRequests int - lastBlockRequest lastBlockRequestT - lastPeerError lastPeerErrorT - stateTimerStarts map[string]int -} - -func sendEventToFSM(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) error { - return fsm.Handle(&bcReactorMessage{event: ev, data: data}) -} - -type fsmStepTestValues struct { - currentState string - event bReactorEvent - data bReactorEventData - - wantErr error - wantState string - wantStatusReqSent bool - wantReqIncreased bool - wantNewBlocks []int64 - wantRemovedPeers []p2p.ID -} - -// --------------------------------------------------------------------------- -// helper test function for different FSM events, state and expected behavior -func sStopFSMEv(current, expected string) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: stopFSMEv, - wantState: expected, - wantErr: errNoErrorFinished} -} - -func sUnknownFSMEv(current string) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: 1234, - wantState: current, - wantErr: errInvalidEvent} -} - -func sStartFSMEv() fsmStepTestValues { - return fsmStepTestValues{ - currentState: "unknown", - event: startFSMEv, - wantState: "waitForPeer", - wantStatusReqSent: true} -} - -func sStateTimeoutEv(current, expected string, timedoutState string, wantErr error) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: stateTimeoutEv, - data: bReactorEventData{ - stateName: timedoutState, - }, - wantState: expected, - wantErr: wantErr, - } -} - -func sProcessedBlockEv(current, expected string, reactorError error) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: processedBlockEv, - data: bReactorEventData{ - err: reactorError, - }, - wantState: expected, - wantErr: reactorError, - } -} - -func sNoBlockResponseEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: noBlockResponseEv, - data: bReactorEventData{ - peerID: peerID, - height: height, - }, - wantState: expected, - wantErr: err, - } -} - -func sStatusEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: statusResponseEv, - data: bReactorEventData{peerID: peerID, height: height}, - wantState: expected, - wantErr: err} -} - -func sMakeRequestsEv(current, expected string, maxPendingRequests int) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: makeRequestsEv, - data: bReactorEventData{maxNumRequests: maxPendingRequests}, - wantState: expected, - wantReqIncreased: true, - } -} - -func sMakeRequestsEvErrored(current, expected string, - maxPendingRequests int, err error, peersRemoved []p2p.ID) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: makeRequestsEv, - data: bReactorEventData{maxNumRequests: maxPendingRequests}, - wantState: expected, - wantErr: err, - wantRemovedPeers: peersRemoved, - wantReqIncreased: true, - } -} - -func sBlockRespEv(current, expected string, peerID p2p.ID, height int64, prevBlocks []int64) fsmStepTestValues { - txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} - return fsmStepTestValues{ - currentState: current, - event: blockResponseEv, - data: bReactorEventData{ - peerID: peerID, - height: height, - block: types.MakeBlock(height, txs, nil, nil), - length: 100}, - wantState: expected, - wantNewBlocks: append(prevBlocks, height), - } -} - -func sBlockRespEvErrored(current, expected string, - peerID p2p.ID, height int64, prevBlocks []int64, wantErr error, peersRemoved []p2p.ID) fsmStepTestValues { - txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} - - return fsmStepTestValues{ - currentState: current, - event: blockResponseEv, - data: bReactorEventData{ - peerID: peerID, - height: height, - block: types.MakeBlock(height, txs, nil, nil), - length: 100}, - wantState: expected, - wantErr: wantErr, - wantRemovedPeers: peersRemoved, - wantNewBlocks: prevBlocks, - } -} - -func sPeerRemoveEv(current, expected string, peerID p2p.ID, err error, peersRemoved []p2p.ID) fsmStepTestValues { - return fsmStepTestValues{ - currentState: current, - event: peerRemoveEv, - data: bReactorEventData{ - peerID: peerID, - err: err, - }, - wantState: expected, - wantRemovedPeers: peersRemoved, - } -} - -// -------------------------------------------- - -func newTestReactor(height int64) *testReactor { - testBcR := &testReactor{logger: log.TestingLogger(), stateTimerStarts: make(map[string]int)} - testBcR.fsm = NewFSM(height, testBcR) - testBcR.fsm.SetLogger(testBcR.logger) - return testBcR -} - -func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) { - // There is currently no good way to know to which peer a block request was sent. - // So in some cases where it does not matter, before we simulate a block response - // we cheat and look where it is expected from. - if step.event == blockResponseEv { - height := step.data.height - peerID, ok := testBcR.fsm.pool.blocks[height] - if ok { - step.data.peerID = peerID - } - } -} - -type testFields struct { - name string - startingHeight int64 - maxRequestsPerPeer int - maxPendingRequests int - steps []fsmStepTestValues -} - -func executeFSMTests(t *testing.T, tests []testFields, matchRespToReq bool) { - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - step := step - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - var heightBefore int64 - if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { - heightBefore = testBcR.fsm.pool.Height - } - oldNumStatusRequests := testBcR.numStatusRequests - oldNumBlockRequests := testBcR.numBlockRequests - if matchRespToReq { - fixBlockResponseEvStep(&step, testBcR) - } - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.wantErr, fsmErr) - - if step.wantStatusReqSent { - assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) - } - - if step.wantReqIncreased { - assert.True(t, oldNumBlockRequests < testBcR.numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, testBcR.numBlockRequests) - } - - for _, height := range step.wantNewBlocks { - _, err := testBcR.fsm.pool.BlockAndPeerAtHeight(height) - assert.Nil(t, err) - } - if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { - heightAfter := testBcR.fsm.pool.Height - assert.Equal(t, heightBefore, heightAfter) - firstAfter, err1 := testBcR.fsm.pool.BlockAndPeerAtHeight(testBcR.fsm.pool.Height) - secondAfter, err2 := testBcR.fsm.pool.BlockAndPeerAtHeight(testBcR.fsm.pool.Height + 1) - assert.NotNil(t, err1) - assert.NotNil(t, err2) - assert.Nil(t, firstAfter) - assert.Nil(t, secondAfter) - } - - assert.Equal(t, step.wantState, testBcR.fsm.state.name) - - if step.wantState == "finished" { - assert.True(t, testBcR.fsm.isCaughtUp()) - } - } - }) - } -} - -func TestFSMBasic(t *testing.T) { - tests := []testFields{ - { - name: "one block, one peer - TS2", - startingHeight: 1, - maxRequestsPerPeer: 2, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 2, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}), - sProcessedBlockEv("waitForBlock", "finished", nil), - }, - }, - { - name: "multi block, multi peer - TS2", - startingHeight: 1, - maxRequestsPerPeer: 2, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 4, nil), - sStatusEv("waitForBlock", "waitForBlock", "P2", 4, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 4, []int64{1, 2, 3}), - - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - sProcessedBlockEv("waitForBlock", "finished", nil), - }, - }, - } - - executeFSMTests(t, tests, true) -} - -func TestFSMBlockVerificationFailure(t *testing.T) { - tests := []testFields{ - { - name: "block verification failure - TS2 variant", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - - // add P1 and get blocks 1-3 from it - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}), - - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - - // process block failure, should remove P1 and all blocks - sProcessedBlockEv("waitForBlock", "waitForBlock", errBlockVerificationFailure), - - // get blocks 1-3 from P2 - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), - - // finish after processing blocks 1 and 2 - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - sProcessedBlockEv("waitForBlock", "finished", nil), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func TestFSMNoBlockResponse(t *testing.T) { - tests := []testFields{ - { - name: "no block response", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - - // add P1 and get blocks 1-3 from it - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}), - - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - - // process block failure, should remove P1 and all blocks - sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 1, nil), - sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 2, nil), - sNoBlockResponseEv("waitForBlock", "waitForBlock", "P1", 3, nil), - - // get blocks 1-3 from P2 - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), - - // finish after processing blocks 1 and 2 - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - sProcessedBlockEv("waitForBlock", "finished", nil), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func TestFSMBadBlockFromPeer(t *testing.T) { - tests := []testFields{ - { - name: "block we haven't asked for", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 and ask for blocks 1-3 - sStatusEv("waitForPeer", "waitForBlock", "P1", 300, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - // blockResponseEv for height 100 should cause an error - sBlockRespEvErrored("waitForBlock", "waitForPeer", - "P1", 100, []int64{}, errMissingBlock, []p2p.ID{}), - }, - }, - { - name: "block we already have", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 and get block 1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 100, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", - "P1", 1, []int64{}), - - // Get block 1 again. Since peer is removed together with block 1, - // the blocks present in the pool should be {} - sBlockRespEvErrored("waitForBlock", "waitForPeer", - "P1", 1, []int64{}, errDuplicateBlock, []p2p.ID{"P1"}), - }, - }, - { - name: "block from unknown peer", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 and get block 1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - - // get block 1 from unknown peer P2 - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEvErrored("waitForBlock", "waitForBlock", - "P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}), - }, - }, - { - name: "block from wrong peer", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1, make requests for blocks 1-3 to P1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - - // receive block 1 from P2 - sBlockRespEvErrored("waitForBlock", "waitForBlock", - "P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) { - tests := []testFields{ - { - name: "block at current height undelivered - TS5", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1, get blocks 1 and 2, process block 1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", - "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", - "P1", 2, []int64{1}), - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - - // timeout on block 3, P1 should be removed - sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights), - - // make requests and finish by receiving blocks 2 and 3 from P2 - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{2}), - sProcessedBlockEv("waitForBlock", "finished", nil), - }, - }, - { - name: "block at current height undelivered, at maxPeerHeight after peer removal - TS3", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1, request blocks 1-3 from P1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - // add P2 (tallest) - sStatusEv("waitForBlock", "waitForBlock", "P2", 30, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - // receive blocks 1-3 from P1 - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}), - - // process blocks at heights 1 and 2 - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - - // timeout on block at height 4 - sStateTimeoutEv("waitForBlock", "finished", "waitForBlock", nil), - }, - }, - } - - executeFSMTests(t, tests, true) -} - -func TestFSMPeerRelatedEvents(t *testing.T) { - tests := []testFields{ - { - name: "peer remove event with no blocks", - startingHeight: 1, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1, P2, P3 - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - sStatusEv("waitForBlock", "waitForBlock", "P3", 3, nil), - - // switch removes P2 - sPeerRemoveEv("waitForBlock", "waitForBlock", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}), - }, - }, - { - name: "only peer removed while in waitForBlock state", - startingHeight: 100, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil), - - // switch removes P1 - sPeerRemoveEv("waitForBlock", "waitForPeer", "P1", errSwitchRemovesPeer, []p2p.ID{"P1"}), - }, - }, - { - name: "highest peer removed while in waitForBlock state, node reaches maxPeerHeight - TS4 ", - startingHeight: 100, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 and make requests - sStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil), - - // get blocks 100 and 101 from P1 and process block at height 100 - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 101, []int64{100}), - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - - // switch removes peer P1, should be finished - sPeerRemoveEv("waitForBlock", "finished", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}), - }, - }, - { - name: "highest peer lowers its height in waitForBlock state, node reaches maxPeerHeight - TS4", - startingHeight: 100, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 and make requests - sStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - - // add P2 - sStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil), - - // get blocks 100 and 101 from P1 - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}), - sBlockRespEv("waitForBlock", "waitForBlock", "P1", 101, []int64{100}), - - // processed block at heights 100 - sProcessedBlockEv("waitForBlock", "waitForBlock", nil), - - // P2 becomes short - sStatusEv("waitForBlock", "finished", "P2", 100, errPeerLowersItsHeight), - }, - }, - { - name: "new short peer while in waitForPeer state", - startingHeight: 100, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForPeer", "P1", 3, errPeerTooShort), - }, - }, - { - name: "new short peer while in waitForBlock state", - startingHeight: 100, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil), - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, errPeerTooShort), - }, - }, - { - name: "only peer updated with low height while in waitForBlock state", - startingHeight: 100, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil), - sStatusEv("waitForBlock", "waitForPeer", "P1", 3, errPeerLowersItsHeight), - }, - }, - { - name: "peer does not exist in the switch", - startingHeight: 9999999, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - // add P1 - sStatusEv("waitForPeer", "waitForBlock", "P1", 20000000, nil), - // send request for block 9999999 - // Note: For this block request the "switch missing the peer" error is simulated, - // see implementation of bcReactor interface, sendBlockRequest(), in this file. - sMakeRequestsEvErrored("waitForBlock", "waitForBlock", - maxNumRequests, nil, []p2p.ID{"P1"}), - }, - }, - } - - executeFSMTests(t, tests, true) -} - -func TestFSMStopFSM(t *testing.T) { - tests := []testFields{ - { - name: "stopFSMEv in unknown", - steps: []fsmStepTestValues{ - sStopFSMEv("unknown", "finished"), - }, - }, - { - name: "stopFSMEv in waitForPeer", - startingHeight: 1, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStopFSMEv("waitForPeer", "finished"), - }, - }, - { - name: "stopFSMEv in waitForBlock", - startingHeight: 1, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sStopFSMEv("waitForBlock", "finished"), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func TestFSMUnknownElements(t *testing.T) { - tests := []testFields{ - { - name: "unknown event for state unknown", - steps: []fsmStepTestValues{ - sUnknownFSMEv("unknown"), - }, - }, - { - name: "unknown event for state waitForPeer", - steps: []fsmStepTestValues{ - sStartFSMEv(), - sUnknownFSMEv("waitForPeer"), - }, - }, - { - name: "unknown event for state waitForBlock", - startingHeight: 1, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sUnknownFSMEv("waitForBlock"), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func TestFSMPeerStateTimeoutEvent(t *testing.T) { - tests := []testFields{ - { - name: "timeout event for state waitForPeer while in state waitForPeer - TS1", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStateTimeoutEv("waitForPeer", "finished", "waitForPeer", errNoTallerPeer), - }, - }, - { - name: "timeout event for state waitForPeer while in a state != waitForPeer", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStateTimeoutEv("waitForPeer", "waitForPeer", "waitForBlock", errTimeoutEventWrongState), - }, - }, - { - name: "timeout event for state waitForBlock while in state waitForBlock ", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sStateTimeoutEv("waitForBlock", "waitForPeer", "waitForBlock", errNoPeerResponseForCurrentHeights), - }, - }, - { - name: "timeout event for state waitForBlock while in a state != waitForBlock", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForPeer", errTimeoutEventWrongState), - }, - }, - { - name: "timeout event for state waitForBlock with multiple peers", - startingHeight: 1, - maxRequestsPerPeer: 3, - steps: []fsmStepTestValues{ - sStartFSMEv(), - sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), - sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights), - }, - }, - } - - executeFSMTests(t, tests, false) -} - -func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool, - maxRequestsPerPeer int, maxPendingRequests int) testFields { - - // Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag. - peerHeights := make([]int64, numPeers) - for i := 0; i < numPeers; i++ { - if i == 0 { - peerHeights[0] = numBlocks - continue - } - if randomPeerHeights { - peerHeights[i] = int64(tmmath.MaxInt(tmrand.Intn(int(numBlocks)), int(startingHeight)+1)) - } else { - peerHeights[i] = numBlocks - } - } - - // Approximate the slice capacity to save time for appends. - testSteps := make([]fsmStepTestValues, 0, 3*numBlocks+int64(numPeers)) - - testName := fmt.Sprintf("%v-blocks %v-startingHeight %v-peers %v-maxRequestsPerPeer %v-maxNumRequests", - numBlocks, startingHeight, numPeers, maxRequestsPerPeer, maxPendingRequests) - - // Add startFSMEv step. - testSteps = append(testSteps, sStartFSMEv()) - - // For each peer, add statusResponseEv step. - for i := 0; i < numPeers; i++ { - peerName := fmt.Sprintf("P%d", i) - if i == 0 { - testSteps = append( - testSteps, - sStatusEv("waitForPeer", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil)) - } else { - testSteps = append(testSteps, - sStatusEv("waitForBlock", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil)) - } - } - - height := startingHeight - numBlocksReceived := 0 - prevBlocks := make([]int64, 0, maxPendingRequests) - -forLoop: - for i := 0; i < int(numBlocks); i++ { - - // Add the makeRequestEv step periodically. - if i%maxRequestsPerPeer == 0 { - testSteps = append( - testSteps, - sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), - ) - } - - // Add the blockRespEv step - testSteps = append( - testSteps, - sBlockRespEv("waitForBlock", "waitForBlock", - "P0", height, prevBlocks)) - prevBlocks = append(prevBlocks, height) - height++ - numBlocksReceived++ - - // Add the processedBlockEv step periodically. - if numBlocksReceived >= maxRequestsPerPeer || height >= numBlocks { - for j := int(height) - numBlocksReceived; j < int(height); j++ { - if j >= int(numBlocks) { - // This is the last block that is processed, we should be in "finished" state. - testSteps = append( - testSteps, - sProcessedBlockEv("waitForBlock", "finished", nil)) - break forLoop - } - testSteps = append( - testSteps, - sProcessedBlockEv("waitForBlock", "waitForBlock", nil)) - } - numBlocksReceived = 0 - prevBlocks = make([]int64, 0, maxPendingRequests) - } - } - - return testFields{ - name: testName, - startingHeight: startingHeight, - maxRequestsPerPeer: maxRequestsPerPeer, - maxPendingRequests: maxPendingRequests, - steps: testSteps, - } -} - -const ( - maxStartingHeightTest = 100 - maxRequestsPerPeerTest = 20 - maxTotalPendingRequestsTest = 600 - maxNumPeersTest = 1000 - maxNumBlocksInChainTest = 10000 // should be smaller than 9999999 -) - -func makeCorrectTransitionSequenceWithRandomParameters() testFields { - // Generate a starting height for fast sync. - startingHeight := int64(tmrand.Intn(maxStartingHeightTest) + 1) - - // Generate the number of requests per peer. - maxRequestsPerPeer := tmrand.Intn(maxRequestsPerPeerTest) + 1 - - // Generate the maximum number of total pending requests, >= maxRequestsPerPeer. - maxPendingRequests := tmrand.Intn(maxTotalPendingRequestsTest-maxRequestsPerPeer) + maxRequestsPerPeer - - // Generate the number of blocks to be synced. - numBlocks := int64(tmrand.Intn(maxNumBlocksInChainTest)) + startingHeight - - // Generate a number of peers. - numPeers := tmrand.Intn(maxNumPeersTest) + 1 - - return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests) -} - -func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool { - if step.event == processedBlockEv { - _, err := testBcR.fsm.pool.BlockAndPeerAtHeight(testBcR.fsm.pool.Height) - if err == errMissingBlock { - return false - } - _, err = testBcR.fsm.pool.BlockAndPeerAtHeight(testBcR.fsm.pool.Height + 1) - if err == errMissingBlock { - return false - } - } - return true -} - -func TestFSMCorrectTransitionSequences(t *testing.T) { - - tests := []testFields{ - makeCorrectTransitionSequence(1, 100, 10, true, 10, 40), - makeCorrectTransitionSequenceWithRandomParameters(), - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - step := step - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := testBcR.numStatusRequests - fixBlockResponseEvStep(&step, testBcR) - if !shouldApplyProcessedBlockEvStep(&step, testBcR) { - continue - } - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.wantErr, fsmErr) - - if step.wantStatusReqSent { - assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) - } - - assert.Equal(t, step.wantState, testBcR.fsm.state.name) - if step.wantState == "finished" { - assert.True(t, testBcR.fsm.isCaughtUp()) - } - } - - }) - } -} - -// ---------------------------------------- -// implements the bcRNotifier -func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) { - testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err) - testR.lastPeerError.peerID = peerID - testR.lastPeerError.err = err -} - -func (testR *testReactor) sendStatusRequest() { - testR.logger.Info("Reactor received sendStatusRequest call from FSM") - testR.numStatusRequests++ -} - -func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { - testR.logger.Info("Reactor received sendBlockRequest call from FSM", "peer", peerID, "height", height) - testR.numBlockRequests++ - testR.lastBlockRequest.peerID = peerID - testR.lastBlockRequest.height = height - if height == 9999999 { - // simulate switch does not have peer - return errNilPeerForBlockRequest - } - return nil -} - -func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { - testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout) - if _, ok := testR.stateTimerStarts[name]; !ok { - testR.stateTimerStarts[name] = 1 - } else { - testR.stateTimerStarts[name]++ - } -} - -func (testR *testReactor) switchToConsensus() { -} - -// ---------------------------------------- diff --git a/blockchain/v1/reactor_test.go b/blockchain/v1/reactor_test.go deleted file mode 100644 index 3aaf7e913..000000000 --- a/blockchain/v1/reactor_test.go +++ /dev/null @@ -1,363 +0,0 @@ -package v1 - -import ( - "fmt" - "os" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - dbm "github.com/tendermint/tm-db" - - abci "github.com/tendermint/tendermint/abci/types" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/mempool/mock" - "github.com/tendermint/tendermint/p2p" - tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - "github.com/tendermint/tendermint/proxy" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/store" - "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" -) - -var config *cfg.Config - -func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { - validators := make([]types.GenesisValidator, numValidators) - privValidators := make([]types.PrivValidator, numValidators) - for i := 0; i < numValidators; i++ { - val, privVal := types.RandValidator(randPower, minPower) - validators[i] = types.GenesisValidator{ - PubKey: val.PubKey, - Power: val.VotingPower, - } - privValidators[i] = privVal - } - sort.Sort(types.PrivValidatorsByAddress(privValidators)) - - return &types.GenesisDoc{ - GenesisTime: tmtime.Now(), - ChainID: config.ChainID(), - Validators: validators, - }, privValidators -} - -func makeVote( - t *testing.T, - header *types.Header, - blockID types.BlockID, - valset *types.ValidatorSet, - privVal types.PrivValidator) *types.Vote { - - pubKey, err := privVal.GetPubKey() - require.NoError(t, err) - - valIdx, _ := valset.GetByAddress(pubKey.Address()) - vote := &types.Vote{ - ValidatorAddress: pubKey.Address(), - ValidatorIndex: valIdx, - Height: header.Height, - Round: 1, - Timestamp: tmtime.Now(), - Type: tmproto.PrecommitType, - BlockID: blockID, - } - - vpb := vote.ToProto() - - _ = privVal.SignVote(header.ChainID, vpb) - vote.Signature = vpb.Signature - - return vote -} - -type BlockchainReactorPair struct { - bcR *BlockchainReactor - conR *consensusReactorTest -} - -func newBlockchainReactor( - t *testing.T, - logger log.Logger, - genDoc *types.GenesisDoc, - privVals []types.PrivValidator, - maxBlockHeight int64) *BlockchainReactor { - if len(privVals) != 1 { - panic("only support one validator") - } - - app := &testApp{} - cc := proxy.NewLocalClientCreator(app) - proxyApp := proxy.NewAppConns(cc) - err := proxyApp.Start() - if err != nil { - panic(fmt.Errorf("error start app: %w", err)) - } - - blockDB := dbm.NewMemDB() - stateDB := dbm.NewMemDB() - stateStore := sm.NewStore(stateDB) - blockStore := store.NewBlockStore(blockDB) - - state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) - if err != nil { - panic(fmt.Errorf("error constructing state from genesis file: %w", err)) - } - - // Make the BlockchainReactor itself. - // NOTE we have to create and commit the blocks first because - // pool.height is determined from the store. - fastSync := true - db := dbm.NewMemDB() - stateStore = sm.NewStore(db) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mock.Mempool{}, sm.EmptyEvidencePool{}) - if err = stateStore.Save(state); err != nil { - panic(err) - } - - // let's add some blocks in - for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { - lastCommit := types.NewCommit(blockHeight-1, 1, types.BlockID{}, nil) - if blockHeight > 1 { - lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) - lastBlock := blockStore.LoadBlock(blockHeight - 1) - - vote := makeVote(t, &lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]) - lastCommit = types.NewCommit(vote.Height, vote.Round, lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()}) - } - - thisBlock := makeBlock(blockHeight, state, lastCommit) - - thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) - blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) - if err != nil { - panic(fmt.Errorf("error apply block: %w", err)) - } - - blockStore.SaveBlock(thisBlock, thisParts, lastCommit) - } - - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) - - return bcReactor -} - -func newBlockchainReactorPair( - t *testing.T, - logger log.Logger, - genDoc *types.GenesisDoc, - privVals []types.PrivValidator, - maxBlockHeight int64) BlockchainReactorPair { - - consensusReactor := &consensusReactorTest{} - consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor) - - return BlockchainReactorPair{ - newBlockchainReactor(t, logger, genDoc, privVals, maxBlockHeight), - consensusReactor} -} - -type consensusReactorTest struct { - p2p.BaseReactor // BaseService + p2p.Switch - switchedToConsensus bool - mtx sync.Mutex -} - -func (conR *consensusReactorTest) SwitchToConsensus(state sm.State, blocksSynced bool) { - conR.mtx.Lock() - defer conR.mtx.Unlock() - conR.switchedToConsensus = true -} - -func TestFastSyncNoBlockResponse(t *testing.T) { - - config = cfg.ResetTestRoot("blockchain_new_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - maxBlockHeight := int64(65) - - reactorPairs := make([]BlockchainReactorPair, 2) - - logger := log.TestingLogger() - reactorPairs[0] = newBlockchainReactorPair(t, logger, genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactorPair(t, logger, genDoc, privVals, 0) - - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) - s.AddReactor("CONSENSUS", reactorPairs[i].conR) - moduleName := fmt.Sprintf("blockchain-%v", i) - reactorPairs[i].bcR.SetLogger(logger.With("module", moduleName)) - - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - _ = r.bcR.Stop() - _ = r.conR.Stop() - } - }() - - tests := []struct { - height int64 - existent bool - }{ - {maxBlockHeight + 2, false}, - {10, true}, - {1, true}, - {maxBlockHeight + 100, false}, - } - - for { - time.Sleep(10 * time.Millisecond) - reactorPairs[1].conR.mtx.Lock() - if reactorPairs[1].conR.switchedToConsensus { - reactorPairs[1].conR.mtx.Unlock() - break - } - reactorPairs[1].conR.mtx.Unlock() - } - - assert.Equal(t, maxBlockHeight, reactorPairs[0].bcR.store.Height()) - - for _, tt := range tests { - block := reactorPairs[1].bcR.store.LoadBlock(tt.height) - if tt.existent { - assert.True(t, block != nil) - } else { - assert.True(t, block == nil) - } - } -} - -// NOTE: This is too hard to test without an easy way to add test peer to -// switch or without significant refactoring of the module. Alternatively we -// could actually dial a TCP conn but that seems extreme. -func TestFastSyncBadBlockStopsPeer(t *testing.T) { - numNodes := 4 - maxBlockHeight := int64(148) - - config = cfg.ResetTestRoot("blockchain_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - otherChain := newBlockchainReactorPair(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight) - defer func() { - _ = otherChain.bcR.Stop() - _ = otherChain.conR.Stop() - }() - - reactorPairs := make([]BlockchainReactorPair, numNodes) - logger := make([]log.Logger, numNodes) - - for i := 0; i < numNodes; i++ { - logger[i] = log.TestingLogger() - height := int64(0) - if i == 0 { - height = maxBlockHeight - } - reactorPairs[i] = newBlockchainReactorPair(t, logger[i], genDoc, privVals, height) - } - - switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch { - reactorPairs[i].conR.mtx.Lock() - s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) - s.AddReactor("CONSENSUS", reactorPairs[i].conR) - moduleName := fmt.Sprintf("blockchain-%v", i) - reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName)) - reactorPairs[i].conR.mtx.Unlock() - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - _ = r.bcR.Stop() - _ = r.conR.Stop() - } - }() - -outerFor: - for { - time.Sleep(10 * time.Millisecond) - for i := 0; i < numNodes; i++ { - reactorPairs[i].conR.mtx.Lock() - if !reactorPairs[i].conR.switchedToConsensus { - reactorPairs[i].conR.mtx.Unlock() - continue outerFor - } - reactorPairs[i].conR.mtx.Unlock() - } - break - } - - // at this time, reactors[0-3] is the newest - assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size()) - - // mark last reactorPair as an invalid peer - reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store - - lastLogger := log.TestingLogger() - lastReactorPair := newBlockchainReactorPair(t, lastLogger, genDoc, privVals, 0) - reactorPairs = append(reactorPairs, lastReactorPair) - - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR) - s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR) - moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1) - reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName)) - return s - - }, p2p.Connect2Switches)...) - - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) - } - - for { - time.Sleep(1 * time.Second) - lastReactorPair.conR.mtx.Lock() - if lastReactorPair.conR.switchedToConsensus { - lastReactorPair.conR.mtx.Unlock() - break - } - lastReactorPair.conR.mtx.Unlock() - - if lastReactorPair.bcR.Switch.Peers().Size() == 0 { - break - } - } - - assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1) -} - -//---------------------------------------------- -// utility funcs - -func makeTxs(height int64) (txs []types.Tx) { - for i := 0; i < 10; i++ { - txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) - } - return txs -} - -func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { - block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) - return block -} - -type testApp struct { - abci.BaseApplication -} diff --git a/config/config.go b/config/config.go index 514a23b88..c7e012978 100644 --- a/config/config.go +++ b/config/config.go @@ -788,8 +788,6 @@ func (cfg *FastSyncConfig) ValidateBasic() error { switch cfg.Version { case "v0": return nil - case "v1": - return nil case "v2": return nil default: diff --git a/config/config_test.go b/config/config_test.go index 6a46933bc..78657d848 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -133,7 +133,7 @@ func TestFastSyncConfigValidateBasic(t *testing.T) { assert.NoError(t, cfg.ValidateBasic()) // tamper with version - cfg.Version = "v1" + cfg.Version = "v2" assert.NoError(t, cfg.ValidateBasic()) cfg.Version = "invalid" diff --git a/config/toml.go b/config/toml.go index 19973a3fb..99ed8d13d 100644 --- a/config/toml.go +++ b/config/toml.go @@ -374,7 +374,6 @@ temp_dir = "{{ .StateSync.TempDir }}" # Fast Sync version to use: # 1) "v0" (default) - the legacy fast sync implementation -# 2) "v1" - refactor of v0 version for better testability # 2) "v2" - complete redesign of v0, optimized for testability & readability version = "{{ .FastSync.Version }}" diff --git a/docs/nodes/configuration.md b/docs/nodes/configuration.md index 4c78d1b39..49a69c5bf 100644 --- a/docs/nodes/configuration.md +++ b/docs/nodes/configuration.md @@ -322,7 +322,6 @@ temp_dir = "" # Fast Sync version to use: # 1) "v0" (default) - the legacy fast sync implementation -# 2) "v1" - refactor of v0 version for better testability # 2) "v2" - complete redesign of v0, optimized for testability & readability version = "v0" diff --git a/docs/tendermint-core/fast-sync.md b/docs/tendermint-core/fast-sync.md index 9bbeade38..a36a158c8 100644 --- a/docs/tendermint-core/fast-sync.md +++ b/docs/tendermint-core/fast-sync.md @@ -28,7 +28,7 @@ has at least one peer and it's height is at least as high as the max reported peer height. See [the IsCaughtUp method](https://github.com/tendermint/tendermint/blob/b467515719e686e4678e6da4e102f32a491b85a0/blockchain/pool.go#L128). -Note: There are three versions of fast sync. We recommend using v0 as v1 and v2 are still in beta. +Note: There are three versions of fast sync. We recommend using v0 as v2 is still in beta. If you would like to use a different version you can do so by changing the version in the `config.toml`: ```toml @@ -39,8 +39,7 @@ Note: There are three versions of fast sync. We recommend using v0 as v1 and v2 # Fast Sync version to use: # 1) "v0" (default) - the legacy fast sync implementation -# 2) "v1" - refactor of v0 version for better testability -# 2) "v2" - complete redesign of v0, optimized for testability & readability +# 2) "v2" - complete redesign of v0, optimized for testability & readability version = "v0" ``` diff --git a/node/node.go b/node/node.go index 83064dbae..fdd6debdc 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" bcv0 "github.com/tendermint/tendermint/blockchain/v0" - bcv1 "github.com/tendermint/tendermint/blockchain/v1" bcv2 "github.com/tendermint/tendermint/blockchain/v2" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" @@ -128,7 +127,7 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { // Option sets a parameter for the node. type Option func(*Node) -// Temporary interface for switching to fast sync, we should get rid of v0 and v1 reactors. +// Temporary interface for switching to fast sync, we should get rid of v0. // See: https://github.com/tendermint/tendermint/issues/4595 type fastSyncReactor interface { SwitchToFastSync(sm.State) error @@ -366,8 +365,6 @@ func createBlockchainReactor(config *cfg.Config, switch config.FastSync.Version { case "v0": bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - case "v1": - bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) case "v2": bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) default: @@ -1238,8 +1235,6 @@ func makeNodeInfo( switch config.FastSync.Version { case "v0": bcChannel = bcv0.BlockchainChannel - case "v1": - bcChannel = bcv1.BlockchainChannel case "v2": bcChannel = bcv2.BlockchainChannel default: diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index b67500168..063e3dc85 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -30,7 +30,7 @@ var ( nodeDatabases = uniformChoice{"goleveldb", "cleveldb", "rocksdb", "boltdb", "badgerdb"} nodeABCIProtocols = uniformChoice{"unix", "tcp", "grpc", "builtin"} nodePrivvalProtocols = uniformChoice{"file", "unix", "tcp"} - nodeFastSyncs = uniformChoice{"", "v0", "v1", "v2"} + nodeFastSyncs = uniformChoice{"", "v0", "v2"} nodeStateSyncs = uniformChoice{false, true} nodePersistIntervals = uniformChoice{0, 1, 5} nodeSnapshotIntervals = uniformChoice{0, 3} diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index 91ed89d63..520762ce9 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -73,7 +73,7 @@ perturb = ["kill", "pause", "disconnect", "restart"] [node.full01] start_at = 1010 mode = "full" -fast_sync = "v1" +fast_sync = "v2" persistent_peers = ["validator01", "validator02", "validator03", "validator04", "validator05"] retain_blocks = 1 perturb = ["restart"] diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 3fbf14558..e94fd0790 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -87,7 +87,7 @@ type ManifestNode struct { // runner will wait for the network to reach at least this block height. StartAt int64 `toml:"start_at"` - // FastSync specifies the fast sync mode: "" (disable), "v0", "v1", or "v2". + // FastSync specifies the fast sync mode: "" (disable), "v0" or "v2". // Defaults to disabled. FastSync string `toml:"fast_sync"` diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index df2be5699..eaae577b1 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -307,7 +307,7 @@ func (n Node) Validate(testnet Testnet) error { } } switch n.FastSync { - case "", "v0", "v1", "v2": + case "", "v0", "v2": default: return fmt.Errorf("invalid fast sync setting %q", n.FastSync) } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index f697fd7a1..b9ae86143 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -20,7 +20,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" bcv0 "github.com/tendermint/tendermint/blockchain/v0" - bcv1 "github.com/tendermint/tendermint/blockchain/v1" bcv2 "github.com/tendermint/tendermint/blockchain/v2" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" @@ -157,7 +156,7 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { // Option sets a parameter for the node. type Option func(*Node) -// Temporary interface for switching to fast sync, we should get rid of v0 and v1 reactors. +// Temporary interface for switching to fast sync, we should get rid of v0. // See: https://github.com/tendermint/tendermint/issues/4595 type fastSyncReactor interface { SwitchToFastSync(sm.State) error @@ -408,8 +407,6 @@ func createBlockchainReactor(config *cfg.Config, switch config.FastSync.Version { case "v0": bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - case "v1": - bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) case "v2": bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) default: @@ -1280,8 +1277,6 @@ func makeNodeInfo( switch config.FastSync.Version { case "v0": bcChannel = bcv0.BlockchainChannel - case "v1": - bcChannel = bcv1.BlockchainChannel case "v2": bcChannel = bcv2.BlockchainChannel default: