diff --git a/consensus/common_test.go b/consensus/common_test.go index ea1415924..55f4d5828 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -255,7 +255,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0) + mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0, mempl.NopMetrics()) mempool.SetLogger(log.TestingLogger().With("module", "mempool")) if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() diff --git a/mempool/mempool.go b/mempool/mempool.go index 5af16b3c9..a1f9cd179 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -83,10 +83,12 @@ type Mempool struct { wal *auto.AutoFile logger log.Logger + + metrics *Metrics } // NewMempool returns a new Mempool with the given configuration and connection to an application. -func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64) *Mempool { +func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, metrics *Metrics) *Mempool { mempool := &Mempool{ config: config, proxyAppConn: proxyAppConn, @@ -98,6 +100,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, he recheckEnd: nil, logger: log.NewNopLogger(), cache: newTxCache(config.CacheSize), + metrics: metrics, } proxyAppConn.SetResponseCallback(mempool.resCb) return mempool @@ -250,6 +253,7 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) { } else { mem.resCbRecheck(req, res) } + mem.metrics.Size.Set(float64(mem.Size())) } func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { @@ -393,6 +397,7 @@ func (mem *Mempool) Update(height int64, txs types.Txs) error { // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. } + mem.metrics.Size.Set(float64(mem.Size())) return nil } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index a67adf6d3..45aa27a8b 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -33,7 +33,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { if err != nil { panic(err) } - mempool := NewMempool(config.Mempool, appConnMem, 0) + mempool := NewMempool(config.Mempool, appConnMem, 0, NopMetrics()) mempool.SetLogger(log.TestingLogger()) return mempool } @@ -241,7 +241,7 @@ func TestMempoolCloseWAL(t *testing.T) { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) appConnMem, _ := cc.NewABCIClient() - mempool := NewMempool(wcfg, appConnMem, 10) + mempool := NewMempool(wcfg, appConnMem, 10, NopMetrics()) mempool.InitWAL() // 4. Ensure that the directory contains the WAL file diff --git a/mempool/metrics.go b/mempool/metrics.go new file mode 100644 index 000000000..3457285f3 --- /dev/null +++ b/mempool/metrics.go @@ -0,0 +1,17 @@ +package mempool + +import "github.com/go-kit/kit/metrics" +import "github.com/go-kit/kit/metrics/discard" + +// Metrics contains metrics exposed by this package. +// see MetricsProvider for descriptions. +type Metrics struct { + Size metrics.Gauge +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + Size: discard.NewGauge(), + } +} diff --git a/node/node.go b/node/node.go index a8671c9f8..acb3461b0 100644 --- a/node/node.go +++ b/node/node.go @@ -91,11 +91,11 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { } // MetricsProvider returns a consensus and p2p Metrics. -type MetricsProvider func() (*cs.Metrics, *p2p.Metrics) +type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) // DefaultMetricsProvider returns a consensus and p2p Metrics build using // Prometheus client library. -func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) { +func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) { return &cs.Metrics{ Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Subsystem: "consensus", @@ -167,6 +167,12 @@ func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) { Name: "peers", Help: "Number of peers.", }, []string{}), + }, &mempl.Metrics{ + Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Subsystem: "mempool", + Name: "size", + Help: "Size of the mempool (number of uncommitted transactions).", + }, []string{}), } } @@ -294,9 +300,11 @@ func NewNode(config *cfg.Config, consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey()) } + csMetrics, p2pMetrics, memplMetrics := metricsProvider() + // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") - mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) + mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, memplMetrics) mempool.InitWAL() // no need to have the mempool wal during tests mempool.SetLogger(mempoolLogger) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) @@ -326,8 +334,6 @@ func NewNode(config *cfg.Config, bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) - csMetrics, p2pMetrics := metricsProvider() - // Make ConsensusReactor consensusState := cs.NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evidencePool, csMetrics)