Browse Source

reorder peer.go methods

pull/1105/head
Ethan Buchman 7 years ago
parent
commit
f9e4f6eb6b
4 changed files with 95 additions and 104 deletions
  1. +88
    -92
      p2p/peer.go
  2. +2
    -2
      p2p/peer_test.go
  3. +3
    -7
      p2p/switch.go
  4. +2
    -3
      p2p/switch_test.go

+ 88
- 92
p2p/peer.go View File

@ -17,10 +17,10 @@ import (
type Peer interface { type Peer interface {
cmn.Service cmn.Service
ID() ID
IsOutbound() bool
IsPersistent() bool
NodeInfo() NodeInfo
ID() ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
NodeInfo() NodeInfo // peer's info
Status() ConnectionStatus Status() ConnectionStatus
Send(byte, interface{}) bool Send(byte, interface{}) bool
@ -30,9 +30,9 @@ type Peer interface {
Get(string) interface{} Get(string) interface{}
} }
// Peer could be marked as persistent, in which case you can use
// Redial function to reconnect. Note that inbound peers can't be
// made persistent. They should be made persistent on the other end.
//----------------------------------------------------------
// peer implements Peer.
// //
// Before using a peer, you will need to perform a handshake on connection. // Before using a peer, you will need to perform a handshake on connection.
type peer struct { type peer struct {
@ -77,7 +77,7 @@ func DefaultPeerConfig() *PeerConfig {
} }
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) {
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
if err != nil { if err != nil {
@ -91,6 +91,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []
} }
return nil, err return nil, err
} }
peer.persistent = persistent
return peer, nil return peer, nil
} }
@ -142,23 +143,41 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
return p, nil return p, nil
} }
//---------------------------------------------------
// Implements cmn.Service
// SetLogger implements BaseService.
func (p *peer) SetLogger(l log.Logger) { func (p *peer) SetLogger(l log.Logger) {
p.Logger = l p.Logger = l
p.mconn.SetLogger(l) p.mconn.SetLogger(l)
} }
// CloseConn should be used when the peer was created, but never started.
func (p *peer) CloseConn() {
p.conn.Close() // nolint: errcheck
// OnStart implements BaseService.
func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil {
return err
}
err := p.mconn.Start()
return err
} }
// makePersistent marks the peer as persistent.
func (p *peer) makePersistent() {
if !p.outbound {
panic("inbound peers can't be made persistent")
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.BaseService.OnStop()
p.mconn.Stop() // stop everything and close the conn
}
//---------------------------------------------------
// Implements Peer
p.persistent = true
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() ID {
return PubKeyToID(p.PubKey())
}
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *peer) IsOutbound() bool {
return p.outbound
} }
// IsPersistent returns true if the peer is persitent, false otherwise. // IsPersistent returns true if the peer is persitent, false otherwise.
@ -166,7 +185,56 @@ func (p *peer) IsPersistent() bool {
return p.persistent return p.persistent
} }
// HandshakeTimeout performs a handshake between a given node and the peer.
// NodeInfo returns a copy of the peer's NodeInfo.
func (p *peer) NodeInfo() NodeInfo {
return p.nodeInfo
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() ConnectionStatus {
return p.mconn.Status()
}
// Send msg to the channel identified by chID byte. Returns false if the send
// queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msg interface{}) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
}
return p.mconn.Send(chID, msg)
}
// TrySend msg to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
}
return p.mconn.TrySend(chID, msg)
}
// Get the data for a given key.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)
}
// Set sets the data for the given key.
func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
//---------------------------------------------------
// methods used by the Switch
// CloseConn should be called by the Switch if the peer was created but never started.
func (p *peer) CloseConn() {
p.conn.Close() // nolint: errcheck
}
// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer
// by exchanging their NodeInfo. It sets the received nodeInfo on the peer.
// NOTE: blocking // NOTE: blocking
func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error { func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error {
// Set deadline for handshake so we don't block forever on conn.ReadFull // Set deadline for handshake so we don't block forever on conn.ReadFull
@ -220,51 +288,6 @@ func (p *peer) PubKey() crypto.PubKey {
panic("Attempt to get peer's PubKey before calling Handshake") panic("Attempt to get peer's PubKey before calling Handshake")
} }
// OnStart implements BaseService.
func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil {
return err
}
err := p.mconn.Start()
return err
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.BaseService.OnStop()
p.mconn.Stop()
}
// Connection returns underlying MConnection.
func (p *peer) Connection() *MConnection {
return p.mconn
}
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *peer) IsOutbound() bool {
return p.outbound
}
// Send msg to the channel identified by chID byte. Returns false if the send
// queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msg interface{}) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
}
return p.mconn.Send(chID, msg)
}
// TrySend msg to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
}
return p.mconn.TrySend(chID, msg)
}
// CanSend returns true if the send queue is not full, false otherwise. // CanSend returns true if the send queue is not full, false otherwise.
func (p *peer) CanSend(chID byte) bool { func (p *peer) CanSend(chID byte) bool {
if !p.IsRunning() { if !p.IsRunning() {
@ -282,35 +305,8 @@ func (p *peer) String() string {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
} }
// Equals reports whenever 2 peers are actually represent the same node.
func (p *peer) Equals(other Peer) bool {
return p.ID() == other.ID()
}
// Get the data for a given key.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)
}
// Set sets the data for the given key.
func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() ID {
return PubKeyToID(p.PubKey())
}
// NodeInfo returns a copy of the peer's NodeInfo.
func (p *peer) NodeInfo() NodeInfo {
return p.nodeInfo
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() ConnectionStatus {
return p.mconn.Status()
}
//------------------------------------------------------------------
// helper funcs
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
conn, err := addr.DialTimeout(config.DialTimeout * time.Second) conn, err := addr.DialTimeout(config.DialTimeout * time.Second)


+ 2
- 2
p2p/peer_test.go View File

@ -30,7 +30,7 @@ func TestPeerBasic(t *testing.T) {
assert.True(p.IsRunning()) assert.True(p.IsRunning())
assert.True(p.IsOutbound()) assert.True(p.IsOutbound())
assert.False(p.IsPersistent()) assert.False(p.IsPersistent())
p.makePersistent()
p.persistent = true
assert.True(p.IsPersistent()) assert.True(p.IsPersistent())
assert.Equal(rp.Addr().String(), p.Addr().String()) assert.Equal(rp.Addr().String(), p.Addr().String())
assert.Equal(rp.PubKey(), p.PubKey()) assert.Equal(rp.PubKey(), p.PubKey())
@ -86,7 +86,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig)
} }
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
pk := crypto.GenPrivKeyEd25519().Wrap() pk := crypto.GenPrivKeyEd25519().Wrap()
p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config)
p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 3
- 7
p2p/switch.go View File

