diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index a20c8ca41..abfa973b5 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -280,6 +280,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { } if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil { + r.mtx.Unlock() return sm.State{}, err } @@ -889,17 +890,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { } r.mtx.Lock() + defer r.mtx.Unlock() if r.syncer == nil { - r.mtx.Unlock() return } - defer r.mtx.Unlock() switch peerUpdate.Status { case p2p.PeerStatusUp: newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher) r.providers[peerUpdate.NodeID] = newProvider - r.syncer.AddPeer(peerUpdate.NodeID) + err := r.syncer.AddPeer(peerUpdate.NodeID) + if err != nil { + r.Logger.Error("error adding peer to syncer", "error", err) + return + } if sp, ok := r.stateProvider.(*stateProviderP2P); ok { // we do this in a separate routine to not block whilst waiting for the light client to finish // whatever call it's currently executing diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 68bec6880..b4212961a 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -141,7 +141,17 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err // AddPeer adds a peer to the pool. For now we just keep it simple and send a // single request to discover snapshots, later we may want to do retries and stuff. -func (s *syncer) AddPeer(peerID types.NodeID) { +func (s *syncer) AddPeer(peerID types.NodeID) (err error) { + defer func() { + // TODO: remove panic recover once AddPeer can no longer accientally send on + // closed channel. + // This recover was added to protect against the p2p message being sent + // to the snapshot channel after the snapshot channel was closed. + if r := recover(); r != nil { + err = fmt.Errorf("panic sending peer snapshot request: %v", r) + } + }() + s.logger.Debug("Requesting snapshots from peer", "peer", peerID) msg := p2p.Envelope{ @@ -153,6 +163,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) { case <-s.closeCh: case s.snapshotCh <- msg: } + return err } // RemovePeer removes a peer from the pool. diff --git a/internal/statesync/syncer_test.go b/internal/statesync/syncer_test.go index 14297c049..ad902a54c 100644 --- a/internal/statesync/syncer_test.go +++ b/internal/statesync/syncer_test.go @@ -77,12 +77,14 @@ func TestSyncer_SyncAny(t *testing.T) { require.Error(t, err) // Adding a couple of peers should trigger snapshot discovery messages - rts.syncer.AddPeer(peerAID) + err = rts.syncer.AddPeer(peerAID) + require.NoError(t, err) e := <-rts.snapshotOutCh require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message) require.Equal(t, peerAID, e.To) - rts.syncer.AddPeer(peerBID) + err = rts.syncer.AddPeer(peerBID) + require.NoError(t, err) e = <-rts.snapshotOutCh require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message) require.Equal(t, peerBID, e.To)