From 452d10f368a626a47bf4f5e9736a6b1166c252e0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 13 Jan 2018 17:25:51 -0500 Subject: [PATCH 1/6] cleanup switch --- p2p/addrbook.go | 13 ++ p2p/base_reactor.go | 35 +++ p2p/peer.go | 2 + p2p/switch.go | 504 +++++++++++++++++--------------------------- p2p/test_util.go | 111 ++++++++++ p2p/util.go | 15 -- 6 files changed, 355 insertions(+), 325 deletions(-) create mode 100644 p2p/base_reactor.go create mode 100644 p2p/test_util.go delete mode 100644 p2p/util.go diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 8826ff1e0..7591c3074 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -5,6 +5,7 @@ package p2p import ( + "crypto/sha256" "encoding/binary" "encoding/json" "fmt" @@ -867,3 +868,15 @@ func (ka *knownAddress) isBad() bool { return false } + +//----------------------------------------------------------------------------- + +// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. +func doubleSha256(b []byte) []byte { + hasher := sha256.New() + hasher.Write(b) // nolint: errcheck, gas + sum := hasher.Sum(nil) + hasher.Reset() + hasher.Write(sum) // nolint: errcheck, gas + return hasher.Sum(nil) +} diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go new file mode 100644 index 000000000..e8107d730 --- /dev/null +++ b/p2p/base_reactor.go @@ -0,0 +1,35 @@ +package p2p + +import cmn "github.com/tendermint/tmlibs/common" + +type Reactor interface { + cmn.Service // Start, Stop + + SetSwitch(*Switch) + GetChannels() []*ChannelDescriptor + AddPeer(peer Peer) + RemovePeer(peer Peer, reason interface{}) + Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil +} + +//-------------------------------------- + +type BaseReactor struct { + cmn.BaseService // Provides Start, Stop, .Quit + Switch *Switch +} + +func NewBaseReactor(name string, impl Reactor) *BaseReactor { + return &BaseReactor{ + BaseService: *cmn.NewBaseService(nil, name, impl), + Switch: nil, + } +} + +func (br *BaseReactor) SetSwitch(sw *Switch) { + br.Switch = sw +} +func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) AddPeer(peer Peer) {} +func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} diff --git a/p2p/peer.go b/p2p/peer.go index 2f5dff78d..1f5192d50 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -99,6 +99,8 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { + // TODO: issue PoW challenge + return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } diff --git a/p2p/switch.go b/p2p/switch.go index 484649145..4ece1a58c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -12,7 +12,6 @@ import ( crypto "github.com/tendermint/go-crypto" cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) const ( @@ -31,46 +30,17 @@ const ( reconnectBackOffBaseSeconds = 3 ) -type Reactor interface { - cmn.Service // Start, Stop - - SetSwitch(*Switch) - GetChannels() []*ChannelDescriptor - AddPeer(peer Peer) - RemovePeer(peer Peer, reason interface{}) - Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil -} - -//-------------------------------------- - -type BaseReactor struct { - cmn.BaseService // Provides Start, Stop, .Quit - Switch *Switch -} - -func NewBaseReactor(name string, impl Reactor) *BaseReactor { - return &BaseReactor{ - BaseService: *cmn.NewBaseService(nil, name, impl), - Switch: nil, - } -} - -func (br *BaseReactor) SetSwitch(sw *Switch) { - br.Switch = sw -} -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } -func (_ *BaseReactor) AddPeer(peer Peer) {} -func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} +var ( + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchConnectToSelf = errors.New("Connect to self") +) //----------------------------------------------------------------------------- -/* -The `Switch` handles peer connections and exposes an API to receive incoming messages -on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one -or more `Channels`. So while sending outgoing messages is typically performed on the peer, -incoming messages are received on the reactor. -*/ +// `Switch` handles peer connections and exposes an API to receive incoming messages +// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one +// or more `Channels`. So while sending outgoing messages is typically performed on the peer, +// incoming messages are received on the reactor. type Switch struct { cmn.BaseService @@ -91,11 +61,6 @@ type Switch struct { rng *rand.Rand // seed for randomizing dial times and orders } -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchConnectToSelf = errors.New("Connect to self") -) - func NewSwitch(config *cfg.P2PConfig) *Switch { sw := &Switch{ config: config, @@ -122,6 +87,9 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { return sw } +//--------------------------------------------------------------------- +// Switch setup + // AddReactor adds the given reactor to the switch. // NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { @@ -193,6 +161,9 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { } } +//--------------------------------------------------------------------- +// Service start/stop + // OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { // Start reactors @@ -228,92 +199,125 @@ func (sw *Switch) OnStop() { } } -// addPeer checks the given peer's validity, performs a handshake, and adds the -// peer to the switch and to all registered reactors. -// NOTE: This performs a blocking handshake before the peer is added. -// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() -func (sw *Switch) addPeer(peer *peer) error { - // Avoid self - if sw.nodeKey.ID() == peer.ID() { - return ErrSwitchConnectToSelf - } - - // Filter peer against white list - if err := sw.FilterConnByAddr(peer.Addr()); err != nil { - return err - } - if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { - return err - } +//--------------------------------------------------------------------- +// Peers - if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil { - return err - } - - // Avoid duplicate - if sw.peers.Has(peer.ID()) { - return ErrSwitchDuplicatePeer +// Peers returns the set of peers that are connected to the switch. +func (sw *Switch) Peers() IPeerSet { + return sw.peers +} +// NumPeers returns the count of outbound/inbound and outbound-dialing peers. +func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { + peers := sw.peers.List() + for _, peer := range peers { + if peer.IsOutbound() { + outbound++ + } else { + inbound++ + } } + dialing = sw.dialing.Size() + return +} - // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil { - return err +// Broadcast runs a go routine for each attempted send, which will block +// trying to send for defaultSendTimeoutSeconds. Returns a channel +// which receives success values for each attempted send (false if times out). +// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +// TODO: Something more intelligent. +func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { + successChan := make(chan bool, len(sw.peers.List())) + sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + for _, peer := range sw.peers.List() { + go func(peer Peer) { + success := peer.Send(chID, msg) + successChan <- success + }(peer) } + return successChan +} - // Start peer - if sw.IsRunning() { - sw.startInitPeer(peer) - } +// StopPeerForError disconnects from a peer due to external error. +// If the peer is persistent, it will attempt to reconnect. +// TODO: make record depending on reason. +func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { + sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) + sw.stopAndRemovePeer(peer, reason) - // Add the peer to .peers. - // We start it first so that a peer in the list is safe to Stop. - // It should not err since we already checked peers.Has(). - if err := sw.peers.Add(peer); err != nil { - return err + if peer.IsPersistent() { + go sw.reconnectToPeer(peer) } +} - sw.Logger.Info("Added peer", "peer", peer) - return nil +// StopPeerGracefully disconnects from a peer gracefully. +// TODO: handle graceful disconnects. +func (sw *Switch) StopPeerGracefully(peer Peer) { + sw.Logger.Info("Stopping peer gracefully") + sw.stopAndRemovePeer(peer, nil) } -// FilterConnByAddr returns an error if connecting to the given address is forbidden. -func (sw *Switch) FilterConnByAddr(addr net.Addr) error { - if sw.filterConnByAddr != nil { - return sw.filterConnByAddr(addr) +func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { + sw.peers.Remove(peer) + peer.Stop() + for _, reactor := range sw.reactors { + reactor.RemovePeer(peer, reason) } - return nil } -// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. -func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKey) error { - if sw.filterConnByPubKey != nil { - return sw.filterConnByPubKey(pubkey) - } - return nil +// reconnectToPeer tries to reconnect to the peer, first repeatedly +// with a fixed interval, then with exponential backoff. +// If no success after all that, it stops trying, and leaves it +// to the PEX/Addrbook to find the peer again +func (sw *Switch) reconnectToPeer(peer Peer) { + netAddr := peer.NodeInfo().NetAddress() + start := time.Now() + sw.Logger.Info("Reconnecting to peer", "peer", peer) + for i := 0; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } -} + peer, err := sw.DialPeerWithAddress(netAddr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + // sleep a set amount + sw.randomSleep(reconnectInterval) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } + } -// SetAddrFilter sets the function for filtering connections by address. -func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { - sw.filterConnByAddr = f -} + sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", + "peer", peer, "elapsed", time.Since(start)) + for i := 0; i < reconnectBackOffAttempts; i++ { + if !sw.IsRunning() { + return + } -// SetPubKeyFilter sets the function for filtering connections by public key. -func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKey) error) { - sw.filterConnByPubKey = f + // sleep an exponentially increasing amount + sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) + sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) + peer, err := sw.DialPeerWithAddress(netAddr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } + } + sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } -func (sw *Switch) startInitPeer(peer *peer) { - err := peer.Start() // spawn send/recv routines - if err != nil { - // Should never happen - sw.Logger.Error("Error starting peer", "peer", peer, "err", err) - } +//--------------------------------------------------------------------- +// Dialing - for _, reactor := range sw.reactors { - reactor.AddPeer(peer) - } +// IsDialing returns true if the switch is currently dialing the given ID. +func (sw *Switch) IsDialing(id ID) bool { + return sw.dialing.Has(string(id)) } // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). @@ -355,12 +359,6 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent return nil } -// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] -func (sw *Switch) randomSleep(interval time.Duration) { - r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond - time.Sleep(r + interval) -} - // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { @@ -395,121 +393,44 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, return peer, nil } -// IsDialing returns true if the switch is currently dialing the given ID. -func (sw *Switch) IsDialing(id ID) bool { - return sw.dialing.Has(string(id)) -} - -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out). -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// TODO: Something more intelligent. -func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { - successChan := make(chan bool, len(sw.peers.List())) - sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) - for _, peer := range sw.peers.List() { - go func(peer Peer) { - success := peer.Send(chID, msg) - successChan <- success - }(peer) - } - return successChan -} - -// NumPeers returns the count of outbound/inbound and outbound-dialing peers. -func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { - peers := sw.peers.List() - for _, peer := range peers { - if peer.IsOutbound() { - outbound++ - } else { - inbound++ - } - } - dialing = sw.dialing.Size() - return -} - -// Peers returns the set of peers that are connected to the switch. -func (sw *Switch) Peers() IPeerSet { - return sw.peers +// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] +func (sw *Switch) randomSleep(interval time.Duration) { + r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond + time.Sleep(r + interval) } -// StopPeerForError disconnects from a peer due to external error. -// If the peer is persistent, it will attempt to reconnect. -// TODO: make record depending on reason. -func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { - sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) - sw.stopAndRemovePeer(peer, reason) +//------------------------------------------------------------------------------------ +// Connection filtering - if peer.IsPersistent() { - go sw.reconnectToPeer(peer) +// FilterConnByAddr returns an error if connecting to the given address is forbidden. +func (sw *Switch) FilterConnByAddr(addr net.Addr) error { + if sw.filterConnByAddr != nil { + return sw.filterConnByAddr(addr) } + return nil } -// reconnectToPeer tries to reconnect to the peer, first repeatedly -// with a fixed interval, then with exponential backoff. -// If no success after all that, it stops trying, and leaves it -// to the PEX/Addrbook to find the peer again -func (sw *Switch) reconnectToPeer(peer Peer) { - netAddr := peer.NodeInfo().NetAddress() - start := time.Now() - sw.Logger.Info("Reconnecting to peer", "peer", peer) - for i := 0; i < reconnectAttempts; i++ { - if !sw.IsRunning() { - return - } - - peer, err := sw.DialPeerWithAddress(netAddr, true) - if err != nil { - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) - // sleep a set amount - sw.randomSleep(reconnectInterval) - continue - } else { - sw.Logger.Info("Reconnected to peer", "peer", peer) - return - } +// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. +func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKey) error { + if sw.filterConnByPubKey != nil { + return sw.filterConnByPubKey(pubkey) } + return nil - sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", - "peer", peer, "elapsed", time.Since(start)) - for i := 0; i < reconnectBackOffAttempts; i++ { - if !sw.IsRunning() { - return - } - - // sleep an exponentially increasing amount - sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) - sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) - peer, err := sw.DialPeerWithAddress(netAddr, true) - if err != nil { - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) - continue - } else { - sw.Logger.Info("Reconnected to peer", "peer", peer) - return - } - } - sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } -// StopPeerGracefully disconnects from a peer gracefully. -// TODO: handle graceful disconnects. -func (sw *Switch) StopPeerGracefully(peer Peer) { - sw.Logger.Info("Stopping peer gracefully") - sw.stopAndRemovePeer(peer, nil) +// SetAddrFilter sets the function for filtering connections by address. +func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { + sw.filterConnByAddr = f } -func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - sw.peers.Remove(peer) - peer.Stop() - for _, reactor := range sw.reactors { - reactor.RemovePeer(peer, reason) - } +// SetPubKeyFilter sets the function for filtering connections by public key. +func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKey) error) { + sw.filterConnByPubKey = f } +//------------------------------------------------------------------------------------ + func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() @@ -525,7 +446,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) + err := sw.addInboundPeerWithConfig(inConn, sw.peerConfig) if err != nil { sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) continue @@ -539,119 +460,82 @@ func (sw *Switch) listenerRoutine(l Listener) { // cleanup } -//------------------------------------------------------------------ -// Connects switches via arbitrary net.Conn. Used for testing. - -// MakeConnectedSwitches returns n switches, connected according to the connect func. -// If connect==Connect2Switches, the switches will be fully connected. -// initSwitch defines how the i'th switch should be initialized (ie. with what reactors). -// NOTE: panics if any switch fails to start. -func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { - switches := make([]*Switch, n) - for i := 0; i < n; i++ { - switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) +func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } + peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) + if err = sw.addPeer(peer); err != nil { + peer.CloseConn() + return err } - if err := StartSwitches(switches); err != nil { - panic(err) + return nil +} + +// addPeer checks the given peer's validity, performs a handshake, and adds the +// peer to the switch and to all registered reactors. +// We already have an authenticated SecretConnection with the peer. +// NOTE: This performs a blocking handshake before the peer is added. +// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() +func (sw *Switch) addPeer(peer *peer) error { + // Avoid self + if sw.nodeKey.ID() == peer.ID() { + return ErrSwitchConnectToSelf } - for i := 0; i < n; i++ { - for j := i + 1; j < n; j++ { - connect(switches, i, j) - } + // Filter peer against white list + if err := sw.FilterConnByAddr(peer.Addr()); err != nil { + return err + } + if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + return err } - return switches -} + // Exchange NodeInfo with the peer + if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil { + return err + } -// Connect2Switches will connect switches i and j via net.Pipe(). -// Blocks until a connection is established. -// NOTE: caller ensures i and j are within bounds. -func Connect2Switches(switches []*Switch, i, j int) { - switchI := switches[i] - switchJ := switches[j] - c1, c2 := netPipe() - doneCh := make(chan struct{}) - go func() { - err := switchI.addPeerWithConnection(c1) - if err != nil { - panic(err) - } - doneCh <- struct{}{} - }() - go func() { - err := switchJ.addPeerWithConnection(c2) - if err != nil { - panic(err) - } - doneCh <- struct{}{} - }() - <-doneCh - <-doneCh -} + // Avoid duplicate + if sw.peers.Has(peer.ID()) { + return ErrSwitchDuplicatePeer -// StartSwitches calls sw.Start() for each given switch. -// It returns the first encountered error. -func StartSwitches(switches []*Switch) error { - for _, s := range switches { - err := s.Start() // start switch and reactors - if err != nil { - return err - } } - return nil -} -func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { - // new switch, add reactors - // TODO: let the config be passed in? - nodeKey := &NodeKey{ - PrivKey: crypto.GenPrivKeyEd25519().Wrap(), + // Check version, chain id + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil { + return err } - s := initSwitch(i, NewSwitch(cfg)) - s.SetNodeInfo(&NodeInfo{ - PubKey: nodeKey.PubKey(), - Moniker: cmn.Fmt("switch%d", i), - Network: network, - Version: version, - ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), - }) - s.SetNodeKey(nodeKey) - s.SetLogger(log.TestingLogger()) - return s -} -func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) - if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err + // Start peer + if sw.IsRunning() { + sw.startInitPeer(peer) } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.addPeer(peer); err != nil { - peer.CloseConn() + + // Add the peer to .peers. + // We start it first so that a peer in the list is safe to Stop. + // It should not err since we already checked peers.Has(). + if err := sw.peers.Add(peer); err != nil { return err } + sw.Logger.Info("Added peer", "peer", peer) return nil } -func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) +func (sw *Switch) startInitPeer(peer *peer) { + err := peer.Start() // spawn send/recv routines if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err - } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.addPeer(peer); err != nil { - peer.CloseConn() - return err + // Should never happen + sw.Logger.Error("Error starting peer", "peer", peer, "err", err) } - return nil + for _, reactor := range sw.reactors { + reactor.AddPeer(peer) + } } diff --git a/p2p/test_util.go b/p2p/test_util.go new file mode 100644 index 000000000..9c6892f16 --- /dev/null +++ b/p2p/test_util.go @@ -0,0 +1,111 @@ +package p2p + +import ( + "math/rand" + "net" + + crypto "github.com/tendermint/go-crypto" + cfg "github.com/tendermint/tendermint/config" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" +) + +//------------------------------------------------------------------ +// Connects switches via arbitrary net.Conn. Used for testing. + +// MakeConnectedSwitches returns n switches, connected according to the connect func. +// If connect==Connect2Switches, the switches will be fully connected. +// initSwitch defines how the i'th switch should be initialized (ie. with what reactors). +// NOTE: panics if any switch fails to start. +func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { + switches := make([]*Switch, n) + for i := 0; i < n; i++ { + switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) + } + + if err := StartSwitches(switches); err != nil { + panic(err) + } + + for i := 0; i < n; i++ { + for j := i + 1; j < n; j++ { + connect(switches, i, j) + } + } + + return switches +} + +// Connect2Switches will connect switches i and j via net.Pipe(). +// Blocks until a connection is established. +// NOTE: caller ensures i and j are within bounds. +func Connect2Switches(switches []*Switch, i, j int) { + switchI := switches[i] + switchJ := switches[j] + c1, c2 := netPipe() + doneCh := make(chan struct{}) + go func() { + err := switchI.addPeerWithConnection(c1) + if err != nil { + panic(err) + } + doneCh <- struct{}{} + }() + go func() { + err := switchJ.addPeerWithConnection(c2) + if err != nil { + panic(err) + } + doneCh <- struct{}{} + }() + <-doneCh + <-doneCh +} + +func (sw *Switch) addPeerWithConnection(conn net.Conn) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } + peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) + if err = sw.addPeer(peer); err != nil { + peer.CloseConn() + return err + } + + return nil +} + +// StartSwitches calls sw.Start() for each given switch. +// It returns the first encountered error. +func StartSwitches(switches []*Switch) error { + for _, s := range switches { + err := s.Start() // start switch and reactors + if err != nil { + return err + } + } + return nil +} + +func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { + // new switch, add reactors + // TODO: let the config be passed in? + nodeKey := &NodeKey{ + PrivKey: crypto.GenPrivKeyEd25519().Wrap(), + } + s := initSwitch(i, NewSwitch(cfg)) + s.SetNodeInfo(&NodeInfo{ + PubKey: nodeKey.PubKey(), + Moniker: cmn.Fmt("switch%d", i), + Network: network, + Version: version, + ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), + }) + s.SetNodeKey(nodeKey) + s.SetLogger(log.TestingLogger()) + return s +} diff --git a/p2p/util.go b/p2p/util.go deleted file mode 100644 index a4c3ad58b..000000000 --- a/p2p/util.go +++ /dev/null @@ -1,15 +0,0 @@ -package p2p - -import ( - "crypto/sha256" -) - -// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. -func doubleSha256(b []byte) []byte { - hasher := sha256.New() - hasher.Write(b) // nolint: errcheck, gas - sum := hasher.Sum(nil) - hasher.Reset() - hasher.Write(sum) // nolint: errcheck, gas - return hasher.Sum(nil) -} From 08f84cd712e02fb4c81094e50d2c88112d571a10 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 13 Jan 2018 23:48:41 -0500 Subject: [PATCH 2/6] a little more moving around --- node/node.go | 2 +- p2p/peer.go | 10 ++---- p2p/pex_reactor.go | 4 +-- p2p/switch.go | 86 +++++++++++++++++++++++++--------------------- p2p/types.go | 26 +++++++++++--- 5 files changed, 72 insertions(+), 56 deletions(-) diff --git a/node/node.go b/node/node.go index 5535b1e16..2990ba9d7 100644 --- a/node/node.go +++ b/node/node.go @@ -544,9 +544,9 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) *p2p.NodeInfo { } nodeInfo := &p2p.NodeInfo{ PubKey: pubKey, - Moniker: n.config.Moniker, Network: n.genesisDoc.ChainID, Version: version.Version, + Moniker: n.config.Moniker, Other: []string{ cmn.Fmt("wire_version=%v", wire.Version), cmn.Fmt("p2p_version=%v", p2p.Version), diff --git a/p2p/peer.go b/p2p/peer.go index 1f5192d50..9e434b975 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -195,19 +195,13 @@ func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er return errors.Wrap(err2, "Error during handshake/read") } - if p.config.AuthEnc { - // Check that the professed PubKey matches the sconn's. - if !peerNodeInfo.PubKey.Equals(p.PubKey().Wrap()) { - return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", - peerNodeInfo.PubKey, p.PubKey()) - } - } - // Remove deadline if err := p.conn.SetDeadline(time.Time{}); err != nil { return errors.Wrap(err, "Error removing deadline") } + // TODO: fix the peerNodeInfo.ListenAddr + p.nodeInfo = peerNodeInfo return nil } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 5aa63db71..1903a460f 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -115,7 +115,8 @@ func (r *PEXReactor) AddPeer(p Peer) { r.RequestPEX(p) } } else { - // For inbound connections, the peer is its own source + // For inbound connections, the peer is its own source, + // and its NodeInfo has already been validated addr := p.NodeInfo().NetAddress() r.book.AddAddress(addr, addr) } @@ -130,7 +131,6 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { // Receive implements Reactor by handling incoming PEX messages. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { srcAddr := src.NodeInfo().NetAddress() - r.IncrementMsgCountForPeer(srcAddr.ID) if r.ReachedMaxMsgCountForPeer(srcAddr.ID) { r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddr) diff --git a/p2p/switch.go b/p2p/switch.go index 4ece1a58c..05edb8c19 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -359,38 +359,12 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent return nil } -// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully. +// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) - - sw.Logger.Info("Dialing peer", "address", addr) - peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) - if err != nil { - sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) - return nil, err - } - peer.SetLogger(sw.Logger.With("peer", addr)) - - // authenticate peer - if addr.ID == "" { - peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) - } else if addr.ID != peer.ID() { - return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) - } - - if persistent { - peer.makePersistent() - } - err = sw.addPeer(peer) - if err != nil { - sw.Logger.Error("Failed to add peer", "address", addr, "err", err) - peer.CloseConn() - return nil, err - } - sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer) - return peer, nil + return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent) } // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] @@ -451,10 +425,6 @@ func (sw *Switch) listenerRoutine(l Listener) { sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) continue } - - // NOTE: We don't yet have the listening port of the - // remote (if they have a listener at all). - // The peerHandshake will handle that. } // cleanup @@ -477,9 +447,40 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er return nil } -// addPeer checks the given peer's validity, performs a handshake, and adds the -// peer to the switch and to all registered reactors. -// We already have an authenticated SecretConnection with the peer. +// dial the peer; make secret connection; authenticate against the dialed ID; +// add the peer. +func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) { + sw.Logger.Info("Dialing peer", "address", addr) + peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) + if err != nil { + sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) + return nil, err + } + peer.SetLogger(sw.Logger.With("peer", addr)) + + // authenticate peer + if addr.ID == "" { + peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) + } else if addr.ID != peer.ID() { + return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) + } + + if persistent { + peer.makePersistent() + } + err = sw.addPeer(peer) + if err != nil { + sw.Logger.Error("Failed to add peer", "address", addr, "err", err) + peer.CloseConn() + return nil, err + } + sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer) + return peer, nil +} + +// addPeer performs the Tendermint P2P handshake with a peer +// that already has a SecretConnection. If all goes well, +// it starts the peer and adds it to the switch. // NOTE: This performs a blocking handshake before the peer is added. // NOTE: If error is returned, caller is responsible for calling peer.CloseConn() func (sw *Switch) addPeer(peer *peer) error { @@ -488,6 +489,12 @@ func (sw *Switch) addPeer(peer *peer) error { return ErrSwitchConnectToSelf } + // Avoid duplicate + if sw.peers.Has(peer.ID()) { + return ErrSwitchDuplicatePeer + + } + // Filter peer against white list if err := sw.FilterConnByAddr(peer.Addr()); err != nil { return err @@ -501,10 +508,9 @@ func (sw *Switch) addPeer(peer *peer) error { return err } - // Avoid duplicate - if sw.peers.Has(peer.ID()) { - return ErrSwitchDuplicatePeer - + // Validate the peers nodeInfo against the pubkey + if err := peer.NodeInfo().Validate(peer.PubKey()); err != nil { + return err } // Check version, chain id @@ -512,7 +518,7 @@ func (sw *Switch) addPeer(peer *peer) error { return err } - // Start peer + // All good. Start peer if sw.IsRunning() { sw.startInitPeer(peer) } diff --git a/p2p/types.go b/p2p/types.go index 2aec521b2..d4adb9eea 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -12,14 +12,30 @@ import ( const maxNodeInfoSize = 10240 // 10Kb // NodeInfo is the basic node information exchanged -// between two peers during the Tendermint P2P handshake +// between two peers during the Tendermint P2P handshake. type NodeInfo struct { + // Authenticate PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey - Moniker string `json:"moniker"` // arbitrary moniker - Network string `json:"network"` // network/chain ID ListenAddr string `json:"listen_addr"` // accepting incoming - Version string `json:"version"` // major.minor.revision - Other []string `json:"other"` // other application specific data + + // Check compatibility + Network string `json:"network"` // network/chain ID + Version string `json:"version"` // major.minor.revision + + // Sanitize + Moniker string `json:"moniker"` // arbitrary moniker + Other []string `json:"other"` // other application specific data +} + +// Validate checks the self-reported NodeInfo is safe. +// It returns an error if the info.PubKey doesn't match the given pubKey. +// TODO: constraints for Moniker/Other? Or is that for the UI ? +func (info *NodeInfo) Validate(pubKey crypto.PubKey) error { + if !info.PubKey.Equals(pubKey) { + return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", + info.PubKey, pubKey) + } + return nil } // CONTRACT: two nodes are compatible if the major/minor versions match and network match From 8b74a8d6ac945c86a520bb98d415f0e7bec87561 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Jan 2018 00:10:29 -0500 Subject: [PATCH 3/6] NodeInfo not a pointer --- node/node.go | 6 +++--- p2p/peer.go | 27 +++++++++++---------------- p2p/peer_set_test.go | 2 +- p2p/peer_test.go | 4 ++-- p2p/pex_reactor_test.go | 2 +- p2p/switch.go | 13 ++++--------- p2p/test_util.go | 2 +- p2p/types.go | 12 ++++++------ rpc/core/net.go | 2 +- rpc/core/pipe.go | 2 +- rpc/core/types/responses.go | 4 ++-- 11 files changed, 33 insertions(+), 43 deletions(-) diff --git a/node/node.go b/node/node.go index 2990ba9d7..4db7f44d4 100644 --- a/node/node.go +++ b/node/node.go @@ -537,12 +537,12 @@ func (n *Node) ProxyApp() proxy.AppConns { return n.proxyApp } -func (n *Node) makeNodeInfo(pubKey crypto.PubKey) *p2p.NodeInfo { +func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo { txIndexerStatus := "on" if _, ok := n.txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" } - nodeInfo := &p2p.NodeInfo{ + nodeInfo := p2p.NodeInfo{ PubKey: pubKey, Network: n.genesisDoc.ChainID, Version: version.Version, @@ -574,7 +574,7 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) *p2p.NodeInfo { //------------------------------------------------------------------------------ // NodeInfo returns the Node's Info from the Switch. -func (n *Node) NodeInfo() *p2p.NodeInfo { +func (n *Node) NodeInfo() p2p.NodeInfo { return n.sw.NodeInfo() } diff --git a/p2p/peer.go b/p2p/peer.go index 9e434b975..57718b6bf 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,7 +1,6 @@ package p2p import ( - "encoding/hex" "fmt" "net" "time" @@ -21,7 +20,7 @@ type Peer interface { ID() ID IsOutbound() bool IsPersistent() bool - NodeInfo() *NodeInfo + NodeInfo() NodeInfo Status() ConnectionStatus Send(byte, interface{}) bool @@ -47,7 +46,7 @@ type peer struct { persistent bool config *PeerConfig - nodeInfo *NodeInfo + nodeInfo NodeInfo Data *cmn.CMap // User data. } @@ -128,7 +127,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ } } - // Key and NodeInfo are set after Handshake + // NodeInfo is set after Handshake p := &peer{ outbound: outbound, conn: conn, @@ -169,23 +168,23 @@ func (p *peer) IsPersistent() bool { // HandshakeTimeout performs a handshake between a given node and the peer. // NOTE: blocking -func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { +func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error { // Set deadline for handshake so we don't block forever on conn.ReadFull if err := p.conn.SetDeadline(time.Now().Add(timeout)); err != nil { return errors.Wrap(err, "Error setting deadline") } - var peerNodeInfo = new(NodeInfo) + var peerNodeInfo NodeInfo var err1 error var err2 error cmn.Parallel( func() { var n int - wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1) + wire.WriteBinary(&ourNodeInfo, p.conn, &n, &err1) }, func() { var n int - wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(&peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { @@ -213,7 +212,7 @@ func (p *peer) Addr() net.Addr { // PubKey returns peer's public key. func (p *peer) PubKey() crypto.PubKey { - if p.NodeInfo() != nil { + if !p.nodeInfo.PubKey.Empty() { return p.nodeInfo.PubKey } else if p.config.AuthEnc { return p.conn.(*SecretConnection).RemotePubKey() @@ -300,16 +299,12 @@ func (p *peer) Set(key string, data interface{}) { // ID returns the peer's ID - the hex encoded hash of its pubkey. func (p *peer) ID() ID { - return ID(hex.EncodeToString(p.PubKey().Address())) + return PubKeyToID(p.PubKey()) } // NodeInfo returns a copy of the peer's NodeInfo. -func (p *peer) NodeInfo() *NodeInfo { - if p.nodeInfo == nil { - return nil - } - n := *p.nodeInfo // copy - return &n +func (p *peer) NodeInfo() NodeInfo { + return p.nodeInfo } // Status returns the peer's ConnectionStatus. diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index e50bb384b..e906eb8e7 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -14,7 +14,7 @@ import ( // Returns an empty dummy peer func randPeer() *peer { return &peer{ - nodeInfo: &NodeInfo{ + nodeInfo: NodeInfo{ ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, diff --git a/p2p/peer_test.go b/p2p/peer_test.go index aafe8d7a9..f4a5363b5 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -90,7 +90,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) if err != nil { return nil, err } - err = p.HandshakeTimeout(&NodeInfo{ + err = p.HandshakeTimeout(NodeInfo{ PubKey: pk.PubKey(), Moniker: "host_peer", Network: "testing", @@ -141,7 +141,7 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } - err = peer.HandshakeTimeout(&NodeInfo{ + err = peer.HandshakeTimeout(NodeInfo{ PubKey: p.PrivKey.PubKey(), Moniker: "remote_peer", Network: "testing", diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index 296b18867..20c8b823a 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -242,7 +242,7 @@ func createRoutableAddr() (addr string, netAddr *NetAddress) { func createRandomPeer(outbound bool) *peer { addr, netAddr := createRoutableAddr() p := &peer{ - nodeInfo: &NodeInfo{ + nodeInfo: NodeInfo{ ListenAddr: netAddr.String(), PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, diff --git a/p2p/switch.go b/p2p/switch.go index 05edb8c19..0ff64dbf9 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -52,8 +52,8 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap - nodeInfo *NodeInfo // our node info - nodeKey *NodeKey // our node privkey + nodeInfo NodeInfo // our node info + nodeKey *NodeKey // our node privkey filterConnByAddr func(net.Addr) error filterConnByPubKey func(crypto.PubKey) error @@ -70,7 +70,6 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), - nodeInfo: nil, } // Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws @@ -141,24 +140,20 @@ func (sw *Switch) IsListening() bool { // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. // NOTE: Not goroutine safe. -func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { +func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) { sw.nodeInfo = nodeInfo } // NodeInfo returns the switch's NodeInfo. // NOTE: Not goroutine safe. -func (sw *Switch) NodeInfo() *NodeInfo { +func (sw *Switch) NodeInfo() NodeInfo { return sw.nodeInfo } // SetNodeKey sets the switch's private key for authenticated encryption. -// NOTE: Overwrites sw.nodeInfo.PubKey. // NOTE: Not goroutine safe. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { sw.nodeKey = nodeKey - if sw.nodeInfo != nil { - sw.nodeInfo.PubKey = nodeKey.PubKey() - } } //--------------------------------------------------------------------- diff --git a/p2p/test_util.go b/p2p/test_util.go index 9c6892f16..18167e209 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -98,7 +98,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f PrivKey: crypto.GenPrivKeyEd25519().Wrap(), } s := initSwitch(i, NewSwitch(cfg)) - s.SetNodeInfo(&NodeInfo{ + s.SetNodeInfo(NodeInfo{ PubKey: nodeKey.PubKey(), Moniker: cmn.Fmt("switch%d", i), Network: network, diff --git a/p2p/types.go b/p2p/types.go index d4adb9eea..d93adc9b6 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -30,7 +30,7 @@ type NodeInfo struct { // Validate checks the self-reported NodeInfo is safe. // It returns an error if the info.PubKey doesn't match the given pubKey. // TODO: constraints for Moniker/Other? Or is that for the UI ? -func (info *NodeInfo) Validate(pubKey crypto.PubKey) error { +func (info NodeInfo) Validate(pubKey crypto.PubKey) error { if !info.PubKey.Equals(pubKey) { return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)", info.PubKey, pubKey) @@ -39,7 +39,7 @@ func (info *NodeInfo) Validate(pubKey crypto.PubKey) error { } // CONTRACT: two nodes are compatible if the major/minor versions match and network match -func (info *NodeInfo) CompatibleWith(other *NodeInfo) error { +func (info NodeInfo) CompatibleWith(other NodeInfo) error { iMajor, iMinor, _, iErr := splitVersion(info.Version) oMajor, oMinor, _, oErr := splitVersion(other.Version) @@ -71,11 +71,11 @@ func (info *NodeInfo) CompatibleWith(other *NodeInfo) error { return nil } -func (info *NodeInfo) ID() ID { +func (info NodeInfo) ID() ID { return PubKeyToID(info.PubKey) } -func (info *NodeInfo) NetAddress() *NetAddress { +func (info NodeInfo) NetAddress() *NetAddress { id := PubKeyToID(info.PubKey) addr := info.ListenAddr netAddr, err := NewNetAddressString(IDAddressString(id, addr)) @@ -85,12 +85,12 @@ func (info *NodeInfo) NetAddress() *NetAddress { return netAddr } -func (info *NodeInfo) ListenHost() string { +func (info NodeInfo) ListenHost() string { host, _, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas return host } -func (info *NodeInfo) ListenPort() int { +func (info NodeInfo) ListenPort() int { _, port, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas port_i, err := strconv.Atoi(port) if err != nil { diff --git a/rpc/core/net.go b/rpc/core/net.go index af52c81cc..14e7389d5 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -41,7 +41,7 @@ func NetInfo() (*ctypes.ResultNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pSwitch.Peers().List() { peers = append(peers, ctypes.Peer{ - NodeInfo: *peer.NodeInfo(), + NodeInfo: peer.NodeInfo(), IsOutbound: peer.IsOutbound(), ConnectionStatus: peer.Status(), }) diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 6d67fd898..301977ac3 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -30,7 +30,7 @@ type P2P interface { Listeners() []p2p.Listener Peers() p2p.IPeerSet NumPeers() (outbound, inbound, dialig int) - NodeInfo() *p2p.NodeInfo + NodeInfo() p2p.NodeInfo IsListening() bool DialPeersAsync(*p2p.AddrBook, []string, bool) error } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index a6bb30682..233b44e93 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -54,7 +54,7 @@ func NewResultCommit(header *types.Header, commit *types.Commit, } type ResultStatus struct { - NodeInfo *p2p.NodeInfo `json:"node_info"` + NodeInfo p2p.NodeInfo `json:"node_info"` PubKey crypto.PubKey `json:"pub_key"` LatestBlockHash data.Bytes `json:"latest_block_hash"` LatestAppHash data.Bytes `json:"latest_app_hash"` @@ -64,7 +64,7 @@ type ResultStatus struct { } func (s *ResultStatus) TxIndexEnabled() bool { - if s == nil || s.NodeInfo == nil { + if s == nil { return false } for _, s := range s.NodeInfo.Other { From f9e4f6eb6ba51d0cac2186bccdd7a9f320fe3c0e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Jan 2018 00:44:16 -0500 Subject: [PATCH 4/6] reorder peer.go methods --- p2p/peer.go | 180 ++++++++++++++++++++++----------------------- p2p/peer_test.go | 4 +- p2p/switch.go | 10 +-- p2p/switch_test.go | 5 +- 4 files changed, 95 insertions(+), 104 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 57718b6bf..596b92168 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,10 +17,10 @@ import ( type Peer interface { cmn.Service - ID() ID - IsOutbound() bool - IsPersistent() bool - NodeInfo() NodeInfo + ID() ID // peer's cryptographic ID + IsOutbound() bool // did we dial the peer + IsPersistent() bool // do we redial this peer when we disconnect + NodeInfo() NodeInfo // peer's info Status() ConnectionStatus Send(byte, interface{}) bool @@ -30,9 +30,9 @@ type Peer interface { Get(string) interface{} } -// Peer could be marked as persistent, in which case you can use -// Redial function to reconnect. Note that inbound peers can't be -// made persistent. They should be made persistent on the other end. +//---------------------------------------------------------- + +// peer implements Peer. // // Before using a peer, you will need to perform a handshake on connection. type peer struct { @@ -77,7 +77,7 @@ func DefaultPeerConfig() *PeerConfig { } func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { + onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) { conn, err := dial(addr, config) if err != nil { @@ -91,6 +91,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] } return nil, err } + peer.persistent = persistent return peer, nil } @@ -142,23 +143,41 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ return p, nil } +//--------------------------------------------------- +// Implements cmn.Service + +// SetLogger implements BaseService. func (p *peer) SetLogger(l log.Logger) { p.Logger = l p.mconn.SetLogger(l) } -// CloseConn should be used when the peer was created, but never started. -func (p *peer) CloseConn() { - p.conn.Close() // nolint: errcheck +// OnStart implements BaseService. +func (p *peer) OnStart() error { + if err := p.BaseService.OnStart(); err != nil { + return err + } + err := p.mconn.Start() + return err } -// makePersistent marks the peer as persistent. -func (p *peer) makePersistent() { - if !p.outbound { - panic("inbound peers can't be made persistent") - } +// OnStop implements BaseService. +func (p *peer) OnStop() { + p.BaseService.OnStop() + p.mconn.Stop() // stop everything and close the conn +} + +//--------------------------------------------------- +// Implements Peer - p.persistent = true +// ID returns the peer's ID - the hex encoded hash of its pubkey. +func (p *peer) ID() ID { + return PubKeyToID(p.PubKey()) +} + +// IsOutbound returns true if the connection is outbound, false otherwise. +func (p *peer) IsOutbound() bool { + return p.outbound } // IsPersistent returns true if the peer is persitent, false otherwise. @@ -166,7 +185,56 @@ func (p *peer) IsPersistent() bool { return p.persistent } -// HandshakeTimeout performs a handshake between a given node and the peer. +// NodeInfo returns a copy of the peer's NodeInfo. +func (p *peer) NodeInfo() NodeInfo { + return p.nodeInfo +} + +// Status returns the peer's ConnectionStatus. +func (p *peer) Status() ConnectionStatus { + return p.mconn.Status() +} + +// Send msg to the channel identified by chID byte. Returns false if the send +// queue is full after timeout, specified by MConnection. +func (p *peer) Send(chID byte, msg interface{}) bool { + if !p.IsRunning() { + // see Switch#Broadcast, where we fetch the list of peers and loop over + // them - while we're looping, one peer may be removed and stopped. + return false + } + return p.mconn.Send(chID, msg) +} + +// TrySend msg to the channel identified by chID byte. Immediately returns +// false if the send queue is full. +func (p *peer) TrySend(chID byte, msg interface{}) bool { + if !p.IsRunning() { + return false + } + return p.mconn.TrySend(chID, msg) +} + +// Get the data for a given key. +func (p *peer) Get(key string) interface{} { + return p.Data.Get(key) +} + +// Set sets the data for the given key. +func (p *peer) Set(key string, data interface{}) { + p.Data.Set(key, data) +} + +//--------------------------------------------------- +// methods used by the Switch + +// CloseConn should be called by the Switch if the peer was created but never started. +func (p *peer) CloseConn() { + p.conn.Close() // nolint: errcheck +} + +// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer +// by exchanging their NodeInfo. It sets the received nodeInfo on the peer. // NOTE: blocking func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error { // Set deadline for handshake so we don't block forever on conn.ReadFull @@ -220,51 +288,6 @@ func (p *peer) PubKey() crypto.PubKey { panic("Attempt to get peer's PubKey before calling Handshake") } -// OnStart implements BaseService. -func (p *peer) OnStart() error { - if err := p.BaseService.OnStart(); err != nil { - return err - } - err := p.mconn.Start() - return err -} - -// OnStop implements BaseService. -func (p *peer) OnStop() { - p.BaseService.OnStop() - p.mconn.Stop() -} - -// Connection returns underlying MConnection. -func (p *peer) Connection() *MConnection { - return p.mconn -} - -// IsOutbound returns true if the connection is outbound, false otherwise. -func (p *peer) IsOutbound() bool { - return p.outbound -} - -// Send msg to the channel identified by chID byte. Returns false if the send -// queue is full after timeout, specified by MConnection. -func (p *peer) Send(chID byte, msg interface{}) bool { - if !p.IsRunning() { - // see Switch#Broadcast, where we fetch the list of peers and loop over - // them - while we're looping, one peer may be removed and stopped. - return false - } - return p.mconn.Send(chID, msg) -} - -// TrySend msg to the channel identified by chID byte. Immediately returns -// false if the send queue is full. -func (p *peer) TrySend(chID byte, msg interface{}) bool { - if !p.IsRunning() { - return false - } - return p.mconn.TrySend(chID, msg) -} - // CanSend returns true if the send queue is not full, false otherwise. func (p *peer) CanSend(chID byte) bool { if !p.IsRunning() { @@ -282,35 +305,8 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } -// Equals reports whenever 2 peers are actually represent the same node. -func (p *peer) Equals(other Peer) bool { - return p.ID() == other.ID() -} - -// Get the data for a given key. -func (p *peer) Get(key string) interface{} { - return p.Data.Get(key) -} - -// Set sets the data for the given key. -func (p *peer) Set(key string, data interface{}) { - p.Data.Set(key, data) -} - -// ID returns the peer's ID - the hex encoded hash of its pubkey. -func (p *peer) ID() ID { - return PubKeyToID(p.PubKey()) -} - -// NodeInfo returns a copy of the peer's NodeInfo. -func (p *peer) NodeInfo() NodeInfo { - return p.nodeInfo -} - -// Status returns the peer's ConnectionStatus. -func (p *peer) Status() ConnectionStatus { - return p.mconn.Status() -} +//------------------------------------------------------------------ +// helper funcs func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { conn, err := addr.DialTimeout(config.DialTimeout * time.Second) diff --git a/p2p/peer_test.go b/p2p/peer_test.go index f4a5363b5..d99fff5e4 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -30,7 +30,7 @@ func TestPeerBasic(t *testing.T) { assert.True(p.IsRunning()) assert.True(p.IsOutbound()) assert.False(p.IsPersistent()) - p.makePersistent() + p.persistent = true assert.True(p.IsPersistent()) assert.Equal(rp.Addr().String(), p.Addr().String()) assert.Equal(rp.PubKey(), p.PubKey()) @@ -86,7 +86,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} pk := crypto.GenPrivKeyEd25519().Wrap() - p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config) + p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config, false) if err != nil { return nil, err } diff --git a/p2p/switch.go b/p2p/switch.go index 0ff64dbf9..eaf3c22e5 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -428,9 +428,7 @@ func (sw *Switch) listenerRoutine(l Listener) { func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } + peer.CloseConn() return err } peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) @@ -446,7 +444,7 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er // add the peer. func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) { sw.Logger.Info("Dialing peer", "address", addr) - peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) + peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config, persistent) if err != nil { sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) return nil, err @@ -457,12 +455,10 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig if addr.ID == "" { peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) } else if addr.ID != peer.ID() { + peer.CloseConn() return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) } - if persistent { - peer.makePersistent() - } err = sw.addPeer(peer) if err != nil { sw.Logger.Error("Failed to add peer", "address", addr, "err", err) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 7d61fa39a..a729698e9 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -236,7 +236,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig()) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), false) require.Nil(err) err = sw.addPeer(peer) require.Nil(err) @@ -263,8 +263,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig()) - peer.makePersistent() + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), true) require.Nil(err) err = sw.addPeer(peer) require.Nil(err) From 68237911ba03cf16d9d20db2dd95c549311f6b33 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Jan 2018 01:11:50 -0500 Subject: [PATCH 5/6] NetAddress.Same checks ID or DialString --- p2p/netaddress.go | 15 ++++++++++++++- p2p/switch.go | 8 +++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/p2p/netaddress.go b/p2p/netaddress.go index fed5e59d4..333d16e5d 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -127,12 +127,25 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { return na } -// Equals reports whether na and other are the same addresses. +// Equals reports whether na and other are the same addresses, +// including their ID, IP, and Port. func (na *NetAddress) Equals(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() == o.String() } + return false +} +// Same returns true is na has the same non-empty ID or DialString as other. +func (na *NetAddress) Same(other interface{}) bool { + if o, ok := other.(*NetAddress); ok { + if na.DialString() == o.DialString() { + return true + } + if na.ID != "" && na.ID == o.ID { + return true + } + } return false } diff --git a/p2p/switch.go b/p2p/switch.go index eaf3c22e5..3f026556a 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -318,18 +318,16 @@ func (sw *Switch) IsDialing(id ID) bool { // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error { netAddrs, errs := NewNetAddressStrings(peers) - // TODO: IDs for _, err := range errs { sw.Logger.Error("Error in peer's address", "err", err) } if addrBook != nil { // add peers to `addrBook` - ourAddrS := sw.nodeInfo.ListenAddr - ourAddr, _ := NewNetAddressString(ourAddrS) + ourAddr := sw.nodeInfo.NetAddress() for _, netAddr := range netAddrs { - // do not add ourselves - if netAddr.Equals(ourAddr) { + // do not add our address or ID + if netAddr.Same(ourAddr) { continue } addrBook.AddAddress(netAddr, ourAddr) From 3368eeb03ec82de933c9be33ad906ab83823e351 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 14 Jan 2018 01:17:43 -0500 Subject: [PATCH 6/6] fix tests --- benchmarks/codec_test.go | 12 ++++-------- blockchain/reactor_test.go | 2 +- rpc/core/types/responses_test.go | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/benchmarks/codec_test.go b/benchmarks/codec_test.go index 209fcd3ba..8ac62a247 100644 --- a/benchmarks/codec_test.go +++ b/benchmarks/codec_test.go @@ -16,11 +16,10 @@ func BenchmarkEncodeStatusWire(b *testing.B) { b.StopTimer() pubKey := crypto.GenPrivKeyEd25519().PubKey() status := &ctypes.ResultStatus{ - NodeInfo: &p2p.NodeInfo{ + NodeInfo: p2p.NodeInfo{ PubKey: pubKey, Moniker: "SOMENAME", Network: "SOMENAME", - RemoteAddr: "SOMEADDR", ListenAddr: "SOMEADDR", Version: "SOMEVER", Other: []string{"SOMESTRING", "OTHERSTRING"}, @@ -43,11 +42,10 @@ func BenchmarkEncodeStatusWire(b *testing.B) { func BenchmarkEncodeNodeInfoWire(b *testing.B) { b.StopTimer() pubKey := crypto.GenPrivKeyEd25519().PubKey() - nodeInfo := &p2p.NodeInfo{ + nodeInfo := p2p.NodeInfo{ PubKey: pubKey, Moniker: "SOMENAME", Network: "SOMENAME", - RemoteAddr: "SOMEADDR", ListenAddr: "SOMEADDR", Version: "SOMEVER", Other: []string{"SOMESTRING", "OTHERSTRING"}, @@ -64,11 +62,10 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) { func BenchmarkEncodeNodeInfoBinary(b *testing.B) { b.StopTimer() pubKey := crypto.GenPrivKeyEd25519().PubKey() - nodeInfo := &p2p.NodeInfo{ + nodeInfo := p2p.NodeInfo{ PubKey: pubKey, Moniker: "SOMENAME", Network: "SOMENAME", - RemoteAddr: "SOMEADDR", ListenAddr: "SOMEADDR", Version: "SOMEVER", Other: []string{"SOMESTRING", "OTHERSTRING"}, @@ -87,11 +84,10 @@ func BenchmarkEncodeNodeInfoProto(b *testing.B) { b.StopTimer() pubKey := crypto.GenPrivKeyEd25519().PubKey().Unwrap().(crypto.PubKeyEd25519) pubKey2 := &proto.PubKey{Ed25519: &proto.PubKeyEd25519{Bytes: pubKey[:]}} - nodeInfo := &proto.NodeInfo{ + nodeInfo := proto.NodeInfo{ PubKey: pubKey2, Moniker: "SOMENAME", Network: "SOMENAME", - RemoteAddr: "SOMEADDR", ListenAddr: "SOMEADDR", Version: "SOMEVER", Other: []string{"SOMESTRING", "OTHERSTRING"}, diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 5bdd28694..06f6c36c5 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -142,7 +142,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, value interface{}) bool { } func (tp *bcrTestPeer) Send(chID byte, data interface{}) bool { return tp.TrySend(chID, data) } -func (tp *bcrTestPeer) NodeInfo() *p2p.NodeInfo { return nil } +func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.NodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } func (tp *bcrTestPeer) ID() p2p.ID { return tp.id } func (tp *bcrTestPeer) IsOutbound() bool { return false } diff --git a/rpc/core/types/responses_test.go b/rpc/core/types/responses_test.go index fa0da3fdf..e410d47ae 100644 --- a/rpc/core/types/responses_test.go +++ b/rpc/core/types/responses_test.go @@ -17,7 +17,7 @@ func TestStatusIndexer(t *testing.T) { status = &ResultStatus{} assert.False(status.TxIndexEnabled()) - status.NodeInfo = &p2p.NodeInfo{} + status.NodeInfo = p2p.NodeInfo{} assert.False(status.TxIndexEnabled()) cases := []struct {