|
@ -133,27 +133,38 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
chunkRequests := make(map[uint32]int) |
|
|
chunkRequests := make(map[uint32]int) |
|
|
chunkRequestsMtx := sync.Mutex{} |
|
|
chunkRequestsMtx := sync.Mutex{} |
|
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
|
wg.Add(4) |
|
|
|
|
|
|
|
|
chunkProcessDone := make(chan struct{}) |
|
|
|
|
|
|
|
|
go func() { |
|
|
go func() { |
|
|
for e := range rts.chunkOutCh { |
|
|
|
|
|
msg, ok := e.Message.(*ssproto.ChunkRequest) |
|
|
|
|
|
assert.True(t, ok) |
|
|
|
|
|
|
|
|
|
|
|
assert.EqualValues(t, 1, msg.Height) |
|
|
|
|
|
assert.EqualValues(t, 1, msg.Format) |
|
|
|
|
|
assert.LessOrEqual(t, msg.Index, uint32(len(chunks))) |
|
|
|
|
|
|
|
|
|
|
|
added, err := rts.syncer.AddChunk(chunks[msg.Index]) |
|
|
|
|
|
assert.NoError(t, err) |
|
|
|
|
|
assert.True(t, added) |
|
|
|
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
|
|
|
chunkRequests[msg.Index]++ |
|
|
|
|
|
chunkRequestsMtx.Unlock() |
|
|
|
|
|
|
|
|
defer close(chunkProcessDone) |
|
|
|
|
|
var seen int |
|
|
|
|
|
for { |
|
|
|
|
|
if seen >= 4 { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
wg.Done() |
|
|
|
|
|
|
|
|
select { |
|
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
|
t.Logf("sent %d chunks", seen) |
|
|
|
|
|
return |
|
|
|
|
|
case e := <-rts.chunkOutCh: |
|
|
|
|
|
msg, ok := e.Message.(*ssproto.ChunkRequest) |
|
|
|
|
|
assert.True(t, ok) |
|
|
|
|
|
|
|
|
|
|
|
assert.EqualValues(t, 1, msg.Height) |
|
|
|
|
|
assert.EqualValues(t, 1, msg.Format) |
|
|
|
|
|
assert.LessOrEqual(t, msg.Index, uint32(len(chunks))) |
|
|
|
|
|
|
|
|
|
|
|
added, err := rts.syncer.AddChunk(chunks[msg.Index]) |
|
|
|
|
|
assert.NoError(t, err) |
|
|
|
|
|
assert.True(t, added) |
|
|
|
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
|
|
|
chunkRequests[msg.Index]++ |
|
|
|
|
|
chunkRequestsMtx.Unlock() |
|
|
|
|
|
seen++ |
|
|
|
|
|
t.Logf("added chunk (%d of 4): %d", seen, msg.Index) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
@ -186,7 +197,7 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil }) |
|
|
newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil }) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
|
<-chunkProcessDone |
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
chunkRequestsMtx.Lock() |
|
|
require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests) |
|
|
require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests) |
|
|