From 059b38afe4cb309a5f39f62548cc19964be2c0c0 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 7 Jan 2022 12:49:22 -0500 Subject: [PATCH] mempool: refactor mempool constructor (#7530) --- internal/mempool/reactor.go | 18 ++++++++++++------ internal/mempool/reactor_test.go | 12 +++++++++--- node/setup.go | 14 ++++++-------- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 14b52e917..d1d027bd8 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -63,13 +63,19 @@ type Reactor struct { // NewReactor returns a reference to a new reactor. func NewReactor( + ctx context.Context, logger log.Logger, cfg *config.MempoolConfig, peerMgr PeerManager, txmp *TxMempool, - mempoolCh *p2p.Channel, + chCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, -) *Reactor { +) (*Reactor, error) { + + ch, err := chCreator(ctx, getChannelDescriptor(cfg)) + if err != nil { + return nil, err + } r := &Reactor{ logger: logger, @@ -77,21 +83,21 @@ func NewReactor( peerMgr: peerMgr, mempool: txmp, ids: NewMempoolIDs(), - mempoolCh: mempoolCh, + mempoolCh: ch, peerUpdates: peerUpdates, peerRoutines: make(map[types.NodeID]*tmsync.Closer), observePanic: defaultObservePanic, } r.BaseService = *service.NewBaseService(logger, "Mempool", r) - return r + return r, nil } func defaultObservePanic(r interface{}) {} -// GetChannelDescriptor produces an instance of a descriptor for this +// getChannelDescriptor produces an instance of a descriptor for this // package's required channels. -func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor { +func getChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := protomem.Message{ Sum: &protomem.Message_Txs{ diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index e3f0b5718..d8a536084 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -55,7 +55,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := GetChannelDescriptor(cfg.Mempool) + chDesc := getChannelDescriptor(cfg.Mempool) rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc) for nodeID := range rts.network.Nodes { @@ -68,15 +68,21 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) - rts.reactors[nodeID] = NewReactor( + chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return rts.mempoolChannels[nodeID], nil + } + + rts.reactors[nodeID], err = NewReactor( + ctx, rts.logger.With("nodeID", nodeID), cfg.Mempool, rts.network.Nodes[nodeID].PeerManager, mempool, - rts.mempoolChannels[nodeID], + chCreator, rts.peerUpdates[nodeID], ) + require.NoError(t, err) rts.nodes = append(rts.nodes, nodeID) require.NoError(t, rts.reactors[nodeID].Start(ctx)) diff --git a/node/setup.go b/node/setup.go index af8a4b789..d3feba772 100644 --- a/node/setup.go +++ b/node/setup.go @@ -175,14 +175,8 @@ func createMempoolReactor( router *p2p.Router, logger log.Logger, ) (service.Service, mempool.Mempool, error) { - logger = logger.With("module", "mempool") - ch, err := router.OpenChannel(ctx, mempool.GetChannelDescriptor(cfg.Mempool)) - if err != nil { - return nil, nil, err - } - mp := mempool.NewTxMempool( logger, cfg.Mempool, @@ -193,14 +187,18 @@ func createMempoolReactor( mempool.WithPostCheck(sm.TxPostCheck(state)), ) - reactor := mempool.NewReactor( + reactor, err := mempool.NewReactor( + ctx, logger, cfg.Mempool, peerManager, mp, - ch, + router.OpenChannel, peerManager.Subscribe(ctx), ) + if err != nil { + return nil, nil, err + } if cfg.Consensus.WaitForTxs() { mp.EnableTxsAvailable()