You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

220 lines
5.2 KiB

  1. package main
  2. import (
  3. "crypto/md5"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "math/rand"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "github.com/pkg/errors"
  17. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  18. "github.com/tendermint/tmlibs/log"
  19. )
  const (
  	// sendTimeout is how far in the future write deadlines are placed for
  	// every frame written to a connection (transactions, pings, pongs and the
  	// closing frame).
  	sendTimeout = 10 * time.Second

  	// pingPeriod is the interval between client pings: nine tenths of 30
  	// seconds, i.e. 27s.
  	// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
  	// NOTE(review): the 30s figure presumably mirrors the server-side read
  	// timeout at the link above - confirm if the server default changes.
  	pingPeriod = 30 * time.Second * 9 / 10

  	// txSize is the size of a generated transaction in bytes.
  	txSize = 250
  )
  27. type transacter struct {
  28. Target string
  29. Rate int
  30. Connections int
  31. conns []*websocket.Conn
  32. wg sync.WaitGroup
  33. stopped bool
  34. logger log.Logger
  35. }
  36. func newTransacter(target string, connections int, rate int) *transacter {
  37. return &transacter{
  38. Target: target,
  39. Rate: rate,
  40. Connections: connections,
  41. conns: make([]*websocket.Conn, connections),
  42. logger: log.NewNopLogger(),
  43. }
  44. }
  45. // SetLogger lets you set your own logger
  46. func (t *transacter) SetLogger(l log.Logger) {
  47. t.logger = l
  48. }
  49. // Start opens N = `t.Connections` connections to the target and creates read
  50. // and write goroutines for each connection.
  51. func (t *transacter) Start() error {
  52. t.stopped = false
  53. rand.Seed(time.Now().Unix())
  54. for i := 0; i < t.Connections; i++ {
  55. c, _, err := connect(t.Target)
  56. if err != nil {
  57. return err
  58. }
  59. t.conns[i] = c
  60. }
  61. t.wg.Add(2 * t.Connections)
  62. for i := 0; i < t.Connections; i++ {
  63. go t.sendLoop(i)
  64. go t.receiveLoop(i)
  65. }
  66. return nil
  67. }
  68. // Stop closes the connections.
  69. func (t *transacter) Stop() {
  70. t.stopped = true
  71. t.wg.Wait()
  72. for _, c := range t.conns {
  73. c.Close()
  74. }
  75. }
  76. // receiveLoop reads messages from the connection (empty in case of
  77. // `broadcast_tx_async`).
  78. func (t *transacter) receiveLoop(connIndex int) {
  79. c := t.conns[connIndex]
  80. defer t.wg.Done()
  81. for {
  82. _, _, err := c.ReadMessage()
  83. if err != nil {
  84. if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  85. t.logger.Error("failed to read response", "err", err)
  86. }
  87. return
  88. }
  89. if t.stopped {
  90. return
  91. }
  92. }
  93. }
  94. // sendLoop generates transactions at a given rate.
  95. func (t *transacter) sendLoop(connIndex int) {
  96. c := t.conns[connIndex]
  97. c.SetPingHandler(func(message string) error {
  98. err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
  99. if err == websocket.ErrCloseSent {
  100. return nil
  101. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  102. return nil
  103. }
  104. return err
  105. })
  106. logger := t.logger.With("addr", c.RemoteAddr())
  107. var txNumber = 0
  108. pingsTicker := time.NewTicker(pingPeriod)
  109. txsTicker := time.NewTicker(1 * time.Second)
  110. defer func() {
  111. pingsTicker.Stop()
  112. txsTicker.Stop()
  113. t.wg.Done()
  114. }()
  115. // hash of the host name is a part of each tx
  116. var hostnameHash [md5.Size]byte
  117. hostname, err := os.Hostname()
  118. if err != nil {
  119. hostname = "127.0.0.1"
  120. }
  121. hostnameHash = md5.Sum([]byte(hostname))
  122. for {
  123. select {
  124. case <-txsTicker.C:
  125. startTime := time.Now()
  126. for i := 0; i < t.Rate; i++ {
  127. // each transaction embeds connection index, tx number and hash of the hostname
  128. tx := generateTx(connIndex, txNumber, hostnameHash)
  129. paramsJson, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
  130. if err != nil {
  131. fmt.Printf("failed to encode params: %v\n", err)
  132. os.Exit(1)
  133. }
  134. rawParamsJson := json.RawMessage(paramsJson)
  135. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  136. err = c.WriteJSON(rpctypes.RPCRequest{
  137. JSONRPC: "2.0",
  138. ID: "tm-bench",
  139. Method: "broadcast_tx_async",
  140. Params: &rawParamsJson,
  141. })
  142. if err != nil {
  143. fmt.Printf("%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
  144. os.Exit(1)
  145. }
  146. txNumber++
  147. }
  148. timeToSend := time.Now().Sub(startTime)
  149. time.Sleep(time.Second - timeToSend)
  150. logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
  151. case <-pingsTicker.C:
  152. // go-rpc server closes the connection in the absence of pings
  153. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  154. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  155. logger.Error("failed to write ping message", "err", err)
  156. }
  157. }
  158. if t.stopped {
  159. // To cleanly close a connection, a client should send a close
  160. // frame and wait for the server to close the connection.
  161. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  162. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  163. if err != nil {
  164. logger.Error("failed to write close message", "err", err)
  165. }
  166. return
  167. }
  168. }
  169. }
  170. func connect(host string) (*websocket.Conn, *http.Response, error) {
  171. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  172. return websocket.DefaultDialer.Dial(u.String(), nil)
  173. }
  174. func generateTx(connIndex int, txNumber int, hostnameHash [md5.Size]byte) []byte {
  175. tx := make([]byte, txSize)
  176. binary.PutUvarint(tx[:8], uint64(connIndex))
  177. binary.PutUvarint(tx[8:16], uint64(txNumber))
  178. copy(tx[16:32], hostnameHash[:16])
  179. binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
  180. // 40-* random data
  181. if _, err := rand.Read(tx[40:]); err != nil {
  182. panic(errors.Wrap(err, "failed to read random bytes"))
  183. }
  184. return tx
  185. }