From aa76a367e045cb2b2ace57169b9929d74f015b7f Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 7 Jan 2022 13:40:08 -0500 Subject: [PATCH] blocksync: standardize construction process (#7531) --- internal/blocksync/reactor.go | 9 +++++++- internal/blocksync/reactor_test.go | 7 +++++- node/node.go | 18 +++++++++++----- node/setup.go | 34 ------------------------------ 4 files changed, 27 insertions(+), 41 deletions(-) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 361be537e..b3142d953 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -103,16 +103,18 @@ type Reactor struct { // NewReactor returns new reactor instance. func NewReactor( + ctx context.Context, logger log.Logger, state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, consReactor consensusReactor, - blockSyncCh *p2p.Channel, + channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, blockSync bool, metrics *consensus.Metrics, ) (*Reactor, error) { + if state.LastBlockHeight != store.Height() { return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) } @@ -125,6 +127,11 @@ func NewReactor( requestsCh := make(chan BlockRequest, maxTotalRequesters) errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. + blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor()) + if err != nil { + return nil, err + } + r := &Reactor{ logger: logger, initialState: state, diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 786815e53..245f2753e 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -168,13 +168,18 @@ func (rts *reactorTestSuite) addNode( rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) + + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return rts.blockSyncChannels[nodeID], nil + } rts.reactors[nodeID], err = NewReactor( + ctx, rts.logger.With("nodeID", nodeID), state.Copy(), blockExec, blockStore, nil, - rts.blockSyncChannels[nodeID], + chCreator, rts.peerUpdates[nodeID], rts.blockSync, consensus.NopMetrics()) diff --git a/node/node.go b/node/node.go index 704bf7faa..cbc95e0ec 100644 --- a/node/node.go +++ b/node/node.go @@ -16,6 +16,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" @@ -327,13 +328,20 @@ func makeNode( // Create the blockchain reactor. Note, we do not start block sync if we're // doing a state sync first. - bcReactor, err := createBlockchainReactor(ctx, - logger, state, blockExec, blockStore, csReactor, - peerManager, router, blockSync && !stateSync, nodeMetrics.consensus, + bcReactor, err := blocksync.NewReactor(ctx, + logger.With("module", "blockchain"), + state.Copy(), + blockExec, + blockStore, + csReactor, + router.OpenChannel, + peerManager.Subscribe(ctx), + blockSync && !stateSync, + nodeMetrics.consensus, ) if err != nil { return nil, combineCloseError( - fmt.Errorf("could not create blockchain reactor: %w", err), + fmt.Errorf("could not create blocksync reactor: %w", err), makeCloser(closers)) } @@ -413,7 +421,7 @@ func makeNode( ConsensusState: csState, ConsensusReactor: csReactor, - BlockSyncReactor: bcReactor.(consensus.BlockSyncReactor), + BlockSyncReactor: bcReactor, PeerManager: peerManager, diff --git a/node/setup.go b/node/setup.go index e83af95c9..5f1b078ab 100644 --- a/node/setup.go +++ b/node/setup.go @@ -243,40 +243,6 @@ func createEvidenceReactor( return evidenceReactor, evidencePool, nil } -func createBlockchainReactor( - ctx context.Context, - logger log.Logger, - state sm.State, - blockExec *sm.BlockExecutor, - blockStore *store.BlockStore, - csReactor *consensus.Reactor, - peerManager *p2p.PeerManager, - router *p2p.Router, - blockSync bool, - metrics *consensus.Metrics, -) (service.Service, error) { - - logger = logger.With("module", "blockchain") - - ch, err := router.OpenChannel(ctx, blocksync.GetChannelDescriptor()) - if err != nil { - return nil, err - } - - peerUpdates := peerManager.Subscribe(ctx) - - reactor, err := blocksync.NewReactor( - logger, state.Copy(), blockExec, blockStore, csReactor, - ch, peerUpdates, blockSync, - metrics, - ) - if err != nil { - return nil, err - } - - return reactor, nil -} - func createConsensusReactor( ctx context.Context, cfg *config.Config,