Browse Source

mempool: remove vestigal mempool wal (#6396)

v0.34/x
Sam Kleinman 4 years ago
committed by GitHub
parent
commit
8eccaf9535
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 2 additions and 145 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +0
    -12
      config/config.go
  3. +0
    -3
      config/config_test.go
  4. +0
    -1
      config/toml.go
  5. +1
    -45
      mempool/clist_mempool.go
  6. +0
    -63
      mempool/clist_mempool_test.go
  7. +0
    -8
      mempool/mempool.go
  8. +0
    -13
      node/node.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -52,6 +52,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- Data Storage - Data Storage
- [store/state/evidence/light] \#5771 Use an order-preserving varint key encoding (@cmwaters) - [store/state/evidence/light] \#5771 Use an order-preserving varint key encoding (@cmwaters)
- [mempool] \#6396 Remove mempool's write ahead log (WAL), (previously unused by the tendermint code). (@tychoish)
### FEATURES ### FEATURES


+ 0
- 12
config/config.go View File

@ -720,7 +720,6 @@ type MempoolConfig struct {
RootDir string `mapstructure:"home"` RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"` Recheck bool `mapstructure:"recheck"`
Broadcast bool `mapstructure:"broadcast"` Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal-dir"`
// Maximum number of transactions in the mempool // Maximum number of transactions in the mempool
Size int `mapstructure:"size"` Size int `mapstructure:"size"`
// Limit the total size of all txs in the mempool. // Limit the total size of all txs in the mempool.
@ -747,7 +746,6 @@ func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{ return &MempoolConfig{
Recheck: true, Recheck: true,
Broadcast: true, Broadcast: true,
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement // Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck // ABCI Recheck
Size: 5000, Size: 5000,
@ -764,16 +762,6 @@ func TestMempoolConfig() *MempoolConfig {
return cfg return cfg
} }
// WalDir returns the full path to the mempool's write-ahead log
func (cfg *MempoolConfig) WalDir() string {
return rootify(cfg.WalPath, cfg.RootDir)
}
// WalEnabled returns true if the WAL is enabled.
func (cfg *MempoolConfig) WalEnabled() bool {
return cfg.WalPath != ""
}
// ValidateBasic performs basic validation (checking param bounds, etc.) and // ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails. // returns an error if any check fails.
func (cfg *MempoolConfig) ValidateBasic() error { func (cfg *MempoolConfig) ValidateBasic() error {


+ 0
- 3
config/config_test.go View File

@ -22,12 +22,9 @@ func TestDefaultConfig(t *testing.T) {
cfg.SetRoot("/foo") cfg.SetRoot("/foo")
cfg.Genesis = "bar" cfg.Genesis = "bar"
cfg.DBPath = "/opt/data" cfg.DBPath = "/opt/data"
cfg.Mempool.WalPath = "wal/mem/"
assert.Equal("/foo/bar", cfg.GenesisFile()) assert.Equal("/foo/bar", cfg.GenesisFile())
assert.Equal("/opt/data", cfg.DBDir()) assert.Equal("/opt/data", cfg.DBDir())
assert.Equal("/foo/wal/mem", cfg.Mempool.WalDir())
} }
func TestConfigValidateBasic(t *testing.T) { func TestConfigValidateBasic(t *testing.T) {


+ 0
- 1
config/toml.go View File

@ -357,7 +357,6 @@ dial-timeout = "{{ .P2P.DialTimeout }}"
recheck = {{ .Mempool.Recheck }} recheck = {{ .Mempool.Recheck }}
broadcast = {{ .Mempool.Broadcast }} broadcast = {{ .Mempool.Broadcast }}
wal-dir = "{{ js .Mempool.WalPath }}"
# Maximum number of transactions in the mempool # Maximum number of transactions in the mempool
size = {{ .Mempool.Size }} size = {{ .Mempool.Size }}


+ 1
- 45
mempool/clist_mempool.go View File

@ -11,11 +11,9 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
auto "github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math" tmmath "github.com/tendermint/tendermint/libs/math"
tmos "github.com/tendermint/tendermint/libs/os"
tmsync "github.com/tendermint/tendermint/libs/sync" tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
@ -25,8 +23,6 @@ import (
// TxKeySize is the size of the transaction key index // TxKeySize is the size of the transaction key index
const TxKeySize = sha256.Size const TxKeySize = sha256.Size
var newline = []byte("\n")
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// CListMempool is an ordered in-memory pool for transactions before they are // CListMempool is an ordered in-memory pool for transactions before they are
@ -51,8 +47,7 @@ type CListMempool struct {
preCheck PreCheckFunc preCheck PreCheckFunc
postCheck PostCheckFunc postCheck PostCheckFunc
wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool proxyAppConn proxy.AppConnMempool
// Track whether we're rechecking txs. // Track whether we're rechecking txs.
@ -137,33 +132,6 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics } return func(mem *CListMempool) { mem.metrics = metrics }
} }
func (mem *CListMempool) InitWAL() error {
var (
walDir = mem.config.WalDir()
walFile = walDir + "/wal"
)
const perm = 0700
if err := tmos.EnsureDir(walDir, perm); err != nil {
return err
}
af, err := auto.OpenAutoFile(walFile)
if err != nil {
return fmt.Errorf("can't open autofile %s: %w", walFile, err)
}
mem.wal = af
return nil
}
func (mem *CListMempool) CloseWAL() {
if err := mem.wal.Close(); err != nil {
mem.logger.Error("Error closing WAL", "err", err)
}
mem.wal = nil
}
// Safe for concurrent use by multiple goroutines. // Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Lock() { func (mem *CListMempool) Lock() {
mem.updateMtx.Lock() mem.updateMtx.Lock()
@ -253,18 +221,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
} }
} }
// NOTE: writing to the WAL and calling proxy must be done before adding tx
// to the cache. otherwise, if either of them fails, next time CheckTx is
// called with tx, ErrTxInCache will be returned without tx being checked at
// all even once.
if mem.wal != nil {
// TODO: Notify administrators when WAL fails
_, err := mem.wal.Write(append([]byte(tx), newline...))
if err != nil {
return fmt.Errorf("wal.Write: %w", err)
}
}
// NOTE: proxyAppConn may error if tx buffer is full // NOTE: proxyAppConn may error if tx buffer is full
if err := mem.proxyAppConn.Error(); err != nil { if err := mem.proxyAppConn.Error(); err != nil {
return err return err


+ 0
- 63
mempool/clist_mempool_test.go View File

@ -3,13 +3,10 @@ package mempool
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"crypto/sha256"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io/ioutil"
mrand "math/rand" mrand "math/rand"
"os" "os"
"path/filepath"
"testing" "testing"
"time" "time"
@ -418,55 +415,6 @@ func TestSerialReap(t *testing.T) {
reapCheck(600) reapCheck(600)
} }
func TestMempoolCloseWAL(t *testing.T) {
// 1. Create the temporary directory for mempool and WAL testing.
rootDir, err := ioutil.TempDir("", "mempool-test")
require.Nil(t, err, "expecting successful tmpdir creation")
// 2. Ensure that it doesn't contain any elements -- Sanity check
m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
require.Nil(t, err, "successful globbing expected")
require.Equal(t, 0, len(m1), "no matches yet")
// 3. Create the mempool
wcfg := cfg.DefaultConfig()
wcfg.Mempool.RootDir = rootDir
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg)
defer cleanup()
mempool.height = 10
err = mempool.InitWAL()
require.NoError(t, err)
// 4. Ensure that the directory contains the WAL file
m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
require.Nil(t, err, "successful globbing expected")
require.Equal(t, 1, len(m2), "expecting the wal match in")
// 5. Write some contents to the WAL
err = mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{})
require.NoError(t, err)
walFilepath := mempool.wal.Path
sum1 := checksumFile(walFilepath, t)
// 6. Sanity check to ensure that the written TX matches the expectation.
require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
// 7. Invoke CloseWAL() and ensure it discards the
// WAL thus any other write won't go through.
mempool.CloseWAL()
err = mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{})
require.NoError(t, err)
sum2 := checksumFile(walFilepath, t)
require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
// 8. Sanity check to ensure that the WAL file still exists
m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
require.Nil(t, err, "successful globbing expected")
require.Equal(t, 1, len(m3), "expecting the wal match in")
}
func TestMempool_CheckTxChecksTxSize(t *testing.T) { func TestMempool_CheckTxChecksTxSize(t *testing.T) {
app := kvstore.NewApplication() app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
@ -651,17 +599,6 @@ func newRemoteApp(
} }
return clientCreator, server return clientCreator, server
} }
func checksumIt(data []byte) string {
h := sha256.New()
h.Write(data)
return fmt.Sprintf("%x", h.Sum(nil))
}
func checksumFile(p string, t *testing.T) string {
data, err := ioutil.ReadFile(p)
require.Nil(t, err, "expecting successful read of %q", p)
return checksumIt(data)
}
func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx { func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
responses := make([]*abci.ResponseDeliverTx, 0, n) responses := make([]*abci.ResponseDeliverTx, 0, n)


+ 0
- 8
mempool/mempool.go View File

@ -69,14 +69,6 @@ type Mempool interface {
// TxsBytes returns the total size of all txs in the mempool. // TxsBytes returns the total size of all txs in the mempool.
TxsBytes() int64 TxsBytes() int64
// InitWAL creates a directory for the WAL file and opens a file itself. If
// there is an error, it will be of type *PathError.
InitWAL() error
// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.
CloseWAL()
} }
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------


+ 0
- 13
node/node.go View File

@ -1322,14 +1322,6 @@ func (n *Node) OnStart() error {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
} }
// Start the mempool.
if n.config.Mempool.WalEnabled() {
err := n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the transport. // Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
if err != nil { if err != nil {
@ -1469,11 +1461,6 @@ func (n *Node) OnStop() {
} }
} }
// stop mempool WAL
if n.config.Mempool.WalEnabled() {
n.mempool.CloseWAL()
}
if err := n.transport.Close(); err != nil { if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err) n.Logger.Error("Error closing transport", "err", err)
} }


Loading…
Cancel
Save