- package core
-
- import (
- "context"
- "fmt"
- "time"
-
- "github.com/pkg/errors"
-
- abci "github.com/tendermint/tendermint/abci/types"
- ctypes "github.com/tendermint/tendermint/rpc/core/types"
- rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
- "github.com/tendermint/tendermint/types"
- )
-
- //-----------------------------------------------------------------------------
- // NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
-
- // Returns right away, with no response
- //
- // ```shell
- // curl 'localhost:26657/broadcast_tx_async?tx="123"'
- // ```
- //
- // ```go
- // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
- // err := client.Start()
- // if err != nil {
- // // handle error
- // }
- // defer client.Stop()
- // result, err := client.BroadcastTxAsync("123")
- // ```
- //
- // > The above command returns JSON structured like this:
- //
- // ```json
- // {
- // "error": "",
- // "result": {
- // "hash": "E39AAB7A537ABAA237831742DCE1117F187C3C52",
- // "log": "",
- // "data": "",
- // "code": "0"
- // },
- // "id": "",
- // "jsonrpc": "2.0"
- // }
- // ```
- //
- // ### Query Parameters
- //
- // | Parameter | Type | Default | Required | Description |
- // |-----------+------+---------+----------+-----------------|
- // | tx | Tx | nil | true | The transaction |
- func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
- err := mempool.CheckTx(tx, nil)
- if err != nil {
- return nil, err
- }
- return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
- }
-
- // Returns with the response from CheckTx.
- //
- // ```shell
- // curl 'localhost:26657/broadcast_tx_sync?tx="456"'
- // ```
- //
- // ```go
- // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
- // err := client.Start()
- // if err != nil {
- // // handle error
- // }
- // defer client.Stop()
- // result, err := client.BroadcastTxSync("456")
- // ```
- //
- // > The above command returns JSON structured like this:
- //
- // ```json
- // {
- // "jsonrpc": "2.0",
- // "id": "",
- // "result": {
- // "code": "0",
- // "data": "",
- // "log": "",
- // "hash": "0D33F2F03A5234F38706E43004489E061AC40A2E"
- // },
- // "error": ""
- // }
- // ```
- //
- // ### Query Parameters
- //
- // | Parameter | Type | Default | Required | Description |
- // |-----------+------+---------+----------+-----------------|
- // | tx | Tx | nil | true | The transaction |
- func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
- resCh := make(chan *abci.Response, 1)
- err := mempool.CheckTx(tx, func(res *abci.Response) {
- resCh <- res
- })
- if err != nil {
- return nil, err
- }
- res := <-resCh
- r := res.GetCheckTx()
- return &ctypes.ResultBroadcastTx{
- Code: r.Code,
- Data: r.Data,
- Log: r.Log,
- Hash: tx.Hash(),
- }, nil
- }
-
- // CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout
- // waiting for tx to commit.
- //
- // If CheckTx or DeliverTx fail, no error will be returned, but the returned result
- // will contain a non-OK ABCI code.
- //
- // ```shell
- // curl 'localhost:26657/broadcast_tx_commit?tx="789"'
- // ```
- //
- // ```go
- // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
- // err := client.Start()
- // if err != nil {
- // // handle error
- // }
- // defer client.Stop()
- // result, err := client.BroadcastTxCommit("789")
- // ```
- //
- // > The above command returns JSON structured like this:
- //
- // ```json
- // {
- // "error": "",
- // "result": {
- // "height": "26682",
- // "hash": "75CA0F856A4DA078FC4911580360E70CEFB2EBEE",
- // "deliver_tx": {
- // "log": "",
- // "data": "",
- // "code": "0"
- // },
- // "check_tx": {
- // "log": "",
- // "data": "",
- // "code": "0"
- // }
- // },
- // "id": "",
- // "jsonrpc": "2.0"
- // }
- // ```
- //
- // ### Query Parameters
- //
- // | Parameter | Type | Default | Required | Description |
- // |-----------+------+---------+----------+-----------------|
- // | tx | Tx | nil | true | The transaction |
- func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
- // Subscribe to tx being committed in block.
- ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
- defer cancel()
- deliverTxResCh := make(chan interface{}, 1)
- q := types.EventQueryTxFor(tx)
- err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
- if err != nil {
- err = errors.Wrap(err, "failed to subscribe to tx")
- logger.Error("Error on broadcast_tx_commit", "err", err)
- return nil, err
- }
- defer func() {
- // drain deliverTxResCh to make sure we don't block
- LOOP:
- for {
- select {
- case <-deliverTxResCh:
- default:
- break LOOP
- }
- }
- eventBus.Unsubscribe(context.Background(), "mempool", q)
- }()
-
- // Broadcast tx and wait for CheckTx result
- checkTxResCh := make(chan *abci.Response, 1)
- err = mempool.CheckTx(tx, func(res *abci.Response) {
- checkTxResCh <- res
- })
- if err != nil {
- logger.Error("Error on broadcastTxCommit", "err", err)
- return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
- }
- checkTxResMsg := <-checkTxResCh
- checkTxRes := checkTxResMsg.GetCheckTx()
- if checkTxRes.Code != abci.CodeTypeOK {
- return &ctypes.ResultBroadcastTxCommit{
- CheckTx: *checkTxRes,
- DeliverTx: abci.ResponseDeliverTx{},
- Hash: tx.Hash(),
- }, nil
- }
-
- // Wait for the tx to be included in a block or timeout.
- // TODO: configurable?
- var deliverTxTimeout = rpcserver.WriteTimeout / 2
- select {
- case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block.
- if !ok {
- return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?")
- }
- deliverTxRes := deliverTxResMsg.(types.EventDataTx)
- return &ctypes.ResultBroadcastTxCommit{
- CheckTx: *checkTxRes,
- DeliverTx: deliverTxRes.Result,
- Hash: tx.Hash(),
- Height: deliverTxRes.Height,
- }, nil
- case <-time.After(deliverTxTimeout):
- err = errors.New("Timed out waiting for tx to be included in a block")
- logger.Error("Error on broadcastTxCommit", "err", err)
- return &ctypes.ResultBroadcastTxCommit{
- CheckTx: *checkTxRes,
- DeliverTx: abci.ResponseDeliverTx{},
- Hash: tx.Hash(),
- }, err
- }
- }
-
- // Get unconfirmed transactions (maximum ?limit entries) including their number.
- //
- // ```shell
- // curl 'localhost:26657/unconfirmed_txs'
- // ```
- //
- // ```go
- // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
- // err := client.Start()
- // if err != nil {
- // // handle error
- // }
- // defer client.Stop()
- // result, err := client.UnconfirmedTxs()
- // ```
- //
- // > The above command returns JSON structured like this:
- //
- // ```json
- // {
- // "error": "",
- // "result": {
- // "txs": [],
- // "n_txs": "0"
- // },
- // "id": "",
- // "jsonrpc": "2.0"
- // }
- //
- // ### Query Parameters
- //
- // | Parameter | Type | Default | Required | Description |
- // |-----------+------+---------+----------+--------------------------------------|
- // | limit | int | 30 | false | Maximum number of entries (max: 100) |
- // ```
- func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
- // reuse per_page validator
- limit = validatePerPage(limit)
-
- txs := mempool.ReapMaxTxs(limit)
- return &ctypes.ResultUnconfirmedTxs{N: len(txs), Txs: txs}, nil
- }
-
- // Get number of unconfirmed transactions.
- //
- // ```shell
- // curl 'localhost:26657/num_unconfirmed_txs'
- // ```
- //
- // ```go
- // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
- // err := client.Start()
- // if err != nil {
- // // handle error
- // }
- // defer client.Stop()
- // result, err := client.UnconfirmedTxs()
- // ```
- //
- // > The above command returns JSON structured like this:
- //
- // ```json
- // {
- // "error": "",
- // "result": {
- // "txs": null,
- // "n_txs": "0"
- // },
- // "id": "",
- // "jsonrpc": "2.0"
- // }
- // ```
- func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
- return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil
- }
|