diff --git a/p2p/addrbook.go b/p2p/addrbook.go index dcb41d82f..56fe2a1b1 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -144,6 +144,7 @@ func (a *AddrBook) Stop() { func (a *AddrBook) AddOurAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() + log.Debug("Add our address to book", "addr", addr) a.ourAddrs[addr.String()] = addr } @@ -158,6 +159,7 @@ func (a *AddrBook) OurAddresses() []*NetAddress { func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() + log.Debug("Add address to book", "addr", addr, "src", src) a.addAddress(addr, src) } @@ -334,7 +336,7 @@ func (a *AddrBook) loadFromFile(filePath string) { // If doesn't exist, do nothing. _, err := os.Stat(filePath) if os.IsNotExist(err) { - return + panic(Fmt("File does not exist: %v", filePath)) } // Load addrBookJSON{} @@ -546,7 +548,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - log.Info(Fmt("Added new address %s for a total of %d addresses", addr, a.size())) + log.Info("Added new address", "address", addr, "total", a.size()) } // Make space in the new buckets by expiring the really bad entries. diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 158ffe542..b4230ffa3 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -70,12 +70,13 @@ func (ps *PeerSet) Remove(peer *Peer) { // If it's the last peer, that's an easy special case. if index == len(ps.list)-1 { ps.list = newList + delete(ps.lookup, peer.Key) return } // Move the last item from ps.list to "index" in list. lastPeer := ps.list[len(ps.list)-1] - lastPeerAddr := lastPeer.mconn.RemoteAddress.String() - lastPeerItem := ps.lookup[lastPeerAddr] + lastPeerKey := lastPeer.Key + lastPeerItem := ps.lookup[lastPeerKey] newList[index] = lastPeer lastPeerItem.index = index ps.list = newList diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go new file mode 100644 index 000000000..6a0c38ae2 --- /dev/null +++ b/p2p/peer_set_test.go @@ -0,0 +1,63 @@ +package p2p + +import ( + "math/rand" + "testing" + + . "github.com/tendermint/tendermint/common" +) + +// Returns an empty dummy peer +func randPeer() *Peer { + return &Peer{ + Key: Fmt("%v.%v.%v.%v:%v", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%10000+80), + } +} + +func TestAddRemoveOne(t *testing.T) { + peerSet := NewPeerSet() + + peer := randPeer() + added := peerSet.Add(peer) + if !added { + t.Errorf("Failed to add new peer") + } + if peerSet.Size() != 1 { + t.Errorf("Failed to add new peer and increment size") + } + + peerSet.Remove(peer) + if peerSet.Has(peer.Key) { + t.Errorf("Failed to remove peer") + } + if peerSet.Size() != 0 { + t.Errorf("Failed to remove peer and decrement size") + } +} + +func TestAddRemoveMany(t *testing.T) { + peerSet := NewPeerSet() + + peers := []*Peer{} + for i := 0; i < 100; i++ { + peer := randPeer() + added := peerSet.Add(peer) + if !added { + t.Errorf("Failed to add new peer") + } + if peerSet.Size() != i+1 { + t.Errorf("Failed to add new peer and increment size") + } + peers = append(peers, peer) + } + + for i, peer := range peers { + peerSet.Remove(peer) + if peerSet.Has(peer.Key) { + t.Errorf("Failed to remove peer") + } + if peerSet.Size() != len(peers)-i-1 { + t.Errorf("Failed to remove peer and decrement size") + } + } +} diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 166c3c5f2..90be9b24c 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -105,12 +105,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. - addrs := pexR.book.GetSelection() - msg := &pexAddrsMessage{Addrs: addrs} - queued := src.TrySend(PexCh, msg) - if !queued { - // ignore - } + pexR.SendAddrs(src, pexR.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. // TODO: prevent abuse. @@ -127,7 +122,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // Asks peer for more addresses. func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.TrySend(PexCh, &pexRequestMessage{}) + peer.Send(PexCh, &pexRequestMessage{}) } func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { @@ -176,11 +171,17 @@ func (pexR *PEXReactor) ensurePeers() { if try == nil { break } - if toDial.Has(try.String()) || - pexR.sw.IsDialing(try) || - pexR.sw.Peers().Has(try.String()) { + alreadySelected := toDial.Has(try.String()) + alreadyDialing := pexR.sw.IsDialing(try) + alreadyConnected := pexR.sw.Peers().Has(try.String()) + if alreadySelected || alreadyDialing || alreadyConnected { + log.Debug("Cannot dial address", "addr", try, + "alreadySelected", alreadySelected, + "alreadyDialing", alreadyDialing, + "alreadyConnected", alreadyConnected) continue } else { + log.Debug("Will dial address", "addr", try) picked = try break } diff --git a/p2p/switch.go b/p2p/switch.go index 36da32aa6..635e0ecaa 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -143,17 +143,20 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { return nil, ErrSwitchStopped } - log.Info("Dialing peer", "address", addr) + log.Debug("Dialing address", "address", addr) sw.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) sw.dialing.Delete(addr.String()) if err != nil { + log.Debug("Failed dialing address", "address", addr, "error", err) return nil, err } peer, err := sw.AddPeerWithConnection(conn, true) if err != nil { + log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err) return nil, err } + log.Info("Dialed and added peer", "address", addr, "peer", peer) return peer, nil }