From 90cf742065944d6e8260dc4cacb79580b00e4397 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 7 Jan 2022 13:52:11 -0500 Subject: [PATCH] pex: regularize reactor constructor (#7532) --- internal/p2p/pex/reactor.go | 14 ++++++++++---- internal/p2p/pex/reactor_test.go | 31 ++++++++++++++++++++++++++----- node/node.go | 5 +++-- node/setup.go | 15 --------------- 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index fe57dc85b..b1f33081a 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -113,16 +113,22 @@ type Reactor struct { // NewReactor returns a reference to a new reactor. func NewReactor( + ctx context.Context, logger log.Logger, peerManager *p2p.PeerManager, - pexCh *p2p.Channel, + channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, -) *Reactor { +) (*Reactor, error) { + + channel, err := channelCreator(ctx, ChannelDescriptor()) + if err != nil { + return nil, err + } r := &Reactor{ logger: logger, peerManager: peerManager, - pexCh: pexCh, + pexCh: channel, peerUpdates: peerUpdates, availablePeers: make(map[types.NodeID]struct{}), requestsSent: make(map[types.NodeID]struct{}), @@ -130,7 +136,7 @@ func NewReactor( } r.BaseService = *service.NewBaseService(logger, "PEX", r) - return r + return r, nil } // OnStart starts separate go routines for each p2p Channel and listens for diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index d83b6d3af..451ec37c1 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -295,7 +295,13 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - reactor := pex.NewReactor(log.TestingLogger(), peerManager, pexCh, peerUpdates) + chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return pexCh, nil + } + + reactor, err := pex.NewReactor(ctx, log.TestingLogger(), peerManager, chCreator, peerUpdates) + require.NoError(t, err) + require.NoError(t, reactor.Start(ctx)) t.Cleanup(reactor.Wait) @@ -375,16 +381,23 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) + chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return rts.pexChannels[nodeID], nil + } + // the first nodes in the array are always mock nodes if idx < opts.MockNodes { rts.mocks = append(rts.mocks, nodeID) } else { - rts.reactors[nodeID] = pex.NewReactor( + var err error + rts.reactors[nodeID], err = pex.NewReactor( + ctx, rts.logger.With("nodeID", nodeID), rts.network.Nodes[nodeID].PeerManager, - rts.pexChannels[nodeID], + chCreator, rts.peerUpdates[nodeID], ) + require.NoError(t, err) } rts.nodes = append(rts.nodes, nodeID) @@ -429,12 +442,20 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize) r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID]) - r.reactors[nodeID] = pex.NewReactor( + + chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return r.pexChannels[nodeID], nil + } + + var err error + r.reactors[nodeID], err = pex.NewReactor( + ctx, r.logger.With("nodeID", nodeID), r.network.Nodes[nodeID].PeerManager, - r.pexChannels[nodeID], + chCreator, r.peerUpdates[nodeID], ) + require.NoError(t, err) r.nodes = append(r.nodes, nodeID) r.total++ } diff --git a/node/node.go b/node/node.go index cbc95e0ec..daf256104 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,7 @@ import ( "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/proxy" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" rpccore "github.com/tendermint/tendermint/internal/rpc/core" @@ -378,7 +379,7 @@ func makeNode( var pexReactor service.Service if cfg.P2P.PexReactor { - pexReactor, err = createPEXReactor(ctx, logger, peerManager, router) + pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx)) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } @@ -487,7 +488,7 @@ func makeSeedNode( closer) } - pexReactor, err := createPEXReactor(ctx, logger, peerManager, router) + pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx)) if err != nil { return nil, combineCloseError(err, closer) } diff --git a/node/setup.go b/node/setup.go index 5f1b078ab..97216cae3 100644 --- a/node/setup.go +++ b/node/setup.go @@ -408,21 +408,6 @@ func createRouter( ) } -func createPEXReactor( - ctx context.Context, - logger log.Logger, - peerManager *p2p.PeerManager, - router *p2p.Router, -) (service.Service, error) { - - channel, err := router.OpenChannel(ctx, pex.ChannelDescriptor()) - if err != nil { - return nil, err - } - - return pex.NewReactor(logger, peerManager, channel, peerManager.Subscribe(ctx)), nil -} - func makeNodeInfo( cfg *config.Config, nodeKey types.NodeKey,