From 4009102e2b998ab1596009c3e2991daa421a4e68 Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Mon, 12 Jul 2021 19:05:47 -0400 Subject: [PATCH] statesync: remove outgoingCalls race condition in dispatcher (#6699) * statesync: remove outgoing calls race condition --- internal/statesync/dispatcher.go | 7 +- internal/statesync/dispatcher_test.go | 118 ++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 0f9b14198..394b77e38 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -49,13 +49,12 @@ func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispat // in a list, tracks the call and waits for the reactor to pass along the response func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) { d.mtx.Lock() - outgoingCalls := len(d.calls) - d.mtx.Unlock() - // check to see that the dispatcher is connected to at least one peer - if d.availablePeers.Len() == 0 && outgoingCalls == 0 { + if d.availablePeers.Len() == 0 && len(d.calls) == 0 { + d.mtx.Unlock() return nil, "", errNoConnectedPeers } + d.mtx.Unlock() // fetch the next peer id in the list and request a light block from that // peer diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 9bec94c88..33bc7c2b6 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,6 +18,7 @@ import ( ) func TestDispatcherBasic(t *testing.T) { + t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) closeCh := make(chan struct{}) @@ -49,7 +51,77 @@ func TestDispatcherBasic(t *testing.T) { wg.Wait() } +func TestDispatcherReturnsNoBlock(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ch := make(chan p2p.Envelope, 100) + d := newDispatcher(ch, 1*time.Second) + peerFromSet := createPeerSet(1)[0] + d.addPeer(peerFromSet) + doneCh := make(chan struct{}) + + go func() { + err := d.respond(nil, peerFromSet) + require.Nil(t, err) + close(doneCh) + }() + + lb, peerResult, err := d.LightBlock(context.Background(), 1) + <-doneCh + + require.Nil(t, lb) + require.Nil(t, err) + require.Equal(t, peerFromSet, peerResult) +} + +func TestDispatcherErrorsWhenNoPeers(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ch := make(chan p2p.Envelope, 100) + d := newDispatcher(ch, 1*time.Second) + + lb, peerResult, err := d.LightBlock(context.Background(), 1) + + require.Nil(t, lb) + require.Empty(t, peerResult) + require.Equal(t, errNoConnectedPeers, err) +} + +func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ch := make(chan p2p.Envelope, 100) + d := newDispatcher(ch, 1*time.Second) + peerFromSet := createPeerSet(1)[0] + d.addPeer(peerFromSet) + ctx := context.Background() + wrapped, cancelFunc := context.WithCancel(ctx) + + doneCh := make(chan struct{}) + go func() { + lb, peerResult, err := d.LightBlock(wrapped, 1) + require.Nil(t, lb) + require.Equal(t, peerFromSet, peerResult) + require.Nil(t, err) + close(doneCh) + }() + cancelFunc() + <-doneCh + + go func() { + lb := &types.LightBlock{} + asProto, err := lb.ToProto() + require.Nil(t, err) + err = d.respond(asProto, peerFromSet) + require.Nil(t, err) + }() + + lb, peerResult, err := d.LightBlock(context.Background(), 1) + + require.NotNil(t, lb) + require.Equal(t, peerFromSet, peerResult) + require.Nil(t, err) +} + func TestDispatcherProviders(t *testing.T) { + t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) chainID := "state-sync-test" @@ -78,6 +150,7 @@ func TestDispatcherProviders(t *testing.T) { } func TestPeerListBasic(t *testing.T) { + t.Cleanup(leaktest.Check(t)) peerList := newPeerList() assert.Zero(t, peerList.Len()) numPeers := 10 @@ -108,7 +181,52 @@ func TestPeerListBasic(t *testing.T) { } +func TestPeerListBlocksWhenEmpty(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + peerList := newPeerList() + require.Zero(t, peerList.Len()) + doneCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + peerList.Pop(ctx) + close(doneCh) + }() + select { + case <-doneCh: + t.Error("empty peer list should not have returned result") + case <-time.After(100 * time.Millisecond): + } +} + +func TestEmptyPeerListReturnsWhenContextCanceled(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + peerList := newPeerList() + require.Zero(t, peerList.Len()) + doneCh := make(chan struct{}) + ctx := context.Background() + wrapped, cancel := context.WithCancel(ctx) + go func() { + peerList.Pop(wrapped) + close(doneCh) + }() + select { + case <-doneCh: + t.Error("empty peer list should not have returned result") + case <-time.After(100 * time.Millisecond): + } + + cancel() + + select { + case <-doneCh: + case <-time.After(100 * time.Millisecond): + t.Error("peer list should have returned after context canceled") + } +} + func TestPeerListConcurrent(t *testing.T) { + t.Cleanup(leaktest.Check(t)) peerList := newPeerList() numPeers := 10