From d27e0bbad578945bd8ece6f0386df8c9e98b2f57 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 15 Apr 2015 23:40:27 -0700 Subject: [PATCH] event cache and fireable interace --- blockchain/reactor.go | 4 ++-- consensus/reactor.go | 6 +++--- consensus/state.go | 18 ++++++++++------ events/event_cache.go | 39 +++++++++++++++++++++++++++++++++ events/events.go | 7 +++++- mempool/mempool.go | 4 ++-- mempool/reactor.go | 4 ++-- node/node.go | 6 +++--- p2p/pex_reactor.go | 4 ++-- state/execution.go | 50 ++++++++++++++++++++----------------------- state/state.go | 12 +++++------ vm/vm.go | 10 ++++----- 12 files changed, 105 insertions(+), 59 deletions(-) create mode 100644 events/event_cache.go diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7a213b0f0..489059dab 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -42,7 +42,7 @@ type BlockchainReactor struct { quit chan struct{} running uint32 - evsw *events.EventSwitch + evsw events.Fireable } func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor { @@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { bcR.evsw = evsw } diff --git a/consensus/reactor.go b/consensus/reactor.go index 16ef2129f..dfad3e61b 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -40,7 +40,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState - evsw *events.EventSwitch + evsw events.Fireable } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor { @@ -234,9 +234,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { } // implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { conR.evsw = evsw - conR.conS.SetEventSwitch(evsw) + conR.conS.SetFireable(evsw) } //-------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index 0b698a311..c1579c7e9 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -251,7 +251,8 @@ type ConsensusState struct { stagedState *sm.State // Cache result of staged block. lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. - evsw *events.EventSwitch + evsw events.Fireable + evc *events.EventCache // set in stageBlock and passed into state } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { @@ -443,9 +444,12 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. - newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) - cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) - // TODO: go fire events from event cache + // fire some events! + go func() { + newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight) + cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock) + cs.evc.Flush() + }() scheduleNextAction() continue ACTION_LOOP } else { @@ -1032,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() + // reset the event cache and pass it into the state + cs.evc = events.NewEventCache(cs.evsw) + stateCopy.SetFireable(cs.evc) // Commit block onto the copied state. // NOTE: Basic validation is done in state.AppendBlock(). @@ -1117,9 +1124,8 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty } // implements events.Eventable -func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { +func (cs *ConsensusState) SetFireable(evsw events.Fireable) { cs.evsw = evsw - cs.state.SetEventSwitch(evsw) } //----------------------------------------------------------------------------- diff --git a/events/event_cache.go b/events/event_cache.go new file mode 100644 index 000000000..e1af09fa4 --- /dev/null +++ b/events/event_cache.go @@ -0,0 +1,39 @@ +package events + +const ( + eventsBufferSize = 1000 +) + +// An EventCache buffers events for a Fireable +// All events are cached. Filtering happens on Flush +type EventCache struct { + evsw Fireable + events []eventInfo +} + +// Create a new EventCache with an EventSwitch as backend +func NewEventCache(evsw Fireable) *EventCache { + return &EventCache{ + evsw: evsw, + events: make([]eventInfo, eventsBufferSize), + } +} + +// a cached event +type eventInfo struct { + event string + msg interface{} +} + +// Cache an event to be fired upon finality. +func (evc *EventCache) FireEvent(event string, msg interface{}) { + // append to list + evc.events = append(evc.events, eventInfo{event, msg}) +} + +// Fire events by running evsw.FireEvent on all cached events. Blocks. +func (evc *EventCache) Flush() { + for _, ei := range evc.events { + evc.evsw.FireEvent(ei.event, ei.msg) + } +} diff --git a/events/events.go b/events/events.go index 46f01cb57..13aca2dac 100644 --- a/events/events.go +++ b/events/events.go @@ -8,7 +8,12 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetEventSwitch(*EventSwitch) + SetFireable(Fireable) +} + +// an event switch or cache implements fireable +type Fireable interface { + FireEvent(event string, msg interface{}) } type EventSwitch struct { diff --git a/mempool/mempool.go b/mempool/mempool.go index d5c6dad06..51cfab139 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache { func (mem *Mempool) AddTx(tx types.Tx) (err error) { mem.mtx.Lock() defer mem.mtx.Unlock() - err = sm.ExecTx(mem.cache, tx, false, false) + err = sm.ExecTx(mem.cache, tx, false, nil) if err != nil { log.Debug("AddTx() error", "tx", tx, "error", err) return err @@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { // Next, filter all txs that aren't valid given new state. validTxs := []types.Tx{} for _, tx := range txs { - err := sm.ExecTx(mem.cache, tx, false, false) + err := sm.ExecTx(mem.cache, tx, false, nil) if err == nil { log.Debug("Filter in, valid", "tx", tx) validTxs = append(validTxs, tx) diff --git a/mempool/reactor.go b/mempool/reactor.go index ae469da41..fbab16f84 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -24,7 +24,7 @@ type MempoolReactor struct { Mempool *Mempool - evsw *events.EventSwitch + evsw events.Fireable } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -114,7 +114,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { } // implements events.Eventable -func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index 531413402..311828349 100644 --- a/node/node.go +++ b/node/node.go @@ -81,7 +81,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) + SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor) return &Node{ sw: sw, @@ -115,9 +115,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetEventSwitch(evsw) + e.SetFireable(evsw) } } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 20e175fc5..02826db41 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -33,7 +33,7 @@ type PEXReactor struct { book *AddrBook - evsw *events.EventSwitch + evsw events.Fireable } func NewPEXReactor(book *AddrBook) *PEXReactor { @@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() { } // implements events.Eventable -func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (pexR *PEXReactor) SetFireable(evsw events.Fireable) { pexR.evsw = evsw } diff --git a/state/execution.go b/state/execution.go index 918e9122b..f5f047c80 100644 --- a/state/execution.go +++ b/state/execution.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/vm" ) @@ -13,7 +14,7 @@ import ( // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling ExecBlock! func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { - err := execBlock(s, block, blockPartsHeader, true) + err := execBlock(s, block, blockPartsHeader) if err != nil { return err } @@ -29,7 +30,7 @@ func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // executes transactions of a block, does not check block.StateHash // NOTE: If an error occurs during block execution, state will be left // at an invalid state. Copy the state before calling execBlock! -func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader, fireEvents bool) error { +func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Basic block validation. err := block.ValidateBasic(s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime) if err != nil { @@ -111,7 +112,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade // Commit each tx for _, tx := range block.Data.Txs { - err := ExecTx(blockCache, tx, true, fireEvents) + err := ExecTx(blockCache, tx, true, s.evc) if err != nil { return InvalidTxError{tx, err} } @@ -291,13 +292,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) error { +func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error { // TODO: do something with fees fees := uint64(0) _s := blockCache.State() // hack to access validators and event switch. - nilSwitch := _s.evsw == nil - fireEvents = fireEvents && !nilSwitch // Exec tx switch tx := tx_.(type) { @@ -328,16 +327,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro blockCache.UpdateAccount(acc) } - // If we're in a block (not mempool), - // fire event on all inputs and outputs - // see types/events.go for spec - if fireEvents { + // if the evc is nil, nothing will happen + if evc != nil { for _, i := range tx.Inputs { - _s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx) + evc.FireEvent(types.EventStringAccInput(i.Address), tx) } for _, o := range tx.Outputs { - _s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx) + evc.FireEvent(types.EventStringAccOutput(o.Address), tx) } } return nil @@ -427,7 +424,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe. txCache.UpdateAccount(callee) // because we adjusted by input above. vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx)) - vmach.SetEventSwitch(_s.evsw) + vmach.SetFireable(_s.evc) // NOTE: Call() transfers the value from caller to callee iff call succeeds. ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas) @@ -451,12 +448,11 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Create a receipt from the ret and whether errored. log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) - if fireEvents { - // Fire Events for sender and receiver - // a separate event will be fired from vm for each - _s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) - - _s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) + // Fire Events for sender and receiver + // a separate event will be fired from vm for each additional call + if evc != nil { + evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception}) + evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception}) } } else { // The mempool does not call txs until @@ -525,8 +521,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro if !added { panic("Failed to add validator") } - if fireEvents { - _s.evsw.FireEvent(types.EventStringBond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringBond(), tx) } return nil @@ -550,8 +546,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! _s.unbondValidator(val) - if fireEvents { - _s.evsw.FireEvent(types.EventStringUnbond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringUnbond(), tx) } return nil @@ -575,8 +571,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! _s.rebondValidator(val) - if fireEvents { - _s.evsw.FireEvent(types.EventStringRebond(), tx) + if evc != nil { + evc.FireEvent(types.EventStringRebond(), tx) } return nil @@ -621,8 +617,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro // Good! (Bad validator!) _s.destroyValidator(accused) - if fireEvents { - _s.evsw.FireEvent(types.EventStringDupeout(), tx) + if evc != nil { + evc.FireEvent(types.EventStringDupeout(), tx) } return nil diff --git a/state/state.go b/state/state.go index 4bd3da422..01270a241 100644 --- a/state/state.go +++ b/state/state.go @@ -36,7 +36,7 @@ type State struct { accounts merkle.Tree // Shouldn't be accessed directly. validatorInfos merkle.Tree // Shouldn't be accessed directly. - evsw *events.EventSwitch + evc events.Fireable // typically an events.EventCache } func LoadState(db dbm.DB) *State { @@ -101,7 +101,6 @@ func (s *State) Copy() *State { UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. accounts: s.accounts.Copy(), validatorInfos: s.validatorInfos.Copy(), - evsw: s.evsw, } } @@ -119,7 +118,8 @@ func (s *State) Hash() []byte { // Mutates the block in place and updates it with new state hash. func (s *State) SetBlockStateHash(block *types.Block) error { sCopy := s.Copy() - err := execBlock(sCopy, block, types.PartSetHeader{}, false) // don't fire events + // sCopy has no event cache in it, so this won't fire events + err := execBlock(sCopy, block, types.PartSetHeader{}) if err != nil { return err } @@ -268,9 +268,9 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) { // State.storage //------------------------------------- -// implements events.Eventable -func (s *State) SetEventSwitch(evsw *events.EventSwitch) { - s.evsw = evsw +// Implements events.Eventable. Typically uses events.EventCache +func (s *State) SetFireable(evc events.Fireable) { + s.evc = evc } //----------------------------------------------------------------------------- diff --git a/vm/vm.go b/vm/vm.go index 50b95a19d..79b2a3345 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -49,7 +49,7 @@ type VM struct { callDepth int - evsw *events.EventSwitch + evc events.Fireable } func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { @@ -63,8 +63,8 @@ func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM { } // satisfies events.Eventable -func (vm *VM) SetEventSwitch(evsw *events.EventSwitch) { - vm.evsw = evsw +func (vm *VM) SetFireable(evc events.Fireable) { + vm.evc = evc } // CONTRACT appState is aware of caller and callee, so we can just mutate them. @@ -93,8 +93,8 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga } // if callDepth is 0 the event is fired from ExecTx (along with the Input event) // otherwise, we fire from here. - if vm.callDepth != 0 && vm.evsw != nil { - vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ + if vm.callDepth != 0 && vm.evc != nil { + vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{ &types.CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas}, vm.origin.Prefix(20), vm.txid,