Browse Source

rpc: paginate mempool /unconfirmed_txs endpoint (#7612)

This commit changes the behaviour of the /unconfirmed_txs endpoint by replacing limit with a page and perPage parameter for pagination.
The test case for unconfirmed_txs have been accommodated to properly test this change and the documentation for the API as well.
pull/7614/head
Kene 3 years ago
committed by GitHub
parent
commit
49153b753c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 72 additions and 38 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +17
    -9
      internal/rpc/core/mempool.go
  3. +2
    -2
      internal/rpc/core/routes.go
  4. +2
    -2
      light/rpc/client.go
  5. +3
    -2
      rpc/client/http/http.go
  6. +2
    -1
      rpc/client/http/request.go
  7. +1
    -1
      rpc/client/interface.go
  8. +2
    -2
      rpc/client/local/local.go
  9. +31
    -16
      rpc/client/rpc_test.go
  10. +11
    -3
      rpc/openapi/openapi.yaml

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -53,6 +53,7 @@ Special thanks to external contributors on this release:
- [pubsub] \#7319 Performance improvements for the event query API (@creachadair) - [pubsub] \#7319 Performance improvements for the event query API (@creachadair)
- [node] \#7521 Define concrete type for seed node implementation (@spacech1mp) - [node] \#7521 Define concrete type for seed node implementation (@spacech1mp)
- [rpc] \#7612 paginate mempool /unconfirmed_txs rpc endpoint (@spacech1mp)
### BUG FIXES ### BUG FIXES


+ 17
- 9
internal/rpc/core/mempool.go View File

@ -10,6 +10,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/state/indexer"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/rpc/coretypes"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -117,19 +118,26 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*co
} }
} }
// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries)
// including their number.
// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority
// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs
func (env *Environment) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := env.validatePerPage(limitPtr)
func (env *Environment) UnconfirmedTxs(ctx context.Context, pagePtr, perPagePtr *int) (*coretypes.ResultUnconfirmedTxs, error) {
totalCount := env.Mempool.Size()
perPage := env.validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
}
skipCount := validateSkipCount(page, perPage)
txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount))
result := txs[skipCount:]
txs := env.Mempool.ReapMaxTxs(limit)
return &coretypes.ResultUnconfirmedTxs{ return &coretypes.ResultUnconfirmedTxs{
Count: len(txs),
Total: env.Mempool.Size(),
Count: len(result),
Total: totalCount,
TotalBytes: env.Mempool.SizeBytes(), TotalBytes: env.Mempool.SizeBytes(),
Txs: txs}, nil
Txs: result}, nil
} }
// NumUnconfirmedTxs gets number of unconfirmed transactions. // NumUnconfirmedTxs gets number of unconfirmed transactions.


+ 2
- 2
internal/rpc/core/routes.go View File

@ -55,7 +55,7 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap {
"dump_consensus_state": rpc.NewRPCFunc(svc.DumpConsensusState), "dump_consensus_state": rpc.NewRPCFunc(svc.DumpConsensusState),
"consensus_state": rpc.NewRPCFunc(svc.GetConsensusState), "consensus_state": rpc.NewRPCFunc(svc.GetConsensusState),
"consensus_params": rpc.NewRPCFunc(svc.ConsensusParams, "height"), "consensus_params": rpc.NewRPCFunc(svc.ConsensusParams, "height"),
"unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "limit"),
"unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "page", "per_page"),
"num_unconfirmed_txs": rpc.NewRPCFunc(svc.NumUnconfirmedTxs), "num_unconfirmed_txs": rpc.NewRPCFunc(svc.NumUnconfirmedTxs),
// tx broadcast API // tx broadcast API
@ -107,7 +107,7 @@ type RPCService interface {
Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error)
Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coretypes.ResultTx, error) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coretypes.ResultTx, error)
TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*coretypes.ResultTxSearch, error) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*coretypes.ResultTxSearch, error)
UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error)
UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error)
Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error)
UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error)
Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*coretypes.ResultValidators, error) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*coretypes.ResultValidators, error)


+ 2
- 2
light/rpc/client.go View File

