diff --git a/mempool/mempool.go b/mempool/mempool.go index 5a629b159..e26302051 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -60,7 +60,7 @@ type Mempool struct { // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cacheMap map[string]struct{} - cacheList *list.List + cacheList *list.List // to remove oldest tx when cache gets too big } func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { @@ -81,6 +81,7 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { return mempool } +// consensus must be able to hold lock to safely update func (mem *Mempool) Lock() { mem.proxyMtx.Lock() } @@ -89,10 +90,25 @@ func (mem *Mempool) Unlock() { mem.proxyMtx.Unlock() } +// Number of transactions in the mempool clist func (mem *Mempool) Size() int { return mem.txs.Len() } +// Remove all transactions from mempool and cache +func (mem *Mempool) Flush() { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + mem.cacheMap = make(map[string]struct{}, cacheSize) + mem.cacheList.Init() + + for e := mem.txs.Front(); e != nil; e = e.Next() { + mem.txs.Remove(e) + e.DetachPrev() + } +} + // Return the first element of mem.txs for peer goroutines to call .NextWait() on. // Blocks until txs has elements. func (mem *Mempool) TxsFrontWait() *clist.CElement { @@ -125,6 +141,8 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { if mem.cacheList.Len() >= cacheSize { popped := mem.cacheList.Front() poppedTx := popped.Value.(types.Tx) + // NOTE: the tx may have already been removed from the map + // but deleting a non-existant element is fine delete(mem.cacheMap, string(poppedTx)) mem.cacheList.Remove(popped) } @@ -146,6 +164,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { func (mem *Mempool) removeTxFromCacheMap(tx []byte) { mem.proxyMtx.Lock() + // NOTE tx not removed from cacheList delete(mem.cacheMap, string(tx)) mem.proxyMtx.Unlock() diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index f915ce426..cf3ec919c 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -46,7 +46,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // Else, block until the tx is included in a block, // and return the result of AppendTx (with no error). // Even if AppendTx fails, so long as the tx is included in a block this function -// will not return an error. +// will not return an error - it is the caller's responsibility to check res.Code. // The function times out after five minutes and returns the result of CheckTx and an error. // TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index a5b243bea..bb66965ed 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -1,11 +1,13 @@ package rpctest import ( + "bytes" "fmt" "testing" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" ) //-------------------------------------------------------------------------------- @@ -42,33 +44,84 @@ func testStatus(t *testing.T, statusI interface{}) { } } -// TODO -/* -func testBroadcastTx(t *testing.T, typ string) { - amt := int64(100) - toAddr := user[1].Address - tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) - receipt := broadcastTx(t, typ, tx) - if receipt.CreatesContract > 0 { - t.Fatal("This tx does not create a contract") - } - if len(receipt.TxHash) == 0 { - t.Fatal("Failed to compute tx hash") - } - pool := node.MempoolReactor().Mempool - txs := pool.GetProposalTxs() - if len(txs) != mempoolCount { - t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) - } - tx2 := txs[mempoolCount-1].(*types.SendTx) - n, err := new(int64), new(error) - buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) - tx.WriteSignBytes(chainID, buf1, n, err) - tx2.WriteSignBytes(chainID, buf2, n, err) - if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { - t.Fatal("inconsistent hashes for mempool tx and sent tx") +//-------------------------------------------------------------------------------- +// broadcast tx sync + +var testTx = []byte{0x1, 0x2, 0x3, 0x4, 0x5} + +func TestURIBroadcastTxSync(t *testing.T) { + config.Set("block_size", 0) + defer config.Set("block_size", -1) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": testTx}, tmResult) + if err != nil { + t.Fatal(err) } -}*/ + testBroadcastTxSync(t, tmResult) +} + +func TestJSONBroadcastTxSync(t *testing.T) { + config.Set("block_size", 0) + defer config.Set("block_size", -1) + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{testTx}, tmResult) + if err != nil { + t.Fatal(err) + } + testBroadcastTxSync(t, tmResult) +} + +func testBroadcastTxSync(t *testing.T, resI interface{}) { + tmRes := resI.(*ctypes.TMResult) + res := (*tmRes).(*ctypes.ResultBroadcastTx) + if res.Code != tmsp.CodeType_OK { + t.Fatalf("BroadcastTxSync got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log) + } + mem := node.MempoolReactor().Mempool + if mem.Size() != 1 { + t.Fatalf("Mempool size should have been 1. Got %d", mem.Size()) + } + + txs := mem.Reap(1) + if !bytes.Equal(txs[0], testTx) { + t.Fatalf("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx) + } + + mem.Flush() +} + +//-------------------------------------------------------------------------------- +// broadcast tx commit + +func TestURIBroadcastTxCommit(t *testing.T) { + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": testTx}, tmResult) + if err != nil { + t.Fatal(err) + } + testBroadcastTxCommit(t, tmResult) +} + +func TestJSONBroadcastTxCommit(t *testing.T) { + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("broadcast_tx_commit", []interface{}{testTx}, tmResult) + if err != nil { + t.Fatal(err) + } + testBroadcastTxCommit(t, tmResult) +} + +func testBroadcastTxCommit(t *testing.T, resI interface{}) { + tmRes := resI.(*ctypes.TMResult) + res := (*tmRes).(*ctypes.ResultBroadcastTx) + if res.Code != tmsp.CodeType_OK { + t.Fatalf("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log) + } + mem := node.MempoolReactor().Mempool + if mem.Size() != 0 { + t.Fatalf("Mempool size should have been 0. Got %s", mem.Size()) + } +} //-------------------------------------------------------------------------------- // Test the websocket service diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index c209324b6..3713aa821 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -20,7 +20,6 @@ import ( var ( config cfg.Config node *nm.Node - mempoolCount = 0 chainID string rpcAddr string requestAddr string