You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
4.2 KiB

  1. package main
  2. import (
  3. "encoding/binary"
  4. "encoding/hex"
  5. "fmt"
  6. "math/rand"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "sync"
  11. "time"
  12. "github.com/go-kit/kit/log"
  13. "github.com/gorilla/websocket"
  14. "github.com/pkg/errors"
  15. rpctypes "github.com/tendermint/go-rpc/types"
  16. )
  // Timing parameters used by the send loop of every connection.
  const (
  	// sendTimeout is added to the current time to form the write deadline
  	// that is set before each outgoing frame.
  	sendTimeout = 500 * time.Millisecond

  	// pingPeriod is the interval between keep-alive pings: nine tenths of
  	// 30 seconds, i.e. 27s. The 30s figure presumably mirrors the server's
  	// read timeout; see
  	// https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
  	pingPeriod = 30 * time.Second * 9 / 10
  )
  22. type transacter struct {
  23. Target string
  24. Rate int
  25. Connections int
  26. conns []*websocket.Conn
  27. wg sync.WaitGroup
  28. stopped bool
  29. logger log.Logger
  30. }
  31. func newTransacter(target string, connections int, rate int) *transacter {
  32. return &transacter{
  33. Target: target,
  34. Rate: rate,
  35. Connections: connections,
  36. conns: make([]*websocket.Conn, connections),
  37. logger: log.NewNopLogger(),
  38. }
  39. }
  40. // SetLogger lets you set your own logger
  41. func (t *transacter) SetLogger(l log.Logger) {
  42. t.logger = l
  43. }
  44. // Start opens N = `t.Connections` connections to the target and creates read
  45. // and write goroutines for each connection.
  46. func (t *transacter) Start() error {
  47. t.stopped = false
  48. for i := 0; i < t.Connections; i++ {
  49. c, _, err := connect(t.Target)
  50. if err != nil {
  51. return err
  52. }
  53. t.conns[i] = c
  54. }
  55. t.wg.Add(2 * t.Connections)
  56. for i := 0; i < t.Connections; i++ {
  57. go t.sendLoop(i)
  58. go t.receiveLoop(i)
  59. }
  60. return nil
  61. }
  62. // Stop closes the connections.
  63. func (t *transacter) Stop() {
  64. t.stopped = true
  65. t.wg.Wait()
  66. for _, c := range t.conns {
  67. c.Close()
  68. }
  69. }
  70. // receiveLoop reads messages from the connection (empty in case of
  71. // `broadcast_tx_async`).
  72. func (t *transacter) receiveLoop(connIndex int) {
  73. c := t.conns[connIndex]
  74. defer t.wg.Done()
  75. for {
  76. _, _, err := c.ReadMessage()
  77. if err != nil {
  78. if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  79. t.logger.Log("err", errors.Wrap(err, "failed to read response"))
  80. }
  81. return
  82. }
  83. if t.stopped {
  84. return
  85. }
  86. }
  87. }
  88. // sendLoop generates transactions at a given rate.
  89. func (t *transacter) sendLoop(connIndex int) {
  90. c := t.conns[connIndex]
  91. logger := log.With(t.logger, "addr", c.RemoteAddr())
  92. var txNumber = 0
  93. pingsTicker := time.NewTicker(pingPeriod)
  94. txsTicker := time.NewTicker(1 * time.Second)
  95. defer func() {
  96. pingsTicker.Stop()
  97. txsTicker.Stop()
  98. t.wg.Done()
  99. }()
  100. for {
  101. select {
  102. case <-txsTicker.C:
  103. startTime := time.Now()
  104. for i := 0; i < t.Rate; i++ {
  105. // each transaction embeds connection index and tx number
  106. tx := generateTx(connIndex, txNumber)
  107. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  108. err := c.WriteJSON(rpctypes.RPCRequest{
  109. JSONRPC: "2.0",
  110. ID: "",
  111. Method: "broadcast_tx_async",
  112. Params: []interface{}{hex.EncodeToString(tx)},
  113. })
  114. if err != nil {
  115. fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed"))
  116. os.Exit(1)
  117. }
  118. txNumber++
  119. }
  120. timeToSend := time.Now().Sub(startTime)
  121. time.Sleep(time.Second - timeToSend)
  122. logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
  123. case <-pingsTicker.C:
  124. // Right now go-rpc server closes the connection in the absence of pings
  125. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  126. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  127. logger.Log("err", errors.Wrap(err, "failed to write ping message"))
  128. }
  129. }
  130. if t.stopped {
  131. // To cleanly close a connection, a client should send a close
  132. // frame and wait for the server to close the connection.
  133. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  134. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  135. if err != nil {
  136. logger.Log("err", errors.Wrap(err, "failed to write close message"))
  137. }
  138. return
  139. }
  140. }
  141. }
  142. func connect(host string) (*websocket.Conn, *http.Response, error) {
  143. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  144. return websocket.DefaultDialer.Dial(u.String(), nil)
  145. }
  146. func generateTx(a int, b int) []byte {
  147. tx := make([]byte, 250)
  148. binary.PutUvarint(tx[:32], uint64(a))
  149. binary.PutUvarint(tx[32:64], uint64(b))
  150. if _, err := rand.Read(tx[234:]); err != nil {
  151. panic(errors.Wrap(err, "failed to generate transaction"))
  152. }
  153. return tx
  154. }