You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
3.3 KiB

8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
  1. package core
  2. import (
  3. "fmt"
  4. "time"
  5. abci "github.com/tendermint/abci/types"
  6. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  7. "github.com/tendermint/tendermint/types"
  8. )
  9. //-----------------------------------------------------------------------------
  10. // NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
  11. // Returns right away, with no response
  12. func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  13. err := mempool.CheckTx(tx, nil)
  14. if err != nil {
  15. return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
  16. }
  17. return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
  18. }
  19. // Returns with the response from CheckTx
  20. func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
  21. resCh := make(chan *abci.Response, 1)
  22. err := mempool.CheckTx(tx, func(res *abci.Response) {
  23. resCh <- res
  24. })
  25. if err != nil {
  26. return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
  27. }
  28. res := <-resCh
  29. r := res.GetCheckTx()
  30. return &ctypes.ResultBroadcastTx{
  31. Code: r.Code,
  32. Data: r.Data,
  33. Log: r.Log,
  34. Hash: tx.Hash(),
  35. }, nil
  36. }
  37. // CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
  38. // or if we timeout waiting for tx to commit.
  39. // If CheckTx or DeliverTx fail, no error will be returned, but the returned result
  40. // will contain a non-OK ABCI code.
  41. func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
  42. // subscribe to tx being committed in block
  43. deliverTxResCh := make(chan types.EventDataTx, 1)
  44. types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
  45. deliverTxResCh <- data.(types.EventDataTx)
  46. })
  47. // broadcast the tx and register checktx callback
  48. checkTxResCh := make(chan *abci.Response, 1)
  49. err := mempool.CheckTx(tx, func(res *abci.Response) {
  50. checkTxResCh <- res
  51. })
  52. if err != nil {
  53. log.Error("err", "err", err)
  54. return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
  55. }
  56. checkTxRes := <-checkTxResCh
  57. checkTxR := checkTxRes.GetCheckTx()
  58. if checkTxR.Code != abci.CodeType_OK {
  59. // CheckTx failed!
  60. return &ctypes.ResultBroadcastTxCommit{
  61. CheckTx: checkTxR,
  62. DeliverTx: nil,
  63. Hash: tx.Hash(),
  64. }, nil
  65. }
  66. // Wait for the tx to be included in a block,
  67. // timeout after something reasonable.
  68. // TODO: configureable?
  69. timer := time.NewTimer(60 * 2 * time.Second)
  70. select {
  71. case deliverTxRes := <-deliverTxResCh:
  72. // The tx was included in a block.
  73. deliverTxR := &abci.ResponseDeliverTx{
  74. Code: deliverTxRes.Code,
  75. Data: deliverTxRes.Data,
  76. Log: deliverTxRes.Log,
  77. }
  78. log.Notice("DeliverTx passed ", "tx", []byte(tx), "response", deliverTxR)
  79. return &ctypes.ResultBroadcastTxCommit{
  80. CheckTx: checkTxR,
  81. DeliverTx: deliverTxR,
  82. Hash: tx.Hash(),
  83. Height: deliverTxRes.Height,
  84. }, nil
  85. case <-timer.C:
  86. log.Error("failed to include tx")
  87. return &ctypes.ResultBroadcastTxCommit{
  88. CheckTx: checkTxR,
  89. DeliverTx: nil,
  90. Hash: tx.Hash(),
  91. }, fmt.Errorf("Timed out waiting for transaction to be included in a block")
  92. }
  93. panic("Should never happen!")
  94. }
  95. func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
  96. txs := mempool.Reap(-1)
  97. return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
  98. }
  99. func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
  100. return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil
  101. }