From a15ae5b53af59be514bb4b4095aa5e8ec2a3b9fe Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Mon, 15 Nov 2021 13:28:52 -0500 Subject: [PATCH] node+consensus: handshaker initialization (#7283) This mostly just pushes more of initialization out of the node package. --- cmd/tendermint/commands/replay.go | 9 +++--- internal/consensus/replay.go | 24 +++++++-------- internal/consensus/replay_file.go | 49 ++++++++++++++++++++----------- internal/consensus/replay_test.go | 22 +++++++------- node/node.go | 5 +++- node/setup.go | 20 ------------- 6 files changed, 63 insertions(+), 66 deletions(-) diff --git a/cmd/tendermint/commands/replay.go b/cmd/tendermint/commands/replay.go index e92274042..558208ab3 100644 --- a/cmd/tendermint/commands/replay.go +++ b/cmd/tendermint/commands/replay.go @@ -9,8 +9,9 @@ import ( var ReplayCmd = &cobra.Command{ Use: "replay", Short: "Replay messages from WAL", - Run: func(cmd *cobra.Command, args []string) { - consensus.RunReplayFile(config.BaseConfig, config.Consensus, false) + RunE: func(cmd *cobra.Command, args []string) error { + return consensus.RunReplayFile(logger, config.BaseConfig, config.Consensus, false) + }, } @@ -19,7 +20,7 @@ var ReplayCmd = &cobra.Command{ var ReplayConsoleCmd = &cobra.Command{ Use: "replay-console", Short: "Replay messages from WAL in a console", - Run: func(cmd *cobra.Command, args []string) { - consensus.RunReplayFile(config.BaseConfig, config.Consensus, true) + RunE: func(cmd *cobra.Command, args []string) error { + return consensus.RunReplayFile(logger, config.BaseConfig, config.Consensus, true) }, } diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 315fe0d9a..75945d3dc 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -211,30 +211,26 @@ type Handshaker struct { nBlocks int // number of blocks applied to the state } -func NewHandshaker(stateStore sm.Store, state sm.State, - store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker { +func NewHandshaker( + logger log.Logger, + stateStore sm.Store, + state sm.State, + store sm.BlockStore, + eventBus types.BlockEventPublisher, + genDoc *types.GenesisDoc, +) *Handshaker { return &Handshaker{ stateStore: stateStore, initialState: state, store: store, - eventBus: eventbus.NopEventBus{}, + eventBus: eventBus, genDoc: genDoc, - logger: log.NewNopLogger(), + logger: logger, nBlocks: 0, } } -func (h *Handshaker) SetLogger(l log.Logger) { - h.logger = l -} - -// SetEventBus - sets the event bus for publishing block related events. -// If not called, it defaults to types.NopEventBus. -func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) { - h.eventBus = eventBus -} - // NBlocks returns the number of blocks applied to the state. func (h *Handshaker) NBlocks() int { return h.nBlocks diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index c03b234aa..8243e4f22 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -32,12 +32,22 @@ const ( // replay messages interactively or all at once // replay the wal file -func RunReplayFile(cfg config.BaseConfig, csConfig *config.ConsensusConfig, console bool) { - consensusState := newConsensusStateForReplay(cfg, csConfig) +func RunReplayFile( + logger log.Logger, + cfg config.BaseConfig, + csConfig *config.ConsensusConfig, + console bool, +) error { + consensusState, err := newConsensusStateForReplay(cfg, logger, csConfig) + if err != nil { + return err + } if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil { - tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err)) + return fmt.Errorf("consensus replay: %w", err) } + + return nil } // Replay msgs in file or start the console @@ -293,28 +303,34 @@ func (pb *playback) replayConsoleLoop() int { //-------------------------------------------------------------------------------- // convenience for replay mode -func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.ConsensusConfig) *State { +func newConsensusStateForReplay( + cfg config.BaseConfig, + logger log.Logger, + csConfig *config.ConsensusConfig, +) (*State, error) { dbType := dbm.BackendType(cfg.DBBackend) // Get BlockStore blockStoreDB, err := dbm.NewDB("blockstore", dbType, cfg.DBDir()) if err != nil { - tmos.Exit(err.Error()) + return nil, err } blockStore := store.NewBlockStore(blockStoreDB) // Get State stateDB, err := dbm.NewDB("state", dbType, cfg.DBDir()) if err != nil { - tmos.Exit(err.Error()) + return nil, err } + stateStore := sm.NewStore(stateDB) gdoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) if err != nil { - tmos.Exit(err.Error()) + return nil, err } + state, err := sm.MakeGenesisState(gdoc) if err != nil { - tmos.Exit(err.Error()) + return nil, err } // Create proxyAppConn connection (consensus, mempool, query) @@ -322,27 +338,26 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) err = proxyApp.Start() if err != nil { - tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err)) + return nil, fmt.Errorf("starting proxy app conns: %w", err) } eventBus := eventbus.NewDefault() if err := eventBus.Start(); err != nil { - tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err)) + return nil, fmt.Errorf("failed to start event bus: %w", err) } - handshaker := NewHandshaker(stateStore, state, blockStore, gdoc) - handshaker.SetEventBus(eventBus) - err = handshaker.Handshake(proxyApp) - if err != nil { - tmos.Exit(fmt.Sprintf("Error on handshake: %v", err)) + handshaker := NewHandshaker(logger, stateStore, state, blockStore, eventBus, gdoc) + + if err = handshaker.Handshake(proxyApp); err != nil { + return nil, err } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) consensusState := NewState(csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) - return consensusState + return consensusState, nil } diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 49b1b9092..a91d52b8a 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -25,6 +25,7 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/encoding" + "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" @@ -772,7 +773,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod // now start the app using the handshake - it should sync genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) - handshaker := NewHandshaker(stateStore, state, store, genDoc) + handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc) proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) @@ -962,6 +963,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { blocks := sf.MakeBlocks(3, &state, privVal) store.chain = blocks + logger := log.TestingLogger() + // 2. Tendermint must panic if app returns wrong hash for the first block // - RANDOM HASH // - 0x02 @@ -979,7 +982,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { }) assert.Panics(t, func() { - h := NewHandshaker(stateStore, state, store, genDoc) + h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) if err = h.Handshake(proxyApp); err != nil { t.Log(err) } @@ -1003,7 +1006,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { }) assert.Panics(t, func() { - h := NewHandshaker(stateStore, state, store, genDoc) + h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) if err = h.Handshake(proxyApp); err != nil { t.Log(err) } @@ -1251,17 +1254,16 @@ func TestHandshakeUpdatesValidators(t *testing.T) { oldValAddr := state.Validators.Validators[0].Address // now start the app using the handshake - it should sync - genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) - handshaker := NewHandshaker(stateStore, state, store, genDoc) + genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) + require.NoError(t, err) + + handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc) proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } - t.Cleanup(func() { - if err := proxyApp.Stop(); err != nil { - t.Error(err) - } - }) + t.Cleanup(func() { require.NoError(t, proxyApp.Stop()) }) + if err := handshaker.Handshake(proxyApp); err != nil { t.Fatalf("Error on abci handshake: %v", err) } diff --git a/node/node.go b/node/node.go index fa703f1d6..3fe6ea08e 100644 --- a/node/node.go +++ b/node/node.go @@ -224,7 +224,10 @@ func makeNode(cfg *config.Config, // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, logger); err != nil { + if err := consensus.NewHandshaker( + logger.With("module", "handshaker"), + stateStore, state, blockStore, eventBus, genDoc, + ).Handshake(proxyApp); err != nil { return nil, combineCloseError(err, makeCloser(closers)) } diff --git a/node/setup.go b/node/setup.go index 8d5fb21b9..e254571e3 100644 --- a/node/setup.go +++ b/node/setup.go @@ -134,26 +134,6 @@ func createAndStartIndexerService( return indexerService, eventSinks, nil } -func doHandshake( - stateStore sm.Store, - state sm.State, - blockStore sm.BlockStore, - genDoc *types.GenesisDoc, - eventBus types.BlockEventPublisher, - proxyApp proxy.AppConns, - logger log.Logger, -) error { - - handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc) - handshaker.SetLogger(logger.With("module", "handshaker")) - handshaker.SetEventBus(eventBus) - - if err := handshaker.Handshake(proxyApp); err != nil { - return fmt.Errorf("error during handshake: %v", err) - } - return nil -} - func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) { // Log the version info. logger.Info("Version info",