You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

126 lines
3.4 KiB

  1. package main
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "time"
  9. rpchttp "github.com/tendermint/tendermint/rpc/client/http"
  10. e2e "github.com/tendermint/tendermint/test/e2e/pkg"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. // Load generates transactions against the network until the given context is
  14. // canceled. A multiplier of greater than one can be supplied if load needs to
  15. // be generated beyond a minimum amount.
  16. func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
  17. // Since transactions are executed across all nodes in the network, we need
  18. // to reduce transaction load for larger networks to avoid using too much
  19. // CPU. This gives high-throughput small networks and low-throughput large ones.
  20. // This also limits the number of TCP connections, since each worker has
  21. // a connection to all nodes.
  22. concurrency := 64 / len(testnet.Nodes)
  23. if concurrency == 0 {
  24. concurrency = 1
  25. }
  26. initialTimeout := 1 * time.Minute
  27. stallTimeout := 30 * time.Second
  28. chTx := make(chan types.Tx)
  29. chSuccess := make(chan types.Tx)
  30. ctx, cancel := context.WithCancel(ctx)
  31. defer cancel()
  32. // Spawn job generator and processors.
  33. logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
  34. started := time.Now()
  35. go loadGenerate(ctx, chTx, multiplier, testnet.TxSize)
  36. for w := 0; w < concurrency; w++ {
  37. go loadProcess(ctx, testnet, chTx, chSuccess)
  38. }
  39. // Monitor successful transactions, and abort on stalls.
  40. success := 0
  41. timeout := initialTimeout
  42. for {
  43. select {
  44. case <-chSuccess:
  45. success++
  46. timeout = stallTimeout
  47. case <-time.After(timeout):
  48. return fmt.Errorf("unable to submit transactions for %v", timeout)
  49. case <-ctx.Done():
  50. if success == 0 {
  51. return errors.New("failed to submit any transactions")
  52. }
  53. logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...",
  54. success, float64(success)/time.Since(started).Seconds()))
  55. return nil
  56. }
  57. }
  58. }
  59. // loadGenerate generates jobs until the context is canceled
  60. func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, size int64) {
  61. for i := 0; i < math.MaxInt64; i++ {
  62. // We keep generating the same 100 keys over and over, with different values.
  63. // This gives a reasonable load without putting too much data in the app.
  64. id := i % 100
  65. bz := make([]byte, size)
  66. _, err := rand.Read(bz)
  67. if err != nil {
  68. panic(fmt.Sprintf("Failed to read random bytes: %v", err))
  69. }
  70. tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
  71. select {
  72. case chTx <- tx:
  73. sqrtSize := int(math.Sqrt(float64(size)))
  74. time.Sleep(10 * time.Millisecond * time.Duration(sqrtSize/multiplier))
  75. case <-ctx.Done():
  76. close(chTx)
  77. return
  78. }
  79. }
  80. }
  81. // loadProcess processes transactions
  82. func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
  83. // Each worker gets its own client to each node, which allows for some
  84. // concurrency while still bounding it.
  85. clients := map[string]*rpchttp.HTTP{}
  86. var err error
  87. for tx := range chTx {
  88. node := testnet.RandomNode()
  89. client, ok := clients[node.Name]
  90. if !ok {
  91. client, err = node.Client()
  92. if err != nil {
  93. continue
  94. }
  95. // check that the node is up
  96. _, err = client.Health(ctx)
  97. if err != nil {
  98. continue
  99. }
  100. clients[node.Name] = client
  101. }
  102. if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
  103. continue
  104. }
  105. chSuccess <- tx
  106. }
  107. }