Browse Source

node+consensus: handshaker initialization (#7283)

This mostly just pushes more of initialization out of the node package.
pull/7289/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
a15ae5b53a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 63 additions and 66 deletions
  1. +5
    -4
      cmd/tendermint/commands/replay.go
  2. +10
    -14
      internal/consensus/replay.go
  3. +32
    -17
      internal/consensus/replay_file.go
  4. +12
    -10
      internal/consensus/replay_test.go
  5. +4
    -1
      node/node.go
  6. +0
    -20
      node/setup.go

+ 5
- 4
cmd/tendermint/commands/replay.go View File

@ -9,8 +9,9 @@ import (
var ReplayCmd = &cobra.Command{
Use: "replay",
Short: "Replay messages from WAL",
Run: func(cmd *cobra.Command, args []string) {
consensus.RunReplayFile(config.BaseConfig, config.Consensus, false)
RunE: func(cmd *cobra.Command, args []string) error {
return consensus.RunReplayFile(logger, config.BaseConfig, config.Consensus, false)
},
}
@ -19,7 +20,7 @@ var ReplayCmd = &cobra.Command{
var ReplayConsoleCmd = &cobra.Command{
Use: "replay-console",
Short: "Replay messages from WAL in a console",
Run: func(cmd *cobra.Command, args []string) {
consensus.RunReplayFile(config.BaseConfig, config.Consensus, true)
RunE: func(cmd *cobra.Command, args []string) error {
return consensus.RunReplayFile(logger, config.BaseConfig, config.Consensus, true)
},
}

+ 10
- 14
internal/consensus/replay.go View File

@ -211,30 +211,26 @@ type Handshaker struct {
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(stateStore sm.Store, state sm.State,
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
func NewHandshaker(
logger log.Logger,
stateStore sm.Store,
state sm.State,
store sm.BlockStore,
eventBus types.BlockEventPublisher,
genDoc *types.GenesisDoc,
) *Handshaker {
return &Handshaker{
stateStore: stateStore,
initialState: state,
store: store,
eventBus: eventbus.NopEventBus{},
eventBus: eventBus,
genDoc: genDoc,
logger: log.NewNopLogger(),
logger: logger,
nBlocks: 0,
}
}
func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}
// NBlocks returns the number of blocks applied to the state.
func (h *Handshaker) NBlocks() int {
return h.nBlocks


+ 32
- 17
internal/consensus/replay_file.go View File

@ -32,12 +32,22 @@ const (
// replay messages interactively or all at once
// replay the wal file
func RunReplayFile(cfg config.BaseConfig, csConfig *config.ConsensusConfig, console bool) {
consensusState := newConsensusStateForReplay(cfg, csConfig)
func RunReplayFile(
logger log.Logger,
cfg config.BaseConfig,
csConfig *config.ConsensusConfig,
console bool,
) error {
consensusState, err := newConsensusStateForReplay(cfg, logger, csConfig)
if err != nil {
return err
}
if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil {
tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err))
return fmt.Errorf("consensus replay: %w", err)
}
return nil
}
// Replay msgs in file or start the console
@ -293,28 +303,34 @@ func (pb *playback) replayConsoleLoop() int {
//--------------------------------------------------------------------------------
// convenience for replay mode
func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.ConsensusConfig) *State {
func newConsensusStateForReplay(
cfg config.BaseConfig,
logger log.Logger,
csConfig *config.ConsensusConfig,
) (*State, error) {
dbType := dbm.BackendType(cfg.DBBackend)
// Get BlockStore
blockStoreDB, err := dbm.NewDB("blockstore", dbType, cfg.DBDir())
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
blockStore := store.NewBlockStore(blockStoreDB)
// Get State
stateDB, err := dbm.NewDB("state", dbType, cfg.DBDir())
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
stateStore := sm.NewStore(stateDB)
gdoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
state, err := sm.MakeGenesisState(gdoc)
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
// Create proxyAppConn connection (consensus, mempool, query)
@ -322,27 +338,26 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
err = proxyApp.Start()
if err != nil {
tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
return nil, fmt.Errorf("starting proxy app conns: %w", err)
}
eventBus := eventbus.NewDefault()
if err := eventBus.Start(); err != nil {
tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
return nil, fmt.Errorf("failed to start event bus: %w", err)
}
handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
if err != nil {
tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
handshaker := NewHandshaker(logger, stateStore, state, blockStore, eventBus, gdoc)
if err = handshaker.Handshake(proxyApp); err != nil {
return nil, err
}
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus)
return consensusState
return consensusState, nil
}

+ 12
- 10
internal/consensus/replay_test.go View File

@ -25,6 +25,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
@ -772,7 +773,7 @@ func testHandshakeReplay(t *testing.T, sim *simulatorTestSuite, nBlocks int, mod
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
handshaker := NewHandshaker(stateStore, state, store, genDoc)
handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
@ -962,6 +963,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
blocks := sf.MakeBlocks(3, &state, privVal)
store.chain = blocks
logger := log.TestingLogger()
// 2. Tendermint must panic if app returns wrong hash for the first block
// - RANDOM HASH
// - 0x02
@ -979,7 +982,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
})
assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
if err = h.Handshake(proxyApp); err != nil {
t.Log(err)
}
@ -1003,7 +1006,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
})
assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
h := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
if err = h.Handshake(proxyApp); err != nil {
t.Log(err)
}
@ -1251,17 +1254,16 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
oldValAddr := state.Validators.Validators[0].Address
// now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
handshaker := NewHandshaker(stateStore, state, store, genDoc)
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(log.TestingLogger(), stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err)
}
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
t.Cleanup(func() { require.NoError(t, proxyApp.Stop()) })
if err := handshaker.Handshake(proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}


+ 4
- 1
node/node.go View File

@ -224,7 +224,10 @@ func makeNode(cfg *config.Config,
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, logger); err != nil {
if err := consensus.NewHandshaker(
logger.With("module", "handshaker"),
stateStore, state, blockStore, eventBus, genDoc,
).Handshake(proxyApp); err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}


+ 0
- 20
node/setup.go View File

@ -134,26 +134,6 @@ func createAndStartIndexerService(
return indexerService, eventSinks, nil
}
func doHandshake(
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
genDoc *types.GenesisDoc,
eventBus types.BlockEventPublisher,
proxyApp proxy.AppConns,
logger log.Logger,
) error {
handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(logger.With("module", "handshaker"))
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger log.Logger, mode string) {
// Log the version info.
logger.Info("Version info",


Loading…
Cancel
Save