From d95894152beb3b25423cdb109af006340f0ac80b Mon Sep 17 00:00:00 2001 From: zjubfd Date: Sun, 3 Mar 2019 04:17:37 +0800 Subject: [PATCH] fix dirty data in peerset, resolve #3304 (#3359) * fix dirty data in peerset startInitPeer ran before PeerSet added the peer, so once the mconnection started and Receive of one Reactor failed, the switch would try to remove the peer from PeerSet while PeerSet still did not contain it. Fix this by changing the order. * fix test FilterDuplicate * fix start/stop race * fix err --- libs/pubsub/pubsub.go | 2 ++ p2p/switch.go | 43 ++++++++++++++++++------------------------- p2p/switch_test.go | 10 ++++++---- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 6e86a63c9..8d4d1fb05 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -88,6 +88,8 @@ type Server struct { cmds chan cmd cmdsCap int + // check if we have subscription before + // subscribing or unsubscribing mtx sync.RWMutex subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct } diff --git a/p2p/switch.go b/p2p/switch.go index 7d2e6c3f3..ccd6d40f2 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -645,43 +645,36 @@ func (sw *Switch) addPeer(p Peer) error { // Handle the shut down case where the switch has stopped but we're // concurrently trying to add a peer. - if sw.IsRunning() { - // All good. Start peer - if err := sw.startInitPeer(p); err != nil { - return err - } - } else { + if !sw.IsRunning() { + // XXX should this return an error or just log and terminate? sw.Logger.Error("Won't start a peer - switch is not running", "peer", p) + return nil } - // Add the peer to .peers. - // We start it first so that a peer in the list is safe to Stop. - // It should not err since we already checked peers.Has(). - if err := sw.peers.Add(p); err != nil { + // Start the peer's send/recv routines. 
+ // Must start it before adding it to the peer set + // to prevent Start and Stop from being called concurrently. + err := p.Start() + if err != nil { + // Should never happen + sw.Logger.Error("Error starting peer", "err", err, "peer", p) return err } - sw.Logger.Info("Added peer", "peer", p) - sw.metrics.Peers.Add(float64(1)) - - return nil -} - -func (sw *Switch) startInitPeer(p Peer) error { - err := p.Start() // spawn send/recv routines - if err != nil { - // Should never happen - sw.Logger.Error( - "Error starting peer", - "err", err, - "peer", p, - ) + // Add the peer to PeerSet. Do this before starting the reactors + // so that if Receive errors, we will find the peer and remove it. + // Add should not err since we already checked peers.Has(). + if err := sw.peers.Add(p); err != nil { return err } + sw.metrics.Peers.Add(float64(1)) + // Start all the reactor protocols on the peer. for _, reactor := range sw.reactors { reactor.AddPeer(p) } + sw.Logger.Info("Added peer", "peer", p) + return nil } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 358661616..47cfed55f 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -273,6 +273,8 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { func TestSwitchPeerFilterDuplicate(t *testing.T) { sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() // simulate remote peer rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} @@ -293,12 +295,12 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { } err = sw.addPeer(p) - if err, ok := err.(ErrRejected); ok { - if !err.IsDuplicate() { - t.Errorf("expected peer to be duplicate") + if errRej, ok := err.(ErrRejected); ok { + if !errRej.IsDuplicate() { + t.Errorf("expected peer to be duplicate. got %v", errRej) } } else { - t.Errorf("expected ErrRejected") + t.Errorf("expected ErrRejected, got %v", err) } }