Browse Source

blocksync: standardize construction process (#7531)

pull/7534/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
aa76a367e0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 41 deletions
  1. +8
    -1
      internal/blocksync/reactor.go
  2. +6
    -1
      internal/blocksync/reactor_test.go
  3. +13
    -5
      node/node.go
  4. +0
    -34
      node/setup.go

+ 8
- 1
internal/blocksync/reactor.go View File

@ -103,16 +103,18 @@ type Reactor struct {
// NewReactor returns new reactor instance. // NewReactor returns new reactor instance.
func NewReactor( func NewReactor(
ctx context.Context,
logger log.Logger, logger log.Logger,
state sm.State, state sm.State,
blockExec *sm.BlockExecutor, blockExec *sm.BlockExecutor,
store *store.BlockStore, store *store.BlockStore,
consReactor consensusReactor, consReactor consensusReactor,
blockSyncCh *p2p.Channel,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates, peerUpdates *p2p.PeerUpdates,
blockSync bool, blockSync bool,
metrics *consensus.Metrics, metrics *consensus.Metrics,
) (*Reactor, error) { ) (*Reactor, error) {
if state.LastBlockHeight != store.Height() { if state.LastBlockHeight != store.Height() {
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
} }
@ -125,6 +127,11 @@ func NewReactor(
requestsCh := make(chan BlockRequest, maxTotalRequesters) requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
}
r := &Reactor{ r := &Reactor{
logger: logger, logger: logger,
initialState: state, initialState: state,


+ 6
- 1
internal/blocksync/reactor_test.go View File

@ -168,13 +168,18 @@ func (rts *reactorTestSuite) addNode(
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor( rts.reactors[nodeID], err = NewReactor(
ctx,
rts.logger.With("nodeID", nodeID), rts.logger.With("nodeID", nodeID),
state.Copy(), state.Copy(),
blockExec, blockExec,
blockStore, blockStore,
nil, nil,
rts.blockSyncChannels[nodeID],
chCreator,
rts.peerUpdates[nodeID], rts.peerUpdates[nodeID],
rts.blockSync, rts.blockSync,
consensus.NopMetrics()) consensus.NopMetrics())


+ 13
- 5
node/node.go View File

@ -16,6 +16,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/mempool"
@ -327,13 +328,20 @@ func makeNode(
// Create the blockchain reactor. Note, we do not start block sync if we're // Create the blockchain reactor. Note, we do not start block sync if we're
// doing a state sync first. // doing a state sync first.
bcReactor, err := createBlockchainReactor(ctx,
logger, state, blockExec, blockStore, csReactor,
peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
bcReactor, err := blocksync.NewReactor(ctx,
logger.With("module", "blockchain"),
state.Copy(),
blockExec,
blockStore,
csReactor,
router.OpenChannel,
peerManager.Subscribe(ctx),
blockSync && !stateSync,
nodeMetrics.consensus,
) )
if err != nil { if err != nil {
return nil, combineCloseError( return nil, combineCloseError(
fmt.Errorf("could not create blockchain reactor: %w", err),
fmt.Errorf("could not create blocksync reactor: %w", err),
makeCloser(closers)) makeCloser(closers))
} }
@ -413,7 +421,7 @@ func makeNode(
ConsensusState: csState, ConsensusState: csState,
ConsensusReactor: csReactor, ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor.(consensus.BlockSyncReactor),
BlockSyncReactor: bcReactor,
PeerManager: peerManager, PeerManager: peerManager,


+ 0
- 34
node/setup.go View File

@ -243,40 +243,6 @@ func createEvidenceReactor(
return evidenceReactor, evidencePool, nil return evidenceReactor, evidencePool, nil
} }
func createBlockchainReactor(
ctx context.Context,
logger log.Logger,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *consensus.Reactor,
peerManager *p2p.PeerManager,
router *p2p.Router,
blockSync bool,
metrics *consensus.Metrics,
) (service.Service, error) {
logger = logger.With("module", "blockchain")
ch, err := router.OpenChannel(ctx, blocksync.GetChannelDescriptor())
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe(ctx)
reactor, err := blocksync.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
ch, peerUpdates, blockSync,
metrics,
)
if err != nil {
return nil, err
}
return reactor, nil
}
func createConsensusReactor( func createConsensusReactor(
ctx context.Context, ctx context.Context,
cfg *config.Config, cfg *config.Config,


Loading…
Cancel
Save