diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go
index cc195c22a..669d92962 100644
--- a/internal/p2p/p2ptest/network.go
+++ b/internal/p2p/p2ptest/network.go
@@ -101,10 +101,8 @@ func (n *Network) Start(ctx context.Context, t *testing.T) {
 		case <-ctx.Done():
 			require.Fail(t, "operation canceled")
 		case peerUpdate := <-sourceSub.Updates():
-			require.Equal(t, p2p.PeerUpdate{
-				NodeID: targetNode.NodeID,
-				Status: p2p.PeerStatusUp,
-			}, peerUpdate)
+			require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
+			require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
 		case <-time.After(3 * time.Second):
 			require.Fail(t, "timed out waiting for peer", "%v dialing %v",
 				sourceNode.NodeID, targetNode.NodeID)
@@ -114,6 +112,7 @@ func (n *Network) Start(ctx context.Context, t *testing.T) {
 		case <-ctx.Done():
 			require.Fail(t, "operation canceled")
 		case peerUpdate := <-targetSub.Updates():
+			peerUpdate.Channels = nil
 			require.Equal(t, p2p.PeerUpdate{
 				NodeID: sourceNode.NodeID,
 				Status: p2p.PeerStatusUp,
diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go
index f492ff09e..f9f3ec40e 100644
--- a/internal/p2p/p2ptest/require.go
+++ b/internal/p2p/p2ptest/require.go
@@ -136,8 +136,8 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp
 
 	select {
 	case update := <-peerUpdates.Updates():
-		require.Equal(t, expect, update, "peer update did not match")
-
+		require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
+		require.Equal(t, expect.Status, update.Status, "statuses did not match")
 	case <-timer.C:
 		require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
 	}
@@ -155,7 +155,11 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee
 		case update := <-peerUpdates.Updates():
 			actual = append(actual, update)
 			if len(actual) == len(expect) {
-				require.Equal(t, expect, actual)
+				for idx := range expect {
+					require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
+					require.Equal(t, expect[idx].Status, actual[idx].Status)
+				}
+
 				return
 			}
 
diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go
index 9d7cfe42d..3293468ae 100644
--- a/internal/p2p/peermanager.go
+++ b/internal/p2p/peermanager.go
@@ -47,8 +47,9 @@ const (
 
 // PeerUpdate is a peer update event sent via PeerUpdates.
 type PeerUpdate struct {
-	NodeID types.NodeID
-	Status PeerStatus
+	NodeID   types.NodeID
+	Status   PeerStatus
+	Channels ChannelIDSet
 }
 
 // PeerUpdates is a peer update subscription with notifications about peer
@@ -674,19 +675,23 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
 	return nil
 }
 
-// Ready marks a peer as ready, broadcasting status updates to subscribers. The
-// peer must already be marked as connected. This is separate from Dialed() and
-// Accepted() to allow the router to set up its internal queues before reactors
-// start sending messages.
-func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID) {
+// Ready marks a peer as ready, broadcasting status updates to
+// subscribers. The peer must already be marked as connected. This is
+// separate from Dialed() and Accepted() to allow the router to set up
+// its internal queues before reactors start sending messages. The
+// channels set here are passed in the peer update broadcast to
+// reactors, which can then mediate their own behavior based on the
+// capability of the peers.
+func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID, channels ChannelIDSet) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
 	if m.connected[peerID] {
 		m.ready[peerID] = true
 		m.broadcast(ctx, PeerUpdate{
-			NodeID: peerID,
-			Status: PeerStatusUp,
+			NodeID:   peerID,
+			Status:   PeerStatusUp,
+			Channels: channels,
 		})
 	}
 }
@@ -1208,6 +1213,7 @@ type peerInfo struct {
 
 	// These fields are ephemeral, i.e. not persisted to the database.
 	Persistent bool
+	Seed       bool
 	Height     int64
 	FixedScore PeerScore // mainly for tests
 
@@ -1230,6 +1236,7 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
 			return nil, err
 		}
 		p.AddressInfo[addressInfo.Address] = addressInfo
+
 	}
 	return p, p.Validate()
 }
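
Note: with PeerUpdate now carrying Channels, subscribers can gate per-peer behavior on the capabilities the router reports, as the new Ready comment describes. A minimal sketch of the consuming side, assuming it lives inside the tendermint module (internal/p2p is not importable from outside); fooChannel and startPeerWorker are illustrative names, while Subscribe, Updates, PeerStatusUp, and Channels.Contains are the APIs exercised by this patch:

    package fooreactor

    import (
    	"context"

    	"github.com/tendermint/tendermint/internal/p2p"
    	"github.com/tendermint/tendermint/types"
    )

    // fooChannel is an illustrative channel ID, not one a real reactor uses.
    const fooChannel = p2p.ChannelID(0x42)

    // watchPeers engages only peers whose update advertises fooChannel.
    func watchPeers(ctx context.Context, pm *p2p.PeerManager) {
    	sub := pm.Subscribe(ctx)
    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case update := <-sub.Updates():
    			if update.Status == p2p.PeerStatusUp && update.Channels.Contains(fooChannel) {
    				go startPeerWorker(ctx, update.NodeID)
    			}
    		}
    	}
    }

    // startPeerWorker is a hypothetical per-peer handler; a real reactor
    // would start its own send/receive logic here.
    func startPeerWorker(ctx context.Context, peerID types.NodeID) {}
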
diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go
index 3e12ac989..82d1e2693 100644
--- a/internal/p2p/peermanager_test.go
+++ b/internal/p2p/peermanager_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/fortytw2/leaktest"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
 
@@ -1311,7 +1312,7 @@ func TestPeerManager_Ready(t *testing.T) {
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
 
 	// Marking a as ready should transition it to PeerStatusUp and send an update.
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.Equal(t, p2p.PeerUpdate{
 		NodeID: a.NodeID,
@@ -1323,11 +1324,34 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
-	peerManager.Ready(ctx, b.NodeID)
+	peerManager.Ready(ctx, b.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
 	require.Empty(t, sub.Updates())
 }
 
+func TestPeerManager_Ready_Channels(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	pm, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
+	require.NoError(t, err)
+
+	sub := pm.Subscribe(ctx)
+
+	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
+	added, err := pm.Add(a)
+	require.NoError(t, err)
+	require.True(t, added)
+	require.NoError(t, pm.Accepted(a.NodeID))
+
+	pm.Ready(ctx, a.NodeID, p2p.ChannelIDSet{42: struct{}{}})
+	require.NotEmpty(t, sub.Updates())
+	update := <-sub.Updates()
+	assert.Equal(t, a.NodeID, update.NodeID)
+	require.True(t, update.Channels.Contains(42))
+	require.False(t, update.Channels.Contains(48))
+}
+
 // See TryEvictNext for most tests, this just tests blocking behavior.
 func TestPeerManager_EvictNext(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -1342,7 +1366,7 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	// Since there are no peers to evict, EvictNext should block until timeout.
 	timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -1378,7 +1402,7 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	// Spawn a goroutine to error a peer after a delay.
 	go func() {
@@ -1413,7 +1437,7 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	// Spawn a goroutine to upgrade to b with a delay.
 	go func() {
@@ -1454,7 +1478,7 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	// Spawn a goroutine to upgrade b with a delay.
 	go func() {
@@ -1489,7 +1513,7 @@
 
 	// Connecting to a won't evict anything either.
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	// But if a errors it should be evicted.
 	peerManager.Errored(a.NodeID, errors.New("foo"))
@@ -1536,7 +1560,7 @@
 	_, err = peerManager.Add(a)
 	require.NoError(t, err)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{
@@ -1591,7 +1615,7 @@
 	require.Zero(t, evict)
 
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	evict, err = peerManager.TryEvictNext()
 	require.NoError(t, err)
 	require.Zero(t, evict)
@@ -1624,7 +1648,7 @@
 
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp},
 		<-sub.Updates())
@@ -1641,7 +1665,7 @@
 
 	require.NoError(t, peerManager.Dialed(a))
 	require.Empty(t, sub.Updates())
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp},
 		<-sub.Updates())
@@ -1683,7 +1707,7 @@
 
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp},
 		<-sub.Updates())
@@ -1716,7 +1740,7 @@
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(ctx, a.NodeID)
+	peerManager.Ready(ctx, a.NodeID, nil)
 
 	expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
 	require.NotEmpty(t, s1)
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index c60fe30a5..f9b3d1ad8 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -162,7 +162,7 @@ type Router struct {
 	peerMtx    sync.RWMutex
 	peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
 	// the channels that the peer queue has open
-	peerChannels map[types.NodeID]channelIDs
+	peerChannels map[types.NodeID]ChannelIDSet
 	queueFactory func(int) queue
 
 	// FIXME: We don't strictly need to use a mutex for this if we seal the
@@ -210,7 +210,7 @@ func NewRouter(
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[types.NodeID]queue{},
-		peerChannels:    make(map[types.NodeID]channelIDs),
+		peerChannels:    make(map[types.NodeID]ChannelIDSet),
 	}
 
 	router.BaseService = service.NewBaseService(logger, "router", router)
@@ -644,7 +644,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
 	go r.routePeer(ctx, address.NodeID, conn, toChannelIDs(peerInfo.Channels))
 }
 
-func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
+func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
 	r.peerMtx.Lock()
 	defer r.peerMtx.Unlock()
 
@@ -756,9 +756,9 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
 // routePeer routes inbound and outbound messages between a peer and the reactor
 // channels. It will close the given connection and send queue when done, or if
 // they are closed elsewhere it will cause this method to shut down and return.
-func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels channelIDs) {
+func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels ChannelIDSet) {
 	r.metrics.Peers.Add(1)
-	r.peerManager.Ready(ctx, peerID)
+	r.peerManager.Ready(ctx, peerID, channels)
 
 	sendQueue := r.getOrMakeQueue(peerID, channels)
 	defer func() {
@@ -1003,9 +1003,14 @@ func (r *Router) OnStop() {
 	}
 }
 
-type channelIDs map[ChannelID]struct{}
+type ChannelIDSet map[ChannelID]struct{}
 
-func toChannelIDs(bytes []byte) channelIDs {
+func (cs ChannelIDSet) Contains(id ChannelID) bool {
+	_, ok := cs[id]
+	return ok
+}
+
+func toChannelIDs(bytes []byte) ChannelIDSet {
 	c := make(map[ChannelID]struct{}, len(bytes))
 	for _, b := range bytes {
 		c[ChannelID(b)] = struct{}{}
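
Note: toChannelIDs stays unexported, so outside the router the set is consumed only through Contains. A standalone sketch of the ChannelIDSet semantics the router now hands to PeerManager.Ready; the types here are local stand-ins mirroring the declarations above, and the byte values are arbitrary examples:

    package main

    import "fmt"

    // Local stand-ins mirroring the p2p declarations in this patch.
    type ChannelID uint16

    type ChannelIDSet map[ChannelID]struct{}

    func (cs ChannelIDSet) Contains(id ChannelID) bool {
    	_, ok := cs[id]
    	return ok
    }

    // toChannelIDs mirrors the router helper: one set entry per byte of
    // the peer's NodeInfo channel list.
    func toChannelIDs(bytes []byte) ChannelIDSet {
    	c := make(ChannelIDSet, len(bytes))
    	for _, b := range bytes {
    		c[ChannelID(b)] = struct{}{}
    	}
    	return c
    }

    func main() {
    	set := toChannelIDs([]byte{0x01, 0x02})
    	fmt.Println(set.Contains(0x01)) // true
    	fmt.Println(set.Contains(0x03)) // false
    }
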
diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go
index b1e286ad8..1f65a8c0c 100644
--- a/internal/statesync/reactor.go
+++ b/internal/statesync/reactor.go
@@ -862,7 +862,17 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
 
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
-		r.peers.Append(peerUpdate.NodeID)
+		if peerUpdate.Channels.Contains(SnapshotChannel) &&
+			peerUpdate.Channels.Contains(ChunkChannel) &&
+			peerUpdate.Channels.Contains(LightBlockChannel) &&
+			peerUpdate.Channels.Contains(ParamsChannel) {
+
+			r.peers.Append(peerUpdate.NodeID)
+
+		} else {
+			r.logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
+		}
+
 	case p2p.PeerStatusDown:
 		r.peers.Remove(peerUpdate.NodeID)
 	}
@@ -875,6 +885,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
 
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
+
 		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
 		r.providers[peerUpdate.NodeID] = newProvider
 		err := r.syncer.AddPeer(ctx, peerUpdate.NodeID)
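
Note: the four Contains checks above could be collapsed into a single predicate; one possible shape for such a helper inside the statesync package (illustrative, not part of this patch):

    // supportsStateSync reports whether a peer advertises every channel
    // the statesync reactor needs. Illustrative refactor, not in this patch.
    func supportsStateSync(channels p2p.ChannelIDSet) bool {
    	required := []p2p.ChannelID{SnapshotChannel, ChunkChannel, LightBlockChannel, ParamsChannel}
    	for _, ch := range required {
    		if !channels.Contains(ch) {
    			return false
    		}
    	}
    	return true
    }
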
diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go
index c1ca87b2c..dd1a224b9 100644
--- a/internal/statesync/reactor_test.go
+++ b/internal/statesync/reactor_test.go
@@ -496,10 +496,22 @@ func TestReactor_BlockProviders(t *testing.T) {
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("aa"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("bb"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}
 
 	closeCh := make(chan struct{})
@@ -637,6 +649,12 @@ func TestReactor_Backfill(t *testing.T) {
 		rts.peerUpdateCh <- p2p.PeerUpdate{
 			NodeID: types.NodeID(peer),
 			Status: p2p.PeerStatusUp,
+			Channels: p2p.ChannelIDSet{
+				SnapshotChannel:   struct{}{},
+				ChunkChannel:      struct{}{},
+				LightBlockChannel: struct{}{},
+				ParamsChannel:     struct{}{},
+			},
 		}
 	}
 
@@ -854,6 +872,12 @@ func graduallyAddPeers(
 			peerUpdateCh <- p2p.PeerUpdate{
 				NodeID: factory.RandomNodeID(t),
 				Status: p2p.PeerStatusUp,
+				Channels: p2p.ChannelIDSet{
+					SnapshotChannel:   struct{}{},
+					ChunkChannel:      struct{}{},
+					LightBlockChannel: struct{}{},
+					ParamsChannel:     struct{}{},
+				},
 			}
 		case <-closeCh:
 			return
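
Note: the tests repeat the same four-channel literal three times; a small helper in the test package would keep the fixtures in sync with the reactor's requirement (illustrative, not part of this patch):

    // stateSyncChannels returns the channel set the statesync reactor
    // requires of its peers. Illustrative test helper, not in this patch.
    func stateSyncChannels() p2p.ChannelIDSet {
    	return p2p.ChannelIDSet{
    		SnapshotChannel:   struct{}{},
    		ChunkChannel:      struct{}{},
    		LightBlockChannel: struct{}{},
    		ParamsChannel:     struct{}{},
    	}
    }
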