diff --git a/.gitignore b/.gitignore index c765108df..62f28681c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,2 @@ -*.swp -*.swo -*.bak -.DS_Store vendor +.glide diff --git a/connection.go b/connection.go index 4428e0da7..e61608896 100644 --- a/connection.go +++ b/connection.go @@ -10,25 +10,25 @@ import ( "sync/atomic" "time" - . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" + cmn "github.com/tendermint/go-common" flow "github.com/tendermint/go-flowrate/flowrate" - "github.com/tendermint/go-wire" //"github.com/tendermint/log15" + wire "github.com/tendermint/go-wire" ) const ( numBatchMsgPackets = 10 minReadBufferSize = 1024 minWriteBufferSize = 65536 - idleTimeoutMinutes = 5 - updateStatsSeconds = 2 - pingTimeoutSeconds = 40 - flushThrottleMS = 100 + updateState = 2 * time.Second + pingTimeout = 40 * time.Second + flushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 + defaultSendRate = int64(512000) // 500KB/s defaultRecvBufferCapacity = 4096 - defaultRecvMessageCapacity = 22020096 // 21MB - defaultSendTimeoutSeconds = 10 + defaultRecvMessageCapacity = 22020096 // 21MB + defaultRecvRate = int64(512000) // 500KB/s + defaultSendTimeout = 10 * time.Second ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -60,15 +60,13 @@ queue is full. Inbound message bytes are handled with an onReceive callback function. */ type MConnection struct { - BaseService + cmn.BaseService conn net.Conn bufReader *bufio.Reader bufWriter *bufio.Writer sendMonitor *flow.Monitor recvMonitor *flow.Monitor - sendRate int64 - recvRate int64 send chan struct{} pong chan struct{} channels []*Channel @@ -76,35 +74,49 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 + config *MConnectionConfig quit chan struct{} - flushTimer *ThrottleTimer // flush writes as necessary but throttled. - pingTimer *RepeatTimer // send pings periodically - chStatsTimer *RepeatTimer // update channel stats periodically + flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. + pingTimer *cmn.RepeatTimer // send pings periodically + chStatsTimer *cmn.RepeatTimer // update channel stats periodically LocalAddress *NetAddress RemoteAddress *NetAddress } -func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { +// MConnectionConfig is a MConnection configuration +type MConnectionConfig struct { + SendRate int64 + RecvRate int64 +} + +// NewMConnection wraps net.Conn and creates multiplex connection +func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { + return NewMConnectionWithConfig( + conn, + chDescs, + onReceive, + onError, + &MConnectionConfig{ + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + }) +} + +// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), - sendRate: int64(config.GetInt(configKeySendRate)), - recvRate: int64(config.GetInt(configKeyRecvRate)), send: make(chan struct{}, 1), pong: make(chan struct{}), onReceive: onReceive, onError: onError, - - // Initialized in Start() - quit: nil, - flushTimer: nil, - pingTimer: nil, - chStatsTimer: nil, + config: config, LocalAddress: NewNetAddress(conn.LocalAddr()), RemoteAddress: NewNetAddress(conn.RemoteAddr()), @@ -123,7 +135,7 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript mconn.channels = channels mconn.channelsIdx = channelsIdx - mconn.BaseService = *NewBaseService(log, "MConnection", mconn) + mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn) return mconn } @@ -131,9 +143,9 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) - c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) - c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second) - c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) + c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) + c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) + c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) go c.sendRoutine() go c.recvRoutine() return nil @@ -171,7 +183,7 @@ func (c *MConnection) flush() { func (c *MConnection) _recover() { if r := recover(); r != nil { stack := debug.Stack() - err := StackError{r, stack} + err := cmn.StackError{r, stack} c.stopForError(err) } } @@ -196,7 +208,7 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) + log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -225,7 +237,7 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) + log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -248,7 +260,7 @@ func (c *MConnection) CanSend(chID byte) bool { channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Unknown channel %X", chID)) + log.Error(cmn.Fmt("Unknown channel %X", chID)) return false } return channel.canSend() @@ -314,7 +326,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -372,7 +384,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) /* // Peek into bufReader for debugging @@ -424,7 +436,7 @@ FOR_LOOP: } channel, ok := c.channelsIdx[pkt.ChannelID] if !ok || channel == nil { - PanicQ(Fmt("Unknown channel %X", pkt.ChannelID)) + cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID)) } msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { @@ -439,7 +451,7 @@ FOR_LOOP: c.onReceive(pkt.ChannelID, msgBytes) } default: - PanicSanity(Fmt("Unknown message type %X", pktType)) + cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType)) } // TODO: shouldn't this go in the sendRoutine? @@ -524,7 +536,7 @@ type Channel struct { func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { desc.FillDefaults() if desc.Priority <= 0 { - PanicSanity("Channel default priority must be a postive integer") + cmn.PanicSanity("Channel default priority must be a postive integer") } return &Channel{ conn: conn, @@ -538,9 +550,9 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { // Queues message to send to this channel. // Goroutine-safe -// Times out (and returns false) after defaultSendTimeoutSeconds +// Times out (and returns false) after defaultSendTimeout func (ch *Channel) sendBytes(bytes []byte) bool { - timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second) + timeout := time.NewTimer(defaultSendTimeout) select { case <-timeout.C: // timeout @@ -593,14 +605,14 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))] + packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] + ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet } diff --git a/connection_test.go b/connection_test.go new file mode 100644 index 000000000..cd6bb4a8f --- /dev/null +++ b/connection_test.go @@ -0,0 +1,135 @@ +package p2p_test + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + p2p "github.com/tendermint/go-p2p" +) + +func createMConnection(conn net.Conn) *p2p.MConnection { + onReceive := func(chID byte, msgBytes []byte) { + } + onError := func(r interface{}) { + } + return createMConnectionWithCallbacks(conn, onReceive, onError) +} + +func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { + chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1}} + return p2p.NewMConnection(conn, chDescs, onReceive, onError) +} + +func TestMConnectionSend(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + mconn := createMConnection(client) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + msg := "Ant-Man" + assert.True(mconn.Send(0x01, msg)) + assert.False(mconn.CanSend(0x01)) + server.Read(make([]byte, len(msg))) + assert.True(mconn.CanSend(0x01)) + + msg = "Spider-Man" + assert.True(mconn.TrySend(0x01, msg)) + server.Read(make([]byte, len(msg))) +} + +func TestMConnectionReceive(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) + _, err := mconn1.Start() + require.Nil(err) + defer mconn1.Stop() + + mconn2 := createMConnection(server) + _, err = mconn2.Start() + require.Nil(err) + defer mconn2.Stop() + + msg := "Cyclops" + assert.True(mconn2.Send(0x01, msg)) + + select { + case receivedBytes := <-receivedCh: + assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal + case err := <-errorsCh: + t.Fatalf("Expected %s, got %+v", msg, err) + case <-time.After(500 * time.Millisecond): + t.Fatalf("Did not receive %s message in 500ms", msg) + } +} + +func TestMConnectionStatus(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + mconn := createMConnection(client) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + status := mconn.Status() + assert.NotNil(status) + assert.Zero(status.Channels[0].SendQueueSize) +} + +func TestMConnectionStopsAndReturnsError(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + client.Close() + + select { + case receivedBytes := <-receivedCh: + t.Fatalf("Expected error, got %v", receivedBytes) + case err := <-errorsCh: + assert.NotNil(err) + assert.False(mconn.IsRunning()) + case <-time.After(500 * time.Millisecond): + t.Fatal("Did not receive error in 500ms") + } +} diff --git a/glide.lock b/glide.lock new file mode 100644 index 000000000..797c86031 --- /dev/null +++ b/glide.lock @@ -0,0 +1,71 @@ +hash: 92a49cbcf88a339e4d29559fe291c30e61eacda1020fd04dfcd97de834e18b3e +updated: 2017-04-10T11:17:14.66226896Z +imports: +- name: github.com/btcsuite/btcd + version: 4b348c1d33373d672edd83fc576892d0e46686d2 + subpackages: + - btcec +- name: github.com/BurntSushi/toml + version: b26d9c308763d68093482582cea63d69be07a0f0 +- name: github.com/go-stack/stack + version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 +- name: github.com/mattn/go-colorable + version: ded68f7a9561c023e790de24279db7ebf473ea80 +- name: github.com/mattn/go-isatty + version: fc9e8d8ef48496124e79ae0df75490096eccf6fe +- name: github.com/pkg/errors + version: ff09b135c25aae272398c51a07235b90a75aa4f0 +- name: github.com/tendermint/ed25519 + version: 1f52c6f8b8a5c7908aff4497c186af344b428925 + subpackages: + - edwards25519 + - extra25519 +- name: github.com/tendermint/go-common + version: dcb015dff6c7af21e65c8e2f3b450df19d38c777 +- name: github.com/tendermint/go-config + version: 620dcbbd7d587cf3599dedbf329b64311b0c307a +- name: github.com/tendermint/go-crypto + version: 3f47cfac5fcd9e0f1727c7db980b3559913b3e3a +- name: github.com/tendermint/go-data + version: c955b191240568440ea902e14dad2ce19727543a +- name: github.com/tendermint/go-flowrate + version: a20c98e61957faa93b4014fbd902f20ab9317a6a + subpackages: + - flowrate +- name: github.com/tendermint/go-logger + version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 +- name: github.com/tendermint/go-wire + version: f530b7af7a8b06e612c2063bff6ace49060a085e +- name: github.com/tendermint/log15 + version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 + subpackages: + - term +- name: golang.org/x/crypto + version: 9ef620b9ca2f82b55030ffd4f41327fa9e77a92c + subpackages: + - curve25519 + - nacl/box + - nacl/secretbox + - openpgp/armor + - openpgp/errors + - poly1305 + - ripemd160 + - salsa20/salsa +- name: golang.org/x/sys + version: f3918c30c5c2cb527c0b071a27c35120a6c0719a + subpackages: + - unix +testImports: +- name: github.com/davecgh/go-spew + version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib +- name: github.com/stretchr/testify + version: 4d4bfba8f1d1027c4fdbe371823030df51419987 + subpackages: + - assert + - require diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 000000000..cf71cc670 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,23 @@ +package: github.com/tendermint/go-p2p +import: +- package: github.com/tendermint/go-common +- package: github.com/tendermint/go-config +- package: github.com/tendermint/go-crypto +- package: github.com/tendermint/go-data + version: c955b191240568440ea902e14dad2ce19727543a +- package: github.com/tendermint/go-flowrate + subpackages: + - flowrate +- package: github.com/tendermint/go-logger +- package: github.com/tendermint/go-wire +- package: github.com/tendermint/log15 +- package: golang.org/x/crypto + subpackages: + - nacl/box + - nacl/secretbox + - ripemd160 +testImport: +- package: github.com/stretchr/testify + subpackages: + - assert + - require diff --git a/peer.go b/peer.go index 7a2d647f8..92d4973d8 100644 --- a/peer.go +++ b/peer.go @@ -4,101 +4,193 @@ import ( "fmt" "io" "net" + "time" - . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) +// Peer could be marked as persistent, in which case you can use +// Redial function to reconnect. Note that inbound peers can't be +// made persistent. They should be made persistent on the other end. +// +// Before using a peer, you will need to perform a handshake on connection. type Peer struct { - BaseService + cmn.BaseService outbound bool - mconn *MConnection + + conn net.Conn // source connection + mconn *MConnection // multiplex connection + + authEnc bool // authenticated encryption + persistent bool + config cfg.Config *NodeInfo Key string - Data *CMap // User data. + Data *cmn.CMap // User data. +} + +func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { + conn, err := dial(addr, config) + if err != nil { + return nil, err + } + + // outbound = true + return newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey) +} + +func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { + // Encrypt connection + if config.GetBool(configKeyAuthEnc) { + var err error + // Set deadline for handshake so we don't block forever on conn.ReadFull + timeout := time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second + conn.SetDeadline(time.Now().Add(timeout)) + conn, err = MakeSecretConnection(conn, privKey) + if err != nil { + return nil, err + } + // remove deadline + conn.SetDeadline(time.Time{}) + } + + // Key and NodeInfo are set after Handshake + p := &Peer{ + outbound: outbound, + authEnc: config.GetBool(configKeyAuthEnc), + conn: conn, + config: config, + Data: cmn.NewCMap(), + } + + p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config) + + p.BaseService = *cmn.NewBaseService(log, "Peer", p) + + return p, nil } +// CloseConn should be used when the peer was created, but never started. +func (p *Peer) CloseConn() { + p.conn.Close() +} + +// makePersistent marks the peer as persistent. +func (p *Peer) makePersistent() { + if !p.outbound { + panic("inbound peers can't be made persistent") + } + + p.persistent = true +} + +// IsPersistent returns true if the peer is persitent, false otherwise. +func (p *Peer) IsPersistent() bool { + return p.persistent +} + +// HandshakeTimeout performs a handshake between a given node and the peer. // NOTE: blocking -// Before creating a peer with newPeer(), perform a handshake on connection. -func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) { +func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { + // Set deadline for handshake so we don't block forever on conn.ReadFull + p.conn.SetDeadline(time.Now().Add(timeout)) + var peerNodeInfo = new(NodeInfo) var err1 error var err2 error - Parallel( + cmn.Parallel( func() { var n int - wire.WriteBinary(ourNodeInfo, conn, &n, &err1) + wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1) }, func() { var n int - wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { - return nil, err1 + return err1 } if err2 != nil { - return nil, err2 + return err2 } - peerNodeInfo.RemoteAddr = conn.RemoteAddr().String() - return peerNodeInfo, nil -} -// NOTE: call peerHandshake on conn before calling newPeer(). -func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { - var p *Peer - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - PanicSanity(Fmt("Unknown channel %X", chID)) + if p.authEnc { + // Check that the professed PubKey matches the sconn's. + if !peerNodeInfo.PubKey.Equals(p.PubKey()) { + return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", + peerNodeInfo.PubKey, p.PubKey()) } - reactor.Receive(chID, p, msgBytes) } - onError := func(r interface{}) { - p.Stop() - onPeerError(p, r) + + // Remove deadline + p.conn.SetDeadline(time.Time{}) + + peerNodeInfo.RemoteAddr = p.RemoteAddr().String() + + p.NodeInfo = peerNodeInfo + p.Key = peerNodeInfo.PubKey.KeyString() + + return nil +} + +// RemoteAddr returns the remote network address. +func (p *Peer) RemoteAddr() net.Addr { + return p.conn.RemoteAddr() +} + +// PubKey returns the remote public key. +func (p *Peer) PubKey() crypto.PubKeyEd25519 { + if p.authEnc { + return p.conn.(*SecretConnection).RemotePubKey() } - mconn := NewMConnection(config, conn, chDescs, onReceive, onError) - p = &Peer{ - outbound: outbound, - mconn: mconn, - NodeInfo: peerNodeInfo, - Key: peerNodeInfo.PubKey.KeyString(), - Data: NewCMap(), + if p.NodeInfo == nil { + panic("Attempt to get peer's PubKey before calling Handshake") } - p.BaseService = *NewBaseService(log, "Peer", p) - return p + return p.PubKey() } +// OnStart implements BaseService. func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err } +// OnStop implements BaseService. func (p *Peer) OnStop() { p.BaseService.OnStop() p.mconn.Stop() } +// Connection returns underlying MConnection. func (p *Peer) Connection() *MConnection { return p.mconn } +// IsOutbound returns true if the connection is outbound, false otherwise. func (p *Peer) IsOutbound() bool { return p.outbound } +// Send msg to the channel identified by chID byte. Returns false if the send +// queue is full after timeout, specified by MConnection. func (p *Peer) Send(chID byte, msg interface{}) bool { if !p.IsRunning() { + // see Switch#Broadcast, where we fetch the list of peers and loop over + // them - while we're looping, one peer may be removed and stopped. return false } return p.mconn.Send(chID, msg) } +// TrySend msg to the channel identified by chID byte. Immediately returns +// false if the send queue is full. func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false @@ -106,6 +198,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool { return p.mconn.TrySend(chID, msg) } +// CanSend returns true if the send queue is not full, false otherwise. func (p *Peer) CanSend(chID byte) bool { if !p.IsRunning() { return false @@ -113,6 +206,7 @@ func (p *Peer) CanSend(chID byte) bool { return p.mconn.CanSend(chID) } +// WriteTo writes the peer's public key to w. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { var n_ int wire.WriteString(p.Key, w, &n_, &err) @@ -120,18 +214,56 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { return } +// String representation. func (p *Peer) String() string { if p.outbound { return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12]) - } else { - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } + + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } +// Equals reports whenever 2 peers are actually represent the same node. func (p *Peer) Equals(other *Peer) bool { return p.Key == other.Key } +// Get the data for a given key. func (p *Peer) Get(key string) interface{} { return p.Data.Get(key) } + +func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) { + log.Info("Dialing address", "address", addr) + conn, err := addr.DialTimeout(time.Duration( + config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) + if err != nil { + log.Info("Failed dialing address", "address", addr, "error", err) + return nil, err + } + if config.GetBool(configFuzzEnable) { + conn = FuzzConn(config, conn) + } + return conn, nil +} + +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection { + onReceive := func(chID byte, msgBytes []byte) { + reactor := reactorsByCh[chID] + if reactor == nil { + cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) + } + reactor.Receive(chID, p, msgBytes) + } + + onError := func(r interface{}) { + onPeerError(p, r) + } + + mconnConfig := &MConnectionConfig{ + SendRate: int64(config.GetInt(configKeySendRate)), + RecvRate: int64(config.GetInt(configKeyRecvRate)), + } + + return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig) +} diff --git a/pex_reactor.go b/pex_reactor.go index 45c4c96d8..4ac9306cf 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -9,7 +9,7 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" + wire "github.com/tendermint/go-wire" ) var pexErrInvalidMessage = errors.New("Invalid PEX message") @@ -201,7 +201,7 @@ func (pexR *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial.Values() { go func(picked *NetAddress) { - _, err := pexR.Switch.DialPeerWithAddress(picked) + _, err := pexR.Switch.DialPeerWithAddress(picked, false) if err != nil { pexR.book.MarkAttempt(picked) } diff --git a/switch.go b/switch.go index eed8ceea5..fcde27449 100644 --- a/switch.go +++ b/switch.go @@ -9,10 +9,15 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/log15" ) +const ( + reconnectAttempts = 30 + reconnectInterval = 3 * time.Second +) + type Reactor interface { Service // Start, Stop @@ -193,79 +198,45 @@ func (sw *Switch) OnStop() { } // NOTE: This performs a blocking handshake before the peer is added. -// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. -func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - - // Filter by addr (ie. ip:port) - if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil { - conn.Close() - return nil, err +// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. +func (sw *Switch) AddPeer(peer *Peer) error { + if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil { + return err } - // Set deadline for handshake so we don't block forever on conn.ReadFull - conn.SetDeadline(time.Now().Add( - time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second)) - - // First, encrypt the connection. - var sconn net.Conn = conn - if sw.config.GetBool(configKeyAuthEnc) { - var err error - sconn, err = MakeSecretConnection(conn, sw.nodePrivKey) - if err != nil { - conn.Close() - return nil, err - } + if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + return err } - // Filter by p2p-key - if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil { - sconn.Close() - return nil, err + if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil { + return err } - // Then, perform node handshake - peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo) - if err != nil { - sconn.Close() - return nil, err - } - if sw.config.GetBool(configKeyAuthEnc) { - // Check that the professed PubKey matches the sconn's. - if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", - peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey()) - } - } // Avoid self - if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection from self") + if sw.nodeInfo.PubKey.Equals(peer.PubKey()) { + return errors.New("Ignoring connection from self") } + // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil { - sconn.Close() - return nil, err + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil { + return err } - peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) - // Add the peer to .peers // ignore if duplicate or if we already have too many for that IP range if err := sw.peers.Add(peer); err != nil { log.Notice("Ignoring peer", "error", err, "peer", peer) peer.Stop() - return nil, err + return err } - // remove deadline and start peer - conn.SetDeadline(time.Time{}) + // Start peer if sw.IsRunning() { sw.startInitPeer(peer) } log.Notice("Added peer", "peer", peer) - return peer, nil + return nil } func (sw *Switch) FilterConnByAddr(addr net.Addr) error { @@ -292,8 +263,10 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { } func (sw *Switch) startInitPeer(peer *Peer) { - peer.Start() // spawn send/recv routines - sw.addPeerToReactors(peer) // run AddPeer on each reactor + peer.Start() // spawn send/recv routines + for _, reactor := range sw.reactors { + reactor.AddPeer(peer) + } } // Dial a list of seeds asynchronously in random order @@ -331,7 +304,7 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { } func (sw *Switch) dialSeed(addr *NetAddress) { - peer, err := sw.DialPeerWithAddress(addr) + peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { log.Error("Error dialing seed", "error", err) return @@ -340,22 +313,23 @@ func (sw *Switch) dialSeed(addr *NetAddress) { } } -func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - log.Info("Dialing address", "address", addr) +func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { sw.dialing.Set(addr.IP.String(), addr) - conn, err := addr.DialTimeout(time.Duration( - sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) - sw.dialing.Delete(addr.IP.String()) + defer sw.dialing.Delete(addr.IP.String()) + + peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + if persistent { + peer.makePersistent() + } if err != nil { - log.Info("Failed dialing address", "address", addr, "error", err) + log.Info("Failed dialing peer", "address", addr, "error", err) + peer.CloseConn() return nil, err } - if sw.config.GetBool(configFuzzEnable) { - conn = FuzzConn(sw.config, conn) - } - peer, err := sw.AddPeerWithConnection(conn, true) + err = sw.AddPeer(peer) if err != nil { - log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err) + log.Info("Failed adding peer", "address", addr, "error", err) + peer.CloseConn() return nil, err } log.Notice("Dialed and added peer", "address", addr, "peer", peer) @@ -400,31 +374,49 @@ func (sw *Switch) Peers() IPeerSet { return sw.peers } -// Disconnect from a peer due to external error. +// Disconnect from a peer due to external error, retry if it is a persistent peer. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { + addr := NewNetAddress(peer.RemoteAddr()) log.Notice("Stopping peer for error", "peer", peer, "error", reason) - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, reason) + sw.stopAndRemovePeer(peer, reason) + + if peer.IsPersistent() { + go func() { + log.Notice("Reconnecting to peer", "peer", peer) + for i := 1; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } + + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + if i == reconnectAttempts { + log.Notice("Error reconnecting to peer. Giving up", "tries", i, "error", err) + return + } + log.Notice("Error reconnecting to peer. Trying again", "tries", i, "error", err) + time.Sleep(reconnectInterval) + continue + } + + log.Notice("Reconnected to peer", "peer", peer) + return + } + }() + } } // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { log.Notice("Stopping peer gracefully") - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, nil) -} - -func (sw *Switch) addPeerToReactors(peer *Peer) { - for _, reactor := range sw.reactors { - reactor.AddPeer(peer) - } + sw.stopAndRemovePeer(peer, nil) } -func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) { +func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { + sw.peers.Remove(peer) + peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) } @@ -449,9 +441,9 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - _, err := sw.AddPeerWithConnection(inConn, false) + err := sw.AddPeerWithConnection(inConn, false) if err != nil { - log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err) + log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) continue } @@ -511,14 +503,14 @@ func Connect2Switches(switches []*Switch, i, j int) { c1, c2 := net.Pipe() doneCh := make(chan struct{}) go func() { - _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. + err := switchI.AddPeerWithConnection(c1, false) if PanicOnAddPeerErr && err != nil { panic(err) } doneCh <- struct{}{} }() go func() { - _, err := switchJ.AddPeerWithConnection(c2, true) + err := switchJ.AddPeerWithConnection(c2, false) if PanicOnAddPeerErr && err != nil { panic(err) } @@ -552,3 +544,19 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S s.SetNodePrivKey(privKey) return s } + +// AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch. +func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error { + peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + if err != nil { + peer.CloseConn() + return err + } + + if err = sw.AddPeer(peer); err != nil { + peer.CloseConn() + return err + } + + return nil +} diff --git a/switch_test.go b/switch_test.go index 1b2ccd743..a9abfa0a5 100644 --- a/switch_test.go +++ b/switch_test.go @@ -3,15 +3,19 @@ package p2p import ( "bytes" "fmt" + golog "log" "net" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" . "github.com/tendermint/go-common" + cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) var ( @@ -21,7 +25,6 @@ var ( func init() { config = cfg.NewMapConfig(nil) setConfigDefaults(config) - } type PeerMessage struct { @@ -174,8 +177,12 @@ func TestConnAddrFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.AddPeerWithConnection(c1, false) + }() + go func() { + s2.AddPeerWithConnection(c2, true) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -205,8 +212,12 @@ func TestConnPubKeyFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.AddPeerWithConnection(c1, false) + }() + go func() { + s2.AddPeerWithConnection(c2, true) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -221,6 +232,63 @@ func TestConnPubKeyFilter(t *testing.T) { } } +func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) + defer sw2.Stop() + l, serverAddr := listenTCP() + done := make(chan struct{}) + go accept(l, done, sw2) + defer close(done) + + peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.Zero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + +func TestSwitchReconnectsToPersistentPeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) + defer sw2.Stop() + l, serverAddr := listenTCP() + done := make(chan struct{}) + go accept(l, done, sw2) + defer close(done) + + peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + peer.makePersistent() + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.NotZero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + func BenchmarkSwitches(b *testing.B) { b.StopTimer() @@ -252,9 +320,9 @@ func BenchmarkSwitches(b *testing.B) { successChan := s1.Broadcast(chID, "test data") for s := range successChan { if s { - numSuccess += 1 + numSuccess++ } else { - numFailure += 1 + numFailure++ } } } @@ -266,3 +334,48 @@ func BenchmarkSwitches(b *testing.B) { time.Sleep(1000 * time.Millisecond) } + +func listenTCP() (net.Listener, net.Addr) { + l, e := net.Listen("tcp", "127.0.0.1:0") // any available address + if e != nil { + golog.Fatalf("net.Listen tcp :0: %+v", e) + } + return l, l.Addr() +} + +// simulate remote peer +func accept(l net.Listener, done <-chan struct{}, sw *Switch) { + for { + conn, err := l.Accept() + if err != nil { + golog.Fatalf("Failed to accept conn: %+v", err) + } + conn, err = MakeSecretConnection(conn, sw.nodePrivKey) + if err != nil { + golog.Fatalf("Failed to make secret conn: %+v", err) + } + var err1, err2 error + nodeInfo := new(NodeInfo) + cmn.Parallel( + func() { + var n int + wire.WriteBinary(sw.nodeInfo, conn, &n, &err1) + }, + func() { + var n int + wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2) + }) + if err1 != nil { + golog.Fatalf("Failed to do handshake: %+v", err1) + } + if err2 != nil { + golog.Fatalf("Failed to do handshake: %+v", err2) + } + select { + case <-done: + conn.Close() + return + default: + } + } +}