Browse Source

persistent peers (Refs #13)

pull/456/head
Anton Kaliaev 8 years ago
parent
commit
8bb3a2e1d7
No known key found for this signature in database GPG Key ID: 7B6881D965918214
4 changed files with 377 additions and 135 deletions
  1. +163
    -41
      peer.go
  2. +2
    -2
      pex_reactor.go
  3. +90
    -83
      switch.go
  4. +122
    -9
      switch_test.go

+ 163
- 41
peer.go View File

@ -4,105 +4,187 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"time"
. "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
) )
// Peer could be marked as persistent, in which case you can use
// Redial function to reconnect. Note that inbound peers can't be
// made persistent. They should be made persistent on the other end.
//
// Before using a peer, you will need to perform a handshake on connection.
type Peer struct { type Peer struct {
BaseService
cmn.BaseService
outbound bool outbound bool
mconn *MConnection
conn net.Conn // source connection
mconn *MConnection // multiplex connection
authEnc bool // authenticated encryption
persistent bool
config cfg.Config
*NodeInfo *NodeInfo
Key string Key string
Data *CMap // User data.
Data *cmn.CMap // User data.
}
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
conn, err := dial(addr, config)
if err != nil {
return nil, err
}
// outbound = true
return newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey)
}
func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
// Encrypt connection
if config.GetBool(configKeyAuthEnc) {
var err error
conn, err = MakeSecretConnection(conn, privKey)
if err != nil {
return nil, err
}
}
p := &Peer{
outbound: outbound,
authEnc: config.GetBool(configKeyAuthEnc),
conn: conn,
config: config,
Data: cmn.NewCMap(),
}
p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config)
p.BaseService = *cmn.NewBaseService(log, "Peer", p)
return p, nil
}
// CloseConn should be used when the peer was created, but never started.
func (p *Peer) CloseConn() {
p.conn.Close()
}
// MakePersistent marks the peer as persistent.
func (p *Peer) MakePersistent() {
if !p.outbound {
panic("inbound peers can't be made persistent")
}
p.persistent = true
} }
// IsPersistent returns true if the peer is persitent, false otherwise.
func (p *Peer) IsPersistent() bool {
return p.persistent
}
// HandshakeTimeout performs a handshake between a given node and the peer.
// NOTE: blocking // NOTE: blocking
// Before creating a peer with newPeer(), perform a handshake on connection.
func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
// Set deadline for handshake so we don't block forever on conn.ReadFull
p.conn.SetDeadline(time.Now().Add(timeout))
var peerNodeInfo = new(NodeInfo) var peerNodeInfo = new(NodeInfo)
var err1 error var err1 error
var err2 error var err2 error
Parallel(
cmn.Parallel(
func() { func() {
var n int var n int
wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
}, },
func() { func() {
var n int var n int
wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
}) })
if err1 != nil { if err1 != nil {
return nil, err1
return err1
} }
if err2 != nil { if err2 != nil {
return nil, err2
return err2
} }
peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
return peerNodeInfo, nil
}
// NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
PanicSanity(Fmt("Unknown channel %X", chID))
if p.authEnc {
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(p.PubKey()) {
return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, p.PubKey())
} }
reactor.Receive(chID, p, msgBytes)
} }
onError := func(r interface{}) {
p.Stop()
onPeerError(p, r)
}
mconnConfig := &MConnectionConfig{
SendRate: int64(config.GetInt(configKeySendRate)),
RecvRate: int64(config.GetInt(configKeyRecvRate)),
// Remove deadline
p.conn.SetDeadline(time.Time{})
peerNodeInfo.RemoteAddr = p.RemoteAddr().String()
p.NodeInfo = peerNodeInfo
p.Key = peerNodeInfo.PubKey.KeyString()
return nil
}
// RemoteAddr returns the remote network address.
func (p *Peer) RemoteAddr() net.Addr {
return p.conn.RemoteAddr()
}
// PubKey returns the remote public key.
func (p *Peer) PubKey() crypto.PubKeyEd25519 {
if p.authEnc {
return p.conn.(*SecretConnection).RemotePubKey()
} }
mconn := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig)
p = &Peer{
outbound: outbound,
mconn: mconn,
NodeInfo: peerNodeInfo,
Key: peerNodeInfo.PubKey.KeyString(),
Data: NewCMap(),
if p.NodeInfo == nil {
panic("Attempt to get peer's PubKey before calling Handshake")
} }
p.BaseService = *NewBaseService(log, "Peer", p)
return p
return p.PubKey()
} }
// OnStart implements BaseService.
func (p *Peer) OnStart() error { func (p *Peer) OnStart() error {
p.BaseService.OnStart() p.BaseService.OnStart()
_, err := p.mconn.Start() _, err := p.mconn.Start()
return err return err
} }
// OnStop implements BaseService.
func (p *Peer) OnStop() { func (p *Peer) OnStop() {
p.BaseService.OnStop() p.BaseService.OnStop()
p.mconn.Stop() p.mconn.Stop()
} }
// Connection returns underlying MConnection.
func (p *Peer) Connection() *MConnection { func (p *Peer) Connection() *MConnection {
return p.mconn return p.mconn
} }
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *Peer) IsOutbound() bool { func (p *Peer) IsOutbound() bool {
return p.outbound return p.outbound
} }
// Send msg to the channel identified by chID byte. Returns false if the send
// queue is full after timeout, specified by MConnection.
func (p *Peer) Send(chID byte, msg interface{}) bool { func (p *Peer) Send(chID byte, msg interface{}) bool {
if !p.IsRunning() { if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false return false
} }
return p.mconn.Send(chID, msg) return p.mconn.Send(chID, msg)
} }
// TrySend msg to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *Peer) TrySend(chID byte, msg interface{}) bool { func (p *Peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() { if !p.IsRunning() {
return false return false
@ -110,6 +192,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool {
return p.mconn.TrySend(chID, msg) return p.mconn.TrySend(chID, msg)
} }
// CanSend returns true if the send queue is not full, false otherwise.
func (p *Peer) CanSend(chID byte) bool { func (p *Peer) CanSend(chID byte) bool {
if !p.IsRunning() { if !p.IsRunning() {
return false return false
@ -117,6 +200,7 @@ func (p *Peer) CanSend(chID byte) bool {
return p.mconn.CanSend(chID) return p.mconn.CanSend(chID)
} }
// WriteTo writes the peer's public key to w.
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
var n_ int var n_ int
wire.WriteString(p.Key, w, &n_, &err) wire.WriteString(p.Key, w, &n_, &err)
@ -124,18 +208,56 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return return
} }
// String representation.
func (p *Peer) String() string { func (p *Peer) String() string {
if p.outbound { if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12]) return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
} else {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
} }
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
} }
// Equals reports whenever 2 peers are actually represent the same node.
func (p *Peer) Equals(other *Peer) bool { func (p *Peer) Equals(other *Peer) bool {
return p.Key == other.Key return p.Key == other.Key
} }
// Get the data for a given key.
func (p *Peer) Get(key string) interface{} { func (p *Peer) Get(key string) interface{} {
return p.Data.Get(key) return p.Data.Get(key)
} }
func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) {
log.Info("Dialing address", "address", addr)
conn, err := addr.DialTimeout(time.Duration(
config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
if err != nil {
log.Info("Failed dialing address", "address", addr, "error", err)
return nil, err
}
if config.GetBool(configFuzzEnable) {
conn = FuzzConn(config, conn)
}
return conn, nil
}
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
}
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
mconnConfig := &MConnectionConfig{
SendRate: int64(config.GetInt(configKeySendRate)),
RecvRate: int64(config.GetInt(configKeyRecvRate)),
}
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig)
}

