@ -188,10 +188,17 @@ func waitForAndValidateBlock(
ctx , cancel := context . WithCancel ( bctx )
defer cancel ( )
fn := func ( j int ) {
msg , err := blocksSubs [ j ] . Next ( ctx )
if ! assert . NoError ( t , err ) {
cancel ( )
switch {
case errors . Is ( err , context . DeadlineExceeded ) :
return
case errors . Is ( err , context . Canceled ) :
return
case err != nil :
cancel ( ) // terminate other workers
require . NoError ( t , err )
return
}
@ -217,6 +224,10 @@ func waitForAndValidateBlock(
}
wg . Wait ( )
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
}
}
func waitForAndValidateBlockWithTx (
@ -236,8 +247,14 @@ func waitForAndValidateBlockWithTx(
ntxs := 0
for {
msg , err := blocksSubs [ j ] . Next ( ctx )
if ! assert . NoError ( t , err ) {
cancel ( )
switch {
case errors . Is ( err , context . DeadlineExceeded ) :
return
case errors . Is ( err , context . Canceled ) :
return
case err != nil :
cancel ( ) // terminate other workers
t . Fatalf ( "problem waiting for %d subscription: %v" , j , err )
return
}
@ -268,6 +285,9 @@ func waitForAndValidateBlockWithTx(
}
wg . Wait ( )
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
}
}
func waitForBlockWithUpdatedValsAndValidateIt (
@ -287,8 +307,14 @@ func waitForBlockWithUpdatedValsAndValidateIt(
for {
msg , err := blocksSubs [ j ] . Next ( ctx )
if ! assert . NoError ( t , err ) {
cancel ( )
switch {
case errors . Is ( err , context . DeadlineExceeded ) :
return
case errors . Is ( err , context . Canceled ) :
return
case err != nil :
cancel ( ) // terminate other workers
t . Fatalf ( "problem waiting for %d subscription: %v" , j , err )
return
}
@ -311,6 +337,9 @@ func waitForBlockWithUpdatedValsAndValidateIt(
}
wg . Wait ( )
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
}
}
func ensureBlockSyncStatus ( t * testing . T , msg tmpubsub . Message , complete bool , height int64 ) {
@ -342,6 +371,8 @@ func TestReactorBasic(t *testing.T) {
}
var wg sync . WaitGroup
errCh := make ( chan error , len ( rts . subs ) )
for _ , sub := range rts . subs {
wg . Add ( 1 )
@ -349,14 +380,32 @@ func TestReactorBasic(t *testing.T) {
go func ( s eventbus . Subscription ) {
defer wg . Done ( )
_ , err := s . Next ( ctx )
if ! assert . NoError ( t , err ) {
cancel ( )
switch {
case errors . Is ( err , context . DeadlineExceeded ) :
return
case errors . Is ( err , context . Canceled ) :
return
case err != nil :
errCh <- err
cancel ( ) // terminate other workers
return
}
} ( sub )
}
wg . Wait ( )
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
}
select {
case err := <- errCh :
if err != nil {
t . Fatal ( err )
}
default :
}
errCh = make ( chan error , len ( rts . blocksyncSubs ) )
for _ , sub := range rts . blocksyncSubs {
wg . Add ( 1 )
@ -364,8 +413,14 @@ func TestReactorBasic(t *testing.T) {
go func ( s eventbus . Subscription ) {
defer wg . Done ( )
msg , err := s . Next ( ctx )
if ! assert . NoError ( t , err ) {
cancel ( )
switch {
case errors . Is ( err , context . DeadlineExceeded ) :
return
case errors . Is ( err , context . Canceled ) :
return
case err != nil :
errCh <- err
cancel ( ) // terminate other workers
return
}
ensureBlockSyncStatus ( t , msg , true , 0 )
@ -373,6 +428,17 @@ func TestReactorBasic(t *testing.T) {
}
wg . Wait ( )
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
}
select {
case err := <- errCh :
if err != nil {
t . Fatal ( err )
}
default :
}
}
func TestReactorWithEvidence ( t * testing . T ) {
@ -709,7 +775,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
}
func TestReactorValidatorSetChanges ( t * testing . T ) {
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Minute )
ctx , cancel := context . WithTimeout ( context . Background ( ) , 2 * time . Minute )
defer cancel ( )
cfg := configSetup ( t )
@ -752,7 +818,11 @@ func TestReactorValidatorSetChanges(t *testing.T) {
go func ( s eventbus . Subscription ) {
defer wg . Done ( )
_ , err := s . Next ( ctx )
if ! assert . NoError ( t , err ) {
switch {
case err == nil :
case errors . Is ( err , context . DeadlineExceeded ) :
default :
t . Log ( err )
cancel ( )
}
} ( sub )
@ -760,6 +830,17 @@ func TestReactorValidatorSetChanges(t *testing.T) {
wg . Wait ( )
// after the wait returns, either there was an error with a
// subscription (very unlikely, and causes the context to be
// canceled manually), there was a timeout and the test's root context
// was canceled (somewhat likely,) or the test can proceed
// (common.)
if err := ctx . Err ( ) ; errors . Is ( err , context . DeadlineExceeded ) {
t . Fatal ( "encountered timeout" )
} else if errors . Is ( err , context . Canceled ) {
t . Fatal ( "subscription encountered unexpected error" )
}
newValidatorPubKey1 , err := states [ nVals ] . privValidator . GetPubKey ( ctx )
require . NoError ( t , err )