package main

import (
	"container/ring"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"

	rpchttp "github.com/tendermint/tendermint/rpc/client/http"
	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
	"github.com/tendermint/tendermint/types"
)

// Load generates transactions against the network until the given context is
// canceled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
	// Since transactions are executed across all nodes in the network, we need
	// to reduce transaction load for larger networks to avoid using too much
	// CPU. This gives high-throughput small networks and low-throughput large ones.
	// This also limits the number of TCP connections, since each worker has
	// a connection to all nodes.
	concurrency := 64 / len(testnet.Nodes)
	if concurrency == 0 {
		concurrency = 1
	}
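
	// For example (derived from the arithmetic above): a 4-node network gets
	// 64/4 = 16 workers, while any network with more than 64 nodes is
	// clamped to a single worker.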

	chTx := make(chan types.Tx)
	chSuccess := make(chan int) // success counts per iteration
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Spawn job generator and processors.
	logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
	started := time.Now()
	go loadGenerate(ctx, chTx, testnet.TxSize)
	for w := 0; w < concurrency; w++ {
		go loadProcess(ctx, testnet, chTx, chSuccess)
	}

	// Monitor submitted transactions to ensure load propagates to the network.
	//
	// This loop doesn't check or time out for stalls, since a stall here just
	// aborts the load generator sooner and could obscure backpressure
	// from the test harness, and there are other checks for
	// stalls in the framework. Ideally we should monitor latency as a guide
	// for when to give up, but we don't have a good way to track that yet.
	success := 0
	for {
		select {
		case numSeen := <-chSuccess:
			success += numSeen
		case <-ctx.Done():
			// If we couldn't submit any transactions, that's probably a
			// problem and the test should error; however, for very short
			// tests we shouldn't abort.
			//
			// The 2s cutoff is a rough guess based on the expected value
			// of loadGenerateWaitTime. If the implementation of that
			// function changes, this cutoff may need to change as well.
			if success == 0 && time.Since(started) > 2*time.Second {
				return errors.New("failed to submit any transactions")
			}

			// TODO: perhaps allow test networks to declare required
			// transaction rates, which might allow us to avoid the special
			// case around 0 txs above.
			rate := float64(success) / time.Since(started).Seconds()
			logger.Info("ending transaction load",
				"dur_secs", time.Since(started).Seconds(),
				"txns", success,
				"rate", rate,
				"slow", rate < 1)
			return nil
		}
	}
}
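
// Example (an illustrative sketch, not part of the original harness): Load is
// driven with a cancelable context, so a caller that wants a bounded load run
// might do the following, where testnet is assumed to have been loaded from
// an e2e manifest elsewhere:
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
//	defer cancel()
//	if err := Load(ctx, testnet); err != nil {
//		panic(err)
//	}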

// loadGenerate generates jobs until the context is canceled.
//
// The chTx channel has multiple consumers, so the rate limiting of load
// generation is primarily the result of backpressure from transaction
// broadcasting, though there is still some timer-based limiting.
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, size int64) {
	timer := time.NewTimer(0)
	defer timer.Stop()
	defer close(chTx)
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		// We keep generating the same 100 keys over and over, with different values.
		// This gives a reasonable load without putting too much data in the app.
		id := rand.Int63() % 100 // nolint: gosec

		bz := make([]byte, size)
		_, err := rand.Read(bz) // nolint: gosec
		if err != nil {
			panic(fmt.Sprintf("Failed to read random bytes: %v", err))
		}
		tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))

		select {
		case <-ctx.Done():
			return
		case chTx <- tx:
			// Sleep for a bit before sending the next transaction.
			timer.Reset(loadGenerateWaitTime(size))
		}
	}
}
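
// For illustration (example values invented here, not taken from the source):
// with size = 8, a generated transaction is a key/value string such as
//
//	load-42=a1b2c3d4e5f60718
//
// where the key is one of 100 recurring ids rendered with %X and the value is
// fresh random data rendered with %x, so the app keeps overwriting the same
// small set of keys with new values.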

// loadGenerateWaitTime returns a randomized delay before the next transaction
// is generated, scaled by transaction size. Note that it assumes size is at
// least 100, so that sizeFactor exceeds min; rand.Int63n panics on a
// non-positive argument.
func loadGenerateWaitTime(size int64) time.Duration {
	const (
		min = int64(100 * time.Millisecond)
		max = int64(time.Second)
	)

	var (
		baseJitter = rand.Int63n(max-min+1) + min // nolint: gosec
		sizeFactor = size * int64(time.Millisecond)
		sizeJitter = rand.Int63n(sizeFactor-min+1) + min // nolint: gosec
	)

	return time.Duration(baseJitter + sizeJitter)
}
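
// Worked example (numbers derived from the formula above, assuming
// size = 1024): baseJitter is uniform over [100ms, 1s] and sizeJitter is
// uniform over [100ms, 1.024s], so each wait falls in roughly
// [200ms, 2.02s] with an expected value of about 1.1s; a single generator
// therefore produces on the order of one transaction per second.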

// loadProcess processes transactions by broadcasting them to testnet nodes,
// reporting the number of successful submissions on chSuccess.
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
	// Each worker gets its own client to each usable node, which
	// allows for some concurrency while still bounding it.
	clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
	for idx := range testnet.Nodes {
		// Construct a list of usable nodes for creating load. Don't send
		// load through seed nodes, because they do not provide the RPC
		// endpoints required to broadcast transactions.
		if testnet.Nodes[idx].Mode == e2e.ModeSeed {
			continue
		}
		client, err := testnet.Nodes[idx].Client()
		if err != nil {
			continue
		}
		clients = append(clients, client)
	}
	if len(clients) == 0 {
		panic("no clients to process load")
	}

	// Put the clients in a ring so they can be used in a
	// round-robin fashion.
	clientRing := ring.New(len(clients))
	for idx := range clients {
		clientRing.Value = clients[idx]
		clientRing = clientRing.Next()
	}

	successes := 0
	for {
		select {
		case <-ctx.Done():
			return
		case tx := <-chTx:
			clientRing = clientRing.Next()
			client := clientRing.Value.(*rpchttp.HTTP)

			// Skip nodes that are unreachable or still catching up.
			if status, err := client.Status(ctx); err != nil {
				continue
			} else if status.SyncInfo.CatchingUp {
				continue
			}

			if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
				continue
			}
			successes++

			// Report successes without blocking; if the monitor isn't
			// ready, carry the count over to the next iteration.
			select {
			case chSuccess <- successes:
				successes = 0 // reset counter for the next iteration
				continue
			case <-ctx.Done():
				return
			default:
			}
		}
	}
}
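
// Illustrative sketch (not part of the original file): the container/ring
// round-robin used above, reduced to its essentials. Advancing the ring
// before each read cycles through the clients in a fixed order:
//
//	r := ring.New(len(clients))
//	for _, c := range clients {
//		r.Value = c
//		r = r.Next()
//	}
//	next := func() *rpchttp.HTTP {
//		r = r.Next()
//		return r.Value.(*rpchttp.HTTP)
//	}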