Browse Source

move fireEvents to ApplyBlock

pull/1015/head
Ethan Buchman 7 years ago
parent
commit
ae68fcb78a
6 changed files with 45 additions and 38 deletions
  1. +0
    -1
      blockchain/reactor.go
  2. +1
    -1
      consensus/common_test.go
  3. +0
    -18
      consensus/state.go
  4. +1
    -0
      node/node.go
  5. +36
    -18
      state/execution.go
  6. +7
    -0
      types/events.go

+ 0
- 1
blockchain/reactor.go View File

@ -284,7 +284,6 @@ FOR_LOOP:
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: should we be firing events? need to fire NewBlock events manually ...
// NOTE: we could improve performance if we
// didn't make the app commit to disk every block
// ... but we would need a way to get the hash without it persisting


+ 1
- 1
consensus/common_test.go View File

@ -264,7 +264,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
evpool := types.MockEvidencePool{}
// Make ConsensusReactor
stateDB := dbm.NewMemDB() // XXX !!
stateDB := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger())


+ 0
- 18
consensus/state.go View File

@ -1203,8 +1203,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Create a copy of the state for staging
// and an event cache for txs
stateCopy := cs.state.Copy()
txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs))
cs.blockExec.SetTxEventPublisher(txEventBuffer)
// Execute and commit the block, update and save the state, and update the mempool.
// NOTE: the block.AppHash wont reflect these txs until the next block
@ -1221,22 +1219,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
fail.Fail() // XXX
// Fire event for new block.
// NOTE: If we fail before firing, these events will never fire
//
// TODO: Either
// * Fire before persisting state, in ApplyBlock
// * Fire on start up if we haven't written any new WAL msgs
// Both options mean we may fire more than once. Is that fine ?
cs.eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
cs.eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
err = txEventBuffer.Flush()
if err != nil {
cs.Logger.Error("Failed to flush event buffer", "err", err)
}
fail.Fail() // XXX
// NewHeightStep!
cs.updateToState(stateCopy)


+ 1
- 0
node/node.go View File

@ -291,6 +291,7 @@ func NewNode(config *cfg.Config,
eventBus.SetLogger(logger.With("module", "events"))
// services which will be publishing and/or subscribing for messages (events)
blockExec.SetEventBus(eventBus)
consensusReactor.SetEventBus(eventBus)
// Transaction indexing


+ 36
- 18
state/execution.go View File

@ -26,8 +26,8 @@ type BlockExecutor struct {
// execute the app against this
proxyApp proxy.AppConnConsensus
// tx events
txEventPublisher types.TxEventPublisher
// events
eventBus types.BlockEventPublisher
// update these with block results after commit
mempool types.Mempool
@ -36,27 +36,29 @@ type BlockExecutor struct {
logger log.Logger
}
// NewBlockExecutor returns a new BlockExecutor.
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
return &BlockExecutor{
db: db,
proxyApp: proxyApp,
txEventPublisher: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
}
}
// SetTxEventPublisher - set the transaction event publisher. If not called,
// it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetTxEventPublisher(txEventPublisher types.TxEventPublisher) {
blockExec.txEventPublisher = txEventPublisher
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
blockExec.eventBus = eventBus
}
// ApplyBlock validates the block against the state, executes it against the app,
// commits it, and saves the block and state. It's the only function that needs to be called
// fires the relevent events, commits the app, and saves the new state and responses.
// It's the only function that needs to be called
// from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
@ -70,8 +72,6 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
return s, ErrProxyAppConn(err)
}
fireEvents(blockExec.txEventPublisher, block, abciResponses)
fail.Fail() // XXX
// save the results before we commit
@ -97,6 +97,12 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
s.AppHash = appHash
SaveState(blockExec.db, s)
fail.Fail() // XXX
// events are fired after everything else
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
return s, nil
}
@ -354,15 +360,27 @@ func updateState(s State, blockID types.BlockID, header *types.Header,
}, nil
}
func fireEvents(txEventPublisher types.TxEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
// Fire NewBlock, NewBlockHeader.
// Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
// NOTE: do we still need this buffer ?
txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs))
for i, tx := range block.Data.Txs {
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height,
Index: uint32(i),
Tx: tx,
Result: *(abciResponses.DeliverTx[i]),
}})
}
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
err := txEventBuffer.Flush()
if err != nil {
logger.Error("Failed to flush event buffer", "err", err)
}
}
//----------------------------------------------------------------------------------------------------


+ 7
- 0
types/events.go View File

@ -175,6 +175,13 @@ func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType))
}
// BlockEventPublisher publishes all block related events
type BlockEventPublisher interface {
PublishEventNewBlock(block EventDataNewBlock) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader) error
PublishEventTx(EventDataTx) error
}
type TxEventPublisher interface {
PublishEventTx(EventDataTx) error
}

Loading…
Cancel
Save