From 49153b753c5d900a3eec09da56568bb3e5e66685 Mon Sep 17 00:00:00 2001 From: Kene <48869444+spaceCh1mp@users.noreply.github.com> Date: Tue, 18 Jan 2022 10:58:32 +0100 Subject: [PATCH] rpc: paginate mempool /unconfirmed_txs endpoint (#7612) This commit changes the behaviour of the /unconfirmed_txs endpoint by replacing limit with a page and perPage parameter for pagination. The test case for unconfirmed_txs have been accommodated to properly test this change and the documentation for the API as well. --- CHANGELOG_PENDING.md | 1 + internal/rpc/core/mempool.go | 26 +++++++++++++------- internal/rpc/core/routes.go | 4 +-- light/rpc/client.go | 4 +-- rpc/client/http/http.go | 5 ++-- rpc/client/http/request.go | 3 ++- rpc/client/interface.go | 2 +- rpc/client/local/local.go | 4 +-- rpc/client/rpc_test.go | 47 ++++++++++++++++++++++++------------ rpc/openapi/openapi.yaml | 14 ++++++++--- 10 files changed, 72 insertions(+), 38 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1609e9755..7a9c98700 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -53,6 +53,7 @@ Special thanks to external contributors on this release: - [pubsub] \#7319 Performance improvements for the event query API (@creachadair) - [node] \#7521 Define concrete type for seed node implementation (@spacech1mp) +- [rpc] \#7612 paginate mempool /unconfirmed_txs rpc endpoint (@spacech1mp) ### BUG FIXES diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 751c7ee73..4087439cb 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -10,6 +10,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/state/indexer" + tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/types" ) @@ -117,19 +118,26 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*co } } -// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries) -// including their number. +// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs -func (env *Environment) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) { - // reuse per_page validator - limit := env.validatePerPage(limitPtr) +func (env *Environment) UnconfirmedTxs(ctx context.Context, pagePtr, perPagePtr *int) (*coretypes.ResultUnconfirmedTxs, error) { + totalCount := env.Mempool.Size() + perPage := env.validatePerPage(perPagePtr) + page, err := validatePage(pagePtr, perPage, totalCount) + if err != nil { + return nil, err + } + + skipCount := validateSkipCount(page, perPage) + + txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) + result := txs[skipCount:] - txs := env.Mempool.ReapMaxTxs(limit) return &coretypes.ResultUnconfirmedTxs{ - Count: len(txs), - Total: env.Mempool.Size(), + Count: len(result), + Total: totalCount, TotalBytes: env.Mempool.SizeBytes(), - Txs: txs}, nil + Txs: result}, nil } // NumUnconfirmedTxs gets number of unconfirmed transactions. diff --git a/internal/rpc/core/routes.go b/internal/rpc/core/routes.go index fd26ab50e..09be47c5c 100644 --- a/internal/rpc/core/routes.go +++ b/internal/rpc/core/routes.go @@ -55,7 +55,7 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap { "dump_consensus_state": rpc.NewRPCFunc(svc.DumpConsensusState), "consensus_state": rpc.NewRPCFunc(svc.GetConsensusState), "consensus_params": rpc.NewRPCFunc(svc.ConsensusParams, "height"), - "unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "limit"), + "unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "page", "per_page"), "num_unconfirmed_txs": rpc.NewRPCFunc(svc.NumUnconfirmedTxs), // tx broadcast API @@ -107,7 +107,7 @@ type RPCService interface { Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coretypes.ResultTx, error) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*coretypes.ResultTxSearch, error) - UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) + UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*coretypes.ResultValidators, error) diff --git a/light/rpc/client.go b/light/rpc/client.go index fec6e4723..272100422 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -211,8 +211,8 @@ func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.R return c.next.BroadcastTxSync(ctx, tx) } -func (c *Client) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) { - return c.next.UnconfirmedTxs(ctx, limit) +func (c *Client) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) { + return c.next.UnconfirmedTxs(ctx, page, perPage) } func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index f10985ae3..f3ae8fe1c 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -281,11 +281,12 @@ func (c *baseRPCClient) broadcastTX( func (c *baseRPCClient) UnconfirmedTxs( ctx context.Context, - limit *int, + page *int, + perPage *int, ) (*coretypes.ResultUnconfirmedTxs, error) { result := new(coretypes.ResultUnconfirmedTxs) - if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Limit: limit}, result); err != nil { + if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Page: page, PerPage: perPage}, result); err != nil { return nil, err } return result, nil diff --git a/rpc/client/http/request.go b/rpc/client/http/request.go index a6f85b637..88d6b1d1b 100644 --- a/rpc/client/http/request.go +++ b/rpc/client/http/request.go @@ -27,7 +27,8 @@ type txKeyArgs struct { } type unconfirmedArgs struct { - Limit *int `json:"limit,string,omitempty"` + Page *int `json:"page,string,omitempty"` + PerPage *int `json:"per_page,string,omitempty"` } type heightArgs struct { diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 9b2a600cc..d5bbaec1b 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -142,7 +142,7 @@ type EventsClient interface { // MempoolClient shows us data about current mempool state. type MempoolClient interface { - UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) + UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error) RemoveTx(context.Context, types.TxKey) error diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 7f2ab46d4..95f3c63b9 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -97,8 +97,8 @@ func (c *Local) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.Re return c.env.BroadcastTxSync(ctx, tx) } -func (c *Local) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) { - return c.env.UnconfirmedTxs(ctx, limit) +func (c *Local) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) { + return c.env.UnconfirmedTxs(ctx, page, perPage) } func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 37cc9707b..3e0e39554 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -612,29 +612,44 @@ func TestClientMethodCallsAdvanced(t *testing.T) { pool := getMempool(t, n) t.Run("UnconfirmedTxs", func(t *testing.T) { - _, _, tx := MakeTxKV() - ch := make(chan struct{}) + // populate mempool with 5 tx + txs := make([]types.Tx, 5) + ch := make(chan error, 5) + for i := 0; i < 5; i++ { + _, _, tx := MakeTxKV() - err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { close(ch) }, mempool.TxInfo{}) - require.NoError(t, err) + txs[i] = tx + err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { ch <- nil }, mempool.TxInfo{}) + require.NoError(t, err) + } // wait for tx to arrive in mempoool. - select { - case <-ch: - case <-time.After(5 * time.Second): - t.Error("Timed out waiting for CheckTx callback") + for i := 0; i < 5; i++ { + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Error("Timed out waiting for CheckTx callback") + } } + close(ch) for _, c := range GetClients(t, n, conf) { - mc := c.(client.MempoolClient) - limit := 1 - res, err := mc.UnconfirmedTxs(ctx, &limit) - require.NoError(t, err) + for i := 1; i <= 2; i++ { + mc := c.(client.MempoolClient) + page, perPage := i, 3 + res, err := mc.UnconfirmedTxs(ctx, &page, &perPage) + require.NoError(t, err) - assert.Equal(t, 1, res.Count) - assert.Equal(t, 1, res.Total) - assert.Equal(t, pool.SizeBytes(), res.TotalBytes) - assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs)) + if i == 2 { + perPage = 2 + } + assert.Equal(t, perPage, res.Count) + assert.Equal(t, 5, res.Total) + assert.Equal(t, pool.SizeBytes(), res.TotalBytes) + for _, tx := range res.Txs { + assert.Contains(t, txs, tx) + } + } } pool.Flush() diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index bcb5739dd..ba48ee82e 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1062,13 +1062,21 @@ paths: operationId: unconfirmed_txs parameters: - in: query - name: limit - description: Maximum number of unconfirmed transactions to return (max 100) + name: page + description: "Page number (1-based)" required: false schema: type: integer - default: 30 + default: 1 example: 1 + - in: query + name: per_page + description: "Number of entries per page (max: 100)" + required: false + schema: + type: integer + example: 100 + default: 30 tags: - Info description: |