@ -211,8 +211,8 @@ func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.R
return c.next.BroadcastTxSync(ctx, tx) return c.next.BroadcastTxSync(ctx, tx)
} }
func (c *Client) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) {
return c.next.UnconfirmedTxs(ctx, limit)
func (c *Client) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) {
return c.next.UnconfirmedTxs(ctx, page, perPage)
} }
func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {


+ 3
- 2
rpc/client/http/http.go View File

@ -281,11 +281,12 @@ func (c *baseRPCClient) broadcastTX(
func (c *baseRPCClient) UnconfirmedTxs( func (c *baseRPCClient) UnconfirmedTxs(
ctx context.Context, ctx context.Context,
limit *int,
page *int,
perPage *int,
) (*coretypes.ResultUnconfirmedTxs, error) { ) (*coretypes.ResultUnconfirmedTxs, error) {
result := new(coretypes.ResultUnconfirmedTxs) result := new(coretypes.ResultUnconfirmedTxs)
if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Limit: limit}, result); err != nil {
if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Page: page, PerPage: perPage}, result); err != nil {
return nil, err return nil, err
} }
return result, nil return result, nil


+ 2
- 1
rpc/client/http/request.go View File

@ -27,7 +27,8 @@ type txKeyArgs struct {
} }
type unconfirmedArgs struct { type unconfirmedArgs struct {
Limit *int `json:"limit,string,omitempty"`
Page *int `json:"page,string,omitempty"`
PerPage *int `json:"per_page,string,omitempty"`
} }
type heightArgs struct { type heightArgs struct {


+ 1
- 1
rpc/client/interface.go View File

@ -142,7 +142,7 @@ type EventsClient interface {
// MempoolClient shows us data about current mempool state. // MempoolClient shows us data about current mempool state.
type MempoolClient interface { type MempoolClient interface {
UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error)
UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error)
NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error)
CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error) CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error)
RemoveTx(context.Context, types.TxKey) error RemoveTx(context.Context, types.TxKey) error


+ 2
- 2
rpc/client/local/local.go View File

@ -97,8 +97,8 @@ func (c *Local) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.Re
return c.env.BroadcastTxSync(ctx, tx) return c.env.BroadcastTxSync(ctx, tx)
} }
func (c *Local) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) {
return c.env.UnconfirmedTxs(ctx, limit)
func (c *Local) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) {
return c.env.UnconfirmedTxs(ctx, page, perPage)
} }
func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) {


+ 31
- 16
rpc/client/rpc_test.go View File

@ -612,29 +612,44 @@ func TestClientMethodCallsAdvanced(t *testing.T) {
pool := getMempool(t, n) pool := getMempool(t, n)
t.Run("UnconfirmedTxs", func(t *testing.T) { t.Run("UnconfirmedTxs", func(t *testing.T) {
_, _, tx := MakeTxKV()
ch := make(chan struct{})
// populate mempool with 5 tx
txs := make([]types.Tx, 5)
ch := make(chan error, 5)
for i := 0; i < 5; i++ {
_, _, tx := MakeTxKV()
err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { close(ch) }, mempool.TxInfo{})
require.NoError(t, err)
txs[i] = tx
err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { ch <- nil }, mempool.TxInfo{})
require.NoError(t, err)
}
// wait for tx to arrive in mempoool. // wait for tx to arrive in mempoool.
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Error("Timed out waiting for CheckTx callback")
for i := 0; i < 5; i++ {
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Error("Timed out waiting for CheckTx callback")
}
} }
close(ch)
for _, c := range GetClients(t, n, conf) { for _, c := range GetClients(t, n, conf) {
mc := c.(client.MempoolClient)
limit := 1
res, err := mc.UnconfirmedTxs(ctx, &limit)
require.NoError(t, err)
for i := 1; i <= 2; i++ {
mc := c.(client.MempoolClient)
page, perPage := i, 3
res, err := mc.UnconfirmedTxs(ctx, &page, &perPage)
require.NoError(t, err)
assert.Equal(t, 1, res.Count)
assert.Equal(t, 1, res.Total)
assert.Equal(t, pool.SizeBytes(), res.TotalBytes)
assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs))
if i == 2 {
perPage = 2
}
assert.Equal(t, perPage, res.Count)
assert.Equal(t, 5, res.Total)
assert.Equal(t, pool.SizeBytes(), res.TotalBytes)
for _, tx := range res.Txs {
assert.Contains(t, txs, tx)
}
}
} }
pool.Flush() pool.Flush()


+ 11
- 3
rpc/openapi/openapi.yaml View File

@ -1062,13 +1062,21 @@ paths:
operationId: unconfirmed_txs operationId: unconfirmed_txs
parameters: parameters:
- in: query - in: query
name: limit
description: Maximum number of unconfirmed transactions to return (max 100)
name: page
description: "Page number (1-based)"
required: false required: false
schema: schema:
type: integer type: integer
default: 30
default: 1
example: 1 example: 1
- in: query
name: per_page
description: "Number of entries per page (max: 100)"
required: false
schema:
type: integer
example: 100
default: 30
tags: tags:
- Info - Info
description: | description: |


Loading…
Cancel
Save