From ae68fcb78a5400f0ab83a756ca8da28ea3c3cf6b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 28 Dec 2017 18:58:05 -0500 Subject: [PATCH] move fireEvents to ApplyBlock --- blockchain/reactor.go | 1 - consensus/common_test.go | 2 +- consensus/state.go | 18 -------------- node/node.go | 1 + state/execution.go | 54 ++++++++++++++++++++++++++-------------- types/events.go | 7 ++++++ 6 files changed, 45 insertions(+), 38 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index c8e794a13..d4b803dd6 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -284,7 +284,6 @@ FOR_LOOP: bcR.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: should we be firing events? need to fire NewBlock events manually ... // NOTE: we could improve performance if we // didn't make the app commit to disk every block // ... but we would need a way to get the hash without it persisting diff --git a/consensus/common_test.go b/consensus/common_test.go index eb574a219..249e77329 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -264,7 +264,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S evpool := types.MockEvidencePool{} // Make ConsensusReactor - stateDB := dbm.NewMemDB() // XXX !! + stateDB := dbm.NewMemDB() blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger()) diff --git a/consensus/state.go b/consensus/state.go index 477d872b0..69858da08 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1203,8 +1203,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // Create a copy of the state for staging // and an event cache for txs stateCopy := cs.state.Copy() - txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs)) - cs.blockExec.SetTxEventPublisher(txEventBuffer) // Execute and commit the block, update and save the state, and update the mempool. // NOTE: the block.AppHash wont reflect these txs until the next block @@ -1221,22 +1219,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) { fail.Fail() // XXX - // Fire event for new block. - // NOTE: If we fail before firing, these events will never fire - // - // TODO: Either - // * Fire before persisting state, in ApplyBlock - // * Fire on start up if we haven't written any new WAL msgs - // Both options mean we may fire more than once. Is that fine ? - cs.eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - cs.eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) - err = txEventBuffer.Flush() - if err != nil { - cs.Logger.Error("Failed to flush event buffer", "err", err) - } - - fail.Fail() // XXX - // NewHeightStep! cs.updateToState(stateCopy) diff --git a/node/node.go b/node/node.go index 2eea4ed26..fe51b941c 100644 --- a/node/node.go +++ b/node/node.go @@ -291,6 +291,7 @@ func NewNode(config *cfg.Config, eventBus.SetLogger(logger.With("module", "events")) // services which will be publishing and/or subscribing for messages (events) + blockExec.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus) // Transaction indexing diff --git a/state/execution.go b/state/execution.go index 68cb13bd7..8b05733a7 100644 --- a/state/execution.go +++ b/state/execution.go @@ -26,8 +26,8 @@ type BlockExecutor struct { // execute the app against this proxyApp proxy.AppConnConsensus - // tx events - txEventPublisher types.TxEventPublisher + // events + eventBus types.BlockEventPublisher // update these with block results after commit mempool types.Mempool @@ -36,27 +36,29 @@ type BlockExecutor struct { logger log.Logger } -// NewBlockExecutor returns a new BlockExecutor. +// NewBlockExecutor returns a new BlockExecutor with a NopEventBus. +// Call SetEventBus to provide one. func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor { return &BlockExecutor{ - db: db, - proxyApp: proxyApp, - txEventPublisher: types.NopEventBus{}, - mempool: mempool, - evpool: evpool, - logger: logger, + db: db, + proxyApp: proxyApp, + eventBus: types.NopEventBus{}, + mempool: mempool, + evpool: evpool, + logger: logger, } } -// SetTxEventPublisher - set the transaction event publisher. If not called, -// it defaults to types.NopEventBus. -func (blockExec *BlockExecutor) SetTxEventPublisher(txEventPublisher types.TxEventPublisher) { - blockExec.txEventPublisher = txEventPublisher +// SetEventBus - sets the event bus for publishing block related events. +// If not called, it defaults to types.NopEventBus. +func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { + blockExec.eventBus = eventBus } // ApplyBlock validates the block against the state, executes it against the app, -// commits it, and saves the block and state. It's the only function that needs to be called +// fires the relevent events, commits the app, and saves the new state and responses. +// It's the only function that needs to be called // from outside this package to process and commit an entire block. // It takes a blockID to avoid recomputing the parts hash. func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) { @@ -70,8 +72,6 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block return s, ErrProxyAppConn(err) } - fireEvents(blockExec.txEventPublisher, block, abciResponses) - fail.Fail() // XXX // save the results before we commit @@ -97,6 +97,12 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block s.AppHash = appHash SaveState(blockExec.db, s) + fail.Fail() // XXX + + // events are fired after everything else + // NOTE: if we crash between Commit and Save, events wont be fired during replay + fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses) + return s, nil } @@ -354,15 +360,27 @@ func updateState(s State, blockID types.BlockID, header *types.Header, }, nil } -func fireEvents(txEventPublisher types.TxEventPublisher, block *types.Block, abciResponses *ABCIResponses) { +// Fire NewBlock, NewBlockHeader. +// Fire TxEvent for every tx. +// NOTE: if Tendermint crashes before commit, some or all of these events may be published again. +func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { + // NOTE: do we still need this buffer ? + txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs)) for i, tx := range block.Data.Txs { - txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ + txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{ Height: block.Height, Index: uint32(i), Tx: tx, Result: *(abciResponses.DeliverTx[i]), }}) } + + eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + err := txEventBuffer.Flush() + if err != nil { + logger.Error("Failed to flush event buffer", "err", err) + } } //---------------------------------------------------------------------------------------------------- diff --git a/types/events.go b/types/events.go index 5c41c6df6..d6f7b012c 100644 --- a/types/events.go +++ b/types/events.go @@ -175,6 +175,13 @@ func QueryForEvent(eventType string) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) } +// BlockEventPublisher publishes all block related events +type BlockEventPublisher interface { + PublishEventNewBlock(block EventDataNewBlock) error + PublishEventNewBlockHeader(header EventDataNewBlockHeader) error + PublishEventTx(EventDataTx) error +} + type TxEventPublisher interface { PublishEventTx(EventDataTx) error }