|
|
@ -100,6 +100,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M |
|
|
|
|
|
|
|
// FireOnTxsAvailable initializes the TxsAvailable channel,
|
|
|
|
// ensuring it will trigger once every height when transactions are available.
|
|
|
|
// NOTE: not thread safe - should only be called once, on startup
|
|
|
|
func (mem *Mempool) FireOnTxsAvailable() { |
|
|
|
mem.txsAvailable = make(chan struct{}, 1) |
|
|
|
} |
|
|
@ -225,7 +226,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { |
|
|
|
tx: req.GetCheckTx().Tx, |
|
|
|
} |
|
|
|
mem.txs.PushBack(memTx) |
|
|
|
mem.alertIfTxsAvailable() |
|
|
|
mem.notifyIfTxsAvailable() |
|
|
|
} else { |
|
|
|
// ignore bad transaction
|
|
|
|
mem.logger.Info("Bad Transaction", "res", r) |
|
|
@ -268,20 +269,13 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { |
|
|
|
atomic.StoreInt32(&mem.rechecking, 0) |
|
|
|
mem.logger.Info("Done rechecking txs") |
|
|
|
|
|
|
|
mem.alertIfTxsAvailable() |
|
|
|
mem.notifyIfTxsAvailable() |
|
|
|
} |
|
|
|
default: |
|
|
|
// ignore other messages
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (mem *Mempool) alertIfTxsAvailable() { |
|
|
|
if !mem.notifiedTxsAvailable && mem.Size() > 0 { |
|
|
|
mem.notifiedTxsAvailable = true |
|
|
|
mem.txsAvailable <- struct{}{} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// TxsAvailable returns a channel which fires once for every height,
|
|
|
|
// and only when transactions are available in the mempool.
|
|
|
|
// XXX: Will panic if mem.FireOnTxsAvailable() has not been called.
|
|
|
@ -292,7 +286,7 @@ func (mem *Mempool) TxsAvailable() chan struct{} { |
|
|
|
return mem.txsAvailable |
|
|
|
} |
|
|
|
|
|
|
|
func (mem *Mempool) alertIfTxsAvailable() { |
|
|
|
func (mem *Mempool) notifyIfTxsAvailable() { |
|
|
|
if mem.txsAvailable != nil && |
|
|
|
!mem.notifiedTxsAvailable && mem.Size() > 0 { |
|
|
|
|
|
|
@ -345,6 +339,8 @@ func (mem *Mempool) Update(height int, txs types.Txs) { |
|
|
|
} |
|
|
|
|
|
|
|
// Set height
|
|
|
|
// NOTE: the height is not set until Update is first called
|
|
|
|
// (so it will be wrong after a restart until the next block)
|
|
|
|
mem.height = height |
|
|
|
mem.notifiedTxsAvailable = false |
|
|
|
|
|
|
|