Browse Source

mempool: cfg.CacheSize and expose InitWAL

pull/1121/head
Ethan Buchman 7 years ago
parent
commit
f06cc6630b
5 changed files with 29 additions and 13 deletions
  1. +16
    -2
      config/config.go
  2. +4
    -2
      consensus/replay_test.go
  3. +5
    -7
      mempool/mempool.go
  4. +3
    -2
      mempool/mempool_test.go
  5. +1
    -0
      node/node.go

+ 16
- 2
config/config.go View File

@ -60,9 +60,9 @@ func TestConfig() *Config {
BaseConfig: TestBaseConfig(), BaseConfig: TestBaseConfig(),
RPC: TestRPCConfig(), RPC: TestRPCConfig(),
P2P: TestP2PConfig(), P2P: TestP2PConfig(),
Mempool: DefaultMempoolConfig(),
Mempool: TestMempoolConfig(),
Consensus: TestConsensusConfig(), Consensus: TestConsensusConfig(),
TxIndex: DefaultTxIndexConfig(),
TxIndex: TestTxIndexConfig(),
} }
} }
@ -313,6 +313,7 @@ type MempoolConfig struct {
RecheckEmpty bool `mapstructure:"recheck_empty"` RecheckEmpty bool `mapstructure:"recheck_empty"`
Broadcast bool `mapstructure:"broadcast"` Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal_dir"` WalPath string `mapstructure:"wal_dir"`
CacheSize int `mapstructure:"cache_size"`
} }
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool // DefaultMempoolConfig returns a default configuration for the Tendermint mempool
@ -322,9 +323,17 @@ func DefaultMempoolConfig() *MempoolConfig {
RecheckEmpty: true, RecheckEmpty: true,
Broadcast: true, Broadcast: true,
WalPath: filepath.Join(defaultDataDir, "mempool.wal"), WalPath: filepath.Join(defaultDataDir, "mempool.wal"),
CacheSize: 100000,
} }
} }
// TestMempoolConfig returns a configuration for testing the Tendermint mempool
func TestMempoolConfig() *MempoolConfig {
config := DefaultMempoolConfig()
config.CacheSize = 1000
return config
}
// WalDir returns the full path to the mempool's write-ahead log // WalDir returns the full path to the mempool's write-ahead log
func (m *MempoolConfig) WalDir() string { func (m *MempoolConfig) WalDir() string {
return rootify(m.WalPath, m.RootDir) return rootify(m.WalPath, m.RootDir)
@ -492,6 +501,11 @@ func DefaultTxIndexConfig() *TxIndexConfig {
} }
} }
// TestTxIndexConfig returns a default configuration for the transaction indexer.
func TestTxIndexConfig() *TxIndexConfig {
return DefaultTxIndexConfig()
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Utils // Utils


+ 4
- 2
consensus/replay_test.go View File

@ -82,12 +82,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64,
func sendTxs(cs *ConsensusState, ctx context.Context) { func sendTxs(cs *ConsensusState, ctx context.Context) {
i := 0 i := 0
for {
tx := []byte{byte(i)}
for i := 0; i < 256; i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
cs.mempool.CheckTx([]byte{byte(i)}, nil)
tx[0] = byte(i)
cs.mempool.CheckTx(tx, nil)
i++ i++
} }
} }


+ 5
- 7
mempool/mempool.go View File

@ -3,7 +3,6 @@ package mempool
import ( import (
"bytes" "bytes"
"container/list" "container/list"
"fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -49,7 +48,7 @@ TODO: Better handle abci client errors. (make it automatically handle connection
*/ */
const cacheSize = 100000
var ErrTxInCache = errors.New("Tx already exists in cache")
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus // Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
// round. Transaction validity is checked using the CheckTx abci message before the transaction is // round. Transaction validity is checked using the CheckTx abci message before the transaction is
@ -92,9 +91,8 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, he
recheckCursor: nil, recheckCursor: nil,
recheckEnd: nil, recheckEnd: nil,
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
cache: newTxCache(cacheSize),
cache: newTxCache(config.CacheSize),
} }
mempool.initWAL()
proxyAppConn.SetResponseCallback(mempool.resCb) proxyAppConn.SetResponseCallback(mempool.resCb)
return mempool return mempool
} }
@ -131,7 +129,7 @@ func (mem *Mempool) CloseWAL() bool {
return true return true
} }
func (mem *Mempool) initWAL() {
func (mem *Mempool) InitWAL() {
walDir := mem.config.WalDir() walDir := mem.config.WalDir()
if walDir != "" { if walDir != "" {
err := cmn.EnsureDir(walDir, 0700) err := cmn.EnsureDir(walDir, 0700)
@ -192,7 +190,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
// CACHE // CACHE
if mem.cache.Exists(tx) { if mem.cache.Exists(tx) {
return fmt.Errorf("Tx already exists in cache")
return ErrTxInCache
} }
mem.cache.Push(tx) mem.cache.Push(tx)
// END CACHE // END CACHE
@ -449,7 +447,7 @@ func newTxCache(cacheSize int) *txCache {
// Reset resets the txCache to empty. // Reset resets the txCache to empty.
func (cache *txCache) Reset() { func (cache *txCache) Reset() {
cache.mtx.Lock() cache.mtx.Lock()
cache.map_ = make(map[string]struct{}, cacheSize)
cache.map_ = make(map[string]struct{}, cache.size)
cache.list.Init() cache.list.Init()
cache.mtx.Unlock() cache.mtx.Unlock()
} }


+ 3
- 2
mempool/mempool_test.go View File

@ -236,12 +236,13 @@ func TestMempoolCloseWAL(t *testing.T) {
require.Equal(t, 0, len(m1), "no matches yet") require.Equal(t, 0, len(m1), "no matches yet")
// 3. Create the mempool // 3. Create the mempool
wcfg := *(cfg.DefaultMempoolConfig())
wcfg := cfg.DefaultMempoolConfig()
wcfg.RootDir = rootDir wcfg.RootDir = rootDir
app := dummy.NewDummyApplication() app := dummy.NewDummyApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
appConnMem, _ := cc.NewABCIClient() appConnMem, _ := cc.NewABCIClient()
mempool := NewMempool(&wcfg, appConnMem, 10)
mempool := NewMempool(wcfg, appConnMem, 10)
mempool.InitWAL()
// 4. Ensure that the directory contains the WAL file // 4. Ensure that the directory contains the WAL file
m2, err := filepath.Glob(filepath.Join(rootDir, "*")) m2, err := filepath.Glob(filepath.Join(rootDir, "*"))


+ 1
- 0
node/node.go View File

@ -189,6 +189,7 @@ func NewNode(config *cfg.Config,
// Make MempoolReactor // Make MempoolReactor
mempoolLogger := logger.With("module", "mempool") mempoolLogger := logger.With("module", "mempool")
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
mempool.InitWAL() // no need to have the mempool wal during tests
mempool.SetLogger(mempoolLogger) mempool.SetLogger(mempoolLogger)
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger) mempoolReactor.SetLogger(mempoolLogger)


Loading…
Cancel
Save