|
@ -6,15 +6,12 @@ import ( |
|
|
"testing" |
|
|
"testing" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/assert" |
|
|
|
|
|
"github.com/stretchr/testify/mock" |
|
|
"github.com/stretchr/testify/mock" |
|
|
"github.com/stretchr/testify/require" |
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types" |
|
|
abci "github.com/tendermint/tendermint/abci/types" |
|
|
"github.com/tendermint/tendermint/libs/log" |
|
|
|
|
|
tmsync "github.com/tendermint/tendermint/libs/sync" |
|
|
tmsync "github.com/tendermint/tendermint/libs/sync" |
|
|
"github.com/tendermint/tendermint/p2p" |
|
|
"github.com/tendermint/tendermint/p2p" |
|
|
p2pmocks "github.com/tendermint/tendermint/p2p/mocks" |
|
|
|
|
|
tmstate "github.com/tendermint/tendermint/proto/tendermint/state" |
|
|
tmstate "github.com/tendermint/tendermint/proto/tendermint/state" |
|
|
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" |
|
|
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" |
|
|
tmversion "github.com/tendermint/tendermint/proto/tendermint/version" |
|
|
tmversion "github.com/tendermint/tendermint/proto/tendermint/version" |
|
@ -28,23 +25,6 @@ import ( |
|
|
|
|
|
|
|
|
var ctx = context.Background() |
|
|
var ctx = context.Background() |
|
|
|
|
|
|
|
|
// Sets up a basic syncer that can be used to test OfferSnapshot requests
|
|
|
|
|
|
func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) { |
|
|
|
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
return syncer, connSnapshot |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Sets up a simple peer mock with an ID
|
|
|
|
|
|
func simplePeer(id string) *p2pmocks.Peer { |
|
|
|
|
|
peer := &p2pmocks.Peer{} |
|
|
|
|
|
peer.On("ID").Return(p2p.ID(id)) |
|
|
|
|
|
return peer |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny(t *testing.T) { |
|
|
func TestSyncer_SyncAny(t *testing.T) { |
|
|
state := sm.State{ |
|
|
state := sm.State{ |
|
|
ChainID: "chain", |
|
|
ChainID: "chain", |
|
@ -53,7 +33,6 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
Block: version.BlockProtocol, |
|
|
Block: version.BlockProtocol, |
|
|
App: 0, |
|
|
App: 0, |
|
|
}, |
|
|
}, |
|
|
|
|
|
|
|
|
Software: version.TMCoreSemVer, |
|
|
Software: version.TMCoreSemVer, |
|
|
}, |
|
|
}, |
|
|
|
|
|
|
|
@ -87,38 +66,39 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
|
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
|
|
|
peerAID := p2p.PeerID{0xAA} |
|
|
|
|
|
peerBID := p2p.PeerID{0xBB} |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, connSnapshot, connQuery, stateProvider, 3) |
|
|
|
|
|
|
|
|
// Adding a chunk should error when no sync is in progress
|
|
|
// Adding a chunk should error when no sync is in progress
|
|
|
_, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}}) |
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}}) |
|
|
require.Error(t, err) |
|
|
require.Error(t, err) |
|
|
|
|
|
|
|
|
// Adding a couple of peers should trigger snapshot discovery messages
|
|
|
// Adding a couple of peers should trigger snapshot discovery messages
|
|
|
peerA := &p2pmocks.Peer{} |
|
|
|
|
|
peerA.On("ID").Return(p2p.ID("a")) |
|
|
|
|
|
peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true) |
|
|
|
|
|
syncer.AddPeer(peerA) |
|
|
|
|
|
peerA.AssertExpectations(t) |
|
|
|
|
|
|
|
|
|
|
|
peerB := &p2pmocks.Peer{} |
|
|
|
|
|
peerB.On("ID").Return(p2p.ID("b")) |
|
|
|
|
|
peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true) |
|
|
|
|
|
syncer.AddPeer(peerB) |
|
|
|
|
|
peerB.AssertExpectations(t) |
|
|
|
|
|
|
|
|
rts.syncer.AddPeer(peerAID) |
|
|
|
|
|
e := <-rts.snapshotOutCh |
|
|
|
|
|
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message) |
|
|
|
|
|
require.Equal(t, peerAID, e.To) |
|
|
|
|
|
|
|
|
|
|
|
rts.syncer.AddPeer(peerBID) |
|
|
|
|
|
e = <-rts.snapshotOutCh |
|
|
|
|
|
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message) |
|
|
|
|
|
require.Equal(t, peerBID, e.To) |
|
|
|
|
|
|
|
|
// Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
|
|
|
// Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
|
|
|
// format 2, which will be rejected by the ABCI application.
|
|
|
// format 2, which will be rejected by the ABCI application.
|
|
|
new, err := syncer.AddSnapshot(peerA, s) |
|
|
|
|
|
|
|
|
new, err := rts.syncer.AddSnapshot(peerAID, s) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
assert.True(t, new) |
|
|
|
|
|
|
|
|
require.True(t, new) |
|
|
|
|
|
|
|
|
new, err = syncer.AddSnapshot(peerB, s) |
|
|
|
|
|
|
|
|
new, err = rts.syncer.AddSnapshot(peerBID, s) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
assert.False(t, new) |
|
|
|
|
|
|
|
|
require.False(t, new) |
|
|
|
|
|
|
|
|
new, err = syncer.AddSnapshot(peerB, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}}) |
|
|
|
|
|
|
|
|
new, err = rts.syncer.AddSnapshot(peerBID, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}}) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
assert.True(t, new) |
|
|
|
|
|
|
|
|
require.True(t, new) |
|
|
|
|
|
|
|
|
// We start a sync, with peers sending back chunks when requested. We first reject the snapshot
|
|
|
// We start a sync, with peers sending back chunks when requested. We first reject the snapshot
|
|
|
// with height 2 format 2, and accept the snapshot at height 1.
|
|
|
// with height 2 format 2, and accept the snapshot at height 1.
|
|
@ -144,24 +124,25 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
|
|
|
|
|
|
chunkRequests := make(map[uint32]int) |
|
|
chunkRequests := make(map[uint32]int) |
|
|
chunkRequestsMtx := tmsync.Mutex{} |
|
|
chunkRequestsMtx := tmsync.Mutex{} |
|
|
onChunkRequest := func(args mock.Arguments) { |
|
|
|
|
|
pb, err := decodeMsg(args[1].([]byte)) |
|
|
|
|
|
assert.NoError(t, err) |
|
|
|
|
|
msg := pb.(*ssproto.ChunkRequest) |
|
|
|
|
|
assert.EqualValues(t, 1, msg.Height) |
|
|
|
|
|
assert.EqualValues(t, 1, msg.Format) |
|
|
|
|
|
assert.LessOrEqual(t, msg.Index, uint32(len(chunks))) |
|
|
|
|
|
|
|
|
|
|
|
added, err := syncer.AddChunk(chunks[msg.Index]) |
|
|
|
|
|
assert.NoError(t, err) |
|
|
|
|
|
assert.True(t, added) |
|
|
|
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
|
|
|
chunkRequests[msg.Index]++ |
|
|
|
|
|
chunkRequestsMtx.Unlock() |
|
|
|
|
|
} |
|
|
|
|
|
peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true) |
|
|
|
|
|
peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
|
for e := range rts.chunkOutCh { |
|
|
|
|
|
msg, ok := e.Message.(*ssproto.ChunkRequest) |
|
|
|
|
|
require.True(t, ok) |
|
|
|
|
|
|
|
|
|
|
|
require.EqualValues(t, 1, msg.Height) |
|
|
|
|
|
require.EqualValues(t, 1, msg.Format) |
|
|
|
|
|
require.LessOrEqual(t, msg.Index, uint32(len(chunks))) |
|
|
|
|
|
|
|
|
|
|
|
added, err := rts.syncer.AddChunk(chunks[msg.Index]) |
|
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
require.True(t, added) |
|
|
|
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
|
|
|
chunkRequests[msg.Index]++ |
|
|
|
|
|
chunkRequestsMtx.Unlock() |
|
|
|
|
|
} |
|
|
|
|
|
}() |
|
|
|
|
|
|
|
|
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
|
|
|
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
|
|
|
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
|
|
|
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
|
|
@ -189,113 +170,140 @@ func TestSyncer_SyncAny(t *testing.T) { |
|
|
LastBlockAppHash: []byte("app_hash"), |
|
|
LastBlockAppHash: []byte("app_hash"), |
|
|
}, nil) |
|
|
}, nil) |
|
|
|
|
|
|
|
|
newState, lastCommit, err := syncer.SyncAny(0) |
|
|
|
|
|
|
|
|
newState, lastCommit, err := rts.syncer.SyncAny(0) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
|
|
|
time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
|
|
|
|
|
|
|
|
|
chunkRequestsMtx.Lock() |
|
|
chunkRequestsMtx.Lock() |
|
|
assert.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests) |
|
|
|
|
|
|
|
|
require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests) |
|
|
chunkRequestsMtx.Unlock() |
|
|
chunkRequestsMtx.Unlock() |
|
|
|
|
|
|
|
|
// The syncer should have updated the state app version from the ABCI info response.
|
|
|
// The syncer should have updated the state app version from the ABCI info response.
|
|
|
expectState := state |
|
|
expectState := state |
|
|
expectState.Version.Consensus.App = 9 |
|
|
expectState.Version.Consensus.App = 9 |
|
|
|
|
|
|
|
|
assert.Equal(t, expectState, newState) |
|
|
|
|
|
assert.Equal(t, commit, lastCommit) |
|
|
|
|
|
|
|
|
require.Equal(t, expectState, newState) |
|
|
|
|
|
require.Equal(t, commit, lastCommit) |
|
|
|
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
connSnapshot.AssertExpectations(t) |
|
|
connQuery.AssertExpectations(t) |
|
|
connQuery.AssertExpectations(t) |
|
|
peerA.AssertExpectations(t) |
|
|
|
|
|
peerB.AssertExpectations(t) |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_noSnapshots(t *testing.T) { |
|
|
func TestSyncer_SyncAny_noSnapshots(t *testing.T) { |
|
|
syncer, _ := setupOfferSyncer(t) |
|
|
|
|
|
_, _, err := syncer.SyncAny(0) |
|
|
|
|
|
assert.Equal(t, errNoSnapshots, err) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
|
|
|
_, _, err := rts.syncer.SyncAny(0) |
|
|
|
|
|
require.Equal(t, errNoSnapshots, err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_abort(t *testing.T) { |
|
|
func TestSyncer_SyncAny_abort(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
_, err := syncer.AddSnapshot(simplePeer("id"), s) |
|
|
|
|
|
|
|
|
peerID := p2p.PeerID{0xAA} |
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerID, s) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) |
|
|
|
|
|
|
|
|
_, _, err = syncer.SyncAny(0) |
|
|
|
|
|
assert.Equal(t, errAbort, err) |
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
_, _, err = rts.syncer.SyncAny(0) |
|
|
|
|
|
require.Equal(t, errAbort, err) |
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_reject(t *testing.T) { |
|
|
func TestSyncer_SyncAny_reject(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
// s22 is tried first, then s12, then s11, then errNoSnapshots
|
|
|
// s22 is tried first, then s12, then s11, then errNoSnapshots
|
|
|
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
_, err := syncer.AddSnapshot(simplePeer("id"), s22) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
peerID := p2p.PeerID{0xAA} |
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerID, s22) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(simplePeer("id"), s12) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerID, s12) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(simplePeer("id"), s11) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerID, s11) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s22), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s22), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s12), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s12), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s11), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s11), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
|
|
|
|
|
|
_, _, err = syncer.SyncAny(0) |
|
|
|
|
|
assert.Equal(t, errNoSnapshots, err) |
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
_, _, err = rts.syncer.SyncAny(0) |
|
|
|
|
|
require.Equal(t, errNoSnapshots, err) |
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_reject_format(t *testing.T) { |
|
|
func TestSyncer_SyncAny_reject_format(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
// s22 is tried first, which reject s22 and s12, then s11 will abort.
|
|
|
// s22 is tried first, which reject s22 and s12, then s11 will abort.
|
|
|
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
_, err := syncer.AddSnapshot(simplePeer("id"), s22) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
peerID := p2p.PeerID{0xAA} |
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerID, s22) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(simplePeer("id"), s12) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerID, s12) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(simplePeer("id"), s11) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerID, s11) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s22), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s22), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s11), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s11), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil) |
|
|
|
|
|
|
|
|
_, _, err = syncer.SyncAny(0) |
|
|
|
|
|
assert.Equal(t, errAbort, err) |
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
_, _, err = rts.syncer.SyncAny(0) |
|
|
|
|
|
require.Equal(t, errAbort, err) |
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_reject_sender(t *testing.T) { |
|
|
func TestSyncer_SyncAny_reject_sender(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
peerA := simplePeer("a") |
|
|
|
|
|
peerB := simplePeer("b") |
|
|
|
|
|
peerC := simplePeer("c") |
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
|
|
|
peerAID := p2p.PeerID{0xAA} |
|
|
|
|
|
peerBID := p2p.PeerID{0xBB} |
|
|
|
|
|
peerCID := p2p.PeerID{0xCC} |
|
|
|
|
|
|
|
|
// sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
|
|
|
// sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
|
|
|
// submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
|
|
|
// submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
|
|
@ -304,44 +312,56 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) { |
|
|
sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
_, err := syncer.AddSnapshot(peerA, sa) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerAID, sa) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerB, sb) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerBID, sb) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerC, sc) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerCID, sc) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerB, sbc) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerBID, sbc) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerC, sbc) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerCID, sbc) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(sbc), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(sbc), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil) |
|
|
|
|
|
|
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(sa), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(sa), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil) |
|
|
|
|
|
|
|
|
_, _, err = syncer.SyncAny(0) |
|
|
|
|
|
assert.Equal(t, errNoSnapshots, err) |
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
_, _, err = rts.syncer.SyncAny(0) |
|
|
|
|
|
require.Equal(t, errNoSnapshots, err) |
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_SyncAny_abciError(t *testing.T) { |
|
|
func TestSyncer_SyncAny_abciError(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
errBoom := errors.New("boom") |
|
|
errBoom := errors.New("boom") |
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} |
|
|
_, err := syncer.AddSnapshot(simplePeer("id"), s) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
peerID := p2p.PeerID{0xAA} |
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerID, s) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s), AppHash: []byte("app_hash"), |
|
|
Snapshot: toABCI(s), AppHash: []byte("app_hash"), |
|
|
}).Once().Return(nil, errBoom) |
|
|
}).Once().Return(nil, errBoom) |
|
|
|
|
|
|
|
|
_, _, err = syncer.SyncAny(0) |
|
|
|
|
|
assert.True(t, errors.Is(err, errBoom)) |
|
|
|
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
_, _, err = rts.syncer.SyncAny(0) |
|
|
|
|
|
require.True(t, errors.Is(err, errBoom)) |
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func TestSyncer_offerSnapshot(t *testing.T) { |
|
|
func TestSyncer_offerSnapshot(t *testing.T) { |
|
@ -365,13 +385,18 @@ func TestSyncer_offerSnapshot(t *testing.T) { |
|
|
for name, tc := range testcases { |
|
|
for name, tc := range testcases { |
|
|
tc := tc |
|
|
tc := tc |
|
|
t.Run(name, func(t *testing.T) { |
|
|
t.Run(name, func(t *testing.T) { |
|
|
syncer, connSnapshot := setupOfferSyncer(t) |
|
|
|
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")} |
|
|
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")} |
|
|
connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
|
|
|
|
|
|
rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{ |
|
|
Snapshot: toABCI(s), |
|
|
Snapshot: toABCI(s), |
|
|
AppHash: []byte("app_hash"), |
|
|
AppHash: []byte("app_hash"), |
|
|
}).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err) |
|
|
}).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err) |
|
|
err := syncer.offerSnapshot(s) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
err := rts.syncer.offerSnapshot(s) |
|
|
if tc.expectErr == unknownErr { |
|
|
if tc.expectErr == unknownErr { |
|
|
require.Error(t, err) |
|
|
require.Error(t, err) |
|
|
} else { |
|
|
} else { |
|
@ -379,7 +404,7 @@ func TestSyncer_offerSnapshot(t *testing.T) { |
|
|
if unwrapped != nil { |
|
|
if unwrapped != nil { |
|
|
err = unwrapped |
|
|
err = unwrapped |
|
|
} |
|
|
} |
|
|
assert.Equal(t, tc.expectErr, err) |
|
|
|
|
|
|
|
|
require.Equal(t, tc.expectErr, err) |
|
|
} |
|
|
} |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
@ -406,11 +431,10 @@ func TestSyncer_applyChunks_Results(t *testing.T) { |
|
|
for name, tc := range testcases { |
|
|
for name, tc := range testcases { |
|
|
tc := tc |
|
|
tc := tc |
|
|
t.Run(name, func(t *testing.T) { |
|
|
t.Run(name, func(t *testing.T) { |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
body := []byte{1, 2, 3} |
|
|
body := []byte{1, 2, 3} |
|
|
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "") |
|
|
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "") |
|
@ -418,17 +442,17 @@ func TestSyncer_applyChunks_Results(t *testing.T) { |
|
|
_, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body}) |
|
|
_, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body}) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
Index: 0, Chunk: body, |
|
|
Index: 0, Chunk: body, |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err) |
|
|
if tc.result == abci.ResponseApplySnapshotChunk_RETRY { |
|
|
if tc.result == abci.ResponseApplySnapshotChunk_RETRY { |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
Index: 0, Chunk: body, |
|
|
Index: 0, Chunk: body, |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
err = syncer.applyChunks(chunks) |
|
|
|
|
|
|
|
|
err = rts.syncer.applyChunks(chunks) |
|
|
if tc.expectErr == unknownErr { |
|
|
if tc.expectErr == unknownErr { |
|
|
require.Error(t, err) |
|
|
require.Error(t, err) |
|
|
} else { |
|
|
} else { |
|
@ -436,9 +460,10 @@ func TestSyncer_applyChunks_Results(t *testing.T) { |
|
|
if unwrapped != nil { |
|
|
if unwrapped != nil { |
|
|
err = unwrapped |
|
|
err = unwrapped |
|
|
} |
|
|
} |
|
|
assert.Equal(t, tc.expectErr, err) |
|
|
|
|
|
|
|
|
require.Equal(t, tc.expectErr, err) |
|
|
} |
|
|
} |
|
|
connSnapshot.AssertExpectations(t) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts.conn.AssertExpectations(t) |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -457,11 +482,10 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { |
|
|
for name, tc := range testcases { |
|
|
for name, tc := range testcases { |
|
|
tc := tc |
|
|
tc := tc |
|
|
t.Run(name, func(t *testing.T) { |
|
|
t.Run(name, func(t *testing.T) { |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "") |
|
|
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "") |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
@ -476,13 +500,13 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
// The first two chunks are accepted, before the last one asks for 1 to be refetched
|
|
|
// The first two chunks are accepted, before the last one asks for 1 to be refetched
|
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
Index: 0, Chunk: []byte{0}, |
|
|
Index: 0, Chunk: []byte{0}, |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
Index: 1, Chunk: []byte{1}, |
|
|
Index: 1, Chunk: []byte{1}, |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
Index: 2, Chunk: []byte{2}, |
|
|
Index: 2, Chunk: []byte{2}, |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
Result: tc.result, |
|
|
Result: tc.result, |
|
@ -493,15 +517,15 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) { |
|
|
// check the queue contents, and finally close the queue to end the goroutine.
|
|
|
// check the queue contents, and finally close the queue to end the goroutine.
|
|
|
// We don't really care about the result of applyChunks, since it has separate test.
|
|
|
// We don't really care about the result of applyChunks, since it has separate test.
|
|
|
go func() { |
|
|
go func() { |
|
|
syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
|
|
|
|
|
|
|
|
|
rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
|
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
time.Sleep(50 * time.Millisecond) |
|
|
time.Sleep(50 * time.Millisecond) |
|
|
assert.True(t, chunks.Has(0)) |
|
|
|
|
|
assert.False(t, chunks.Has(1)) |
|
|
|
|
|
assert.True(t, chunks.Has(2)) |
|
|
|
|
|
err = chunks.Close() |
|
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
require.True(t, chunks.Has(0)) |
|
|
|
|
|
require.False(t, chunks.Has(1)) |
|
|
|
|
|
require.True(t, chunks.Has(2)) |
|
|
|
|
|
|
|
|
|
|
|
require.NoError(t, chunks.Close()) |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -520,63 +544,71 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { |
|
|
for name, tc := range testcases { |
|
|
for name, tc := range testcases { |
|
|
tc := tc |
|
|
tc := tc |
|
|
t.Run(name, func(t *testing.T) { |
|
|
t.Run(name, func(t *testing.T) { |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider := &mocks.StateProvider{} |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) |
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, stateProvider, 2) |
|
|
|
|
|
|
|
|
// Set up three peers across two snapshots, and ask for one of them to be banned.
|
|
|
// Set up three peers across two snapshots, and ask for one of them to be banned.
|
|
|
// It should be banned from all snapshots.
|
|
|
// It should be banned from all snapshots.
|
|
|
peerA := simplePeer("a") |
|
|
|
|
|
peerB := simplePeer("b") |
|
|
|
|
|
peerC := simplePeer("c") |
|
|
|
|
|
|
|
|
peerAID := p2p.PeerID{0xAA} |
|
|
|
|
|
peerBID := p2p.PeerID{0xBB} |
|
|
|
|
|
peerCID := p2p.PeerID{0xCC} |
|
|
|
|
|
|
|
|
s1 := &snapshot{Height: 1, Format: 1, Chunks: 3} |
|
|
s1 := &snapshot{Height: 1, Format: 1, Chunks: 3} |
|
|
s2 := &snapshot{Height: 2, Format: 1, Chunks: 3} |
|
|
s2 := &snapshot{Height: 2, Format: 1, Chunks: 3} |
|
|
_, err := syncer.AddSnapshot(peerA, s1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err := rts.syncer.AddSnapshot(peerAID, s1) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerA, s2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerAID, s2) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerB, s1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerBID, s1) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerB, s2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerBID, s2) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerC, s1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerCID, s1) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
_, err = syncer.AddSnapshot(peerC, s2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_, err = rts.syncer.AddSnapshot(peerCID, s2) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
chunks, err := newChunkQueue(s1, "") |
|
|
chunks, err := newChunkQueue(s1, "") |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerA.ID()}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID}) |
|
|
require.True(t, added) |
|
|
require.True(t, added) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerB.ID()}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerBID}) |
|
|
require.True(t, added) |
|
|
require.True(t, added) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerC.ID()}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerCID}) |
|
|
require.True(t, added) |
|
|
require.True(t, added) |
|
|
require.NoError(t, err) |
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
// The first two chunks are accepted, before the last one asks for b sender to be rejected
|
|
|
// The first two chunks are accepted, before the last one asks for b sender to be rejected
|
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 0, Chunk: []byte{0}, Sender: "a", |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 0, Chunk: []byte{0}, Sender: "aa", |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 1, Chunk: []byte{1}, Sender: "b", |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 1, Chunk: []byte{1}, Sender: "bb", |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 2, Chunk: []byte{2}, Sender: "c", |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 2, Chunk: []byte{2}, Sender: "cc", |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{ |
|
|
Result: tc.result, |
|
|
Result: tc.result, |
|
|
RejectSenders: []string{string(peerB.ID())}, |
|
|
|
|
|
|
|
|
RejectSenders: []string{peerBID.String()}, |
|
|
}, nil) |
|
|
}, nil) |
|
|
|
|
|
|
|
|
// On retry, the last chunk will be tried again, so we just accept it then.
|
|
|
// On retry, the last chunk will be tried again, so we just accept it then.
|
|
|
if tc.result == abci.ResponseApplySnapshotChunk_RETRY { |
|
|
if tc.result == abci.ResponseApplySnapshotChunk_RETRY { |
|
|
connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 2, Chunk: []byte{2}, Sender: "c", |
|
|
|
|
|
|
|
|
rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{ |
|
|
|
|
|
Index: 2, Chunk: []byte{2}, Sender: "cc", |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -584,23 +616,22 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { |
|
|
// However, it will block on e.g. retry result, so we spawn a goroutine that will
|
|
|
// However, it will block on e.g. retry result, so we spawn a goroutine that will
|
|
|
// be shut down when the chunk queue closes.
|
|
|
// be shut down when the chunk queue closes.
|
|
|
go func() { |
|
|
go func() { |
|
|
syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
|
|
|
|
|
|
|
|
|
rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
|
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
time.Sleep(50 * time.Millisecond) |
|
|
time.Sleep(50 * time.Millisecond) |
|
|
|
|
|
|
|
|
s1peers := syncer.snapshots.GetPeers(s1) |
|
|
|
|
|
assert.Len(t, s1peers, 2) |
|
|
|
|
|
assert.EqualValues(t, "a", s1peers[0].ID()) |
|
|
|
|
|
assert.EqualValues(t, "c", s1peers[1].ID()) |
|
|
|
|
|
|
|
|
s1peers := rts.syncer.snapshots.GetPeers(s1) |
|
|
|
|
|
require.Len(t, s1peers, 2) |
|
|
|
|
|
require.EqualValues(t, "aa", s1peers[0].String()) |
|
|
|
|
|
require.EqualValues(t, "cc", s1peers[1].String()) |
|
|
|
|
|
|
|
|
syncer.snapshots.GetPeers(s1) |
|
|
|
|
|
assert.Len(t, s1peers, 2) |
|
|
|
|
|
assert.EqualValues(t, "a", s1peers[0].ID()) |
|
|
|
|
|
assert.EqualValues(t, "c", s1peers[1].ID()) |
|
|
|
|
|
|
|
|
rts.syncer.snapshots.GetPeers(s1) |
|
|
|
|
|
require.Len(t, s1peers, 2) |
|
|
|
|
|
require.EqualValues(t, "aa", s1peers[0].String()) |
|
|
|
|
|
require.EqualValues(t, "cc", s1peers[1].String()) |
|
|
|
|
|
|
|
|
err = chunks.Close() |
|
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
require.NoError(t, chunks.Close()) |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -634,20 +665,18 @@ func TestSyncer_verifyApp(t *testing.T) { |
|
|
for name, tc := range testcases { |
|
|
for name, tc := range testcases { |
|
|
tc := tc |
|
|
tc := tc |
|
|
t.Run(name, func(t *testing.T) { |
|
|
t.Run(name, func(t *testing.T) { |
|
|
connQuery := &proxymocks.AppConnQuery{} |
|
|
|
|
|
connSnapshot := &proxymocks.AppConnSnapshot{} |
|
|
|
|
|
stateProvider := &mocks.StateProvider{} |
|
|
|
|
|
syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "") |
|
|
|
|
|
|
|
|
rts := setup(t, nil, nil, nil, 2) |
|
|
|
|
|
|
|
|
connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err) |
|
|
|
|
|
version, err := syncer.verifyApp(s) |
|
|
|
|
|
|
|
|
rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err) |
|
|
|
|
|
version, err := rts.syncer.verifyApp(s) |
|
|
unwrapped := errors.Unwrap(err) |
|
|
unwrapped := errors.Unwrap(err) |
|
|
if unwrapped != nil { |
|
|
if unwrapped != nil { |
|
|
err = unwrapped |
|
|
err = unwrapped |
|
|
} |
|
|
} |
|
|
assert.Equal(t, tc.expectErr, err) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
require.Equal(t, tc.expectErr, err) |
|
|
if err == nil { |
|
|
if err == nil { |
|
|
assert.Equal(t, tc.response.AppVersion, version) |
|
|
|
|
|
|
|
|
require.Equal(t, tc.response.AppVersion, version) |
|
|
} |
|
|
} |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|