You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
5.2 KiB

  1. package main
  2. import (
  3. "container/ring"
  4. "context"
  5. "fmt"
  6. "math/rand"
  7. "time"
  8. rpchttp "github.com/tendermint/tendermint/rpc/client/http"
  9. e2e "github.com/tendermint/tendermint/test/e2e/pkg"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. // Load generates transactions against the network until the given context is
  13. // canceled.
  14. func Load(ctx context.Context, r *rand.Rand, testnet *e2e.Testnet) error {
  15. // Since transactions are executed across all nodes in the network, we need
  16. // to reduce transaction load for larger networks to avoid using too much
  17. // CPU. This gives high-throughput small networks and low-throughput large ones.
  18. // This also limits the number of TCP connections, since each worker has
  19. // a connection to all nodes.
  20. concurrency := len(testnet.Nodes) * 2
  21. if concurrency > 32 {
  22. concurrency = 32
  23. }
  24. chTx := make(chan types.Tx)
  25. chSuccess := make(chan int) // success counts per iteration
  26. ctx, cancel := context.WithCancel(ctx)
  27. defer cancel()
  28. // Spawn job generator and processors.
  29. logger.Info("starting transaction load",
  30. "workers", concurrency,
  31. "nodes", len(testnet.Nodes),
  32. "tx", testnet.TxSize)
  33. started := time.Now()
  34. go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes))
  35. for w := 0; w < concurrency; w++ {
  36. go loadProcess(ctx, testnet, chTx, chSuccess)
  37. }
  38. // Montior transaction to ensure load propagates to the network
  39. //
  40. // This loop doesn't check or time out for stalls, since a stall here just
  41. // aborts the load generator sooner and could obscure backpressure
  42. // from the test harness, and there are other checks for
  43. // stalls in the framework. Ideally we should monitor latency as a guide
  44. // for when to give up, but we don't have a good way to track that yet.
  45. success := 0
  46. for {
  47. select {
  48. case numSeen := <-chSuccess:
  49. success += numSeen
  50. case <-ctx.Done():
  51. if success == 0 {
  52. return fmt.Errorf("failed to submit transactions in %s by %d workers",
  53. time.Since(started), concurrency)
  54. }
  55. // TODO perhaps allow test networks to
  56. // declare required transaction rates, which
  57. // might allow us to avoid the special case
  58. // around 0 txs above.
  59. rate := float64(success) / time.Since(started).Seconds()
  60. logger.Info("ending transaction load",
  61. "dur_secs", time.Since(started).Seconds(),
  62. "txns", success,
  63. "workers", concurrency,
  64. "rate", rate)
  65. return nil
  66. }
  67. }
  68. }
  69. // loadGenerate generates jobs until the context is canceled.
  70. //
  71. // The chTx has multiple consumers, thus the rate limiting of the load
  72. // generation is primarily the result of backpressure from the
  73. // broadcast transaction, though there is still some timer-based
  74. // limiting.
  75. func loadGenerate(ctx context.Context, r *rand.Rand, chTx chan<- types.Tx, txSize int64, networkSize int) {
  76. timer := time.NewTimer(0)
  77. defer timer.Stop()
  78. defer close(chTx)
  79. for {
  80. select {
  81. case <-ctx.Done():
  82. return
  83. case <-timer.C:
  84. }
  85. // Constrain the key space to avoid using too much
  86. // space, while reduce the size of the data in the app.
  87. id := r.Int63n(100)
  88. bz := make([]byte, txSize)
  89. _, err := r.Read(bz)
  90. if err != nil {
  91. panic(fmt.Sprintf("Failed to read random bytes: %v", err))
  92. }
  93. tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
  94. select {
  95. case <-ctx.Done():
  96. return
  97. case chTx <- tx:
  98. // sleep for a bit before sending the
  99. // next transaction.
  100. timer.Reset(loadGenerateWaitTime(r, networkSize))
  101. }
  102. }
  103. }
  104. func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration {
  105. const (
  106. min = int64(250 * time.Millisecond)
  107. max = int64(time.Second)
  108. )
  109. var (
  110. baseJitter = r.Int63n(max-min+1) + min
  111. sizeFactor = int64(size) * min
  112. sizeJitter = r.Int63n(sizeFactor-min+1) + min
  113. )
  114. return time.Duration(baseJitter + sizeJitter)
  115. }
  116. // loadProcess processes transactions
  117. func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
  118. // Each worker gets its own client to each usable node, which
  119. // allows for some concurrency while still bounding it.
  120. clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
  121. for idx := range testnet.Nodes {
  122. // Construct a list of usable nodes for the creating
  123. // load. Don't send load through seed nodes because
  124. // they do not provide the RPC endpoints required to
  125. // broadcast transaction.
  126. if testnet.Nodes[idx].Mode == e2e.ModeSeed {
  127. continue
  128. }
  129. client, err := testnet.Nodes[idx].Client()
  130. if err != nil {
  131. continue
  132. }
  133. clients = append(clients, client)
  134. }
  135. if len(clients) == 0 {
  136. panic("no clients to process load")
  137. }
  138. // Put the clients in a ring so they can be used in a
  139. // round-robin fashion.
  140. clientRing := ring.New(len(clients))
  141. for idx := range clients {
  142. clientRing.Value = clients[idx]
  143. clientRing = clientRing.Next()
  144. }
  145. successes := 0
  146. for {
  147. select {
  148. case <-ctx.Done():
  149. return
  150. case tx := <-chTx:
  151. clientRing = clientRing.Next()
  152. client := clientRing.Value.(*rpchttp.HTTP)
  153. if status, err := client.Status(ctx); err != nil {
  154. continue
  155. } else if status.SyncInfo.CatchingUp {
  156. continue
  157. }
  158. if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
  159. continue
  160. }
  161. successes++
  162. select {
  163. case chSuccess <- successes:
  164. successes = 0 // reset counter for the next iteration
  165. continue
  166. case <-ctx.Done():
  167. return
  168. default:
  169. }
  170. }
  171. }
  172. }