diff --git a/internal/statesync/block_queue.go b/internal/statesync/block_queue.go index 73f3547c3..56ed3c376 100644 --- a/internal/statesync/block_queue.go +++ b/internal/statesync/block_queue.go @@ -23,9 +23,10 @@ type blockQueue struct { verifyHeight int64 // termination conditions - stopHeight int64 - stopTime time.Time - terminal *types.LightBlock + initialHeight int64 + stopHeight int64 + stopTime time.Time + terminal *types.LightBlock // track failed heights so we know what blocks to try fetch again failed *maxIntHeap @@ -45,21 +46,22 @@ type blockQueue struct { } func newBlockQueue( - startHeight, stopHeight int64, + startHeight, stopHeight, initialHeight int64, stopTime time.Time, maxRetries int, ) *blockQueue { return &blockQueue{ - stopHeight: stopHeight, - stopTime: stopTime, - fetchHeight: startHeight, - verifyHeight: startHeight, - pending: make(map[int64]lightBlockResponse), - failed: &maxIntHeap{}, - retries: 0, - maxRetries: maxRetries, - waiters: make([]chan int64, 0), - doneCh: make(chan struct{}), + stopHeight: stopHeight, + initialHeight: initialHeight, + stopTime: stopTime, + fetchHeight: startHeight, + verifyHeight: startHeight, + pending: make(map[int64]lightBlockResponse), + failed: &maxIntHeap{}, + retries: 0, + maxRetries: maxRetries, + waiters: make([]chan int64, 0), + doneCh: make(chan struct{}), } } @@ -93,9 +95,10 @@ func (q *blockQueue) add(l lightBlockResponse) { q.pending[l.block.Height] = l } - // Lastly, if the incoming block is past the stop time and stop height then - // we mark it as the terminal block - if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) { + // Lastly, if the incoming block is past the stop time and stop height or + // is equal to the initial height then we mark it as the terminal block. + if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) || + l.block.Height == q.initialHeight { q.terminal = l.block } } @@ -115,7 +118,7 @@ func (q *blockQueue) nextHeight() <-chan int64 { return ch } - if q.terminal == nil { + if q.terminal == nil && q.fetchHeight >= q.initialHeight { // return and decrement the fetch height ch <- q.fetchHeight q.fetchHeight-- diff --git a/internal/statesync/block_queue_test.go b/internal/statesync/block_queue_test.go index 4f8bbc28c..3a4c71e4e 100644 --- a/internal/statesync/block_queue_test.go +++ b/internal/statesync/block_queue_test.go @@ -25,7 +25,7 @@ func TestBlockQueueBasic(t *testing.T) { peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) - queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1) wg := &sync.WaitGroup{} // asynchronously fetch blocks and add it to the queue @@ -72,7 +72,7 @@ func TestBlockQueueWithFailures(t *testing.T) { peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) - queue := newBlockQueue(startHeight, stopHeight, stopTime, 200) + queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 200) wg := &sync.WaitGroup{} failureRate := 4 @@ -121,7 +121,7 @@ func TestBlockQueueWithFailures(t *testing.T) { func TestBlockQueueBlocks(t *testing.T) { peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) - queue := newBlockQueue(startHeight, stopHeight, stopTime, 2) + queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 2) expectedHeight := startHeight retryHeight := stopHeight + 2 @@ -168,7 +168,7 @@ loop: func TestBlockQueueAcceptsNoMoreBlocks(t *testing.T) { peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) - queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1) defer queue.close() loop: @@ -194,7 +194,7 @@ func TestBlockQueueStopTime(t *testing.T) { peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) - queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1) wg := &sync.WaitGroup{} baseTime := stopTime.Add(-50 * time.Second) @@ -233,6 +233,46 @@ func TestBlockQueueStopTime(t *testing.T) { } } +func TestBlockQueueInitialHeight(t *testing.T) { + peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + const initialHeight int64 = 120 + + queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, 1) + wg := &sync.WaitGroup{} + + // asynchronously fetch blocks and add it to the queue + for i := 0; i <= numWorkers; i++ { + wg.Add(1) + go func() { + for { + select { + case height := <-queue.nextHeight(): + require.GreaterOrEqual(t, height, initialHeight) + queue.add(mockLBResp(t, peerID, height, endTime)) + case <-queue.done(): + wg.Done() + return + } + } + }() + } + +loop: + for { + select { + case <-queue.done(): + wg.Wait() + require.NoError(t, queue.error()) + break loop + + case resp := <-queue.verifyNext(): + require.GreaterOrEqual(t, resp.block.Height, initialHeight) + queue.success(resp.block.Height) + } + } +} + func mockLBResp(t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse { return lightBlockResponse{ block: mockLB(t, height, time, factory.MakeBlockID()), diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index f50651b76..c75cbe854 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -281,7 +281,9 @@ func (r *Reactor) Backfill(state sm.State) error { return r.backfill( context.Background(), state.ChainID, - state.LastBlockHeight, stopHeight, + state.LastBlockHeight, + stopHeight, + state.InitialHeight, state.LastBlockID, stopTime, ) @@ -290,7 +292,7 @@ func (r *Reactor) Backfill(state sm.State) error { func (r *Reactor) backfill( ctx context.Context, chainID string, - startHeight, stopHeight int64, + startHeight, stopHeight, initialHeight int64, trustedBlockID types.BlockID, stopTime time.Time, ) error { @@ -303,7 +305,7 @@ func (r *Reactor) backfill( lastChangeHeight int64 = startHeight ) - queue := newBlockQueue(startHeight, stopHeight, stopTime, maxLightBlockRequestRetries) + queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries) // fetch light blocks across four workers. The aim with deploying concurrent // workers is to equate the network messaging time with the verification diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 22b016f89..9bff72679 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -463,6 +463,7 @@ func TestReactor_Backfill(t *testing.T) { factory.DefaultTestChainID, startHeight, stopHeight, + 1, factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()), stopTime, )