Browse Source

reduce chance of deadlocks

pull/8127/head
tycho garen 3 years ago
committed by M. J. Fromberger
parent
commit
e675ffa552
1 changed files with 67 additions and 9 deletions
  1. +67
    -9
      internal/consensus/reactor_test.go

+ 67
- 9
internal/consensus/reactor_test.go View File

@ -535,6 +535,9 @@ func TestReactorWithEvidence(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
msg, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
return
@ -544,8 +547,17 @@ func TestReactorWithEvidence(t *testing.T) {
require.Len(t, block.Evidence, 1)
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
}
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
@ -593,13 +605,25 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
}
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
@ -629,13 +653,26 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
wg.Wait()
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
// Require at least one node to have sent block parts, but we can't know which
// peer sent it.
@ -709,13 +746,25 @@ func TestReactorVotingPowerChange(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
blocksSubs := []eventbus.Subscription{}
for _, sub := range rts.subs {
@ -817,17 +866,26 @@ func TestReactorValidatorSetChanges(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
switch {
case err == nil:
case errors.Is(err, context.DeadlineExceeded):
default:
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
t.Log(err)
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
// after the wait returns, either there was an error with a
// subscription (very unlikely, and causes the context to be


Loading…
Cancel
Save