diff --git a/consensus/state.go b/consensus/state.go
index f5a0b636d..201586643 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -819,11 +819,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
 	}
 
 	// Mempool validated transactions
-	txs, err := cs.mempool.Reap()
-	if err != nil {
-		log.Warn("createProposalBlock: Error getting proposal txs", "error", err)
-		return nil, nil
-	}
+	txs := cs.mempool.Reap()
 
 	block = &types.Block{
 		Header: &types.Header{
diff --git a/mempool/mempool.go b/mempool/mempool.go
index 95b91d327..2af9bc0e1 100644
--- a/mempool/mempool.go
+++ b/mempool/mempool.go
@@ -1,9 +1,11 @@
 package mempool
 
 import (
+	"bytes"
 	"container/list"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/tendermint/go-clist"
 	. "github.com/tendermint/go-common"
@@ -36,16 +38,21 @@ Garbage collection of old elements from mempool.txs is handlde via
 the DetachPrev() call, which makes old elements not reachable by peer
 broadcastTxRoutine() automatically garbage collected.
 
+TODO: Better handle tmsp client errors. (make it automatically handle connection errors)
+
 */
 
 const cacheSize = 100000
 
 type Mempool struct {
-	proxyMtx     sync.Mutex
-	proxyAppConn proxy.AppConn
-	txs          *clist.CList // concurrent linked-list of good txs
-	counter      int64        // simple incrementing counter
-	height       int          // the last block Update()'d to
+	proxyMtx      sync.Mutex
+	proxyAppConn  proxy.AppConn
+	txs           *clist.CList    // concurrent linked-list of good txs
+	counter       int64           // simple incrementing counter
+	height        int             // the last block Update()'d to
+	rechecking    int32           // for re-checking filtered txs on Update()
+	recheckCursor *clist.CElement // next expected response
+	recheckEnd    *clist.CElement // re-checking stops here
 
 	// Keep a cache of already-seen txs.
 	// This reduces the pressure on the proxyApp.
@@ -55,10 +62,13 @@ type Mempool struct {
 
 func NewMempool(proxyAppConn proxy.AppConn) *Mempool {
 	mempool := &Mempool{
-		proxyAppConn: proxyAppConn,
-		txs:          clist.New(),
-		counter:      0,
-		height:       0,
+		proxyAppConn:  proxyAppConn,
+		txs:           clist.New(),
+		counter:       0,
+		height:        0,
+		rechecking:    0,
+		recheckCursor: nil,
+		recheckEnd:    nil,
 
 		cacheMap:  make(map[string]struct{}, cacheSize),
 		cacheList: list.New(),
@@ -102,6 +112,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
 	mem.cacheList.PushBack(tx)
 	// END CACHE
 
+	// NOTE: proxyAppConn may error if tx buffer is full
 	if err = mem.proxyAppConn.Error(); err != nil {
 		return err
 	}
@@ -115,6 +126,14 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
 
 // TMSP callback function
 func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) {
+	if mem.recheckCursor == nil {
+		mem.resCbNormal(req, res)
+	} else {
+		mem.resCbRecheck(req, res)
+	}
+}
+
+func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) {
 	switch res.Type {
 	case tmsp.MessageType_CheckTx:
 		if res.Code == tmsp.CodeType_OK {
@@ -134,14 +153,47 @@ func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) {
 	}
 }
 
+func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) {
+	switch res.Type {
+	case tmsp.MessageType_CheckTx:
+		memTx := mem.recheckCursor.Value.(*mempoolTx)
+		if !bytes.Equal(req.Data, memTx.tx) {
+			PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+
+				"Expected %X, got %X", req.Data, memTx.tx))
+		}
+		if res.Code == tmsp.CodeType_OK {
+			// Good, nothing to do.
+		} else {
+			// Tx became invalidated due to newly committed block.
+			mem.txs.Remove(mem.recheckCursor)
+			mem.recheckCursor.DetachPrev()
+		}
+		if mem.recheckCursor == mem.recheckEnd {
+			mem.recheckCursor = nil
+		} else {
+			mem.recheckCursor = mem.recheckCursor.Next()
+		}
+		if mem.recheckCursor == nil {
+			// Done!
+			atomic.StoreInt32(&mem.rechecking, 0)
+		}
+	default:
+		// ignore other messages
+	}
+}
+
 // Get the valid transactions remaining
-func (mem *Mempool) Reap() ([]types.Tx, error) {
+func (mem *Mempool) Reap() []types.Tx {
 	mem.proxyMtx.Lock()
 	defer mem.proxyMtx.Unlock()
 
-	txs := mem.collectTxs()
+	for atomic.LoadInt32(&mem.rechecking) > 0 {
+		// TODO: Something better?
+		time.Sleep(time.Millisecond * 10)
+	}
 
-	return txs, nil
+	txs := mem.collectTxs()
+	return txs
 }
 
 func (mem *Mempool) collectTxs() []types.Tx {
@@ -156,7 +208,7 @@ func (mem *Mempool) collectTxs() []types.Tx {
 // Tell mempool that these txs were committed.
 // Mempool will discard these txs.
 // NOTE: this should be called *after* block is committed by consensus.
-func (mem *Mempool) Update(height int, txs []types.Tx) error {
+func (mem *Mempool) Update(height int, txs []types.Tx) {
 	mem.proxyMtx.Lock()
 	defer mem.proxyMtx.Unlock()
 
@@ -168,11 +220,15 @@ func (mem *Mempool) Update(height int, txs []types.Tx) error {
 	// Set height
 	mem.height = height
 
-	// Remove transactions that are already in txs.
-	mem.filterTxs(txsMap)
-
-	return nil
+	goodTxs := mem.filterTxs(txsMap)
+	// Recheck mempool txs
+	// TODO: make optional
+	mem.recheckTxs(goodTxs)
+
+	// At this point, mem.txs are being rechecked.
+	// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
+	// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
 }
 
 func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
@@ -191,6 +247,23 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
 	return goodTxs
 }
 
+// NOTE: pass in goodTxs because mem.txs can mutate concurrently.
+func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
+	if len(goodTxs) == 0 {
+		return
+	}
+	atomic.StoreInt32(&mem.rechecking, 1)
+	mem.recheckCursor = mem.txs.Front()
+	mem.recheckEnd = mem.txs.Back()
+
+	// Push txs to proxyAppConn
+	// NOTE: resCb() may be called concurrently.
+	for _, tx := range goodTxs {
+		mem.proxyAppConn.CheckTxAsync(tx)
+	}
+	mem.proxyAppConn.FlushAsync()
+}
+
 //--------------------------------------------------------------------------------
 
 // A transaction that successfully ran
diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go
index b4a104239..1d93efc90 100644
--- a/mempool/mempool_test.go
+++ b/mempool/mempool_test.go
@@ -44,10 +44,7 @@ func TestSerialReap(t *testing.T) {
 	}
 
 	reapCheck := func(exp int) {
-		txs, err := mempool.Reap()
-		if err != nil {
-			t.Error("Error in mempool.Reap()", err)
-		}
+		txs := mempool.Reap()
 		if len(txs) != exp {
 			t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs))
 		}
@@ -60,10 +57,7 @@ func TestSerialReap(t *testing.T) {
 			binary.BigEndian.PutUint64(txBytes, uint64(i))
 			txs = append(txs, txBytes)
 		}
-		err := mempool.Update(0, txs)
-		if err != nil {
-			t.Error("Error in mempool.Update()", err)
-		}
+		mempool.Update(0, txs)
 	}
 
 	commitRange := func(start, end int) {
diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go
index 7befedbf8..61bf08a56 100644
--- a/rpc/core/mempool.go
+++ b/rpc/core/mempool.go
@@ -36,6 +36,6 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
 }
 
 func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
-	txs, err := mempoolReactor.Mempool.Reap()
-	return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, err
+	txs := mempoolReactor.Mempool.Reap()
+	return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
 }
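
Caller-side summary of the API change above: Reap() now returns []types.Tx with no error, and Update() no longer returns an error but instead kicks off an asynchronous recheck of the remaining txs. A minimal, hypothetical usage sketch, not part of the patch; the mem, height, and committedTxs names are assumed to exist in the caller:

	// After consensus commits a block, hand the committed txs to the mempool.
	// Update() drops them and re-submits the remaining txs to the app via
	// CheckTxAsync for rechecking; invalidated txs are removed by resCbRecheck().
	mem.Update(height, committedTxs)

	// Reap() waits (sleeping in 10ms steps) until the recheck pass finishes,
	// then returns the currently valid txs; there is no error path to handle.
	txs := mem.Reap()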