You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

223 lines
5.4 KiB

  1. package main
  2. import (
  3. "crypto/md5"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "math/rand"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "github.com/pkg/errors"
  17. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  18. "github.com/tendermint/tmlibs/log"
  19. )
  const (
  	// sendTimeout bounds every write on a connection (requests, pings, pongs
  	// and the final close frame).
  	sendTimeout = 10 * time.Second

  	// pingPeriod is 90% of a 30-second interval, i.e. 27 seconds.
  	// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
  	// (presumably the server-side read wait the 30 seconds refer to - confirm
  	// against that file).
  	pingPeriod = 30 * time.Second * 9 / 10
  )
  25. type transacter struct {
  26. Target string
  27. Rate int
  28. Size int
  29. Connections int
  30. BroadcastTxMethod string
  31. conns []*websocket.Conn
  32. wg sync.WaitGroup
  33. stopped bool
  34. logger log.Logger
  35. }
  36. func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
  37. return &transacter{
  38. Target: target,
  39. Rate: rate,
  40. Size: size,
  41. Connections: connections,
  42. BroadcastTxMethod: broadcastTxMethod,
  43. conns: make([]*websocket.Conn, connections),
  44. logger: log.NewNopLogger(),
  45. }
  46. }
  47. // SetLogger lets you set your own logger
  48. func (t *transacter) SetLogger(l log.Logger) {
  49. t.logger = l
  50. }
  51. // Start opens N = `t.Connections` connections to the target and creates read
  52. // and write goroutines for each connection.
  53. func (t *transacter) Start() error {
  54. t.stopped = false
  55. rand.Seed(time.Now().Unix())
  56. for i := 0; i < t.Connections; i++ {
  57. c, _, err := connect(t.Target)
  58. if err != nil {
  59. return err
  60. }
  61. t.conns[i] = c
  62. }
  63. t.wg.Add(2 * t.Connections)
  64. for i := 0; i < t.Connections; i++ {
  65. go t.sendLoop(i)
  66. go t.receiveLoop(i)
  67. }
  68. return nil
  69. }
  70. // Stop closes the connections.
  71. func (t *transacter) Stop() {
  72. t.stopped = true
  73. t.wg.Wait()
  74. for _, c := range t.conns {
  75. c.Close()
  76. }
  77. }
  78. // receiveLoop reads messages from the connection (empty in case of
  79. // `broadcast_tx_async`).
  80. func (t *transacter) receiveLoop(connIndex int) {
  81. c := t.conns[connIndex]
  82. defer t.wg.Done()
  83. for {
  84. _, _, err := c.ReadMessage()
  85. if err != nil {
  86. if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  87. t.logger.Error("failed to read response", "err", err)
  88. }
  89. return
  90. }
  91. if t.stopped {
  92. return
  93. }
  94. }
  95. }
  96. // sendLoop generates transactions at a given rate.
  97. func (t *transacter) sendLoop(connIndex int) {
  98. c := t.conns[connIndex]
  99. c.SetPingHandler(func(message string) error {
  100. err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
  101. if err == websocket.ErrCloseSent {
  102. return nil
  103. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  104. return nil
  105. }
  106. return err
  107. })
  108. logger := t.logger.With("addr", c.RemoteAddr())
  109. var txNumber = 0
  110. pingsTicker := time.NewTicker(pingPeriod)
  111. txsTicker := time.NewTicker(1 * time.Second)
  112. defer func() {
  113. pingsTicker.Stop()
  114. txsTicker.Stop()
  115. t.wg.Done()
  116. }()
  117. // hash of the host name is a part of each tx
  118. var hostnameHash [md5.Size]byte
  119. hostname, err := os.Hostname()
  120. if err != nil {
  121. hostname = "127.0.0.1"
  122. }
  123. hostnameHash = md5.Sum([]byte(hostname))
  124. for {
  125. select {
  126. case <-txsTicker.C:
  127. startTime := time.Now()
  128. for i := 0; i < t.Rate; i++ {
  129. // each transaction embeds connection index, tx number and hash of the hostname
  130. tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
  131. paramsJSON, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
  132. if err != nil {
  133. fmt.Printf("failed to encode params: %v\n", err)
  134. os.Exit(1)
  135. }
  136. rawParamsJSON := json.RawMessage(paramsJSON)
  137. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  138. err = c.WriteJSON(rpctypes.RPCRequest{
  139. JSONRPC: "2.0",
  140. ID: "tm-bench",
  141. Method: t.BroadcastTxMethod,
  142. Params: rawParamsJSON,
  143. })
  144. if err != nil {
  145. fmt.Fprintf(os.Stderr, "%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
  146. os.Exit(1)
  147. }
  148. txNumber++
  149. }
  150. timeToSend := time.Now().Sub(startTime)
  151. if timeToSend < 1*time.Second {
  152. time.Sleep(time.Second - timeToSend)
  153. }
  154. logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
  155. case <-pingsTicker.C:
  156. // go-rpc server closes the connection in the absence of pings
  157. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  158. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  159. logger.Error("failed to write ping message", "err", err)
  160. }
  161. }
  162. if t.stopped {
  163. // To cleanly close a connection, a client should send a close
  164. // frame and wait for the server to close the connection.
  165. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  166. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  167. if err != nil {
  168. logger.Error("failed to write close message", "err", err)
  169. }
  170. return
  171. }
  172. }
  173. }
  174. func connect(host string) (*websocket.Conn, *http.Response, error) {
  175. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  176. return websocket.DefaultDialer.Dial(u.String(), nil)
  177. }
  // generateTx builds a transaction of exactly txSize bytes with this layout:
  //
  //	[0:8)   connIndex, uvarint-encoded
  //	[8:16)  txNumber, uvarint-encoded
  //	[16:32) hostnameHash
  //	[32:40) current Unix time in seconds, uvarint-encoded
  //	[40:)   pseudo-random bytes from math/rand
  //
  // The uvarint fields are not padded to a fixed width: bytes of an 8-byte slot
  // that the encoding does not use stay zero.
  //
  // txSize must be at least 40 so that the header fits. A smaller value panics
  // with a message naming the requirement, instead of an opaque slice-bounds
  // runtime error.
  func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
  	// Number of bytes taken by the fixed header that precedes the random data.
  	const headerSize = 40

  	if txSize < headerSize {
  		panic(fmt.Sprintf("generateTx: tx size must be at least %d bytes, got %d", headerSize, txSize))
  	}

  	tx := make([]byte, txSize)

  	binary.PutUvarint(tx[:8], uint64(connIndex))
  	binary.PutUvarint(tx[8:16], uint64(txNumber))
  	copy(tx[16:32], hostnameHash[:])
  	binary.PutUvarint(tx[32:headerSize], uint64(time.Now().Unix()))

  	// 40-* random data
  	if _, err := rand.Read(tx[headerSize:]); err != nil {
  		panic(fmt.Errorf("failed to read random bytes: %v", err))
  	}

  	return tx
  }