Browse Source

Merge pull request #39 from tendermint/development

Development
pull/40/head
Jae Kwon 10 years ago
parent
commit
43a0c253f8
5 changed files with 85 additions and 15 deletions
  1. +4
    -2
      p2p/addrbook.go
  2. +3
    -2
      p2p/peer_set.go
  3. +63
    -0
      p2p/peer_set_test.go
  4. +11
    -10
      p2p/pex_reactor.go
  5. +4
    -1
      p2p/switch.go

+ 4
- 2
p2p/addrbook.go View File

@ -144,6 +144,7 @@ func (a *AddrBook) Stop() {
func (a *AddrBook) AddOurAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
log.Debug("Add our address to book", "addr", addr)
a.ourAddrs[addr.String()] = addr
}
@ -158,6 +159,7 @@ func (a *AddrBook) OurAddresses() []*NetAddress {
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
log.Debug("Add address to book", "addr", addr, "src", src)
a.addAddress(addr, src)
}
@ -334,7 +336,7 @@ func (a *AddrBook) loadFromFile(filePath string) {
// If doesn't exist, do nothing.
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return
panic(Fmt("File does not exist: %v", filePath))
}
// Load addrBookJSON{}
@ -546,7 +548,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
bucket := a.calcNewBucket(addr, src)
a.addToNewBucket(ka, bucket)
log.Info(Fmt("Added new address %s for a total of %d addresses", addr, a.size()))
log.Info("Added new address", "address", addr, "total", a.size())
}
// Make space in the new buckets by expiring the really bad entries.


+ 3
- 2
p2p/peer_set.go View File

@ -70,12 +70,13 @@ func (ps *PeerSet) Remove(peer *Peer) {
// If it's the last peer, that's an easy special case.
if index == len(ps.list)-1 {
ps.list = newList
delete(ps.lookup, peer.Key)
return
}
// Move the last item from ps.list to "index" in list.
lastPeer := ps.list[len(ps.list)-1]
lastPeerAddr := lastPeer.mconn.RemoteAddress.String()
lastPeerItem := ps.lookup[lastPeerAddr]
lastPeerKey := lastPeer.Key
lastPeerItem := ps.lookup[lastPeerKey]
newList[index] = lastPeer
lastPeerItem.index = index
ps.list = newList


+ 63
- 0
p2p/peer_set_test.go View File

@ -0,0 +1,63 @@
package p2p
import (
"math/rand"
"testing"
. "github.com/tendermint/tendermint/common"
)
// Returns an empty dummy peer
func randPeer() *Peer {
return &Peer{
Key: Fmt("%v.%v.%v.%v:%v", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%10000+80),
}
}
func TestAddRemoveOne(t *testing.T) {
peerSet := NewPeerSet()
peer := randPeer()
added := peerSet.Add(peer)
if !added {
t.Errorf("Failed to add new peer")
}
if peerSet.Size() != 1 {
t.Errorf("Failed to add new peer and increment size")
}
peerSet.Remove(peer)
if peerSet.Has(peer.Key) {
t.Errorf("Failed to remove peer")
}
if peerSet.Size() != 0 {
t.Errorf("Failed to remove peer and decrement size")
}
}
func TestAddRemoveMany(t *testing.T) {
peerSet := NewPeerSet()
peers := []*Peer{}
for i := 0; i < 100; i++ {
peer := randPeer()
added := peerSet.Add(peer)
if !added {
t.Errorf("Failed to add new peer")
}
if peerSet.Size() != i+1 {
t.Errorf("Failed to add new peer and increment size")
}
peers = append(peers, peer)
}
for i, peer := range peers {
peerSet.Remove(peer)
if peerSet.Has(peer.Key) {
t.Errorf("Failed to remove peer")
}
if peerSet.Size() != len(peers)-i-1 {
t.Errorf("Failed to remove peer and decrement size")
}
}
}

+ 11
- 10
p2p/pex_reactor.go View File

@ -105,12 +105,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
case *pexRequestMessage:
// src requested some peers.
// TODO: prevent abuse.
addrs := pexR.book.GetSelection()
msg := &pexAddrsMessage{Addrs: addrs}
queued := src.TrySend(PexCh, msg)
if !queued {
// ignore
}
pexR.SendAddrs(src, pexR.book.GetSelection())
case *pexAddrsMessage:
// We received some peer addresses from src.
// TODO: prevent abuse.
@ -127,7 +122,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
// Asks peer for more addresses.
func (pexR *PEXReactor) RequestPEX(peer *Peer) {
peer.TrySend(PexCh, &pexRequestMessage{})
peer.Send(PexCh, &pexRequestMessage{})
}
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
@ -176,11 +171,17 @@ func (pexR *PEXReactor) ensurePeers() {
if try == nil {
break
}
if toDial.Has(try.String()) ||
pexR.sw.IsDialing(try) ||
pexR.sw.Peers().Has(try.String()) {
alreadySelected := toDial.Has(try.String())
alreadyDialing := pexR.sw.IsDialing(try)
alreadyConnected := pexR.sw.Peers().Has(try.String())
if alreadySelected || alreadyDialing || alreadyConnected {
log.Debug("Cannot dial address", "addr", try,
"alreadySelected", alreadySelected,
"alreadyDialing", alreadyDialing,
"alreadyConnected", alreadyConnected)
continue
} else {
log.Debug("Will dial address", "addr", try)
picked = try
break
}


+ 4
- 1
p2p/switch.go View File

@ -143,17 +143,20 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
return nil, ErrSwitchStopped
}
log.Info("Dialing peer", "address", addr)
log.Debug("Dialing address", "address", addr)
sw.dialing.Set(addr.String(), addr)
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
sw.dialing.Delete(addr.String())
if err != nil {
log.Debug("Failed dialing address", "address", addr, "error", err)
return nil, err
}
peer, err := sw.AddPeerWithConnection(conn, true)
if err != nil {
log.Debug("Failed adding peer", "address", addr, "conn", conn, "error", err)
return nil, err
}
log.Info("Dialed and added peer", "address", addr, "peer", peer)
return peer, nil
}


Loading…
Cancel
Save