Browse Source

Recheck txs

pull/192/head
Jae Kwon 9 years ago
parent
commit
d31d3c58ad
4 changed files with 95 additions and 32 deletions
  1. +1
    -5
      consensus/state.go
  2. +90
    -17
      mempool/mempool.go
  3. +2
    -8
      mempool/mempool_test.go
  4. +2
    -2
      rpc/core/mempool.go

+ 1
- 5
consensus/state.go View File

@ -819,11 +819,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
}
// Mempool validated transactions
txs, err := cs.mempool.Reap()
if err != nil {
log.Warn("createProposalBlock: Error getting proposal txs", "error", err)
return nil, nil
}
txs := cs.mempool.Reap()
block = &types.Block{
Header: &types.Header{


+ 90
- 17
mempool/mempool.go View File

@ -1,9 +1,11 @@
package mempool
import (
"bytes"
"container/list"
"sync"
"sync/atomic"
"time"
"github.com/tendermint/go-clist"
. "github.com/tendermint/go-common"
@ -36,16 +38,21 @@ Garbage collection of old elements from mempool.txs is handlde via
the DetachPrev() call, which makes old elements not reachable by
peer broadcastTxRoutine() automatically garbage collected.
TODO: Better handle tmsp client errors. (make it automatically handle connection errors)
*/
const cacheSize = 100000
type Mempool struct {
proxyMtx sync.Mutex
proxyAppConn proxy.AppConn
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
proxyMtx sync.Mutex
proxyAppConn proxy.AppConn
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
@ -55,10 +62,13 @@ type Mempool struct {
func NewMempool(proxyAppConn proxy.AppConn) *Mempool {
mempool := &Mempool{
proxyAppConn: proxyAppConn,
txs: clist.New(),
counter: 0,
height: 0,
proxyAppConn: proxyAppConn,
txs: clist.New(),
counter: 0,
height: 0,
rechecking: 0,
recheckCursor: nil,
recheckEnd: nil,
cacheMap: make(map[string]struct{}, cacheSize),
cacheList: list.New(),
@ -102,6 +112,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
mem.cacheList.PushBack(tx)
// END CACHE
// NOTE: proxyAppConn may error if tx buffer is full
if err = mem.proxyAppConn.Error(); err != nil {
return err
}
@ -115,6 +126,14 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
// TMSP callback function
func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) {
if mem.recheckCursor == nil {
mem.resCbNormal(req, res)
} else {
mem.resCbRecheck(req, res)
}
}
func (mem *Mempool) resCbNormal(req *tmsp.Request, res *tmsp.Response) {
switch res.Type {
case tmsp.MessageType_CheckTx:
if res.Code == tmsp.CodeType_OK {
@ -134,14 +153,47 @@ func (mem *Mempool) resCb(req *tmsp.Request, res *tmsp.Response) {
}
}
func (mem *Mempool) resCbRecheck(req *tmsp.Request, res *tmsp.Response) {
switch res.Type {
case tmsp.MessageType_CheckTx:
memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(req.Data, memTx.tx) {
PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+
"Expected %X, got %X", req.Data, memTx.tx))
}
if res.Code == tmsp.CodeType_OK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.txs.Remove(mem.recheckCursor)
mem.recheckCursor.DetachPrev()
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
} else {
mem.recheckCursor = mem.recheckCursor.Next()
}
if mem.recheckCursor == nil {
// Done!
atomic.StoreInt32(&mem.rechecking, 0)
}
default:
// ignore other messages
}
}
// Get the valid transactions remaining
func (mem *Mempool) Reap() ([]types.Tx, error) {
func (mem *Mempool) Reap() []types.Tx {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
txs := mem.collectTxs()
for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
time.Sleep(time.Millisecond * 10)
}
return txs, nil
txs := mem.collectTxs()
return txs
}
func (mem *Mempool) collectTxs() []types.Tx {
@ -156,7 +208,7 @@ func (mem *Mempool) collectTxs() []types.Tx {
// Tell mempool that these txs were committed.
// Mempool will discard these txs.
// NOTE: this should be called *after* block is committed by consensus.
func (mem *Mempool) Update(height int, txs []types.Tx) error {
func (mem *Mempool) Update(height int, txs []types.Tx) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@ -168,11 +220,15 @@ func (mem *Mempool) Update(height int, txs []types.Tx) error {
// Set height
mem.height = height
// Remove transactions that are already in txs.
mem.filterTxs(txsMap)
return nil
goodTxs := mem.filterTxs(txsMap)
// Recheck mempool txs
// TODO: make optional
mem.recheckTxs(goodTxs)
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
}
func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
@ -191,6 +247,23 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
return goodTxs
}
// NOTE: pass in goodTxs because mem.txs can mutate concurrently.
func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
if len(goodTxs) == 0 {
return
}
atomic.StoreInt32(&mem.rechecking, 1)
mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back()
// Push txs to proxyAppConn
// NOTE: resCb() may be called concurrently.
for _, tx := range goodTxs {
mem.proxyAppConn.CheckTxAsync(tx)
}
mem.proxyAppConn.FlushAsync()
}
//--------------------------------------------------------------------------------
// A transaction that successfully ran


+ 2
- 8
mempool/mempool_test.go View File

@ -44,10 +44,7 @@ func TestSerialReap(t *testing.T) {
}
reapCheck := func(exp int) {
txs, err := mempool.Reap()
if err != nil {
t.Error("Error in mempool.Reap()", err)
}
txs := mempool.Reap()
if len(txs) != exp {
t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs))
}
@ -60,10 +57,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes)
}
err := mempool.Update(0, txs)
if err != nil {
t.Error("Error in mempool.Update()", err)
}
mempool.Update(0, txs)
}
commitRange := func(start, end int) {


+ 2
- 2
rpc/core/mempool.go View File

@ -36,6 +36,6 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
}
func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
txs, err := mempoolReactor.Mempool.Reap()
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, err
txs := mempoolReactor.Mempool.Reap()
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
}

Loading…
Cancel
Save