Browse Source

start eventBus & indexerService before replay and use them while replaying blocks (#3194)

so if we did not index the last block (because of panic or smth else), we index it during replay

Closes #3186
pull/3218/head
cong 5 years ago
committed by Anton Kaliaev
parent
commit
71e5939441
6 changed files with 60 additions and 41 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +9
    -0
      consensus/replay.go
  3. +6
    -5
      consensus/replay_file.go
  4. +41
    -32
      node/node.go
  5. +1
    -2
      state/execution.go
  6. +2
    -2
      state/execution_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -21,3 +21,4 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS:
### BUG FIXES:
- [node] \#3186 EventBus and indexerService should be started before first block (for replay last block on handshake) execution

+ 9
- 0
consensus/replay.go View File

@ -196,6 +196,7 @@ type Handshaker struct {
stateDB dbm.DB
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger
@ -209,6 +210,7 @@ func NewHandshaker(stateDB dbm.DB, state sm.State,
stateDB: stateDB,
initialState: state,
store: store,
eventBus: types.NopEventBus{},
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
@ -219,6 +221,12 @@ func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}
func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
@ -432,6 +440,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
meta := h.store.LoadBlockMeta(height)
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
blockExec.SetEventBus(h.eventBus)
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)


+ 6
- 5
consensus/replay_file.go View File

@ -326,17 +326,18 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
}
eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}
handshaker := NewHandshaker(stateDB, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}
mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)


+ 41
- 32
node/node.go View File

@ -217,11 +217,51 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
}
// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
err = eventBus.Start()
if err != nil {
return nil, err
}
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
err = indexerService.Start()
if err != nil {
return nil, err
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
}
@ -343,35 +383,10 @@ func NewNode(config *cfg.Config,
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
p2pLogger := logger.With("module", "p2p")
nodeInfo, err := makeNodeInfo(
config,
@ -534,11 +549,6 @@ func (n *Node) OnStart() error {
time.Sleep(genTime.Sub(now))
}
err := n.eventBus.Start()
if err != nil {
return err
}
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
@ -582,8 +592,7 @@ func (n *Node) OnStart() error {
}
}
// start tx indexer
return n.indexerService.Start()
return nil
}
// OnStop stops the Node. It implements cmn.Service.


+ 1
- 2
state/execution.go View File

@ -49,8 +49,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,


+ 2
- 2
state/execution_test.go View File

@ -309,8 +309,8 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
state, stateDB := state(1, 1)
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
MockMempool{}, MockEvidencePool{})
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})
eventBus := types.NewEventBus()
err = eventBus.Start()
require.NoError(t, err)


Loading…
Cancel
Save