You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
2.6 KiB

  1. package main
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "errors"
  6. "fmt"
  7. "math"
  8. "time"
  9. rpchttp "github.com/tendermint/tendermint/rpc/client/http"
  10. e2e "github.com/tendermint/tendermint/test/e2e/pkg"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. // Load generates transactions against the network until the given
  14. // context is cancelled.
  15. func Load(ctx context.Context, testnet *e2e.Testnet) error {
  16. concurrency := 50
  17. initialTimeout := 1 * time.Minute
  18. stallTimeout := 15 * time.Second
  19. chTx := make(chan types.Tx)
  20. chSuccess := make(chan types.Tx)
  21. ctx, cancel := context.WithCancel(ctx)
  22. defer cancel()
  23. // Spawn job generator and processors.
  24. logger.Info("Starting transaction load...")
  25. started := time.Now()
  26. go loadGenerate(ctx, chTx)
  27. for w := 0; w < concurrency; w++ {
  28. go loadProcess(ctx, testnet, chTx, chSuccess)
  29. }
  30. // Monitor successful transactions, and abort on stalls.
  31. success := 0
  32. timeout := initialTimeout
  33. for {
  34. select {
  35. case <-chSuccess:
  36. success++
  37. timeout = stallTimeout
  38. case <-time.After(timeout):
  39. return fmt.Errorf("unable to submit transactions for %v", timeout)
  40. case <-ctx.Done():
  41. if success == 0 {
  42. return errors.New("failed to submit any transactions")
  43. }
  44. logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...",
  45. success, float64(success)/time.Since(started).Seconds()))
  46. return nil
  47. }
  48. }
  49. }
  50. // loadGenerate generates jobs until the context is cancelled
  51. func loadGenerate(ctx context.Context, chTx chan<- types.Tx) {
  52. for i := 0; i < math.MaxInt64; i++ {
  53. // We keep generating the same 1000 keys over and over, with different values.
  54. // This gives a reasonable load without putting too much data in the app.
  55. id := i % 1000
  56. bz := make([]byte, 2048) // 4kb hex-encoded
  57. _, err := rand.Read(bz)
  58. if err != nil {
  59. panic(fmt.Sprintf("Failed to read random bytes: %v", err))
  60. }
  61. tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))
  62. select {
  63. case chTx <- tx:
  64. time.Sleep(10 * time.Millisecond)
  65. case <-ctx.Done():
  66. close(chTx)
  67. return
  68. }
  69. }
  70. }
  71. // loadProcess processes transactions
  72. func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
  73. // Each worker gets its own client to each node, which allows for some
  74. // concurrency while still bounding it.
  75. clients := map[string]*rpchttp.HTTP{}
  76. var err error
  77. for tx := range chTx {
  78. node := testnet.RandomNode()
  79. client, ok := clients[node.Name]
  80. if !ok {
  81. client, err = node.Client()
  82. if err != nil {
  83. continue
  84. }
  85. clients[node.Name] = client
  86. }
  87. _, err = client.BroadcastTxCommit(ctx, tx)
  88. if err != nil {
  89. continue
  90. }
  91. chSuccess <- tx
  92. }
  93. }