Browse Source

fix dirty data in peerset,resolve #3304 (#3359)

* fix dirty data in peerset

startInitPeer before PeerSet add the peer,
once mconnection start and Receive of one
Reactor faild, will try to remove it from PeerSet
while PeerSet still not contain the peer. Fix
this by change the order.

* fix test FilterDuplicate

* fix start/stop race

* fix err
pull/3374/head
zjubfd 6 years ago
committed by Ethan Buchman
parent
commit
d95894152b
3 changed files with 26 additions and 29 deletions
  1. +2
    -0
      libs/pubsub/pubsub.go
  2. +18
    -25
      p2p/switch.go
  3. +6
    -4
      p2p/switch_test.go

+ 2
- 0
libs/pubsub/pubsub.go View File

@ -88,6 +88,8 @@ type Server struct {
cmds chan cmd
cmdsCap int
// check if we have subscription before
// subscribing or unsubscribing
mtx sync.RWMutex
subscriptions map[string]map[string]struct{} // subscriber -> query (string) -> empty struct
}


+ 18
- 25
p2p/switch.go View File

@ -645,43 +645,36 @@ func (sw *Switch) addPeer(p Peer) error {
// Handle the shut down case where the switch has stopped but we're
// concurrently trying to add a peer.
if sw.IsRunning() {
// All good. Start peer
if err := sw.startInitPeer(p); err != nil {
return err
}
} else {
if !sw.IsRunning() {
// XXX should this return an error or just log and terminate?
sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
return nil
}
// Add the peer to .peers.
// We start it first so that a peer in the list is safe to Stop.
// It should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
// Start the peer's send/recv routines.
// Must start it before adding it to the peer set
// to prevent Start and Stop from being called concurrently.
err := p.Start()
if err != nil {
// Should never happen
sw.Logger.Error("Error starting peer", "err", err, "peer", p)
return err
}
sw.Logger.Info("Added peer", "peer", p)
sw.metrics.Peers.Add(float64(1))
return nil
}
func (sw *Switch) startInitPeer(p Peer) error {
err := p.Start() // spawn send/recv routines
if err != nil {
// Should never happen
sw.Logger.Error(
"Error starting peer",
"err", err,
"peer", p,
)
// Add the peer to PeerSet. Do this before starting the reactors
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
return err
}
sw.metrics.Peers.Add(float64(1))
// Start all the reactor protocols on the peer.
for _, reactor := range sw.reactors {
reactor.AddPeer(p)
}
sw.Logger.Info("Added peer", "peer", p)
return nil
}

+ 6
- 4
p2p/switch_test.go View File

@ -273,6 +273,8 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
func TestSwitchPeerFilterDuplicate(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
@ -293,12 +295,12 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
}
err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
if !err.IsDuplicate() {
t.Errorf("expected peer to be duplicate")
if errRej, ok := err.(ErrRejected); ok {
if !errRej.IsDuplicate() {
t.Errorf("expected peer to be duplicate. got %v", errRej)
}
} else {
t.Errorf("expected ErrRejected")
t.Errorf("expected ErrRejected, got %v", err)
}
}


Loading…
Cancel
Save