Browse Source

remove viper from mempool

pull/484/head
Ethan Buchman 8 years ago
parent
commit
7c0f51e24b
4 changed files with 39 additions and 27 deletions
  1. +1
    -4
      consensus/replay_file.go
  2. +13
    -15
      mempool/mempool.go
  3. +12
    -6
      mempool/reactor.go
  4. +13
    -2
      node/node.go

+ 1
- 4
consensus/replay_file.go View File

@ -11,7 +11,6 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -262,9 +261,7 @@ func newConsensusStateForReplay(config *viper.Viper) *ConsensusState {
cmn.Exit(cmn.Fmt("Failed to start event switch: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event switch: %v", err))
} }
mempool := mempl.NewMempool(config, proxyApp.Mempool())
consensusState := NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusState := NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, types.MockMempool{})
consensusState.SetEventSwitch(eventSwitch) consensusState.SetEventSwitch(eventSwitch)
return consensusState return consensusState
} }

+ 13
- 15
mempool/mempool.go View File

@ -7,14 +7,13 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/spf13/viper"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
"github.com/tendermint/tmlibs/clist" "github.com/tendermint/tmlibs/clist"
. "github.com/tendermint/tmlibs/common"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
) )
/* /*
@ -48,7 +47,7 @@ TODO: Better handle abci client errors. (make it automatically handle connection
const cacheSize = 100000 const cacheSize = 100000
type Mempool struct { type Mempool struct {
config *viper.Viper
config Config
proxyMtx sync.Mutex proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool proxyAppConn proxy.AppConnMempool
@ -67,7 +66,7 @@ type Mempool struct {
wal *auto.AutoFile wal *auto.AutoFile
} }
func NewMempool(config *viper.Viper, proxyAppConn proxy.AppConnMempool) *Mempool {
func NewMempool(config Config, proxyAppConn proxy.AppConnMempool) *Mempool {
mempool := &Mempool{ mempool := &Mempool{
config: config, config: config,
proxyAppConn: proxyAppConn, proxyAppConn: proxyAppConn,
@ -86,17 +85,17 @@ func NewMempool(config *viper.Viper, proxyAppConn proxy.AppConnMempool) *Mempool
} }
func (mem *Mempool) initWAL() { func (mem *Mempool) initWAL() {
walDir := mem.config.GetString("mempool_wal_dir")
walDir := mem.config.WalDir
if walDir != "" { if walDir != "" {
err := EnsureDir(walDir, 0700)
err := cmn.EnsureDir(walDir, 0700)
if err != nil { if err != nil {
log.Error("Error ensuring Mempool wal dir", "error", err) log.Error("Error ensuring Mempool wal dir", "error", err)
PanicSanity(err)
cmn.PanicSanity(err)
} }
af, err := auto.OpenAutoFile(walDir + "/wal") af, err := auto.OpenAutoFile(walDir + "/wal")
if err != nil { if err != nil {
log.Error("Error opening Mempool wal file", "error", err) log.Error("Error opening Mempool wal file", "error", err)
PanicSanity(err)
cmn.PanicSanity(err)
} }
mem.wal = af mem.wal = af
} }
@ -220,7 +219,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
case *abci.Response_CheckTx: case *abci.Response_CheckTx:
memTx := mem.recheckCursor.Value.(*mempoolTx) memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) { if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) {
PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+
cmn.PanicSanity(cmn.Fmt("Unexpected tx response from proxy during recheck\n"+
"Expected %X, got %X", r.CheckTx.Data, memTx.tx)) "Expected %X, got %X", r.CheckTx.Data, memTx.tx))
} }
if r.CheckTx.Code == abci.CodeType_OK { if r.CheckTx.Code == abci.CodeType_OK {
@ -270,7 +269,7 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
} else if maxTxs < 0 { } else if maxTxs < 0 {
maxTxs = mem.txs.Len() maxTxs = mem.txs.Len()
} }
txs := make([]types.Tx, 0, MinInt(mem.txs.Len(), maxTxs))
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), maxTxs))
for e := mem.txs.Front(); e != nil && len(txs) < maxTxs; e = e.Next() { for e := mem.txs.Front(); e != nil && len(txs) < maxTxs; e = e.Next() {
memTx := e.Value.(*mempoolTx) memTx := e.Value.(*mempoolTx)
txs = append(txs, memTx.tx) txs = append(txs, memTx.tx)
@ -299,8 +298,7 @@ func (mem *Mempool) Update(height int, txs types.Txs) {
// Recheck mempool txs if any txs were committed in the block // Recheck mempool txs if any txs were committed in the block
// NOTE/XXX: in some apps a tx could be invalidated due to EndBlock, // NOTE/XXX: in some apps a tx could be invalidated due to EndBlock,
// so we really still do need to recheck, but this is for debugging // so we really still do need to recheck, but this is for debugging
if mem.config.GetBool("mempool_recheck") &&
(mem.config.GetBool("mempool_recheck_empty") || len(txs) > 0) {
if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) {
log.Info("Recheck txs", "numtxs", len(goodTxs)) log.Info("Recheck txs", "numtxs", len(goodTxs))
mem.recheckTxs(goodTxs) mem.recheckTxs(goodTxs)
// At this point, mem.txs are being rechecked. // At this point, mem.txs are being rechecked.


+ 12
- 6
mempool/reactor.go View File

@ -6,13 +6,12 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/spf13/viper"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/clist"
) )
const ( const (
@ -22,15 +21,22 @@ const (
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
) )
type Config struct {
Recheck bool // true
RecheckEmpty bool // true
Broadcast bool // true
WalDir string // rootDir+"/data/mempool.wal")
}
// MempoolReactor handles mempool tx broadcasting amongst peers. // MempoolReactor handles mempool tx broadcasting amongst peers.
type MempoolReactor struct { type MempoolReactor struct {
p2p.BaseReactor p2p.BaseReactor
config *viper.Viper
config Config
Mempool *Mempool Mempool *Mempool
evsw types.EventSwitch evsw types.EventSwitch
} }
func NewMempoolReactor(config *viper.Viper, mempool *Mempool) *MempoolReactor {
func NewMempoolReactor(config Config, mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{ memR := &MempoolReactor{
config: config, config: config,
Mempool: mempool, Mempool: mempool,
@ -103,7 +109,7 @@ type Peer interface {
// TODO: Handle mempool or reactor shutdown? // TODO: Handle mempool or reactor shutdown?
// As is this routine may block forever if no new txs come in. // As is this routine may block forever if no new txs come in.
func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
if !memR.config.GetBool("mempool_broadcast") {
if !memR.config.Broadcast {
return return
} }


+ 13
- 2
node/node.go View File

@ -124,8 +124,8 @@ func NewNode(config *viper.Viper, privValidator *types.PrivValidator, clientCrea
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync) bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
// Make MempoolReactor // Make MempoolReactor
mempool := mempl.NewMempool(config, proxyApp.Mempool())
mempoolReactor := mempl.NewMempoolReactor(config, mempool)
mempool := mempl.NewMempool(mempoolConfig(config), proxyApp.Mempool())
mempoolReactor := mempl.NewMempoolReactor(mempoolConfig(config), mempool)
// Make ConsensusReactor // Make ConsensusReactor
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool) consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
@ -431,3 +431,14 @@ func ProtocolAndAddress(listenAddr string) (string, string) {
} }
return protocol, address return protocol, address
} }
//------------------------------------------------------------------------------
func mempoolConfig(config *viper.Viper) mempl.Config {
return mempl.Config{
Recheck: config.GetBool("mempool_recheck"),
RecheckEmpty: config.GetBool("mempool_recheck_empty"),
Broadcast: config.GetBool("mempool_broadcast"),
WalDir: config.GetString("mempool_wal_dir"),
}
}

Loading…
Cancel
Save