
e2e: load generation and logging changes (#6912)

Branch: pull/6924/head
Sam Kleinman authored 3 years ago, committed by GitHub
Commit: af71f1cbcb
11 changed files with 279 additions and 199 deletions
  1. test/e2e/generator/generate.go   +1   -1
  2. test/e2e/networks/ci.toml        +6   -4
  3. test/e2e/runner/benchmark.go     +6   -4
  4. test/e2e/runner/evidence.go      +9   -3
  5. test/e2e/runner/load.go          +62  -34
  6. test/e2e/runner/main.go          +27  -30
  7. test/e2e/runner/perturb.go       +20  -7
  8. test/e2e/runner/rpc.go           +112 -95
  9. test/e2e/runner/start.go         +25  -5
  10. test/e2e/runner/wait.go         +8   -12
  11. test/e2e/tests/app_test.go      +3   -4

test/e2e/generator/generate.go (+1, -1)

@@ -90,7 +90,7 @@ func Generate(r *rand.Rand, opts Options) ([]e2e.Manifest, error) {
 	if opts.Sorted {
 		// When the sorted flag is set (generally, as long as
-		// groups arent)
+		// groups aren't set),
 		e2e.SortManifests(manifests)
 	}


test/e2e/networks/ci.toml (+6, -4)

@@ -43,6 +43,7 @@ persist_interval = 0
 perturb = ["restart"]
 privval_protocol = "tcp"
 seeds = ["seed01"]
+block_sync = "v0"

 [node.validator03]
 database = "badgerdb"

@@ -51,6 +52,7 @@ abci_protocol = "grpc"
 persist_interval = 3
 perturb = ["kill"]
 privval_protocol = "grpc"
+block_sync = "v0"
 retain_blocks = 7

 [node.validator04]

@@ -59,12 +61,13 @@ snapshot_interval = 5
 database = "rocksdb"
 persistent_peers = ["validator01"]
 perturb = ["pause"]
+block_sync = "v0"

 [node.validator05]
-database = "cleveldb"
-block_sync = "v0"
+database = "cleveldb"
+block_sync = "v0"
 state_sync = "p2p"
-seeds = ["seed01"]
+seeds = ["seed01"]
 start_at = 1005 # Becomes part of the validator set at 1010
 abci_protocol = "grpc"
 perturb = ["pause", "disconnect", "restart"]

@@ -73,7 +76,6 @@ privval_protocol = "tcp"
 [node.full01]
 mode = "full"
 start_at = 1010
-# FIXME: should be v2, disabled due to flake
 block_sync = "v0"
 persistent_peers = ["validator01", "validator02", "validator03", "validator04"]
 perturb = ["restart"]


test/e2e/runner/benchmark.go (+6, -4)

@@ -21,8 +21,8 @@ import (
 //
 // Metrics are based of the `benchmarkLength`, the amount of consecutive blocks
 // sampled from in the testnet
-func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error {
-	block, _, err := waitForHeight(testnet, 0)
+func Benchmark(ctx context.Context, testnet *e2e.Testnet, benchmarkLength int64) error {
+	block, _, err := waitForHeight(ctx, testnet, 0)
 	if err != nil {
 		return err
 	}

@@ -32,13 +32,15 @@ func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error {
 	// wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block
 	// which should be sufficient.
 	waitingTime := time.Duration(benchmarkLength*5) * time.Second
-	endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime)
+	ctx, cancel := context.WithTimeout(ctx, waitingTime)
+	defer cancel()
+
+	block, _, err = waitForHeight(ctx, testnet, block.Height+benchmarkLength)
 	if err != nil {
 		return err
 	}

 	dur := time.Since(startAt)

-	logger.Info("Ending benchmark period", "height", endHeight)
+	logger.Info("Ending benchmark period", "height", block.Height)

 	// fetch a sample of blocks
 	blocks, err := fetchBlockChainSample(testnet, benchmarkLength)

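The Benchmark change replaces a timeout argument with a context deadline derived from the expected block time. A minimal stdlib-only sketch of that idiom (names here are illustrative, not taken from the diff):

    package main

    import (
    	"context"
    	"time"
    )

    // waitWithBudget gives poll roughly five seconds per expected block and
    // relies on context cancellation instead of threading a timeout value down.
    func waitWithBudget(ctx context.Context, blocks int64, poll func(context.Context) error) error {
    	ctx, cancel := context.WithTimeout(ctx, time.Duration(blocks*5)*time.Second)
    	defer cancel()
    	return poll(ctx) // poll is expected to return ctx.Err() when the budget runs out
    }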

test/e2e/runner/evidence.go (+9, -3)

@@ -28,7 +28,7 @@ const lightClientEvidenceRatio = 4
 // evidence and broadcasts it to a random node through the rpc endpoint `/broadcast_evidence`.
 // Evidence is random and can be a mixture of LightClientAttackEvidence and
 // DuplicateVoteEvidence.
-func InjectEvidence(testnet *e2e.Testnet, amount int) error {
+func InjectEvidence(ctx context.Context, testnet *e2e.Testnet, amount int) error {
 	// select a random node
 	var targetNode *e2e.Node

@@ -79,9 +79,12 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error {
 		return err
 	}

+	wctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
 	// wait for the node to reach the height above the forged height so that
 	// it is able to validate the evidence
-	_, err = waitForNode(targetNode, waitHeight, 30*time.Second)
+	_, err = waitForNode(wctx, targetNode, waitHeight)
 	if err != nil {
 		return err
 	}

@@ -107,9 +110,12 @@ func InjectEvidence(testnet *e2e.Testnet, amount int) error {
 		}
 	}

+	wctx, cancel = context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
 	// wait for the node to reach the height above the forged height so that
 	// it is able to validate the evidence
-	_, err = waitForNode(targetNode, blockRes.Block.Height+2, 10*time.Second)
+	_, err = waitForNode(wctx, targetNode, blockRes.Block.Height+2)
 	if err != nil {
 		return err
 	}


test/e2e/runner/load.go (+62, -34)

@@ -3,10 +3,9 @@ package main
 import (
 	"container/ring"
 	"context"
-	"crypto/rand"
 	"errors"
 	"fmt"
-	"math"
+	"math/rand"
 	"time"

 	rpchttp "github.com/tendermint/tendermint/rpc/client/http"

@@ -15,9 +14,8 @@ import (
 )

 // Load generates transactions against the network until the given context is
-// canceled. A multiplier of greater than one can be supplied if load needs to
-// be generated beyond a minimum amount.
-func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
+// canceled.
+func Load(ctx context.Context, testnet *e2e.Testnet) error {
 	// Since transactions are executed across all nodes in the network, we need
 	// to reduce transaction load for larger networks to avoid using too much
 	// CPU. This gives high-throughput small networks and low-throughput large ones.

@@ -27,11 +25,9 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
 	if concurrency == 0 {
 		concurrency = 1
 	}
-	initialTimeout := 1 * time.Minute
-	stallTimeout := 30 * time.Second

 	chTx := make(chan types.Tx)
-	chSuccess := make(chan types.Tx)
+	chSuccess := make(chan int) // success counts per iteration
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

@@ -39,61 +35,89 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
 	logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
 	started := time.Now()

-	go loadGenerate(ctx, chTx, multiplier, testnet.TxSize)
+	go loadGenerate(ctx, chTx, testnet.TxSize)

 	for w := 0; w < concurrency; w++ {
 		go loadProcess(ctx, testnet, chTx, chSuccess)
 	}

-	// Monitor successful transactions, and abort on stalls.
+	// Montior transaction to ensure load propagates to the network
+	//
+	// This loop doesn't check or time out for stalls, since a stall here just
+	// aborts the load generator sooner and could obscure backpressure
+	// from the test harness, and there are other checks for
+	// stalls in the framework. Ideally we should monitor latency as a guide
+	// for when to give up, but we don't have a good way to track that yet.
 	success := 0
-	timeout := initialTimeout
 	for {
 		select {
-		case <-chSuccess:
-			success++
-			timeout = stallTimeout
-		case <-time.After(timeout):
-			return fmt.Errorf("unable to submit transactions for %v", timeout)
+		case numSeen := <-chSuccess:
+			success += numSeen
 		case <-ctx.Done():
 			if success == 0 {
 				return errors.New("failed to submit any transactions")
 			}
-			logger.Info(fmt.Sprintf("Ending transaction load after %v txs (%.1f tx/s)...",
-				success, float64(success)/time.Since(started).Seconds()))
+
+			rate := float64(success) / time.Since(started).Seconds()
+
+			logger.Info("ending transaction load",
+				"dur_secs", time.Since(started).Seconds(),
+				"txns", success,
+				"rate", rate)
+
+			if rate < 2 {
+				logger.Error("transaction throughput was low",
+					"rate", rate)
+			}
+
 			return nil
 		}
 	}
 }

-// loadGenerate generates jobs until the context is canceled
-func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, size int64) {
-	for i := 0; i < math.MaxInt64; i++ {
+// loadGenerate generates jobs until the context is canceled.
+//
+// The chTx has multiple consumers, thus the rate limiting of the load
+// generation is primarily the result of backpressure from the
+// broadcast transaction, though at most one transaction will be
+// produced every 10ms.
+func loadGenerate(ctx context.Context, chTx chan<- types.Tx, size int64) {
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+	defer close(chTx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+		}
+
 		// We keep generating the same 100 keys over and over, with different values.
 		// This gives a reasonable load without putting too much data in the app.
-		id := i % 100
+		id := rand.Int63() % 100 // nolint: gosec

 		bz := make([]byte, size)
-		_, err := rand.Read(bz)
+		_, err := rand.Read(bz) // nolint: gosec
 		if err != nil {
 			panic(fmt.Sprintf("Failed to read random bytes: %v", err))
 		}
 		tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))

 		select {
-		case chTx <- tx:
-			sqrtSize := int(math.Sqrt(float64(size)))
-			time.Sleep(10 * time.Millisecond * time.Duration(sqrtSize/multiplier))
-
 		case <-ctx.Done():
-			close(chTx)
 			return
+		case chTx <- tx:
+			// sleep for a bit before sending the
+			// next transaction.
+			waitTime := (10 * time.Millisecond) + time.Duration(rand.Int63n(int64(500*time.Millisecond))) // nolint: gosec
+			timer.Reset(waitTime)
 		}
 	}
 }

 // loadProcess processes transactions
-func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
+func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
 	// Each worker gets its own client to each usable node, which
 	// allows for some concurrency while still bounding it.
 	clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))

@@ -127,8 +151,7 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
 		clientRing = clientRing.Next()
 	}

-	var err error
-
+	successes := 0
 	for {
 		select {
 		case <-ctx.Done():

@@ -137,19 +160,24 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
 			clientRing = clientRing.Next()
 			client := clientRing.Value.(*rpchttp.HTTP)

-			if _, err := client.Health(ctx); err != nil {
+			if status, err := client.Status(ctx); err != nil {
+				continue
+			} else if status.SyncInfo.CatchingUp {
 				continue
 			}

-			if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
+			if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
 				continue
 			}
+			successes++

 			select {
-			case chSuccess <- tx:
+			case chSuccess <- successes:
+				successes = 0 // reset counter for the next iteration
 				continue
 			case <-ctx.Done():
 				return
+			default:
 			}
 		}
 	}

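Aside from the diff itself: the new generator drops the multiplier/sleep pacing in favor of an unbuffered channel (backpressure from the broadcast workers) plus a small randomized delay. A standalone sketch of that loop, with hypothetical names:

    package main

    import (
    	"context"
    	"fmt"
    	"math/rand"
    	"time"
    )

    // generate emits payloads until ctx is canceled. The unbuffered channel gives
    // backpressure from however many consumers drain it; the timer adds 10ms plus
    // up to 500ms of jitter between sends, so production never outruns consumption
    // by much.
    func generate(ctx context.Context, out chan<- []byte, size int) {
    	timer := time.NewTimer(0)
    	defer timer.Stop()
    	defer close(out)

    	for {
    		select {
    		case <-ctx.Done():
    			return
    		case <-timer.C:
    		}

    		payload := make([]byte, size)
    		if _, err := rand.Read(payload); err != nil { // math/rand is fine for test load
    			panic(fmt.Sprintf("failed to read random bytes: %v", err))
    		}

    		select {
    		case <-ctx.Done():
    			return
    		case out <- payload:
    			timer.Reset(10*time.Millisecond + time.Duration(rand.Int63n(int64(500*time.Millisecond))))
    		}
    	}
    }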

test/e2e/runner/main.go (+27, -30)

@@ -57,44 +57,48 @@ func NewCLI() *CLI {
 			}

 			chLoadResult := make(chan error)
-			ctx, loadCancel := context.WithCancel(context.Background())
+			ctx, cancel := context.WithCancel(cmd.Context())
+			defer cancel()
+
+			lctx, loadCancel := context.WithCancel(ctx)
 			defer loadCancel()
 			go func() {
-				err := Load(ctx, cli.testnet, 1)
+				err := Load(lctx, cli.testnet)
 				chLoadResult <- err
 			}()

-			if err := Start(cli.testnet); err != nil {
+			if err := Start(ctx, cli.testnet); err != nil {
 				return err
 			}

-			if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
+			if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through
 				return err
 			}

 			if cli.testnet.HasPerturbations() {
-				if err := Perturb(cli.testnet); err != nil {
+				if err := Perturb(ctx, cli.testnet); err != nil {
 					return err
 				}
-				if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
+				if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through
 					return err
 				}
 			}

 			if cli.testnet.Evidence > 0 {
-				if err := InjectEvidence(cli.testnet, cli.testnet.Evidence); err != nil {
+				if err := InjectEvidence(ctx, cli.testnet, cli.testnet.Evidence); err != nil {
 					return err
 				}
-				if err := Wait(cli.testnet, 5); err != nil { // ensure chain progress
+				if err := Wait(ctx, cli.testnet, 5); err != nil { // ensure chain progress
 					return err
 				}
 			}

 			loadCancel()

 			if err := <-chLoadResult; err != nil {
 				return fmt.Errorf("transaction load failed: %w", err)
 			}
-			if err := Wait(cli.testnet, 5); err != nil { // wait for network to settle before tests
+			if err := Wait(ctx, cli.testnet, 5); err != nil { // wait for network to settle before tests
 				return err
 			}

 			if err := Test(cli.testnet); err != nil {

@@ -139,7 +143,7 @@ func NewCLI() *CLI {
 			if err != nil {
 				return err
 			}
-			return Start(cli.testnet)
+			return Start(cmd.Context(), cli.testnet)
 		},
 	})

@@ -147,7 +151,7 @@ func NewCLI() *CLI {
 		Use:   "perturb",
 		Short: "Perturbs the Docker testnet, e.g. by restarting or disconnecting nodes",
 		RunE: func(cmd *cobra.Command, args []string) error {
-			return Perturb(cli.testnet)
+			return Perturb(cmd.Context(), cli.testnet)
 		},
 	})

@@ -155,7 +159,7 @@ func NewCLI() *CLI {
 		Use:   "wait",
 		Short: "Waits for a few blocks to be produced and all nodes to catch up",
 		RunE: func(cmd *cobra.Command, args []string) error {
-			return Wait(cli.testnet, 5)
+			return Wait(cmd.Context(), cli.testnet, 5)
 		},
 	})

@@ -187,20 +191,10 @@ func NewCLI() *CLI {
 	})

 	cli.root.AddCommand(&cobra.Command{
-		Use:   "load [multiplier]",
-		Args:  cobra.MaximumNArgs(1),
+		Use:   "load",
 		Short: "Generates transaction load until the command is canceled",
 		RunE: func(cmd *cobra.Command, args []string) (err error) {
-			m := 1
-
-			if len(args) == 1 {
-				m, err = strconv.Atoi(args[0])
-				if err != nil {
-					return err
-				}
-			}
-
-			return Load(context.Background(), cli.testnet, m)
+			return Load(context.Background(), cli.testnet)
 		},
 	})

@@ -218,7 +212,7 @@ func NewCLI() *CLI {
 				}
 			}

-			return InjectEvidence(cli.testnet, amount)
+			return InjectEvidence(cmd.Context(), cli.testnet, amount)
 		},
 	})

@@ -281,23 +275,26 @@ Does not run any perbutations.
 			}

 			chLoadResult := make(chan error)
-			ctx, loadCancel := context.WithCancel(context.Background())
+			ctx, cancel := context.WithCancel(cmd.Context())
+			defer cancel()
+
+			lctx, loadCancel := context.WithCancel(ctx)
 			defer loadCancel()
 			go func() {
-				err := Load(ctx, cli.testnet, 1)
+				err := Load(lctx, cli.testnet)
 				chLoadResult <- err
 			}()

-			if err := Start(cli.testnet); err != nil {
+			if err := Start(ctx, cli.testnet); err != nil {
 				return err
 			}

-			if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
+			if err := Wait(ctx, cli.testnet, 5); err != nil { // allow some txs to go through
 				return err
 			}

 			// we benchmark performance over the next 100 blocks
-			if err := Benchmark(cli.testnet, 100); err != nil {
+			if err := Benchmark(ctx, cli.testnet, 100); err != nil {
 				return err
 			}

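Both run paths now share the same context layout: one context scoped to the whole command and a child context that only the load goroutine watches, so load generation can be stopped early while the later steps keep using the parent context. A reduced sketch of that wiring (hypothetical helper, not the actual CLI code):

    package main

    import "context"

    // run wires one context for the whole run and a child context for the load
    // generator only, mirroring the structure of the runner's start and benchmark
    // commands. The helper and its signature are illustrative.
    func run(parent context.Context, load func(context.Context) error, steps ...func(context.Context) error) error {
    	ctx, cancel := context.WithCancel(parent)
    	defer cancel()

    	lctx, loadCancel := context.WithCancel(ctx)
    	defer loadCancel()

    	// buffered so the load goroutine can still send and exit if we return early
    	result := make(chan error, 1)
    	go func() { result <- load(lctx) }()

    	for _, step := range steps {
    		if err := step(ctx); err != nil {
    			return err
    		}
    	}

    	loadCancel() // stop only the load generator; ctx stays live for anything after
    	return <-result
    }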

test/e2e/runner/perturb.go (+20, -7)

@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"fmt"
 	"time"

@@ -9,14 +10,24 @@
 )

 // Perturbs a running testnet.
-func Perturb(testnet *e2e.Testnet) error {
+func Perturb(ctx context.Context, testnet *e2e.Testnet) error {
+	timer := time.NewTimer(0) // first tick fires immediately; reset below
+	defer timer.Stop()
+
 	for _, node := range testnet.Nodes {
 		for _, perturbation := range node.Perturbations {
-			_, err := PerturbNode(node, perturbation)
-			if err != nil {
-				return err
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-timer.C:
+				_, err := PerturbNode(ctx, node, perturbation)
+				if err != nil {
+					return err
+				}
+
+				// give network some time to recover between each
+				timer.Reset(20 * time.Second)
 			}
-			time.Sleep(20 * time.Second) // give network some time to recover between each
 		}
 	}
 	return nil

@@ -24,7 +35,7 @@ func Perturb(testnet *e2e.Testnet) error {
 // PerturbNode perturbs a node with a given perturbation, returning its status
 // after recovering.
-func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
+func PerturbNode(ctx context.Context, node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
 	testnet := node.Testnet
 	switch perturbation {
 	case e2e.PerturbationDisconnect:

@@ -77,7 +88,9 @@ func PerturbNode(node *e2e.Node, perturbation e2e.Perturbation) (*rpctypes.ResultStatus, error) {
 		return nil, nil
 	}

-	status, err := waitForNode(node, 0, 3*time.Minute)
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+	defer cancel()
+
+	status, err := waitForNode(ctx, node, 0)
 	if err != nil {
 		return nil, err
 	}

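The fixed time.Sleep between perturbations becomes a timer inside a select, which is the usual way to make a pause respond to cancellation. The idiom on its own, stdlib only:

    package main

    import (
    	"context"
    	"time"
    )

    // sleepCtx waits for d, or returns early with the context's error if ctx is
    // canceled first.
    func sleepCtx(ctx context.Context, d time.Duration) error {
    	timer := time.NewTimer(d)
    	defer timer.Stop()

    	select {
    	case <-ctx.Done():
    		return ctx.Err()
    	case <-timer.C:
    		return nil
    	}
    }

Perturb applies the same idea with a single timer reused across iterations, reset to 20 seconds after each perturbation.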

test/e2e/runner/rpc.go (+112, -95)

@@ -15,7 +15,7 @@ import (
 // waitForHeight waits for the network to reach a certain height (or above),
 // returning the highest height seen. Errors if the network is not making
 // progress at all.
-func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
+func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
 	var (
 		err             error
 		maxResult       *rpctypes.ResultBlock

@@ -25,11 +25,7 @@ func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
 		numRunningNodes int
 	)
 	for _, node := range testnet.Nodes {
-		if node.Mode == e2e.ModeSeed {
-			continue
-		}
-
-		if node.Mode == e2e.ModeLight {
+		if node.Stateless() {
 			continue
 		}

@@ -38,86 +34,90 @@ func waitForHeight(testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
+	timer := time.NewTimer(0)
+	defer timer.Stop()
 	for {
-		for _, node := range testnet.Nodes {
-			// skip nodes that have reached the target height
-			if _, ok := nodesAtHeight[node.Name]; ok {
-				continue
-			}
-
-			if node.Mode == e2e.ModeSeed {
-				continue
-			}
-
-			if node.Mode == e2e.ModeLight {
-				continue
-			}
-
-			if !node.HasStarted {
-				continue
-			}
-
-			// cache the clients
-			client, ok := clients[node.Name]
-			if !ok {
-				client, err = node.Client()
-				if err != nil {
-					continue
-				}
-				clients[node.Name] = client
-			}
-
-			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-			defer cancel()
-			result, err := client.Block(ctx, nil)
-			if err != nil {
-				continue
-			}
-
-			if result.Block != nil && (maxResult == nil || result.Block.Height > maxResult.Block.Height) {
-				maxResult = result
-				lastIncrease = time.Now()
-			}
-
-			if maxResult != nil && maxResult.Block.Height >= height {
-				// the node has achieved the target height!
-
-				// add this node to the set of target
-				// height nodes
-				nodesAtHeight[node.Name] = struct{}{}
-
-				// if not all of the nodes that we
-				// have clients for have reached the
-				// target height, keep trying.
-				if numRunningNodes > len(nodesAtHeight) {
-					continue
-				}
-
-				// return once all nodes have reached
-				// the target height.
-				return maxResult.Block, &maxResult.BlockID, nil
-			}
-		}
-
-		if len(clients) == 0 {
-			return nil, nil, errors.New("unable to connect to any network nodes")
-		}
-
-		if time.Since(lastIncrease) >= time.Minute {
-			if maxResult == nil {
-				return nil, nil, errors.New("chain stalled at unknown height")
-			}
-
-			return nil, nil, fmt.Errorf("chain stalled at height %v [%d of %d nodes]",
-				maxResult.Block.Height,
-				len(nodesAtHeight),
-				numRunningNodes)
-		}
-
-		time.Sleep(1 * time.Second)
+		select {
+		case <-ctx.Done():
+			return nil, nil, ctx.Err()
+		case <-timer.C:
+			for _, node := range testnet.Nodes {
+				// skip nodes that have reached the target height
+				if _, ok := nodesAtHeight[node.Name]; ok {
+					continue
+				}
+
+				if node.Stateless() {
+					continue
+				}
+
+				if !node.HasStarted {
+					continue
+				}
+
+				// cache the clients
+				client, ok := clients[node.Name]
+				if !ok {
+					client, err = node.Client()
+					if err != nil {
+						continue
+					}
+					clients[node.Name] = client
+				}
+
+				wctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+				defer cancel()
+				result, err := client.Block(wctx, nil)
+				if err != nil {
+					continue
+				}
+
+				if result.Block != nil && (maxResult == nil || result.Block.Height > maxResult.Block.Height) {
+					maxResult = result
+					lastIncrease = time.Now()
+				}
+
+				if maxResult != nil && maxResult.Block.Height >= height {
+					// the node has achieved the target height!
+
+					// add this node to the set of target
+					// height nodes
+					nodesAtHeight[node.Name] = struct{}{}
+
+					// if not all of the nodes that we
+					// have clients for have reached the
+					// target height, keep trying.
+					if numRunningNodes > len(nodesAtHeight) {
+						continue
+					}
+
+					// return once all nodes have reached
+					// the target height.
+					return maxResult.Block, &maxResult.BlockID, nil
+				}
+			}
+
+			if len(clients) == 0 {
+				return nil, nil, errors.New("unable to connect to any network nodes")
+			}
+
+			if time.Since(lastIncrease) >= time.Minute {
+				if maxResult == nil {
+					return nil, nil, errors.New("chain stalled at unknown height")
+				}
+
+				return nil, nil, fmt.Errorf("chain stalled at height %v [%d of %d nodes %+v]",
+					maxResult.Block.Height,
+					len(nodesAtHeight),
+					numRunningNodes,
+					nodesAtHeight)
+			}
+
+			timer.Reset(1 * time.Second)
+		}
 	}
 }

 // waitForNode waits for a node to become available and catch up to the given block height.
-func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
+func waitForNode(ctx context.Context, node *e2e.Node, height int64) (*rpctypes.ResultStatus, error) {
 	if node.Mode == e2e.ModeSeed {
 		return nil, nil
 	}

@@ -126,42 +126,59 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
 		return nil, err
 	}

-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+
+	var (
+		lastFailed bool
+		counter    int
+	)

 	for {
-		status, err := client.Status(ctx)
-		switch {
-		case errors.Is(err, context.DeadlineExceeded):
-			return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height)
-		case errors.Is(err, context.Canceled):
-			return nil, err
-		case err == nil && status.SyncInfo.LatestBlockHeight >= height:
-			return status, nil
-		}
-
-		time.Sleep(300 * time.Millisecond)
-	}
-}
-
-// waitForAllNodes waits for all nodes to become available and catch up to the given block height.
-func waitForAllNodes(testnet *e2e.Testnet, height int64, timeout time.Duration) (int64, error) {
-	var lastHeight int64
-
-	for _, node := range testnet.Nodes {
-		if node.Mode == e2e.ModeSeed {
-			continue
-		}
-
-		status, err := waitForNode(node, height, timeout)
-		if err != nil {
-			return 0, err
+		counter++
+		if lastFailed {
+			lastFailed = false
+
+			// if there was a problem with the request in
+			// the previous recreate the client to ensure
+			// reconnection
+			client, err = node.Client()
+			if err != nil {
+				return nil, err
+			}
 		}

-		if status.SyncInfo.LatestBlockHeight > lastHeight {
-			lastHeight = status.SyncInfo.LatestBlockHeight
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-timer.C:
+			status, err := client.Status(ctx)
+			switch {
+			case errors.Is(err, context.DeadlineExceeded):
+				return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height)
+			case errors.Is(err, context.Canceled):
+				return nil, err
+			case err == nil && status.SyncInfo.LatestBlockHeight >= height:
+				return status, nil
+			case counter%50 == 0:
+				switch {
+				case err != nil:
+					lastFailed = true
+					logger.Error("node not yet ready",
+						"iter", counter,
+						"node", node.Name,
+						"err", err,
+						"target", height,
+					)
+				case status != nil:
+					logger.Error("node not yet ready",
+						"iter", counter,
+						"node", node.Name,
+						"height", status.SyncInfo.LatestBlockHeight,
+						"target", height,
+					)
+				}
+			}
+			timer.Reset(250 * time.Millisecond)
 		}
 	}
-
-	return lastHeight, nil
 }

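waitForNode's new loop combines three things: timer-driven polling, rebuilding the client after a failed request, and a progress log on every 50th attempt, with the overall deadline coming from the caller's context rather than a timeout argument. A compressed, generic sketch of that structure (the function names and the reset hook are hypothetical):

    package main

    import (
    	"context"
    	"time"
    )

    // pollUntil calls check until it reports done, ctx is canceled, or reset fails.
    // reset stands in for "recreate the RPC client after a failed request".
    func pollUntil(ctx context.Context, check func(context.Context) (bool, error), reset func() error) error {
    	timer := time.NewTimer(0)
    	defer timer.Stop()

    	lastFailed := false
    	for iter := 1; ; iter++ {
    		if lastFailed {
    			lastFailed = false
    			if err := reset(); err != nil {
    				return err
    			}
    		}

    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		case <-timer.C:
    			done, err := check(ctx)
    			if done {
    				return nil
    			}
    			if err != nil {
    				lastFailed = true
    			}
    			if iter%50 == 0 {
    				// the real loop logs "node not yet ready" here, with the node
    				// name, the height it has reached, and the target height
    			}
    			timer.Reset(250 * time.Millisecond)
    		}
    	}
    }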
test/e2e/runner/start.go (+25, -5)

@@ -1,6 +1,7 @@
 package main

 import (
+	"context"
 	"fmt"
 	"sort"
 	"time"

@@ -8,7 +9,7 @@ import (
 	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
 )

-func Start(testnet *e2e.Testnet) error {
+func Start(ctx context.Context, testnet *e2e.Testnet) error {
 	if len(testnet.Nodes) == 0 {
 		return fmt.Errorf("no nodes in testnet")
 	}

@@ -45,7 +46,14 @@ func Start(testnet *e2e.Testnet) error {
 		if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
 			return err
 		}
-		if _, err := waitForNode(node, 0, time.Minute); err != nil {
+
+		if err := func() error {
+			ctx, cancel := context.WithTimeout(ctx, time.Minute)
+			defer cancel()
+			_, err := waitForNode(ctx, node, 0)
+			return err
+		}(); err != nil {
 			return err
 		}
 		node.HasStarted = true

@@ -60,7 +68,7 @@ func Start(testnet *e2e.Testnet) error {
 		"nodes", len(testnet.Nodes)-len(nodeQueue),
 		"pending", len(nodeQueue))

-	block, blockID, err := waitForHeight(testnet, networkHeight)
+	block, blockID, err := waitForHeight(ctx, testnet, networkHeight)
 	if err != nil {
 		return err
 	}

@@ -74,9 +82,16 @@ func Start(testnet *e2e.Testnet) error {
 			// that this node will start at before we
 			// start the node.

+			logger.Info("Waiting for network to advance to height",
+				"node", node.Name,
+				"last_height", networkHeight,
+				"waiting_for", node.StartAt,
+				"size", len(testnet.Nodes)-len(nodeQueue),
+				"pending", len(nodeQueue))
+
 			networkHeight = node.StartAt

-			block, blockID, err = waitForHeight(testnet, networkHeight)
+			block, blockID, err = waitForHeight(ctx, testnet, networkHeight)
 			if err != nil {
 				return err
 			}

@@ -93,10 +108,15 @@ func Start(testnet *e2e.Testnet) error {
 		if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
 			return err
 		}
-		status, err := waitForNode(node, node.StartAt, 8*time.Minute)
+
+		wctx, wcancel := context.WithTimeout(ctx, 8*time.Minute)
+		status, err := waitForNode(wctx, node, node.StartAt)
 		if err != nil {
+			wcancel()
 			return err
 		}
+		wcancel()
+
 		node.HasStarted = true
 		logger.Info(fmt.Sprintf("Node %v up on http://127.0.0.1:%v at height %v",
 			node.Name, node.ProxyPort, status.SyncInfo.LatestBlockHeight))

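The per-node wait is wrapped in an immediately invoked closure so that each timeout's cancel runs at the end of its loop iteration instead of piling up deferred calls until Start returns. The idiom on its own (illustrative names):

    package main

    import (
    	"context"
    	"time"
    )

    // eachWithTimeout runs do against every item, giving each call its own
    // one-minute deadline scoped to that iteration.
    func eachWithTimeout(ctx context.Context, items []string, do func(context.Context, string) error) error {
    	for _, it := range items {
    		if err := func() error {
    			ctx, cancel := context.WithTimeout(ctx, time.Minute)
    			defer cancel() // runs at the end of this iteration, not of the caller
    			return do(ctx, it)
    		}(); err != nil {
    			return err
    		}
    	}
    	return nil
    }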

test/e2e/runner/wait.go (+8, -12)

@@ -1,31 +1,27 @@
 package main

 import (
+	"context"
 	"fmt"
-	"time"

 	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
 )

 // Wait waits for a number of blocks to be produced, and for all nodes to catch
 // up with it.
-func Wait(testnet *e2e.Testnet, blocks int64) error {
-	block, _, err := waitForHeight(testnet, 0)
+func Wait(ctx context.Context, testnet *e2e.Testnet, blocks int64) error {
+	block, _, err := waitForHeight(ctx, testnet, 0)
 	if err != nil {
 		return err
 	}
-	return WaitUntil(testnet, block.Height+blocks)
+	return WaitUntil(ctx, testnet, block.Height+blocks)
 }

 // WaitUntil waits until a given height has been reached.
-func WaitUntil(testnet *e2e.Testnet, height int64) error {
+func WaitUntil(ctx context.Context, testnet *e2e.Testnet, height int64) error {
 	logger.Info(fmt.Sprintf("Waiting for all nodes to reach height %v...", height))
-	_, err := waitForAllNodes(testnet, height, waitingTime(len(testnet.Nodes)))
-	return err
-}
-
-// waitingTime estimates how long it should take for a node to reach the height.
-// More nodes in a network implies we may expect a slower network and may have to wait longer.
-func waitingTime(nodes int) time.Duration {
-	return time.Minute + (time.Duration(nodes) * (30 * time.Second))
+
+	_, _, err := waitForHeight(ctx, testnet, height)
+	return err
 }

test/e2e/tests/app_test.go (+3, -4)

@@ -44,7 +44,7 @@ func TestApp_Hash(t *testing.T) {
 	block, err := client.Block(ctx, nil)
 	require.NoError(t, err)
-	require.EqualValues(t, info.Response.LastBlockAppHash, block.Block.AppHash,
+	require.EqualValues(t, info.Response.LastBlockAppHash, block.Block.AppHash.Bytes(),
 		"app hash does not match last block's app hash")

 	status, err := client.Status(ctx)

@@ -62,9 +62,8 @@ func TestApp_Tx(t *testing.T) {
 	// Generate a random value, to prevent duplicate tx errors when
 	// manually running the test multiple times for a testnet.
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	bz := make([]byte, 32)
-	_, err = r.Read(bz)
+	_, err = rand.Read(bz)
 	require.NoError(t, err)

 	key := fmt.Sprintf("testapp-tx-%v", node.Name)

@@ -75,7 +74,7 @@ func TestApp_Tx(t *testing.T) {
 	require.NoError(t, err)
 	hash := tx.Hash()

-	waitTime := 20 * time.Second
+	const waitTime = time.Minute

 	require.Eventuallyf(t, func() bool {
 		txResp, err := client.Tx(ctx, hash, false)

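For reference, testify's Eventuallyf takes both an overall budget and a poll interval; the test now allows a full minute overall. A self-contained toy example of the shape (the condition and poll interval here are stand-ins, not the real tx check):

    package example_test

    import (
    	"testing"
    	"time"

    	"github.com/stretchr/testify/require"
    )

    func TestEventuallyShape(t *testing.T) {
    	start := time.Now()
    	const waitTime = time.Minute

    	// Poll once per second until the condition returns true or waitTime elapses.
    	require.Eventuallyf(t, func() bool {
    		return time.Since(start) > 2*time.Second // stand-in for "tx was committed"
    	}, waitTime, time.Second, "condition not met within %v", waitTime)
    }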
