Browse Source

pex: regularize reactor constructor (#7532)

pull/7534/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
90cf742065
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 26 deletions
  1. +10
    -4
      internal/p2p/pex/reactor.go
  2. +26
    -5
      internal/p2p/pex/reactor_test.go
  3. +3
    -2
      node/node.go
  4. +0
    -15
      node/setup.go

+ 10
- 4
internal/p2p/pex/reactor.go View File

@ -113,16 +113,22 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor. // NewReactor returns a reference to a new reactor.
func NewReactor( func NewReactor(
ctx context.Context,
logger log.Logger, logger log.Logger,
peerManager *p2p.PeerManager, peerManager *p2p.PeerManager,
pexCh *p2p.Channel,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
) *Reactor {
) (*Reactor, error) {
channel, err := channelCreator(ctx, ChannelDescriptor())
if err != nil {
return nil, err
}
r := &Reactor{ r := &Reactor{
logger: logger, logger: logger,
peerManager: peerManager, peerManager: peerManager,
pexCh: pexCh,
pexCh: channel,
peerUpdates: peerUpdates, peerUpdates: peerUpdates,
availablePeers: make(map[types.NodeID]struct{}), availablePeers: make(map[types.NodeID]struct{}),
requestsSent: make(map[types.NodeID]struct{}), requestsSent: make(map[types.NodeID]struct{}),
@ -130,7 +136,7 @@ func NewReactor(
} }
r.BaseService = *service.NewBaseService(logger, "PEX", r) r.BaseService = *service.NewBaseService(logger, "PEX", r)
return r
return r, nil
} }
// OnStart starts separate go routines for each p2p Channel and listens for // OnStart starts separate go routines for each p2p Channel and listens for


+ 26
- 5
internal/p2p/pex/reactor_test.go View File

@ -295,7 +295,13 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err) require.NoError(t, err)
reactor := pex.NewReactor(log.TestingLogger(), peerManager, pexCh, peerUpdates)
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return pexCh, nil
}
reactor, err := pex.NewReactor(ctx, log.TestingLogger(), peerManager, chCreator, peerUpdates)
require.NoError(t, err)
require.NoError(t, reactor.Start(ctx)) require.NoError(t, reactor.Start(ctx))
t.Cleanup(reactor.Wait) t.Cleanup(reactor.Wait)
@ -375,16 +381,23 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.pexChannels[nodeID], nil
}
// the first nodes in the array are always mock nodes // the first nodes in the array are always mock nodes
if idx < opts.MockNodes { if idx < opts.MockNodes {
rts.mocks = append(rts.mocks, nodeID) rts.mocks = append(rts.mocks, nodeID)
} else { } else {
rts.reactors[nodeID] = pex.NewReactor(
var err error
rts.reactors[nodeID], err = pex.NewReactor(
ctx,
rts.logger.With("nodeID", nodeID), rts.logger.With("nodeID", nodeID),
rts.network.Nodes[nodeID].PeerManager, rts.network.Nodes[nodeID].PeerManager,
rts.pexChannels[nodeID],
chCreator,
rts.peerUpdates[nodeID], rts.peerUpdates[nodeID],
) )
require.NoError(t, err)
} }
rts.nodes = append(rts.nodes, nodeID) rts.nodes = append(rts.nodes, nodeID)
@ -429,12 +442,20 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize) r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize)
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID]) r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID])
r.reactors[nodeID] = pex.NewReactor(
chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return r.pexChannels[nodeID], nil
}
var err error
r.reactors[nodeID], err = pex.NewReactor(
ctx,
r.logger.With("nodeID", nodeID), r.logger.With("nodeID", nodeID),
r.network.Nodes[nodeID].PeerManager, r.network.Nodes[nodeID].PeerManager,
r.pexChannels[nodeID],
chCreator,
r.peerUpdates[nodeID], r.peerUpdates[nodeID],
) )
require.NoError(t, err)
r.nodes = append(r.nodes, nodeID) r.nodes = append(r.nodes, nodeID)
r.total++ r.total++
} }


+ 3
- 2
node/node.go View File

@ -21,6 +21,7 @@ import (
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/internal/proxy"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub" tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
rpccore "github.com/tendermint/tendermint/internal/rpc/core" rpccore "github.com/tendermint/tendermint/internal/rpc/core"
@ -378,7 +379,7 @@ func makeNode(
var pexReactor service.Service var pexReactor service.Service
if cfg.P2P.PexReactor { if cfg.P2P.PexReactor {
pexReactor, err = createPEXReactor(ctx, logger, peerManager, router)
pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil { if err != nil {
return nil, combineCloseError(err, makeCloser(closers)) return nil, combineCloseError(err, makeCloser(closers))
} }
@ -487,7 +488,7 @@ func makeSeedNode(
closer) closer)
} }
pexReactor, err := createPEXReactor(ctx, logger, peerManager, router)
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil { if err != nil {
return nil, combineCloseError(err, closer) return nil, combineCloseError(err, closer)
} }


+ 0
- 15
node/setup.go View File

@ -408,21 +408,6 @@ func createRouter(
) )
} }
func createPEXReactor(
ctx context.Context,
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (service.Service, error) {
channel, err := router.OpenChannel(ctx, pex.ChannelDescriptor())
if err != nil {
return nil, err
}
return pex.NewReactor(logger, peerManager, channel, peerManager.Subscribe(ctx)), nil
}
func makeNodeInfo( func makeNodeInfo(
cfg *config.Config, cfg *config.Config,
nodeKey types.NodeKey, nodeKey types.NodeKey,


Loading…
Cancel
Save