From 8bb3a2e1d7b1152f5d6043a5ccca7c3fbdd9a993 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 7 Apr 2017 14:57:03 +0400 Subject: [PATCH] persistent peers (Refs #13) --- peer.go | 204 +++++++++++++++++++++++++++++++++++++++---------- pex_reactor.go | 4 +- switch.go | 173 +++++++++++++++++++++-------------------- switch_test.go | 131 ++++++++++++++++++++++++++++--- 4 files changed, 377 insertions(+), 135 deletions(-) diff --git a/peer.go b/peer.go index 6b1b47a1f..d94ce5838 100644 --- a/peer.go +++ b/peer.go @@ -4,105 +4,187 @@ import ( "fmt" "io" "net" + "time" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) +// Peer could be marked as persistent, in which case you can use +// Redial function to reconnect. Note that inbound peers can't be +// made persistent. They should be made persistent on the other end. +// +// Before using a peer, you will need to perform a handshake on connection. type Peer struct { - BaseService + cmn.BaseService outbound bool - mconn *MConnection + + conn net.Conn // source connection + mconn *MConnection // multiplex connection + + authEnc bool // authenticated encryption + persistent bool + config cfg.Config *NodeInfo Key string - Data *CMap // User data. + Data *cmn.CMap // User data. +} + +func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { + conn, err := dial(addr, config) + if err != nil { + return nil, err + } + + // outbound = true + return newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey) +} + +func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { + // Encrypt connection + if config.GetBool(configKeyAuthEnc) { + var err error + conn, err = MakeSecretConnection(conn, privKey) + if err != nil { + return nil, err + } + } + + p := &Peer{ + outbound: outbound, + authEnc: config.GetBool(configKeyAuthEnc), + conn: conn, + config: config, + Data: cmn.NewCMap(), + } + + p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config) + + p.BaseService = *cmn.NewBaseService(log, "Peer", p) + + return p, nil +} + +// CloseConn should be used when the peer was created, but never started. +func (p *Peer) CloseConn() { + p.conn.Close() +} + +// MakePersistent marks the peer as persistent. +func (p *Peer) MakePersistent() { + if !p.outbound { + panic("inbound peers can't be made persistent") + } + + p.persistent = true } +// IsPersistent returns true if the peer is persitent, false otherwise. +func (p *Peer) IsPersistent() bool { + return p.persistent +} + +// HandshakeTimeout performs a handshake between a given node and the peer. // NOTE: blocking -// Before creating a peer with newPeer(), perform a handshake on connection. -func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) { +func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { + // Set deadline for handshake so we don't block forever on conn.ReadFull + p.conn.SetDeadline(time.Now().Add(timeout)) + var peerNodeInfo = new(NodeInfo) var err1 error var err2 error - Parallel( + cmn.Parallel( func() { var n int - wire.WriteBinary(ourNodeInfo, conn, &n, &err1) + wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1) }, func() { var n int - wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { - return nil, err1 + return err1 } if err2 != nil { - return nil, err2 + return err2 } - peerNodeInfo.RemoteAddr = conn.RemoteAddr().String() - return peerNodeInfo, nil -} -// NOTE: call peerHandshake on conn before calling newPeer(). -func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { - var p *Peer - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - PanicSanity(Fmt("Unknown channel %X", chID)) + if p.authEnc { + // Check that the professed PubKey matches the sconn's. + if !peerNodeInfo.PubKey.Equals(p.PubKey()) { + return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", + peerNodeInfo.PubKey, p.PubKey()) } - reactor.Receive(chID, p, msgBytes) } - onError := func(r interface{}) { - p.Stop() - onPeerError(p, r) - } - mconnConfig := &MConnectionConfig{ - SendRate: int64(config.GetInt(configKeySendRate)), - RecvRate: int64(config.GetInt(configKeyRecvRate)), + + // Remove deadline + p.conn.SetDeadline(time.Time{}) + + peerNodeInfo.RemoteAddr = p.RemoteAddr().String() + + p.NodeInfo = peerNodeInfo + p.Key = peerNodeInfo.PubKey.KeyString() + + return nil +} + +// RemoteAddr returns the remote network address. +func (p *Peer) RemoteAddr() net.Addr { + return p.conn.RemoteAddr() +} + +// PubKey returns the remote public key. +func (p *Peer) PubKey() crypto.PubKeyEd25519 { + if p.authEnc { + return p.conn.(*SecretConnection).RemotePubKey() } - mconn := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig) - p = &Peer{ - outbound: outbound, - mconn: mconn, - NodeInfo: peerNodeInfo, - Key: peerNodeInfo.PubKey.KeyString(), - Data: NewCMap(), + if p.NodeInfo == nil { + panic("Attempt to get peer's PubKey before calling Handshake") } - p.BaseService = *NewBaseService(log, "Peer", p) - return p + return p.PubKey() } +// OnStart implements BaseService. func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err } +// OnStop implements BaseService. func (p *Peer) OnStop() { p.BaseService.OnStop() p.mconn.Stop() } +// Connection returns underlying MConnection. func (p *Peer) Connection() *MConnection { return p.mconn } +// IsOutbound returns true if the connection is outbound, false otherwise. func (p *Peer) IsOutbound() bool { return p.outbound } +// Send msg to the channel identified by chID byte. Returns false if the send +// queue is full after timeout, specified by MConnection. func (p *Peer) Send(chID byte, msg interface{}) bool { if !p.IsRunning() { + // see Switch#Broadcast, where we fetch the list of peers and loop over + // them - while we're looping, one peer may be removed and stopped. return false } return p.mconn.Send(chID, msg) } +// TrySend msg to the channel identified by chID byte. Immediately returns +// false if the send queue is full. func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false @@ -110,6 +192,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool { return p.mconn.TrySend(chID, msg) } +// CanSend returns true if the send queue is not full, false otherwise. func (p *Peer) CanSend(chID byte) bool { if !p.IsRunning() { return false @@ -117,6 +200,7 @@ func (p *Peer) CanSend(chID byte) bool { return p.mconn.CanSend(chID) } +// WriteTo writes the peer's public key to w. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { var n_ int wire.WriteString(p.Key, w, &n_, &err) @@ -124,18 +208,56 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { return } +// String representation. func (p *Peer) String() string { if p.outbound { return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12]) - } else { - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } + + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } +// Equals reports whenever 2 peers are actually represent the same node. func (p *Peer) Equals(other *Peer) bool { return p.Key == other.Key } +// Get the data for a given key. func (p *Peer) Get(key string) interface{} { return p.Data.Get(key) } + +func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) { + log.Info("Dialing address", "address", addr) + conn, err := addr.DialTimeout(time.Duration( + config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) + if err != nil { + log.Info("Failed dialing address", "address", addr, "error", err) + return nil, err + } + if config.GetBool(configFuzzEnable) { + conn = FuzzConn(config, conn) + } + return conn, nil +} + +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection { + onReceive := func(chID byte, msgBytes []byte) { + reactor := reactorsByCh[chID] + if reactor == nil { + cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) + } + reactor.Receive(chID, p, msgBytes) + } + + onError := func(r interface{}) { + onPeerError(p, r) + } + + mconnConfig := &MConnectionConfig{ + SendRate: int64(config.GetInt(configKeySendRate)), + RecvRate: int64(config.GetInt(configKeyRecvRate)), + } + + return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig) +} diff --git a/pex_reactor.go b/pex_reactor.go index 45c4c96d8..4ac9306cf 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -9,7 +9,7 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" + wire "github.com/tendermint/go-wire" ) var pexErrInvalidMessage = errors.New("Invalid PEX message") @@ -201,7 +201,7 @@ func (pexR *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial.Values() { go func(picked *NetAddress) { - _, err := pexR.Switch.DialPeerWithAddress(picked) + _, err := pexR.Switch.DialPeerWithAddress(picked, false) if err != nil { pexR.book.MarkAttempt(picked) } diff --git a/switch.go b/switch.go index 78a3020ed..b0551a4f8 100644 --- a/switch.go +++ b/switch.go @@ -9,10 +9,15 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/log15" ) +const ( + reconnectAttempts = 30 + reconnectInterval = 3 * time.Second +) + type Reactor interface { Service // Start, Stop @@ -194,78 +199,43 @@ func (sw *Switch) OnStop() { // NOTE: This performs a blocking handshake before the peer is added. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed. -func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - - // Filter by addr (ie. ip:port) - if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil { - conn.Close() - return nil, err +func (sw *Switch) AddPeer(peer *Peer) error { + if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil { + return err } - // Set deadline for handshake so we don't block forever on conn.ReadFull - conn.SetDeadline(time.Now().Add( - time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second)) - - // First, encrypt the connection. - var sconn net.Conn = conn - if sw.config.GetBool(configKeyAuthEnc) { - var err error - sconn, err = MakeSecretConnection(conn, sw.nodePrivKey) - if err != nil { - conn.Close() - return nil, err - } + if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + return err } - // Filter by p2p-key - if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil { - sconn.Close() - return nil, err + if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil { + return err } - // Then, perform node handshake - peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo) - if err != nil { - sconn.Close() - return nil, err - } - if sw.config.GetBool(configKeyAuthEnc) { - // Check that the professed PubKey matches the sconn's. - if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", - peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey()) - } - } // Avoid self - if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection from self") + if sw.nodeInfo.PubKey.Equals(peer.PubKey()) { + return errors.New("Ignoring connection from self") } + // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil { - sconn.Close() - return nil, err + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil { + return err } - peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) - // Add the peer to .peers // ignore if duplicate or if we already have too many for that IP range if err := sw.peers.Add(peer); err != nil { log.Notice("Ignoring peer", "error", err, "peer", peer) - peer.Stop() - return nil, err + return err } - // remove deadline and start peer - conn.SetDeadline(time.Time{}) + // Start peer if sw.IsRunning() { sw.startInitPeer(peer) } log.Notice("Added peer", "peer", peer) - return peer, nil + return nil } func (sw *Switch) FilterConnByAddr(addr net.Addr) error { @@ -292,8 +262,10 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { } func (sw *Switch) startInitPeer(peer *Peer) { - peer.Start() // spawn send/recv routines - sw.addPeerToReactors(peer) // run AddPeer on each reactor + peer.Start() // spawn send/recv routines + for _, reactor := range sw.reactors { + reactor.AddPeer(peer) + } } // Dial a list of seeds asynchronously in random order @@ -331,7 +303,7 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { } func (sw *Switch) dialSeed(addr *NetAddress) { - peer, err := sw.DialPeerWithAddress(addr) + peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { log.Error("Error dialing seed", "error", err) return @@ -340,22 +312,23 @@ func (sw *Switch) dialSeed(addr *NetAddress) { } } -func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - log.Info("Dialing address", "address", addr) +func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { sw.dialing.Set(addr.IP.String(), addr) - conn, err := addr.DialTimeout(time.Duration( - sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) - sw.dialing.Delete(addr.IP.String()) + defer sw.dialing.Delete(addr.IP.String()) + + peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + if persistent { + peer.MakePersistent() + } if err != nil { - log.Info("Failed dialing address", "address", addr, "error", err) + log.Info("Failed dialing peer", "address", addr, "error", err) + peer.CloseConn() return nil, err } - if sw.config.GetBool(configFuzzEnable) { - conn = FuzzConn(sw.config, conn) - } - peer, err := sw.AddPeerWithConnection(conn, true) + err = sw.AddPeer(peer) if err != nil { - log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err) + log.Info("Failed adding peer", "address", addr, "error", err) + peer.CloseConn() return nil, err } log.Notice("Dialed and added peer", "address", addr, "peer", peer) @@ -400,31 +373,49 @@ func (sw *Switch) Peers() IPeerSet { return sw.peers } -// Disconnect from a peer due to external error. +// Disconnect from a peer due to external error, retry if it is a persistent peer. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { + addr := NewNetAddress(peer.RemoteAddr()) log.Notice("Stopping peer for error", "peer", peer, "error", reason) - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, reason) + sw.stopAndRemovePeer(peer, reason) + + if peer.IsPersistent() { + go func() { + log.Notice("Reconnecting to peer", "peer", peer) + for i := 1; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } + + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + if i == reconnectAttempts { + log.Notice("Error reconnecting to peer. Giving up", "tries", i, "error", err) + return + } + log.Notice("Error reconnecting to peer. Trying again", "tries", i, "error", err) + time.Sleep(reconnectInterval) + continue + } + + log.Notice("Reconnected to peer", "peer", peer) + return + } + }() + } } // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { log.Notice("Stopping peer gracefully") - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, nil) + sw.stopAndRemovePeer(peer, nil) } -func (sw *Switch) addPeerToReactors(peer *Peer) { - for _, reactor := range sw.reactors { - reactor.AddPeer(peer) - } -} - -func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) { +func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { + sw.peers.Remove(peer) + peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) } @@ -449,9 +440,9 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - _, err := sw.AddPeerWithConnection(inConn, false) + err := sw.AddPeerWithConnection(inConn, false, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) if err != nil { - log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err) + log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) continue } @@ -511,14 +502,14 @@ func Connect2Switches(switches []*Switch, i, j int) { c1, c2 := net.Pipe() doneCh := make(chan struct{}) go func() { - _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. + err := switchI.AddPeerWithConnection(c1, false, switchI.reactorsByCh, switchI.chDescs, switchI.StopPeerForError, switchI.config, switchI.nodePrivKey) if PanicOnAddPeerErr && err != nil { panic(err) } doneCh <- struct{}{} }() go func() { - _, err := switchJ.AddPeerWithConnection(c2, true) + err := switchJ.AddPeerWithConnection(c2, false, switchJ.reactorsByCh, switchJ.chDescs, switchJ.StopPeerForError, switchJ.config, switchJ.nodePrivKey) if PanicOnAddPeerErr && err != nil { panic(err) } @@ -552,3 +543,19 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S s.SetNodePrivKey(privKey) return s } + +// AddPeerWithConnection is a helper function for testing. +func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) error { + peer, err := newPeerFromExistingConn(conn, outbound, reactorsByCh, chDescs, onPeerError, config, privKey) + if err != nil { + peer.CloseConn() + return err + } + + if err = sw.AddPeer(peer); err != nil { + peer.CloseConn() + return err + } + + return nil +} diff --git a/switch_test.go b/switch_test.go index 1b2ccd743..727bd2a1f 100644 --- a/switch_test.go +++ b/switch_test.go @@ -3,15 +3,19 @@ package p2p import ( "bytes" "fmt" + golog "log" "net" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) var ( @@ -21,7 +25,6 @@ var ( func init() { config = cfg.NewMapConfig(nil) setConfigDefaults(config) - } type PeerMessage struct { @@ -174,8 +177,12 @@ func TestConnAddrFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey) + }() + go func() { + s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -205,8 +212,12 @@ func TestConnPubKeyFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey) + }() + go func() { + s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -221,6 +232,63 @@ func TestConnPubKeyFilter(t *testing.T) { } } +func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) + defer sw2.Stop() + l, serverAddr := listenTCP() + done := make(chan struct{}) + go accept(l, done, sw2) + defer close(done) + + peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.Zero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + +func TestSwitchReconnectsToPeerIfItIsPersistent(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) + defer sw2.Stop() + l, serverAddr := listenTCP() + done := make(chan struct{}) + go accept(l, done, sw2) + defer close(done) + + peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + peer.MakePersistent() + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.NotZero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + func BenchmarkSwitches(b *testing.B) { b.StopTimer() @@ -252,9 +320,9 @@ func BenchmarkSwitches(b *testing.B) { successChan := s1.Broadcast(chID, "test data") for s := range successChan { if s { - numSuccess += 1 + numSuccess++ } else { - numFailure += 1 + numFailure++ } } } @@ -266,3 +334,48 @@ func BenchmarkSwitches(b *testing.B) { time.Sleep(1000 * time.Millisecond) } + +func listenTCP() (net.Listener, net.Addr) { + l, e := net.Listen("tcp", "127.0.0.1:0") // any available address + if e != nil { + golog.Fatalf("net.Listen tcp :0: %+v", e) + } + return l, l.Addr() +} + +// simulate remote peer +func accept(l net.Listener, done <-chan struct{}, sw *Switch) { + for { + conn, err := l.Accept() + if err != nil { + golog.Fatalf("Failed to accept conn: %+v", err) + } + conn, err = MakeSecretConnection(conn, sw.nodePrivKey) + if err != nil { + golog.Fatalf("Failed to make secret conn: %+v", err) + } + var err1, err2 error + nodeInfo := new(NodeInfo) + cmn.Parallel( + func() { + var n int + wire.WriteBinary(sw.nodeInfo, conn, &n, &err1) + }, + func() { + var n int + wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2) + }) + if err1 != nil { + golog.Fatalf("Failed to do handshake: %+v", err1) + } + if err2 != nil { + golog.Fatalf("Failed to do handshake: %+v", err2) + } + select { + case <-done: + conn.Close() + return + default: + } + } +}