Browse Source

lock mempool for commit and update. closes #202

pull/206/merge
Ethan Buchman 9 years ago
committed by Jae Kwon
parent
commit
8e5e5c689f
3 changed files with 45 additions and 28 deletions
  1. +34
    -13
      consensus/state.go
  2. +11
    -2
      mempool/mempool.go
  3. +0
    -13
      state/execution.go

+ 34
- 13
consensus/state.go View File

@ -1217,7 +1217,14 @@ func (cs *ConsensusState) finalizeCommit(height int) {
err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header())
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application"))
PanicQ(Fmt("Exec failed for application: %v", err))
}
// lock mempool, commit state, update mempoool
err = cs.commitStateUpdateMempool(stateCopy, block)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application: %v", err))
}
// Save to blockStore.
@ -1227,21 +1234,9 @@ func (cs *ConsensusState) finalizeCommit(height int) {
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
}
/*
// Commit to proxyAppConn
err = cs.proxyAppConn.CommitSync()
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application"))
}
*/
// Save the state.
stateCopy.Save()
// Update mempool.
cs.mempool.Update(block.Height, block.Txs)
// NewHeightStep!
cs.updateToState(stateCopy)
@ -1256,6 +1251,32 @@ func (cs *ConsensusState) finalizeCommit(height int) {
return
}
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid
func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error {
cs.mempool.Lock()
defer cs.mempool.Unlock()
// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
cs.mempool.Update(block.Height, block.Txs)
return nil
}
//-----------------------------------------------------------------------------
func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {


+ 11
- 2
mempool/mempool.go View File

@ -77,6 +77,14 @@ func NewMempool(proxyAppConn proxy.AppConn) *Mempool {
return mempool
}
func (mem *Mempool) Lock() {
mem.proxyMtx.Lock()
}
func (mem *Mempool) Unlock() {
mem.proxyMtx.Unlock()
}
func (mem *Mempool) Size() int {
return mem.txs.Len()
}
@ -219,9 +227,10 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
// Tell mempool that these txs were committed.
// Mempool will discard these txs.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int, txs []types.Tx) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
// mem.proxyMtx.Lock()
// defer mem.proxyMtx.Unlock()
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx
// First, create a lookup map of txns in new txs.


+ 0
- 13
state/execution.go View File

@ -94,20 +94,7 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy
// TODO: Do something with changedValidators
log.Info("TODO: Do something with changedValidators", changedValidators)
// Commit block, get hash back
res := proxyAppConn.CommitSync()
if res.IsErr() {
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
return res
}
if res.Log != "" {
log.Debug("Commit.Log: " + res.Log)
}
log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs))
// Set the state's new AppHash
s.AppHash = res.Data
return nil
}


Loading…
Cancel
Save