From af71f1cbcb7e87b0584cecc2f507ba544e674d4b Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 10 Sep 2021 09:26:17 -0400 Subject: [PATCH] e2e: load generation and logging changes (#6912) --- test/e2e/generator/generate.go | 2 +- test/e2e/networks/ci.toml | 10 +- test/e2e/runner/benchmark.go | 10 +- test/e2e/runner/evidence.go | 12 +- test/e2e/runner/load.go | 96 +++++++++------ test/e2e/runner/main.go | 57 +++++---- test/e2e/runner/perturb.go | 27 +++-- test/e2e/runner/rpc.go | 207 ++++++++++++++++++--------------- test/e2e/runner/start.go | 30 ++++- test/e2e/runner/wait.go | 20 ++-- test/e2e/tests/app_test.go | 7 +- 11 files changed, 279 insertions(+), 199 deletions(-) diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index 7b1ae2e82..ed835b88f 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -90,7 +90,7 @@ func Generate(r *rand.Rand, opts Options) ([]e2e.Manifest, error) { if opts.Sorted { // When the sorted flag is set (generally, as long as - // groups arent) + // groups aren't set), e2e.SortManifests(manifests) } diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index d25701859..f082bac6f 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -43,6 +43,7 @@ persist_interval = 0 perturb = ["restart"] privval_protocol = "tcp" seeds = ["seed01"] +block_sync = "v0" [node.validator03] database = "badgerdb" @@ -51,6 +52,7 @@ abci_protocol = "grpc" persist_interval = 3 perturb = ["kill"] privval_protocol = "grpc" +block_sync = "v0" retain_blocks = 7 [node.validator04] @@ -59,12 +61,13 @@ snapshot_interval = 5 database = "rocksdb" persistent_peers = ["validator01"] perturb = ["pause"] +block_sync = "v0" [node.validator05] -database = "cleveldb" -block_sync = "v0" +database = "cleveldb" +block_sync = "v0" state_sync = "p2p" -seeds = ["seed01"] +seeds = ["seed01"] start_at = 1005 # Becomes part of the validator set at 1010 abci_protocol = "grpc" perturb = ["pause", "disconnect", "restart"] @@ -73,7 +76,6 @@ privval_protocol = "tcp" [node.full01] mode = "full" start_at = 1010 -# FIXME: should be v2, disabled due to flake block_sync = "v0" persistent_peers = ["validator01", "validator02", "validator03", "validator04"] perturb = ["restart"] diff --git a/test/e2e/runner/benchmark.go b/test/e2e/runner/benchmark.go index 74d2491f5..26bee5e3c 100644 --- a/test/e2e/runner/benchmark.go +++ b/test/e2e/runner/benchmark.go @@ -21,8 +21,8 @@ import ( // // Metrics are based of the `benchmarkLength`, the amount of consecutive blocks // sampled from in the testnet -func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error { - block, _, err := waitForHeight(testnet, 0) +func Benchmark(ctx context.Context, testnet *e2e.Testnet, benchmarkLength int64) error { + block, _, err := waitForHeight(ctx, testnet, 0) if err != nil { return err } @@ -32,13 +32,15 @@ func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error { // wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block // which should be sufficient. waitingTime := time.Duration(benchmarkLength*5) * time.Second - endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime) + ctx, cancel := context.WithTimeout(ctx, waitingTime) + defer cancel() + block, _, err = waitForHeight(ctx, testnet, block.Height+benchmarkLength) if err != nil { return err } dur := time.Since(startAt) - logger.Info("Ending benchmark period", "height", endHeight) + logger.Info("Ending benchmark period", "height", block.Height) // fetch a sample of blocks blocks, err := fetchBlockChainSample(testnet, benchmarkLength) diff --git a/test/e2e/runner/evidence.go b/test/e2e/runner/evidence.go index 9249d7958..18b907f18 100644 --- a/test/e2e/runner/evidence.go +++ b/test/e2e/runner/evidence.go @@ -28,7 +28,7 @@ const lightClientEvidenceRatio = 4 // evidence and broadcasts it to a random node through the rpc endpoint `/broadcast_evidence`. // Evidence is random and can be a mixture of LightClientAttackEvidence and // DuplicateVoteEvidence. -func InjectEvidence(testnet *e2e.Testnet, amount int) error { +func InjectEvidence(ctx context.Context, testnet *e2e.Testnet, amount int) error { // select a random node var targetNode *e2e.Node @@ -79,9 +79,12 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error { return err } + wctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + // wait for the node to reach the height above the forged height so that // it is able to validate the evidence - _, err = waitForNode(targetNode, waitHeight, 30*time.Second) + _, err = waitForNode(wctx, targetNode, waitHeight) if err != nil { return err } @@ -107,9 +110,12 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error { } } + wctx, cancel = context.WithTimeout(ctx, 10*time.Second) + defer cancel() + // wait for the node to reach the height above the forged height so that // it is able to validate the evidence - _, err = waitForNode(targetNode, blockRes.Block.Height+2, 10*time.Second) + _, err = waitForNode(wctx, targetNode, blockRes.Block.Height+2) if err != nil { return err } diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index b57c96ddf..ede8e9daa 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -3,10 +3,9 @@ package main import ( "container/ring" "context" - "crypto/rand" "errors" "fmt" - "math" + "math/rand" "time" rpchttp "github.com/tendermint/tendermint/rpc/client/http" @@ -15,9 +14,8 @@ import ( ) // Load generates transactions against the network until the given context is -// canceled. A multiplier of greater than one can be supplied if load needs to -// be generated beyond a minimum amount. -func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { +// canceled. +func Load(ctx context.Context, testnet *e2e.Testnet) error { // Since transactions are executed across all nodes in the network, we need // to reduce transaction load for larger networks to avoid using too much // CPU. This gives high-throughput small networks and low-throughput large ones. @@ -27,11 +25,9 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { if concurrency == 0 { concurrency = 1 } - initialTimeout := 1 * time.Minute - stallTimeout := 30 * time.Second chTx := make(chan types.Tx) - chSuccess := make(chan types.Tx) + chSuccess := make(chan int) // success counts per iteration ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -39,61 +35,89 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency)) started := time.Now() - go loadGenerate(ctx, chTx, multiplier, testnet.TxSize) + go loadGenerate(ctx, chTx, testnet.TxSize) for w := 0; w < concurrency; w++ { go loadProcess(ctx, testnet, chTx, chSuccess) } - // Monitor successful transactions, and abort on stalls. + // Montior transaction to ensure load propagates to the network + // + // This loop doesn't check or time out for stalls, since a stall here just + // aborts the load generator sooner and could obscure backpressure + // from the test harness, and there are other checks for + // stalls in the framework. Ideally we should monitor latency as a guide + // for when to give up, but we don't have a good way to track that yet. success := 0 - timeout := initialTimeout for { select { - case <-chSuccess: - success++ - timeout = stallTimeout - case <-time.After(timeout): - return fmt.Errorf("unable to submit transactions for %v", timeout) + case numSeen := <-chSuccess: + success += numSeen case <-ctx.Done(): if success == 0 { return errors.New("failed to submit any transactions") } - logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...", - success, float64(success)/time.Since(started).Seconds())) + rate := float64(success) / time.Since(started).Seconds() + + logger.Info("ending transaction load", + "dur_secs", time.Since(started).Seconds(), + "txns", success, + "rate", rate) + + if rate < 2 { + logger.Error("transaction throughput was low", + "rate", rate) + } + return nil } } } -// loadGenerate generates jobs until the context is canceled -func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, size int64) { - for i := 0; i < math.MaxInt64; i++ { +// loadGenerate generates jobs until the context is canceled. +// +// The chTx has multiple consumers, thus the rate limiting of the load +// generation is primarily the result of backpressure from the +// broadcast transaction, though at most one transaction will be +// produced every 10ms. +func loadGenerate(ctx context.Context, chTx chan<- types.Tx, size int64) { + timer := time.NewTimer(0) + defer timer.Stop() + defer close(chTx) + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + } + // We keep generating the same 100 keys over and over, with different values. // This gives a reasonable load without putting too much data in the app. - id := i % 100 + id := rand.Int63() % 100 // nolint: gosec bz := make([]byte, size) - _, err := rand.Read(bz) + _, err := rand.Read(bz) // nolint: gosec if err != nil { panic(fmt.Sprintf("Failed to read random bytes: %v", err)) } tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz)) select { - case chTx <- tx: - sqrtSize := int(math.Sqrt(float64(size))) - time.Sleep(10 * time.Millisecond * time.Duration(sqrtSize/multiplier)) - case <-ctx.Done(): - close(chTx) return + case chTx <- tx: + // sleep for a bit before sending the + // next transaction. + waitTime := (10 * time.Millisecond) + time.Duration(rand.Int63n(int64(500*time.Millisecond))) // nolint: gosec + timer.Reset(waitTime) } + } } // loadProcess processes transactions -func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) { +func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) { // Each worker gets its own client to each usable node, which // allows for some concurrency while still bounding it. clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes)) @@ -127,8 +151,7 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx clientRing = clientRing.Next() } - var err error - + successes := 0 for { select { case <-ctx.Done(): @@ -137,19 +160,24 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx clientRing = clientRing.Next() client := clientRing.Value.(*rpchttp.HTTP) - if _, err := client.Health(ctx); err != nil { + if status, err := client.Status(ctx); err != nil { + continue + } else if status.SyncInfo.CatchingUp { continue } - if _, err = client.BroadcastTxSync(ctx, tx); err != nil { + if _, err := client.BroadcastTxSync(ctx, tx); err != nil { continue } + successes++ select { - case chSuccess <- tx: + case chSuccess <- successes: + successes = 0 // reset counter for the next iteration continue case <-ctx.Done(): return + default: } } diff --git a/test/e2e/runner/main.go b/test/e2e/runner/main.go index 5b1051c01..105e33da0 100644 --- a/test/e2e/runner/main.go +++ b/test/e2e/runner/main.go @@ -57,44 +57,48 @@ func NewCLI() *CLI { } chLoadResult := make(chan error) - ctx, loadCancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + lctx, loadCancel := context.WithCancel(ctx) defer loadCancel() go func() { - err := Load(ctx, cli.testnet, 1) + err := Load(lctx, cli.testnet) chLoadResult <- err }() - if err := Start(cli.testnet); err != nil { + if err := Start(ctx, cli.testnet); err != nil { return err } - if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through + if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through return err } if cli.testnet.HasPerturbations() { - if err := Perturb(cli.testnet); err != nil { + if err := Perturb(ctx, cli.testnet); err != nil { return err } - if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through + if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through return err } } if cli.testnet.Evidence > 0 { - if err := InjectEvidence(cli.testnet, cli.testnet.Evidence); err != nil { + if err := InjectEvidence(ctx, cli.testnet, cli.testnet.Evidence); err != nil { return err } - if err := Wait(cli.testnet, 5); err != nil { // ensure chain progress + if err := Wait(ctx, cli.testnet, 5); err != nil { // ensure chain progress return err } } loadCancel() + if err := <-chLoadResult; err != nil { return fmt.Errorf("transaction load failed: %w", err) } - if err := Wait(cli.testnet, 5); err != nil { // wait for network to settle before tests + if err := Wait(ctx, cli.testnet, 5); err != nil { // wait for network to settle before tests return err } if err := Test(cli.testnet); err != nil { @@ -139,7 +143,7 @@ func NewCLI() *CLI { if err != nil { return err } - return Start(cli.testnet) + return Start(cmd.Context(), cli.testnet) }, }) @@ -147,7 +151,7 @@ func NewCLI() *CLI { Use: "perturb", Short: "Perturbs the Docker testnet, e.g. by restarting or disconnecting nodes", RunE: func(cmd *cobra.Command, args []string) error { - return Perturb(cli.testnet) + return Perturb(cmd.Context(), cli.testnet) }, }) @@ -155,7 +159,7 @@ func NewCLI() *CLI { Use: "wait", Short: "Waits for a few blocks to be produced and all nodes to catch up", RunE: func(cmd *cobra.Command, args []string) error { - return Wait(cli.testnet, 5) + return Wait(cmd.Context(), cli.testnet, 5) }, }) @@ -187,20 +191,10 @@ func NewCLI() *CLI { }) cli.root.AddCommand(&cobra.Command{ - Use: "load [multiplier]", - Args: cobra.MaximumNArgs(1), + Use: "load", Short: "Generates transaction load until the command is canceled", RunE: func(cmd *cobra.Command, args []string) (err error) { - m := 1 - - if len(args) == 1 { - m, err = strconv.Atoi(args[0]) - if err != nil { - return err - } - } - - return Load(context.Background(), cli.testnet, m) + return Load(context.Background(), cli.testnet) }, }) @@ -218,7 +212,7 @@ func NewCLI() *CLI { } } - return InjectEvidence(cli.testnet, amount) + return InjectEvidence(cmd.Context(), cli.testnet, amount) }, }) @@ -281,23 +275,26 @@ Does not run any perbutations. } chLoadResult := make(chan error) - ctx, loadCancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + lctx, loadCancel := context.WithCancel(ctx) defer loadCancel() go func() { - err := Load(ctx, cli.testnet, 1) + err := Load(lctx, cli.testnet) chLoadResult <- err }() - if err := Start(cli.testnet); err != nil { + if err := Start(ctx, cli.testnet); err != nil { return err } - if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through + if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through return err } // we benchmark performance over the next 100 blocks - if err := Benchmark(cli.testnet, 100); err != nil { + if err := Benchmark(ctx, cli.testnet, 100); err != nil { return err } diff --git a/test/e2e/runner/perturb.go b/test/e2e/runner/perturb.go index 8fb6ec726..900f75d73 100644 --- a/test/e2e/runner/perturb.go +++ b/test/e2e/runner/perturb.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "time" @@ -9,14 +10,24 @@ import ( ) // Perturbs a running testnet. -func Perturb(testnet *e2e.Testnet) error { +func Perturb(ctx context.Context, testnet *e2e.Testnet) error { + timer := time.NewTimer(0) // first tick fires immediately; reset below + defer timer.Stop() + for _, node := range testnet.Nodes { for _, perturbation := range node.Perturbations { - _, err := PerturbNode(node, perturbation) - if err != nil { - return err + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + _, err := PerturbNode(ctx, node, perturbation) + if err != nil { + return err + } + + // give network some time to recover between each + timer.Reset(20 * time.Second) } - time.Sleep(20 * time.Second) // give network some time to recover between each } } return nil @@ -24,7 +35,7 @@ func Perturb(testnet *e2e.Testnet) error { // PerturbNode perturbs a node with a given perturbation, returning its status // after recovering. -func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) { +func PerturbNode(ctx context.Context, node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) { testnet := node.Testnet switch perturbation { case e2e.PerturbationDisconnect: @@ -77,7 +88,9 @@ func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.Resul return nil, nil } - status, err := waitForNode(node, 0, 3*time.Minute) + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + status, err := waitForNode(ctx, node, 0) if err != nil { return nil, err } diff --git a/test/e2e/runner/rpc.go b/test/e2e/runner/rpc.go index f7b9fd1c9..225097eb5 100644 --- a/test/e2e/runner/rpc.go +++ b/test/e2e/runner/rpc.go @@ -15,7 +15,7 @@ import ( // waitForHeight waits for the network to reach a certain height (or above), // returning the highest height seen. Errors if the network is not making // progress at all. -func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) { +func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) { var ( err error maxResult *rpctypes.ResultBlock @@ -25,11 +25,7 @@ func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.Blo numRunningNodes int ) for _, node := range testnet.Nodes { - if node.Mode == e2e.ModeSeed { - continue - } - - if node.Mode == e2e.ModeLight { + if node.Stateless() { continue } @@ -38,86 +34,90 @@ func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.Blo } } + timer := time.NewTimer(0) + defer timer.Stop() for { - for _, node := range testnet.Nodes { - // skip nodes that have reached the target height - if _, ok := nodesAtHeight[node.Name]; ok { - continue - } + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-timer.C: + for _, node := range testnet.Nodes { + // skip nodes that have reached the target height + if _, ok := nodesAtHeight[node.Name]; ok { + continue + } - if node.Mode == e2e.ModeSeed { - continue - } + if node.Stateless() { + continue + } - if node.Mode == e2e.ModeLight { - continue - } + if !node.HasStarted { + continue + } - if !node.HasStarted { - continue - } + // cache the clients + client, ok := clients[node.Name] + if !ok { + client, err = node.Client() + if err != nil { + continue + } + clients[node.Name] = client + } - // cache the clients - client, ok := clients[node.Name] - if !ok { - client, err = node.Client() + wctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + result, err := client.Block(wctx, nil) if err != nil { continue } - clients[node.Name] = client - } + if result.Block != nil && (maxResult == nil || result.Block.Height > maxResult.Block.Height) { + maxResult = result + lastIncrease = time.Now() + } - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - result, err := client.Block(ctx, nil) - if err != nil { - continue - } - if result.Block != nil && (maxResult == nil || result.Block.Height > maxResult.Block.Height) { - maxResult = result - lastIncrease = time.Now() - } + if maxResult != nil && maxResult.Block.Height >= height { + // the node has achieved the target height! - if maxResult != nil && maxResult.Block.Height >= height { - // the node has achieved the target height! + // add this node to the set of target + // height nodes + nodesAtHeight[node.Name] = struct{}{} - // add this node to the set of target - // height nodes - nodesAtHeight[node.Name] = struct{}{} + // if not all of the nodes that we + // have clients for have reached the + // target height, keep trying. + if numRunningNodes > len(nodesAtHeight) { + continue + } - // if not all of the nodes that we - // have clients for have reached the - // target height, keep trying. - if numRunningNodes > len(nodesAtHeight) { - continue + // return once all nodes have reached + // the target height. + return maxResult.Block, &maxResult.BlockID, nil } - - // return once all nodes have reached - // the target height. - return maxResult.Block, &maxResult.BlockID, nil } - } - if len(clients) == 0 { - return nil, nil, errors.New("unable to connect to any network nodes") - } - if time.Since(lastIncrease) >= time.Minute { - if maxResult == nil { - return nil, nil, errors.New("chain stalled at unknown height") + if len(clients) == 0 { + return nil, nil, errors.New("unable to connect to any network nodes") } + if time.Since(lastIncrease) >= time.Minute { + if maxResult == nil { + return nil, nil, errors.New("chain stalled at unknown height") + } - return nil, nil, fmt.Errorf("chain stalled at height %v [%d of %d nodes]", - maxResult.Block.Height, - len(nodesAtHeight), - numRunningNodes) + return nil, nil, fmt.Errorf("chain stalled at height %v [%d of %d nodes %+v]", + maxResult.Block.Height, + len(nodesAtHeight), + numRunningNodes, + nodesAtHeight) + } + timer.Reset(1 * time.Second) } - time.Sleep(1 * time.Second) } } // waitForNode waits for a node to become available and catch up to the given block height. -func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) { +func waitForNode(ctx context.Context, node *e2e.Node, height int64) (*rpctypes.ResultStatus, error) { if node.Mode == e2e.ModeSeed { return nil, nil } @@ -126,42 +126,59 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() + timer := time.NewTimer(0) + defer timer.Stop() + var ( + lastFailed bool + counter int + ) for { - status, err := client.Status(ctx) - switch { - case errors.Is(err, context.DeadlineExceeded): - return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height) - case errors.Is(err, context.Canceled): - return nil, err - case err == nil && status.SyncInfo.LatestBlockHeight >= height: - return status, nil - } - - time.Sleep(300 * time.Millisecond) - } -} - -// waitForAllNodes waits for all nodes to become available and catch up to the given block height. -func waitForAllNodes(testnet *e2e.Testnet, height int64, timeout time.Duration) (int64, error) { - var lastHeight int64 - - for _, node := range testnet.Nodes { - if node.Mode == e2e.ModeSeed { - continue - } - - status, err := waitForNode(node, height, timeout) - if err != nil { - return 0, err + counter++ + if lastFailed { + lastFailed = false + + // if there was a problem with the request in + // the previous recreate the client to ensure + // reconnection + client, err = node.Client() + if err != nil { + return nil, err + } } - if status.SyncInfo.LatestBlockHeight > lastHeight { - lastHeight = status.SyncInfo.LatestBlockHeight + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-timer.C: + status, err := client.Status(ctx) + switch { + case errors.Is(err, context.DeadlineExceeded): + return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height) + case errors.Is(err, context.Canceled): + return nil, err + case err == nil && status.SyncInfo.LatestBlockHeight >= height: + return status, nil + case counter%50 == 0: + switch { + case err != nil: + lastFailed = true + logger.Error("node not yet ready", + "iter", counter, + "node", node.Name, + "err", err, + "target", height, + ) + case status != nil: + logger.Error("node not yet ready", + "iter", counter, + "node", node.Name, + "height", status.SyncInfo.LatestBlockHeight, + "target", height, + ) + } + } + timer.Reset(250 * time.Millisecond) } } - - return lastHeight, nil } diff --git a/test/e2e/runner/start.go b/test/e2e/runner/start.go index 1d0f02eb4..967d2519c 100644 --- a/test/e2e/runner/start.go +++ b/test/e2e/runner/start.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "sort" "time" @@ -8,7 +9,7 @@ import ( e2e "github.com/tendermint/tendermint/test/e2e/pkg" ) -func Start(testnet *e2e.Testnet) error { +func Start(ctx context.Context, testnet *e2e.Testnet) error { if len(testnet.Nodes) == 0 { return fmt.Errorf("no nodes in testnet") } @@ -45,7 +46,14 @@ func Start(testnet *e2e.Testnet) error { if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil { return err } - if _, err := waitForNode(node, 0, time.Minute); err != nil { + + if err := func() error { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + _, err := waitForNode(ctx, node, 0) + return err + }(); err != nil { return err } node.HasStarted = true @@ -60,7 +68,7 @@ func Start(testnet *e2e.Testnet) error { "nodes", len(testnet.Nodes)-len(nodeQueue), "pending", len(nodeQueue)) - block, blockID, err := waitForHeight(testnet, networkHeight) + block, blockID, err := waitForHeight(ctx, testnet, networkHeight) if err != nil { return err } @@ -74,9 +82,16 @@ func Start(testnet *e2e.Testnet) error { // that this node will start at before we // start the node. + logger.Info("Waiting for network to advance to height", + "node", node.Name, + "last_height", networkHeight, + "waiting_for", node.StartAt, + "size", len(testnet.Nodes)-len(nodeQueue), + "pending", len(nodeQueue)) + networkHeight = node.StartAt - block, blockID, err = waitForHeight(testnet, networkHeight) + block, blockID, err = waitForHeight(ctx, testnet, networkHeight) if err != nil { return err } @@ -93,10 +108,15 @@ func Start(testnet *e2e.Testnet) error { if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil { return err } - status, err := waitForNode(node, node.StartAt, 8*time.Minute) + + wctx, wcancel := context.WithTimeout(ctx, 8*time.Minute) + status, err := waitForNode(wctx, node, node.StartAt) if err != nil { + wcancel() return err } + wcancel() + node.HasStarted = true logger.Info(fmt.Sprintf("Node %v up on http://127.0.0.1:%v at height %v", node.Name, node.ProxyPort, status.SyncInfo.LatestBlockHeight)) diff --git a/test/e2e/runner/wait.go b/test/e2e/runner/wait.go index 9f3a4c438..7bba21e66 100644 --- a/test/e2e/runner/wait.go +++ b/test/e2e/runner/wait.go @@ -1,31 +1,27 @@ package main import ( + "context" "fmt" - "time" e2e "github.com/tendermint/tendermint/test/e2e/pkg" ) // Wait waits for a number of blocks to be produced, and for all nodes to catch // up with it. -func Wait(testnet *e2e.Testnet, blocks int64) error { - block, _, err := waitForHeight(testnet, 0) +func Wait(ctx context.Context, testnet *e2e.Testnet, blocks int64) error { + block, _, err := waitForHeight(ctx, testnet, 0) if err != nil { return err } - return WaitUntil(testnet, block.Height+blocks) + return WaitUntil(ctx, testnet, block.Height+blocks) } // WaitUntil waits until a given height has been reached. -func WaitUntil(testnet *e2e.Testnet, height int64) error { +func WaitUntil(ctx context.Context, testnet *e2e.Testnet, height int64) error { logger.Info(fmt.Sprintf("Waiting for all nodes to reach height %v...", height)) - _, err := waitForAllNodes(testnet, height, waitingTime(len(testnet.Nodes))) - return err -} -// waitingTime estimates how long it should take for a node to reach the height. -// More nodes in a network implies we may expect a slower network and may have to wait longer. -func waitingTime(nodes int) time.Duration { - return time.Minute + (time.Duration(nodes) * (30 * time.Second)) + _, _, err := waitForHeight(ctx, testnet, height) + + return err } diff --git a/test/e2e/tests/app_test.go b/test/e2e/tests/app_test.go index be273526e..637de6c83 100644 --- a/test/e2e/tests/app_test.go +++ b/test/e2e/tests/app_test.go @@ -44,7 +44,7 @@ func TestApp_Hash(t *testing.T) { block, err := client.Block(ctx, nil) require.NoError(t, err) - require.EqualValues(t, info.Response.LastBlockAppHash, block.Block.AppHash, + require.EqualValues(t, info.Response.LastBlockAppHash, block.Block.AppHash.Bytes(), "app hash does not match last block's app hash") status, err := client.Status(ctx) @@ -62,9 +62,8 @@ func TestApp_Tx(t *testing.T) { // Generate a random value, to prevent duplicate tx errors when // manually running the test multiple times for a testnet. - r := rand.New(rand.NewSource(time.Now().UnixNano())) bz := make([]byte, 32) - _, err = r.Read(bz) + _, err = rand.Read(bz) require.NoError(t, err) key := fmt.Sprintf("testapp-tx-%v", node.Name) @@ -75,7 +74,7 @@ func TestApp_Tx(t *testing.T) { require.NoError(t, err) hash := tx.Hash() - waitTime := 20 * time.Second + const waitTime = time.Minute require.Eventuallyf(t, func() bool { txResp, err := client.Tx(ctx, hash, false)