Browse Source

mempool size metric

pull/1737/head
Anton Kaliaev 6 years ago
parent
commit
7efb73aa18
No known key found for this signature in database GPG Key ID: 7B6881D965918214
5 changed files with 37 additions and 9 deletions
  1. +1
    -1
      consensus/common_test.go
  2. +6
    -1
      mempool/mempool.go
  3. +2
    -2
      mempool/mempool_test.go
  4. +17
    -0
      mempool/metrics.go
  5. +11
    -5
      node/node.go

+ 1
- 1
consensus/common_test.go View File

@ -255,7 +255,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
proxyAppConnCon := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
// Make Mempool // Make Mempool
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0, mempl.NopMetrics())
mempool.SetLogger(log.TestingLogger().With("module", "mempool")) mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
if thisConfig.Consensus.WaitForTxs() { if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable() mempool.EnableTxsAvailable()


+ 6
- 1
mempool/mempool.go View File

@ -83,10 +83,12 @@ type Mempool struct {
wal *auto.AutoFile wal *auto.AutoFile
logger log.Logger logger log.Logger
metrics *Metrics
} }
// NewMempool returns a new Mempool with the given configuration and connection to an application. // NewMempool returns a new Mempool with the given configuration and connection to an application.
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64) *Mempool {
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64, metrics *Metrics) *Mempool {
mempool := &Mempool{ mempool := &Mempool{
config: config, config: config,
proxyAppConn: proxyAppConn, proxyAppConn: proxyAppConn,
@ -98,6 +100,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, he
recheckEnd: nil, recheckEnd: nil,
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
cache: newTxCache(config.CacheSize), cache: newTxCache(config.CacheSize),
metrics: metrics,
} }
proxyAppConn.SetResponseCallback(mempool.resCb) proxyAppConn.SetResponseCallback(mempool.resCb)
return mempool return mempool
@ -250,6 +253,7 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
} else { } else {
mem.resCbRecheck(req, res) mem.resCbRecheck(req, res)
} }
mem.metrics.Size.Set(float64(mem.Size()))
} }
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
@ -393,6 +397,7 @@ func (mem *Mempool) Update(height int64, txs types.Txs) error {
// mem.recheckCursor re-scans mem.txs and possibly removes some txs. // mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
} }
mem.metrics.Size.Set(float64(mem.Size()))
return nil return nil
} }


+ 2
- 2
mempool/mempool_test.go View File

@ -33,7 +33,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
if err != nil { if err != nil {
panic(err) panic(err)
} }
mempool := NewMempool(config.Mempool, appConnMem, 0)
mempool := NewMempool(config.Mempool, appConnMem, 0, NopMetrics())
mempool.SetLogger(log.TestingLogger()) mempool.SetLogger(log.TestingLogger())
return mempool return mempool
} }
@ -241,7 +241,7 @@ func TestMempoolCloseWAL(t *testing.T) {
app := kvstore.NewKVStoreApplication() app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
appConnMem, _ := cc.NewABCIClient() appConnMem, _ := cc.NewABCIClient()
mempool := NewMempool(wcfg, appConnMem, 10)
mempool := NewMempool(wcfg, appConnMem, 10, NopMetrics())
mempool.InitWAL() mempool.InitWAL()
// 4. Ensure that the directory contains the WAL file // 4. Ensure that the directory contains the WAL file


+ 17
- 0
mempool/metrics.go View File

@ -0,0 +1,17 @@
package mempool
import "github.com/go-kit/kit/metrics"
import "github.com/go-kit/kit/metrics/discard"
// Metrics contains metrics exposed by this package.
// see MetricsProvider for descriptions.
type Metrics struct {
Size metrics.Gauge
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Size: discard.NewGauge(),
}
}

+ 11
- 5
node/node.go View File

@ -91,11 +91,11 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
} }
// MetricsProvider returns a consensus and p2p Metrics. // MetricsProvider returns a consensus and p2p Metrics.
type MetricsProvider func() (*cs.Metrics, *p2p.Metrics)
type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics)
// DefaultMetricsProvider returns a consensus and p2p Metrics build using // DefaultMetricsProvider returns a consensus and p2p Metrics build using
// Prometheus client library. // Prometheus client library.
func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) {
func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) {
return &cs.Metrics{ return &cs.Metrics{
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus", Subsystem: "consensus",
@ -167,6 +167,12 @@ func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) {
Name: "peers", Name: "peers",
Help: "Number of peers.", Help: "Number of peers.",
}, []string{}), }, []string{}),
}, &mempl.Metrics{
Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "mempool",
Name: "size",
Help: "Size of the mempool (number of uncommitted transactions).",
}, []string{}),
} }
} }
@ -294,9 +300,11 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey()) consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
} }
csMetrics, p2pMetrics, memplMetrics := metricsProvider()
// Make MempoolReactor // Make MempoolReactor
mempoolLogger := logger.With("module", "mempool") mempoolLogger := logger.With("module", "mempool")
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, memplMetrics)
mempool.InitWAL() // no need to have the mempool wal during tests mempool.InitWAL() // no need to have the mempool wal during tests
mempool.SetLogger(mempoolLogger) mempool.SetLogger(mempoolLogger)
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
@ -326,8 +334,6 @@ func NewNode(config *cfg.Config,
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
csMetrics, p2pMetrics := metricsProvider()
// Make ConsensusReactor // Make ConsensusReactor
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(), consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
blockExec, blockStore, mempool, evidencePool, csMetrics) blockExec, blockStore, mempool, evidencePool, csMetrics)


Loading…
Cancel
Save