Browse Source

reduce chance of deadlocks

pull/8091/head
tycho garen 3 years ago
parent
commit
05e0e0eacb
1 changed files with 67 additions and 9 deletions
  1. +67
    -9
      internal/consensus/reactor_test.go

+ 67
- 9
internal/consensus/reactor_test.go View File

@ -537,6 +537,9 @@ func TestReactorWithEvidence(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
msg, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
return
@ -546,8 +549,17 @@ func TestReactorWithEvidence(t *testing.T) {
require.Len(t, block.Evidence, 1)
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
}
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
@ -596,13 +608,25 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
}
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
@ -632,13 +656,26 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
wg.Wait()
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
// Require at least one node to have sent block parts, but we can't know which
// peer sent it.
@ -712,13 +749,25 @@ func TestReactorVotingPowerChange(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
blocksSubs := []eventbus.Subscription{}
for _, sub := range rts.subs {
@ -820,17 +869,26 @@ func TestReactorValidatorSetChanges(t *testing.T) {
go func(s eventbus.Subscription) {
defer wg.Done()
_, err := s.Next(ctx)
switch {
case err == nil:
case errors.Is(err, context.DeadlineExceeded):
default:
if ctx.Err() != nil {
return
}
if !assert.NoError(t, err) {
t.Log(err)
cancel()
}
}(sub)
}
sig := make(chan struct{})
go func() { defer close(sig); wg.Wait() }()
wg.Wait()
select {
case <-ctx.Done():
t.Fatal("operation timed out")
case <-sig:
if ctx.Err() != nil {
t.Fatal("operation aborted")
}
}
// after the wait returns, either there was an error with a
// subscription (very unlikely, and causes the context to be


Loading…
Cancel
Save