
p2p: plumb rudimentary service discovery to reactors and update statesync (backport #8030) (#8036)

mergify[bot] committed 3 years ago (committed by GitHub)
commit 0e08c66926
7 changed files with 108 additions and 38 deletions

  1. internal/p2p/p2ptest/network.go      (+3, -4)
  2. internal/p2p/p2ptest/require.go      (+7, -4)
  3. internal/p2p/peermanager.go          (+16, -9)
  4. internal/p2p/peermanager_test.go     (+34, -13)
  5. internal/p2p/router.go               (+12, -7)
  6. internal/statesync/reactor.go        (+12, -1)
  7. internal/statesync/reactor_test.go   (+24, -0)

internal/p2p/p2ptest/network.go (+3, -4)

@@ -95,10 +95,8 @@ func (n *Network) Start(t *testing.T) {
 		select {
 		case peerUpdate := <-sourceSub.Updates():
-			require.Equal(t, p2p.PeerUpdate{
-				NodeID: targetNode.NodeID,
-				Status: p2p.PeerStatusUp,
-			}, peerUpdate)
+			require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
+			require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
 		case <-time.After(3 * time.Second):
 			require.Fail(t, "timed out waiting for peer", "%v dialing %v",
 				sourceNode.NodeID, targetNode.NodeID)
@@ -106,6 +104,7 @@ func (n *Network) Start(t *testing.T) {
 		select {
 		case peerUpdate := <-targetSub.Updates():
+			peerUpdate.Channels = nil
 			require.Equal(t, p2p.PeerUpdate{
 				NodeID: sourceNode.NodeID,
 				Status: p2p.PeerStatusUp,


internal/p2p/p2ptest/require.go (+7, -4)

@@ -120,11 +120,10 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUpdate) {
 	select {
 	case update := <-peerUpdates.Updates():
-		require.Equal(t, expect, update, "peer update did not match")
+		require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
+		require.Equal(t, expect.Status, update.Status, "statuses did not match")
 	case <-peerUpdates.Done():
 		require.Fail(t, "peer updates subscription is closed")
 	case <-timer.C:
 		require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
 	}
@@ -142,7 +141,11 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.PeerUpdate) {
 	case update := <-peerUpdates.Updates():
 		actual = append(actual, update)
 		if len(actual) == len(expect) {
-			require.Equal(t, expect, actual)
+			for idx := range expect {
+				require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
+				require.Equal(t, expect[idx].Status, actual[idx].Status)
+			}
 			return
 		}

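Note: the relaxed assertions in this helper and the `peerUpdate.Channels = nil` line in network.go above share one motivation: PeerUpdate now carries a Channels set, so whole-struct equality fails whenever an update arrives with channel data the test did not spell out. A small illustrative sketch, written as if it lived inside the repository (internal/p2p cannot be imported from outside it); the function name is hypothetical:

	package example

	import (
		"reflect"

		"github.com/tendermint/tendermint/internal/p2p"
		"github.com/tendermint/tendermint/types"
	)

	// updatesDiffer shows why whole-struct equality breaks once PeerUpdate
	// carries channel data: an empty Channels on one side no longer matches
	// a populated set on the other, even for the same peer and status.
	func updatesDiffer(id types.NodeID) bool {
		withChannels := p2p.PeerUpdate{
			NodeID:   id,
			Status:   p2p.PeerStatusUp,
			Channels: p2p.ChannelIDSet{0x60: {}},
		}
		statusOnly := p2p.PeerUpdate{NodeID: id, Status: p2p.PeerStatusUp}
		return !reflect.DeepEqual(withChannels, statusOnly) // true
	}

Hence the helpers now compare NodeID and Status field by field (or nil out Channels) instead of asserting on the whole struct.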

internal/p2p/peermanager.go (+16, -9)

@@ -47,8 +47,9 @@ const (
 // PeerUpdate is a peer update event sent via PeerUpdates.
 type PeerUpdate struct {
-	NodeID types.NodeID
-	Status PeerStatus
+	NodeID   types.NodeID
+	Status   PeerStatus
+	Channels ChannelIDSet
 }

 // PeerUpdates is a peer update subscription with notifications about peer
@@ -697,19 +698,23 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
 	return nil
 }

-// Ready marks a peer as ready, broadcasting status updates to subscribers. The
-// peer must already be marked as connected. This is separate from Dialed() and
-// Accepted() to allow the router to set up its internal queues before reactors
-// start sending messages.
-func (m *PeerManager) Ready(peerID types.NodeID) {
+// Ready marks a peer as ready, broadcasting status updates to
+// subscribers. The peer must already be marked as connected. This is
+// separate from Dialed() and Accepted() to allow the router to set up
+// its internal queues before reactors start sending messages. The
+// channels set here are passed in the peer update broadcast to
+// reactors, which can then mediate their own behavior based on the
+// capability of the peers.
+func (m *PeerManager) Ready(peerID types.NodeID, channels ChannelIDSet) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()

 	if m.connected[peerID] {
 		m.ready[peerID] = true
 		m.broadcast(PeerUpdate{
-			NodeID: peerID,
-			Status: PeerStatusUp,
+			NodeID:   peerID,
+			Status:   PeerStatusUp,
+			Channels: channels,
 		})
 	}
 }
@@ -1242,6 +1247,7 @@ type peerInfo struct {
 	// These fields are ephemeral, i.e. not persisted to the database.

 	Persistent bool
+	Seed       bool
 	Height     int64
 	FixedScore PeerScore // mainly for tests
@@ -1264,6 +1270,7 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
 			return nil, err
 		}
 		p.AddressInfo[addressInfo.Address] = addressInfo
 	}
+
 	return p, p.Validate()
 }

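Ready now threads the peer's advertised channels through the PeerUpdate broadcast, so any subscriber can see what the remote side actually speaks. A minimal consumer-side sketch, not part of this commit, written as if it lived inside the repository (internal/p2p is an internal package); watchPeers and myChannelID are hypothetical names, and the subscription is assumed to come from PeerManager.Subscribe():

	package example

	import "github.com/tendermint/tendermint/internal/p2p"

	// watchPeers drains a PeerUpdates subscription and checks whether each
	// newly ready peer advertises the channel this component cares about.
	func watchPeers(sub *p2p.PeerUpdates, myChannelID p2p.ChannelID) {
		for update := range sub.Updates() {
			switch update.Status {
			case p2p.PeerStatusUp:
				if update.Channels.Contains(myChannelID) {
					// the peer opened our channel; it is safe to route messages to it
				}
			case p2p.PeerStatusDown:
				// the peer is gone; drop any per-peer state
			}
		}
	}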

internal/p2p/peermanager_test.go (+34, -13)

@@ -9,6 +9,7 @@ import (
 	"time"

 	"github.com/fortytw2/leaktest"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
@@ -1271,7 +1272,7 @@ func TestPeerManager_Ready(t *testing.T) {
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))

 	// Marking a as ready should transition it to PeerStatusUp and send an update.
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.Equal(t, p2p.PeerUpdate{
 		NodeID: a.NodeID,
@@ -1283,11 +1284,31 @@ func TestPeerManager_Ready(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
-	peerManager.Ready(b.NodeID)
+	peerManager.Ready(b.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
 	require.Empty(t, sub.Updates())
 }

+func TestPeerManager_Ready_Channels(t *testing.T) {
+	pm, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
+	require.NoError(t, err)
+	sub := pm.Subscribe()
+
+	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
+	added, err := pm.Add(a)
+	require.NoError(t, err)
+	require.True(t, added)
+	require.NoError(t, pm.Accepted(a.NodeID))
+
+	pm.Ready(a.NodeID, p2p.ChannelIDSet{42: struct{}{}})
+	require.NotEmpty(t, sub.Updates())
+	update := <-sub.Updates()
+	assert.Equal(t, a.NodeID, update.NodeID)
+	require.True(t, update.Channels.Contains(42))
+	require.False(t, update.Channels.Contains(48))
+}
+
 // See TryEvictNext for most tests, this just tests blocking behavior.
 func TestPeerManager_EvictNext(t *testing.T) {
 	a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
@@ -1299,7 +1320,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	// Since there are no peers to evict, EvictNext should block until timeout.
 	timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -1332,7 +1353,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	// Spawn a goroutine to error a peer after a delay.
 	go func() {
@@ -1364,7 +1385,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	// Spawn a goroutine to upgrade to b with a delay.
 	go func() {
@@ -1402,7 +1423,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	// Spawn a goroutine to upgrade b with a delay.
 	go func() {
@@ -1434,7 +1455,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {

 	// Connecting to a won't evict anything either.
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	// But if a errors it should be evicted.
 	peerManager.Errored(a.NodeID, errors.New("foo"))
@@ -1479,7 +1500,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
 	_, err = peerManager.Add(a)
 	require.NoError(t, err)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{
@@ -1531,7 +1552,7 @@ func TestPeerManager_Errored(t *testing.T) {
 	require.Zero(t, evict)

 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	evict, err = peerManager.TryEvictNext()
 	require.NoError(t, err)
 	require.Zero(t, evict)
@@ -1562,7 +1583,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())

-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@@ -1579,7 +1600,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
 	require.NoError(t, peerManager.Dialed(a))
 	require.Empty(t, sub.Updates())

-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@@ -1619,7 +1640,7 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
 	require.NoError(t, peerManager.Accepted(a.NodeID))
 	require.Empty(t, sub.Updates())

-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)
 	require.NotEmpty(t, sub.Updates())
 	require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@@ -1649,7 +1670,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, added)
 	require.NoError(t, peerManager.Accepted(a.NodeID))
-	peerManager.Ready(a.NodeID)
+	peerManager.Ready(a.NodeID, nil)

 	expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
 	require.NotEmpty(t, s1)

internal/p2p/router.go (+12, -7)

@@ -260,7 +260,7 @@ type Router struct {
 	peerMtx    sync.RWMutex
 	peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
 	// the channels that the peer queue has open
-	peerChannels map[types.NodeID]channelIDs
+	peerChannels map[types.NodeID]ChannelIDSet

 	queueFactory func(int) queue

 	// FIXME: We don't strictly need to use a mutex for this if we seal the
@@ -306,7 +306,7 @@ func NewRouter(
 		channelQueues:   map[ChannelID]queue{},
 		channelMessages: map[ChannelID]proto.Message{},
 		peerQueues:      map[types.NodeID]queue{},
-		peerChannels:    make(map[types.NodeID]channelIDs),
+		peerChannels:    make(map[types.NodeID]ChannelIDSet),
 	}

 	router.BaseService = service.NewBaseService(logger, "router", router)
@@ -739,7 +739,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
 	go r.routePeer(address.NodeID, conn, toChannelIDs(peerInfo.Channels))
 }

-func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
+func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
 	r.peerMtx.Lock()
 	defer r.peerMtx.Unlock()
@@ -851,9 +851,9 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
 // routePeer routes inbound and outbound messages between a peer and the reactor
 // channels. It will close the given connection and send queue when done, or if
 // they are closed elsewhere it will cause this method to shut down and return.
-func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels channelIDs) {
+func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels ChannelIDSet) {
 	r.metrics.Peers.Add(1)
-	r.peerManager.Ready(peerID)
+	r.peerManager.Ready(peerID, channels)

 	sendQueue := r.getOrMakeQueue(peerID, channels)
 	defer func() {
@@ -1092,9 +1092,14 @@ func (r *Router) stopCtx() context.Context {
 	return ctx
 }

-type channelIDs map[ChannelID]struct{}
+type ChannelIDSet map[ChannelID]struct{}
+
+func (cs ChannelIDSet) Contains(id ChannelID) bool {
+	_, ok := cs[id]
+	return ok
+}

-func toChannelIDs(bytes []byte) channelIDs {
+func toChannelIDs(bytes []byte) ChannelIDSet {
 	c := make(map[ChannelID]struct{}, len(bytes))
 	for _, b := range bytes {
 		c[ChannelID(b)] = struct{}{}

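ChannelIDSet (the exported successor of the unexported channelIDs type) is just a set keyed by channel ID: toChannelIDs builds it from the raw channel bytes a peer advertises in its node info, and Contains is a plain map lookup. A self-contained sketch of the same semantics outside the router; the type definitions mirror the p2p package under the assumption that ChannelID is a small unsigned integer, and the byte values are illustrative only:

	package main

	import "fmt"

	// ChannelID and ChannelIDSet mirror the p2p types for illustration.
	type ChannelID uint16

	type ChannelIDSet map[ChannelID]struct{}

	// Contains reports whether the set includes the given channel ID.
	func (cs ChannelIDSet) Contains(id ChannelID) bool {
		_, ok := cs[id]
		return ok
	}

	// toChannelIDs converts advertised channel bytes into a set, as the
	// router does before handing the set to PeerManager.Ready.
	func toChannelIDs(bytes []byte) ChannelIDSet {
		c := make(ChannelIDSet, len(bytes))
		for _, b := range bytes {
			c[ChannelID(b)] = struct{}{}
		}
		return c
	}

	func main() {
		set := toChannelIDs([]byte{0x60, 0x61})
		fmt.Println(set.Contains(0x60)) // true
		fmt.Println(set.Contains(0x42)) // false
	}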

internal/statesync/reactor.go (+12, -1)

@@ -885,7 +885,17 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
-		r.peers.Append(peerUpdate.NodeID)
+		if peerUpdate.Channels.Contains(SnapshotChannel) &&
+			peerUpdate.Channels.Contains(ChunkChannel) &&
+			peerUpdate.Channels.Contains(LightBlockChannel) &&
+			peerUpdate.Channels.Contains(ParamsChannel) {
+
+			r.peers.Append(peerUpdate.NodeID)
+		} else {
+			r.Logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
+		}
+
 	case p2p.PeerStatusDown:
 		r.peers.Remove(peerUpdate.NodeID)
 	}
@@ -898,6 +908,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
 		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
+
 		r.providers[peerUpdate.NodeID] = newProvider
 		err := r.syncer.AddPeer(peerUpdate.NodeID)

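With this change the reactor only admits a peer for state sync if the peer advertises every channel state sync uses, and logs the peer otherwise. A hedged sketch of the same gate factored into a helper; canUseForStateSync is a hypothetical name, and the channel constants are the ones referenced in the diff above:

	// canUseForStateSync reports whether a PeerStatusUp update comes from a
	// peer that opened all channels the statesync reactor requires.
	func canUseForStateSync(update p2p.PeerUpdate) bool {
		return update.Channels.Contains(SnapshotChannel) &&
			update.Channels.Contains(ChunkChannel) &&
			update.Channels.Contains(LightBlockChannel) &&
			update.Channels.Contains(ParamsChannel)
	}

processPeerUpdate could then read `if canUseForStateSync(peerUpdate) { r.peers.Append(peerUpdate.NodeID) }`, which is equivalent to the inline condition the commit adds.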

internal/statesync/reactor_test.go (+24, -0)

@@ -453,10 +453,22 @@ func TestReactor_BlockProviders(t *testing.T) {
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("aa"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("bb"),
 		Status: p2p.PeerStatusUp,
+		Channels: p2p.ChannelIDSet{
+			SnapshotChannel:   struct{}{},
+			ChunkChannel:      struct{}{},
+			LightBlockChannel: struct{}{},
+			ParamsChannel:     struct{}{},
+		},
 	}

 	closeCh := make(chan struct{})
@@ -591,6 +603,12 @@ func TestReactor_Backfill(t *testing.T) {
 		rts.peerUpdateCh <- p2p.PeerUpdate{
 			NodeID: types.NodeID(peer),
 			Status: p2p.PeerStatusUp,
+			Channels: p2p.ChannelIDSet{
+				SnapshotChannel:   struct{}{},
+				ChunkChannel:      struct{}{},
+				LightBlockChannel: struct{}{},
+				ParamsChannel:     struct{}{},
+			},
 		}
 	}
@@ -789,6 +807,12 @@ func graduallyAddPeers(
 			peerUpdateCh <- p2p.PeerUpdate{
 				NodeID: factory.RandomNodeID(),
 				Status: p2p.PeerStatusUp,
+				Channels: p2p.ChannelIDSet{
+					SnapshotChannel:   struct{}{},
+					ChunkChannel:      struct{}{},
+					LightBlockChannel: struct{}{},
+					ParamsChannel:     struct{}{},
+				},
 			}
 		case <-closeCh:
 			return

