From f9e4f6eb6ba51d0cac2186bccdd7a9f320fe3c0e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Jan 2018 00:44:16 -0500 Subject: [PATCH] reorder peer.go methods --- p2p/peer.go | 180 ++++++++++++++++++++++----------------------- p2p/peer_test.go | 4 +- p2p/switch.go | 10 +-- p2p/switch_test.go | 5 +- 4 files changed, 95 insertions(+), 104 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 57718b6bf..596b92168 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,10 +17,10 @@ import ( type Peer interface { cmn.Service - ID() ID - IsOutbound() bool - IsPersistent() bool - NodeInfo() NodeInfo + ID() ID // peer's cryptographic ID + IsOutbound() bool // did we dial the peer + IsPersistent() bool // do we redial this peer when we disconnect + NodeInfo() NodeInfo // peer's info Status() ConnectionStatus Send(byte, interface{}) bool @@ -30,9 +30,9 @@ type Peer interface { Get(string) interface{} } -// Peer could be marked as persistent, in which case you can use -// Redial function to reconnect. Note that inbound peers can't be -// made persistent. They should be made persistent on the other end. +//---------------------------------------------------------- + +// peer implements Peer. // // Before using a peer, you will need to perform a handshake on connection. type peer struct { @@ -77,7 +77,7 @@ func DefaultPeerConfig() *PeerConfig { } func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { + onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) { conn, err := dial(addr, config) if err != nil { @@ -91,6 +91,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] } return nil, err } + peer.persistent = persistent return peer, nil } @@ -142,23 +143,41 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ return p, nil } +//--------------------------------------------------- +// Implements cmn.Service + +// SetLogger implements BaseService. func (p *peer) SetLogger(l log.Logger) { p.Logger = l p.mconn.SetLogger(l) } -// CloseConn should be used when the peer was created, but never started. -func (p *peer) CloseConn() { - p.conn.Close() // nolint: errcheck +// OnStart implements BaseService. +func (p *peer) OnStart() error { + if err := p.BaseService.OnStart(); err != nil { + return err + } + err := p.mconn.Start() + return err } -// makePersistent marks the peer as persistent. -func (p *peer) makePersistent() { - if !p.outbound { - panic("inbound peers can't be made persistent") - } +// OnStop implements BaseService. +func (p *peer) OnStop() { + p.BaseService.OnStop() + p.mconn.Stop() // stop everything and close the conn +} + +//--------------------------------------------------- +// Implements Peer - p.persistent = true +// ID returns the peer's ID - the hex encoded hash of its pubkey. +func (p *peer) ID() ID { + return PubKeyToID(p.PubKey()) +} + +// IsOutbound returns true if the connection is outbound, false otherwise. +func (p *peer) IsOutbound() bool { + return p.outbound } // IsPersistent returns true if the peer is persitent, false otherwise. @@ -166,7 +185,56 @@ func (p *peer) IsPersistent() bool { return p.persistent } -// HandshakeTimeout performs a handshake between a given node and the peer. +// NodeInfo returns a copy of the peer's NodeInfo. +func (p *peer) NodeInfo() NodeInfo { + return p.nodeInfo +} + +// Status returns the peer's ConnectionStatus. +func (p *peer) Status() ConnectionStatus { + return p.mconn.Status() +} + +// Send msg to the channel identified by chID byte. Returns false if the send +// queue is full after timeout, specified by MConnection. +func (p *peer) Send(chID byte, msg interface{}) bool { + if !p.IsRunning() { + // see Switch#Broadcast, where we fetch the list of peers and loop over + // them - while we're looping, one peer may be removed and stopped. + return false + } + return p.mconn.Send(chID, msg) +} + +// TrySend msg to the channel identified by chID byte. Immediately returns +// false if the send queue is full. +func (p *peer) TrySend(chID byte, msg interface{}) bool { + if !p.IsRunning() { + return false + } + return p.mconn.TrySend(chID, msg) +} + +// Get the data for a given key. +func (p *peer) Get(key string) interface{} { + return p.Data.Get(key) +} + +// Set sets the data for the given key. +func (p *peer) Set(key string, data interface{}) { + p.Data.Set(key, data) +} + +//--------------------------------------------------- +// methods used by the Switch + +// CloseConn should be called by the Switch if the peer was created but never started. +func (p *peer) CloseConn() { + p.conn.Close() // nolint: errcheck +} + +// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer +// by exchanging their NodeInfo. It sets the received nodeInfo on the peer. // NOTE: blocking func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error { // Set deadline for handshake so we don't block forever on conn.ReadFull @@ -220,51 +288,6 @@ func (p *peer) PubKey() crypto.PubKey { panic("Attempt to get peer's PubKey before calling Handshake") } -// OnStart implements BaseService. -func (p *peer) OnStart() error { - if err := p.BaseService.OnStart(); err != nil { - return err - } - err := p.mconn.Start() - return err -} - -// OnStop implements BaseService. -func (p *peer) OnStop() { - p.BaseService.OnStop() - p.mconn.Stop() -} - -// Connection returns underlying MConnection. -func (p *peer) Connection() *MConnection { - return p.mconn -} - -// IsOutbound returns true if the connection is outbound, false otherwise. -func (p *peer) IsOutbound() bool { - return p.outbound -} - -// Send msg to the channel identified by chID byte. Returns false if the send -// queue is full after timeout, specified by MConnection. -func (p *peer) Send(chID byte, msg interface{}) bool { - if !p.IsRunning() { - // see Switch#Broadcast, where we fetch the list of peers and loop over - // them - while we're looping, one peer may be removed and stopped. - return false - } - return p.mconn.Send(chID, msg) -} - -// TrySend msg to the channel identified by chID byte. Immediately returns -// false if the send queue is full. -func (p *peer) TrySend(chID byte, msg interface{}) bool { - if !p.IsRunning() { - return false - } - return p.mconn.TrySend(chID, msg) -} - // CanSend returns true if the send queue is not full, false otherwise. func (p *peer) CanSend(chID byte) bool { if !p.IsRunning() { @@ -282,35 +305,8 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } -// Equals reports whenever 2 peers are actually represent the same node. -func (p *peer) Equals(other Peer) bool { - return p.ID() == other.ID() -} - -// Get the data for a given key. -func (p *peer) Get(key string) interface{} { - return p.Data.Get(key) -} - -// Set sets the data for the given key. -func (p *peer) Set(key string, data interface{}) { - p.Data.Set(key, data) -} - -// ID returns the peer's ID - the hex encoded hash of its pubkey. -func (p *peer) ID() ID { - return PubKeyToID(p.PubKey()) -} - -// NodeInfo returns a copy of the peer's NodeInfo. -func (p *peer) NodeInfo() NodeInfo { - return p.nodeInfo -} - -// Status returns the peer's ConnectionStatus. -func (p *peer) Status() ConnectionStatus { - return p.mconn.Status() -} +//------------------------------------------------------------------ +// helper funcs func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { conn, err := addr.DialTimeout(config.DialTimeout * time.Second) diff --git a/p2p/peer_test.go b/p2p/peer_test.go index f4a5363b5..d99fff5e4 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -30,7 +30,7 @@ func TestPeerBasic(t *testing.T) { assert.True(p.IsRunning()) assert.True(p.IsOutbound()) assert.False(p.IsPersistent()) - p.makePersistent() + p.persistent = true assert.True(p.IsPersistent()) assert.Equal(rp.Addr().String(), p.Addr().String()) assert.Equal(rp.PubKey(), p.PubKey()) @@ -86,7 +86,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} pk := crypto.GenPrivKeyEd25519().Wrap() - p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config) + p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config, false) if err != nil { return nil, err } diff --git a/p2p/switch.go b/p2p/switch.go index 0ff64dbf9..eaf3c22e5 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -428,9 +428,7 @@ func (sw *Switch) listenerRoutine(l Listener) { func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } + peer.CloseConn() return err } peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) @@ -446,7 +444,7 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er // add the peer. func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) { sw.Logger.Info("Dialing peer", "address", addr) - peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) + peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config, persistent) if err != nil { sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) return nil, err @@ -457,12 +455,10 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig if addr.ID == "" { peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) } else if addr.ID != peer.ID() { + peer.CloseConn() return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) } - if persistent { - peer.makePersistent() - } err = sw.addPeer(peer) if err != nil { sw.Logger.Error("Failed to add peer", "address", addr, "err", err) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 7d61fa39a..a729698e9 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -236,7 +236,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig()) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), false) require.Nil(err) err = sw.addPeer(peer) require.Nil(err) @@ -263,8 +263,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig()) - peer.makePersistent() + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), true) require.Nil(err) err = sw.addPeer(peer) require.Nil(err)