package core import ( "context" "fmt" "time" "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" "github.com/tendermint/tendermint/types" ) //----------------------------------------------------------------------------- // NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!) // Returns right away, with no response // // ```shell // curl 'localhost:26657/broadcast_tx_async?tx="123"' // ``` // // ```go // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket") // err := client.Start() // if err != nil { // // handle error // } // defer client.Stop() // result, err := client.BroadcastTxAsync("123") // ``` // // > The above command returns JSON structured like this: // // ```json // { // "error": "", // "result": { // "hash": "E39AAB7A537ABAA237831742DCE1117F187C3C52", // "log": "", // "data": "", // "code": "0" // }, // "id": "", // "jsonrpc": "2.0" // } // ``` // // ### Query Parameters // // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempool.CheckTx(tx, nil) if err != nil { return nil, err } return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } // Returns with the response from CheckTx. // // ```shell // curl 'localhost:26657/broadcast_tx_sync?tx="456"' // ``` // // ```go // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket") // err := client.Start() // if err != nil { // // handle error // } // defer client.Stop() // result, err := client.BroadcastTxSync("456") // ``` // // > The above command returns JSON structured like this: // // ```json // { // "jsonrpc": "2.0", // "id": "", // "result": { // "code": "0", // "data": "", // "log": "", // "hash": "0D33F2F03A5234F38706E43004489E061AC40A2E" // }, // "error": "" // } // ``` // // ### Query Parameters // // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res }) if err != nil { return nil, err } res := <-resCh r := res.GetCheckTx() return &ctypes.ResultBroadcastTx{ Code: r.Code, Data: r.Data, Log: r.Log, Hash: tx.Hash(), }, nil } // CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout // waiting for tx to commit. // // If CheckTx or DeliverTx fail, no error will be returned, but the returned result // will contain a non-OK ABCI code. // // ```shell // curl 'localhost:26657/broadcast_tx_commit?tx="789"' // ``` // // ```go // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket") // err := client.Start() // if err != nil { // // handle error // } // defer client.Stop() // result, err := client.BroadcastTxCommit("789") // ``` // // > The above command returns JSON structured like this: // // ```json // { // "error": "", // "result": { // "height": "26682", // "hash": "75CA0F856A4DA078FC4911580360E70CEFB2EBEE", // "deliver_tx": { // "log": "", // "data": "", // "code": "0" // }, // "check_tx": { // "log": "", // "data": "", // "code": "0" // } // }, // "id": "", // "jsonrpc": "2.0" // } // ``` // // ### Query Parameters // // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // Subscribe to tx being committed in block. ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() deliverTxResCh := make(chan interface{}, 1) q := types.EventQueryTxFor(tx) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") logger.Error("Error on broadcast_tx_commit", "err", err) return nil, err } defer func() { // drain deliverTxResCh to make sure we don't block LOOP: for { select { case <-deliverTxResCh: default: break LOOP } } eventBus.Unsubscribe(context.Background(), "mempool", q) }() // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) err = mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res }) if err != nil { logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) } checkTxResMsg := <-checkTxResCh checkTxRes := checkTxResMsg.GetCheckTx() if checkTxRes.Code != abci.CodeTypeOK { return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, nil } // Wait for the tx to be included in a block or timeout. // TODO: configurable? var deliverTxTimeout = rpcserver.WriteTimeout / 2 select { case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block. if !ok { return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?") } deliverTxRes := deliverTxResMsg.(types.EventDataTx) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: deliverTxRes.Result, Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil case <-time.After(deliverTxTimeout): err = errors.New("Timed out waiting for tx to be included in a block") logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, err } } // Get unconfirmed transactions (maximum ?limit entries) including their number. // // ```shell // curl 'localhost:26657/unconfirmed_txs' // ``` // // ```go // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket") // err := client.Start() // if err != nil { // // handle error // } // defer client.Stop() // result, err := client.UnconfirmedTxs() // ``` // // > The above command returns JSON structured like this: // // ```json // { // "error": "", // "result": { // "txs": [], // "n_txs": "0" // }, // "id": "", // "jsonrpc": "2.0" // } // // ### Query Parameters // // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+--------------------------------------| // | limit | int | 30 | false | Maximum number of entries (max: 100) | // ``` func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit = validatePerPage(limit) txs := mempool.ReapMaxTxs(limit) return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil } // Get number of unconfirmed transactions. // // ```shell // curl 'localhost:26657/num_unconfirmed_txs' // ``` // // ```go // client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket") // err := client.Start() // if err != nil { // // handle error // } // defer client.Stop() // result, err := client.UnconfirmedTxs() // ``` // // > The above command returns JSON structured like this: // // ```json // { // "error": "", // "result": { // "txs": null, // "n_txs": "0" // }, // "id": "", // "jsonrpc": "2.0" // } // ``` func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil }