Browse Source

call FlushSync before calling CommitSync

if we call it after, we might receive a "fresh" transaction from
  `broadcast_tx_sync` before old transactions (which were not
  committed).

Refs #1091

```
Commit is called with a lock on the mempool, meaning no calls to CheckTx
can start. However, since CheckTx is called async in the mempool
connection, some CheckTx might have already "sailed", when the lock is
released in the mempool and Commit proceeds.

Then, that spurious CheckTx has not yet "begun" in the ABCI app (stuck
in transport?). Instead, ABCI app manages to start to process the
Commit. Next, the spurious, "sailed" CheckTx happens in the wrong place.
```
pull/1143/head
Anton Kaliaev 7 years ago
parent
commit
5f3048bd09
No known key found for this signature in database GPG Key ID: 7B6881D965918214
3 changed files with 16 additions and 3 deletions
  1. +6
    -3
      mempool/mempool.go
  2. +8
    -0
      state/execution.go
  3. +2
    -0
      types/services.go

+ 6
- 3
mempool/mempool.go View File

@ -159,6 +159,12 @@ func (mem *Mempool) Size() int {
return mem.txs.Len()
}
// Flushes the mempool connection to ensure async resCb calls are done e.g.
// from CheckTx.
func (mem *Mempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
// Flush removes all transactions from the mempool and cache
func (mem *Mempool) Flush() {
mem.proxyMtx.Lock()
@ -347,9 +353,6 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int64, txs types.Txs) error {
if err := mem.proxyAppConn.FlushSync(); err != nil { // To flush async resCb calls e.g. from CheckTx
return err
}
// First, create a lookup map of txns in new txs.
txsMap := make(map[string]struct{})
for _, tx := range txs {


+ 8
- 0
state/execution.go View File

@ -127,6 +127,14 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
return nil, err
}
// Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync()
if err != nil {


+ 2
- 0
types/services.go View File

@ -27,6 +27,7 @@ type Mempool interface {
Reap(int) Txs
Update(height int64, txs Txs) error
Flush()
FlushAppConn() error
TxsAvailable() <-chan int64
EnableTxsAvailable()
@ -44,6 +45,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil
func (m MockMempool) Reap(n int) Txs { return Txs{} }
func (m MockMempool) Update(height int64, txs Txs) error { return nil }
func (m MockMempool) Flush() {}
func (m MockMempool) FlushAppConn() error { return nil }
func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) }
func (m MockMempool) EnableTxsAvailable() {}


Loading…
Cancel
Save