From 7c0f51e24bab2144dcdae7f23c4d392be161f8e7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 28 Apr 2017 23:59:02 -0400 Subject: [PATCH] remove viper from mempool --- consensus/replay_file.go | 5 +---- mempool/mempool.go | 28 +++++++++++++--------------- mempool/reactor.go | 18 ++++++++++++------ node/node.go | 15 +++++++++++++-- 4 files changed, 39 insertions(+), 27 deletions(-) diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 1d2a91f31..5717adda8 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/viper" bc "github.com/tendermint/tendermint/blockchain" - mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -262,9 +261,7 @@ func newConsensusStateForReplay(config *viper.Viper) *ConsensusState { cmn.Exit(cmn.Fmt("Failed to start event switch: %v", err)) } - mempool := mempl.NewMempool(config, proxyApp.Mempool()) - - consensusState := NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) + consensusState := NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, types.MockMempool{}) consensusState.SetEventSwitch(eventSwitch) return consensusState } diff --git a/mempool/mempool.go b/mempool/mempool.go index 621f0931e..46a78d268 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -7,14 +7,13 @@ import ( "sync/atomic" "time" - "github.com/spf13/viper" - abci "github.com/tendermint/abci/types" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" auto "github.com/tendermint/tmlibs/autofile" "github.com/tendermint/tmlibs/clist" - . "github.com/tendermint/tmlibs/common" + cmn "github.com/tendermint/tmlibs/common" + + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" ) /* @@ -48,7 +47,7 @@ TODO: Better handle abci client errors. (make it automatically handle connection const cacheSize = 100000 type Mempool struct { - config *viper.Viper + config Config proxyMtx sync.Mutex proxyAppConn proxy.AppConnMempool @@ -67,7 +66,7 @@ type Mempool struct { wal *auto.AutoFile } -func NewMempool(config *viper.Viper, proxyAppConn proxy.AppConnMempool) *Mempool { +func NewMempool(config Config, proxyAppConn proxy.AppConnMempool) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, @@ -86,17 +85,17 @@ func NewMempool(config *viper.Viper, proxyAppConn proxy.AppConnMempool) *Mempool } func (mem *Mempool) initWAL() { - walDir := mem.config.GetString("mempool_wal_dir") + walDir := mem.config.WalDir if walDir != "" { - err := EnsureDir(walDir, 0700) + err := cmn.EnsureDir(walDir, 0700) if err != nil { log.Error("Error ensuring Mempool wal dir", "error", err) - PanicSanity(err) + cmn.PanicSanity(err) } af, err := auto.OpenAutoFile(walDir + "/wal") if err != nil { log.Error("Error opening Mempool wal file", "error", err) - PanicSanity(err) + cmn.PanicSanity(err) } mem.wal = af } @@ -220,7 +219,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { case *abci.Response_CheckTx: memTx := mem.recheckCursor.Value.(*mempoolTx) if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { - PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+ + cmn.PanicSanity(cmn.Fmt("Unexpected tx response from proxy during recheck\n"+ "Expected %X, got %X", r.CheckTx.Data, memTx.tx)) } if r.CheckTx.Code == abci.CodeType_OK { @@ -270,7 +269,7 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs { } else if maxTxs < 0 { maxTxs = mem.txs.Len() } - txs := make([]types.Tx, 0, MinInt(mem.txs.Len(), maxTxs)) + txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), maxTxs)) for e := mem.txs.Front(); e != nil && len(txs) < maxTxs; e = e.Next() { memTx := e.Value.(*mempoolTx) txs = append(txs, memTx.tx) @@ -299,8 +298,7 @@ func (mem *Mempool) Update(height int, txs types.Txs) { // Recheck mempool txs if any txs were committed in the block // NOTE/XXX: in some apps a tx could be invalidated due to EndBlock, // so we really still do need to recheck, but this is for debugging - if mem.config.GetBool("mempool_recheck") && - (mem.config.GetBool("mempool_recheck_empty") || len(txs) > 0) { + if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) { log.Info("Recheck txs", "numtxs", len(goodTxs)) mem.recheckTxs(goodTxs) // At this point, mem.txs are being rechecked. diff --git a/mempool/reactor.go b/mempool/reactor.go index 6bea53b0c..3f25cc562 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -6,13 +6,12 @@ import ( "reflect" "time" - "github.com/spf13/viper" - abci "github.com/tendermint/abci/types" "github.com/tendermint/go-wire" + "github.com/tendermint/tmlibs/clist" + "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/clist" ) const ( @@ -22,15 +21,22 @@ const ( peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount ) +type Config struct { + Recheck bool // true + RecheckEmpty bool // true + Broadcast bool // true + WalDir string // rootDir+"/data/mempool.wal") +} + // MempoolReactor handles mempool tx broadcasting amongst peers. type MempoolReactor struct { p2p.BaseReactor - config *viper.Viper + config Config Mempool *Mempool evsw types.EventSwitch } -func NewMempoolReactor(config *viper.Viper, mempool *Mempool) *MempoolReactor { +func NewMempoolReactor(config Config, mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ config: config, Mempool: mempool, @@ -103,7 +109,7 @@ type Peer interface { // TODO: Handle mempool or reactor shutdown? // As is this routine may block forever if no new txs come in. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { - if !memR.config.GetBool("mempool_broadcast") { + if !memR.config.Broadcast { return } diff --git a/node/node.go b/node/node.go index 0471bf3c7..f7aca6d05 100644 --- a/node/node.go +++ b/node/node.go @@ -124,8 +124,8 @@ func NewNode(config *viper.Viper, privValidator *types.PrivValidator, clientCrea bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) // Make MempoolReactor - mempool := mempl.NewMempool(config, proxyApp.Mempool()) - mempoolReactor := mempl.NewMempoolReactor(config, mempool) + mempool := mempl.NewMempool(mempoolConfig(config), proxyApp.Mempool()) + mempoolReactor := mempl.NewMempoolReactor(mempoolConfig(config), mempool) // Make ConsensusReactor consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) @@ -431,3 +431,14 @@ func ProtocolAndAddress(listenAddr string) (string, string) { } return protocol, address } + +//------------------------------------------------------------------------------ + +func mempoolConfig(config *viper.Viper) mempl.Config { + return mempl.Config{ + Recheck: config.GetBool("mempool_recheck"), + RecheckEmpty: config.GetBool("mempool_recheck_empty"), + Broadcast: config.GetBool("mempool_broadcast"), + WalDir: config.GetString("mempool_wal_dir"), + } +}