Browse Source

a little more moving around

pull/1105/head
Ethan Buchman 7 years ago
parent
commit
08f84cd712
5 changed files with 72 additions and 56 deletions
  1. +1
    -1
      node/node.go
  2. +2
    -8
      p2p/peer.go
  3. +2
    -2
      p2p/pex_reactor.go
  4. +46
    -40
      p2p/switch.go
  5. +21
    -5
      p2p/types.go

+ 1
- 1
node/node.go View File

@ -544,9 +544,9 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) *p2p.NodeInfo {
} }
nodeInfo := &p2p.NodeInfo{ nodeInfo := &p2p.NodeInfo{
PubKey: pubKey, PubKey: pubKey,
Moniker: n.config.Moniker,
Network: n.genesisDoc.ChainID, Network: n.genesisDoc.ChainID,
Version: version.Version, Version: version.Version,
Moniker: n.config.Moniker,
Other: []string{ Other: []string{
cmn.Fmt("wire_version=%v", wire.Version), cmn.Fmt("wire_version=%v", wire.Version),
cmn.Fmt("p2p_version=%v", p2p.Version), cmn.Fmt("p2p_version=%v", p2p.Version),


+ 2
- 8
p2p/peer.go View File

@ -195,19 +195,13 @@ func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
return errors.Wrap(err2, "Error during handshake/read") return errors.Wrap(err2, "Error during handshake/read")
} }
if p.config.AuthEnc {
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) {
return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, p.PubKey())
}
}
// Remove deadline // Remove deadline
if err := p.conn.SetDeadline(time.Time{}); err != nil { if err := p.conn.SetDeadline(time.Time{}); err != nil {
return errors.Wrap(err, "Error removing deadline") return errors.Wrap(err, "Error removing deadline")
} }
// TODO: fix the peerNodeInfo.ListenAddr
p.nodeInfo = peerNodeInfo p.nodeInfo = peerNodeInfo
return nil return nil
} }


+ 2
- 2
p2p/pex_reactor.go View File

@ -115,7 +115,8 @@ func (r *PEXReactor) AddPeer(p Peer) {
r.RequestPEX(p) r.RequestPEX(p)
} }
} else { } else {
// For inbound connections, the peer is its own source
// For inbound connections, the peer is its own source,
// and its NodeInfo has already been validated
addr := p.NodeInfo().NetAddress() addr := p.NodeInfo().NetAddress()
r.book.AddAddress(addr, addr) r.book.AddAddress(addr, addr)
} }
@ -130,7 +131,6 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
// Receive implements Reactor by handling incoming PEX messages. // Receive implements Reactor by handling incoming PEX messages.
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
srcAddr := src.NodeInfo().NetAddress() srcAddr := src.NodeInfo().NetAddress()
r.IncrementMsgCountForPeer(srcAddr.ID) r.IncrementMsgCountForPeer(srcAddr.ID)
if r.ReachedMaxMsgCountForPeer(srcAddr.ID) { if r.ReachedMaxMsgCountForPeer(srcAddr.ID) {
r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddr) r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddr)


+ 46
- 40
p2p/switch.go View File

