You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
4.1 KiB

  1. package main
  2. import (
  3. "container/ring"
  4. "context"
  5. "crypto/rand"
  6. "errors"
  7. "fmt"
  8. "math"
  9. "time"
  10. rpchttp "github.com/tendermint/tendermint/rpc/client/http"
  11. e2e "github.com/tendermint/tendermint/test/e2e/pkg"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. // Load generates transactions against the network until the given context is
  15. // canceled. A multiplier of greater than one can be supplied if load needs to
  16. // be generated beyond a minimum amount.
  17. func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
  18. // Since transactions are executed across all nodes in the network, we need
  19. // to reduce transaction load for larger networks to avoid using too much
  20. // CPU. This gives high-throughput small networks and low-throughput large ones.
  21. // This also limits the number of TCP connections, since each worker has
  22. // a connection to all nodes.
  23. concurrency := 64 / len(testnet.Nodes)
  24. if concurrency == 0 {
  25. concurrency = 1
  26. }
  27. initialTimeout := 1 * time.Minute
  28. stallTimeout := 30 * time.Second
  29. chTx := make(chan types.Tx)
  30. chSuccess := make(chan types.Tx)
  31. ctx, cancel := context.WithCancel(ctx)
  32. defer cancel()
  33. // Spawn job generator and processors.
  34. logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
  35. started := time.Now()
  36. go loadGenerate(ctx, chTx, multiplier, testnet.TxSize)
  37. for w := 0; w < concurrency; w++ {
  38. go loadProcess(ctx, testnet, chTx, chSuccess)
  39. }
  40. // Monitor successful transactions, and abort on stalls.
  41. success := 0
  42. timeout := initialTimeout
  43. for {
  44. select {
  45. case <-chSuccess:
  46. success++
  47. timeout = stallTimeout
  48. case <-time.After(timeout):
  49. return fmt.Errorf("unable to submit transactions for %v", timeout)
  50. case <-ctx.Done():
  51. if success == 0 {
  52. return errors.New("failed to submit any transactions")
  53. }
  54. logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...",
  55. success, float64(success)/time.Since(started).Seconds()))
  56. return nil
  57. }
  58. }
  59. }
  60. // loadGenerate generates jobs until the context is canceled
  61. func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, size int64) {
  62. for i := 0; i < math.MaxInt64; i++ {
  63. // We keep generating the same 100 keys over and over, with different values.
  64. // This gives a reasonable load without putting too much data in the app.
  65. id := i % 100
  66. bz := make([]byte, size)
  67. _, err := rand.Read(bz)
  68. if err != nil {
  69. panic(fmt.Sprintf("Failed to read random bytes: %v", err))
  70. }
  71. tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
  72. select {
  73. case chTx <- tx:
  74. sqrtSize := int(math.Sqrt(float64(size)))
  75. time.Sleep(10 * time.Millisecond * time.Duration(sqrtSize/multiplier))
  76. case <-ctx.Done():
  77. close(chTx)
  78. return
  79. }
  80. }
  81. }
  82. // loadProcess processes transactions
  83. func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
  84. // Each worker gets its own client to each usable node, which
  85. // allows for some concurrency while still bounding it.
  86. clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))
  87. for idx := range testnet.Nodes {
  88. // Construct a list of usable nodes for the creating
  89. // load. Don't send load through seed nodes because
  90. // they do not provide the RPC endpoints required to
  91. // broadcast transaction.
  92. if testnet.Nodes[idx].Mode == e2e.ModeSeed {
  93. continue
  94. }
  95. client, err := testnet.Nodes[idx].Client()
  96. if err != nil {
  97. continue
  98. }
  99. clients = append(clients, client)
  100. }
  101. if len(clients) == 0 {
  102. panic("no clients to process load")
  103. }
  104. // Put the clients in a ring so they can be used in a
  105. // round-robin fashion.
  106. clientRing := ring.New(len(clients))
  107. for idx := range clients {
  108. clientRing.Value = clients[idx]
  109. clientRing = clientRing.Next()
  110. }
  111. var err error
  112. for {
  113. select {
  114. case <-ctx.Done():
  115. return
  116. case tx := <-chTx:
  117. clientRing = clientRing.Next()
  118. client := clientRing.Value.(*rpchttp.HTTP)
  119. if _, err := client.Health(ctx); err != nil {
  120. continue
  121. }
  122. if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
  123. continue
  124. }
  125. select {
  126. case chSuccess <- tx:
  127. continue
  128. case <-ctx.Done():
  129. return
  130. }
  131. }
  132. }
  133. }