From 399c3661857b68e6168da5479dfffd025623d702 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Thu, 22 Apr 2021 09:54:14 -0400
Subject: [PATCH] statesync: sort snapshots by commonness (#6385)

---
 statesync/snapshots.go      | 43 ++++++++++++++++++++++++++++++-------
 statesync/snapshots_test.go |  2 +-
 statesync/syncer_test.go    |  9 ++++++--
 3 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/statesync/snapshots.go b/statesync/snapshots.go
index 3bed0fbf1..70e5bd78e 100644
--- a/statesync/snapshots.go
+++ b/statesync/snapshots.go
@@ -177,12 +177,41 @@ func (p *snapshotPool) Ranked() []*snapshot {
 	p.Lock()
 	defer p.Unlock()
 
-	candidates := make([]*snapshot, 0, len(p.snapshots))
+	if len(p.snapshots) == 0 {
+		return []*snapshot{}
+	}
+
+	numPeers := make([]int, 0, len(p.snapshots))
 	for key := range p.snapshots {
-		candidates = append(candidates, p.snapshots[key])
+		numPeers = append(numPeers, len(p.snapshotPeers[key]))
+	}
+	sort.Ints(numPeers)
+	median := len(numPeers) / 2
+	if len(numPeers)%2 == 0 {
+		median = (numPeers[median-1] + numPeers[median]) / 2
+	} else {
+		median = numPeers[median]
 	}
 
-	sort.Slice(candidates, func(i, j int) bool {
+	commonCandidates := make([]*snapshot, 0, len(p.snapshots)/2)
+	uncommonCandidates := make([]*snapshot, 0, len(p.snapshots)/2)
+	for key := range p.snapshots {
+		if len(p.snapshotPeers[key]) > median {
+			commonCandidates = append(commonCandidates, p.snapshots[key])
+			continue
+		}
+
+		uncommonCandidates = append(uncommonCandidates, p.snapshots[key])
+	}
+
+	sort.Slice(commonCandidates, p.sorterFactory(commonCandidates))
+	sort.Slice(uncommonCandidates, p.sorterFactory(uncommonCandidates))
+
+	return append(commonCandidates, uncommonCandidates...)
+}
+
+func (p *snapshotPool) sorterFactory(candidates []*snapshot) func(int, int) bool {
+	return func(i, j int) bool {
 		a := candidates[i]
 		b := candidates[j]
 
@@ -191,18 +220,16 @@ func (p *snapshotPool) Ranked() []*snapshot {
 			return true
 		case a.Height < b.Height:
 			return false
+		case len(p.snapshotPeers[a.Key()]) > len(p.snapshotPeers[b.Key()]):
+			return true
 		case a.Format > b.Format:
 			return true
 		case a.Format < b.Format:
 			return false
-		case len(p.snapshotPeers[a.Key()]) > len(p.snapshotPeers[b.Key()]):
-			return true
 		default:
 			return false
 		}
-	})
-
-	return candidates
+	}
 }
 
 // Reject rejects a snapshot. Rejected snapshots will never be used again.
diff --git a/statesync/snapshots_test.go b/statesync/snapshots_test.go
index 085121f32..69acd33eb 100644
--- a/statesync/snapshots_test.go
+++ b/statesync/snapshots_test.go
@@ -149,10 +149,10 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) {
 		peers    []p2p.NodeID
 	}{
 		{&snapshot{Height: 2, Format: 2, Chunks: 4, Hash: []byte{1, 3}}, []p2p.NodeID{"AA", "BB", "CC"}},
+		{&snapshot{Height: 1, Format: 1, Chunks: 4, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB", "CC"}},
 		{&snapshot{Height: 2, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []p2p.NodeID{"AA"}},
 		{&snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB"}},
 		{&snapshot{Height: 1, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB"}},
-		{&snapshot{Height: 1, Format: 1, Chunks: 4, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB", "CC"}},
 	}
 
 	// Add snapshots in reverse order, to make sure the pool enforces some order.
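The heart of this patch is the new Ranked() above: it computes the median number of peers that reported each snapshot, ranks snapshots known to more than the median number of peers ahead of the rest, and sorts each group with the extracted sorterFactory comparator. Below is a small standalone sketch of that median-and-split step; the snapshotInfo type, the splitByCommonness name, and the example data are invented for illustration, and only the median arithmetic and the "> median" split mirror the diff.

// medianrank_sketch.go: a minimal, self-contained sketch (not part of the patch)
// of the median-based commonness split used by the new Ranked().
package main

import (
	"fmt"
	"sort"
)

// snapshotInfo is a hypothetical stand-in for the pool's snapshot bookkeeping.
type snapshotInfo struct {
	Height uint64
	Format uint32
	Peers  int // number of peers that reported this snapshot
}

// splitByCommonness puts snapshots reported by more than the median number of
// peers into the "common" group, which would be tried before the rest.
func splitByCommonness(snapshots []snapshotInfo) (common, uncommon []snapshotInfo) {
	if len(snapshots) == 0 {
		return nil, nil
	}

	// Sort the per-snapshot peer counts and take the median, averaging the
	// two middle values when the count is even (as in the diff above).
	numPeers := make([]int, 0, len(snapshots))
	for _, s := range snapshots {
		numPeers = append(numPeers, s.Peers)
	}
	sort.Ints(numPeers)
	median := len(numPeers) / 2
	if len(numPeers)%2 == 0 {
		median = (numPeers[median-1] + numPeers[median]) / 2
	} else {
		median = numPeers[median]
	}

	for _, s := range snapshots {
		if s.Peers > median {
			common = append(common, s)
			continue
		}
		uncommon = append(uncommon, s)
	}
	return common, uncommon
}

func main() {
	snapshots := []snapshotInfo{
		{Height: 2, Format: 2, Peers: 1},
		{Height: 2, Format: 1, Peers: 2},
		{Height: 1, Format: 1, Peers: 3},
	}
	common, uncommon := splitByCommonness(snapshots)
	fmt.Println("common:", common)     // snapshots seen by more than the median (2) peers
	fmt.Println("uncommon:", uncommon) // everything else, ranked only after the common ones
}

With snapshots reported by 1, 2, and 3 peers, the median is 2, so only the snapshot seen by 3 peers lands in the common group; in the real pool, each group is then ordered by sorterFactory before the groups are concatenated.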
diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go
index 7d150c5d5..876d6adcb 100644
--- a/statesync/syncer_test.go
+++ b/statesync/syncer_test.go
@@ -68,7 +68,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 	peerAID := p2p.NodeID("aa")
 	peerBID := p2p.NodeID("bb")
-
+	peerCID := p2p.NodeID("cc")
 
 	rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
 
 	// Adding a chunk should error when no sync is in progress
@@ -96,10 +96,15 @@ func TestSyncer_SyncAny(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, new)
 
-	new, err = rts.syncer.AddSnapshot(peerBID, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
+	s2 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}}
+	new, err = rts.syncer.AddSnapshot(peerBID, s2)
 	require.NoError(t, err)
 	require.True(t, new)
 
+	new, err = rts.syncer.AddSnapshot(peerCID, s2)
+	require.NoError(t, err)
+	require.False(t, new)
+
 	// We start a sync, with peers sending back chunks when requested. We first reject the snapshot
 	// with height 2 format 2, and accept the snapshot at height 1.
 	connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
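The updated TestSyncer_SyncAny introduces a third peer (peerCID) and reports the height-2/format-2 snapshot from it as well: AddSnapshot returns new == false for the repeat report, but the extra peer still counts toward that snapshot's commonness. The following toy, self-contained model illustrates that add-then-count behaviour; toyPool, add, and the key strings are hypothetical names for illustration, not the statesync API.

// poolsketch.go: a toy model of reporting the same snapshot from two peers.
package main

import "fmt"

type key string

type toyPool struct {
	peers map[key]map[string]bool // snapshot key -> set of peer IDs that reported it
}

func newToyPool() *toyPool {
	return &toyPool{peers: map[key]map[string]bool{}}
}

// add records that peerID has the snapshot and reports whether the snapshot
// itself was previously unknown, loosely mirroring the (bool, error) result
// of syncer.AddSnapshot asserted in the test above.
func (p *toyPool) add(peerID string, k key) bool {
	known := p.peers[k] != nil
	if !known {
		p.peers[k] = map[string]bool{}
	}
	p.peers[k][peerID] = true
	return !known
}

func main() {
	pool := newToyPool()
	fmt.Println(pool.add("bb", "height2/format2"))  // true: first report of this snapshot
	fmt.Println(pool.add("cc", "height2/format2"))  // false: already known, like require.False above
	fmt.Println(len(pool.peers["height2/format2"])) // 2: the extra peer still adds to commonness
}

The second add returns false, matching the new require.False assertion, while the snapshot's peer set grows to two, which is what lets the ranking change in snapshots.go treat it as the more common candidate.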