Browse Source

mempool: refactor mempool constructor (#7530)

pull/7533/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
059b38afe4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 17 deletions
  1. +12
    -6
      internal/mempool/reactor.go
  2. +9
    -3
      internal/mempool/reactor_test.go
  3. +6
    -8
      node/setup.go

+ 12
- 6
internal/mempool/reactor.go View File

@ -63,13 +63,19 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor. // NewReactor returns a reference to a new reactor.
func NewReactor( func NewReactor(
ctx context.Context,
logger log.Logger, logger log.Logger,
cfg *config.MempoolConfig, cfg *config.MempoolConfig,
peerMgr PeerManager, peerMgr PeerManager,
txmp *TxMempool, txmp *TxMempool,
mempoolCh *p2p.Channel,
chCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
) *Reactor {
) (*Reactor, error) {
ch, err := chCreator(ctx, getChannelDescriptor(cfg))
if err != nil {
return nil, err
}
r := &Reactor{ r := &Reactor{
logger: logger, logger: logger,
@ -77,21 +83,21 @@ func NewReactor(
peerMgr: peerMgr, peerMgr: peerMgr,
mempool: txmp, mempool: txmp,
ids: NewMempoolIDs(), ids: NewMempoolIDs(),
mempoolCh: mempoolCh,
mempoolCh: ch,
peerUpdates: peerUpdates, peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]*tmsync.Closer), peerRoutines: make(map[types.NodeID]*tmsync.Closer),
observePanic: defaultObservePanic, observePanic: defaultObservePanic,
} }
r.BaseService = *service.NewBaseService(logger, "Mempool", r) r.BaseService = *service.NewBaseService(logger, "Mempool", r)
return r
return r, nil
} }
func defaultObservePanic(r interface{}) {} func defaultObservePanic(r interface{}) {}
// GetChannelDescriptor produces an instance of a descriptor for this
// getChannelDescriptor produces an instance of a descriptor for this
// package's required channels. // package's required channels.
func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
func getChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
largestTx := make([]byte, cfg.MaxTxBytes) largestTx := make([]byte, cfg.MaxTxBytes)
batchMsg := protomem.Message{ batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{ Sum: &protomem.Message_Txs{


+ 9
- 3
internal/mempool/reactor_test.go View File

@ -55,7 +55,7 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
} }
chDesc := GetChannelDescriptor(cfg.Mempool)
chDesc := getChannelDescriptor(cfg.Mempool)
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc) rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
for nodeID := range rts.network.Nodes { for nodeID := range rts.network.Nodes {
@ -68,15 +68,21 @@ func setupReactors(ctx context.Context, t *testing.T, numNodes int, chBuf uint)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
rts.reactors[nodeID] = NewReactor(
chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.mempoolChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.logger.With("nodeID", nodeID), rts.logger.With("nodeID", nodeID),
cfg.Mempool, cfg.Mempool,
rts.network.Nodes[nodeID].PeerManager, rts.network.Nodes[nodeID].PeerManager,
mempool, mempool,
rts.mempoolChannels[nodeID],
chCreator,
rts.peerUpdates[nodeID], rts.peerUpdates[nodeID],
) )
require.NoError(t, err)
rts.nodes = append(rts.nodes, nodeID) rts.nodes = append(rts.nodes, nodeID)
require.NoError(t, rts.reactors[nodeID].Start(ctx)) require.NoError(t, rts.reactors[nodeID].Start(ctx))


+ 6
- 8
node/setup.go View File

@ -175,14 +175,8 @@ func createMempoolReactor(
router *p2p.Router, router *p2p.Router,
logger log.Logger, logger log.Logger,
) (service.Service, mempool.Mempool, error) { ) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool") logger = logger.With("module", "mempool")
ch, err := router.OpenChannel(ctx, mempool.GetChannelDescriptor(cfg.Mempool))
if err != nil {
return nil, nil, err
}
mp := mempool.NewTxMempool( mp := mempool.NewTxMempool(
logger, logger,
cfg.Mempool, cfg.Mempool,
@ -193,14 +187,18 @@ func createMempoolReactor(
mempool.WithPostCheck(sm.TxPostCheck(state)), mempool.WithPostCheck(sm.TxPostCheck(state)),
) )
reactor := mempool.NewReactor(
reactor, err := mempool.NewReactor(
ctx,
logger, logger,
cfg.Mempool, cfg.Mempool,
peerManager, peerManager,
mp, mp,
ch,
router.OpenChannel,
peerManager.Subscribe(ctx), peerManager.Subscribe(ctx),
) )
if err != nil {
return nil, nil, err
}
if cfg.Consensus.WaitForTxs() { if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable() mp.EnableTxsAvailable()


Loading…
Cancel
Save