diff --git a/config/config.go b/config/config.go index 072606b43..e34d9b959 100644 --- a/config/config.go +++ b/config/config.go @@ -22,11 +22,13 @@ var ( defaultGenesisJSONName = "genesis.json" defaultPrivValName = "priv_validator.json" defaultNodeKeyName = "node_key.json" + defaultAddrBookName = "addrbook.json" defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) defaultPrivValPath = filepath.Join(defaultConfigDir, defaultPrivValName) defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName) + defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName) ) // Config defines the top level configuration for a Tendermint node @@ -278,7 +280,7 @@ type P2PConfig struct { func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ ListenAddress: "tcp://0.0.0.0:46656", - AddrBook: "addrbook.json", + AddrBook: defaultAddrBookPath, AddrBookStrict: true, MaxNumPeers: 50, FlushThrottleTimeout: 100, diff --git a/node/node.go b/node/node.go index 3012ed057..bdbf12f82 100644 --- a/node/node.go +++ b/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/tendermint/tendermint/evidence" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/pex" "github.com/tendermint/tendermint/p2p/trust" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" @@ -97,7 +98,7 @@ type Node struct { // network sw *p2p.Switch // p2p connections - addrBook *p2p.AddrBook // known peers + addrBook pex.AddrBook // known peers trustMetricStore *trust.TrustMetricStore // trust metrics for all peers // services @@ -238,10 +239,10 @@ func NewNode(config *cfg.Config, sw.AddReactor("EVIDENCE", evidenceReactor) // Optionally, start the pex reactor - var addrBook *p2p.AddrBook + var addrBook pex.AddrBook var trustMetricStore *trust.TrustMetricStore if config.P2P.PexReactor { - addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) + addrBook = pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) // Get the trust metric history data @@ -256,8 +257,8 @@ func NewNode(config *cfg.Config, if config.P2P.Seeds != "" { seeds = strings.Split(config.P2P.Seeds, ",") } - pexReactor := p2p.NewPEXReactor(addrBook, - &p2p.PEXReactorConfig{Seeds: seeds}) + pexReactor := pex.NewPEXReactor(addrBook, + &pex.PEXReactorConfig{Seeds: seeds}) pexReactor.SetLogger(p2pLogger) sw.AddReactor("PEX", pexReactor) } diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index e8107d730..20525e675 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -1,12 +1,15 @@ package p2p -import cmn "github.com/tendermint/tmlibs/common" +import ( + "github.com/tendermint/tendermint/p2p/conn" + cmn "github.com/tendermint/tmlibs/common" +) type Reactor interface { cmn.Service // Start, Stop SetSwitch(*Switch) - GetChannels() []*ChannelDescriptor + GetChannels() []*conn.ChannelDescriptor AddPeer(peer Peer) RemovePeer(peer Peer, reason interface{}) Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil @@ -29,7 +32,7 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor { func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw } -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } func (_ *BaseReactor) AddPeer(peer Peer) {} func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} diff --git a/p2p/conn_go110.go b/p2p/conn/conn_go110.go similarity index 86% rename from p2p/conn_go110.go rename to p2p/conn/conn_go110.go index 2fca7c3df..682188101 100644 --- a/p2p/conn_go110.go +++ b/p2p/conn/conn_go110.go @@ -1,6 +1,6 @@ // +build go1.10 -package p2p +package conn // Go1.10 has a proper net.Conn implementation that // has the SetDeadline method implemented as per @@ -10,6 +10,6 @@ package p2p import "net" -func netPipe() (net.Conn, net.Conn) { +func NetPipe() (net.Conn, net.Conn) { return net.Pipe() } diff --git a/p2p/conn_notgo110.go b/p2p/conn/conn_notgo110.go similarity index 94% rename from p2p/conn_notgo110.go rename to p2p/conn/conn_notgo110.go index a5c2f7410..ed642eb54 100644 --- a/p2p/conn_notgo110.go +++ b/p2p/conn/conn_notgo110.go @@ -1,6 +1,6 @@ // +build !go1.10 -package p2p +package conn import ( "net" @@ -24,7 +24,7 @@ func (p *pipe) SetDeadline(t time.Time) error { return nil } -func netPipe() (net.Conn, net.Conn) { +func NetPipe() (net.Conn, net.Conn) { p1, p2 := net.Pipe() return &pipe{p1}, &pipe{p2} } diff --git a/p2p/connection.go b/p2p/conn/connection.go similarity index 98% rename from p2p/connection.go rename to p2p/conn/connection.go index dcb660967..71b2a13d0 100644 --- a/p2p/connection.go +++ b/p2p/conn/connection.go @@ -1,4 +1,4 @@ -package p2p +package conn import ( "bufio" @@ -97,13 +97,13 @@ type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` - maxMsgPacketPayloadSize int + MaxMsgPacketPayloadSize int - flushThrottle time.Duration + FlushThrottle time.Duration } func (cfg *MConnConfig) maxMsgPacketTotalSize() int { - return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize + return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize } // DefaultMConnConfig returns the default config. @@ -111,8 +111,8 @@ func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ SendRate: defaultSendRate, RecvRate: defaultRecvRate, - maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, - flushThrottle: defaultFlushThrottle, + MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, + FlushThrottle: defaultFlushThrottle, } } @@ -171,7 +171,7 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() @@ -586,7 +586,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { desc: desc, sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), - maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize, + maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize, } } diff --git a/p2p/connection_test.go b/p2p/conn/connection_test.go similarity index 97% rename from p2p/connection_test.go rename to p2p/conn/connection_test.go index 2a64764ea..9c8eccbe4 100644 --- a/p2p/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1,4 +1,4 @@ -package p2p +package conn import ( "net" @@ -31,7 +31,7 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg func TestMConnectionSend(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -64,7 +64,7 @@ func TestMConnectionSend(t *testing.T) { func TestMConnectionReceive(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -102,7 +102,7 @@ func TestMConnectionReceive(t *testing.T) { func TestMConnectionStatus(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -119,7 +119,7 @@ func TestMConnectionStatus(t *testing.T) { func TestMConnectionStopsAndReturnsError(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -152,7 +152,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { } func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) { - server, client := netPipe() + server, client := NetPipe() onReceive := func(chID byte, msgBytes []byte) {} onError := func(r interface{}) {} @@ -283,7 +283,7 @@ func TestMConnectionReadErrorUnknownMsgType(t *testing.T) { func TestMConnectionTrySend(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() defer client.Close() diff --git a/p2p/secret_connection.go b/p2p/conn/secret_connection.go similarity index 99% rename from p2p/secret_connection.go rename to p2p/conn/secret_connection.go index f022d9c35..aa6db05bb 100644 --- a/p2p/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -4,7 +4,7 @@ // is known ahead of time, and thus we are technically // still vulnerable to MITM. (TODO!) // See docs/sts-final.pdf for more info -package p2p +package conn import ( "bytes" diff --git a/p2p/secret_connection_test.go b/p2p/conn/secret_connection_test.go similarity index 99% rename from p2p/secret_connection_test.go rename to p2p/conn/secret_connection_test.go index 5e0611a87..8af9cdeb5 100644 --- a/p2p/secret_connection_test.go +++ b/p2p/conn/secret_connection_test.go @@ -1,4 +1,4 @@ -package p2p +package conn import ( "io" diff --git a/p2p/errors.go b/p2p/errors.go new file mode 100644 index 000000000..cb6a7051a --- /dev/null +++ b/p2p/errors.go @@ -0,0 +1,20 @@ +package p2p + +import ( + "errors" + "fmt" +) + +var ( + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchConnectToSelf = errors.New("Connect to self") +) + +type ErrSwitchAuthenticationFailure struct { + Dialed *NetAddress + Got ID +} + +func (e ErrSwitchAuthenticationFailure) Error() string { + return fmt.Sprintf("Failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got) +} diff --git a/p2p/node_info.go b/p2p/node_info.go new file mode 100644 index 000000000..72873add8 --- /dev/null +++ b/p2p/node_info.go @@ -0,0 +1,105 @@ +package p2p + +import ( + "fmt" + "strings" + + crypto "github.com/tendermint/go-crypto" +) + +const maxNodeInfoSize = 10240 // 10Kb + +func MaxNodeInfoSize() int { + return maxNodeInfoSize +} + +// NodeInfo is the basic node information exchanged +// between two peers during the Tendermint P2P handshake. +type NodeInfo struct { + // Authenticate + PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey + ListenAddr string `json:"listen_addr"` // accepting incoming + + // Check compatibility + Network string `json:"network"` // network/chain ID + Version string `json:"version"` // major.minor.revision + + // Sanitize + Moniker string `json:"moniker"` // arbitrary moniker + Other []string `json:"other"` // other application specific data +} + +// Validate checks the self-reported NodeInfo is safe. +// It returns an error if the info.PubKey doesn't match the given pubKey. +// TODO: constraints for Moniker/Other? Or is that for the UI ? +func (info NodeInfo) Validate(pubKey crypto.PubKey) error { + if !info.PubKey.Equals(pubKey) { + return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", + info.PubKey, pubKey) + } + return nil +} + +// CompatibleWith checks if two NodeInfo are compatible with eachother. +// CONTRACT: two nodes are compatible if the major/minor versions match and network match. +func (info NodeInfo) CompatibleWith(other NodeInfo) error { + iMajor, iMinor, _, iErr := splitVersion(info.Version) + oMajor, oMinor, _, oErr := splitVersion(other.Version) + + // if our own version number is not formatted right, we messed up + if iErr != nil { + return iErr + } + + // version number must be formatted correctly ("x.x.x") + if oErr != nil { + return oErr + } + + // major version must match + if iMajor != oMajor { + return fmt.Errorf("Peer is on a different major version. Got %v, expected %v", oMajor, iMajor) + } + + // minor version must match + if iMinor != oMinor { + return fmt.Errorf("Peer is on a different minor version. Got %v, expected %v", oMinor, iMinor) + } + + // nodes must be on the same network + if info.Network != other.Network { + return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network) + } + + return nil +} + +func (info NodeInfo) ID() ID { + return PubKeyToID(info.PubKey) +} + +// NetAddress returns a NetAddress derived from the NodeInfo - +// it includes the authenticated peer ID and the self-reported +// ListenAddr. Note that the ListenAddr is not authenticated and +// may not match that address actually dialed if its an outbound peer. +func (info NodeInfo) NetAddress() *NetAddress { + id := PubKeyToID(info.PubKey) + addr := info.ListenAddr + netAddr, err := NewNetAddressString(IDAddressString(id, addr)) + if err != nil { + panic(err) // everything should be well formed by now + } + return netAddr +} + +func (info NodeInfo) String() string { + return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other) +} + +func splitVersion(version string) (string, string, string, error) { + spl := strings.Split(version, ".") + if len(spl) != 3 { + return "", "", "", fmt.Errorf("Invalid version format %v", version) + } + return spl[0], spl[1], spl[2], nil +} diff --git a/p2p/peer.go b/p2p/peer.go index 596b92168..e427b0d95 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -11,6 +11,8 @@ import ( wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + tmconn "github.com/tendermint/tendermint/p2p/conn" ) // Peer is an interface representing a peer connected on a reactor. @@ -21,7 +23,7 @@ type Peer interface { IsOutbound() bool // did we dial the peer IsPersistent() bool // do we redial this peer when we disconnect NodeInfo() NodeInfo // peer's info - Status() ConnectionStatus + Status() tmconn.ConnectionStatus Send(byte, interface{}) bool TrySend(byte, interface{}) bool @@ -40,8 +42,8 @@ type peer struct { outbound bool - conn net.Conn // source connection - mconn *MConnection // multiplex connection + conn net.Conn // source connection + mconn *tmconn.MConnection // multiplex connection persistent bool config *PeerConfig @@ -58,7 +60,7 @@ type PeerConfig struct { HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` DialTimeout time.Duration `mapstructure:"dial_timeout"` - MConfig *MConnConfig `mapstructure:"connection"` + MConfig *tmconn.MConnConfig `mapstructure:"connection"` Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing) FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"` @@ -70,13 +72,13 @@ func DefaultPeerConfig() *PeerConfig { AuthEnc: true, HandshakeTimeout: 20, // * time.Second, DialTimeout: 3, // * time.Second, - MConfig: DefaultMConnConfig(), + MConfig: tmconn.DefaultMConnConfig(), Fuzz: false, FuzzConfig: DefaultFuzzConnConfig(), } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) { conn, err := dial(addr, config) @@ -96,7 +98,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { // TODO: issue PoW challenge @@ -104,7 +106,7 @@ func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*Cha return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } -func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { conn := rawConn @@ -122,7 +124,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ } var err error - conn, err = MakeSecretConnection(conn, ourNodePrivKey) + conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey) if err != nil { return nil, errors.Wrap(err, "Error creating peer") } @@ -191,7 +193,7 @@ func (p *peer) NodeInfo() NodeInfo { } // Status returns the peer's ConnectionStatus. -func (p *peer) Status() ConnectionStatus { +func (p *peer) Status() tmconn.ConnectionStatus { return p.mconn.Status() } @@ -252,7 +254,7 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err }, func() { var n int - wire.ReadBinary(&peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(&peerNodeInfo, p.conn, MaxNodeInfoSize(), &n, &err2) p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { @@ -267,8 +269,6 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err return errors.Wrap(err, "Error removing deadline") } - // TODO: fix the peerNodeInfo.ListenAddr - p.nodeInfo = peerNodeInfo return nil } @@ -283,7 +283,7 @@ func (p *peer) PubKey() crypto.PubKey { if !p.nodeInfo.PubKey.Empty() { return p.nodeInfo.PubKey } else if p.config.AuthEnc { - return p.conn.(*SecretConnection).RemotePubKey() + return p.conn.(*tmconn.SecretConnection).RemotePubKey() } panic("Attempt to get peer's PubKey before calling Handshake") } @@ -316,8 +316,8 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] @@ -331,5 +331,5 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) + return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index d99fff5e4..978775c8e 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" + tmconn "github.com/tendermint/tendermint/p2p/conn" ) func TestPeerBasic(t *testing.T) { @@ -81,7 +82,7 @@ func TestPeerSend(t *testing.T) { } func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) { - chDescs := []*ChannelDescriptor{ + chDescs := []*tmconn.ChannelDescriptor{ {ID: 0x01, Priority: 1}, } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} @@ -137,7 +138,7 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config) + peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*tmconn.ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } diff --git a/p2p/addrbook.go b/p2p/pex/addrbook.go similarity index 58% rename from p2p/addrbook.go rename to p2p/pex/addrbook.go index fee179cb1..3a3be6e44 100644 --- a/p2p/addrbook.go +++ b/p2p/pex/addrbook.go @@ -2,95 +2,80 @@ // Originally Copyright (c) 2013-2014 Conformal Systems LLC. // https://github.com/conformal/btcd/blob/master/LICENSE -package p2p +package pex import ( "crypto/sha256" "encoding/binary" - "encoding/json" "fmt" "math" "math/rand" "net" - "os" "sync" "time" crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/p2p" cmn "github.com/tendermint/tmlibs/common" ) const ( - // addresses under which the address manager will claim to need more addresses. - needAddressThreshold = 1000 - - // interval used to dump the address cache to disk for future use. - dumpAddressInterval = time.Minute * 2 - - // max addresses in each old address bucket. - oldBucketSize = 64 - - // buckets we split old addresses over. - oldBucketCount = 64 - - // max addresses in each new address bucket. - newBucketSize = 64 - - // buckets that we spread new addresses over. - newBucketCount = 256 - - // old buckets over which an address group will be spread. - oldBucketsPerGroup = 4 + bucketTypeNew = 0x01 + bucketTypeOld = 0x02 +) - // new buckets over which a source address group will be spread. - newBucketsPerGroup = 32 +// AddrBook is an address book used for tracking peers +// so we can gossip about them to others and select +// peers to dial. +// TODO: break this up? +type AddrBook interface { + cmn.Service - // buckets a frequently seen new address may end up in. - maxNewBucketsPerAddress = 4 + // Add our own addresses so we don't later add ourselves + AddOurAddress(*p2p.NetAddress) - // days before which we assume an address has vanished - // if we have not seen it announced in that long. - numMissingDays = 30 + // Add and remove an address + AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error + RemoveAddress(addr *p2p.NetAddress) - // tries without a single success before we assume an address is bad. - numRetries = 3 + // Do we need more peers? + NeedMoreAddrs() bool - // max failures we will accept without a success before considering an address bad. - maxFailures = 10 + // Pick an address to dial + PickAddress(newBias int) *p2p.NetAddress - // days since the last success before we will consider evicting an address. - minBadDays = 7 + // Mark address + MarkGood(*p2p.NetAddress) + MarkAttempt(*p2p.NetAddress) + MarkBad(*p2p.NetAddress) - // % of total addresses known returned by GetSelection. - getSelectionPercent = 23 + // Send a selection of addresses to peers + GetSelection() []*p2p.NetAddress - // min addresses that must be returned by GetSelection. Useful for bootstrapping. - minGetSelection = 32 + // TODO: remove + ListOfKnownAddresses() []*knownAddress - // max addresses returned by GetSelection - // NOTE: this must match "maxPexMessageSize" - maxGetSelection = 250 -) + // Persist to disk + Save() +} -const ( - bucketTypeNew = 0x01 - bucketTypeOld = 0x02 -) +var _ AddrBook = (*addrBook)(nil) -// AddrBook - concurrency safe peer address manager. -type AddrBook struct { +// addrBook - concurrency safe peer address manager. +// Implements AddrBook. +type addrBook struct { cmn.BaseService // immutable after creation filePath string routabilityStrict bool - key string + key string // random prefix for bucket placement // accessed concurrently mtx sync.Mutex rand *rand.Rand - ourAddrs map[string]*NetAddress - addrLookup map[ID]*knownAddress // new & old + ourAddrs map[string]*p2p.NetAddress + addrLookup map[p2p.ID]*knownAddress // new & old bucketsOld []map[string]*knownAddress bucketsNew []map[string]*knownAddress nOld int @@ -101,11 +86,11 @@ type AddrBook struct { // NewAddrBook creates a new address book. // Use Start to begin processing asynchronous address updates. -func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { - am := &AddrBook{ - rand: rand.New(rand.NewSource(time.Now().UnixNano())), - ourAddrs: make(map[string]*NetAddress), - addrLookup: make(map[ID]*knownAddress), +func NewAddrBook(filePath string, routabilityStrict bool) *addrBook { + am := &addrBook{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // TODO: seed from outside + ourAddrs: make(map[string]*p2p.NetAddress), + addrLookup: make(map[p2p.ID]*knownAddress), filePath: filePath, routabilityStrict: routabilityStrict, } @@ -114,8 +99,9 @@ func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { return am } +// Initialize the buckets. // When modifying this, don't forget to update loadFromFile() -func (a *AddrBook) init() { +func (a *addrBook) init() { a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits // New addr buckets a.bucketsNew = make([]map[string]*knownAddress, newBucketCount) @@ -130,7 +116,7 @@ func (a *AddrBook) init() { } // OnStart implements Service. -func (a *AddrBook) OnStart() error { +func (a *addrBook) OnStart() error { if err := a.BaseService.OnStart(); err != nil { return err } @@ -145,62 +131,56 @@ func (a *AddrBook) OnStart() error { } // OnStop implements Service. -func (a *AddrBook) OnStop() { +func (a *addrBook) OnStop() { a.BaseService.OnStop() } -func (a *AddrBook) Wait() { +func (a *addrBook) Wait() { a.wg.Wait() } -// AddOurAddress adds another one of our addresses. -func (a *AddrBook) AddOurAddress(addr *NetAddress) { +//------------------------------------------------------- + +// AddOurAddress one of our addresses. +func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() a.Logger.Info("Add our address to book", "addr", addr) a.ourAddrs[addr.String()] = addr } -// OurAddresses returns a list of our addresses. -func (a *AddrBook) OurAddresses() []*NetAddress { - addrs := []*NetAddress{} - for _, addr := range a.ourAddrs { - addrs = append(addrs, addr) - } - return addrs -} - -// AddAddress adds the given address as received from the given source. +// AddAddress implements AddrBook - adds the given address as received from the given source. // NOTE: addr must not be nil -func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) error { +func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error { a.mtx.Lock() defer a.mtx.Unlock() return a.addAddress(addr, src) } -// NeedMoreAddrs returns true if there are not have enough addresses in the book. -func (a *AddrBook) NeedMoreAddrs() bool { - return a.Size() < needAddressThreshold -} - -// Size returns the number of addresses in the book. -func (a *AddrBook) Size() int { +// RemoveAddress implements AddrBook - removes the address from the book. +func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - return a.size() + ka := a.addrLookup[addr.ID] + if ka == nil { + return + } + a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID) + a.removeFromAllBuckets(ka) } -func (a *AddrBook) size() int { - return a.nNew + a.nOld +// NeedMoreAddrs implements AddrBook - returns true if there are not have enough addresses in the book. +func (a *addrBook) NeedMoreAddrs() bool { + return a.Size() < needAddressThreshold } -// PickAddress picks an address to connect to. +// PickAddress implements AddrBook. It picks an address to connect to. // The address is picked randomly from an old or new bucket according // to the newBias argument, which must be between [0, 100] (or else is truncated to that range) // and determines how biased we are to pick an address from a new bucket. // PickAddress returns nil if the AddrBook is empty or if we try to pick // from an empty bucket. -func (a *AddrBook) PickAddress(newBias int) *NetAddress { +func (a *addrBook) PickAddress(newBias int) *p2p.NetAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -244,9 +224,9 @@ func (a *AddrBook) PickAddress(newBias int) *NetAddress { return nil } -// MarkGood marks the peer as good and moves it into an "old" bucket. -// TODO: call this from somewhere -func (a *AddrBook) MarkGood(addr *NetAddress) { +// MarkGood implements AddrBook - it marks the peer as good and +// moves it into an "old" bucket. +func (a *addrBook) MarkGood(addr *p2p.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] @@ -259,8 +239,8 @@ func (a *AddrBook) MarkGood(addr *NetAddress) { } } -// MarkAttempt marks that an attempt was made to connect to the address. -func (a *AddrBook) MarkAttempt(addr *NetAddress) { +// MarkAttempt implements AddrBook - it marks that an attempt was made to connect to the address. +func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] @@ -270,28 +250,15 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) { ka.markAttempt() } -// MarkBad currently just ejects the address. In the future, consider -// blacklisting. -func (a *AddrBook) MarkBad(addr *NetAddress) { +// MarkBad implements AddrBook. Currently it just ejects the address. +// TODO: black list for some amount of time +func (a *addrBook) MarkBad(addr *p2p.NetAddress) { a.RemoveAddress(addr) } -// RemoveAddress removes the address from the book. -func (a *AddrBook) RemoveAddress(addr *NetAddress) { - a.mtx.Lock() - defer a.mtx.Unlock() - ka := a.addrLookup[addr.ID] - if ka == nil { - return - } - a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID) - a.removeFromAllBuckets(ka) -} - -/* Peer exchange */ - -// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols. -func (a *AddrBook) GetSelection() []*NetAddress { +// GetSelection implements AddrBook. +// It randomly selects some addresses (old & new). Suitable for peer-exchange protocols. +func (a *addrBook) GetSelection() []*p2p.NetAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -299,7 +266,7 @@ func (a *AddrBook) GetSelection() []*NetAddress { return nil } - allAddr := make([]*NetAddress, a.size()) + allAddr := make([]*p2p.NetAddress, a.size()) i := 0 for _, ka := range a.addrLookup { allAddr[i] = ka.Addr @@ -325,7 +292,7 @@ func (a *AddrBook) GetSelection() []*NetAddress { } // ListOfKnownAddresses returns the new and old addresses. -func (a *AddrBook) ListOfKnownAddresses() []*knownAddress { +func (a *addrBook) ListOfKnownAddresses() []*knownAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -336,102 +303,27 @@ func (a *AddrBook) ListOfKnownAddresses() []*knownAddress { return addrs } -func (ka *knownAddress) copy() *knownAddress { - return &knownAddress{ - Addr: ka.Addr, - Src: ka.Src, - Attempts: ka.Attempts, - LastAttempt: ka.LastAttempt, - LastSuccess: ka.LastSuccess, - BucketType: ka.BucketType, - Buckets: ka.Buckets, - } -} - -/* Loading & Saving */ - -type addrBookJSON struct { - Key string - Addrs []*knownAddress -} - -func (a *AddrBook) saveToFile(filePath string) { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) +//------------------------------------------------ +// Size returns the number of addresses in the book. +func (a *addrBook) Size() int { a.mtx.Lock() defer a.mtx.Unlock() - // Compile Addrs - addrs := []*knownAddress{} - for _, ka := range a.addrLookup { - addrs = append(addrs, ka) - } - - aJSON := &addrBookJSON{ - Key: a.key, - Addrs: addrs, - } - - jsonBytes, err := json.MarshalIndent(aJSON, "", "\t") - if err != nil { - a.Logger.Error("Failed to save AddrBook to file", "err", err) - return - } - err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644) - if err != nil { - a.Logger.Error("Failed to save AddrBook to file", "file", filePath, "err", err) - } + return a.size() } -// Returns false if file does not exist. -// cmn.Panics if file is corrupt. -func (a *AddrBook) loadFromFile(filePath string) bool { - // If doesn't exist, do nothing. - _, err := os.Stat(filePath) - if os.IsNotExist(err) { - return false - } - - // Load addrBookJSON{} - r, err := os.Open(filePath) - if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err)) - } - defer r.Close() // nolint: errcheck - aJSON := &addrBookJSON{} - dec := json.NewDecoder(r) - err = dec.Decode(aJSON) - if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err)) - } - - // Restore all the fields... - // Restore the key - a.key = aJSON.Key - // Restore .bucketsNew & .bucketsOld - for _, ka := range aJSON.Addrs { - for _, bucketIndex := range ka.Buckets { - bucket := a.getBucket(ka.BucketType, bucketIndex) - bucket[ka.Addr.String()] = ka - } - a.addrLookup[ka.ID()] = ka - if ka.BucketType == bucketTypeNew { - a.nNew++ - } else { - a.nOld++ - } - } - return true +func (a *addrBook) size() int { + return a.nNew + a.nOld } -// Save saves the book. -func (a *AddrBook) Save() { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) - a.saveToFile(a.filePath) -} +//---------------------------------------------------------- -/* Private methods */ +// Save persists the address book to disk. +func (a *addrBook) Save() { + a.saveToFile(a.filePath) // thread safe +} -func (a *AddrBook) saveRoutine() { +func (a *addrBook) saveRoutine() { defer a.wg.Done() saveFileTicker := time.NewTicker(dumpAddressInterval) @@ -449,7 +341,9 @@ out: a.Logger.Info("Address handler done") } -func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { +//---------------------------------------------------------- + +func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { switch bucketType { case bucketTypeNew: return a.bucketsNew[bucketIdx] @@ -463,7 +357,7 @@ func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd // Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full. // NOTE: currently it always returns true. -func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { +func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Sanity check if ka.isOld() { a.Logger.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka)) @@ -480,24 +374,25 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - a.Logger.Info("new bucket is full, expiring old ") + a.Logger.Info("new bucket is full, expiring new") a.expireNew(bucketIdx) } // Add to bucket. bucket[addrStr] = ka + // increment nNew if the peer doesnt already exist in a bucket if ka.addBucketRef(bucketIdx) == 1 { a.nNew++ } - // Ensure in addrLookup + // Add it to addrLookup a.addrLookup[ka.ID()] = ka return true } // Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full. -func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { +func (a *addrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { // Sanity check if ka.isNew() { a.Logger.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka)) @@ -533,7 +428,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { return true } -func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) { +func (a *addrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) { if ka.BucketType != bucketType { a.Logger.Error(cmn.Fmt("Bucket type mismatch: %v", ka)) return @@ -550,7 +445,7 @@ func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx } } -func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) { +func (a *addrBook) removeFromAllBuckets(ka *knownAddress) { for _, bucketIdx := range ka.Buckets { bucket := a.getBucket(ka.BucketType, bucketIdx) delete(bucket, ka.Addr.String()) @@ -564,7 +459,9 @@ func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) { delete(a.addrLookup, ka.ID()) } -func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { +//---------------------------------------------------------- + +func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { bucket := a.getBucket(bucketType, bucketIdx) var oldest *knownAddress for _, ka := range bucket { @@ -575,7 +472,9 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { return oldest } -func (a *AddrBook) addAddress(addr, src *NetAddress) error { +// adds the address to a "new" bucket. if its already in one, +// it only adds it probabilistically +func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error { if a.routabilityStrict && !addr.Routable() { return fmt.Errorf("Cannot add non-routable address %v", addr) } @@ -605,7 +504,10 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) error { } bucket := a.calcNewBucket(addr, src) - a.addToNewBucket(ka, bucket) + added := a.addToNewBucket(ka, bucket) + if !added { + a.Logger.Info("Can't add new address, addr book is full", "address", addr, "total", a.size()) + } a.Logger.Info("Added new address", "address", addr, "total", a.size()) return nil @@ -613,7 +515,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) error { // Make space in the new buckets by expiring the really bad entries. // If no bad entries are available we remove the oldest. -func (a *AddrBook) expireNew(bucketIdx int) { +func (a *addrBook) expireNew(bucketIdx int) { for addrStr, ka := range a.bucketsNew[bucketIdx] { // If an entry is bad, throw it away if ka.isBad() { @@ -628,10 +530,10 @@ func (a *AddrBook) expireNew(bucketIdx int) { a.removeFromBucket(oldest, bucketTypeNew, bucketIdx) } -// Promotes an address from new to old. -// TODO: Move to old probabilistically. -// The better a node is, the less likely it should be evicted from an old bucket. -func (a *AddrBook) moveToOld(ka *knownAddress) { +// Promotes an address from new to old. If the destination bucket is full, +// demote the oldest one to a "new" bucket. +// TODO: Demote more probabilistically? +func (a *addrBook) moveToOld(ka *knownAddress) { // Sanity check if ka.isOld() { a.Logger.Error(cmn.Fmt("Cannot promote address that is already old %v", ka)) @@ -674,9 +576,12 @@ func (a *AddrBook) moveToOld(ka *knownAddress) { } } +//--------------------------------------------------------------------- +// calculate bucket placements + // doublesha256( key + sourcegroup + // int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets -func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int { +func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int { data1 := []byte{} data1 = append(data1, []byte(a.key)...) data1 = append(data1, []byte(a.groupKey(addr))...) @@ -697,7 +602,7 @@ func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int { // doublesha256( key + group + // int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets -func (a *AddrBook) calcOldBucket(addr *NetAddress) int { +func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int { data1 := []byte{} data1 = append(data1, []byte(a.key)...) data1 = append(data1, []byte(addr.String())...) @@ -719,7 +624,7 @@ func (a *AddrBook) calcOldBucket(addr *NetAddress) int { // This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string // "local" for a local address and the string "unroutable" for an unroutable // address. -func (a *AddrBook) groupKey(na *NetAddress) string { +func (a *addrBook) groupKey(na *p2p.NetAddress) string { if a.routabilityStrict && na.Local() { return "local" } @@ -764,137 +669,6 @@ func (a *AddrBook) groupKey(na *NetAddress) string { return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() } -//----------------------------------------------------------------------------- - -/* - knownAddress - - tracks information about a known network address that is used - to determine how viable an address is. -*/ -type knownAddress struct { - Addr *NetAddress - Src *NetAddress - Attempts int32 - LastAttempt time.Time - LastSuccess time.Time - BucketType byte - Buckets []int -} - -func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { - return &knownAddress{ - Addr: addr, - Src: src, - Attempts: 0, - LastAttempt: time.Now(), - BucketType: bucketTypeNew, - Buckets: nil, - } -} - -func (ka *knownAddress) ID() ID { - return ka.Addr.ID -} - -func (ka *knownAddress) isOld() bool { - return ka.BucketType == bucketTypeOld -} - -func (ka *knownAddress) isNew() bool { - return ka.BucketType == bucketTypeNew -} - -func (ka *knownAddress) markAttempt() { - now := time.Now() - ka.LastAttempt = now - ka.Attempts += 1 -} - -func (ka *knownAddress) markGood() { - now := time.Now() - ka.LastAttempt = now - ka.Attempts = 0 - ka.LastSuccess = now -} - -func (ka *knownAddress) addBucketRef(bucketIdx int) int { - for _, bucket := range ka.Buckets { - if bucket == bucketIdx { - // TODO refactor to return error? - // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka)) - return -1 - } - } - ka.Buckets = append(ka.Buckets, bucketIdx) - return len(ka.Buckets) -} - -func (ka *knownAddress) removeBucketRef(bucketIdx int) int { - buckets := []int{} - for _, bucket := range ka.Buckets { - if bucket != bucketIdx { - buckets = append(buckets, bucket) - } - } - if len(buckets) != len(ka.Buckets)-1 { - // TODO refactor to return error? - // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka)) - return -1 - } - ka.Buckets = buckets - return len(ka.Buckets) -} - -/* - An address is bad if the address in question is a New address, has not been tried in the last - minute, and meets one of the following criteria: - - 1) It claims to be from the future - 2) It hasn't been seen in over a month - 3) It has failed at least three times and never succeeded - 4) It has failed ten times in the last week - - All addresses that meet these criteria are assumed to be worthless and not - worth keeping hold of. - - XXX: so a good peer needs us to call MarkGood before the conditions above are reached! -*/ -func (ka *knownAddress) isBad() bool { - // Is Old --> good - if ka.BucketType == bucketTypeOld { - return false - } - - // Has been attempted in the last minute --> good - if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { - return false - } - - // Too old? - // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! - // and shouldn't it be .Before ? - if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) { - return true - } - - // Never succeeded? - if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries { - return true - } - - // Hasn't succeeded in too long? - // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! - if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && - ka.Attempts >= maxFailures { - return true - } - - return false -} - -//----------------------------------------------------------------------------- - // doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. func doubleSha256(b []byte) []byte { hasher := sha256.New() diff --git a/p2p/addrbook_test.go b/p2p/pex/addrbook_test.go similarity index 93% rename from p2p/addrbook_test.go rename to p2p/pex/addrbook_test.go index 00051ae1f..166d31847 100644 --- a/p2p/addrbook_test.go +++ b/p2p/pex/addrbook_test.go @@ -1,4 +1,4 @@ -package p2p +package pex import ( "encoding/hex" @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/p2p" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" ) @@ -168,8 +169,8 @@ func TestAddrBookHandlesDuplicates(t *testing.T) { } type netAddressPair struct { - addr *NetAddress - src *NetAddress + addr *p2p.NetAddress + src *p2p.NetAddress } func randNetAddressPairs(t *testing.T, n int) []netAddressPair { @@ -180,7 +181,7 @@ func randNetAddressPairs(t *testing.T, n int) []netAddressPair { return randAddrs } -func randIPv4Address(t *testing.T) *NetAddress { +func randIPv4Address(t *testing.T) *p2p.NetAddress { for { ip := fmt.Sprintf("%v.%v.%v.%v", rand.Intn(254)+1, @@ -189,9 +190,9 @@ func randIPv4Address(t *testing.T) *NetAddress { rand.Intn(255), ) port := rand.Intn(65535-1) + 1 - id := ID(hex.EncodeToString(cmn.RandBytes(IDByteLength))) - idAddr := IDAddressString(id, fmt.Sprintf("%v:%v", ip, port)) - addr, err := NewNetAddressString(idAddr) + id := p2p.ID(hex.EncodeToString(cmn.RandBytes(p2p.IDByteLength))) + idAddr := p2p.IDAddressString(id, fmt.Sprintf("%v:%v", ip, port)) + addr, err := p2p.NewNetAddressString(idAddr) assert.Nil(t, err, "error generating rand network address") if addr.Routable() { return addr diff --git a/p2p/pex/file.go b/p2p/pex/file.go new file mode 100644 index 000000000..38142dd9d --- /dev/null +++ b/p2p/pex/file.go @@ -0,0 +1,83 @@ +package pex + +import ( + "encoding/json" + "os" + + cmn "github.com/tendermint/tmlibs/common" +) + +/* Loading & Saving */ + +type addrBookJSON struct { + Key string `json:"key"` + Addrs []*knownAddress `json:"addrs"` +} + +func (a *addrBook) saveToFile(filePath string) { + a.Logger.Info("Saving AddrBook to file", "size", a.Size()) + + a.mtx.Lock() + defer a.mtx.Unlock() + // Compile Addrs + addrs := []*knownAddress{} + for _, ka := range a.addrLookup { + addrs = append(addrs, ka) + } + + aJSON := &addrBookJSON{ + Key: a.key, + Addrs: addrs, + } + + jsonBytes, err := json.MarshalIndent(aJSON, "", "\t") + if err != nil { + a.Logger.Error("Failed to save AddrBook to file", "err", err) + return + } + err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644) + if err != nil { + a.Logger.Error("Failed to save AddrBook to file", "file", filePath, "err", err) + } +} + +// Returns false if file does not exist. +// cmn.Panics if file is corrupt. +func (a *addrBook) loadFromFile(filePath string) bool { + // If doesn't exist, do nothing. + _, err := os.Stat(filePath) + if os.IsNotExist(err) { + return false + } + + // Load addrBookJSON{} + r, err := os.Open(filePath) + if err != nil { + cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err)) + } + defer r.Close() // nolint: errcheck + aJSON := &addrBookJSON{} + dec := json.NewDecoder(r) + err = dec.Decode(aJSON) + if err != nil { + cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err)) + } + + // Restore all the fields... + // Restore the key + a.key = aJSON.Key + // Restore .bucketsNew & .bucketsOld + for _, ka := range aJSON.Addrs { + for _, bucketIndex := range ka.Buckets { + bucket := a.getBucket(ka.BucketType, bucketIndex) + bucket[ka.Addr.String()] = ka + } + a.addrLookup[ka.ID()] = ka + if ka.BucketType == bucketTypeNew { + a.nNew++ + } else { + a.nOld++ + } + } + return true +} diff --git a/p2p/pex/known_address.go b/p2p/pex/known_address.go new file mode 100644 index 000000000..085eb10fa --- /dev/null +++ b/p2p/pex/known_address.go @@ -0,0 +1,142 @@ +package pex + +import ( + "time" + + "github.com/tendermint/tendermint/p2p" +) + +// knownAddress tracks information about a known network address +// that is used to determine how viable an address is. +type knownAddress struct { + Addr *p2p.NetAddress `json:"addr"` + Src *p2p.NetAddress `json:"src"` + Attempts int32 `json:"attempts"` + LastAttempt time.Time `json:"last_attempt"` + LastSuccess time.Time `json:"last_success"` + BucketType byte `json:"bucket_type"` + Buckets []int `json:"buckets"` +} + +func newKnownAddress(addr *p2p.NetAddress, src *p2p.NetAddress) *knownAddress { + return &knownAddress{ + Addr: addr, + Src: src, + Attempts: 0, + LastAttempt: time.Now(), + BucketType: bucketTypeNew, + Buckets: nil, + } +} + +func (ka *knownAddress) ID() p2p.ID { + return ka.Addr.ID +} + +func (ka *knownAddress) copy() *knownAddress { + return &knownAddress{ + Addr: ka.Addr, + Src: ka.Src, + Attempts: ka.Attempts, + LastAttempt: ka.LastAttempt, + LastSuccess: ka.LastSuccess, + BucketType: ka.BucketType, + Buckets: ka.Buckets, + } +} + +func (ka *knownAddress) isOld() bool { + return ka.BucketType == bucketTypeOld +} + +func (ka *knownAddress) isNew() bool { + return ka.BucketType == bucketTypeNew +} + +func (ka *knownAddress) markAttempt() { + now := time.Now() + ka.LastAttempt = now + ka.Attempts += 1 +} + +func (ka *knownAddress) markGood() { + now := time.Now() + ka.LastAttempt = now + ka.Attempts = 0 + ka.LastSuccess = now +} + +func (ka *knownAddress) addBucketRef(bucketIdx int) int { + for _, bucket := range ka.Buckets { + if bucket == bucketIdx { + // TODO refactor to return error? + // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka)) + return -1 + } + } + ka.Buckets = append(ka.Buckets, bucketIdx) + return len(ka.Buckets) +} + +func (ka *knownAddress) removeBucketRef(bucketIdx int) int { + buckets := []int{} + for _, bucket := range ka.Buckets { + if bucket != bucketIdx { + buckets = append(buckets, bucket) + } + } + if len(buckets) != len(ka.Buckets)-1 { + // TODO refactor to return error? + // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka)) + return -1 + } + ka.Buckets = buckets + return len(ka.Buckets) +} + +/* + An address is bad if the address in question is a New address, has not been tried in the last + minute, and meets one of the following criteria: + + 1) It claims to be from the future + 2) It hasn't been seen in over a week + 3) It has failed at least three times and never succeeded + 4) It has failed ten times in the last week + + All addresses that meet these criteria are assumed to be worthless and not + worth keeping hold of. + + XXX: so a good peer needs us to call MarkGood before the conditions above are reached! +*/ +func (ka *knownAddress) isBad() bool { + // Is Old --> good + if ka.BucketType == bucketTypeOld { + return false + } + + // Has been attempted in the last minute --> good + if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { + return false + } + + // Too old? + // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! + // and shouldn't it be .Before ? + if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) { + return true + } + + // Never succeeded? + if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries { + return true + } + + // Hasn't succeeded in too long? + // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! + if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && + ka.Attempts >= maxFailures { + return true + } + + return false +} diff --git a/p2p/pex/params.go b/p2p/pex/params.go new file mode 100644 index 000000000..f94e1021c --- /dev/null +++ b/p2p/pex/params.go @@ -0,0 +1,55 @@ +package pex + +import "time" + +const ( + // addresses under which the address manager will claim to need more addresses. + needAddressThreshold = 1000 + + // interval used to dump the address cache to disk for future use. + dumpAddressInterval = time.Minute * 2 + + // max addresses in each old address bucket. + oldBucketSize = 64 + + // buckets we split old addresses over. + oldBucketCount = 64 + + // max addresses in each new address bucket. + newBucketSize = 64 + + // buckets that we spread new addresses over. + newBucketCount = 256 + + // old buckets over which an address group will be spread. + oldBucketsPerGroup = 4 + + // new buckets over which a source address group will be spread. + newBucketsPerGroup = 32 + + // buckets a frequently seen new address may end up in. + maxNewBucketsPerAddress = 4 + + // days before which we assume an address has vanished + // if we have not seen it announced in that long. + numMissingDays = 7 + + // tries without a single success before we assume an address is bad. + numRetries = 3 + + // max failures we will accept without a success before considering an address bad. + maxFailures = 10 // ? + + // days since the last success before we will consider evicting an address. + minBadDays = 7 + + // % of total addresses known returned by GetSelection. + getSelectionPercent = 23 + + // min addresses that must be returned by GetSelection. Useful for bootstrapping. + minGetSelection = 32 + + // max addresses returned by GetSelection + // NOTE: this must match "maxPexMessageSize" + maxGetSelection = 250 +) diff --git a/p2p/pex_reactor.go b/p2p/pex/pex_reactor.go similarity index 93% rename from p2p/pex_reactor.go rename to p2p/pex/pex_reactor.go index bd19ab3b5..53075a1d6 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -1,4 +1,4 @@ -package p2p +package pex import ( "bytes" @@ -11,8 +11,13 @@ import ( "github.com/pkg/errors" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" + + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" ) +type Peer = p2p.Peer + const ( // PexChannel is a channel for PEX messages PexChannel = byte(0x00) @@ -45,9 +50,9 @@ const ( // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too. // Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod. type PEXReactor struct { - BaseReactor + p2p.BaseReactor - book *AddrBook + book AddrBook config *PEXReactorConfig ensurePeersPeriod time.Duration @@ -67,7 +72,7 @@ type PEXReactorConfig struct { } // NewPEXReactor creates new PEX reactor. -func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor { +func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { r := &PEXReactor{ book: b, config: config, @@ -75,7 +80,7 @@ func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor { requestsSent: cmn.NewCMap(), lastReceivedRequests: cmn.NewCMap(), } - r.BaseReactor = *NewBaseReactor("PEXReactor", r) + r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) return r } @@ -111,8 +116,8 @@ func (r *PEXReactor) OnStop() { } // GetChannels implements Reactor -func (r *PEXReactor) GetChannels() []*ChannelDescriptor { - return []*ChannelDescriptor{ +func (r *PEXReactor) GetChannels() []*conn.ChannelDescriptor { + return []*conn.ChannelDescriptor{ { ID: PexChannel, Priority: 1, @@ -225,7 +230,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) { // ReceiveAddrs adds the given addrs to the addrbook if theres an open // request for this peer and deletes the open request. // If there's no open request for the src peer, it returns an error. -func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error { +func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { id := string(src.ID()) if !r.requestsSent.Has(id) { @@ -244,7 +249,7 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error { } // SendAddrs sends addrs to the peer. -func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) { +func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}}) } @@ -294,7 +299,7 @@ func (r *PEXReactor) ensurePeers() { // NOTE: range here is [10, 90]. Too high ? newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 - toDial := make(map[ID]*NetAddress) + toDial := make(map[p2p.ID]*p2p.NetAddress) // Try maxAttempts times to pick numToDial addresses to dial maxAttempts := numToDial * 3 for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { @@ -317,10 +322,15 @@ func (r *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial { - go func(picked *NetAddress) { + go func(picked *p2p.NetAddress) { _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { - r.book.MarkAttempt(picked) + // TODO: detect more "bad peer" scenarios + if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { + r.book.MarkBad(picked) + } else { + r.book.MarkAttempt(picked) + } } }(item) } @@ -349,7 +359,7 @@ func (r *PEXReactor) checkSeeds() error { if lSeeds == 0 { return nil } - _, errs := NewNetAddressStrings(r.config.Seeds) + _, errs := p2p.NewNetAddressStrings(r.config.Seeds) for _, err := range errs { if err != nil { return err @@ -364,9 +374,10 @@ func (r *PEXReactor) dialSeeds() { if lSeeds == 0 { return } - seedAddrs, _ := NewNetAddressStrings(r.config.Seeds) + seedAddrs, _ := p2p.NewNetAddressStrings(r.config.Seeds) - perm := r.Switch.rng.Perm(lSeeds) + perm := rand.Perm(lSeeds) + // perm := r.Switch.rng.Perm(lSeeds) for _, i := range perm { // dial a random seed seedAddr := seedAddrs[i] @@ -408,7 +419,7 @@ func (r *PEXReactor) crawlPeersRoutine() { // network crawling performed during seed/crawler mode. type crawlPeerInfo struct { // The listening address of a potential peer we learned about - Addr *NetAddress + Addr *p2p.NetAddress // The last time we attempt to reach this address LastAttempt time.Time @@ -532,7 +543,7 @@ func (m *pexRequestMessage) String() string { A message with announced peer addresses. */ type pexAddrsMessage struct { - Addrs []*NetAddress + Addrs []*p2p.NetAddress } func (m *pexAddrsMessage) String() string { diff --git a/p2p/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go similarity index 77% rename from p2p/pex_reactor_test.go rename to p2p/pex/pex_reactor_test.go index 44fd8b51c..82dafecd4 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -1,21 +1,34 @@ -package p2p +package pex import ( "fmt" "io/ioutil" - "math/rand" "os" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" +) + +var ( + config *cfg.P2PConfig ) +func init() { + config = cfg.DefaultP2PConfig() + config.PexReactor = true +} + func TestPEXReactorBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -45,7 +58,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { r.SetLogger(log.TestingLogger()) size := book.Size() - peer := createRandomPeer(false) + peer := p2p.CreateRandomPeer(false) r.AddPeer(peer) assert.Equal(size+1, book.Size()) @@ -53,7 +66,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { r.RemovePeer(peer, "peer not available") assert.Equal(size+1, book.Size()) - outboundPeer := createRandomPeer(true) + outboundPeer := p2p.CreateRandomPeer(true) r.AddPeer(outboundPeer) assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") @@ -64,7 +77,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { func TestPEXReactorRunning(t *testing.T) { N := 3 - switches := make([]*Switch, N) + switches := make([]*p2p.Switch, N) dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) @@ -74,7 +87,7 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger().With("switch", i)) r := NewPEXReactor(book, &PEXReactorConfig{}) @@ -87,9 +100,9 @@ func TestPEXReactorRunning(t *testing.T) { // fill the address book and add listeners for _, s := range switches { - addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr) + addr, _ := p2p.NewNetAddressString(s.NodeInfo().ListenAddr) book.AddAddress(addr, addr) - s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())) + s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())) } // start switches @@ -98,7 +111,7 @@ func TestPEXReactorRunning(t *testing.T) { require.Nil(t, err) } - assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second) + assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1) // stop them for _, s := range switches { @@ -106,7 +119,7 @@ func TestPEXReactorRunning(t *testing.T) { } } -func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) { +func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) { ticker := time.NewTicker(checkPeriod) remaining := timeout for { @@ -116,7 +129,7 @@ func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, t allGood := true for _, s := range switches { outbound, inbound, _ := s.NumPeers() - if outbound+inbound == 0 { + if outbound+inbound < nPeers { allGood = false } } @@ -151,13 +164,13 @@ func TestPEXReactorReceive(t *testing.T) { r := NewPEXReactor(book, &PEXReactorConfig{}) r.SetLogger(log.TestingLogger()) - peer := createRandomPeer(false) + peer := p2p.CreateRandomPeer(false) // we have to send a request to receive responses r.RequestAddrs(peer) size := book.Size() - addrs := []*NetAddress{peer.NodeInfo().NetAddress()} + addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) assert.Equal(size+1, book.Size()) @@ -176,14 +189,14 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { book.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw }) + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) sw.AddReactor("PEX", r) r.SetSwitch(sw) r.SetLogger(log.TestingLogger()) peer := newMockPeer() - sw.peers.Add(peer) + p2p.AddPeerToSwitch(sw, peer) assert.True(sw.Peers().Has(peer.ID())) id := string(peer.ID()) @@ -215,14 +228,14 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { book.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw }) + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) sw.AddReactor("PEX", r) r.SetSwitch(sw) r.SetLogger(log.TestingLogger()) peer := newMockPeer() - sw.peers.Add(peer) + p2p.AddPeerToSwitch(sw, peer) assert.True(sw.Peers().Has(peer.ID())) id := string(peer.ID()) @@ -232,7 +245,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { assert.True(r.requestsSent.Has(id)) assert.True(sw.Peers().Has(peer.ID())) - addrs := []*NetAddress{peer.NodeInfo().NetAddress()} + addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) // receive some addrs. should clear the request @@ -254,7 +267,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { book.SetLogger(log.TestingLogger()) // 1. create seed - seed := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) @@ -263,13 +276,13 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { sw.AddReactor("pex", r) return sw }) - seed.AddListener(NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) + seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) err = seed.Start() require.Nil(t, err) defer seed.Stop() // 2. create usual peer - sw := makeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}}) @@ -283,7 +296,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { defer sw.Stop() // 3. check that peer at least connects to seed - assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second) + assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1) } func TestPEXReactorCrawlStatus(t *testing.T) { @@ -297,7 +310,7 @@ func TestPEXReactorCrawlStatus(t *testing.T) { pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch - makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { pexR.SetLogger(log.TestingLogger()) sw.SetLogger(log.TestingLogger().With("switch", i)) sw.AddReactor("pex", pexR) @@ -305,13 +318,13 @@ func TestPEXReactorCrawlStatus(t *testing.T) { }) // Create a peer, add it to the peer set and the addrbook. - peer := createRandomPeer(false) - pexR.Switch.peers.Add(peer) + peer := p2p.CreateRandomPeer(false) + p2p.AddPeerToSwitch(pexR.Switch, peer) addr1 := peer.NodeInfo().NetAddress() pexR.book.AddAddress(addr1, addr1) // Add a non-connected address to the book. - _, addr2 := createRoutableAddr() + _, addr2 := p2p.CreateRoutableAddr() pexR.book.AddAddress(addr2, addr1) // Get some peerInfos to crawl @@ -323,44 +336,15 @@ func TestPEXReactorCrawlStatus(t *testing.T) { // TODO: test } -func createRoutableAddr() (addr string, netAddr *NetAddress) { - for { - var err error - addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) - netAddr, err = NewNetAddressString(addr) - if err != nil { - panic(err) - } - if netAddr.Routable() { - break - } - } - return -} - -func createRandomPeer(outbound bool) *peer { - addr, netAddr := createRoutableAddr() - p := &peer{ - nodeInfo: NodeInfo{ - ListenAddr: netAddr.DialString(), - PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), - }, - outbound: outbound, - mconn: &MConnection{}, - } - p.SetLogger(log.TestingLogger().With("peer", addr)) - return p -} - type mockPeer struct { *cmn.BaseService pubKey crypto.PubKey - addr *NetAddress + addr *p2p.NetAddress outbound, persistent bool } func newMockPeer() mockPeer { - _, netAddr := createRoutableAddr() + _, netAddr := p2p.CreateRoutableAddr() mp := mockPeer{ addr: netAddr, pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), @@ -370,16 +354,16 @@ func newMockPeer() mockPeer { return mp } -func (mp mockPeer) ID() ID { return PubKeyToID(mp.pubKey) } +func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent } -func (mp mockPeer) NodeInfo() NodeInfo { - return NodeInfo{ +func (mp mockPeer) NodeInfo() p2p.NodeInfo { + return p2p.NodeInfo{ PubKey: mp.pubKey, ListenAddr: mp.addr.DialString(), } } -func (mp mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } +func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} diff --git a/p2p/switch.go b/p2p/switch.go index ccf2c5eb4..9502359dd 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -11,6 +11,7 @@ import ( crypto "github.com/tendermint/go-crypto" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" ) @@ -30,10 +31,12 @@ const ( reconnectBackOffBaseSeconds = 3 ) -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchConnectToSelf = errors.New("Connect to self") -) +//----------------------------------------------------------------------------- + +type AddrBook interface { + AddAddress(addr *NetAddress, src *NetAddress) error + Save() +} //----------------------------------------------------------------------------- @@ -48,7 +51,7 @@ type Switch struct { peerConfig *PeerConfig listeners []Listener reactors map[string]Reactor - chDescs []*ChannelDescriptor + chDescs []*conn.ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap @@ -66,7 +69,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { config: config, peerConfig: DefaultPeerConfig(), reactors: make(map[string]Reactor), - chDescs: make([]*ChannelDescriptor, 0), + chDescs: make([]*conn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), @@ -77,10 +80,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { sw.rng = rand.New(rand.NewSource(cmn.RandInt64())) // TODO: collapse the peerConfig into the config ? - sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond + sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond sw.peerConfig.MConfig.SendRate = config.SendRate sw.peerConfig.MConfig.RecvRate = config.RecvRate - sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize + sw.peerConfig.MConfig.MaxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw @@ -265,6 +268,8 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { // If no success after all that, it stops trying, and leaves it // to the PEX/Addrbook to find the peer again func (sw *Switch) reconnectToPeer(peer Peer) { + // NOTE this will connect to the self reported address, + // not necessarily the original we dialed netAddr := peer.NodeInfo().NetAddress() start := time.Now() sw.Logger.Info("Reconnecting to peer", "peer", peer) @@ -316,7 +321,7 @@ func (sw *Switch) IsDialing(id ID) bool { } // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). -func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error { +func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error { netAddrs, errs := NewNetAddressStrings(peers) for _, err := range errs { sw.Logger.Error("Error in peer's address", "err", err) @@ -330,8 +335,11 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent if netAddr.Same(ourAddr) { continue } + // TODO: move this out of here ? addrBook.AddAddress(netAddr, ourAddr) } + // Persist some peers to disk right away. + // NOTE: integration tests depend on this addrBook.Save() } @@ -454,7 +462,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) } else if addr.ID != peer.ID() { peer.CloseConn() - return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) + return nil, ErrSwitchAuthenticationFailure{addr, peer.ID()} } err = sw.addPeer(peer) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index a729698e9..75f9640b1 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -16,6 +16,7 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/conn" ) var ( @@ -37,7 +38,7 @@ type TestReactor struct { BaseReactor mtx sync.Mutex - channels []*ChannelDescriptor + channels []*conn.ChannelDescriptor peersAdded []Peer peersRemoved []Peer logMessages bool @@ -45,7 +46,7 @@ type TestReactor struct { msgsReceived map[byte][]PeerMessage } -func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { +func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor { tr := &TestReactor{ channels: channels, logMessages: logMessages, @@ -56,7 +57,7 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto return tr } -func (tr *TestReactor) GetChannels() []*ChannelDescriptor { +func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor { return tr.channels } @@ -92,7 +93,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { // convenience method for creating two switches connected to each other. // XXX: note this uses net.Pipe and not a proper TCP conn -func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { +func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches) return switches[0], switches[1] @@ -100,11 +101,11 @@ func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switc func initSwitchFunc(i int, sw *Switch) *Switch { // Make two reactors of two channels each - sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, }, true)) - sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, }, true)) @@ -112,7 +113,7 @@ func initSwitchFunc(i int, sw *Switch) *Switch { } func TestSwitches(t *testing.T) { - s1, s2 := makeSwitchPair(t, initSwitchFunc) + s1, s2 := MakeSwitchPair(t, initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -156,12 +157,12 @@ func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reacto } func TestConnAddrFilter(t *testing.T) { - s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() - c1, c2 := netPipe() + c1, c2 := conn.NetPipe() s1.SetAddrFilter(func(addr net.Addr) error { if addr.String() == c1.RemoteAddr().String() { @@ -192,12 +193,12 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) } func TestConnPubKeyFilter(t *testing.T) { - s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() - c1, c2 := netPipe() + c1, c2 := conn.NetPipe() // set pubkey filter s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error { @@ -224,7 +225,7 @@ func TestConnPubKeyFilter(t *testing.T) { func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -251,7 +252,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -302,13 +303,13 @@ func TestSwitchFullConnectivity(t *testing.T) { func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch { + s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each - sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, }, false)) - sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, }, false)) diff --git a/p2p/test_util.go b/p2p/test_util.go index dca23a0ea..c4c0fe0b7 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -5,11 +5,46 @@ import ( "net" crypto "github.com/tendermint/go-crypto" - cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/conn" ) +func AddPeerToSwitch(sw *Switch, peer Peer) { + sw.peers.Add(peer) +} + +func CreateRandomPeer(outbound bool) *peer { + addr, netAddr := CreateRoutableAddr() + p := &peer{ + nodeInfo: NodeInfo{ + ListenAddr: netAddr.DialString(), + PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), + }, + outbound: outbound, + mconn: &conn.MConnection{}, + } + p.SetLogger(log.TestingLogger().With("peer", addr)) + return p +} + +func CreateRoutableAddr() (addr string, netAddr *NetAddress) { + for { + var err error + addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, err = NewNetAddressString(addr) + if err != nil { + panic(err) + } + if netAddr.Routable() { + break + } + } + return +} + //------------------------------------------------------------------ // Connects switches via arbitrary net.Conn. Used for testing. @@ -20,7 +55,7 @@ import ( func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { switches := make([]*Switch, n) for i := 0; i < n; i++ { - switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) + switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch) } if err := StartSwitches(switches); err != nil { @@ -42,7 +77,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit func Connect2Switches(switches []*Switch, i, j int) { switchI := switches[i] switchJ := switches[j] - c1, c2 := netPipe() + c1, c2 := conn.NetPipe() doneCh := make(chan struct{}) go func() { err := switchI.addPeerWithConnection(c1) @@ -91,7 +126,7 @@ func StartSwitches(switches []*Switch) error { return nil } -func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { +func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { // new switch, add reactors // TODO: let the config be passed in? nodeKey := &NodeKey{ diff --git a/p2p/types.go b/p2p/types.go index d93adc9b6..b11765bb5 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -1,112 +1,8 @@ package p2p import ( - "fmt" - "net" - "strconv" - "strings" - - crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/p2p/conn" ) -const maxNodeInfoSize = 10240 // 10Kb - -// NodeInfo is the basic node information exchanged -// between two peers during the Tendermint P2P handshake. -type NodeInfo struct { - // Authenticate - PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey - ListenAddr string `json:"listen_addr"` // accepting incoming - - // Check compatibility - Network string `json:"network"` // network/chain ID - Version string `json:"version"` // major.minor.revision - - // Sanitize - Moniker string `json:"moniker"` // arbitrary moniker - Other []string `json:"other"` // other application specific data -} - -// Validate checks the self-reported NodeInfo is safe. -// It returns an error if the info.PubKey doesn't match the given pubKey. -// TODO: constraints for Moniker/Other? Or is that for the UI ? -func (info NodeInfo) Validate(pubKey crypto.PubKey) error { - if !info.PubKey.Equals(pubKey) { - return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", - info.PubKey, pubKey) - } - return nil -} - -// CONTRACT: two nodes are compatible if the major/minor versions match and network match -func (info NodeInfo) CompatibleWith(other NodeInfo) error { - iMajor, iMinor, _, iErr := splitVersion(info.Version) - oMajor, oMinor, _, oErr := splitVersion(other.Version) - - // if our own version number is not formatted right, we messed up - if iErr != nil { - return iErr - } - - // version number must be formatted correctly ("x.x.x") - if oErr != nil { - return oErr - } - - // major version must match - if iMajor != oMajor { - return fmt.Errorf("Peer is on a different major version. Got %v, expected %v", oMajor, iMajor) - } - - // minor version must match - if iMinor != oMinor { - return fmt.Errorf("Peer is on a different minor version. Got %v, expected %v", oMinor, iMinor) - } - - // nodes must be on the same network - if info.Network != other.Network { - return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network) - } - - return nil -} - -func (info NodeInfo) ID() ID { - return PubKeyToID(info.PubKey) -} - -func (info NodeInfo) NetAddress() *NetAddress { - id := PubKeyToID(info.PubKey) - addr := info.ListenAddr - netAddr, err := NewNetAddressString(IDAddressString(id, addr)) - if err != nil { - panic(err) // everything should be well formed by now - } - return netAddr -} - -func (info NodeInfo) ListenHost() string { - host, _, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas - return host -} - -func (info NodeInfo) ListenPort() int { - _, port, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas - port_i, err := strconv.Atoi(port) - if err != nil { - return -1 - } - return port_i -} - -func (info NodeInfo) String() string { - return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other) -} - -func splitVersion(version string) (string, string, string, error) { - spl := strings.Split(version, ".") - if len(spl) != 3 { - return "", "", "", fmt.Errorf("Invalid version format %v", version) - } - return spl[0], spl[1], spl[2], nil -} +type ChannelDescriptor = conn.ChannelDescriptor +type ConnectionStatus = conn.ConnectionStatus diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 301977ac3..2edb3f3d1 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -32,7 +32,7 @@ type P2P interface { NumPeers() (outbound, inbound, dialig int) NodeInfo() p2p.NodeInfo IsListening() bool - DialPeersAsync(*p2p.AddrBook, []string, bool) error + DialPeersAsync(p2p.AddrBook, []string, bool) error } //---------------------------------------------- @@ -54,7 +54,7 @@ var ( // objects pubKey crypto.PubKey genDoc *types.GenesisDoc // cache the genesis structure - addrBook *p2p.AddrBook + addrBook p2p.AddrBook txIndexer txindex.TxIndexer consensusReactor *consensus.ConsensusReactor eventBus *types.EventBus // thread safe @@ -94,7 +94,7 @@ func SetGenesisDoc(doc *types.GenesisDoc) { genDoc = doc } -func SetAddrBook(book *p2p.AddrBook) { +func SetAddrBook(book p2p.AddrBook) { addrBook = book } diff --git a/test/p2p/pex/test_addrbook.sh b/test/p2p/pex/test_addrbook.sh index 1dd26b172..d54bcf428 100644 --- a/test/p2p/pex/test_addrbook.sh +++ b/test/p2p/pex/test_addrbook.sh @@ -17,18 +17,18 @@ CLIENT_NAME="pex_addrbook_$ID" echo "1. restart peer $ID" docker stop "local_testnet_$ID" # preserve addrbook.json -docker cp "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json" "/tmp/addrbook.json" +docker cp "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json" "/tmp/addrbook.json" set +e #CIRCLE docker rm -vf "local_testnet_$ID" set -e # NOTE that we do not provide persistent_peers bash test/p2p/peer.sh "$DOCKER_IMAGE" "$NETWORK_NAME" "$ID" "$PROXY_APP" "--p2p.pex --rpc.unsafe" -docker cp "/tmp/addrbook.json" "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json" +docker cp "/tmp/addrbook.json" "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json" echo "with the following addrbook:" cat /tmp/addrbook.json # exec doesn't work on circle -# docker exec "local_testnet_$ID" cat "/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json" +# docker exec "local_testnet_$ID" cat "/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json" echo "" # if the client runs forever, it means addrbook wasn't saved or was empty @@ -44,7 +44,7 @@ echo "1. restart peer $ID" docker stop "local_testnet_$ID" set +e #CIRCLE docker rm -vf "local_testnet_$ID" -set -e +set -e # NOTE that we do not provide persistent_peers bash test/p2p/peer.sh "$DOCKER_IMAGE" "$NETWORK_NAME" "$ID" "$PROXY_APP" "--p2p.pex --rpc.unsafe"