Browse Source

p2p: NodeInfo is an interface; General cleanup (#2556)

* p2p: NodeInfo is an interface

* (squash) fixes from review

* (squash) more fixes from review

* p2p: remove peerConn.HandshakeTimeout

* p2p: NodeInfo is two interfaces. Remove String()

* fixes from review

* remove test code from peer.RemoteIP()

* p2p: remove peer.OriginalAddr(). See #2618

* use a mockPeer in peer_set_test.go

* p2p: fix testNodeInfo naming

* p2p: remove unused var

* remove testRandNodeInfo

* fix linter

* fix retry dialing self

* fix rpc
bucky/node-info-substructs
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
0baa7588c2
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 269 additions and 331 deletions
  1. +17
    -33
      benchmarks/codec_test.go
  2. +1
    -2
      blockchain/reactor_test.go
  3. +1
    -1
      consensus/common_test.go
  4. +3
    -3
      node/node.go
  5. +1
    -6
      p2p/dummy/peer.go
  6. +2
    -3
      p2p/errors.go
  7. +45
    -31
      p2p/node_info.go
  8. +26
    -81
      p2p/peer.go
  9. +27
    -18
      p2p/peer_set_test.go
  10. +17
    -19
      p2p/peer_test.go
  11. +3
    -4
      p2p/pex/pex_reactor_test.go
  12. +7
    -6
      p2p/switch.go
  13. +1
    -0
      p2p/switch_test.go
  14. +48
    -41
      p2p/test_util.go
  15. +18
    -17
      p2p/transport.go
  16. +31
    -52
      p2p/transport_test.go
  17. +1
    -2
      rpc/core/consensus.go
  18. +8
    -1
      rpc/core/net.go
  19. +2
    -1
      rpc/core/status.go
  20. +4
    -4
      rpc/core/types/responses.go
  21. +6
    -6
      rpc/core/types/responses_test.go

+ 17
- 33
benchmarks/codec_test.go View File

@ -12,23 +12,27 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
func testNodeInfo(id p2p.ID) p2p.DefaultNodeInfo {
return p2p.DefaultNodeInfo{
ID_: id,
Moniker: "SOMENAME",
Network: "SOMENAME",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: p2p.DefaultNodeInfoOther{
AminoVersion: "SOMESTRING",
P2PVersion: "OTHERSTRING",
},
}
}
func BenchmarkEncodeStatusWire(b *testing.B) {
b.StopTimer()
cdc := amino.NewCodec()
ctypes.RegisterAmino(cdc)
nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
status := &ctypes.ResultStatus{
NodeInfo: p2p.NodeInfo{
ID: nodeKey.ID(),
Moniker: "SOMENAME",
Network: "SOMENAME",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: p2p.NodeInfoOther{
AminoVersion: "SOMESTRING",
P2PVersion: "OTHERSTRING",
},
},
NodeInfo: testNodeInfo(nodeKey.ID()),
SyncInfo: ctypes.SyncInfo{
LatestBlockHash: []byte("SOMEBYTES"),
LatestBlockHeight: 123,
@ -56,17 +60,7 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) {
cdc := amino.NewCodec()
ctypes.RegisterAmino(cdc)
nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
nodeInfo := p2p.NodeInfo{
ID: nodeKey.ID(),
Moniker: "SOMENAME",
Network: "SOMENAME",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: p2p.NodeInfoOther{
AminoVersion: "SOMESTRING",
P2PVersion: "OTHERSTRING",
},
}
nodeInfo := testNodeInfo(nodeKey.ID())
b.StartTimer()
counter := 0
@ -84,17 +78,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) {
cdc := amino.NewCodec()
ctypes.RegisterAmino(cdc)
nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
nodeInfo := p2p.NodeInfo{
ID: nodeKey.ID(),
Moniker: "SOMENAME",
Network: "SOMENAME",
ListenAddr: "SOMEADDR",
Version: "SOMEVER",
Other: p2p.NodeInfoOther{
AminoVersion: "SOMESTRING",
P2PVersion: "OTHERSTRING",
},
}
nodeInfo := testNodeInfo(nodeKey.ID())
b.StartTimer()
counter := 0


+ 1
- 2
blockchain/reactor_test.go View File

@ -198,7 +198,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool {
}
func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) }
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.NodeInfo{} }
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} }
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
func (tp *bcrTestPeer) ID() p2p.ID { return tp.id }
func (tp *bcrTestPeer) IsOutbound() bool { return false }
@ -206,4 +206,3 @@ func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
func (tp *bcrTestPeer) Set(string, interface{}) {}
func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} }
func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil }

+ 1
- 1
consensus/common_test.go View File

