You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
5.0 KiB

  1. package main
  2. import (
  3. "encoding/binary"
  4. "encoding/hex"
  5. "encoding/json"
  6. "fmt"
  7. "math/rand"
  8. "net"
  9. "net/http"
  10. "net/url"
  11. "os"
  12. "sync"
  13. "time"
  14. "github.com/gorilla/websocket"
  15. "github.com/pkg/errors"
  16. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  17. "github.com/tendermint/tmlibs/log"
  18. )
  19. const (
  20. sendTimeout = 10 * time.Second
  21. // see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
  22. pingPeriod = (30 * 9 / 10) * time.Second
  23. )
  24. type transacter struct {
  25. Target string
  26. Rate int
  27. Connections int
  28. HostHash [16]byte
  29. conns []*websocket.Conn
  30. wg sync.WaitGroup
  31. stopped bool
  32. logger log.Logger
  33. }
  34. func newTransacter(target string, connections int, rate int, hosthash [16]byte) *transacter {
  35. return &transacter{
  36. Target: target,
  37. Rate: rate,
  38. Connections: connections,
  39. HostHash: hosthash,
  40. conns: make([]*websocket.Conn, connections),
  41. logger: log.NewNopLogger(),
  42. }
  43. }
  44. // SetLogger lets you set your own logger
  45. func (t *transacter) SetLogger(l log.Logger) {
  46. t.logger = l
  47. }
  48. // Start opens N = `t.Connections` connections to the target and creates read
  49. // and write goroutines for each connection.
  50. func (t *transacter) Start() error {
  51. t.stopped = false
  52. for i := 0; i < t.Connections; i++ {
  53. c, _, err := connect(t.Target)
  54. if err != nil {
  55. return err
  56. }
  57. t.conns[i] = c
  58. }
  59. t.wg.Add(2 * t.Connections)
  60. for i := 0; i < t.Connections; i++ {
  61. go t.sendLoop(i)
  62. go t.receiveLoop(i)
  63. }
  64. return nil
  65. }
  66. // Stop closes the connections.
  67. func (t *transacter) Stop() {
  68. t.stopped = true
  69. t.wg.Wait()
  70. for _, c := range t.conns {
  71. c.Close()
  72. }
  73. }
  74. // receiveLoop reads messages from the connection (empty in case of
  75. // `broadcast_tx_async`).
  76. func (t *transacter) receiveLoop(connIndex int) {
  77. c := t.conns[connIndex]
  78. defer t.wg.Done()
  79. for {
  80. _, _, err := c.ReadMessage()
  81. if err != nil {
  82. if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  83. t.logger.Error("failed to read response", "err", err)
  84. }
  85. return
  86. }
  87. if t.stopped {
  88. return
  89. }
  90. }
  91. }
  92. // sendLoop generates transactions at a given rate.
  93. func (t *transacter) sendLoop(connIndex int) {
  94. c := t.conns[connIndex]
  95. c.SetPingHandler(func(message string) error {
  96. err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
  97. if err == websocket.ErrCloseSent {
  98. return nil
  99. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  100. return nil
  101. }
  102. return err
  103. })
  104. logger := t.logger.With("addr", c.RemoteAddr())
  105. var txNumber = 0
  106. pingsTicker := time.NewTicker(pingPeriod)
  107. txsTicker := time.NewTicker(1 * time.Second)
  108. defer func() {
  109. pingsTicker.Stop()
  110. txsTicker.Stop()
  111. t.wg.Done()
  112. }()
  113. for {
  114. select {
  115. case <-txsTicker.C:
  116. startTime := time.Now()
  117. for i := 0; i < t.Rate; i++ {
  118. // each transaction embeds connection index and tx number
  119. tx := generateTx(connIndex, txNumber, t.HostHash)
  120. paramsJson, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
  121. if err != nil {
  122. fmt.Printf("failed to encode params: %v\n", err)
  123. os.Exit(1)
  124. }
  125. rawParamsJson := json.RawMessage(paramsJson)
  126. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  127. err = c.WriteJSON(rpctypes.RPCRequest{
  128. JSONRPC: "2.0",
  129. ID: "",
  130. Method: "broadcast_tx_async",
  131. Params: &rawParamsJson,
  132. })
  133. if err != nil {
  134. fmt.Printf("%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
  135. os.Exit(1)
  136. }
  137. txNumber++
  138. }
  139. timeToSend := time.Now().Sub(startTime)
  140. time.Sleep(time.Second - timeToSend)
  141. logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
  142. case <-pingsTicker.C:
  143. // go-rpc server closes the connection in the absence of pings
  144. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  145. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  146. logger.Error("failed to write ping message", "err", err)
  147. }
  148. }
  149. if t.stopped {
  150. // To cleanly close a connection, a client should send a close
  151. // frame and wait for the server to close the connection.
  152. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  153. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  154. if err != nil {
  155. logger.Error("failed to write close message", "err", err)
  156. }
  157. return
  158. }
  159. }
  160. }
  161. func connect(host string) (*websocket.Conn, *http.Response, error) {
  162. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  163. return websocket.DefaultDialer.Dial(u.String(), nil)
  164. }
  165. func generateTx(a int, b int, hosthash [16]byte) []byte {
  166. // 250 byte transaction
  167. tx := make([]byte, 250)
  168. // 0-8 connection number
  169. binary.PutUvarint(tx[:8], uint64(a))
  170. // 8-16 transaction number
  171. binary.PutUvarint(tx[8:16], uint64(b))
  172. // 16-32 hostname hash
  173. for i:=0; i < 16 ; i++ {
  174. tx[16+i] = hosthash[i]
  175. }
  176. // 32-40 current time
  177. binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
  178. // 40- random data
  179. if _, err := rand.Read(tx[40:]); err != nil {
  180. panic(errors.Wrap(err, "failed to generate transaction"))
  181. }
  182. return tx
  183. }