Browse Source

wait until txs removed from mempool to fire tx events

pull/233/head
Ethan Buchman 9 years ago
parent
commit
8ca615c301
4 changed files with 11 additions and 11 deletions
  1. +7
    -1
      consensus/state.go
  2. +0
    -1
      mempool/mempool.go
  3. +1
    -1
      rpc/test/client_test.go
  4. +3
    -8
      state/execution.go

+ 7
- 1
consensus/state.go View File

@ -1214,10 +1214,13 @@ func (cs *ConsensusState) finalizeCommit(height int) {
// Create a copy of the state for staging
stateCopy := cs.state.Copy()
// event cache for txs
eventCache := events.NewEventCache(cs.evsw)
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header())
err := stateCopy.ExecBlock(eventCache, cs.proxyAppConn, block, blockParts.Header())
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application: %v", err))
@ -1230,6 +1233,9 @@ func (cs *ConsensusState) finalizeCommit(height int) {
PanicQ(Fmt("Commit failed for application: %v", err))
}
// txs committed and removed from mempool, fire events
eventCache.Flush()
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
precommits := cs.Votes.Precommits(cs.CommitRound)


+ 0
- 1
mempool/mempool.go View File

@ -167,7 +167,6 @@ func (mem *Mempool) removeTxFromCacheMap(tx []byte) {
// NOTE tx not removed from cacheList
delete(mem.cacheMap, string(tx))
mem.proxyMtx.Unlock()
}
// TMSP callback function


+ 1
- 1
rpc/test/client_test.go View File

@ -119,7 +119,7 @@ func testBroadcastTxCommit(t *testing.T, resI interface{}) {
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 0 {
t.Fatalf("Mempool size should have been 0. Got %s", mem.Size())
t.Fatalf("Mempool size should have been 0. Got %d", mem.Size())
}
}


+ 3
- 8
state/execution.go View File

@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error {
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error {
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Validate the block.
err := s.validateBlock(block)
@ -34,7 +34,7 @@ func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn,
nextValSet := valSet.Copy()
// Execute the block txs
err = s.execBlockOnProxyApp(evsw, proxyAppConn, block)
err = s.execBlockOnProxyApp(eventCache, proxyAppConn, block)
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
@ -55,9 +55,7 @@ func (s *State) ExecBlock(evsw *events.EventSwitch, proxyAppConn proxy.AppConn,
// Executes block's transactions on proxyAppConn.
// TODO: Generate a bitmap or otherwise store tx validity in state.
func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy.AppConn, block *types.Block) error {
eventCache := events.NewEventCache(evsw)
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error {
var validTxs, invalidTxs = 0, 0
@ -100,9 +98,6 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy
log.Info("TODO: Do something with changedValidators", changedValidators)
log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs))
// fire events
eventCache.Flush()
return nil
}


Loading…
Cancel
Save