@ -568,7 +568,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
for i, s := range switches {
if peer.NodeInfo().ID == s.NodeInfo().ID {
if peer.NodeInfo().ID() == s.NodeInfo().ID() {
return i
}
}


+ 3
- 3
node/node.go View File

@ -761,8 +761,8 @@ func makeNodeInfo(
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
}
nodeInfo := p2p.NodeInfo{
ID: nodeID,
nodeInfo := p2p.DefaultNodeInfo{
ID_: nodeID,
Network: chainID,
Version: version.Version,
Channels: []byte{
@ -772,7 +772,7 @@ func makeNodeInfo(
evidence.EvidenceChannel,
},
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{
Other: p2p.DefaultNodeInfoOther{
AminoVersion: amino.Version,
P2PVersion: p2p.Version,
ConsensusVersion: cs.Version,


+ 1
- 6
p2p/dummy/peer.go View File

@ -42,7 +42,7 @@ func (p *peer) IsPersistent() bool {
// NodeInfo always returns empty node info.
func (p *peer) NodeInfo() p2p.NodeInfo {
return p2p.NodeInfo{}
return p2p.DefaultNodeInfo{}
}
// RemoteIP always returns localhost.
@ -78,8 +78,3 @@ func (p *peer) Get(key string) interface{} {
}
return nil
}
// OriginalAddr always returns nil.
func (p *peer) OriginalAddr() *p2p.NetAddress {
return nil
}

+ 2
- 3
p2p/errors.go View File

@ -40,13 +40,12 @@ func (e ErrRejected) Error() string {
if e.isDuplicate {
if e.conn != nil {
return fmt.Sprintf(
"duplicate CONN<%s>: %s",
"duplicate CONN<%s>",
e.conn.RemoteAddr().String(),
e.err,
)
}
if e.id != "" {
return fmt.Sprintf("duplicate ID<%v>: %s", e.id, e.err)
return fmt.Sprintf("duplicate ID<%v>", e.id)
}
}


+ 45
- 31
p2p/node_info.go View File

@ -2,6 +2,7 @@ package p2p
import (
"fmt"
"reflect"
"strings"
cmn "github.com/tendermint/tendermint/libs/common"
@ -17,12 +18,32 @@ func MaxNodeInfoSize() int {
return maxNodeInfoSize
}
// NodeInfo is the basic node information exchanged
// NodeInfo exposes basic info of a node
// and determines if we're compatible
type NodeInfo interface {
nodeInfoAddress
nodeInfoTransport
}
// nodeInfoAddress exposes just the core info of a node.
type nodeInfoAddress interface {
ID() ID
NetAddress() *NetAddress
}
// nodeInfoTransport is validates a nodeInfo and checks
// our compatibility with it. It's for use in the handshake.
type nodeInfoTransport interface {
ValidateBasic() error
CompatibleWith(other NodeInfo) error
}
// DefaultNodeInfo is the basic node information exchanged
// between two peers during the Tendermint P2P handshake.
type NodeInfo struct {
type DefaultNodeInfo struct {
// Authenticate
// TODO: replace with NetAddress
ID ID `json:"id"` // authenticated identifier
ID_ ID `json:"id"` // authenticated identifier
ListenAddr string `json:"listen_addr"` // accepting incoming
// Check compatibility.
@ -32,12 +53,12 @@ type NodeInfo struct {
Channels cmn.HexBytes `json:"channels"` // channels this node knows about
// ASCIIText fields
Moniker string `json:"moniker"` // arbitrary moniker
Other NodeInfoOther `json:"other"` // other application specific data
Moniker string `json:"moniker"` // arbitrary moniker
Other DefaultNodeInfoOther `json:"other"` // other application specific data
}
// NodeInfoOther is the misc. applcation specific data
type NodeInfoOther struct {
// DefaultNodeInfoOther is the misc. applcation specific data
type DefaultNodeInfoOther struct {
AminoVersion string `json:"amino_version"`
P2PVersion string `json:"p2p_version"`
ConsensusVersion string `json:"consensus_version"`
@ -46,19 +67,12 @@ type NodeInfoOther struct {
RPCAddress string `json:"rpc_address"`
}
func (o NodeInfoOther) String() string {
return fmt.Sprintf(
"{amino_version: %v, p2p_version: %v, consensus_version: %v, rpc_version: %v, tx_index: %v, rpc_address: %v}",
o.AminoVersion,
o.P2PVersion,
o.ConsensusVersion,
o.RPCVersion,
o.TxIndex,
o.RPCAddress,
)
// ID returns the node's peer ID.
func (info DefaultNodeInfo) ID() ID {
return info.ID_
}
// Validate checks the self-reported NodeInfo is safe.
// ValidateBasic checks the self-reported DefaultNodeInfo is safe.
// It returns an error if there
// are too many Channels, if there are any duplicate Channels,
// if the ListenAddr is malformed, or if the ListenAddr is a host name
@ -71,7 +85,7 @@ func (o NodeInfoOther) String() string {
// International clients could then use punycode (or we could use
// url-encoding), and we just need to be careful with how we handle that in our
// clients. (e.g. off by default).
func (info NodeInfo) Validate() error {
func (info DefaultNodeInfo) ValidateBasic() error {
if len(info.Channels) > maxNumChannels {
return fmt.Errorf("info.Channels is too long (%v). Max is %v", len(info.Channels), maxNumChannels)
}
@ -111,14 +125,19 @@ func (info NodeInfo) Validate() error {
}
// ensure ListenAddr is good
_, err := NewNetAddressString(IDAddressString(info.ID, info.ListenAddr))
_, err := NewNetAddressString(IDAddressString(info.ID(), info.ListenAddr))
return err
}
// CompatibleWith checks if two NodeInfo are compatible with eachother.
// CompatibleWith checks if two DefaultNodeInfo are compatible with eachother.
// CONTRACT: two nodes are compatible if the major version matches and network match
// and they have at least one channel in common.
func (info NodeInfo) CompatibleWith(other NodeInfo) error {
func (info DefaultNodeInfo) CompatibleWith(other_ NodeInfo) error {
other, ok := other_.(DefaultNodeInfo)
if !ok {
return fmt.Errorf("wrong NodeInfo type. Expected DefaultNodeInfo, got %v", reflect.TypeOf(other_))
}
iMajor, _, _, iErr := splitVersion(info.Version)
oMajor, _, _, oErr := splitVersion(other.Version)
@ -164,18 +183,18 @@ OUTER_LOOP:
return nil
}
// NetAddress returns a NetAddress derived from the NodeInfo -
// NetAddress returns a NetAddress derived from the DefaultNodeInfo -
// it includes the authenticated peer ID and the self-reported
// ListenAddr. Note that the ListenAddr is not authenticated and
// may not match that address actually dialed if its an outbound peer.
func (info NodeInfo) NetAddress() *NetAddress {
netAddr, err := NewNetAddressString(IDAddressString(info.ID, info.ListenAddr))
func (info DefaultNodeInfo) NetAddress() *NetAddress {
netAddr, err := NewNetAddressString(IDAddressString(info.ID(), info.ListenAddr))
if err != nil {
switch err.(type) {
case ErrNetAddressLookup:
// XXX If the peer provided a host name and the lookup fails here
// we're out of luck.
// TODO: use a NetAddress in NodeInfo
// TODO: use a NetAddress in DefaultNodeInfo
default:
panic(err) // everything should be well formed by now
}
@ -183,11 +202,6 @@ func (info NodeInfo) NetAddress() *NetAddress {
return netAddr
}
func (info NodeInfo) String() string {
return fmt.Sprintf("NodeInfo{id: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}",
info.ID, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
}
func splitVersion(version string) (string, string, string, error) {
spl := strings.Split(version, ".")
if len(spl) != 3 {


+ 26
- 81
p2p/peer.go View File

@ -3,7 +3,6 @@ package p2p
import (
"fmt"
"net"
"sync/atomic"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
@ -15,19 +14,18 @@ import (
const metricsTickerDuration = 10 * time.Second
var testIPSuffix uint32
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
cmn.Service
ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
NodeInfo() NodeInfo // peer's info
Status() tmconn.ConnectionStatus
OriginalAddr() *NetAddress
Send(byte, []byte) bool
TrySend(byte, []byte) bool
@ -40,12 +38,13 @@ type Peer interface {
// peerConn contains the raw connection and its config.
type peerConn struct {
outbound bool
persistent bool
config *config.P2PConfig
conn net.Conn // source connection
ip net.IP
originalAddr *NetAddress // nil for inbound connections
outbound bool
persistent bool
config *config.P2PConfig
conn net.Conn // source connection
// cached RemoteIP()
ip net.IP
}
// ID only exists for SecretConnection.
@ -60,14 +59,6 @@ func (pc peerConn) RemoteIP() net.IP {
return pc.ip
}
// In test cases a conn could not be present at all or be an in-memory
// implementation where we want to return a fake ip.
if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))}
return pc.ip
}
host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
if err != nil {
panic(err)
@ -120,7 +111,7 @@ func newPeer(
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
Data: cmn.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
@ -142,6 +133,15 @@ func newPeer(
return p
}
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
}
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
}
//---------------------------------------------------
// Implements cmn.Service
@ -177,7 +177,7 @@ func (p *peer) OnStop() {
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() ID {
return p.nodeInfo.ID
return p.nodeInfo.ID()
}
// IsOutbound returns true if the connection is outbound, false otherwise.
@ -195,15 +195,6 @@ func (p *peer) NodeInfo() NodeInfo {
return p.nodeInfo
}
// OriginalAddr returns the original address, which was used to connect with
// the peer. Returns nil for inbound peers.
func (p *peer) OriginalAddr() *NetAddress {
if p.peerConn.outbound {
return p.peerConn.originalAddr
}
return nil
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() tmconn.ConnectionStatus {
return p.mconn.Status()
@ -272,53 +263,14 @@ func (p *peer) hasChannel(chID byte) bool {
}
//---------------------------------------------------
// methods used by the Switch
// methods only used for testing
// TODO: can we remove these?
// CloseConn should be called by the Switch if the peer was created but never
// started.
// CloseConn closes the underlying connection
func (pc *peerConn) CloseConn() {
pc.conn.Close() // nolint: errcheck
}
// HandshakeTimeout performs the Tendermint P2P handshake between a given node
// and the peer by exchanging their NodeInfo. It sets the received nodeInfo on
// the peer.
// NOTE: blocking
func (pc *peerConn) HandshakeTimeout(
ourNodeInfo NodeInfo,
timeout time.Duration,
) (peerNodeInfo NodeInfo, err error) {
// Set deadline for handshake so we don't block forever on conn.ReadFull
if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline")
}
var trs, _ = cmn.Parallel(
func(_ int) (val interface{}, err error, abort bool) {
_, err = cdc.MarshalBinaryWriter(pc.conn, ourNodeInfo)
return
},
func(_ int) (val interface{}, err error, abort bool) {
_, err = cdc.UnmarshalBinaryReader(
pc.conn,
&peerNodeInfo,
int64(MaxNodeInfoSize()),
)
return
},
)
if err := trs.FirstError(); err != nil {
return peerNodeInfo, cmn.ErrorWrap(err, "Error during handshake")
}
// Remove deadline
if err := pc.conn.SetDeadline(time.Time{}); err != nil {
return peerNodeInfo, cmn.ErrorWrap(err, "Error removing deadline")
}
return peerNodeInfo, nil
}
// Addr returns peer's remote network address.
func (p *peer) Addr() net.Addr {
return p.peerConn.conn.RemoteAddr()
@ -332,14 +284,7 @@ func (p *peer) CanSend(chID byte) bool {
return p.mconn.CanSend(chID)
}
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
}
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
}
//---------------------------------------------------
func PeerMetrics(metrics *Metrics) PeerOption {
return func(p *peer) {


+ 27
- 18
p2p/peer_set_test.go View File

@ -1,7 +1,6 @@
package p2p
import (
"fmt"
"net"
"sync"
"testing"
@ -12,24 +11,34 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
)
// Returns an empty kvstore peer
func randPeer(ip net.IP) *peer {
// mockPeer for testing the PeerSet
type mockPeer struct {
cmn.BaseService
ip net.IP
id ID
}
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {
if ip == nil {
ip = net.IP{127, 0, 0, 1}
}
nodeKey := NodeKey{PrivKey: ed25519.GenPrivKey()}
p := &peer{
nodeInfo: NodeInfo{
ID: nodeKey.ID(),
ListenAddr: fmt.Sprintf("%v.%v.%v.%v:26656", cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256),
},
metrics: NopMetrics(),
return &mockPeer{
ip: ip,
id: nodeKey.ID(),
}
p.ip = ip
return p
}
func TestPeerSetAddRemoveOne(t *testing.T) {
@ -39,7 +48,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) {
var peerList []Peer
for i := 0; i < 5; i++ {
p := randPeer(net.IP{127, 0, 0, byte(i)})
p := newMockPeer(net.IP{127, 0, 0, byte(i)})
if err := peerSet.Add(p); err != nil {
t.Error(err)
}
@ -83,7 +92,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) {
peers := []Peer{}
N := 100
for i := 0; i < N; i++ {
peer := randPeer(net.IP{127, 0, 0, byte(i)})
peer := newMockPeer(net.IP{127, 0, 0, byte(i)})
if err := peerSet.Add(peer); err != nil {
t.Errorf("Failed to add new peer")
}
@ -107,7 +116,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) {
func TestPeerSetAddDuplicate(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet()
peer := randPeer(nil)
peer := newMockPeer(nil)
n := 20
errsChan := make(chan error)
@ -149,7 +158,7 @@ func TestPeerSetGet(t *testing.T) {
var (
peerSet = NewPeerSet()
peer = randPeer(nil)
peer = newMockPeer(nil)
)
assert.Nil(t, peerSet.Get(peer.ID()), "expecting a nil lookup, before .Add")


+ 17
- 19
p2p/peer_test.go View File

@ -19,8 +19,6 @@ import (
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
const testCh = 0x01
func TestPeerBasic(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -81,18 +79,14 @@ func createOutboundPeerAndPerformHandshake(
if err != nil {
return nil, err
}
nodeInfo, err := pc.HandshakeTimeout(NodeInfo{
ID: addr.ID,
Moniker: "host_peer",
Network: "testing",
Version: "123.123.123",
Channels: []byte{testCh},
}, 1*time.Second)
timeout := 1 * time.Second
ourNodeInfo := testNodeInfo(addr.ID, "host_peer")
peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo)
if err != nil {
return nil, err
}
p := newPeer(pc, mConfig, nodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}
@ -120,7 +114,7 @@ func testOutboundPeerConn(
return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
}
pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey)
if err != nil {
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
@ -191,14 +185,7 @@ func (rp *remotePeer) accept(l net.Listener) {
golog.Fatalf("Failed to create a peer: %+v", err)
}
_, err = handshake(pc.conn, time.Second, NodeInfo{
ID: rp.Addr().ID,
Moniker: "remote_peer",
Network: "testing",
Version: "123.123.123",
ListenAddr: l.Addr().String(),
Channels: rp.channels,
})
_, err = handshake(pc.conn, time.Second, rp.nodeInfo(l))
if err != nil {
golog.Fatalf("Failed to perform handshake: %+v", err)
}
@ -217,3 +204,14 @@ func (rp *remotePeer) accept(l net.Listener) {
}
}
}
func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo {
return DefaultNodeInfo{
ID_: rp.Addr().ID,
Moniker: "remote_peer",
Network: "testing",
Version: "123.123.123",
ListenAddr: l.Addr().String(),
Channels: rp.channels,
}
}

+ 3
- 4
p2p/pex/pex_reactor_test.go View File

@ -320,7 +320,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
peer := p2p.CreateRandomPeer(false)
pexR, book := createReactor(&PEXReactorConfig{})
book.AddPrivateIDs([]string{string(peer.NodeInfo().ID)})
book.AddPrivateIDs([]string{string(peer.NodeInfo().ID())})
defer teardownReactor(book)
// we have to send a request to receive responses
@ -391,8 +391,8 @@ func (mp mockPeer) ID() p2p.ID { return mp.addr.ID }
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }
func (mp mockPeer) NodeInfo() p2p.NodeInfo {
return p2p.NodeInfo{
ID: mp.addr.ID,
return p2p.DefaultNodeInfo{
ID_: mp.addr.ID,
ListenAddr: mp.addr.DialString(),
}
}
@ -402,7 +402,6 @@ func (mockPeer) Send(byte, []byte) bool { return false }
func (mockPeer) TrySend(byte, []byte) bool { return false }
func (mockPeer) Set(string, interface{}) {}
func (mockPeer) Get(string) interface{} { return nil }
func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil }
func assertPeersWithTimeout(
t *testing.T,


+ 7
- 6
p2p/switch.go View File

@ -280,12 +280,9 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {
addr := peer.OriginalAddr()
if addr == nil {
// FIXME: persistent peers can't be inbound right now.
// self-reported address for inbound persistent peers
addr = peer.NodeInfo().NetAddress()
}
// TODO: use the original address dialed, not the self reported one
// See #2618.
addr := peer.NodeInfo().NetAddress()
go sw.reconnectToPeer(addr)
}
}
@ -560,9 +557,13 @@ func (sw *Switch) addOutboundPeerWithConfig(
// to avoid dialing in the future.
sw.addrBook.RemoveAddress(addr)
sw.addrBook.AddOurAddress(addr)
return err
}
}
// retry persistent peers after
// any dial error besides IsSelf()
if persistent {
go sw.reconnectToPeer(addr)
}


+ 1
- 0
p2p/switch_test.go View File

@ -143,6 +143,7 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r
}
return
}
case <-time.After(timeout):
t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
}


+ 48
- 41
p2p/test_util.go View File

@ -14,6 +14,19 @@ import (
"github.com/tendermint/tendermint/p2p/conn"
)
const testCh = 0x01
//------------------------------------------------
type mockNodeInfo struct {
addr *NetAddress
}
func (ni mockNodeInfo) ID() ID { return ni.addr.ID }
func (ni mockNodeInfo) NetAddress() *NetAddress { return ni.addr }
func (ni mockNodeInfo) ValidateBasic() error { return nil }
func (ni mockNodeInfo) CompatibleWith(other NodeInfo) error { return nil }
func AddPeerToSwitch(sw *Switch, peer Peer) {
sw.peers.Add(peer)
}
@ -24,12 +37,9 @@ func CreateRandomPeer(outbound bool) *peer {
peerConn: peerConn{
outbound: outbound,
},
nodeInfo: NodeInfo{
ID: netAddr.ID,
ListenAddr: netAddr.DialString(),
},
mconn: &conn.MConnection{},
metrics: NopMetrics(),
nodeInfo: mockNodeInfo{netAddr},
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))
return p
@ -159,36 +169,15 @@ func MakeSwitch(
initSwitch func(int, *Switch) *Switch,
opts ...SwitchOption,
) *Switch {
var (
nodeKey = NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
ni = NodeInfo{
ID: nodeKey.ID(),
Moniker: fmt.Sprintf("switch%d", i),
Network: network,
Version: version,
ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
Other: NodeInfoOther{
AminoVersion: "1.0",
P2PVersion: "1.0",
ConsensusVersion: "1.0",
RPCVersion: "1.0",
TxIndex: "off",
RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
},
}
)
addr, err := NewNetAddressStringWithOptionalID(
IDAddressString(nodeKey.ID(), ni.ListenAddr),
)
if err != nil {
panic(err)
nodeKey := NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
nodeInfo := testNodeInfo(nodeKey.ID(), fmt.Sprintf("node%d", i))
t := NewMultiplexTransport(ni, nodeKey)
t := NewMultiplexTransport(nodeInfo, nodeKey)
addr := nodeInfo.NetAddress()
if err := t.Listen(*addr); err != nil {
panic(err)
}
@ -198,14 +187,16 @@ func MakeSwitch(
sw.SetLogger(log.TestingLogger())
sw.SetNodeKey(&nodeKey)
ni := nodeInfo.(DefaultNodeInfo)
for ch := range sw.reactorsByCh {
ni.Channels = append(ni.Channels, ch)
}
nodeInfo = ni
// TODO: We need to setup reactors ahead of time so the NodeInfo is properly
// populated and we don't have to do those awkward overrides and setters.
t.nodeInfo = ni
sw.SetNodeInfo(ni)
t.nodeInfo = nodeInfo
sw.SetNodeInfo(nodeInfo)
return sw
}
@ -215,7 +206,7 @@ func testInboundPeerConn(
config *config.P2PConfig,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
return testPeerConn(conn, config, false, false, ourNodePrivKey, nil)
return testPeerConn(conn, config, false, false, ourNodePrivKey)
}
func testPeerConn(
@ -223,7 +214,6 @@ func testPeerConn(
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
originalAddr *NetAddress,
) (pc peerConn, err error) {
conn := rawConn
@ -241,10 +231,27 @@ func testPeerConn(
// Only the information we already have
return peerConn{
config: cfg,
outbound: outbound,
persistent: persistent,
conn: conn,
originalAddr: originalAddr,
config: cfg,
outbound: outbound,
persistent: persistent,
conn: conn,
}, nil
}
//----------------------------------------------------------------
// rand node info
func testNodeInfo(id ID, name string) NodeInfo {
return testNodeInfoWithNetwork(id, name, "testing")
}
func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo {
return DefaultNodeInfo{
ID_: id,
ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
Moniker: name,
Network: network,
Version: "123.123.123",
Channels: []byte{testCh},
}
}

+ 18
- 17
p2p/transport.go View File

@ -335,7 +335,7 @@ func (mt *MultiplexTransport) upgrade(
secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey)
if err != nil {
return nil, NodeInfo{}, ErrRejected{
return nil, nil, ErrRejected{
conn: c,
err: fmt.Errorf("secrect conn failed: %v", err),
isAuthFailure: true,
@ -344,15 +344,15 @@ func (mt *MultiplexTransport) upgrade(
nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo)
if err != nil {
return nil, NodeInfo{}, ErrRejected{
return nil, nil, ErrRejected{
conn: c,
err: fmt.Errorf("handshake failed: %v", err),
isAuthFailure: true,
}
}
if err := nodeInfo.Validate(); err != nil {
return nil, NodeInfo{}, ErrRejected{
if err := nodeInfo.ValidateBasic(); err != nil {
return nil, nil, ErrRejected{
conn: c,
err: err,
isNodeInfoInvalid: true,
@ -360,34 +360,34 @@ func (mt *MultiplexTransport) upgrade(
}
// Ensure connection key matches self reported key.
if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID {
return nil, NodeInfo{}, ErrRejected{
if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID() {
return nil, nil, ErrRejected{
conn: c,
id: connID,
err: fmt.Errorf(
"conn.ID (%v) NodeInfo.ID (%v) missmatch",
connID,
nodeInfo.ID,
nodeInfo.ID(),
),
isAuthFailure: true,
}
}
// Reject self.
if mt.nodeInfo.ID == nodeInfo.ID {
return nil, NodeInfo{}, ErrRejected{
addr: *NewNetAddress(nodeInfo.ID, c.RemoteAddr()),
if mt.nodeInfo.ID() == nodeInfo.ID() {
return nil, nil, ErrRejected{
addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()),
conn: c,
id: nodeInfo.ID,
id: nodeInfo.ID(),
isSelf: true,
}
}
if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil {
return nil, NodeInfo{}, ErrRejected{
return nil, nil, ErrRejected{
conn: c,
err: err,
id: nodeInfo.ID,
id: nodeInfo.ID(),
isIncompatible: true,
}
}
@ -430,17 +430,18 @@ func handshake(
nodeInfo NodeInfo,
) (NodeInfo, error) {
if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
return NodeInfo{}, err
return nil, err
}
var (
errc = make(chan error, 2)
peerNodeInfo NodeInfo
peerNodeInfo DefaultNodeInfo
ourNodeInfo = nodeInfo.(DefaultNodeInfo)
)
go func(errc chan<- error, c net.Conn) {
_, err := cdc.MarshalBinaryWriter(c, nodeInfo)
_, err := cdc.MarshalBinaryWriter(c, ourNodeInfo)
errc <- err
}(errc, c)
go func(errc chan<- error, c net.Conn) {
@ -455,7 +456,7 @@ func handshake(
for i := 0; i < cap(errc); i++ {
err := <-errc
if err != nil {
return NodeInfo{}, err
return nil, err
}
}


+ 31
- 52
p2p/transport_test.go View File

@ -11,9 +11,15 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
)
var defaultNodeName = "host_peer"
func emptyNodeInfo() NodeInfo {
return DefaultNodeInfo{}
}
func TestTransportMultiplexConnFilter(t *testing.T) {
mt := NewMultiplexTransport(
NodeInfo{},
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
@ -70,7 +76,7 @@ func TestTransportMultiplexConnFilter(t *testing.T) {
func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
mt := NewMultiplexTransport(
NodeInfo{},
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
@ -120,6 +126,7 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
t.Errorf("expected ErrFilterTimeout")
}
}
func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
@ -134,12 +141,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "1.0.0",
},
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
@ -207,15 +209,10 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
var (
fastNodePV = ed25519.GenPrivKey()
fastNodeInfo = NodeInfo{
ID: PubKeyToID(fastNodePV.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "fastNode",
Version: "1.0.0",
}
errc = make(chan error)
fastc = make(chan struct{})
slowc = make(chan struct{})
fastNodeInfo = testNodeInfo(PubKeyToID(fastNodePV.PubKey()), "fastnode")
errc = make(chan error)
fastc = make(chan struct{})
slowc = make(chan struct{})
)
// Simulate slow Peer.
@ -248,11 +245,11 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
return
}
_, err = handshake(sc, 20*time.Millisecond, NodeInfo{
ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "slow_peer",
})
_, err = handshake(sc, 20*time.Millisecond,
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()),
"slow_peer",
))
if err != nil {
errc <- err
return
@ -311,12 +308,7 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "", // Should not be empty.
Version: "1.0.0",
},
testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty
NodeKey{
PrivKey: pv,
},
@ -359,12 +351,9 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
go func() {
dialer := NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(ed25519.GenPrivKey().PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "1.0.0",
},
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer",
),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
@ -408,12 +397,7 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) {
var (
pv = ed25519.GenPrivKey()
dialer = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "dialer",
Version: "2.0.0",
},
testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"),
NodeKey{
PrivKey: pv,
},
@ -521,9 +505,7 @@ func TestTransportHandshake(t *testing.T) {
var (
peerPV = ed25519.GenPrivKey()
peerNodeInfo = NodeInfo{
ID: PubKeyToID(peerPV.PubKey()),
}
peerNodeInfo = testNodeInfo(PubKeyToID(peerPV.PubKey()), defaultNodeName)
)
go func() {
@ -534,13 +516,13 @@ func TestTransportHandshake(t *testing.T) {
}
go func(c net.Conn) {
_, err := cdc.MarshalBinaryWriter(c, peerNodeInfo)
_, err := cdc.MarshalBinaryWriter(c, peerNodeInfo.(DefaultNodeInfo))
if err != nil {
t.Error(err)
}
}(c)
go func(c net.Conn) {
ni := NodeInfo{}
var ni DefaultNodeInfo
_, err := cdc.UnmarshalBinaryReader(
c,
@ -558,7 +540,7 @@ func TestTransportHandshake(t *testing.T) {
t.Fatal(err)
}
ni, err := handshake(c, 20*time.Millisecond, NodeInfo{})
ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo())
if err != nil {
t.Fatal(err)
}
@ -572,12 +554,9 @@ func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
mt = NewMultiplexTransport(
NodeInfo{
ID: PubKeyToID(pv.PubKey()),
ListenAddr: "127.0.0.1:0",
Moniker: "transport",
Version: "1.0.0",
},
testNodeInfo(
PubKeyToID(pv.PubKey()), "transport",
),
NodeKey{
PrivKey: pv,
},


+ 1
- 2
rpc/core/consensus.go View File

@ -2,7 +2,6 @@ package core
import (
cm "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -201,7 +200,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
}
peerStates[i] = ctypes.PeerStateInfo{
// Peer basic info.
NodeAddress: p2p.IDAddressString(peer.ID(), peer.NodeInfo().ListenAddr),
NodeAddress: peer.NodeInfo().NetAddress().String(),
// Peer consensus state.
PeerState: peerStateJSON,
}


+ 8
- 1
rpc/core/net.go View File

@ -1,8 +1,11 @@
package core
import (
"fmt"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
@ -37,8 +40,12 @@ import (
func NetInfo() (*ctypes.ResultNetInfo, error) {
peers := []ctypes.Peer{}
for _, peer := range p2pPeers.Peers().List() {
nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo)
if !ok {
return nil, fmt.Errorf("peer.NodeInfo() is not DefaultNodeInfo")
}
peers = append(peers, ctypes.Peer{
NodeInfo: peer.NodeInfo(),
NodeInfo: nodeInfo,
IsOutbound: peer.IsOutbound(),
ConnectionStatus: peer.Status(),
})


+ 2
- 1
rpc/core/status.go View File

@ -5,6 +5,7 @@ import (
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -91,7 +92,7 @@ func Status() (*ctypes.ResultStatus, error) {
}
result := &ctypes.ResultStatus{
NodeInfo: p2pTransport.NodeInfo(),
NodeInfo: p2pTransport.NodeInfo().(p2p.DefaultNodeInfo),
SyncInfo: ctypes.SyncInfo{
LatestBlockHash: latestBlockHash,
LatestAppHash: latestAppHash,


+ 4
- 4
rpc/core/types/responses.go View File

@ -74,9 +74,9 @@ type ValidatorInfo struct {
// Node Status
type ResultStatus struct {
NodeInfo p2p.NodeInfo `json:"node_info"`
SyncInfo SyncInfo `json:"sync_info"`
ValidatorInfo ValidatorInfo `json:"validator_info"`
NodeInfo p2p.DefaultNodeInfo `json:"node_info"`
SyncInfo SyncInfo `json:"sync_info"`
ValidatorInfo ValidatorInfo `json:"validator_info"`
}
// Is TxIndexing enabled
@ -107,7 +107,7 @@ type ResultDialPeers struct {
// A peer
type Peer struct {
p2p.NodeInfo `json:"node_info"`
NodeInfo p2p.DefaultNodeInfo `json:"node_info"`
IsOutbound bool `json:"is_outbound"`
ConnectionStatus p2p.ConnectionStatus `json:"connection_status"`
}


+ 6
- 6
rpc/core/types/responses_test.go View File

@ -15,17 +15,17 @@ func TestStatusIndexer(t *testing.T) {
status = &ResultStatus{}
assert.False(t, status.TxIndexEnabled())
status.NodeInfo = p2p.NodeInfo{}
status.NodeInfo = p2p.DefaultNodeInfo{}
assert.False(t, status.TxIndexEnabled())
cases := []struct {
expected bool
other p2p.NodeInfoOther
other p2p.DefaultNodeInfoOther
}{
{false, p2p.NodeInfoOther{}},
{false, p2p.NodeInfoOther{TxIndex: "aa"}},
{false, p2p.NodeInfoOther{TxIndex: "off"}},
{true, p2p.NodeInfoOther{TxIndex: "on"}},
{false, p2p.DefaultNodeInfoOther{}},
{false, p2p.DefaultNodeInfoOther{TxIndex: "aa"}},
{false, p2p.DefaultNodeInfoOther{TxIndex: "off"}},
{true, p2p.DefaultNodeInfoOther{TxIndex: "on"}},
}
for _, tc := range cases {


Loading…
Cancel
Save