You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
7.0 KiB

  1. package main
  2. import (
  3. "crypto/md5"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "math/rand"
  9. "net"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sync"
  14. "time"
  15. "github.com/gorilla/websocket"
  16. "github.com/pkg/errors"
  17. "github.com/tendermint/tendermint/libs/log"
  18. rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
  19. )
  20. const (
  21. sendTimeout = 10 * time.Second
  22. // see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
  23. pingPeriod = (30 * 9 / 10) * time.Second
  24. )
  25. type transacter struct {
  26. Target string
  27. Rate int
  28. Size int
  29. Connections int
  30. BroadcastTxMethod string
  31. conns []*websocket.Conn
  32. connsBroken []bool
  33. startingWg sync.WaitGroup
  34. endingWg sync.WaitGroup
  35. stopped bool
  36. logger log.Logger
  37. }
  38. func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
  39. return &transacter{
  40. Target: target,
  41. Rate: rate,
  42. Size: size,
  43. Connections: connections,
  44. BroadcastTxMethod: broadcastTxMethod,
  45. conns: make([]*websocket.Conn, connections),
  46. connsBroken: make([]bool, connections),
  47. logger: log.NewNopLogger(),
  48. }
  49. }
  50. // SetLogger lets you set your own logger
  51. func (t *transacter) SetLogger(l log.Logger) {
  52. t.logger = l
  53. }
  54. // Start opens N = `t.Connections` connections to the target and creates read
  55. // and write goroutines for each connection.
  56. func (t *transacter) Start() error {
  57. t.stopped = false
  58. rand.Seed(time.Now().Unix())
  59. for i := 0; i < t.Connections; i++ {
  60. c, _, err := connect(t.Target)
  61. if err != nil {
  62. return err
  63. }
  64. t.conns[i] = c
  65. }
  66. t.startingWg.Add(t.Connections)
  67. t.endingWg.Add(2 * t.Connections)
  68. for i := 0; i < t.Connections; i++ {
  69. go t.sendLoop(i)
  70. go t.receiveLoop(i)
  71. }
  72. t.startingWg.Wait()
  73. return nil
  74. }
  75. // Stop closes the connections.
  76. func (t *transacter) Stop() {
  77. t.stopped = true
  78. t.endingWg.Wait()
  79. for _, c := range t.conns {
  80. c.Close()
  81. }
  82. }
  83. // receiveLoop reads messages from the connection (empty in case of
  84. // `broadcast_tx_async`).
  85. func (t *transacter) receiveLoop(connIndex int) {
  86. c := t.conns[connIndex]
  87. defer t.endingWg.Done()
  88. for {
  89. _, _, err := c.ReadMessage()
  90. if err != nil {
  91. if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
  92. t.logger.Error(
  93. fmt.Sprintf("failed to read response on conn %d", connIndex),
  94. "err",
  95. err,
  96. )
  97. }
  98. return
  99. }
  100. if t.stopped || t.connsBroken[connIndex] {
  101. return
  102. }
  103. }
  104. }
  105. // sendLoop generates transactions at a given rate.
  106. func (t *transacter) sendLoop(connIndex int) {
  107. started := false
  108. // Close the starting waitgroup, in the event that this fails to start
  109. defer func() {
  110. if !started {
  111. t.startingWg.Done()
  112. }
  113. }()
  114. c := t.conns[connIndex]
  115. c.SetPingHandler(func(message string) error {
  116. err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
  117. if err == websocket.ErrCloseSent {
  118. return nil
  119. } else if e, ok := err.(net.Error); ok && e.Temporary() {
  120. return nil
  121. }
  122. return err
  123. })
  124. logger := t.logger.With("addr", c.RemoteAddr())
  125. var txNumber = 0
  126. pingsTicker := time.NewTicker(pingPeriod)
  127. txsTicker := time.NewTicker(1 * time.Second)
  128. defer func() {
  129. pingsTicker.Stop()
  130. txsTicker.Stop()
  131. t.endingWg.Done()
  132. }()
  133. // hash of the host name is a part of each tx
  134. var hostnameHash [md5.Size]byte
  135. hostname, err := os.Hostname()
  136. if err != nil {
  137. hostname = "127.0.0.1"
  138. }
  139. hostnameHash = md5.Sum([]byte(hostname))
  140. // each transaction embeds connection index, tx number and hash of the hostname
  141. // we update the tx number between successive txs
  142. tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
  143. txHex := make([]byte, len(tx)*2)
  144. hex.Encode(txHex, tx)
  145. for {
  146. select {
  147. case <-txsTicker.C:
  148. startTime := time.Now()
  149. endTime := startTime.Add(time.Second)
  150. numTxSent := t.Rate
  151. if !started {
  152. t.startingWg.Done()
  153. started = true
  154. }
  155. now := time.Now()
  156. for i := 0; i < t.Rate; i++ {
  157. // update tx number of the tx, and the corresponding hex
  158. updateTx(tx, txHex, txNumber)
  159. paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex})
  160. if err != nil {
  161. fmt.Printf("failed to encode params: %v\n", err)
  162. os.Exit(1)
  163. }
  164. rawParamsJSON := json.RawMessage(paramsJSON)
  165. c.SetWriteDeadline(now.Add(sendTimeout))
  166. err = c.WriteJSON(rpctypes.RPCRequest{
  167. JSONRPC: "2.0",
  168. ID: rpctypes.JSONRPCStringID("tm-bench"),
  169. Method: t.BroadcastTxMethod,
  170. Params: rawParamsJSON,
  171. })
  172. if err != nil {
  173. err = errors.Wrap(err,
  174. fmt.Sprintf("txs send failed on connection #%d", connIndex))
  175. t.connsBroken[connIndex] = true
  176. logger.Error(err.Error())
  177. return
  178. }
  179. // cache the time.Now() reads to save time.
  180. if i%5 == 0 {
  181. now = time.Now()
  182. if now.After(endTime) {
  183. // Plus one accounts for sending this tx
  184. numTxSent = i + 1
  185. break
  186. }
  187. }
  188. txNumber++
  189. }
  190. timeToSend := time.Since(startTime)
  191. logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
  192. if timeToSend < 1*time.Second {
  193. sleepTime := time.Second - timeToSend
  194. logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
  195. time.Sleep(sleepTime)
  196. }
  197. case <-pingsTicker.C:
  198. // go-rpc server closes the connection in the absence of pings
  199. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  200. if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  201. err = errors.Wrap(err,
  202. fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
  203. logger.Error(err.Error())
  204. t.connsBroken[connIndex] = true
  205. }
  206. }
  207. if t.stopped {
  208. // To cleanly close a connection, a client should send a close
  209. // frame and wait for the server to close the connection.
  210. c.SetWriteDeadline(time.Now().Add(sendTimeout))
  211. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  212. if err != nil {
  213. err = errors.Wrap(err,
  214. fmt.Sprintf("failed to write close message on conn #%d", connIndex))
  215. logger.Error(err.Error())
  216. t.connsBroken[connIndex] = true
  217. }
  218. return
  219. }
  220. }
  221. }
  222. func connect(host string) (*websocket.Conn, *http.Response, error) {
  223. u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
  224. return websocket.DefaultDialer.Dial(u.String(), nil)
  225. }
  226. func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
  227. tx := make([]byte, txSize)
  228. binary.PutUvarint(tx[:8], uint64(connIndex))
  229. binary.PutUvarint(tx[8:16], uint64(txNumber))
  230. copy(tx[16:32], hostnameHash[:16])
  231. binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
  232. // 40-* random data
  233. if _, err := rand.Read(tx[40:]); err != nil {
  234. panic(errors.Wrap(err, "failed to read random bytes"))
  235. }
  236. return tx
  237. }
  238. // warning, mutates input byte slice
  239. func updateTx(tx []byte, txHex []byte, txNumber int) {
  240. binary.PutUvarint(tx[8:16], uint64(txNumber))
  241. hexUpdate := make([]byte, 16)
  242. hex.Encode(hexUpdate, tx[8:16])
  243. for i := 16; i < 32; i++ {
  244. txHex[i] = hexUpdate[i-16]
  245. }
  246. }