From 452d10f368a626a47bf4f5e9736a6b1166c252e0 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 13 Jan 2018 17:25:51 -0500 Subject: [PATCH] cleanup switch --- p2p/addrbook.go | 13 ++ p2p/base_reactor.go | 35 +++ p2p/peer.go | 2 + p2p/switch.go | 504 +++++++++++++++++--------------------------- p2p/test_util.go | 111 ++++++++++ p2p/util.go | 15 -- 6 files changed, 355 insertions(+), 325 deletions(-) create mode 100644 p2p/base_reactor.go create mode 100644 p2p/test_util.go delete mode 100644 p2p/util.go diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 8826ff1e0..7591c3074 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -5,6 +5,7 @@ package p2p import ( + "crypto/sha256" "encoding/binary" "encoding/json" "fmt" @@ -867,3 +868,15 @@ func (ka *knownAddress) isBad() bool { return false } + +//----------------------------------------------------------------------------- + +// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. +func doubleSha256(b []byte) []byte { + hasher := sha256.New() + hasher.Write(b) // nolint: errcheck, gas + sum := hasher.Sum(nil) + hasher.Reset() + hasher.Write(sum) // nolint: errcheck, gas + return hasher.Sum(nil) +} diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go new file mode 100644 index 000000000..e8107d730 --- /dev/null +++ b/p2p/base_reactor.go @@ -0,0 +1,35 @@ +package p2p + +import cmn "github.com/tendermint/tmlibs/common" + +type Reactor interface { + cmn.Service // Start, Stop + + SetSwitch(*Switch) + GetChannels() []*ChannelDescriptor + AddPeer(peer Peer) + RemovePeer(peer Peer, reason interface{}) + Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil +} + +//-------------------------------------- + +type BaseReactor struct { + cmn.BaseService // Provides Start, Stop, .Quit + Switch *Switch +} + +func NewBaseReactor(name string, impl Reactor) *BaseReactor { + return &BaseReactor{ + BaseService: *cmn.NewBaseService(nil, name, impl), + Switch: nil, + } +} + +func (br *BaseReactor) SetSwitch(sw *Switch) { + br.Switch = sw +} +func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) AddPeer(peer Peer) {} +func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} diff --git a/p2p/peer.go b/p2p/peer.go index 2f5dff78d..1f5192d50 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -99,6 +99,8 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { + // TODO: issue PoW challenge + return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } diff --git a/p2p/switch.go b/p2p/switch.go index 484649145..4ece1a58c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -12,7 +12,6 @@ import ( crypto "github.com/tendermint/go-crypto" cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) const ( @@ -31,46 +30,17 @@ const ( reconnectBackOffBaseSeconds = 3 ) -type Reactor interface { - cmn.Service // Start, Stop - - SetSwitch(*Switch) - GetChannels() []*ChannelDescriptor - AddPeer(peer Peer) - RemovePeer(peer Peer, reason interface{}) - Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil -} - -//-------------------------------------- - -type BaseReactor struct { - cmn.BaseService // Provides Start, Stop, .Quit - Switch *Switch -} - -func NewBaseReactor(name string, impl Reactor) *BaseReactor { - return &BaseReactor{ - BaseService: *cmn.NewBaseService(nil, name, impl), - Switch: nil, - } -} - -func (br *BaseReactor) SetSwitch(sw *Switch) { - br.Switch = sw -} -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } -func (_ *BaseReactor) AddPeer(peer Peer) {} -func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} +var ( + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchConnectToSelf = errors.New("Connect to self") +) //----------------------------------------------------------------------------- -/* -The `Switch` handles peer connections and exposes an API to receive incoming messages -on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one -or more `Channels`. So while sending outgoing messages is typically performed on the peer, -incoming messages are received on the reactor. -*/ +// `Switch` handles peer connections and exposes an API to receive incoming messages +// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one +// or more `Channels`. So while sending outgoing messages is typically performed on the peer, +// incoming messages are received on the reactor. type Switch struct { cmn.BaseService @@ -91,11 +61,6 @@ type Switch struct { rng *rand.Rand // seed for randomizing dial times and orders } -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchConnectToSelf = errors.New("Connect to self") -) - func NewSwitch(config *cfg.P2PConfig) *Switch { sw := &Switch{ config: config, @@ -122,6 +87,9 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { return sw } +//--------------------------------------------------------------------- +// Switch setup + // AddReactor adds the given reactor to the switch. // NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { @@ -193,6 +161,9 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { } } +//--------------------------------------------------------------------- +// Service start/stop + // OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { // Start reactors @@ -228,92 +199,125 @@ func (sw *Switch) OnStop() { } } -// addPeer checks the given peer's validity, performs a handshake, and adds the -// peer to the switch and to all registered reactors. -// NOTE: This performs a blocking handshake before the peer is added. -// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() -func (sw *Switch) addPeer(peer *peer) error { - // Avoid self - if sw.nodeKey.ID() == peer.ID() { - return ErrSwitchConnectToSelf - } - - // Filter peer against white list - if err := sw.FilterConnByAddr(peer.Addr()); err != nil { - return err - } - if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { - return err - } +//--------------------------------------------------------------------- +// Peers - if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil { - return err - } - - // Avoid duplicate - if sw.peers.Has(peer.ID()) { - return ErrSwitchDuplicatePeer +// Peers returns the set of peers that are connected to the switch. +func (sw *Switch) Peers() IPeerSet { + return sw.peers +} +// NumPeers returns the count of outbound/inbound and outbound-dialing peers. +func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { + peers := sw.peers.List() + for _, peer := range peers { + if peer.IsOutbound() { + outbound++ + } else { + inbound++ + } } + dialing = sw.dialing.Size() + return +} - // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil { - return err +// Broadcast runs a go routine for each attempted send, which will block +// trying to send for defaultSendTimeoutSeconds. Returns a channel +// which receives success values for each attempted send (false if times out). +// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +// TODO: Something more intelligent. +func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { + successChan := make(chan bool, len(sw.peers.List())) + sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + for _, peer := range sw.peers.List() { + go func(peer Peer) { + success := peer.Send(chID, msg) + successChan <- success + }(peer) } + return successChan +} - // Start peer - if sw.IsRunning() { - sw.startInitPeer(peer) - } +// StopPeerForError disconnects from a peer due to external error. +// If the peer is persistent, it will attempt to reconnect. +// TODO: make record depending on reason. +func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { + sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) + sw.stopAndRemovePeer(peer, reason) - // Add the peer to .peers. - // We start it first so that a peer in the list is safe to Stop. - // It should not err since we already checked peers.Has(). - if err := sw.peers.Add(peer); err != nil { - return err + if peer.IsPersistent() { + go sw.reconnectToPeer(peer) } +} - sw.Logger.Info("Added peer", "peer", peer) - return nil +// StopPeerGracefully disconnects from a peer gracefully. +// TODO: handle graceful disconnects. +func (sw *Switch) StopPeerGracefully(peer Peer) { + sw.Logger.Info("Stopping peer gracefully") + sw.stopAndRemovePeer(peer, nil) } -// FilterConnByAddr returns an error if connecting to the given address is forbidden. -func (sw *Switch) FilterConnByAddr(addr net.Addr) error { - if sw.filterConnByAddr != nil { - return sw.filterConnByAddr(addr) +func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { + sw.peers.Remove(peer) + peer.Stop() + for _, reactor := range sw.reactors { + reactor.RemovePeer(peer, reason) } - return nil } -// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. -func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKey) error { - if sw.filterConnByPubKey != nil { - return sw.filterConnByPubKey(pubkey) - } - return nil +// reconnectToPeer tries to reconnect to the peer, first repeatedly +// with a fixed interval, then with exponential backoff. +// If no success after all that, it stops trying, and leaves it +// to the PEX/Addrbook to find the peer again +func (sw *Switch) reconnectToPeer(peer Peer) { + netAddr := peer.NodeInfo().NetAddress() + start := time.Now() + sw.Logger.Info("Reconnecting to peer", "peer", peer) + for i := 0; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } -} + peer, err := sw.DialPeerWithAddress(netAddr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + // sleep a set amount + sw.randomSleep(reconnectInterval) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } + } -// SetAddrFilter sets the function for filtering connections by address. -func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { - sw.filterConnByAddr = f -} + sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", + "peer", peer, "elapsed", time.Since(start)) + for i := 0; i < reconnectBackOffAttempts; i++ { + if !sw.IsRunning() { + return + } -// SetPubKeyFilter sets the function for filtering connections by public key. -func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKey) error) { - sw.filterConnByPubKey = f + // sleep an exponentially increasing amount + sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) + sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) + peer, err := sw.DialPeerWithAddress(netAddr, true) + if err != nil { + sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) + continue + } else { + sw.Logger.Info("Reconnected to peer", "peer", peer) + return + } + } + sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } -func (sw *Switch) startInitPeer(peer *peer) { - err := peer.Start() // spawn send/recv routines - if err != nil { - // Should never happen - sw.Logger.Error("Error starting peer", "peer", peer, "err", err) - } +//--------------------------------------------------------------------- +// Dialing - for _, reactor := range sw.reactors { - reactor.AddPeer(peer) - } +// IsDialing returns true if the switch is currently dialing the given ID. +func (sw *Switch) IsDialing(id ID) bool { + return sw.dialing.Has(string(id)) } // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). @@ -355,12 +359,6 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent return nil } -// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] -func (sw *Switch) randomSleep(interval time.Duration) { - r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond - time.Sleep(r + interval) -} - // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { @@ -395,121 +393,44 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, return peer, nil } -// IsDialing returns true if the switch is currently dialing the given ID. -func (sw *Switch) IsDialing(id ID) bool { - return sw.dialing.Has(string(id)) -} - -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out). -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// TODO: Something more intelligent. -func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { - successChan := make(chan bool, len(sw.peers.List())) - sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) - for _, peer := range sw.peers.List() { - go func(peer Peer) { - success := peer.Send(chID, msg) - successChan <- success - }(peer) - } - return successChan -} - -// NumPeers returns the count of outbound/inbound and outbound-dialing peers. -func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { - peers := sw.peers.List() - for _, peer := range peers { - if peer.IsOutbound() { - outbound++ - } else { - inbound++ - } - } - dialing = sw.dialing.Size() - return -} - -// Peers returns the set of peers that are connected to the switch. -func (sw *Switch) Peers() IPeerSet { - return sw.peers +// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] +func (sw *Switch) randomSleep(interval time.Duration) { + r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond + time.Sleep(r + interval) } -// StopPeerForError disconnects from a peer due to external error. -// If the peer is persistent, it will attempt to reconnect. -// TODO: make record depending on reason. -func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { - sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) - sw.stopAndRemovePeer(peer, reason) +//------------------------------------------------------------------------------------ +// Connection filtering - if peer.IsPersistent() { - go sw.reconnectToPeer(peer) +// FilterConnByAddr returns an error if connecting to the given address is forbidden. +func (sw *Switch) FilterConnByAddr(addr net.Addr) error { + if sw.filterConnByAddr != nil { + return sw.filterConnByAddr(addr) } + return nil } -// reconnectToPeer tries to reconnect to the peer, first repeatedly -// with a fixed interval, then with exponential backoff. -// If no success after all that, it stops trying, and leaves it -// to the PEX/Addrbook to find the peer again -func (sw *Switch) reconnectToPeer(peer Peer) { - netAddr := peer.NodeInfo().NetAddress() - start := time.Now() - sw.Logger.Info("Reconnecting to peer", "peer", peer) - for i := 0; i < reconnectAttempts; i++ { - if !sw.IsRunning() { - return - } - - peer, err := sw.DialPeerWithAddress(netAddr, true) - if err != nil { - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) - // sleep a set amount - sw.randomSleep(reconnectInterval) - continue - } else { - sw.Logger.Info("Reconnected to peer", "peer", peer) - return - } +// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. +func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKey) error { + if sw.filterConnByPubKey != nil { + return sw.filterConnByPubKey(pubkey) } + return nil - sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", - "peer", peer, "elapsed", time.Since(start)) - for i := 0; i < reconnectBackOffAttempts; i++ { - if !sw.IsRunning() { - return - } - - // sleep an exponentially increasing amount - sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) - sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) - peer, err := sw.DialPeerWithAddress(netAddr, true) - if err != nil { - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer) - continue - } else { - sw.Logger.Info("Reconnected to peer", "peer", peer) - return - } - } - sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } -// StopPeerGracefully disconnects from a peer gracefully. -// TODO: handle graceful disconnects. -func (sw *Switch) StopPeerGracefully(peer Peer) { - sw.Logger.Info("Stopping peer gracefully") - sw.stopAndRemovePeer(peer, nil) +// SetAddrFilter sets the function for filtering connections by address. +func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { + sw.filterConnByAddr = f } -func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - sw.peers.Remove(peer) - peer.Stop() - for _, reactor := range sw.reactors { - reactor.RemovePeer(peer, reason) - } +// SetPubKeyFilter sets the function for filtering connections by public key. +func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKey) error) { + sw.filterConnByPubKey = f } +//------------------------------------------------------------------------------------ + func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() @@ -525,7 +446,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) + err := sw.addInboundPeerWithConfig(inConn, sw.peerConfig) if err != nil { sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) continue @@ -539,119 +460,82 @@ func (sw *Switch) listenerRoutine(l Listener) { // cleanup } -//------------------------------------------------------------------ -// Connects switches via arbitrary net.Conn. Used for testing. - -// MakeConnectedSwitches returns n switches, connected according to the connect func. -// If connect==Connect2Switches, the switches will be fully connected. -// initSwitch defines how the i'th switch should be initialized (ie. with what reactors). -// NOTE: panics if any switch fails to start. -func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { - switches := make([]*Switch, n) - for i := 0; i < n; i++ { - switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) +func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } + peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) + if err = sw.addPeer(peer); err != nil { + peer.CloseConn() + return err } - if err := StartSwitches(switches); err != nil { - panic(err) + return nil +} + +// addPeer checks the given peer's validity, performs a handshake, and adds the +// peer to the switch and to all registered reactors. +// We already have an authenticated SecretConnection with the peer. +// NOTE: This performs a blocking handshake before the peer is added. +// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() +func (sw *Switch) addPeer(peer *peer) error { + // Avoid self + if sw.nodeKey.ID() == peer.ID() { + return ErrSwitchConnectToSelf } - for i := 0; i < n; i++ { - for j := i + 1; j < n; j++ { - connect(switches, i, j) - } + // Filter peer against white list + if err := sw.FilterConnByAddr(peer.Addr()); err != nil { + return err + } + if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + return err } - return switches -} + // Exchange NodeInfo with the peer + if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil { + return err + } -// Connect2Switches will connect switches i and j via net.Pipe(). -// Blocks until a connection is established. -// NOTE: caller ensures i and j are within bounds. -func Connect2Switches(switches []*Switch, i, j int) { - switchI := switches[i] - switchJ := switches[j] - c1, c2 := netPipe() - doneCh := make(chan struct{}) - go func() { - err := switchI.addPeerWithConnection(c1) - if err != nil { - panic(err) - } - doneCh <- struct{}{} - }() - go func() { - err := switchJ.addPeerWithConnection(c2) - if err != nil { - panic(err) - } - doneCh <- struct{}{} - }() - <-doneCh - <-doneCh -} + // Avoid duplicate + if sw.peers.Has(peer.ID()) { + return ErrSwitchDuplicatePeer -// StartSwitches calls sw.Start() for each given switch. -// It returns the first encountered error. -func StartSwitches(switches []*Switch) error { - for _, s := range switches { - err := s.Start() // start switch and reactors - if err != nil { - return err - } } - return nil -} -func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { - // new switch, add reactors - // TODO: let the config be passed in? - nodeKey := &NodeKey{ - PrivKey: crypto.GenPrivKeyEd25519().Wrap(), + // Check version, chain id + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil { + return err } - s := initSwitch(i, NewSwitch(cfg)) - s.SetNodeInfo(&NodeInfo{ - PubKey: nodeKey.PubKey(), - Moniker: cmn.Fmt("switch%d", i), - Network: network, - Version: version, - ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), - }) - s.SetNodeKey(nodeKey) - s.SetLogger(log.TestingLogger()) - return s -} -func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) - if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err + // Start peer + if sw.IsRunning() { + sw.startInitPeer(peer) } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.addPeer(peer); err != nil { - peer.CloseConn() + + // Add the peer to .peers. + // We start it first so that a peer in the list is safe to Stop. + // It should not err since we already checked peers.Has(). + if err := sw.peers.Add(peer); err != nil { return err } + sw.Logger.Info("Added peer", "peer", peer) return nil } -func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config) +func (sw *Switch) startInitPeer(peer *peer) { + err := peer.Start() // spawn send/recv routines if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err - } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.addPeer(peer); err != nil { - peer.CloseConn() - return err + // Should never happen + sw.Logger.Error("Error starting peer", "peer", peer, "err", err) } - return nil + for _, reactor := range sw.reactors { + reactor.AddPeer(peer) + } } diff --git a/p2p/test_util.go b/p2p/test_util.go new file mode 100644 index 000000000..9c6892f16 --- /dev/null +++ b/p2p/test_util.go @@ -0,0 +1,111 @@ +package p2p + +import ( + "math/rand" + "net" + + crypto "github.com/tendermint/go-crypto" + cfg "github.com/tendermint/tendermint/config" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" +) + +//------------------------------------------------------------------ +// Connects switches via arbitrary net.Conn. Used for testing. + +// MakeConnectedSwitches returns n switches, connected according to the connect func. +// If connect==Connect2Switches, the switches will be fully connected. +// initSwitch defines how the i'th switch should be initialized (ie. with what reactors). +// NOTE: panics if any switch fails to start. +func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { + switches := make([]*Switch, n) + for i := 0; i < n; i++ { + switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) + } + + if err := StartSwitches(switches); err != nil { + panic(err) + } + + for i := 0; i < n; i++ { + for j := i + 1; j < n; j++ { + connect(switches, i, j) + } + } + + return switches +} + +// Connect2Switches will connect switches i and j via net.Pipe(). +// Blocks until a connection is established. +// NOTE: caller ensures i and j are within bounds. +func Connect2Switches(switches []*Switch, i, j int) { + switchI := switches[i] + switchJ := switches[j] + c1, c2 := netPipe() + doneCh := make(chan struct{}) + go func() { + err := switchI.addPeerWithConnection(c1) + if err != nil { + panic(err) + } + doneCh <- struct{}{} + }() + go func() { + err := switchJ.addPeerWithConnection(c2) + if err != nil { + panic(err) + } + doneCh <- struct{}{} + }() + <-doneCh + <-doneCh +} + +func (sw *Switch) addPeerWithConnection(conn net.Conn) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, sw.peerConfig) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } + peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) + if err = sw.addPeer(peer); err != nil { + peer.CloseConn() + return err + } + + return nil +} + +// StartSwitches calls sw.Start() for each given switch. +// It returns the first encountered error. +func StartSwitches(switches []*Switch) error { + for _, s := range switches { + err := s.Start() // start switch and reactors + if err != nil { + return err + } + } + return nil +} + +func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { + // new switch, add reactors + // TODO: let the config be passed in? + nodeKey := &NodeKey{ + PrivKey: crypto.GenPrivKeyEd25519().Wrap(), + } + s := initSwitch(i, NewSwitch(cfg)) + s.SetNodeInfo(&NodeInfo{ + PubKey: nodeKey.PubKey(), + Moniker: cmn.Fmt("switch%d", i), + Network: network, + Version: version, + ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), + }) + s.SetNodeKey(nodeKey) + s.SetLogger(log.TestingLogger()) + return s +} diff --git a/p2p/util.go b/p2p/util.go deleted file mode 100644 index a4c3ad58b..000000000 --- a/p2p/util.go +++ /dev/null @@ -1,15 +0,0 @@ -package p2p - -import ( - "crypto/sha256" -) - -// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. -func doubleSha256(b []byte) []byte { - hasher := sha256.New() - hasher.Write(b) // nolint: errcheck, gas - sum := hasher.Sum(nil) - hasher.Reset() - hasher.Write(sum) // nolint: errcheck, gas - return hasher.Sum(nil) -}