Browse Source

Merge pull request #18 from tendermint/13-reconnect-to-seeds

persistent peers (Refs 13)
pull/456/head
Ethan Buchman 8 years ago
committed by GitHub
parent
commit
ebd3929c0d
9 changed files with 667 additions and 176 deletions
  1. +1
    -4
      .gitignore
  2. +53
    -41
      connection.go
  3. +135
    -0
      connection_test.go
  4. +71
    -0
      glide.lock
  5. +23
    -0
      glide.yaml
  6. +169
    -37
      peer.go
  7. +2
    -2
      pex_reactor.go
  8. +91
    -83
      switch.go
  9. +122
    -9
      switch_test.go

+ 1
- 4
.gitignore View File

@ -1,5 +1,2 @@
*.swp
*.swo
*.bak
.DS_Store
vendor
.glide

+ 53
- 41
connection.go View File

@ -10,25 +10,25 @@ import (
"sync/atomic"
"time"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
cmn "github.com/tendermint/go-common"
flow "github.com/tendermint/go-flowrate/flowrate"
"github.com/tendermint/go-wire" //"github.com/tendermint/log15"
wire "github.com/tendermint/go-wire"
)
const (
numBatchMsgPackets = 10
minReadBufferSize = 1024
minWriteBufferSize = 65536
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
flushThrottleMS = 100
updateState = 2 * time.Second
pingTimeout = 40 * time.Second
flushThrottle = 100 * time.Millisecond
defaultSendQueueCapacity = 1
defaultSendRate = int64(512000) // 500KB/s
defaultRecvBufferCapacity = 4096
defaultRecvMessageCapacity = 22020096 // 21MB
defaultSendTimeoutSeconds = 10
defaultRecvMessageCapacity = 22020096 // 21MB
defaultRecvRate = int64(512000) // 500KB/s
defaultSendTimeout = 10 * time.Second
)
type receiveCbFunc func(chID byte, msgBytes []byte)
@ -60,15 +60,13 @@ queue is full.
Inbound message bytes are handled with an onReceive callback function.
*/
type MConnection struct {
BaseService
cmn.BaseService
conn net.Conn
bufReader *bufio.Reader
bufWriter *bufio.Writer
sendMonitor *flow.Monitor
recvMonitor *flow.Monitor
sendRate int64
recvRate int64
send chan struct{}
pong chan struct{}
channels []*Channel
@ -76,35 +74,49 @@ type MConnection struct {
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config *MConnectionConfig
quit chan struct{}
flushTimer *ThrottleTimer // flush writes as necessary but throttled.
pingTimer *RepeatTimer // send pings periodically
chStatsTimer *RepeatTimer // update channel stats periodically
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically
chStatsTimer *cmn.RepeatTimer // update channel stats periodically
LocalAddress *NetAddress
RemoteAddress *NetAddress
}
func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
// MConnectionConfig holds the tunable settings of an MConnection.
type MConnectionConfig struct {
// SendRate is the rate passed to the send flow monitor's Limit call before
// packets are written. NewMConnection uses defaultSendRate (annotated 500KB/s).
SendRate int64
// RecvRate is the rate passed to the receive flow monitor's Limit call
// before packets are read. NewMConnection uses defaultRecvRate (annotated 500KB/s).
RecvRate int64
}
// NewMConnection wraps conn in a multiplex connection configured with the
// default send and receive rates. It is a convenience wrapper around
// NewMConnectionWithConfig.
func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
	defaults := &MConnectionConfig{
		SendRate: defaultSendRate,
		RecvRate: defaultRecvRate,
	}
	return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, defaults)
}
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection {
mconn := &MConnection{
conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
sendMonitor: flow.New(0, 0),
recvMonitor: flow.New(0, 0),
sendRate: int64(config.GetInt(configKeySendRate)),
recvRate: int64(config.GetInt(configKeyRecvRate)),
send: make(chan struct{}, 1),
pong: make(chan struct{}),
onReceive: onReceive,
onError: onError,
// Initialized in Start()
quit: nil,
flushTimer: nil,
pingTimer: nil,
chStatsTimer: nil,
config: config,
LocalAddress: NewNetAddress(conn.LocalAddr()),
RemoteAddress: NewNetAddress(conn.RemoteAddr()),
@ -123,7 +135,7 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript
mconn.channels = channels
mconn.channelsIdx = channelsIdx
mconn.BaseService = *NewBaseService(log, "MConnection", mconn)
mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn)
return mconn
}
@ -131,9 +143,9 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript
func (c *MConnection) OnStart() error {
c.BaseService.OnStart()
c.quit = make(chan struct{})
c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second)
c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
go c.sendRoutine()
go c.recvRoutine()
return nil
@ -171,7 +183,7 @@ func (c *MConnection) flush() {
func (c *MConnection) _recover() {
if r := recover(); r != nil {
stack := debug.Stack()
err := StackError{r, stack}
err := cmn.StackError{r, stack}
c.stopForError(err)
}
}
@ -196,7 +208,7 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
// Send message to channel.
channel, ok := c.channelsIdx[chID]
if !ok {
log.Error(Fmt("Cannot send bytes, unknown channel %X", chID))
log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
return false
}
@ -225,7 +237,7 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
// Send message to channel.
channel, ok := c.channelsIdx[chID]
if !ok {
log.Error(Fmt("Cannot send bytes, unknown channel %X", chID))
log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
return false
}
@ -248,7 +260,7 @@ func (c *MConnection) CanSend(chID byte) bool {
channel, ok := c.channelsIdx[chID]
if !ok {
log.Error(Fmt("Unknown channel %X", chID))
log.Error(cmn.Fmt("Unknown channel %X", chID))
return false
}
return channel.canSend()
@ -314,7 +326,7 @@ func (c *MConnection) sendSomeMsgPackets() bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
// Now send some msgPackets.
for i := 0; i < numBatchMsgPackets; i++ {
@ -372,7 +384,7 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP:
for {
// Block until .recvMonitor says we can read.
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
/*
// Peek into bufReader for debugging
@ -424,7 +436,7 @@ FOR_LOOP:
}
channel, ok := c.channelsIdx[pkt.ChannelID]
if !ok || channel == nil {
PanicQ(Fmt("Unknown channel %X", pkt.ChannelID))
cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
}
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
@ -439,7 +451,7 @@ FOR_LOOP:
c.onReceive(pkt.ChannelID, msgBytes)
}
default:
PanicSanity(Fmt("Unknown message type %X", pktType))
cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
}
// TODO: shouldn't this go in the sendRoutine?
@ -524,7 +536,7 @@ type Channel struct {
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
desc.FillDefaults()
if desc.Priority <= 0 {
PanicSanity("Channel default priority must be a postive integer")
cmn.PanicSanity("Channel default priority must be a postive integer")
}
return &Channel{
conn: conn,
@ -538,9 +550,9 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
// Queues message to send to this channel.
// Goroutine-safe
// Times out (and returns false) after defaultSendTimeoutSeconds
// Times out (and returns false) after defaultSendTimeout
func (ch *Channel) sendBytes(bytes []byte) bool {
timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second)
timeout := time.NewTimer(defaultSendTimeout)
select {
case <-timeout.C:
// timeout
@ -593,14 +605,14 @@ func (ch *Channel) isSendPending() bool {
func (ch *Channel) nextMsgPacket() msgPacket {
packet := msgPacket{}
packet.ChannelID = byte(ch.id)
packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketPayloadSize {
packet.EOF = byte(0x01)
ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else {
packet.EOF = byte(0x00)
ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
}
return packet
}


+ 135
- 0
connection_test.go View File

@ -0,0 +1,135 @@
package p2p_test
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
p2p "github.com/tendermint/go-p2p"
)
// createMConnection builds a test MConnection over conn whose receive and
// error callbacks do nothing.
func createMConnection(conn net.Conn) *p2p.MConnection {
	noopReceive := func(chID byte, msgBytes []byte) {}
	noopError := func(r interface{}) {}
	return createMConnectionWithCallbacks(conn, noopReceive, noopError)
}
// createMConnectionWithCallbacks builds a test MConnection over conn with a
// single channel (ID 0x01, priority 1) and the supplied callbacks.
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
	desc := &p2p.ChannelDescriptor{ID: 0x01, Priority: 1}
	return p2p.NewMConnection(conn, []*p2p.ChannelDescriptor{desc}, onReceive, onError)
}
func TestMConnectionSend(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := net.Pipe()
defer server.Close()
defer client.Close()
mconn := createMConnection(client)
_, err := mconn.Start()
require.Nil(err)
defer mconn.Stop()
msg := "Ant-Man"
assert.True(mconn.Send(0x01, msg))
assert.False(mconn.CanSend(0x01))
server.Read(make([]byte, len(msg)))
assert.True(mconn.CanSend(0x01))
msg = "Spider-Man"
assert.True(mconn.TrySend(0x01, msg))
server.Read(make([]byte, len(msg)))
}
func TestMConnectionReceive(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn1.Start()
require.Nil(err)
defer mconn1.Stop()
mconn2 := createMConnection(server)
_, err = mconn2.Start()
require.Nil(err)
defer mconn2.Stop()
msg := "Cyclops"
assert.True(mconn2.Send(0x01, msg))
select {
case receivedBytes := <-receivedCh:
assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal
case err := <-errorsCh:
t.Fatalf("Expected %s, got %+v", msg, err)
case <-time.After(500 * time.Millisecond):
t.Fatalf("Did not receive %s message in 500ms", msg)
}
}
func TestMConnectionStatus(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := net.Pipe()
defer server.Close()
defer client.Close()
mconn := createMConnection(client)
_, err := mconn.Start()
require.Nil(err)
defer mconn.Stop()
status := mconn.Status()
assert.NotNil(status)
assert.Zero(status.Channels[0].SendQueueSize)
}
func TestMConnectionStopsAndReturnsError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn.Start()
require.Nil(err)
defer mconn.Stop()
client.Close()
select {
case receivedBytes := <-receivedCh:
t.Fatalf("Expected error, got %v", receivedBytes)
case err := <-errorsCh:
assert.NotNil(err)
assert.False(mconn.IsRunning())
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive error in 500ms")
}
}

+ 71
- 0
glide.lock View File

@ -0,0 +1,71 @@
hash: 92a49cbcf88a339e4d29559fe291c30e61eacda1020fd04dfcd97de834e18b3e
updated: 2017-04-10T11:17:14.66226896Z
imports:
- name: github.com/btcsuite/btcd
version: 4b348c1d33373d672edd83fc576892d0e46686d2
subpackages:
- btcec
- name: github.com/BurntSushi/toml
version: b26d9c308763d68093482582cea63d69be07a0f0
- name: github.com/go-stack/stack
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
- name: github.com/mattn/go-colorable
version: ded68f7a9561c023e790de24279db7ebf473ea80
- name: github.com/mattn/go-isatty
version: fc9e8d8ef48496124e79ae0df75490096eccf6fe
- name: github.com/pkg/errors
version: ff09b135c25aae272398c51a07235b90a75aa4f0
- name: github.com/tendermint/ed25519
version: 1f52c6f8b8a5c7908aff4497c186af344b428925
subpackages:
- edwards25519
- extra25519
- name: github.com/tendermint/go-common
version: dcb015dff6c7af21e65c8e2f3b450df19d38c777
- name: github.com/tendermint/go-config
version: 620dcbbd7d587cf3599dedbf329b64311b0c307a
- name: github.com/tendermint/go-crypto
version: 3f47cfac5fcd9e0f1727c7db980b3559913b3e3a
- name: github.com/tendermint/go-data
version: c955b191240568440ea902e14dad2ce19727543a
- name: github.com/tendermint/go-flowrate
version: a20c98e61957faa93b4014fbd902f20ab9317a6a
subpackages:
- flowrate
- name: github.com/tendermint/go-logger
version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2
- name: github.com/tendermint/go-wire
version: f530b7af7a8b06e612c2063bff6ace49060a085e
- name: github.com/tendermint/log15
version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6
subpackages:
- term
- name: golang.org/x/crypto
version: 9ef620b9ca2f82b55030ffd4f41327fa9e77a92c
subpackages:
- curve25519
- nacl/box
- nacl/secretbox
- openpgp/armor
- openpgp/errors
- poly1305
- ripemd160
- salsa20/salsa
- name: golang.org/x/sys
version: f3918c30c5c2cb527c0b071a27c35120a6c0719a
subpackages:
- unix
testImports:
- name: github.com/davecgh/go-spew
version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9
subpackages:
- spew
- name: github.com/pmezard/go-difflib
version: d8ed2627bdf02c080bf22230dbb337003b7aba2d
subpackages:
- difflib
- name: github.com/stretchr/testify
version: 4d4bfba8f1d1027c4fdbe371823030df51419987
subpackages:
- assert
- require

+ 23
- 0
glide.yaml View File

@ -0,0 +1,23 @@
package: github.com/tendermint/go-p2p
import:
- package: github.com/tendermint/go-common
- package: github.com/tendermint/go-config
- package: github.com/tendermint/go-crypto
- package: github.com/tendermint/go-data
version: c955b191240568440ea902e14dad2ce19727543a
- package: github.com/tendermint/go-flowrate
subpackages:
- flowrate
- package: github.com/tendermint/go-logger
- package: github.com/tendermint/go-wire
- package: github.com/tendermint/log15
- package: golang.org/x/crypto
subpackages:
- nacl/box
- nacl/secretbox
- ripemd160
testImport:
- package: github.com/stretchr/testify
subpackages:
- assert
- require

+ 169
- 37
peer.go View File

@ -4,101 +4,193 @@ import (
"fmt"
"io"
"net"
"time"
. "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
)
// Peer could be marked as persistent, in which case you can use
// Redial function to reconnect. Note that inbound peers can't be
// made persistent. They should be made persistent on the other end.
//
// Before using a peer, you will need to perform a handshake on connection.
type Peer struct {
BaseService
cmn.BaseService
outbound bool
mconn *MConnection
conn net.Conn // source connection
mconn *MConnection // multiplex connection
authEnc bool // authenticated encryption
persistent bool
config cfg.Config
*NodeInfo
Key string
Data *CMap // User data.
Data *cmn.CMap // User data.
}
// newPeer dials addr and wraps the resulting connection in an outbound Peer.
// It returns a nil peer and the error if dialing or peer construction fails.
func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
	// We initiated the connection, so the peer is outbound.
	const outbound = true

	conn, dialErr := dial(addr, config)
	if dialErr != nil {
		return nil, dialErr
	}
	return newPeerFromExistingConn(conn, outbound, reactorsByCh, chDescs, onPeerError, config, privKey)
}
// newPeerFromExistingConn builds a Peer around an already established
// connection. When authenticated encryption is enabled in config, the
// connection is first upgraded to a secret connection under a handshake
// deadline. The peer's Key and NodeInfo stay unset until the handshake runs.
func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) {
	if config.GetBool(configKeyAuthEnc) {
		// Bound the encryption handshake so a silent remote cannot block us forever.
		handshakeTimeout := time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second
		conn.SetDeadline(time.Now().Add(handshakeTimeout))

		var encErr error
		conn, encErr = MakeSecretConnection(conn, privKey)
		if encErr != nil {
			return nil, encErr
		}

		// Clear the deadline again now that the connection is encrypted.
		conn.SetDeadline(time.Time{})
	}

	peer := &Peer{
		outbound: outbound,
		authEnc:  config.GetBool(configKeyAuthEnc),
		conn:     conn,
		config:   config,
		Data:     cmn.NewCMap(),
	}
	peer.mconn = createMConnection(conn, peer, reactorsByCh, chDescs, onPeerError, config)
	peer.BaseService = *cmn.NewBaseService(log, "Peer", peer)
	return peer, nil
}
// CloseConn closes the peer's source connection directly. It should be used
// when the peer was created but never started. The error returned by Close
// is discarded.
func (p *Peer) CloseConn() {
p.conn.Close()
}
// makePersistent flags an outbound peer as persistent. It panics when called
// on an inbound peer, which must be made persistent on the remote side.
func (p *Peer) makePersistent() {
	if p.outbound {
		p.persistent = true
		return
	}
	panic("inbound peers can't be made persistent")
}
// IsPersistent returns true if the peer is persistent, false otherwise.
func (p *Peer) IsPersistent() bool {
return p.persistent
}
// HandshakeTimeout performs a handshake between a given node and the peer.
// NOTE: blocking
// Before creating a peer with newPeer(), perform a handshake on connection.
func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error {
// Set deadline for handshake so we don't block forever on conn.ReadFull
p.conn.SetDeadline(time.Now().Add(timeout))
var peerNodeInfo = new(NodeInfo)
var err1 error
var err2 error
Parallel(
cmn.Parallel(
func() {
var n int
wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1)
},
func() {
var n int
wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
if err1 != nil {
return nil, err1
return err1
}
if err2 != nil {
return nil, err2
return err2
}
peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
return peerNodeInfo, nil
}
// NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
PanicSanity(Fmt("Unknown channel %X", chID))
if p.authEnc {
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(p.PubKey()) {
return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, p.PubKey())
}
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
p.Stop()
onPeerError(p, r)
// Remove deadline
p.conn.SetDeadline(time.Time{})
peerNodeInfo.RemoteAddr = p.RemoteAddr().String()
p.NodeInfo = peerNodeInfo
p.Key = peerNodeInfo.PubKey.KeyString()
return nil
}
// RemoteAddr returns the remote network address of the peer's underlying
// connection.
func (p *Peer) RemoteAddr() net.Addr {
return p.conn.RemoteAddr()
}
// PubKey returns the remote public key.
func (p *Peer) PubKey() crypto.PubKeyEd25519 {
if p.authEnc {
return p.conn.(*SecretConnection).RemotePubKey()
}
mconn := NewMConnection(config, conn, chDescs, onReceive, onError)
p = &Peer{
outbound: outbound,
mconn: mconn,
NodeInfo: peerNodeInfo,
Key: peerNodeInfo.PubKey.KeyString(),
Data: NewCMap(),
if p.NodeInfo == nil {
panic("Attempt to get peer's PubKey before calling Handshake")
}
p.BaseService = *NewBaseService(log, "Peer", p)
return p
return p.PubKey()
}
// OnStart implements BaseService. It runs the embedded BaseService's OnStart
// and then starts the peer's MConnection, returning any error from that start.
func (p *Peer) OnStart() error {
	p.BaseService.OnStart()
	if _, startErr := p.mconn.Start(); startErr != nil {
		return startErr
	}
	return nil
}
// OnStop implements BaseService. It runs the embedded BaseService's OnStop
// and then stops the peer's MConnection.
func (p *Peer) OnStop() {
p.BaseService.OnStop()
p.mconn.Stop()
}
// Connection returns the peer's underlying MConnection.
func (p *Peer) Connection() *MConnection {
return p.mconn
}
// IsOutbound returns true if the peer's connection is outbound, false
// otherwise.
func (p *Peer) IsOutbound() bool {
return p.outbound
}
// Send queues msg on the channel identified by chID. It returns false when
// the peer is not running, or when the send queue is still full after the
// timeout specified by MConnection.
func (p *Peer) Send(chID byte, msg interface{}) bool {
	// A peer may be removed and stopped while Switch#Broadcast is still
	// looping over the peer list it fetched, so a stopped peer refuses quietly.
	return p.IsRunning() && p.mconn.Send(chID, msg)
}
// TrySend msg to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
if !p.IsRunning() {
return false
@ -106,6 +198,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool {
return p.mconn.TrySend(chID, msg)
}
// CanSend returns true if the send queue is not full, false otherwise.
func (p *Peer) CanSend(chID byte) bool {
if !p.IsRunning() {
return false
@ -113,6 +206,7 @@ func (p *Peer) CanSend(chID byte) bool {
return p.mconn.CanSend(chID)
}
// WriteTo writes the peer's public key to w.
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
var n_ int
wire.WriteString(p.Key, w, &n_, &err)
@ -120,18 +214,56 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return
}
// String returns a short description of the peer: its MConnection, up to the
// first 12 characters of its Key, and its direction ("out" or "in").
//
// Key is empty until the handshake has completed, so it is truncated only
// when it is long enough; this keeps String safe to call on any peer.
func (p *Peer) String() string {
	key := p.Key
	if len(key) > 12 {
		key = key[:12]
	}
	direction := "in"
	if p.outbound {
		direction = "out"
	}
	return fmt.Sprintf("Peer{%v %v %v}", p.mconn, key, direction)
}
// Equals reports whether the two peers represent the same node, i.e. whether
// their Keys are equal.
func (p *Peer) Equals(other *Peer) bool {
return p.Key == other.Key
}
// Get returns the user data stored under key in the peer's Data map.
func (p *Peer) Get(key string) interface{} {
return p.Data.Get(key)
}
// dial opens a connection to addr using the dial timeout from config. When
// fuzzing is enabled in config, the connection is wrapped with FuzzConn.
// A failed dial is logged and returned with a nil connection.
func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) {
	log.Info("Dialing address", "address", addr)

	dialTimeout := time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second
	conn, err := addr.DialTimeout(dialTimeout)
	if err != nil {
		log.Info("Failed dialing address", "address", addr, "error", err)
		return nil, err
	}

	if !config.GetBool(configFuzzEnable) {
		return conn, nil
	}
	return FuzzConn(config, conn), nil
}
// createMConnection builds the MConnection for peer p over conn. Received
// messages go to the reactor registered for their channel, errors go to
// onPeerError, and the send/receive rates are read from config.
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection {
	// Route each inbound message to the reactor that owns its channel.
	dispatch := func(chID byte, msgBytes []byte) {
		handler := reactorsByCh[chID]
		if handler == nil {
			cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
		}
		handler.Receive(chID, p, msgBytes)
	}

	// Report connection errors together with the peer they belong to.
	report := func(r interface{}) {
		onPeerError(p, r)
	}

	rates := &MConnectionConfig{
		SendRate: int64(config.GetInt(configKeySendRate)),
		RecvRate: int64(config.GetInt(configKeyRecvRate)),
	}
	return NewMConnectionWithConfig(conn, chDescs, dispatch, report, rates)
}

+ 2
- 2
pex_reactor.go View File

@ -9,7 +9,7 @@ import (
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
wire "github.com/tendermint/go-wire"
)
var pexErrInvalidMessage = errors.New("Invalid PEX message")
@ -201,7 +201,7 @@ func (pexR *PEXReactor) ensurePeers() {
// Dial picked addresses
for _, item := range toDial.Values() {
go func(picked *NetAddress) {
_, err := pexR.Switch.DialPeerWithAddress(picked)
_, err := pexR.Switch.DialPeerWithAddress(picked, false)
if err != nil {
pexR.book.MarkAttempt(picked)
}


+ 91
- 83
switch.go View File

@ -9,10 +9,15 @@ import (
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/log15"
)
const (
reconnectAttempts = 30
reconnectInterval = 3 * time.Second
)
type Reactor interface {
Service // Start, Stop
@ -193,79 +198,45 @@ func (sw *Switch) OnStop() {
}
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
// Filter by addr (ie. ip:port)
if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil {
conn.Close()
return nil, err
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeer(peer *Peer) error {
if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil {
return err
}
// Set deadline for handshake so we don't block forever on conn.ReadFull
conn.SetDeadline(time.Now().Add(
time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
// First, encrypt the connection.
var sconn net.Conn = conn
if sw.config.GetBool(configKeyAuthEnc) {
var err error
sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil {
conn.Close()
return nil, err
}
if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
return err
}
// Filter by p2p-key
if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil {
sconn.Close()
return nil, err
if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil {
return err
}
// Then, perform node handshake
peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
if err != nil {
sconn.Close()
return nil, err
}
if sw.config.GetBool(configKeyAuthEnc) {
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey())
}
}
// Avoid self
if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection from self")
if sw.nodeInfo.PubKey.Equals(peer.PubKey()) {
return errors.New("Ignoring connection from self")
}
// Check version, chain id
if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
sconn.Close()
return nil, err
if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
return err
}
peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers
// ignore if duplicate or if we already have too many for that IP range
if err := sw.peers.Add(peer); err != nil {
log.Notice("Ignoring peer", "error", err, "peer", peer)
peer.Stop()
return nil, err
return err
}
// remove deadline and start peer
conn.SetDeadline(time.Time{})
// Start peer
if sw.IsRunning() {
sw.startInitPeer(peer)
}
log.Notice("Added peer", "peer", peer)
return peer, nil
return nil
}
func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
@ -292,8 +263,10 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
}
func (sw *Switch) startInitPeer(peer *Peer) {
peer.Start() // spawn send/recv routines
sw.addPeerToReactors(peer) // run AddPeer on each reactor
peer.Start() // spawn send/recv routines
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
}
}
// Dial a list of seeds asynchronously in random order
@ -331,7 +304,7 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
}
func (sw *Switch) dialSeed(addr *NetAddress) {
peer, err := sw.DialPeerWithAddress(addr)
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
log.Error("Error dialing seed", "error", err)
return
@ -340,22 +313,23 @@ func (sw *Switch) dialSeed(addr *NetAddress) {
}
}
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
log.Info("Dialing address", "address", addr)
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
sw.dialing.Set(addr.IP.String(), addr)
conn, err := addr.DialTimeout(time.Duration(
sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
sw.dialing.Delete(addr.IP.String())
defer sw.dialing.Delete(addr.IP.String())
peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
if persistent {
peer.makePersistent()
}
if err != nil {
log.Info("Failed dialing address", "address", addr, "error", err)
log.Info("Failed dialing peer", "address", addr, "error", err)
peer.CloseConn()
return nil, err
}
if sw.config.GetBool(configFuzzEnable) {
conn = FuzzConn(sw.config, conn)
}
peer, err := sw.AddPeerWithConnection(conn, true)
err = sw.AddPeer(peer)
if err != nil {
log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
log.Info("Failed adding peer", "address", addr, "error", err)
peer.CloseConn()
return nil, err
}
log.Notice("Dialed and added peer", "address", addr, "peer", peer)
@ -400,31 +374,49 @@ func (sw *Switch) Peers() IPeerSet {
return sw.peers
}
// StopPeerForError disconnects from a peer due to an external error. The peer
// is stopped and removed from the switch and its reactors. If the peer is
// persistent, a background goroutine then redials its address up to
// reconnectAttempts times, sleeping reconnectInterval between failed tries,
// and stops early if the switch is no longer running.
// TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
	// Capture the address before the peer is stopped; redialing needs it.
	addr := NewNetAddress(peer.RemoteAddr())
	log.Notice("Stopping peer for error", "peer", peer, "error", reason)
	sw.stopAndRemovePeer(peer, reason)

	if !peer.IsPersistent() {
		return
	}

	go func() {
		log.Notice("Reconnecting to peer", "peer", peer)
		// The bound is inclusive so that all reconnectAttempts tries are made
		// and the final failure is reported as "Giving up".
		for i := 1; i <= reconnectAttempts; i++ {
			if !sw.IsRunning() {
				return
			}

			reconnected, err := sw.DialPeerWithAddress(addr, true)
			if err == nil {
				log.Notice("Reconnected to peer", "peer", reconnected)
				return
			}
			if i == reconnectAttempts {
				log.Notice("Error reconnecting to peer. Giving up", "tries", i, "error", err)
				return
			}
			log.Notice("Error reconnecting to peer. Trying again", "tries", i, "error", err)
			time.Sleep(reconnectInterval)
		}
	}()
}
// Disconnect from a peer gracefully.
// TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Notice("Stopping peer gracefully")
sw.peers.Remove(peer)
peer.Stop()
sw.removePeerFromReactors(peer, nil)
}
func (sw *Switch) addPeerToReactors(peer *Peer) {
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
}
sw.stopAndRemovePeer(peer, nil)
}
func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
sw.peers.Remove(peer)
peer.Stop()
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
@ -449,9 +441,9 @@ func (sw *Switch) listenerRoutine(l Listener) {
}
// New inbound connection!
_, err := sw.AddPeerWithConnection(inConn, false)
err := sw.AddPeerWithConnection(inConn, false)
if err != nil {
log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
continue
}
@ -511,14 +503,14 @@ func Connect2Switches(switches []*Switch, i, j int) {
c1, c2 := net.Pipe()
doneCh := make(chan struct{})
go func() {
_, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
err := switchI.AddPeerWithConnection(c1, false)
if PanicOnAddPeerErr && err != nil {
panic(err)
}
doneCh <- struct{}{}
}()
go func() {
_, err := switchJ.AddPeerWithConnection(c2, true)
err := switchJ.AddPeerWithConnection(c2, false)
if PanicOnAddPeerErr && err != nil {
panic(err)
}
@ -552,3 +544,19 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S
s.SetNodePrivKey(privKey)
return s
}
// AddPeerWithConnection creates a peer from the connection, performs the
// handshake, and adds it to the switch. On any error the connection is closed
// and the error is returned.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error {
	peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
	if err != nil {
		// No peer was built on this path (it is nil), so close the raw
		// connection directly instead of going through peer.CloseConn.
		conn.Close()
		return err
	}

	if err = sw.AddPeer(peer); err != nil {
		peer.CloseConn()
		return err
	}

	return nil
}

+ 122
- 9
switch_test.go View File

@ -3,15 +3,19 @@ package p2p
import (
"bytes"
"fmt"
golog "log"
"net"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
. "github.com/tendermint/go-common"
cmn "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
)
var (
@ -21,7 +25,6 @@ var (
func init() {
config = cfg.NewMapConfig(nil)
setConfigDefaults(config)
}
type PeerMessage struct {
@ -174,8 +177,12 @@ func TestConnAddrFilter(t *testing.T) {
})
// connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
go func() {
s1.AddPeerWithConnection(c1, false)
}()
go func() {
s2.AddPeerWithConnection(c2, true)
}()
// Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4))
@ -205,8 +212,12 @@ func TestConnPubKeyFilter(t *testing.T) {
})
// connect to good peer
go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake.
go s2.AddPeerWithConnection(c2, true)
go func() {
s1.AddPeerWithConnection(c1, false)
}()
go func() {
s2.AddPeerWithConnection(c2, true)
}()
// Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond * time.Duration(4))
@ -221,6 +232,63 @@ func TestConnPubKeyFilter(t *testing.T) {
}
}
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()
sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
defer sw2.Stop()
l, serverAddr := listenTCP()
done := make(chan struct{})
go accept(l, done, sw2)
defer close(done)
peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
require.Nil(err)
err = sw.AddPeer(peer)
require.Nil(err)
// simulate failure by closing connection
peer.CloseConn()
time.Sleep(100 * time.Millisecond)
assert.Zero(sw.Peers().Size())
assert.False(peer.IsRunning())
}
// TestSwitchReconnectsToPersistentPeer checks that when the connection of a
// persistent peer is closed, the switch still holds a peer afterwards while
// the original peer object is no longer running.
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
	assert, require := assert.New(t), require.New(t)

	sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
	sw.Start()
	defer sw.Stop()

	// sw2 only lends its key and node info to the fake remote side served by
	// accept; it is never started here.
	sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc)
	defer sw2.Stop()

	l, serverAddr := listenTCP()
	done := make(chan struct{})
	go accept(l, done, sw2)
	defer close(done)

	peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey)
	// Check the error before touching peer: on failure the returned peer must
	// not be used, and the test should fail cleanly rather than panic.
	require.Nil(err)
	peer.makePersistent()
	err = sw.AddPeer(peer)
	require.Nil(err)

	// simulate failure by closing connection
	peer.CloseConn()

	// Give the switch time to notice the failure and react.
	time.Sleep(100 * time.Millisecond)

	assert.NotZero(sw.Peers().Size())
	assert.False(peer.IsRunning())
}
func BenchmarkSwitches(b *testing.B) {
b.StopTimer()
@ -252,9 +320,9 @@ func BenchmarkSwitches(b *testing.B) {
successChan := s1.Broadcast(chID, "test data")
for s := range successChan {
if s {
numSuccess += 1
numSuccess++
} else {
numFailure += 1
numFailure++
}
}
}
@ -266,3 +334,48 @@ func BenchmarkSwitches(b *testing.B) {
time.Sleep(1000 * time.Millisecond)
}
// listenTCP opens a TCP listener on 127.0.0.1 with port 0, so the OS assigns
// a free port, and returns the listener together with its bound address.
// It terminates the process via golog.Fatalf if listening fails.
func listenTCP() (net.Listener, net.Addr) {
	listener, listenErr := net.Listen("tcp", "127.0.0.1:0") // any available address
	if listenErr != nil {
		golog.Fatalf("net.Listen tcp :0: %+v", listenErr)
	}
	boundAddr := listener.Addr()
	return listener, boundAddr
}
// accept simulates a remote peer. For every connection accepted on l it
// upgrades the connection with MakeSecretConnection using sw.nodePrivKey and
// then, via cmn.Parallel, writes sw.nodeInfo to the connection while reading
// a NodeInfo from it. Any failure aborts the process through golog.Fatalf.
//
// After each handshake done is polled: if it is ready the connection is
// closed and accept returns, otherwise the loop goes back to accepting and
// the connection is left open.
func accept(l net.Listener, done <-chan struct{}, sw *Switch) {
	for {
		conn, err := l.Accept()
		if err != nil {
			golog.Fatalf("Failed to accept conn: %+v", err)
		}

		conn, err = MakeSecretConnection(conn, sw.nodePrivKey)
		if err != nil {
			golog.Fatalf("Failed to make secret conn: %+v", err)
		}

		// Exchange node info in both directions at once.
		var writeErr, readErr error
		remoteInfo := new(NodeInfo)
		sendInfo := func() {
			var written int
			wire.WriteBinary(sw.nodeInfo, conn, &written, &writeErr)
		}
		recvInfo := func() {
			var read int
			wire.ReadBinary(remoteInfo, conn, maxNodeInfoSize, &read, &readErr)
		}
		cmn.Parallel(sendInfo, recvInfo)

		// Report the write error first, then the read error.
		for _, handshakeErr := range []error{writeErr, readErr} {
			if handshakeErr != nil {
				golog.Fatalf("Failed to do handshake: %+v", handshakeErr)
			}
		}

		// Non-blocking check for shutdown.
		select {
		case <-done:
			conn.Close()
			return
		default:
		}
	}
}

Loading…
Cancel
Save