Browse Source

rename SizeBytes to TxsTotalBytes

and other small fixes after Zarko's review
pull/3248/head
Anton Kaliaev 6 years ago
parent
commit
03aa40addd
No known key found for this signature in database GPG Key ID: 7B6881D965918214
5 changed files with 36 additions and 32 deletions
  1. +6
    -7
      config/config.go
  2. +7
    -5
      config/toml.go
  3. +7
    -5
      docs/tendermint-core/configuration.md
  4. +10
    -9
      mempool/mempool.go
  5. +6
    -6
      mempool/mempool_test.go

+ 6
- 7
config/config.go View File

@ -535,9 +535,8 @@ type MempoolConfig struct {
Broadcast bool `mapstructure:"broadcast"` Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal_dir"` WalPath string `mapstructure:"wal_dir"`
Size int `mapstructure:"size"` Size int `mapstructure:"size"`
MaxBytes int64 `mapstructure:"max_bytes"`
CacheSize int `mapstructure:"cache_size"` CacheSize int `mapstructure:"cache_size"`
// Maximum size of all txs in the mempool in bytes
MaxBytes int `mapstructure:"max_bytes"`
} }
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool // DefaultMempoolConfig returns a default configuration for the Tendermint mempool
@ -546,11 +545,11 @@ func DefaultMempoolConfig() *MempoolConfig {
Recheck: true, Recheck: true,
Broadcast: true, Broadcast: true,
WalPath: "", WalPath: "",
// Each signature verification takes .5ms, size reduced until we implement
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck // ABCI Recheck
Size: 5000, Size: 5000,
MaxBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000, CacheSize: 10000,
MaxBytes: 1000000000, // 1GB
} }
} }
@ -577,12 +576,12 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Size < 0 { if cfg.Size < 0 {
return errors.New("size can't be negative") return errors.New("size can't be negative")
} }
if cfg.MaxBytes < 0 {
return errors.New("max_bytes can't be negative")
}
if cfg.CacheSize < 0 { if cfg.CacheSize < 0 {
return errors.New("cache_size can't be negative") return errors.New("cache_size can't be negative")
} }
if cfg.MaxBytes <= 0 {
return errors.New("max_bytes must be a positive number")
}
return nil return nil
} }


+ 7
- 5
config/toml.go View File

@ -234,15 +234,17 @@ recheck = {{ .Mempool.Recheck }}
broadcast = {{ .Mempool.Broadcast }} broadcast = {{ .Mempool.Broadcast }}
wal_dir = "{{ js .Mempool.WalPath }}" wal_dir = "{{ js .Mempool.WalPath }}"
# size of the mempool
# Maximum number of transactions in the mempool
size = {{ .Mempool.Size }} size = {{ .Mempool.Size }}
# size of the cache (used to filter transactions we saw earlier)
cache_size = {{ .Mempool.CacheSize }}
# maximum size of all txs in the mempool in bytes
# Maximum size of the mempool in bytes
# This only accounts for raw transactions (e.g. given 1MB transactions and
# max_bytes=5MB, mempool will only accept 5 transactions).
max_bytes = {{ .Mempool.MaxBytes }} max_bytes = {{ .Mempool.MaxBytes }}
# Size of the cache (used to filter transactions we saw earlier) in transactions
cache_size = {{ .Mempool.CacheSize }}
##### consensus configuration options ##### ##### consensus configuration options #####
[consensus] [consensus]


+ 7
- 5
docs/tendermint-core/configuration.md View File

@ -183,14 +183,16 @@ recheck = true
broadcast = true broadcast = true
wal_dir = "" wal_dir = ""
# size of the mempool
# Maximum number of transactions in the mempool
size = 5000 size = 5000
# size of the cache (used to filter transactions we saw earlier)
cache_size = 10000
# Maximum size of the mempool in bytes
# This only accounts for raw transactions (e.g. given 1MB transactions and
# max_bytes=5MB, mempool will only accept 5 transactions).
max_bytes = 1073741824
# maximum size of all txs in the mempool in bytes
max_bytes = 1000000000
# Size of the cache (used to filter transactions we saw earlier) in transactions
cache_size = 10000
##### consensus configuration options ##### ##### consensus configuration options #####
[consensus] [consensus]


+ 10
- 9
mempool/mempool.go View File

