From 41f91318e9e253a1b59cc1cc128ad412941b8e2a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sat, 23 Feb 2019 19:32:31 +0400 Subject: [PATCH] bound mempool memory usage (#3248) * bound mempool memory usage Closes #3079 * rename SizeBytes to TxsTotalBytes and other small fixes after Zarko's review * rename MaxBytes to MaxTxsTotalBytes * make ErrMempoolIsFull more informative * expose mempool's txs_total_bytes via RPC * test full response * fixes after Ethan's review * config: rename mempool.size to mempool.max_txs https://github.com/tendermint/tendermint/pull/3248#discussion_r254034004 * test more cases https://github.com/tendermint/tendermint/pull/3248#discussion_r254036532 * simplify test * Revert "config: rename mempool.size to mempool.max_txs" This reverts commit 39bfa3696177aa46195000b90655419a975d6ff7. * rename count back to n_txs to make a change non-breaking * rename max_txs_total_bytes to max_txs_bytes * format code * fix TestWALPeriodicSync The test was sometimes failing due to processFlushTicks being called too early. The solution is to call wal#Start later in the test. * Apply suggestions from code review --- CHANGELOG_PENDING.md | 3 ++ config/config.go | 23 +++++---- config/toml.go | 9 +++- docs/tendermint-core/configuration.md | 9 +++- mempool/mempool.go | 47 +++++++++++++++--- mempool/mempool_test.go | 70 ++++++++++++++++++++++++++- rpc/client/rpc_test.go | 12 +++-- rpc/core/mempool.go | 45 ++++++++++------- rpc/core/types/responses.go | 6 ++- 9 files changed, 180 insertions(+), 44 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 65fc4bd25..bf8040722 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,6 +18,9 @@ Special thanks to external contributors on this release: * P2P Protocol ### FEATURES: +- [mempool] \#3079 bound mempool memory usage (`mempool.max_txs_bytes` is set to 1GB by default; see config.toml) + mempool's current `txs_total_bytes` is exposed via `total_bytes` field in + `/num_unconfirmed_txs` and `/unconfirmed_txs` RPC endpoints. ### IMPROVEMENTS: diff --git a/config/config.go b/config/config.go index fc50d7c61..c7f966bf0 100644 --- a/config/config.go +++ b/config/config.go @@ -530,12 +530,13 @@ func DefaultFuzzConnConfig() *FuzzConnConfig { // MempoolConfig defines the configuration options for the Tendermint mempool type MempoolConfig struct { - RootDir string `mapstructure:"home"` - Recheck bool `mapstructure:"recheck"` - Broadcast bool `mapstructure:"broadcast"` - WalPath string `mapstructure:"wal_dir"` - Size int `mapstructure:"size"` - CacheSize int `mapstructure:"cache_size"` + RootDir string `mapstructure:"home"` + Recheck bool `mapstructure:"recheck"` + Broadcast bool `mapstructure:"broadcast"` + WalPath string `mapstructure:"wal_dir"` + Size int `mapstructure:"size"` + MaxTxsBytes int64 `mapstructure:"max_txs_bytes"` + CacheSize int `mapstructure:"cache_size"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -544,10 +545,11 @@ func DefaultMempoolConfig() *MempoolConfig { Recheck: true, Broadcast: true, WalPath: "", - // Each signature verification takes .5ms, size reduced until we implement + // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - CacheSize: 10000, + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, } } @@ -574,6 +576,9 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.Size < 0 { return errors.New("size can't be negative") } + if cfg.MaxTxsBytes < 0 { + return errors.New("max_txs_bytes can't be negative") + } if cfg.CacheSize < 0 { return errors.New("cache_size can't be negative") } diff --git a/config/toml.go b/config/toml.go index b023e4e57..d71bfb519 100644 --- a/config/toml.go +++ b/config/toml.go @@ -237,10 +237,15 @@ recheck = {{ .Mempool.Recheck }} broadcast = {{ .Mempool.Broadcast }} wal_dir = "{{ js .Mempool.WalPath }}" -# size of the mempool +# Maximum number of transactions in the mempool size = {{ .Mempool.Size }} -# size of the cache (used to filter transactions we saw earlier) +# Limit the total size of all txs in the mempool. +# This only accounts for raw transactions (e.g. given 1MB transactions and +# max_txs_bytes=5MB, mempool will only accept 5 transactions). +max_txs_bytes = {{ .Mempool.MaxTxsBytes }} + +# Size of the cache (used to filter transactions we saw earlier) in transactions cache_size = {{ .Mempool.CacheSize }} ##### consensus configuration options ##### diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 0d9a58c4b..f753d2123 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -183,10 +183,15 @@ recheck = true broadcast = true wal_dir = "" -# size of the mempool +# Maximum number of transactions in the mempool size = 5000 -# size of the cache (used to filter transactions we saw earlier) +# Limit the total size of all txs in the mempool. +# This only accounts for raw transactions (e.g. given 1MB transactions and +# max_txs_bytes=5MB, mempool will only accept 5 transactions). +max_txs_bytes = 1073741824 + +# Size of the cache (used to filter transactions we saw earlier) in transactions cache_size = 10000 ##### consensus configuration options ##### diff --git a/mempool/mempool.go b/mempool/mempool.go index 8550f2f81..41ee59cb4 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -63,13 +63,26 @@ var ( // ErrTxInCache is returned to the client if we saw tx earlier ErrTxInCache = errors.New("Tx already exists in cache") - // ErrMempoolIsFull means Tendermint & an application can't handle that much load - ErrMempoolIsFull = errors.New("Mempool is full") - // ErrTxTooLarge means the tx is too big to be sent in a message to other peers ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize) ) +// ErrMempoolIsFull means Tendermint & an application can't handle that much load +type ErrMempoolIsFull struct { + numTxs int + maxTxs int + + txsBytes int64 + maxTxsBytes int64 +} + +func (e ErrMempoolIsFull) Error() string { + return fmt.Sprintf( + "Mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", + e.numTxs, e.maxTxs, + e.txsBytes, e.maxTxsBytes) +} + // ErrPreCheck is returned when tx is too big type ErrPreCheck struct { Reason error @@ -147,6 +160,9 @@ type Mempool struct { preCheck PreCheckFunc postCheck PostCheckFunc + // Atomic integers + txsBytes int64 // see TxsBytes + // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cache txCache @@ -265,8 +281,13 @@ func (mem *Mempool) Size() int { return mem.txs.Len() } -// Flushes the mempool connection to ensure async resCb calls are done e.g. -// from CheckTx. +// TxsBytes returns the total size of all txs in the mempool. +func (mem *Mempool) TxsBytes() int64 { + return atomic.LoadInt64(&mem.txsBytes) +} + +// FlushAppConn flushes the mempool connection to ensure async resCb calls are +// done e.g. from CheckTx. func (mem *Mempool) FlushAppConn() error { return mem.proxyAppConn.FlushSync() } @@ -282,6 +303,8 @@ func (mem *Mempool) Flush() { mem.txs.Remove(e) e.DetachPrev() } + + _ = atomic.SwapInt64(&mem.txsBytes, 0) } // TxsFront returns the first transaction in the ordered list for peer @@ -308,8 +331,15 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() - if mem.Size() >= mem.config.Size { - return ErrMempoolIsFull + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + if memSize >= mem.config.Size || + int64(len(tx))+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize, mem.config.Size, + txsBytes, mem.config.MaxTxsBytes} } // The size of the corresponding amino-encoded TxMessage @@ -383,6 +413,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: tx, } mem.txs.PushBack(memTx) + atomic.AddInt64(&mem.txsBytes, int64(len(tx))) mem.logger.Info("Added good transaction", "tx", TxID(tx), "res", r, @@ -424,6 +455,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Tx became invalidated due to newly committed block. mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr) mem.txs.Remove(mem.recheckCursor) + atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) mem.recheckCursor.DetachPrev() // remove from cache (it might be good later) @@ -597,6 +629,7 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx { if _, ok := txsMap[string(memTx.tx)]; ok { // remove from clist mem.txs.Remove(e) + atomic.AddInt64(&mem.txsBytes, int64(-len(memTx.tx))) e.DetachPrev() // NOTE: we don't remove committed txs from the cache. diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 5d737e190..5928fbc56 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -30,8 +30,10 @@ import ( type cleanupFunc func() func newMempoolWithApp(cc proxy.ClientCreator) (*Mempool, cleanupFunc) { - config := cfg.ResetTestRoot("mempool_test") + return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test")) +} +func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*Mempool, cleanupFunc) { appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) err := appConnMem.Start() @@ -462,6 +464,72 @@ func TestMempoolMaxMsgSize(t *testing.T) { } +func TestMempoolTxsBytes(t *testing.T) { + app := kvstore.NewKVStoreApplication() + cc := proxy.NewLocalClientCreator(app) + config := cfg.ResetTestRoot("mempool_test") + config.Mempool.MaxTxsBytes = 10 + mempool, cleanup := newMempoolWithAppAndConfig(cc, config) + defer cleanup() + + // 1. zero by default + assert.EqualValues(t, 0, mempool.TxsBytes()) + + // 2. len(tx) after CheckTx + err := mempool.CheckTx([]byte{0x01}, nil) + require.NoError(t, err) + assert.EqualValues(t, 1, mempool.TxsBytes()) + + // 3. zero again after tx is removed by Update + mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) + assert.EqualValues(t, 0, mempool.TxsBytes()) + + // 4. zero after Flush + err = mempool.CheckTx([]byte{0x02, 0x03}, nil) + require.NoError(t, err) + assert.EqualValues(t, 2, mempool.TxsBytes()) + + mempool.Flush() + assert.EqualValues(t, 0, mempool.TxsBytes()) + + // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. + err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil) + require.NoError(t, err) + err = mempool.CheckTx([]byte{0x05}, nil) + if assert.Error(t, err) { + assert.IsType(t, ErrMempoolIsFull{}, err) + } + + // 6. zero after tx is rechecked and removed due to not being valid anymore + app2 := counter.NewCounterApplication(true) + cc = proxy.NewLocalClientCreator(app2) + mempool, cleanup = newMempoolWithApp(cc) + defer cleanup() + + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(0)) + + err = mempool.CheckTx(txBytes, nil) + require.NoError(t, err) + assert.EqualValues(t, 8, mempool.TxsBytes()) + + appConnCon, _ := cc.NewABCIClient() + appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) + err = appConnCon.Start() + require.Nil(t, err) + defer appConnCon.Stop() + res, err := appConnCon.DeliverTxSync(txBytes) + require.NoError(t, err) + require.EqualValues(t, 0, res.Code) + res2, err := appConnCon.CommitSync() + require.NoError(t, err) + require.NotEmpty(t, res2.Data) + + // Pretend like we committed nothing so txBytes gets rechecked and removed. + mempool.Update(1, []types.Tx{}, nil, nil) + assert.EqualValues(t, 0, mempool.TxsBytes()) +} + func checksumIt(data []byte) string { h := sha256.New() h.Write(data) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index d3dc90b85..ba9bc3af7 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -290,9 +290,13 @@ func TestUnconfirmedTxs(t *testing.T) { for i, c := range GetClients() { mc, ok := c.(client.MempoolClient) require.True(t, ok, "%d", i) - txs, err := mc.UnconfirmedTxs(1) + res, err := mc.UnconfirmedTxs(1) require.Nil(t, err, "%d: %+v", i, err) - assert.Exactly(t, types.Txs{tx}, types.Txs(txs.Txs)) + + assert.Equal(t, 1, res.Count) + assert.Equal(t, 1, res.Total) + assert.Equal(t, mempool.TxsBytes(), res.TotalBytes) + assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs)) } mempool.Flush() @@ -311,7 +315,9 @@ func TestNumUnconfirmedTxs(t *testing.T) { res, err := mc.NumUnconfirmedTxs() require.Nil(t, err, "%d: %+v", i, err) - assert.Equal(t, mempoolSize, res.N) + assert.Equal(t, mempoolSize, res.Count) + assert.Equal(t, mempoolSize, res.Total) + assert.Equal(t, mempool.TxsBytes(), res.TotalBytes) } mempool.Flush() diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 0295bf7f3..d4993074d 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -248,27 +248,32 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // // ```json // { -// "error": "", -// "result": { -// "txs": [], -// "n_txs": "0" -// }, -// "id": "", -// "jsonrpc": "2.0" -// } +// "result" : { +// "txs" : [], +// "total_bytes" : "0", +// "n_txs" : "0", +// "total" : "0" +// }, +// "jsonrpc" : "2.0", +// "id" : "" +// } +// ``` // // ### Query Parameters // // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+--------------------------------------| // | limit | int | 30 | false | Maximum number of entries (max: 100) | -// ``` func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit = validatePerPage(limit) txs := mempool.ReapMaxTxs(limit) - return &ctypes.ResultUnconfirmedTxs{N: len(txs), Txs: txs}, nil + return &ctypes.ResultUnconfirmedTxs{ + Count: len(txs), + Total: mempool.Size(), + TotalBytes: mempool.TxsBytes(), + Txs: txs}, nil } // Get number of unconfirmed transactions. @@ -291,15 +296,19 @@ func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { // // ```json // { -// "error": "", -// "result": { -// "txs": null, -// "n_txs": "0" -// }, -// "id": "", -// "jsonrpc": "2.0" +// "jsonrpc" : "2.0", +// "id" : "", +// "result" : { +// "n_txs" : "0", +// "total_bytes" : "0", +// "txs" : null, +// "total" : "0" +// } // } // ``` func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil + return &ctypes.ResultUnconfirmedTxs{ + Count: mempool.Size(), + Total: mempool.Size(), + TotalBytes: mempool.TxsBytes()}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 85f2746d4..74457b38a 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -178,8 +178,10 @@ type ResultTxSearch struct { // List of mempool txs type ResultUnconfirmedTxs struct { - N int `json:"n_txs"` - Txs []types.Tx `json:"txs"` + Count int `json:"n_txs"` + Total int `json:"total"` + TotalBytes int64 `json:"total_bytes"` + Txs []types.Tx `json:"txs"` } // Info abci msg