Browse Source

statesync: use initial height as a floor to backfilling (#6709)

pull/6714/head
Callum Waters 3 years ago
committed by GitHub
parent
commit
a12e2bbb60
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 26 deletions
  1. +21
    -18
      internal/statesync/block_queue.go
  2. +45
    -5
      internal/statesync/block_queue_test.go
  3. +5
    -3
      internal/statesync/reactor.go
  4. +1
    -0
      internal/statesync/reactor_test.go

+ 21
- 18
internal/statesync/block_queue.go View File

@ -23,9 +23,10 @@ type blockQueue struct {
verifyHeight int64
// termination conditions
stopHeight int64
stopTime time.Time
terminal *types.LightBlock
initialHeight int64
stopHeight int64
stopTime time.Time
terminal *types.LightBlock
// track failed heights so we know what blocks to try fetch again
failed *maxIntHeap
@ -45,21 +46,22 @@ type blockQueue struct {
}
func newBlockQueue(
startHeight, stopHeight int64,
startHeight, stopHeight, initialHeight int64,
stopTime time.Time,
maxRetries int,
) *blockQueue {
return &blockQueue{
stopHeight: stopHeight,
stopTime: stopTime,
fetchHeight: startHeight,
verifyHeight: startHeight,
pending: make(map[int64]lightBlockResponse),
failed: &maxIntHeap{},
retries: 0,
maxRetries: maxRetries,
waiters: make([]chan int64, 0),
doneCh: make(chan struct{}),
stopHeight: stopHeight,
initialHeight: initialHeight,
stopTime: stopTime,
fetchHeight: startHeight,
verifyHeight: startHeight,
pending: make(map[int64]lightBlockResponse),
failed: &maxIntHeap{},
retries: 0,
maxRetries: maxRetries,
waiters: make([]chan int64, 0),
doneCh: make(chan struct{}),
}
}
@ -93,9 +95,10 @@ func (q *blockQueue) add(l lightBlockResponse) {
q.pending[l.block.Height] = l
}
// Lastly, if the incoming block is past the stop time and stop height then
// we mark it as the terminal block
if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) {
// Lastly, if the incoming block is past the stop time and stop height or
// is equal to the initial height then we mark it as the terminal block.
if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) ||
l.block.Height == q.initialHeight {
q.terminal = l.block
}
}
@ -115,7 +118,7 @@ func (q *blockQueue) nextHeight() <-chan int64 {
return ch
}
if q.terminal == nil {
if q.terminal == nil && q.fetchHeight >= q.initialHeight {
// return and decrement the fetch height
ch <- q.fetchHeight
q.fetchHeight--


+ 45
- 5
internal/statesync/block_queue_test.go View File

@ -25,7 +25,7 @@ func TestBlockQueueBasic(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
wg := &sync.WaitGroup{}
// asynchronously fetch blocks and add it to the queue
@ -72,7 +72,7 @@ func TestBlockQueueWithFailures(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 200)
queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 200)
wg := &sync.WaitGroup{}
failureRate := 4
@ -121,7 +121,7 @@ func TestBlockQueueWithFailures(t *testing.T) {
func TestBlockQueueBlocks(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 2)
queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 2)
expectedHeight := startHeight
retryHeight := stopHeight + 2
@ -168,7 +168,7 @@ loop:
func TestBlockQueueAcceptsNoMoreBlocks(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
defer queue.close()
loop:
@ -194,7 +194,7 @@ func TestBlockQueueStopTime(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
wg := &sync.WaitGroup{}
baseTime := stopTime.Add(-50 * time.Second)
@ -233,6 +233,46 @@ func TestBlockQueueStopTime(t *testing.T) {
}
}
func TestBlockQueueInitialHeight(t *testing.T) {
peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
const initialHeight int64 = 120
queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, 1)
wg := &sync.WaitGroup{}
// asynchronously fetch blocks and add it to the queue
for i := 0; i <= numWorkers; i++ {
wg.Add(1)
go func() {
for {
select {
case height := <-queue.nextHeight():
require.GreaterOrEqual(t, height, initialHeight)
queue.add(mockLBResp(t, peerID, height, endTime))
case <-queue.done():
wg.Done()
return
}
}
}()
}
loop:
for {
select {
case <-queue.done():
wg.Wait()
require.NoError(t, queue.error())
break loop
case resp := <-queue.verifyNext():
require.GreaterOrEqual(t, resp.block.Height, initialHeight)
queue.success(resp.block.Height)
}
}
}
func mockLBResp(t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse {
return lightBlockResponse{
block: mockLB(t, height, time, factory.MakeBlockID()),


+ 5
- 3
internal/statesync/reactor.go View File

@ -281,7 +281,9 @@ func (r *Reactor) Backfill(state sm.State) error {
return r.backfill(
context.Background(),
state.ChainID,
state.LastBlockHeight, stopHeight,
state.LastBlockHeight,
stopHeight,
state.InitialHeight,
state.LastBlockID,
stopTime,
)
@ -290,7 +292,7 @@ func (r *Reactor) Backfill(state sm.State) error {
func (r *Reactor) backfill(
ctx context.Context,
chainID string,
startHeight, stopHeight int64,
startHeight, stopHeight, initialHeight int64,
trustedBlockID types.BlockID,
stopTime time.Time,
) error {
@ -303,7 +305,7 @@ func (r *Reactor) backfill(
lastChangeHeight int64 = startHeight
)
queue := newBlockQueue(startHeight, stopHeight, stopTime, maxLightBlockRequestRetries)
queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries)
// fetch light blocks across four workers. The aim with deploying concurrent
// workers is to equate the network messaging time with the verification


+ 1
- 0
internal/statesync/reactor_test.go View File

@ -463,6 +463,7 @@ func TestReactor_Backfill(t *testing.T) {
factory.DefaultTestChainID,
startHeight,
stopHeight,
1,
factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
stopTime,
)


Loading…
Cancel
Save