From 0baa7588c278c43cd19fbf435f1de17ece9923a7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 12 Oct 2018 19:25:33 -0400 Subject: [PATCH] p2p: NodeInfo is an interface; General cleanup (#2556) * p2p: NodeInfo is an interface * (squash) fixes from review * (squash) more fixes from review * p2p: remove peerConn.HandshakeTimeout * p2p: NodeInfo is two interfaces. Remove String() * fixes from review * remove test code from peer.RemoteIP() * p2p: remove peer.OriginalAddr(). See #2618 * use a mockPeer in peer_set_test.go * p2p: fix testNodeInfo naming * p2p: remove unused var * remove testRandNodeInfo * fix linter * fix retry dialing self * fix rpc --- benchmarks/codec_test.go | 50 +++++---------- blockchain/reactor_test.go | 3 +- consensus/common_test.go | 2 +- node/node.go | 6 +- p2p/dummy/peer.go | 7 +- p2p/errors.go | 5 +- p2p/node_info.go | 76 +++++++++++++--------- p2p/peer.go | 107 ++++++++----------------------- p2p/peer_set_test.go | 45 +++++++------ p2p/peer_test.go | 36 +++++------ p2p/pex/pex_reactor_test.go | 7 +- p2p/switch.go | 13 ++-- p2p/switch_test.go | 1 + p2p/test_util.go | 89 +++++++++++++------------ p2p/transport.go | 35 +++++----- p2p/transport_test.go | 83 +++++++++--------------- rpc/core/consensus.go | 3 +- rpc/core/net.go | 9 ++- rpc/core/status.go | 3 +- rpc/core/types/responses.go | 8 +-- rpc/core/types/responses_test.go | 12 ++-- 21 files changed, 269 insertions(+), 331 deletions(-) diff --git a/benchmarks/codec_test.go b/benchmarks/codec_test.go index c0e13d168..71d7a83b2 100644 --- a/benchmarks/codec_test.go +++ b/benchmarks/codec_test.go @@ -12,23 +12,27 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" ) +func testNodeInfo(id p2p.ID) p2p.DefaultNodeInfo { + return p2p.DefaultNodeInfo{ + ID_: id, + Moniker: "SOMENAME", + Network: "SOMENAME", + ListenAddr: "SOMEADDR", + Version: "SOMEVER", + Other: p2p.DefaultNodeInfoOther{ + AminoVersion: "SOMESTRING", + P2PVersion: "OTHERSTRING", + }, + } +} + func BenchmarkEncodeStatusWire(b *testing.B) { b.StopTimer() cdc := amino.NewCodec() ctypes.RegisterAmino(cdc) nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} status := &ctypes.ResultStatus{ - NodeInfo: p2p.NodeInfo{ - ID: nodeKey.ID(), - Moniker: "SOMENAME", - Network: "SOMENAME", - ListenAddr: "SOMEADDR", - Version: "SOMEVER", - Other: p2p.NodeInfoOther{ - AminoVersion: "SOMESTRING", - P2PVersion: "OTHERSTRING", - }, - }, + NodeInfo: testNodeInfo(nodeKey.ID()), SyncInfo: ctypes.SyncInfo{ LatestBlockHash: []byte("SOMEBYTES"), LatestBlockHeight: 123, @@ -56,17 +60,7 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) { cdc := amino.NewCodec() ctypes.RegisterAmino(cdc) nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} - nodeInfo := p2p.NodeInfo{ - ID: nodeKey.ID(), - Moniker: "SOMENAME", - Network: "SOMENAME", - ListenAddr: "SOMEADDR", - Version: "SOMEVER", - Other: p2p.NodeInfoOther{ - AminoVersion: "SOMESTRING", - P2PVersion: "OTHERSTRING", - }, - } + nodeInfo := testNodeInfo(nodeKey.ID()) b.StartTimer() counter := 0 @@ -84,17 +78,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) { cdc := amino.NewCodec() ctypes.RegisterAmino(cdc) nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} - nodeInfo := p2p.NodeInfo{ - ID: nodeKey.ID(), - Moniker: "SOMENAME", - Network: "SOMENAME", - ListenAddr: "SOMEADDR", - Version: "SOMEVER", - Other: p2p.NodeInfoOther{ - AminoVersion: "SOMESTRING", - P2PVersion: "OTHERSTRING", - }, - } + nodeInfo := testNodeInfo(nodeKey.ID()) b.StartTimer() counter := 0 diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index b63a057e1..7fc7ffb77 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -198,7 +198,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { } func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } -func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.NodeInfo{} } +func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } func (tp *bcrTestPeer) ID() p2p.ID { return tp.id } func (tp *bcrTestPeer) IsOutbound() bool { return false } @@ -206,4 +206,3 @@ func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} } -func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil } diff --git a/consensus/common_test.go b/consensus/common_test.go index 26f8e3e57..ddce69145 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -568,7 +568,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int { for i, s := range switches { - if peer.NodeInfo().ID == s.NodeInfo().ID { + if peer.NodeInfo().ID() == s.NodeInfo().ID() { return i } } diff --git a/node/node.go b/node/node.go index 9c409787d..ed0fa1198 100644 --- a/node/node.go +++ b/node/node.go @@ -761,8 +761,8 @@ func makeNodeInfo( if _, ok := txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" } - nodeInfo := p2p.NodeInfo{ - ID: nodeID, + nodeInfo := p2p.DefaultNodeInfo{ + ID_: nodeID, Network: chainID, Version: version.Version, Channels: []byte{ @@ -772,7 +772,7 @@ func makeNodeInfo( evidence.EvidenceChannel, }, Moniker: config.Moniker, - Other: p2p.NodeInfoOther{ + Other: p2p.DefaultNodeInfoOther{ AminoVersion: amino.Version, P2PVersion: p2p.Version, ConsensusVersion: cs.Version, diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go index bb6e822fc..4871719d4 100644 --- a/p2p/dummy/peer.go +++ b/p2p/dummy/peer.go @@ -42,7 +42,7 @@ func (p *peer) IsPersistent() bool { // NodeInfo always returns empty node info. func (p *peer) NodeInfo() p2p.NodeInfo { - return p2p.NodeInfo{} + return p2p.DefaultNodeInfo{} } // RemoteIP always returns localhost. @@ -78,8 +78,3 @@ func (p *peer) Get(key string) interface{} { } return nil } - -// OriginalAddr always returns nil. -func (p *peer) OriginalAddr() *p2p.NetAddress { - return nil -} diff --git a/p2p/errors.go b/p2p/errors.go index 902d22034..706150945 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -40,13 +40,12 @@ func (e ErrRejected) Error() string { if e.isDuplicate { if e.conn != nil { return fmt.Sprintf( - "duplicate CONN<%s>: %s", + "duplicate CONN<%s>", e.conn.RemoteAddr().String(), - e.err, ) } if e.id != "" { - return fmt.Sprintf("duplicate ID<%v>: %s", e.id, e.err) + return fmt.Sprintf("duplicate ID<%v>", e.id) } } diff --git a/p2p/node_info.go b/p2p/node_info.go index a16535949..a468443d1 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -2,6 +2,7 @@ package p2p import ( "fmt" + "reflect" "strings" cmn "github.com/tendermint/tendermint/libs/common" @@ -17,12 +18,32 @@ func MaxNodeInfoSize() int { return maxNodeInfoSize } -// NodeInfo is the basic node information exchanged +// NodeInfo exposes basic info of a node +// and determines if we're compatible +type NodeInfo interface { + nodeInfoAddress + nodeInfoTransport +} + +// nodeInfoAddress exposes just the core info of a node. +type nodeInfoAddress interface { + ID() ID + NetAddress() *NetAddress +} + +// nodeInfoTransport is validates a nodeInfo and checks +// our compatibility with it. It's for use in the handshake. +type nodeInfoTransport interface { + ValidateBasic() error + CompatibleWith(other NodeInfo) error +} + +// DefaultNodeInfo is the basic node information exchanged // between two peers during the Tendermint P2P handshake. -type NodeInfo struct { +type DefaultNodeInfo struct { // Authenticate // TODO: replace with NetAddress - ID ID `json:"id"` // authenticated identifier + ID_ ID `json:"id"` // authenticated identifier ListenAddr string `json:"listen_addr"` // accepting incoming // Check compatibility. @@ -32,12 +53,12 @@ type NodeInfo struct { Channels cmn.HexBytes `json:"channels"` // channels this node knows about // ASCIIText fields - Moniker string `json:"moniker"` // arbitrary moniker - Other NodeInfoOther `json:"other"` // other application specific data + Moniker string `json:"moniker"` // arbitrary moniker + Other DefaultNodeInfoOther `json:"other"` // other application specific data } -// NodeInfoOther is the misc. applcation specific data -type NodeInfoOther struct { +// DefaultNodeInfoOther is the misc. applcation specific data +type DefaultNodeInfoOther struct { AminoVersion string `json:"amino_version"` P2PVersion string `json:"p2p_version"` ConsensusVersion string `json:"consensus_version"` @@ -46,19 +67,12 @@ type NodeInfoOther struct { RPCAddress string `json:"rpc_address"` } -func (o NodeInfoOther) String() string { - return fmt.Sprintf( - "{amino_version: %v, p2p_version: %v, consensus_version: %v, rpc_version: %v, tx_index: %v, rpc_address: %v}", - o.AminoVersion, - o.P2PVersion, - o.ConsensusVersion, - o.RPCVersion, - o.TxIndex, - o.RPCAddress, - ) +// ID returns the node's peer ID. +func (info DefaultNodeInfo) ID() ID { + return info.ID_ } -// Validate checks the self-reported NodeInfo is safe. +// ValidateBasic checks the self-reported DefaultNodeInfo is safe. // It returns an error if there // are too many Channels, if there are any duplicate Channels, // if the ListenAddr is malformed, or if the ListenAddr is a host name @@ -71,7 +85,7 @@ func (o NodeInfoOther) String() string { // International clients could then use punycode (or we could use // url-encoding), and we just need to be careful with how we handle that in our // clients. (e.g. off by default). -func (info NodeInfo) Validate() error { +func (info DefaultNodeInfo) ValidateBasic() error { if len(info.Channels) > maxNumChannels { return fmt.Errorf("info.Channels is too long (%v). Max is %v", len(info.Channels), maxNumChannels) } @@ -111,14 +125,19 @@ func (info NodeInfo) Validate() error { } // ensure ListenAddr is good - _, err := NewNetAddressString(IDAddressString(info.ID, info.ListenAddr)) + _, err := NewNetAddressString(IDAddressString(info.ID(), info.ListenAddr)) return err } -// CompatibleWith checks if two NodeInfo are compatible with eachother. +// CompatibleWith checks if two DefaultNodeInfo are compatible with eachother. // CONTRACT: two nodes are compatible if the major version matches and network match // and they have at least one channel in common. -func (info NodeInfo) CompatibleWith(other NodeInfo) error { +func (info DefaultNodeInfo) CompatibleWith(other_ NodeInfo) error { + other, ok := other_.(DefaultNodeInfo) + if !ok { + return fmt.Errorf("wrong NodeInfo type. Expected DefaultNodeInfo, got %v", reflect.TypeOf(other_)) + } + iMajor, _, _, iErr := splitVersion(info.Version) oMajor, _, _, oErr := splitVersion(other.Version) @@ -164,18 +183,18 @@ OUTER_LOOP: return nil } -// NetAddress returns a NetAddress derived from the NodeInfo - +// NetAddress returns a NetAddress derived from the DefaultNodeInfo - // it includes the authenticated peer ID and the self-reported // ListenAddr. Note that the ListenAddr is not authenticated and // may not match that address actually dialed if its an outbound peer. -func (info NodeInfo) NetAddress() *NetAddress { - netAddr, err := NewNetAddressString(IDAddressString(info.ID, info.ListenAddr)) +func (info DefaultNodeInfo) NetAddress() *NetAddress { + netAddr, err := NewNetAddressString(IDAddressString(info.ID(), info.ListenAddr)) if err != nil { switch err.(type) { case ErrNetAddressLookup: // XXX If the peer provided a host name and the lookup fails here // we're out of luck. - // TODO: use a NetAddress in NodeInfo + // TODO: use a NetAddress in DefaultNodeInfo default: panic(err) // everything should be well formed by now } @@ -183,11 +202,6 @@ func (info NodeInfo) NetAddress() *NetAddress { return netAddr } -func (info NodeInfo) String() string { - return fmt.Sprintf("NodeInfo{id: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", - info.ID, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other) -} - func splitVersion(version string) (string, string, string, error) { spl := strings.Split(version, ".") if len(spl) != 3 { diff --git a/p2p/peer.go b/p2p/peer.go index ba22695e7..009313141 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,7 +3,6 @@ package p2p import ( "fmt" "net" - "sync/atomic" "time" cmn "github.com/tendermint/tendermint/libs/common" @@ -15,19 +14,18 @@ import ( const metricsTickerDuration = 10 * time.Second -var testIPSuffix uint32 - // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service - ID() ID // peer's cryptographic ID - RemoteIP() net.IP // remote IP of the connection + ID() ID // peer's cryptographic ID + RemoteIP() net.IP // remote IP of the connection + IsOutbound() bool // did we dial the peer IsPersistent() bool // do we redial this peer when we disconnect + NodeInfo() NodeInfo // peer's info Status() tmconn.ConnectionStatus - OriginalAddr() *NetAddress Send(byte, []byte) bool TrySend(byte, []byte) bool @@ -40,12 +38,13 @@ type Peer interface { // peerConn contains the raw connection and its config. type peerConn struct { - outbound bool - persistent bool - config *config.P2PConfig - conn net.Conn // source connection - ip net.IP - originalAddr *NetAddress // nil for inbound connections + outbound bool + persistent bool + config *config.P2PConfig + conn net.Conn // source connection + + // cached RemoteIP() + ip net.IP } // ID only exists for SecretConnection. @@ -60,14 +59,6 @@ func (pc peerConn) RemoteIP() net.IP { return pc.ip } - // In test cases a conn could not be present at all or be an in-memory - // implementation where we want to return a fake ip. - if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" { - pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))} - - return pc.ip - } - host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String()) if err != nil { panic(err) @@ -120,7 +111,7 @@ func newPeer( p := &peer{ peerConn: pc, nodeInfo: nodeInfo, - channels: nodeInfo.Channels, + channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO Data: cmn.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), @@ -142,6 +133,15 @@ func newPeer( return p } +// String representation. +func (p *peer) String() string { + if p.outbound { + return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID()) + } + + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) +} + //--------------------------------------------------- // Implements cmn.Service @@ -177,7 +177,7 @@ func (p *peer) OnStop() { // ID returns the peer's ID - the hex encoded hash of its pubkey. func (p *peer) ID() ID { - return p.nodeInfo.ID + return p.nodeInfo.ID() } // IsOutbound returns true if the connection is outbound, false otherwise. @@ -195,15 +195,6 @@ func (p *peer) NodeInfo() NodeInfo { return p.nodeInfo } -// OriginalAddr returns the original address, which was used to connect with -// the peer. Returns nil for inbound peers. -func (p *peer) OriginalAddr() *NetAddress { - if p.peerConn.outbound { - return p.peerConn.originalAddr - } - return nil -} - // Status returns the peer's ConnectionStatus. func (p *peer) Status() tmconn.ConnectionStatus { return p.mconn.Status() @@ -272,53 +263,14 @@ func (p *peer) hasChannel(chID byte) bool { } //--------------------------------------------------- -// methods used by the Switch +// methods only used for testing +// TODO: can we remove these? -// CloseConn should be called by the Switch if the peer was created but never -// started. +// CloseConn closes the underlying connection func (pc *peerConn) CloseConn() { pc.conn.Close() // nolint: errcheck } -// HandshakeTimeout performs the Tendermint P2P handshake between a given node -// and the peer by exchanging their NodeInfo. It sets the received nodeInfo on -// the peer. -// NOTE: blocking -func (pc *peerConn) HandshakeTimeout( - ourNodeInfo NodeInfo, - timeout time.Duration, -) (peerNodeInfo NodeInfo, err error) { - // Set deadline for handshake so we don't block forever on conn.ReadFull - if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil { - return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline") - } - - var trs, _ = cmn.Parallel( - func(_ int) (val interface{}, err error, abort bool) { - _, err = cdc.MarshalBinaryWriter(pc.conn, ourNodeInfo) - return - }, - func(_ int) (val interface{}, err error, abort bool) { - _, err = cdc.UnmarshalBinaryReader( - pc.conn, - &peerNodeInfo, - int64(MaxNodeInfoSize()), - ) - return - }, - ) - if err := trs.FirstError(); err != nil { - return peerNodeInfo, cmn.ErrorWrap(err, "Error during handshake") - } - - // Remove deadline - if err := pc.conn.SetDeadline(time.Time{}); err != nil { - return peerNodeInfo, cmn.ErrorWrap(err, "Error removing deadline") - } - - return peerNodeInfo, nil -} - // Addr returns peer's remote network address. func (p *peer) Addr() net.Addr { return p.peerConn.conn.RemoteAddr() @@ -332,14 +284,7 @@ func (p *peer) CanSend(chID byte) bool { return p.mconn.CanSend(chID) } -// String representation. -func (p *peer) String() string { - if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID()) - } - - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) -} +//--------------------------------------------------- func PeerMetrics(metrics *Metrics) PeerOption { return func(p *peer) { diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index ee1c52eab..c0ad80005 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -1,7 +1,6 @@ package p2p import ( - "fmt" "net" "sync" "testing" @@ -12,24 +11,34 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ) -// Returns an empty kvstore peer -func randPeer(ip net.IP) *peer { +// mockPeer for testing the PeerSet +type mockPeer struct { + cmn.BaseService + ip net.IP + id ID +} + +func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } +func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } +func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } +func (mp *mockPeer) ID() ID { return mp.id } +func (mp *mockPeer) IsOutbound() bool { return false } +func (mp *mockPeer) IsPersistent() bool { return true } +func (mp *mockPeer) Get(s string) interface{} { return s } +func (mp *mockPeer) Set(string, interface{}) {} +func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } + +// Returns a mock peer +func newMockPeer(ip net.IP) *mockPeer { if ip == nil { ip = net.IP{127, 0, 0, 1} } - nodeKey := NodeKey{PrivKey: ed25519.GenPrivKey()} - p := &peer{ - nodeInfo: NodeInfo{ - ID: nodeKey.ID(), - ListenAddr: fmt.Sprintf("%v.%v.%v.%v:26656", cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256), - }, - metrics: NopMetrics(), + return &mockPeer{ + ip: ip, + id: nodeKey.ID(), } - - p.ip = ip - - return p } func TestPeerSetAddRemoveOne(t *testing.T) { @@ -39,7 +48,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) { var peerList []Peer for i := 0; i < 5; i++ { - p := randPeer(net.IP{127, 0, 0, byte(i)}) + p := newMockPeer(net.IP{127, 0, 0, byte(i)}) if err := peerSet.Add(p); err != nil { t.Error(err) } @@ -83,7 +92,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { peers := []Peer{} N := 100 for i := 0; i < N; i++ { - peer := randPeer(net.IP{127, 0, 0, byte(i)}) + peer := newMockPeer(net.IP{127, 0, 0, byte(i)}) if err := peerSet.Add(peer); err != nil { t.Errorf("Failed to add new peer") } @@ -107,7 +116,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { func TestPeerSetAddDuplicate(t *testing.T) { t.Parallel() peerSet := NewPeerSet() - peer := randPeer(nil) + peer := newMockPeer(nil) n := 20 errsChan := make(chan error) @@ -149,7 +158,7 @@ func TestPeerSetGet(t *testing.T) { var ( peerSet = NewPeerSet() - peer = randPeer(nil) + peer = newMockPeer(nil) ) assert.Nil(t, peerSet.Get(peer.ID()), "expecting a nil lookup, before .Add") diff --git a/p2p/peer_test.go b/p2p/peer_test.go index a2a2946a1..fecf7f1cc 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -19,8 +19,6 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) -const testCh = 0x01 - func TestPeerBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -81,18 +79,14 @@ func createOutboundPeerAndPerformHandshake( if err != nil { return nil, err } - nodeInfo, err := pc.HandshakeTimeout(NodeInfo{ - ID: addr.ID, - Moniker: "host_peer", - Network: "testing", - Version: "123.123.123", - Channels: []byte{testCh}, - }, 1*time.Second) + timeout := 1 * time.Second + ourNodeInfo := testNodeInfo(addr.ID, "host_peer") + peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo) if err != nil { return nil, err } - p := newPeer(pc, mConfig, nodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) + p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil } @@ -120,7 +114,7 @@ func testOutboundPeerConn( return peerConn{}, cmn.ErrorWrap(err, "Error creating peer") } - pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr) + pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey) if err != nil { if cerr := conn.Close(); cerr != nil { return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) @@ -191,14 +185,7 @@ func (rp *remotePeer) accept(l net.Listener) { golog.Fatalf("Failed to create a peer: %+v", err) } - _, err = handshake(pc.conn, time.Second, NodeInfo{ - ID: rp.Addr().ID, - Moniker: "remote_peer", - Network: "testing", - Version: "123.123.123", - ListenAddr: l.Addr().String(), - Channels: rp.channels, - }) + _, err = handshake(pc.conn, time.Second, rp.nodeInfo(l)) if err != nil { golog.Fatalf("Failed to perform handshake: %+v", err) } @@ -217,3 +204,14 @@ func (rp *remotePeer) accept(l net.Listener) { } } } + +func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo { + return DefaultNodeInfo{ + ID_: rp.Addr().ID, + Moniker: "remote_peer", + Network: "testing", + Version: "123.123.123", + ListenAddr: l.Addr().String(), + Channels: rp.channels, + } +} diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index c22eabdc1..b0338c3c2 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -320,7 +320,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { peer := p2p.CreateRandomPeer(false) pexR, book := createReactor(&PEXReactorConfig{}) - book.AddPrivateIDs([]string{string(peer.NodeInfo().ID)}) + book.AddPrivateIDs([]string{string(peer.NodeInfo().ID())}) defer teardownReactor(book) // we have to send a request to receive responses @@ -391,8 +391,8 @@ func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent } func (mp mockPeer) NodeInfo() p2p.NodeInfo { - return p2p.NodeInfo{ - ID: mp.addr.ID, + return p2p.DefaultNodeInfo{ + ID_: mp.addr.ID, ListenAddr: mp.addr.DialString(), } } @@ -402,7 +402,6 @@ func (mockPeer) Send(byte, []byte) bool { return false } func (mockPeer) TrySend(byte, []byte) bool { return false } func (mockPeer) Set(string, interface{}) {} func (mockPeer) Get(string) interface{} { return nil } -func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil } func assertPeersWithTimeout( t *testing.T, diff --git a/p2p/switch.go b/p2p/switch.go index 8325d7e82..64e248fc3 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -280,12 +280,9 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { - addr := peer.OriginalAddr() - if addr == nil { - // FIXME: persistent peers can't be inbound right now. - // self-reported address for inbound persistent peers - addr = peer.NodeInfo().NetAddress() - } + // TODO: use the original address dialed, not the self reported one + // See #2618. + addr := peer.NodeInfo().NetAddress() go sw.reconnectToPeer(addr) } } @@ -560,9 +557,13 @@ func (sw *Switch) addOutboundPeerWithConfig( // to avoid dialing in the future. sw.addrBook.RemoveAddress(addr) sw.addrBook.AddOurAddress(addr) + + return err } } + // retry persistent peers after + // any dial error besides IsSelf() if persistent { go sw.reconnectToPeer(addr) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 4fea3cfe0..f52e47f06 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -143,6 +143,7 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r } return } + case <-time.After(timeout): t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel) } diff --git a/p2p/test_util.go b/p2p/test_util.go index e35e0989f..2859dc645 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -14,6 +14,19 @@ import ( "github.com/tendermint/tendermint/p2p/conn" ) +const testCh = 0x01 + +//------------------------------------------------ + +type mockNodeInfo struct { + addr *NetAddress +} + +func (ni mockNodeInfo) ID() ID { return ni.addr.ID } +func (ni mockNodeInfo) NetAddress() *NetAddress { return ni.addr } +func (ni mockNodeInfo) ValidateBasic() error { return nil } +func (ni mockNodeInfo) CompatibleWith(other NodeInfo) error { return nil } + func AddPeerToSwitch(sw *Switch, peer Peer) { sw.peers.Add(peer) } @@ -24,12 +37,9 @@ func CreateRandomPeer(outbound bool) *peer { peerConn: peerConn{ outbound: outbound, }, - nodeInfo: NodeInfo{ - ID: netAddr.ID, - ListenAddr: netAddr.DialString(), - }, - mconn: &conn.MConnection{}, - metrics: NopMetrics(), + nodeInfo: mockNodeInfo{netAddr}, + mconn: &conn.MConnection{}, + metrics: NopMetrics(), } p.SetLogger(log.TestingLogger().With("peer", addr)) return p @@ -159,36 +169,15 @@ func MakeSwitch( initSwitch func(int, *Switch) *Switch, opts ...SwitchOption, ) *Switch { - var ( - nodeKey = NodeKey{ - PrivKey: ed25519.GenPrivKey(), - } - ni = NodeInfo{ - ID: nodeKey.ID(), - Moniker: fmt.Sprintf("switch%d", i), - Network: network, - Version: version, - ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), - Other: NodeInfoOther{ - AminoVersion: "1.0", - P2PVersion: "1.0", - ConsensusVersion: "1.0", - RPCVersion: "1.0", - TxIndex: "off", - RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), - }, - } - ) - addr, err := NewNetAddressStringWithOptionalID( - IDAddressString(nodeKey.ID(), ni.ListenAddr), - ) - if err != nil { - panic(err) + nodeKey := NodeKey{ + PrivKey: ed25519.GenPrivKey(), } + nodeInfo := testNodeInfo(nodeKey.ID(), fmt.Sprintf("node%d", i)) - t := NewMultiplexTransport(ni, nodeKey) + t := NewMultiplexTransport(nodeInfo, nodeKey) + addr := nodeInfo.NetAddress() if err := t.Listen(*addr); err != nil { panic(err) } @@ -198,14 +187,16 @@ func MakeSwitch( sw.SetLogger(log.TestingLogger()) sw.SetNodeKey(&nodeKey) + ni := nodeInfo.(DefaultNodeInfo) for ch := range sw.reactorsByCh { ni.Channels = append(ni.Channels, ch) } + nodeInfo = ni // TODO: We need to setup reactors ahead of time so the NodeInfo is properly // populated and we don't have to do those awkward overrides and setters. - t.nodeInfo = ni - sw.SetNodeInfo(ni) + t.nodeInfo = nodeInfo + sw.SetNodeInfo(nodeInfo) return sw } @@ -215,7 +206,7 @@ func testInboundPeerConn( config *config.P2PConfig, ourNodePrivKey crypto.PrivKey, ) (peerConn, error) { - return testPeerConn(conn, config, false, false, ourNodePrivKey, nil) + return testPeerConn(conn, config, false, false, ourNodePrivKey) } func testPeerConn( @@ -223,7 +214,6 @@ func testPeerConn( cfg *config.P2PConfig, outbound, persistent bool, ourNodePrivKey crypto.PrivKey, - originalAddr *NetAddress, ) (pc peerConn, err error) { conn := rawConn @@ -241,10 +231,27 @@ func testPeerConn( // Only the information we already have return peerConn{ - config: cfg, - outbound: outbound, - persistent: persistent, - conn: conn, - originalAddr: originalAddr, + config: cfg, + outbound: outbound, + persistent: persistent, + conn: conn, }, nil } + +//---------------------------------------------------------------- +// rand node info + +func testNodeInfo(id ID, name string) NodeInfo { + return testNodeInfoWithNetwork(id, name, "testing") +} + +func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo { + return DefaultNodeInfo{ + ID_: id, + ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023), + Moniker: name, + Network: network, + Version: "123.123.123", + Channels: []byte{testCh}, + } +} diff --git a/p2p/transport.go b/p2p/transport.go index 6f097b4f7..b20f32f3d 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -335,7 +335,7 @@ func (mt *MultiplexTransport) upgrade( secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) if err != nil { - return nil, NodeInfo{}, ErrRejected{ + return nil, nil, ErrRejected{ conn: c, err: fmt.Errorf("secrect conn failed: %v", err), isAuthFailure: true, @@ -344,15 +344,15 @@ func (mt *MultiplexTransport) upgrade( nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) if err != nil { - return nil, NodeInfo{}, ErrRejected{ + return nil, nil, ErrRejected{ conn: c, err: fmt.Errorf("handshake failed: %v", err), isAuthFailure: true, } } - if err := nodeInfo.Validate(); err != nil { - return nil, NodeInfo{}, ErrRejected{ + if err := nodeInfo.ValidateBasic(); err != nil { + return nil, nil, ErrRejected{ conn: c, err: err, isNodeInfoInvalid: true, @@ -360,34 +360,34 @@ func (mt *MultiplexTransport) upgrade( } // Ensure connection key matches self reported key. - if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID { - return nil, NodeInfo{}, ErrRejected{ + if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID() { + return nil, nil, ErrRejected{ conn: c, id: connID, err: fmt.Errorf( "conn.ID (%v) NodeInfo.ID (%v) missmatch", connID, - nodeInfo.ID, + nodeInfo.ID(), ), isAuthFailure: true, } } // Reject self. - if mt.nodeInfo.ID == nodeInfo.ID { - return nil, NodeInfo{}, ErrRejected{ - addr: *NewNetAddress(nodeInfo.ID, c.RemoteAddr()), + if mt.nodeInfo.ID() == nodeInfo.ID() { + return nil, nil, ErrRejected{ + addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()), conn: c, - id: nodeInfo.ID, + id: nodeInfo.ID(), isSelf: true, } } if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { - return nil, NodeInfo{}, ErrRejected{ + return nil, nil, ErrRejected{ conn: c, err: err, - id: nodeInfo.ID, + id: nodeInfo.ID(), isIncompatible: true, } } @@ -430,17 +430,18 @@ func handshake( nodeInfo NodeInfo, ) (NodeInfo, error) { if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return NodeInfo{}, err + return nil, err } var ( errc = make(chan error, 2) - peerNodeInfo NodeInfo + peerNodeInfo DefaultNodeInfo + ourNodeInfo = nodeInfo.(DefaultNodeInfo) ) go func(errc chan<- error, c net.Conn) { - _, err := cdc.MarshalBinaryWriter(c, nodeInfo) + _, err := cdc.MarshalBinaryWriter(c, ourNodeInfo) errc <- err }(errc, c) go func(errc chan<- error, c net.Conn) { @@ -455,7 +456,7 @@ func handshake( for i := 0; i < cap(errc); i++ { err := <-errc if err != nil { - return NodeInfo{}, err + return nil, err } } diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 9e3cc467f..cce223a3e 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -11,9 +11,15 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" ) +var defaultNodeName = "host_peer" + +func emptyNodeInfo() NodeInfo { + return DefaultNodeInfo{} +} + func TestTransportMultiplexConnFilter(t *testing.T) { mt := NewMultiplexTransport( - NodeInfo{}, + emptyNodeInfo(), NodeKey{ PrivKey: ed25519.GenPrivKey(), }, @@ -70,7 +76,7 @@ func TestTransportMultiplexConnFilter(t *testing.T) { func TestTransportMultiplexConnFilterTimeout(t *testing.T) { mt := NewMultiplexTransport( - NodeInfo{}, + emptyNodeInfo(), NodeKey{ PrivKey: ed25519.GenPrivKey(), }, @@ -120,6 +126,7 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) { t.Errorf("expected ErrFilterTimeout") } } + func TestTransportMultiplexAcceptMultiple(t *testing.T) { mt := testSetupMultiplexTransport(t) @@ -134,12 +141,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) { var ( pv = ed25519.GenPrivKey() dialer = NewMultiplexTransport( - NodeInfo{ - ID: PubKeyToID(pv.PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "dialer", - Version: "1.0.0", - }, + testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName), NodeKey{ PrivKey: pv, }, @@ -207,15 +209,10 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { var ( fastNodePV = ed25519.GenPrivKey() - fastNodeInfo = NodeInfo{ - ID: PubKeyToID(fastNodePV.PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "fastNode", - Version: "1.0.0", - } - errc = make(chan error) - fastc = make(chan struct{}) - slowc = make(chan struct{}) + fastNodeInfo = testNodeInfo(PubKeyToID(fastNodePV.PubKey()), "fastnode") + errc = make(chan error) + fastc = make(chan struct{}) + slowc = make(chan struct{}) ) // Simulate slow Peer. @@ -248,11 +245,11 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { return } - _, err = handshake(sc, 20*time.Millisecond, NodeInfo{ - ID: PubKeyToID(ed25519.GenPrivKey().PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "slow_peer", - }) + _, err = handshake(sc, 20*time.Millisecond, + testNodeInfo( + PubKeyToID(ed25519.GenPrivKey().PubKey()), + "slow_peer", + )) if err != nil { errc <- err return @@ -311,12 +308,7 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) { var ( pv = ed25519.GenPrivKey() dialer = NewMultiplexTransport( - NodeInfo{ - ID: PubKeyToID(pv.PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "", // Should not be empty. - Version: "1.0.0", - }, + testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty NodeKey{ PrivKey: pv, }, @@ -359,12 +351,9 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) { go func() { dialer := NewMultiplexTransport( - NodeInfo{ - ID: PubKeyToID(ed25519.GenPrivKey().PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "dialer", - Version: "1.0.0", - }, + testNodeInfo( + PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer", + ), NodeKey{ PrivKey: ed25519.GenPrivKey(), }, @@ -408,12 +397,7 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) { var ( pv = ed25519.GenPrivKey() dialer = NewMultiplexTransport( - NodeInfo{ - ID: PubKeyToID(pv.PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "dialer", - Version: "2.0.0", - }, + testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"), NodeKey{ PrivKey: pv, }, @@ -521,9 +505,7 @@ func TestTransportHandshake(t *testing.T) { var ( peerPV = ed25519.GenPrivKey() - peerNodeInfo = NodeInfo{ - ID: PubKeyToID(peerPV.PubKey()), - } + peerNodeInfo = testNodeInfo(PubKeyToID(peerPV.PubKey()), defaultNodeName) ) go func() { @@ -534,13 +516,13 @@ func TestTransportHandshake(t *testing.T) { } go func(c net.Conn) { - _, err := cdc.MarshalBinaryWriter(c, peerNodeInfo) + _, err := cdc.MarshalBinaryWriter(c, peerNodeInfo.(DefaultNodeInfo)) if err != nil { t.Error(err) } }(c) go func(c net.Conn) { - ni := NodeInfo{} + var ni DefaultNodeInfo _, err := cdc.UnmarshalBinaryReader( c, @@ -558,7 +540,7 @@ func TestTransportHandshake(t *testing.T) { t.Fatal(err) } - ni, err := handshake(c, 20*time.Millisecond, NodeInfo{}) + ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo()) if err != nil { t.Fatal(err) } @@ -572,12 +554,9 @@ func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { var ( pv = ed25519.GenPrivKey() mt = NewMultiplexTransport( - NodeInfo{ - ID: PubKeyToID(pv.PubKey()), - ListenAddr: "127.0.0.1:0", - Moniker: "transport", - Version: "1.0.0", - }, + testNodeInfo( + PubKeyToID(pv.PubKey()), "transport", + ), NodeKey{ PrivKey: pv, }, diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 1d5f92753..1c2619d5c 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -2,7 +2,6 @@ package core import ( cm "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -201,7 +200,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { } peerStates[i] = ctypes.PeerStateInfo{ // Peer basic info. - NodeAddress: p2p.IDAddressString(peer.ID(), peer.NodeInfo().ListenAddr), + NodeAddress: peer.NodeInfo().NetAddress().String(), // Peer consensus state. PeerState: peerStateJSON, } diff --git a/rpc/core/net.go b/rpc/core/net.go index 9816d2f63..dbd4d8c0b 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -1,8 +1,11 @@ package core import ( + "fmt" + "github.com/pkg/errors" + "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" ) @@ -37,8 +40,12 @@ import ( func NetInfo() (*ctypes.ResultNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pPeers.Peers().List() { + nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo) + if !ok { + return nil, fmt.Errorf("peer.NodeInfo() is not DefaultNodeInfo") + } peers = append(peers, ctypes.Peer{ - NodeInfo: peer.NodeInfo(), + NodeInfo: nodeInfo, IsOutbound: peer.IsOutbound(), ConnectionStatus: peer.Status(), }) diff --git a/rpc/core/status.go b/rpc/core/status.go index 17fb2f341..c26b06b8a 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -5,6 +5,7 @@ import ( "time" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -91,7 +92,7 @@ func Status() (*ctypes.ResultStatus, error) { } result := &ctypes.ResultStatus{ - NodeInfo: p2pTransport.NodeInfo(), + NodeInfo: p2pTransport.NodeInfo().(p2p.DefaultNodeInfo), SyncInfo: ctypes.SyncInfo{ LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index a6dcf2b93..07628d1c6 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -74,9 +74,9 @@ type ValidatorInfo struct { // Node Status type ResultStatus struct { - NodeInfo p2p.NodeInfo `json:"node_info"` - SyncInfo SyncInfo `json:"sync_info"` - ValidatorInfo ValidatorInfo `json:"validator_info"` + NodeInfo p2p.DefaultNodeInfo `json:"node_info"` + SyncInfo SyncInfo `json:"sync_info"` + ValidatorInfo ValidatorInfo `json:"validator_info"` } // Is TxIndexing enabled @@ -107,7 +107,7 @@ type ResultDialPeers struct { // A peer type Peer struct { - p2p.NodeInfo `json:"node_info"` + NodeInfo p2p.DefaultNodeInfo `json:"node_info"` IsOutbound bool `json:"is_outbound"` ConnectionStatus p2p.ConnectionStatus `json:"connection_status"` } diff --git a/rpc/core/types/responses_test.go b/rpc/core/types/responses_test.go index c6c86e1f7..796299d33 100644 --- a/rpc/core/types/responses_test.go +++ b/rpc/core/types/responses_test.go @@ -15,17 +15,17 @@ func TestStatusIndexer(t *testing.T) { status = &ResultStatus{} assert.False(t, status.TxIndexEnabled()) - status.NodeInfo = p2p.NodeInfo{} + status.NodeInfo = p2p.DefaultNodeInfo{} assert.False(t, status.TxIndexEnabled()) cases := []struct { expected bool - other p2p.NodeInfoOther + other p2p.DefaultNodeInfoOther }{ - {false, p2p.NodeInfoOther{}}, - {false, p2p.NodeInfoOther{TxIndex: "aa"}}, - {false, p2p.NodeInfoOther{TxIndex: "off"}}, - {true, p2p.NodeInfoOther{TxIndex: "on"}}, + {false, p2p.DefaultNodeInfoOther{}}, + {false, p2p.DefaultNodeInfoOther{TxIndex: "aa"}}, + {false, p2p.DefaultNodeInfoOther{TxIndex: "off"}}, + {true, p2p.DefaultNodeInfoOther{TxIndex: "on"}}, } for _, tc := range cases {