From 7d35500e6b8f8d7110cd6c64d6204d84c87fee25 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 1 Jan 2018 22:34:49 -0500 Subject: [PATCH] p2p: add ID to NetAddress and use for AddrBook --- p2p/addrbook.go | 34 +++++++++++++++++++--------------- p2p/addrbook_test.go | 5 ++++- p2p/key.go | 4 ++-- p2p/netaddress.go | 10 ++++++++-- p2p/pex_reactor.go | 19 ++++++++++--------- p2p/pex_reactor_test.go | 2 ++ p2p/switch.go | 18 ++++++++++-------- 7 files changed, 55 insertions(+), 37 deletions(-) diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 8f924d129..6ccec61f7 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -89,7 +89,7 @@ type AddrBook struct { mtx sync.Mutex rand *rand.Rand ourAddrs map[string]*NetAddress - addrLookup map[string]*knownAddress // new & old + addrLookup map[ID]*knownAddress // new & old bucketsOld []map[string]*knownAddress bucketsNew []map[string]*knownAddress nOld int @@ -104,7 +104,7 @@ func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { am := &AddrBook{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), ourAddrs: make(map[string]*NetAddress), - addrLookup: make(map[string]*knownAddress), + addrLookup: make(map[ID]*knownAddress), filePath: filePath, routabilityStrict: routabilityStrict, } @@ -244,11 +244,11 @@ func (a *AddrBook) PickAddress(newBias int) *NetAddress { } // MarkGood marks the peer as good and moves it into an "old" bucket. -// XXX: we never call this! +// TODO: call this from somewhere func (a *AddrBook) MarkGood(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] + ka := a.addrLookup[addr.ID] if ka == nil { return } @@ -262,7 +262,7 @@ func (a *AddrBook) MarkGood(addr *NetAddress) { func (a *AddrBook) MarkAttempt(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] + ka := a.addrLookup[addr.ID] if ka == nil { return } @@ -279,11 +279,11 @@ func (a *AddrBook) MarkBad(addr *NetAddress) { func (a *AddrBook) RemoveAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrLookup[addr.String()] + ka := a.addrLookup[addr.ID] if ka == nil { return } - a.Logger.Info("Remove address from book", "addr", addr) + a.Logger.Info("Remove address from book", "addr", ka.Addr) a.removeFromAllBuckets(ka) } @@ -300,8 +300,8 @@ func (a *AddrBook) GetSelection() []*NetAddress { allAddr := make([]*NetAddress, a.size()) i := 0 - for _, v := range a.addrLookup { - allAddr[i] = v.Addr + for _, ka := range a.addrLookup { + allAddr[i] = ka.Addr i++ } @@ -388,7 +388,7 @@ func (a *AddrBook) loadFromFile(filePath string) bool { bucket := a.getBucket(ka.BucketType, bucketIndex) bucket[ka.Addr.String()] = ka } - a.addrLookup[ka.Addr.String()] = ka + a.addrLookup[ka.ID()] = ka if ka.BucketType == bucketTypeNew { a.nNew++ } else { @@ -466,7 +466,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { } // Ensure in addrLookup - a.addrLookup[addrStr] = ka + a.addrLookup[ka.ID()] = ka return true } @@ -503,7 +503,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { } // Ensure in addrLookup - a.addrLookup[addrStr] = ka + a.addrLookup[ka.ID()] = ka return true } @@ -521,7 +521,7 @@ func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx } else { a.nOld-- } - delete(a.addrLookup, ka.Addr.String()) + delete(a.addrLookup, ka.ID()) } } @@ -536,7 +536,7 @@ func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) { } else { a.nOld-- } - delete(a.addrLookup, ka.Addr.String()) + delete(a.addrLookup, ka.ID()) } func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { @@ -559,7 +559,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) error { return fmt.Errorf("Cannot add ourselves with address %v", addr) } - ka := a.addrLookup[addr.String()] + ka := a.addrLookup[addr.ID] if ka != nil { // Already old. @@ -768,6 +768,10 @@ func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { } } +func (ka *knownAddress) ID() ID { + return ka.Addr.ID +} + func (ka *knownAddress) isOld() bool { return ka.BucketType == bucketTypeOld } diff --git a/p2p/addrbook_test.go b/p2p/addrbook_test.go index d84c008ed..07ab3474c 100644 --- a/p2p/addrbook_test.go +++ b/p2p/addrbook_test.go @@ -1,12 +1,14 @@ package p2p import ( + "encoding/hex" "fmt" "io/ioutil" "math/rand" "testing" "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" ) @@ -102,7 +104,7 @@ func TestAddrBookLookup(t *testing.T) { src := addrSrc.src book.AddAddress(addr, src) - ka := book.addrLookup[addr.String()] + ka := book.addrLookup[addr.ID] assert.NotNil(t, ka, "Expected to find KnownAddress %v but wasn't there.", addr) if !(ka.Addr.Equals(addr) && ka.Src.Equals(src)) { @@ -188,6 +190,7 @@ func randIPv4Address(t *testing.T) *NetAddress { ) port := rand.Intn(65535-1) + 1 addr, err := NewNetAddressString(fmt.Sprintf("%v:%v", ip, port)) + addr.ID = ID(hex.EncodeToString(cmn.RandBytes(20))) // TODO assert.Nil(t, err, "error generating rand network address") if addr.Routable() { return addr diff --git a/p2p/key.go b/p2p/key.go index eede7ce7c..d0031458d 100644 --- a/p2p/key.go +++ b/p2p/key.go @@ -12,6 +12,8 @@ import ( cmn "github.com/tendermint/tmlibs/common" ) +type ID string + //------------------------------------------------------------------------------ // Persistent peer ID // TODO: encrypt on disk @@ -22,8 +24,6 @@ type NodeKey struct { PrivKey crypto.PrivKey `json:"priv_key"` // our priv key } -type ID string - // ID returns the peer's canonical ID - the hash of its public key. func (nodeKey *NodeKey) ID() ID { return ID(hex.EncodeToString(nodeKey.id())) diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 41c2cc976..8176cfde4 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -16,8 +16,9 @@ import ( ) // NetAddress defines information about a peer on the network -// including its IP address, and port. +// including its ID, IP address, and port. type NetAddress struct { + ID ID IP net.IP Port uint16 str string @@ -122,10 +123,15 @@ func (na *NetAddress) Less(other interface{}) bool { // String representation. func (na *NetAddress) String() string { if na.str == "" { - na.str = net.JoinHostPort( + + addrStr := net.JoinHostPort( na.IP.String(), strconv.FormatUint(uint64(na.Port), 10), ) + if na.ID != "" { + addrStr = fmt.Sprintf("%s@%s", na.ID, addrStr) + } + na.str = addrStr } return na.str } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 17b9d61ac..47169a1b4 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -107,6 +107,7 @@ func (r *PEXReactor) AddPeer(p Peer) { } } else { // For inbound connections, the peer is its own source addr, err := NewNetAddressString(p.NodeInfo().ListenAddr) + addr.ID = p.ID() // TODO: handle in NewNetAddress func if err != nil { // peer gave us a bad ListenAddr. TODO: punish r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.NodeInfo().ListenAddr, "err", err) @@ -154,9 +155,9 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { case *pexAddrsMessage: // We received some peer addresses from src. // TODO: (We don't want to get spammed with bad peers) - for _, addr := range msg.Addrs { - if addr != nil { - r.book.AddAddress(addr, srcAddr) + for _, netAddr := range msg.Addrs { + if netAddr != nil { + r.book.AddAddress(netAddr, srcAddr) } } default: @@ -170,8 +171,8 @@ func (r *PEXReactor) RequestPEX(p Peer) { } // SendAddrs sends addrs to the peer. -func (r *PEXReactor) SendAddrs(p Peer, addrs []*NetAddress) { - p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) +func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) { + p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}}) } // SetEnsurePeersPeriod sets period to ensure peers connected. @@ -258,19 +259,19 @@ func (r *PEXReactor) ensurePeers() { if try == nil { continue } - if _, selected := toDial[try.IP.String()]; selected { + if _, selected := toDial[string(try.ID)]; selected { continue } - if dialling := r.Switch.IsDialing(try); dialling { + if dialling := r.Switch.IsDialing(try.ID); dialling { continue } // XXX: Should probably use pubkey as peer key ... // TODO: use the ID correctly - if connected := r.Switch.Peers().Has(ID(try.String())); connected { + if connected := r.Switch.Peers().Has(try.ID); connected { continue } r.Logger.Info("Will dial address", "addr", try) - toDial[try.IP.String()] = try + toDial[string(try.ID)] = try } // Dial picked addresses diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index a14f0eb2a..a830b6c44 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -197,6 +198,7 @@ func createRandomPeer(outbound bool) *peer { nodeInfo: &NodeInfo{ ListenAddr: addr, RemoteAddr: netAddr.String(), + PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, outbound: outbound, mconn: &MConnection{RemoteAddress: netAddr}, diff --git a/p2p/switch.go b/p2p/switch.go index 4da9355a1..8c89ee01d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -325,6 +325,7 @@ func (sw *Switch) startInitPeer(peer *peer) { // DialSeeds dials a list of seeds asynchronously in random order. func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { netAddrs, errs := NewNetAddressStrings(seeds) + // TODO: IDs for _, err := range errs { sw.Logger.Error("Error in seed's address", "err", err) } @@ -373,8 +374,8 @@ func (sw *Switch) dialSeed(addr *NetAddress) { // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { - sw.dialing.Set(addr.IP.String(), addr) - defer sw.dialing.Delete(addr.IP.String()) + sw.dialing.Set(string(addr.ID), addr) + defer sw.dialing.Delete(string(addr.ID)) sw.Logger.Info("Dialing peer", "address", addr) peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) @@ -396,9 +397,9 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, return peer, nil } -// IsDialing returns true if the switch is currently dialing the given address. -func (sw *Switch) IsDialing(addr *NetAddress) bool { - return sw.dialing.Has(addr.IP.String()) +// IsDialing returns true if the switch is currently dialing the given ID. +func (sw *Switch) IsDialing(id ID) bool { + return sw.dialing.Has(string(id)) } // Broadcast runs a go routine for each attempted send, which will block @@ -454,7 +455,8 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { // If no success after all that, it stops trying, and leaves it // to the PEX/Addrbook to find the peer again func (sw *Switch) reconnectToPeer(peer Peer) { - addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) + netAddr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) + netAddr.ID = peer.ID() // TODO: handle above start := time.Now() sw.Logger.Info("Reconnecting to peer", "peer", peer) for i := 0; i < reconnectAttempts; i++ { @@ -462,7 +464,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) { return } - peer, err := sw.DialPeerWithAddress(addr, true) + peer, err := sw.DialPeerWithAddress(netAddr, true) if err != nil { sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) // sleep a set amount @@ -484,7 +486,7 @@ func (sw *Switch) reconnectToPeer(peer Peer) { // sleep an exponentially increasing amount sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) - peer, err := sw.DialPeerWithAddress(addr, true) + peer, err := sw.DialPeerWithAddress(netAddr, true) if err != nil { sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) continue