
p2p: plumb rudimentary service discovery to reactors and update statesync (#8030)

This is a little coarse, but the idea is that the peer-up event we send to
reactors will now include information about the channels a peer has, which
reactors can then use to reject peers (if needed).

This solves the problem where statesync would hang silently forever in test
networks (and presumably real ones) when we attempted to statesync from seed
nodes.
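
A minimal sketch of the reactor-side check this enables (not part of the diff): the handlePeerUp helper and the required slice are hypothetical, while Channels.Contains, peers.Append, and the logger call mirror the statesync reactor change below.

// Hypothetical helper: gate a peer on the channels it advertised at peer-up.
func (r *Reactor) handlePeerUp(peerUpdate p2p.PeerUpdate) {
	// Channels this reactor needs the peer to serve (assumed constants).
	required := []p2p.ChannelID{SnapshotChannel, ChunkChannel}
	for _, ch := range required {
		if !peerUpdate.Channels.Contains(ch) {
			// e.g. a seed node that never opened the statesync channels.
			r.logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
			return
		}
	}
	r.peers.Append(peerUpdate.NodeID)
}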
Sam Kleinman 2 years ago
committed by GitHub
commit 58dc172611
7 changed files with 111 additions and 37 deletions
  1. internal/p2p/p2ptest/network.go (+3, -4)
  2. internal/p2p/p2ptest/require.go (+7, -3)
  3. internal/p2p/peermanager.go (+16, -9)
  4. internal/p2p/peermanager_test.go (+37, -13)
  5. internal/p2p/router.go (+12, -7)
  6. internal/statesync/reactor.go (+12, -1)
  7. internal/statesync/reactor_test.go (+24, -0)

internal/p2p/p2ptest/network.go (+3, -4)

@ -101,10 +101,8 @@ func (n *Network) Start(ctx context.Context, t *testing.T) {
case <-ctx.Done():
require.Fail(t, "operation canceled")
case peerUpdate := <-sourceSub.Updates():
require.Equal(t, p2p.PeerUpdate{
NodeID: targetNode.NodeID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v dialing %v",
sourceNode.NodeID, targetNode.NodeID)
@ -114,6 +112,7 @@ func (n *Network) Start(ctx context.Context, t *testing.T) {
case <-ctx.Done():
require.Fail(t, "operation canceled")
case peerUpdate := <-targetSub.Updates():
peerUpdate.Channels = nil
require.Equal(t, p2p.PeerUpdate{
NodeID: sourceNode.NodeID,
Status: p2p.PeerStatusUp,


internal/p2p/p2ptest/require.go (+7, -3)

@ -136,8 +136,8 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp
select {
case update := <-peerUpdates.Updates():
require.Equal(t, expect, update, "peer update did not match")
require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
require.Equal(t, expect.Status, update.Status, "statuses did not match")
case <-timer.C:
require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
}
@ -155,7 +155,11 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee
case update := <-peerUpdates.Updates():
actual = append(actual, update)
if len(actual) == len(expect) {
require.Equal(t, expect, actual)
for idx := range expect {
require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
require.Equal(t, expect[idx].Status, actual[idx].Status)
}
return
}


internal/p2p/peermanager.go (+16, -9)

@ -47,8 +47,9 @@ const (
// PeerUpdate is a peer update event sent via PeerUpdates.
type PeerUpdate struct {
NodeID types.NodeID
Status PeerStatus
NodeID types.NodeID
Status PeerStatus
Channels ChannelIDSet
}
// PeerUpdates is a peer update subscription with notifications about peer
@ -674,19 +675,23 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
return nil
}
// Ready marks a peer as ready, broadcasting status updates to subscribers. The
// peer must already be marked as connected. This is separate from Dialed() and
// Accepted() to allow the router to set up its internal queues before reactors
// start sending messages.
func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID) {
// Ready marks a peer as ready, broadcasting status updates to
// subscribers. The peer must already be marked as connected. This is
// separate from Dialed() and Accepted() to allow the router to set up
// its internal queues before reactors start sending messages. The
// channels set here are passed in the peer update broadcast to
// reactors, which can then mediate their own behavior based on the
// capability of the peers.
func (m *PeerManager) Ready(ctx context.Context, peerID types.NodeID, channels ChannelIDSet) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.connected[peerID] {
m.ready[peerID] = true
m.broadcast(ctx, PeerUpdate{
NodeID: peerID,
Status: PeerStatusUp,
NodeID: peerID,
Status: PeerStatusUp,
Channels: channels,
})
}
}
@ -1208,6 +1213,7 @@ type peerInfo struct {
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
Seed bool
Height int64
FixedScore PeerScore // mainly for tests
@ -1230,6 +1236,7 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
return nil, err
}
p.AddressInfo[addressInfo.Address] = addressInfo
}
return p, p.Validate()
}
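
A rough usage sketch of the new Ready signature from the caller's side, assuming a peer that has already been added and accepted; the channel ID 42 is arbitrary and mirrors the test added below.

sub := peerManager.Subscribe(ctx)
// The router calls Ready once its queues are set up, now passing the
// peer's advertised channels so subscribers can gate on them.
peerManager.Ready(ctx, peerID, p2p.ChannelIDSet{42: {}})
update := <-sub.Updates()          // PeerStatusUp broadcast
ok := update.Channels.Contains(42) // true for channel 42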


internal/p2p/peermanager_test.go (+37, -13)

@ -9,6 +9,7 @@ import (
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@ -1311,7 +1312,7 @@ func TestPeerManager_Ready(t *testing.T) {
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
// Marking a as ready should transition it to PeerStatusUp and send an update.
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.Equal(t, p2p.PeerUpdate{
NodeID: a.NodeID,
@ -1323,11 +1324,34 @@ func TestPeerManager_Ready(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
peerManager.Ready(ctx, b.NodeID)
peerManager.Ready(ctx, b.NodeID, nil)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.Empty(t, sub.Updates())
}
func TestPeerManager_Ready_Channels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pm, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
sub := pm.Subscribe(ctx)
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
added, err := pm.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, pm.Accepted(a.NodeID))
pm.Ready(ctx, a.NodeID, p2p.ChannelIDSet{42: struct{}{}})
require.NotEmpty(t, sub.Updates())
update := <-sub.Updates()
assert.Equal(t, a.NodeID, update.NodeID)
require.True(t, update.Channels.Contains(42))
require.False(t, update.Channels.Contains(48))
}
// See TryEvictNext for most tests, this just tests blocking behavior.
func TestPeerManager_EvictNext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
@ -1342,7 +1366,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
// Since there are no peers to evict, EvictNext should block until timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@ -1378,7 +1402,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
// Spawn a goroutine to error a peer after a delay.
go func() {
@ -1413,7 +1437,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
// Spawn a goroutine to upgrade to b with a delay.
go func() {
@ -1454,7 +1478,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
// Spawn a goroutine to upgrade b with a delay.
go func() {
@ -1489,7 +1513,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
// Connecting to a won't evict anything either.
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
// But if a errors it should be evicted.
peerManager.Errored(a.NodeID, errors.New("foo"))
@ -1536,7 +1560,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
_, err = peerManager.Add(a)
require.NoError(t, err)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{
@ -1591,7 +1615,7 @@ func TestPeerManager_Errored(t *testing.T) {
require.Zero(t, evict)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Zero(t, evict)
@ -1624,7 +1648,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1641,7 +1665,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Dialed(a))
require.Empty(t, sub.Updates())
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1683,7 +1707,7 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1716,7 +1740,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(ctx, a.NodeID)
peerManager.Ready(ctx, a.NodeID, nil)
expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
require.NotEmpty(t, s1)


internal/p2p/router.go (+12, -7)

@ -162,7 +162,7 @@ type Router struct {
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]channelIDs
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
// FIXME: We don't strictly need to use a mutex for this if we seal the
@ -210,7 +210,7 @@ func NewRouter(
channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{},
peerChannels: make(map[types.NodeID]channelIDs),
peerChannels: make(map[types.NodeID]ChannelIDSet),
}
router.BaseService = service.NewBaseService(logger, "router", router)
@ -644,7 +644,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
go r.routePeer(ctx, address.NodeID, conn, toChannelIDs(peerInfo.Channels))
}
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
@ -756,9 +756,9 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// routePeer routes inbound and outbound messages between a peer and the reactor
// channels. It will close the given connection and send queue when done, or if
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels channelIDs) {
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels ChannelIDSet) {
r.metrics.Peers.Add(1)
r.peerManager.Ready(ctx, peerID)
r.peerManager.Ready(ctx, peerID, channels)
sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
@ -1003,9 +1003,14 @@ func (r *Router) OnStop() {
}
}
type channelIDs map[ChannelID]struct{}
type ChannelIDSet map[ChannelID]struct{}
func toChannelIDs(bytes []byte) channelIDs {
func (cs ChannelIDSet) Contains(id ChannelID) bool {
_, ok := cs[id]
return ok
}
func toChannelIDs(bytes []byte) ChannelIDSet {
c := make(map[ChannelID]struct{}, len(bytes))
for _, b := range bytes {
c[ChannelID(b)] = struct{}{}
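
A small sketch of the new ChannelIDSet in use (not part of the diff); the byte values are arbitrary placeholders, not necessarily real channel constants.

// toChannelIDs builds the set from a peer's advertised channel bytes.
cs := toChannelIDs([]byte{0x60, 0x61})
_ = cs.Contains(ChannelID(0x60)) // true
_ = cs.Contains(ChannelID(0x63)) // false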


internal/statesync/reactor.go (+12, -1)

@ -862,7 +862,17 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
switch peerUpdate.Status {
case p2p.PeerStatusUp:
r.peers.Append(peerUpdate.NodeID)
if peerUpdate.Channels.Contains(SnapshotChannel) &&
peerUpdate.Channels.Contains(ChunkChannel) &&
peerUpdate.Channels.Contains(LightBlockChannel) &&
peerUpdate.Channels.Contains(ParamsChannel) {
r.peers.Append(peerUpdate.NodeID)
} else {
r.logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
}
case p2p.PeerStatusDown:
r.peers.Remove(peerUpdate.NodeID)
}
@ -875,6 +885,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
err := r.syncer.AddPeer(ctx, peerUpdate.NodeID)


internal/statesync/reactor_test.go (+24, -0)

@ -496,10 +496,22 @@ func TestReactor_BlockProviders(t *testing.T) {
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("aa"),
Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("bb"),
Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
}
closeCh := make(chan struct{})
@ -637,6 +649,12 @@ func TestReactor_Backfill(t *testing.T) {
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID(peer),
Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
}
}
@ -854,6 +872,12 @@ func graduallyAddPeers(
peerUpdateCh <- p2p.PeerUpdate{
NodeID: factory.RandomNodeID(t),
Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
}
case <-closeCh:
return