@ -147,7 +147,7 @@ type Mempool struct {
preCheck PreCheckFunc preCheck PreCheckFunc
postCheck PostCheckFunc postCheck PostCheckFunc
sizeBytes int64 // size of all txs in the mempool in bytes
txsTotalBytes int64 // see TxsTotalBytes
// Keep a cache of already-seen txs. // Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp. // This reduces the pressure on the proxyApp.
@ -267,9 +267,9 @@ func (mem *Mempool) Size() int {
return mem.txs.Len() return mem.txs.Len()
} }
// SizeBytes returns the size of all txs in the mempool in bytes.
func (mem *Mempool) SizeBytes() int64 {
return atomic.LoadInt64(&mem.sizeBytes)
// TxsTotalBytes returns the total size of all txs in the mempool.
func (mem *Mempool) TxsTotalBytes() int64 {
return atomic.LoadInt64(&mem.txsTotalBytes)
} }
// FlushAppConn flushes the mempool connection to ensure async resCb calls are // FlushAppConn flushes the mempool connection to ensure async resCb calls are
@ -287,9 +287,10 @@ func (mem *Mempool) Flush() {
for e := mem.txs.Front(); e != nil; e = e.Next() { for e := mem.txs.Front(); e != nil; e = e.Next() {
mem.txs.Remove(e) mem.txs.Remove(e)
_ = atomic.SwapInt64(&mem.sizeBytes, 0)
e.DetachPrev() e.DetachPrev()
} }
_ = atomic.SwapInt64(&mem.txsTotalBytes, 0)
} }
// TxsFront returns the first transaction in the ordered list for peer // TxsFront returns the first transaction in the ordered list for peer
@ -318,7 +319,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
if mem.Size() >= mem.config.Size { if mem.Size() >= mem.config.Size {
return ErrMempoolIsFull return ErrMempoolIsFull
} else if int64(len(tx))+mem.SizeBytes() > int64(mem.config.MaxBytes) {
} else if int64(len(tx))+mem.TxsTotalBytes() > mem.config.MaxBytes {
return ErrMempoolIsFull return ErrMempoolIsFull
} }
@ -393,7 +394,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
tx: tx, tx: tx,
} }
mem.txs.PushBack(memTx) mem.txs.PushBack(memTx)
atomic.AddInt64(&mem.sizeBytes, int64(len(tx)))
atomic.AddInt64(&mem.txsTotalBytes, int64(len(tx)))
mem.logger.Info("Added good transaction", mem.logger.Info("Added good transaction",
"tx", TxID(tx), "tx", TxID(tx),
"res", r, "res", r,
@ -435,7 +436,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Tx became invalidated due to newly committed block. // Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr) mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr)
mem.txs.Remove(mem.recheckCursor) mem.txs.Remove(mem.recheckCursor)
atomic.AddInt64(&mem.sizeBytes, int64(-len(tx)))
atomic.AddInt64(&mem.txsTotalBytes, int64(-len(tx)))
mem.recheckCursor.DetachPrev() mem.recheckCursor.DetachPrev()
// remove from cache (it might be good later) // remove from cache (it might be good later)
@ -609,7 +610,7 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
if _, ok := txsMap[string(memTx.tx)]; ok { if _, ok := txsMap[string(memTx.tx)]; ok {
// remove from clist // remove from clist
mem.txs.Remove(e) mem.txs.Remove(e)
atomic.AddInt64(&mem.sizeBytes, int64(-len(memTx.tx)))
atomic.AddInt64(&mem.txsTotalBytes, int64(-len(memTx.tx)))
e.DetachPrev() e.DetachPrev()
// NOTE: we don't remove committed txs from the cache. // NOTE: we don't remove committed txs from the cache.


+ 6
- 6
mempool/mempool_test.go View File

@ -450,26 +450,26 @@ func TestMempoolMaxMsgSize(t *testing.T) {
} }
func TestMempoolSizeBytes(t *testing.T) {
func TestMempoolTxsTotalBytes(t *testing.T) {
app := kvstore.NewKVStoreApplication() app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc) mempool := newMempoolWithApp(cc)
assert.EqualValues(t, 0, mempool.SizeBytes())
assert.EqualValues(t, 0, mempool.TxsTotalBytes())
err := mempool.CheckTx([]byte{0x01}, nil) err := mempool.CheckTx([]byte{0x01}, nil)
require.NoError(t, err) require.NoError(t, err)
assert.EqualValues(t, 1, mempool.SizeBytes())
assert.EqualValues(t, 1, mempool.TxsTotalBytes())
mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
assert.EqualValues(t, 0, mempool.SizeBytes())
assert.EqualValues(t, 0, mempool.TxsTotalBytes())
err = mempool.CheckTx([]byte{0x02, 0x03}, nil) err = mempool.CheckTx([]byte{0x02, 0x03}, nil)
require.NoError(t, err) require.NoError(t, err)
assert.EqualValues(t, 2, mempool.SizeBytes())
assert.EqualValues(t, 2, mempool.TxsTotalBytes())
mempool.Flush() mempool.Flush()
assert.EqualValues(t, 0, mempool.SizeBytes())
assert.EqualValues(t, 0, mempool.TxsTotalBytes())
} }
func checksumIt(data []byte) string { func checksumIt(data []byte) string {


Loading…
Cancel
Save