@ -359,38 +359,12 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent
return nil return nil
} }
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully.
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) {
sw.dialing.Set(string(addr.ID), addr) sw.dialing.Set(string(addr.ID), addr)
defer sw.dialing.Delete(string(addr.ID)) defer sw.dialing.Delete(string(addr.ID))
sw.Logger.Info("Dialing peer", "address", addr)
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig)
if err != nil {
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
return nil, err
}
peer.SetLogger(sw.Logger.With("peer", addr))
// authenticate peer
if addr.ID == "" {
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr)
} else if addr.ID != peer.ID() {
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID())
}
if persistent {
peer.makePersistent()
}
err = sw.addPeer(peer)
if err != nil {
sw.Logger.Error("Failed to add peer", "address", addr, "err", err)
peer.CloseConn()
return nil, err
}
sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
return peer, nil
return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent)
} }
// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
@ -451,10 +425,6 @@ func (sw *Switch) listenerRoutine(l Listener) {
sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
continue continue
} }
// NOTE: We don't yet have the listening port of the
// remote (if they have a listener at all).
// The peerHandshake will handle that.
} }
// cleanup // cleanup
@ -477,9 +447,40 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er
return nil return nil
} }
// addPeer checks the given peer's validity, performs a handshake, and adds the
// peer to the switch and to all registered reactors.
// We already have an authenticated SecretConnection with the peer.
// dial the peer; make secret connection; authenticate against the dialed ID;
// add the peer.
func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) {
sw.Logger.Info("Dialing peer", "address", addr)
peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config)
if err != nil {
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
return nil, err
}
peer.SetLogger(sw.Logger.With("peer", addr))
// authenticate peer
if addr.ID == "" {
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr)
} else if addr.ID != peer.ID() {
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID())
}
if persistent {
peer.makePersistent()
}
err = sw.addPeer(peer)
if err != nil {
sw.Logger.Error("Failed to add peer", "address", addr, "err", err)
peer.CloseConn()
return nil, err
}
sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
return peer, nil
}
// addPeer performs the Tendermint P2P handshake with a peer
// that already has a SecretConnection. If all goes well,
// it starts the peer and adds it to the switch.
// NOTE: This performs a blocking handshake before the peer is added. // NOTE: This performs a blocking handshake before the peer is added.
// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() // NOTE: If error is returned, caller is responsible for calling peer.CloseConn()
func (sw *Switch) addPeer(peer *peer) error { func (sw *Switch) addPeer(peer *peer) error {
@ -488,6 +489,12 @@ func (sw *Switch) addPeer(peer *peer) error {
return ErrSwitchConnectToSelf return ErrSwitchConnectToSelf
} }
// Avoid duplicate
if sw.peers.Has(peer.ID()) {
return ErrSwitchDuplicatePeer
}
// Filter peer against white list // Filter peer against white list
if err := sw.FilterConnByAddr(peer.Addr()); err != nil { if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
return err return err
@ -501,10 +508,9 @@ func (sw *Switch) addPeer(peer *peer) error {
return err return err
} }
// Avoid duplicate
if sw.peers.Has(peer.ID()) {
return ErrSwitchDuplicatePeer
// Validate the peers nodeInfo against the pubkey
if err := peer.NodeInfo().Validate(peer.PubKey()); err != nil {
return err
} }
// Check version, chain id // Check version, chain id
@ -512,7 +518,7 @@ func (sw *Switch) addPeer(peer *peer) error {
return err return err
} }
// Start peer
// All good. Start peer
if sw.IsRunning() { if sw.IsRunning() {
sw.startInitPeer(peer) sw.startInitPeer(peer)
} }


+ 21
- 5
p2p/types.go View File

@ -12,14 +12,30 @@ import (
const maxNodeInfoSize = 10240 // 10Kb const maxNodeInfoSize = 10240 // 10Kb
// NodeInfo is the basic node information exchanged // NodeInfo is the basic node information exchanged
// between two peers during the Tendermint P2P handshake
// between two peers during the Tendermint P2P handshake.
type NodeInfo struct { type NodeInfo struct {
// Authenticate
PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey
Moniker string `json:"moniker"` // arbitrary moniker
Network string `json:"network"` // network/chain ID
ListenAddr string `json:"listen_addr"` // accepting incoming ListenAddr string `json:"listen_addr"` // accepting incoming
Version string `json:"version"` // major.minor.revision
Other []string `json:"other"` // other application specific data
// Check compatibility
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
// Sanitize
Moniker string `json:"moniker"` // arbitrary moniker
Other []string `json:"other"` // other application specific data
}
// Validate checks the self-reported NodeInfo is safe.
// It returns an error if the info.PubKey doesn't match the given pubKey.
// TODO: constraints for Moniker/Other? Or is that for the UI ?
func (info *NodeInfo) Validate(pubKey crypto.PubKey) error {
if !info.PubKey.Equals(pubKey) {
return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)",
info.PubKey, pubKey)
}
return nil
} }
// CONTRACT: two nodes are compatible if the major/minor versions match and network match // CONTRACT: two nodes are compatible if the major/minor versions match and network match


Loading…
Cancel
Save