You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
6.2 KiB

  1. package main
  2. import (
  3. "crypto/md5"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "math/rand"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "github.com/pkg/errors"
  17. "github.com/tendermint/tendermint/libs/log"
  18. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  19. )
  20. const (
  21. sendTimeout = 10 * time.Second
  22. // see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
  23. pingPeriod = (30 * 9 / 10) * time.Second
  24. )
  25. type transacter struct {
  26. Target string
  27. Rate int
  28. Size int
  29. Connections int
  30. BroadcastTxMethod string
  31. conns []*websocket.Conn
  32. connsBroken []bool
  33. wg sync.WaitGroup
  34. stopped bool
  35. logger log.Logger
  36. }
  37. func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
  38. return &transacter{
  39. Target: target,
  40. Rate: rate,
  41. Size: size,
  42. Connections: connections,
  43. BroadcastTxMethod: broadcastTxMethod,
  44. conns: make([]*websocket.Conn, connections),
  45. connsBroken: make([]bool, connections),
  46. logger: log.NewNopLogger(),
  47. }
  48. }
  49. // SetLogger lets you set your own logger
  50. func (t *transacter) SetLogger(l log.Logger) {
  51. t.logger = l
  52. }
  53. // Start opens N = `t.Connections` connections to the target and creates read
  54. // and write goroutines for each connection.
  55. func (t *transacter) Start() error {
  56. t.stopped = false
  57. rand.Seed(time.Now().Unix())
  58. for i := 0; i < t.Connections; i++ {
  59. c, _, err := connect(t.Target)
  60. if err != nil {
  61. return err
  62. }
  63. t.conns[i] = c
  64. }
  65. t.wg.Add(2 * t.Connections)
  66. for i := 0; i < t.Connections; i++ {
  67. go t.sendLoop(i)
  68. go t.receiveLoop(i)
  69. }
  70. return nil
  71. }
  72. // Stop closes the connections.
  73. func (t *transacter) Stop() {
  74. t.stopped = true
  75. t.wg.Wait()
  76. for _, c := range t.conns {
  77. c.Close()
  78. }
  79. }
  80. // receiveLoop reads messages from the connection (empty in case of
  81. // `broadcast_tx_async`).
  82. func (t *transacter) receiveLoop(connIndex int) {
  83. c := t.conns[connIndex]
  84. defer t.wg.Done()
  85. for {
  86. _, _, err := c.ReadMessage()
  87. if err != nil {
  88. if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  89. t.logger.Error(
  90. fmt.Sprintf("failed to read response on conn %d", connIndex),
  91. "err",
  92. err,
  93. )
  94. }
  95. return
  96. }
  97. if t.stopped || t.connsBroken[connIndex] {
  98. return
  99. }
  100. }
  101. }
  102. // sendLoop generates transactions at a given rate.
  103. func (t *transacter) sendLoop(connIndex int) {
  104. c := t.conns[connIndex]
  105. c.SetPingHandler(func(message string) error {
  106. err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
  107. if err == websocket.ErrCloseSent {
  108. return nil
  109. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  110. return nil
  111. }
  112. return err
  113. })
  114. logger := t.logger.With("addr", c.RemoteAddr())
  115. var txNumber = 0
  116. pingsTicker := time.NewTicker(pingPeriod)
  117. txsTicker := time.NewTicker(1 * time.Second)
  118. defer func() {
  119. pingsTicker.Stop()
  120. txsTicker.Stop()
  121. t.wg.Done()
  122. }()
  123. // hash of the host name is a part of each tx
  124. var hostnameHash [md5.Size]byte
  125. hostname, err := os.Hostname()
  126. if err != nil {
  127. hostname = "127.0.0.1"
  128. }
  129. hostnameHash = md5.Sum([]byte(hostname))
  130. for {
  131. select {
  132. case <-txsTicker.C:
  133. startTime := time.Now()
  134. endTime := startTime.Add(time.Second)
  135. numTxSent := t.Rate
  136. for i := 0; i < t.Rate; i++ {
  137. // each transaction embeds connection index, tx number and hash of the hostname
  138. tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
  139. paramsJSON, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
  140. if err != nil {
  141. fmt.Printf("failed to encode params: %v\n", err)
  142. os.Exit(1)
  143. }
  144. rawParamsJSON := json.RawMessage(paramsJSON)
  145. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  146. err = c.WriteJSON(rpctypes.RPCRequest{
  147. JSONRPC: "2.0",
  148. ID: "tm-bench",
  149. Method: t.BroadcastTxMethod,
  150. Params: rawParamsJSON,
  151. })
  152. if err != nil {
  153. err = errors.Wrap(err,
  154. fmt.Sprintf("txs send failed on connection #%d", connIndex))
  155. t.connsBroken[connIndex] = true
  156. logger.Error(err.Error())
  157. return
  158. }
  159. // Time added here is 7.13 ns/op, not significant enough to worry about
  160. if i%20 == 0 {
  161. if time.Now().After(endTime) {
  162. // Plus one accounts for sending this tx
  163. numTxSent = i + 1
  164. break
  165. }
  166. }
  167. txNumber++
  168. }
  169. timeToSend := time.Since(startTime)
  170. logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
  171. if timeToSend < 1*time.Second {
  172. sleepTime := time.Second - timeToSend
  173. logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
  174. time.Sleep(sleepTime)
  175. }
  176. case <-pingsTicker.C:
  177. // go-rpc server closes the connection in the absence of pings
  178. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  179. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  180. err = errors.Wrap(err,
  181. fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
  182. logger.Error(err.Error())
  183. t.connsBroken[connIndex] = true
  184. }
  185. }
  186. if t.stopped {
  187. // To cleanly close a connection, a client should send a close
  188. // frame and wait for the server to close the connection.
  189. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  190. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  191. if err != nil {
  192. err = errors.Wrap(err,
  193. fmt.Sprintf("failed to write close message on conn #%d", connIndex))
  194. logger.Error(err.Error())
  195. t.connsBroken[connIndex] = true
  196. }
  197. return
  198. }
  199. }
  200. }
  201. func connect(host string) (*websocket.Conn, *http.Response, error) {
  202. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  203. return websocket.DefaultDialer.Dial(u.String(), nil)
  204. }
  205. func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
  206. tx := make([]byte, txSize)
  207. binary.PutUvarint(tx[:8], uint64(connIndex))
  208. binary.PutUvarint(tx[8:16], uint64(txNumber))
  209. copy(tx[16:32], hostnameHash[:16])
  210. binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
  211. // 40-* random data
  212. if _, err := rand.Read(tx[40:]); err != nil {
  213. panic(errors.Wrap(err, "failed to read random bytes"))
  214. }
  215. return tx
  216. }