diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 6e86a63c9..8d4d1fb05 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -88,6 +88,8 @@ type Server struct { cmds chan cmd cmdsCap int + // check if we have subscription before + // subscribing or unsubscribing mtx sync.RWMutex subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct } diff --git a/p2p/switch.go b/p2p/switch.go index 7d2e6c3f3..ccd6d40f2 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -645,43 +645,36 @@ func (sw *Switch) addPeer(p Peer) error { // Handle the shut down case where the switch has stopped but we're // concurrently trying to add a peer. - if sw.IsRunning() { - // All good. Start peer - if err := sw.startInitPeer(p); err != nil { - return err - } - } else { + if !sw.IsRunning() { + // XXX should this return an error or just log and terminate? sw.Logger.Error("Won't start a peer - switch is not running", "peer", p) + return nil } - // Add the peer to .peers. - // We start it first so that a peer in the list is safe to Stop. - // It should not err since we already checked peers.Has(). - if err := sw.peers.Add(p); err != nil { + // Start the peer's send/recv routines. + // Must start it before adding it to the peer set + // to prevent Start and Stop from being called concurrently. + err := p.Start() + if err != nil { + // Should never happen + sw.Logger.Error("Error starting peer", "err", err, "peer", p) return err } - sw.Logger.Info("Added peer", "peer", p) - sw.metrics.Peers.Add(float64(1)) - - return nil -} - -func (sw *Switch) startInitPeer(p Peer) error { - err := p.Start() // spawn send/recv routines - if err != nil { - // Should never happen - sw.Logger.Error( - "Error starting peer", - "err", err, - "peer", p, - ) + // Add the peer to PeerSet. Do this before starting the reactors + // so that if Receive errors, we will find the peer and remove it. + // Add should not err since we already checked peers.Has(). + if err := sw.peers.Add(p); err != nil { return err } + sw.metrics.Peers.Add(float64(1)) + // Start all the reactor protocols on the peer. for _, reactor := range sw.reactors { reactor.AddPeer(p) } + sw.Logger.Info("Added peer", "peer", p) + return nil } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 358661616..47cfed55f 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -273,6 +273,8 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { func TestSwitchPeerFilterDuplicate(t *testing.T) { sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() // simulate remote peer rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} @@ -293,12 +295,12 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { } err = sw.addPeer(p) - if err, ok := err.(ErrRejected); ok { - if !err.IsDuplicate() { - t.Errorf("expected peer to be duplicate") + if errRej, ok := err.(ErrRejected); ok { + if !errRej.IsDuplicate() { + t.Errorf("expected peer to be duplicate. got %v", errRej) } } else { - t.Errorf("expected ErrRejected") + t.Errorf("expected ErrRejected, got %v", err) } }