@ -428,9 +428,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error {
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config)
if err != nil { if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
}
peer.CloseConn()
return err return err
} }
peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
@ -446,7 +444,7 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er
// add the peer. // add the peer.
func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) { func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) {
sw.Logger.Info("Dialing peer", "address", addr) sw.Logger.Info("Dialing peer", "address", addr)
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config)
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config, persistent)
if err != nil { if err != nil {
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
return nil, err return nil, err
@ -457,12 +455,10 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
if addr.ID == "" { if addr.ID == "" {
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr)
} else if addr.ID != peer.ID() { } else if addr.ID != peer.ID() {
peer.CloseConn()
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID())
} }
if persistent {
peer.makePersistent()
}
err = sw.addPeer(peer) err = sw.addPeer(peer)
if err != nil { if err != nil {
sw.Logger.Error("Failed to add peer", "address", addr, "err", err) sw.Logger.Error("Failed to add peer", "address", addr, "err", err)


+ 2
- 3
p2p/switch_test.go View File

@ -236,7 +236,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig())
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), false)
require.Nil(err) require.Nil(err)
err = sw.addPeer(peer) err = sw.addPeer(peer)
require.Nil(err) require.Nil(err)
@ -263,8 +263,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig())
peer.makePersistent()
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), true)
require.Nil(err) require.Nil(err)
err = sw.addPeer(peer) err = sw.addPeer(peer)
require.Nil(err) require.Nil(err)


Loading…
Cancel
Save