Browse Source

p2p: use sub dirs

pull/1141/head
Ethan Buchman 7 years ago
parent
commit
5b5cbaa66a
28 changed files with 366 additions and 407 deletions
  1. +6
    -3
      p2p/base_reactor.go
  2. +14
    -13
      p2p/listener.go
  3. +29
    -26
      p2p/peer.go
  4. +9
    -7
      p2p/peer_set.go
  5. +3
    -2
      p2p/peer_set_test.go
  6. +10
    -8
      p2p/peer_test.go
  7. +61
    -180
      p2p/pex/addrbook.go
  8. +8
    -7
      p2p/pex/addrbook_test.go
  9. +1
    -1
      p2p/pex/file.go
  10. +10
    -6
      p2p/pex/known_address.go
  11. +1
    -1
      p2p/pex/params.go
  12. +28
    -18
      p2p/pex/pex_reactor.go
  13. +48
    -63
      p2p/pex/pex_reactor_test.go
  14. +25
    -21
      p2p/switch.go
  15. +21
    -19
      p2p/switch_test.go
  16. +42
    -6
      p2p/test_util.go
  17. +2
    -2
      p2p/tmconn/conn_go110.go
  18. +2
    -2
      p2p/tmconn/conn_notgo110.go
  19. +8
    -8
      p2p/tmconn/connection.go
  20. +7
    -7
      p2p/tmconn/connection_test.go
  21. +1
    -1
      p2p/tmconn/secret_connection.go
  22. +1
    -1
      p2p/tmconn/secret_connection_test.go
  23. +20
    -0
      p2p/types/errors.go
  24. +1
    -1
      p2p/types/key.go
  25. +1
    -1
      p2p/types/key_test.go
  26. +1
    -1
      p2p/types/netaddress.go
  27. +1
    -1
      p2p/types/netaddress_test.go
  28. +5
    -1
      p2p/types/node_info.go

+ 6
- 3
p2p/base_reactor.go View File

@ -1,12 +1,15 @@
package p2p
import cmn "github.com/tendermint/tmlibs/common"
import (
"github.com/tendermint/tendermint/p2p/tmconn"
cmn "github.com/tendermint/tmlibs/common"
)
type Reactor interface {
cmn.Service // Start, Stop
SetSwitch(*Switch)
GetChannels() []*ChannelDescriptor
GetChannels() []*tmconn.ChannelDescriptor
AddPeer(peer Peer)
RemovePeer(peer Peer, reason interface{})
Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil
@ -29,7 +32,7 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
func (_ *BaseReactor) GetChannels() []*tmconn.ChannelDescriptor { return nil }
func (_ *BaseReactor) AddPeer(peer Peer) {}
func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}

+ 14
- 13
p2p/listener.go View File