+ 2
- 2
pex_reactor.go View File

@ -9,7 +9,7 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
wire "github.com/tendermint/go-wire"
) )
var pexErrInvalidMessage = errors.New("Invalid PEX message") var pexErrInvalidMessage = errors.New("Invalid PEX message")
@ -201,7 +201,7 @@ func (pexR *PEXReactor) ensurePeers() {
// Dial picked addresses // Dial picked addresses
for _, item := range toDial.Values() { for _, item := range toDial.Values() {
go func(picked *NetAddress) { go func(picked *NetAddress) {
_, err := pexR.Switch.DialPeerWithAddress(picked)
_, err := pexR.Switch.DialPeerWithAddress(picked, false)
if err != nil { if err != nil {
pexR.book.MarkAttempt(picked) pexR.book.MarkAttempt(picked)
} }


+ 90
- 83
switch.go View File

@ -9,10 +9,15 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/log15" "github.com/tendermint/log15"
) )
const (
reconnectAttempts = 30
reconnectInterval = 3 * time.Second
)
type Reactor interface { type Reactor interface {
Service // Start, Stop Service // Start, Stop
@ -194,78 +199,43 @@ func (sw *Switch) OnStop() {
// NOTE: This performs a blocking handshake before the peer is added. // NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
// Filter by addr (ie. ip:port)
if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
conn.Close()
return nil, err
func (sw *Switch) AddPeer(peer *Peer) error {
if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil {
return err
} }
// Set deadline for handshake so we don't block forever on conn.ReadFull
conn.SetDeadline(time.Now().Add(
time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
// First, encrypt the connection.
var sconn net.Conn = conn
if sw.config.GetBool(configKeyAuthEnc) {
var err error
sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil {
conn.Close()
return nil, err
}
if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
return err
} }
// Filter by p2p-key
if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
sconn.Close()
return nil, err
if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil {
return err
} }
// Then, perform node handshake
peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
if err != nil {
sconn.Close()
return nil, err
}
if sw.config.GetBool(configKeyAuthEnc) {
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey())
}
}
// Avoid self // Avoid self
if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection from self")
if sw.nodeInfo.PubKey.Equals(peer.PubKey()) {
return errors.New("Ignoring connection from self")
} }
// Check version, chain id // Check version, chain id
if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
sconn.Close()
return nil, err
if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
return err
} }
peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers // Add the peer to .peers
// ignore if duplicate or if we already have too many for that IP range // ignore if duplicate or if we already have too many for that IP range
if err := sw.peers.Add(peer); err != nil { if err := sw.peers.Add(peer); err != nil {
log.Notice("Ignoring peer", "error", err, "peer", peer) log.Notice("Ignoring peer", "error", err, "peer", peer)
peer.Stop()
return nil, err
return err
} }
// remove deadline and start peer
conn.SetDeadline(time.Time{})
// Start peer
if sw.IsRunning() { if sw.IsRunning() {
sw.startInitPeer(peer) sw.startInitPeer(peer)
} }
log.Notice("Added peer", "peer", peer) log.Notice("Added peer", "peer", peer)
return peer, nil
return nil
} }
func (sw *Switch) FilterConnByAddr(addr net.Addr) error { func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
@ -292,8 +262,10 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
} }
func (sw *Switch) startInitPeer(peer *Peer) { func (sw *Switch) startInitPeer(peer *Peer) {
peer.Start() // spawn send/recv routines
sw.addPeerToReactors(peer) // run AddPeer on each reactor
peer.Start() // spawn send/recv routines
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
}
} }
// Dial a list of seeds asynchronously in random order // Dial a list of seeds asynchronously in random order
@ -331,7 +303,7 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
} }
func (sw *Switch) dialSeed(addr *NetAddress) { func (sw *Switch) dialSeed(addr *NetAddress) {
peer, err := sw.DialPeerWithAddress(addr)
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil { if err != nil {
log.Error("Error dialing seed", "error", err) log.Error("Error dialing seed", "error", err)
return return
@ -340,22 +312,23 @@ func (sw *Switch) dialSeed(addr *NetAddress) {
} }
} }
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
log.Info("Dialing address", "address", addr)
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
sw.dialing.Set(addr.IP.String(), addr) sw.dialing.Set(addr.IP.String(), addr)
conn, err := addr.DialTimeout(time.Duration(
sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
sw.dialing.Delete(addr.IP.String())
defer sw.dialing.Delete(addr.IP.String())
peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
if persistent {
peer.MakePersistent()
}
if err != nil { if err != nil {
log.Info("Failed dialing address", "address", addr, "error", err)
log.Info("Failed dialing peer", "address", addr, "error", err)
peer.CloseConn()
return nil, err return nil, err
} }
if sw.config.GetBool(configFuzzEnable) {
conn = FuzzConn(sw.config, conn)
}
peer, err := sw.AddPeerWithConnection(conn, true)
err = sw.AddPeer(peer)
if err != nil { if err != nil {
log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
log.Info("Failed adding peer", "address", addr, "error", err)
peer.CloseConn()
return nil, err return nil, err
} }
log.Notice("Dialed and added peer", "address", addr, "peer", peer) log.Notice("Dialed and added peer", "address", addr, "peer", peer)
@ -400,31 +373,49 @@ func (sw *Switch) Peers() IPeerSet {
return sw.peers return sw.peers
} }
// Disconnect from a peer due to external error.
// Disconnect from a peer due to external error, retry if it is a persistent peer.
// TODO: make record depending on reason. // TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
addr := NewNetAddress(peer.RemoteAddr())
log.Notice("Stopping peer for error", "peer", peer, "error", reason) log.Notice("Stopping peer for error", "peer", peer, "error", reason)
sw.peers.Remove(peer)
peer.Stop()
sw.removePeerFromReactors(peer, reason)
sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {
go func() {
log.Notice("Reconnecting to peer", "peer", peer)
for i := 1; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
if i == reconnectAttempts {
log.Notice("Error reconnecting to peer. Giving up", "tries", i, "error", err)
return
}
log.Notice("Error reconnecting to peer. Trying again", "tries", i, "error", err)
time.Sleep(reconnectInterval)
continue
}
log.Notice("Reconnected to peer", "peer", peer)
return
}
}()
}
} }
// Disconnect from a peer gracefully. // Disconnect from a peer gracefully.
// TODO: handle graceful disconnects. // TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer *Peer) { func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Notice("Stopping peer gracefully") log.Notice("Stopping peer gracefully")
sw.peers.Remove(peer)
peer.Stop()
sw.removePeerFromReactors(peer, nil)
sw.stopAndRemovePeer(peer, nil)
} }
func (sw *Switch) addPeerToReactors(peer *Peer) {
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
}
}
func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
sw.peers.Remove(peer)
peer.Stop()
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason) reactor.RemovePeer(peer, reason)
} }
@ -449,9 +440,9 @@ func (sw *Switch) listenerRoutine(l Listener) {
} }
// New inbound connection! // New inbound connection!
_, err := sw.AddPeerWithConnection(inConn, false)
err := sw.AddPeerWithConnection(inConn, false, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
if err != nil { if err != nil {
log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
continue continue
} }
@ -511,14 +502,14 @@ func Connect2Switches(switches []*Switch, i, j int) {
c1, c2 := net.Pipe() c1, c2 := net.Pipe()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
go func() { go func() {
_, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
err := switchI.AddPeerWithConnection(c1, false, switchI.reactorsByCh, switchI.chDescs, switchI.StopPeerForError, switchI.config, switchI.nodePrivKey)
if PanicOnAddPeerErr && err != nil { if PanicOnAddPeerErr && err != nil {
panic(err) panic(err)
} }
doneCh <- struct{}{} doneCh <- struct{}{}
}() }()
go func() { go func() {
_, err := switchJ.AddPeerWithConnection(c2, true)
err := switchJ.AddPeerWithConnection(c2, false, switchJ.reactorsByCh, switchJ.chDescs, switchJ.StopPeerForError, switchJ.config, switchJ.nodePrivKey)
if PanicOnAddPeerErr && err != nil { if PanicOnAddPeerErr && err != nil {
panic(err) panic(err)
} }
@ -552,3 +543,19 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
s.SetNodePrivKey(privKey) s.SetNodePrivKey(privKey)
return s return s
} }
// AddPeerWithConnection is a helper function for testing.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) error {
peer, err := newPeerFromExistingConn(conn, outbound, reactorsByCh, chDescs, onPeerError, config, privKey)
if err != nil {
peer.CloseConn()
return err
}
if err = sw.AddPeer(peer); err != nil {
peer.CloseConn()
return err
}
return nil
}

+ 122
- 9
switch_test.go View File

@ -3,15 +3,19 @@ package p2p
import ( import (
"bytes" "bytes"
"fmt" "fmt"
golog "log"
"net" "net"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
) )
var ( var (
@ -21,7 +25,6 @@ var (
func init() { func init() {
config = cfg.NewMapConfig(nil) config = cfg.NewMapConfig(nil)
setConfigDefaults(config) setConfigDefaults(config)
} }
type PeerMessage struct { type PeerMessage struct {
@ -174,8 +177,12 @@ func TestConnAddrFilter(t *testing.T) {
}) })
// connect to good peer // connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
go func() {
s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey)
}()
go func() {
s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey)
}()
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4)) time.Sleep(100 * time.Millisecond * time.Duration(4))
@ -205,8 +212,12 @@ func TestConnPubKeyFilter(t *testing.T) {
}) })
// connect to good peer // connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
go func() {
s1.AddPeerWithConnection(c1, false, s1.reactorsByCh, s1.chDescs, s1.StopPeerForError, s1.config, s1.nodePrivKey)
}()
go func() {
s2.AddPeerWithConnection(c2, true, s2.reactorsByCh, s2.chDescs, s2.StopPeerForError, s2.config, s2.nodePrivKey)
}()
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4)) time.Sleep(100 * time.Millisecond * time.Duration(4))
@ -221,6 +232,63 @@ func TestConnPubKeyFilter(t *testing.T) {
} }
} }
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
defer sw2.Stop()
l, serverAddr := listenTCP()
done := make(chan struct{})
go accept(l, done, sw2)
defer close(done)
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
require.Nil(err)
err = sw.AddPeer(peer)
require.Nil(err)
// simulate failure by closing connection
peer.CloseConn()
time.Sleep(100 * time.Millisecond)
assert.Zero(sw.Peers().Size())
assert.False(peer.IsRunning())
}
func TestSwitchReconnectsToPeerIfItIsPersistent(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
defer sw2.Stop()
l, serverAddr := listenTCP()
done := make(chan struct{})
go accept(l, done, sw2)
defer close(done)
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
peer.MakePersistent()
require.Nil(err)
err = sw.AddPeer(peer)
require.Nil(err)
// simulate failure by closing connection
peer.CloseConn()
time.Sleep(100 * time.Millisecond)
assert.NotZero(sw.Peers().Size())
assert.False(peer.IsRunning())
}
func BenchmarkSwitches(b *testing.B) { func BenchmarkSwitches(b *testing.B) {
b.StopTimer() b.StopTimer()
@ -252,9 +320,9 @@ func BenchmarkSwitches(b *testing.B) {
successChan := s1.Broadcast(chID, "test data") successChan := s1.Broadcast(chID, "test data")
for s := range successChan { for s := range successChan {
if s { if s {
numSuccess += 1
numSuccess++
} else { } else {
numFailure += 1
numFailure++
} }
} }
} }
@ -266,3 +334,48 @@ func BenchmarkSwitches(b *testing.B) {
time.Sleep(1000 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
} }
func listenTCP() (net.Listener, net.Addr) {
l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
if e != nil {
golog.Fatalf("net.Listen tcp :0: %+v", e)
}
return l, l.Addr()
}
// simulate remote peer
func accept(l net.Listener, done <-chan struct{}, sw *Switch) {
for {
conn, err := l.Accept()
if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
}
conn, err = MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil {
golog.Fatalf("Failed to make secret conn: %+v", err)
}
var err1, err2 error
nodeInfo := new(NodeInfo)
cmn.Parallel(
func() {
var n int
wire.WriteBinary(sw.nodeInfo, conn, &n, &err1)
},
func() {
var n int
wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2)
})
if err1 != nil {
golog.Fatalf("Failed to do handshake: %+v", err1)
}
if err2 != nil {
golog.Fatalf("Failed to do handshake: %+v", err2)
}
select {
case <-done:
conn.Close()
return
default:
}
}
}

Loading…
Cancel
Save