@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/tendermint/tendermint/p2p/types"
"github.com/tendermint/tendermint/p2p/upnp"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -13,8 +14,8 @@ import (
type Listener interface {
Connections() <-chan net.Conn
InternalAddress() *NetAddress
ExternalAddress() *NetAddress
InternalAddress() *types.NetAddress
ExternalAddress() *types.NetAddress
String() string
Stop() error
}
@ -24,8 +25,8 @@ type DefaultListener struct {
cmn.BaseService
listener net.Listener
intAddr *NetAddress
extAddr *NetAddress
intAddr *types.NetAddress
extAddr *types.NetAddress
connections chan net.Conn
}
@ -71,14 +72,14 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log
logger.Info("Local listener", "ip", listenerIP, "port", listenerPort)
// Determine internal address...
var intAddr *NetAddress
intAddr, err = NewNetAddressString(lAddr)
var intAddr *types.NetAddress
intAddr, err = types.NewNetAddressString(lAddr)
if err != nil {
panic(err)
}
// Determine external address...
var extAddr *NetAddress
var extAddr *types.NetAddress
if !skipUPNP {
// If the lAddrIP is INADDR_ANY, try UPnP
if lAddrIP == "" || lAddrIP == "0.0.0.0" {
@ -151,11 +152,11 @@ func (l *DefaultListener) Connections() <-chan net.Conn {
return l.connections
}
func (l *DefaultListener) InternalAddress() *NetAddress {
func (l *DefaultListener) InternalAddress() *types.NetAddress {
return l.intAddr
}
func (l *DefaultListener) ExternalAddress() *NetAddress {
func (l *DefaultListener) ExternalAddress() *types.NetAddress {
return l.extAddr
}
@ -172,7 +173,7 @@ func (l *DefaultListener) String() string {
/* external address helpers */
// UPNP external address discovery & port mapping
func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *NetAddress {
func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *types.NetAddress {
logger.Info("Getting UPNP external address")
nat, err := upnp.Discover()
if err != nil {
@ -198,11 +199,11 @@ func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *
}
logger.Info("Got UPNP external address", "address", ext)
return NewNetAddressIPPort(ext, uint16(externalPort))
return types.NewNetAddressIPPort(ext, uint16(externalPort))
}
// TODO: use syscalls: see issue #712
func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) *NetAddress {
func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) *types.NetAddress {
addrs, err := net.InterfaceAddrs()
if err != nil {
panic(cmn.Fmt("Could not fetch interface addresses: %v", err))
@ -217,7 +218,7 @@ func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) *
if v4 == nil || (!settleForLocal && v4[0] == 127) {
continue
} // loopback
return NewNetAddressIPPort(ipnet.IP, uint16(port))
return types.NewNetAddressIPPort(ipnet.IP, uint16(port))
}
// try again, but settle for local


+ 29
- 26
p2p/peer.go View File

@ -11,17 +11,20 @@ import (
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/p2p/tmconn"
"github.com/tendermint/tendermint/p2p/types"
)
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
cmn.Service
ID() ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
NodeInfo() NodeInfo // peer's info
Status() ConnectionStatus
ID() types.ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
NodeInfo() types.NodeInfo // peer's info
Status() tmconn.ConnectionStatus
Send(byte, interface{}) bool
TrySend(byte, interface{}) bool
@ -40,13 +43,13 @@ type peer struct {
outbound bool
conn net.Conn // source connection
mconn *MConnection // multiplex connection
conn net.Conn // source connection
mconn *tmconn.MConnection // multiplex connection
persistent bool
config *PeerConfig
nodeInfo NodeInfo
nodeInfo types.NodeInfo
Data *cmn.CMap // User data.
}
@ -58,7 +61,7 @@ type PeerConfig struct {
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
MConfig *MConnConfig `mapstructure:"connection"`
MConfig *tmconn.MConnConfig `mapstructure:"connection"`
Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
@ -70,13 +73,13 @@ func DefaultPeerConfig() *PeerConfig {
AuthEnc: true,
HandshakeTimeout: 20, // * time.Second,
DialTimeout: 3, // * time.Second,
MConfig: DefaultMConnConfig(),
MConfig: tmconn.DefaultMConnConfig(),
Fuzz: false,
FuzzConfig: DefaultFuzzConnConfig(),
}
}
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newOutboundPeer(addr *types.NetAddress, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) {
conn, err := dial(addr, config)
@ -96,7 +99,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []
return peer, nil
}
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) {
// TODO: issue PoW challenge
@ -104,7 +107,7 @@ func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*Cha
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) {
conn := rawConn
@ -122,7 +125,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
}
var err error
conn, err = MakeSecretConnection(conn, ourNodePrivKey)
conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey)
if err != nil {
return nil, errors.Wrap(err, "Error creating peer")
}
@ -171,8 +174,8 @@ func (p *peer) OnStop() {
// Implements Peer
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() ID {
return PubKeyToID(p.PubKey())
func (p *peer) ID() types.ID {
return types.PubKeyToID(p.PubKey())
}
// IsOutbound returns true if the connection is outbound, false otherwise.
@ -186,12 +189,12 @@ func (p *peer) IsPersistent() bool {
}
// NodeInfo returns a copy of the peer's NodeInfo.
func (p *peer) NodeInfo() NodeInfo {
func (p *peer) NodeInfo() types.NodeInfo {
return p.nodeInfo
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() ConnectionStatus {
func (p *peer) Status() tmconn.ConnectionStatus {
return p.mconn.Status()
}
@ -236,13 +239,13 @@ func (p *peer) CloseConn() {
// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer
// by exchanging their NodeInfo. It sets the received nodeInfo on the peer.
// NOTE: blocking
func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error {
func (p *peer) HandshakeTimeout(ourNodeInfo types.NodeInfo, timeout time.Duration) error {
// Set deadline for handshake so we don't block forever on conn.ReadFull
if err := p.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return errors.Wrap(err, "Error setting deadline")
}
var peerNodeInfo NodeInfo
var peerNodeInfo types.NodeInfo
var err1 error
var err2 error
cmn.Parallel(
@ -252,7 +255,7 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err
},
func() {
var n int
wire.ReadBinary(&peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
wire.ReadBinary(&peerNodeInfo, p.conn, types.MaxNodeInfoSize(), &n, &err2)
p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
if err1 != nil {
@ -283,7 +286,7 @@ func (p *peer) PubKey() crypto.PubKey {
if !p.nodeInfo.PubKey.Empty() {
return p.nodeInfo.PubKey
} else if p.config.AuthEnc {
return p.conn.(*SecretConnection).RemotePubKey()
return p.conn.(*tmconn.SecretConnection).RemotePubKey()
}
panic("Attempt to get peer's PubKey before calling Handshake")
}
@ -308,7 +311,7 @@ func (p *peer) String() string {
//------------------------------------------------------------------
// helper funcs
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
func dial(addr *types.NetAddress, config *PeerConfig) (net.Conn, error) {
conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
if err != nil {
return nil, err
@ -316,8 +319,8 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
return conn, nil
}
func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(Peer, interface{}), config *MConnConfig) *MConnection {
func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
@ -331,5 +334,5 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch
onPeerError(p, r)
}
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

+ 9
- 7
p2p/peer_set.go View File

@ -2,12 +2,14 @@ package p2p
import (
"sync"
"github.com/tendermint/tendermint/p2p/types"
)
// IPeerSet has a (immutable) subset of the methods of PeerSet.
type IPeerSet interface {
Has(key ID) bool
Get(key ID) Peer
Has(key types.ID) bool
Get(key types.ID) Peer
List() []Peer
Size() int
}
@ -18,7 +20,7 @@ type IPeerSet interface {
// Iteration over the peers is super fast and thread-safe.
type PeerSet struct {
mtx sync.Mutex
lookup map[ID]*peerSetItem
lookup map[types.ID]*peerSetItem
list []Peer
}
@ -30,7 +32,7 @@ type peerSetItem struct {
// NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.
func NewPeerSet() *PeerSet {
return &PeerSet{
lookup: make(map[ID]*peerSetItem),
lookup: make(map[types.ID]*peerSetItem),
list: make([]Peer, 0, 256),
}
}
@ -41,7 +43,7 @@ func (ps *PeerSet) Add(peer Peer) error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.lookup[peer.ID()] != nil {
return ErrSwitchDuplicatePeer
return types.ErrSwitchDuplicatePeer
}
index := len(ps.list)
@ -54,7 +56,7 @@ func (ps *PeerSet) Add(peer Peer) error {
// Has returns true iff the PeerSet contains
// the peer referred to by this peerKey.
func (ps *PeerSet) Has(peerKey ID) bool {
func (ps *PeerSet) Has(peerKey types.ID) bool {
ps.mtx.Lock()
_, ok := ps.lookup[peerKey]
ps.mtx.Unlock()
@ -62,7 +64,7 @@ func (ps *PeerSet) Has(peerKey ID) bool {
}
// Get looks up a peer by the provided peerKey.
func (ps *PeerSet) Get(peerKey ID) Peer {
func (ps *PeerSet) Get(peerKey types.ID) Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item, ok := ps.lookup[peerKey]


+ 3
- 2
p2p/peer_set_test.go View File

@ -8,13 +8,14 @@ import (
"github.com/stretchr/testify/assert"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/p2p/types"
cmn "github.com/tendermint/tmlibs/common"
)
// Returns an empty dummy peer
func randPeer() *peer {
return &peer{
nodeInfo: NodeInfo{
nodeInfo: types.NodeInfo{
ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
@ -119,7 +120,7 @@ func TestPeerSetAddDuplicate(t *testing.T) {
// Our next procedure is to ensure that only one addition
// succeeded and that the rest are each ErrSwitchDuplicatePeer.
wantErrCount, gotErrCount := n-1, errsTally[ErrSwitchDuplicatePeer]
wantErrCount, gotErrCount := n-1, errsTally[types.ErrSwitchDuplicatePeer]
assert.Equal(t, wantErrCount, gotErrCount, "invalid ErrSwitchDuplicatePeer count")
wantNilErrCount, gotNilErrCount := 1, errsTally[nil]


+ 10
- 8
p2p/peer_test.go View File

@ -10,6 +10,8 @@ import (
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/p2p/tmconn"
"github.com/tendermint/tendermint/p2p/types"
)
func TestPeerBasic(t *testing.T) {
@ -80,8 +82,8 @@ func TestPeerSend(t *testing.T) {
assert.True(p.Send(0x01, "Asylum"))
}
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) {
chDescs := []*ChannelDescriptor{
func createOutboundPeerAndPerformHandshake(addr *types.NetAddress, config *PeerConfig) (*peer, error) {
chDescs := []*tmconn.ChannelDescriptor{
{ID: 0x01, Priority: 1},
}
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
@ -90,7 +92,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig)
if err != nil {
return nil, err
}
err = p.HandshakeTimeout(NodeInfo{
err = p.HandshakeTimeout(types.NodeInfo{
PubKey: pk.PubKey(),
Moniker: "host_peer",
Network: "testing",
@ -105,11 +107,11 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig)
type remotePeer struct {
PrivKey crypto.PrivKey
Config *PeerConfig
addr *NetAddress
addr *types.NetAddress
quit chan struct{}
}
func (p *remotePeer) Addr() *NetAddress {
func (p *remotePeer) Addr() *types.NetAddress {
return p.addr
}
@ -122,7 +124,7 @@ func (p *remotePeer) Start() {
if e != nil {
golog.Fatalf("net.Listen tcp :0: %+v", e)
}
p.addr = NewNetAddress("", l.Addr())
p.addr = types.NewNetAddress("", l.Addr())
p.quit = make(chan struct{})
go p.accept(l)
}
@ -137,11 +139,11 @@ func (p *remotePeer) accept(l net.Listener) {
if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
}
peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config)
peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*tmconn.ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config)
if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err)
}
err = peer.HandshakeTimeout(NodeInfo{
err = peer.HandshakeTimeout(types.NodeInfo{
PubKey: p.PrivKey.PubKey(),
Moniker: "remote_peer",
Network: "testing",


p2p/addrbook/addrbook.go → p2p/pex/addrbook.go View File


p2p/addrbook/addrbook_test.go → p2p/pex/addrbook_test.go View File


p2p/addrbook/file.go → p2p/pex/file.go View File


p2p/addrbook/known_address.go → p2p/pex/known_address.go View File


p2p/addrbook/params.go → p2p/pex/params.go View File


p2p/pex_reactor.go → p2p/pex/pex_reactor.go View File


p2p/pex_reactor_test.go → p2p/pex/pex_reactor_test.go View File


+ 25
- 21
p2p/switch.go View File

@ -11,6 +11,8 @@ import (
crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/tmconn"
"github.com/tendermint/tendermint/p2p/types"
cmn "github.com/tendermint/tmlibs/common"
)
@ -30,10 +32,11 @@ const (
reconnectBackOffBaseSeconds = 3
)
var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
ErrSwitchConnectToSelf = errors.New("Connect to self")
)
//-----------------------------------------------------------------------------
type AddrBook interface {
AddAddress(addr *types.NetAddress, src *types.NetAddress)
}
//-----------------------------------------------------------------------------
@ -48,12 +51,12 @@ type Switch struct {
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
chDescs []*ChannelDescriptor
chDescs []*tmconn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmn.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
nodeInfo types.NodeInfo // our node info
nodeKey *types.NodeKey // our node privkey
filterConnByAddr func(net.Addr) error
filterConnByPubKey func(crypto.PubKey) error
@ -66,7 +69,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
config: config,
peerConfig: DefaultPeerConfig(),
reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0),
chDescs: make([]*tmconn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
@ -77,10 +80,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
sw.rng = rand.New(rand.NewSource(cmn.RandInt64()))
// TODO: collapse the peerConfig into the config ?
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.SendRate = config.SendRate
sw.peerConfig.MConfig.RecvRate = config.RecvRate
sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
sw.peerConfig.MConfig.MaxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
return sw
@ -140,19 +143,19 @@ func (sw *Switch) IsListening() bool {
// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
func (sw *Switch) SetNodeInfo(nodeInfo types.NodeInfo) {
sw.nodeInfo = nodeInfo
}
// NodeInfo returns the switch's NodeInfo.
// NOTE: Not goroutine safe.
func (sw *Switch) NodeInfo() NodeInfo {
func (sw *Switch) NodeInfo() types.NodeInfo {
return sw.nodeInfo
}
// SetNodeKey sets the switch's private key for authenticated encryption.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
func (sw *Switch) SetNodeKey(nodeKey *types.NodeKey) {
sw.nodeKey = nodeKey
}
@ -311,13 +314,13 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
// Dialing
// IsDialing returns true if the switch is currently dialing the given ID.
func (sw *Switch) IsDialing(id ID) bool {
func (sw *Switch) IsDialing(id types.ID) bool {
return sw.dialing.Has(string(id))
}
// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error {
netAddrs, errs := NewNetAddressStrings(peers)
func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
netAddrs, errs := types.NewNetAddressStrings(peers)
for _, err := range errs {
sw.Logger.Error("Error in peer's address", "err", err)
}
@ -330,6 +333,7 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent
if netAddr.Same(ourAddr) {
continue
}
// TODO: move this out of here ?
addrBook.AddAddress(netAddr, ourAddr)
}
}
@ -353,7 +357,7 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) {
func (sw *Switch) DialPeerWithAddress(addr *types.NetAddress, persistent bool) (Peer, error) {
sw.dialing.Set(string(addr.ID), addr)
defer sw.dialing.Delete(string(addr.ID))
return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent)
@ -439,7 +443,7 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er
// dial the peer; make secret connection; authenticate against the dialed ID;
// add the peer.
func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) {
func (sw *Switch) addOutboundPeerWithConfig(addr *types.NetAddress, config *PeerConfig, persistent bool) (Peer, error) {
sw.Logger.Info("Dialing peer", "address", addr)
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config, persistent)
if err != nil {
@ -453,7 +457,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr)
} else if addr.ID != peer.ID() {
peer.CloseConn()
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID())
return nil, types.ErrSwitchAuthenticationFailure{addr, peer.ID()}
}
err = sw.addPeer(peer)
@ -474,12 +478,12 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
func (sw *Switch) addPeer(peer *peer) error {
// Avoid self
if sw.nodeKey.ID() == peer.ID() {
return ErrSwitchConnectToSelf
return types.ErrSwitchConnectToSelf
}
// Avoid duplicate
if sw.peers.Has(peer.ID()) {
return ErrSwitchDuplicatePeer
return types.ErrSwitchDuplicatePeer
}


+ 21
- 19
p2p/switch_test.go View File

@ -16,6 +16,8 @@ import (
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/tmconn"
"github.com/tendermint/tendermint/p2p/types"
)
var (
@ -28,7 +30,7 @@ func init() {
}
type PeerMessage struct {
PeerID ID
PeerID types.ID
Bytes []byte
Counter int
}
@ -37,7 +39,7 @@ type TestReactor struct {
BaseReactor
mtx sync.Mutex
channels []*ChannelDescriptor
channels []*tmconn.ChannelDescriptor
peersAdded []Peer
peersRemoved []Peer
logMessages bool
@ -45,7 +47,7 @@ type TestReactor struct {
msgsReceived map[byte][]PeerMessage
}
func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
func NewTestReactor(channels []*tmconn.ChannelDescriptor, logMessages bool) *TestReactor {
tr := &TestReactor{
channels: channels,
logMessages: logMessages,
@ -56,7 +58,7 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto
return tr
}
func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
func (tr *TestReactor) GetChannels() []*tmconn.ChannelDescriptor {
return tr.channels
}
@ -92,7 +94,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
// convenience method for creating two switches connected to each other.
// XXX: note this uses net.Pipe and not a proper TCP conn
func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
// Create two switches that will be interconnected.
switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
return switches[0], switches[1]
@ -100,11 +102,11 @@ func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switc
func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("foo", NewTestReactor([]*tmconn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("bar", NewTestReactor([]*tmconn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, true))
@ -112,7 +114,7 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
}
func TestSwitches(t *testing.T) {
s1, s2 := makeSwitchPair(t, initSwitchFunc)
s1, s2 := MakeSwitchPair(t, initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
@ -156,12 +158,12 @@ func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reacto
}
func TestConnAddrFilter(t *testing.T) {
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := netPipe()
c1, c2 := tmconn.NetPipe()
s1.SetAddrFilter(func(addr net.Addr) error {
if addr.String() == c1.RemoteAddr().String() {
@ -192,12 +194,12 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration)
}
func TestConnPubKeyFilter(t *testing.T) {
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := netPipe()
c1, c2 := tmconn.NetPipe()
// set pubkey filter
s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error {
@ -224,7 +226,7 @@ func TestConnPubKeyFilter(t *testing.T) {
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -251,7 +253,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -302,13 +304,13 @@ func TestSwitchFullConnectivity(t *testing.T) {
func BenchmarkSwitches(b *testing.B) {
b.StopTimer()
s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("foo", NewTestReactor([]*tmconn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, false))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("bar", NewTestReactor([]*tmconn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, false))


+ 42
- 6
p2p/test_util.go View File

@ -5,11 +5,47 @@ import (
"net"
crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/tmconn"
"github.com/tendermint/tendermint/p2p/types"
)
func AddPeerToSwitch(sw *Switch, peer Peer) {
sw.peers.Add(peer)
}
func CreateRandomPeer(outbound bool) *peer {
addr, netAddr := CreateRoutableAddr()
p := &peer{
nodeInfo: types.NodeInfo{
ListenAddr: netAddr.DialString(),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
outbound: outbound,
mconn: &tmconn.MConnection{},
}
p.SetLogger(log.TestingLogger().With("peer", addr))
return p
}
func CreateRoutableAddr() (addr string, netAddr *types.NetAddress) {
for {
var err error
addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
netAddr, err = types.NewNetAddressString(addr)
if err != nil {
panic(err)
}
if netAddr.Routable() {
break
}
}
return
}
//------------------------------------------------------------------
// Connects switches via arbitrary net.Conn. Used for testing.
@ -20,7 +56,7 @@ import (
func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
switches := make([]*Switch, n)
for i := 0; i < n; i++ {
switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
}
if err := StartSwitches(switches); err != nil {
@ -42,7 +78,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit
func Connect2Switches(switches []*Switch, i, j int) {
switchI := switches[i]
switchJ := switches[j]
c1, c2 := netPipe()
c1, c2 := tmconn.NetPipe()
doneCh := make(chan struct{})
go func() {
err := switchI.addPeerWithConnection(c1)
@ -91,16 +127,16 @@ func StartSwitches(switches []*Switch) error {
return nil
}
func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
// new switch, add reactors
// TODO: let the config be passed in?
nodeKey := &NodeKey{
nodeKey := &types.NodeKey{
PrivKey: crypto.GenPrivKeyEd25519().Wrap(),
}
s := NewSwitch(cfg)
s.SetLogger(log.TestingLogger())
s = initSwitch(i, s)
s.SetNodeInfo(NodeInfo{
s.SetNodeInfo(types.NodeInfo{
PubKey: nodeKey.PubKey(),
Moniker: cmn.Fmt("switch%d", i),
Network: network,


p2p/conn_go110.go → p2p/tmconn/conn_go110.go View File


p2p/conn_notgo110.go → p2p/tmconn/conn_notgo110.go View File


p2p/connection.go → p2p/tmconn/connection.go View File


p2p/connection_test.go → p2p/tmconn/connection_test.go View File


p2p/secret_connection.go → p2p/tmconn/secret_connection.go View File


p2p/secret_connection_test.go → p2p/tmconn/secret_connection_test.go View File


+ 20
- 0
p2p/types/errors.go View File

@ -0,0 +1,20 @@
package types
import (
"errors"
"fmt"
)
var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
ErrSwitchConnectToSelf = errors.New("Connect to self")
)
type ErrSwitchAuthenticationFailure struct {
Dialed *NetAddress
Got ID
}
func (e ErrSwitchAuthenticationFailure) Error() string {
return fmt.Sprintf("Failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got)
}

p2p/key.go → p2p/types/key.go View File


p2p/key_test.go → p2p/types/key_test.go View File


p2p/netaddress.go → p2p/types/netaddress.go View File


p2p/netaddress_test.go → p2p/types/netaddress_test.go View File


p2p/types.go → p2p/types/node_info.go View File


Loading…
Cancel
Save