diff --git a/.gitignore b/.gitignore index c765108df..62f28681c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,2 @@ -*.swp -*.swo -*.bak -.DS_Store vendor +.glide diff --git a/CHANGELOG.md b/CHANGELOG.md index 81e30fcca..cae2f4c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## 0.5.0 (April 21, 2017) + +BREAKING CHANGES: + +- Remove or unexport methods from FuzzedConnection: Active, Mode, ProbDropRW, ProbDropConn, ProbSleep, MaxDelayMilliseconds, Fuzz +- switch.AddPeerWithConnection is unexported and replaced by switch.AddPeer +- switch.DialPeerWithAddress takes a bool, setting the peer as persistent or not + +FEATURES: + +- Persistent peers: any peer considered a "seed" will be reconnected to when the connection is dropped + + +IMPROVEMENTS: + +- Many more tests and comments +- Refactor configurations for less dependence on go-config. Introduces new structs PeerConfig, MConnConfig, FuzzConnConfig +- New methods on peer: CloseConn, HandshakeTimeout, IsPersistent, Addr, PubKey +- NewNetAddress supports a testing mode where the address defaults to 0.0.0.0:0 + + ## 0.4.0 (March 6, 2017) BREAKING CHANGES: diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..3716185f2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:latest + +RUN curl https://glide.sh/get | sh + +RUN mkdir -p /go/src/github.com/tendermint/go-p2p +WORKDIR /go/src/github.com/tendermint/go-p2p + +COPY glide.yaml /go/src/github.com/tendermint/go-p2p/ +COPY glide.lock /go/src/github.com/tendermint/go-p2p/ + +RUN glide install + +COPY . /go/src/github.com/tendermint/go-p2p diff --git a/addrbook.go b/addrbook.go index ace0dba4a..8a1698f41 100644 --- a/addrbook.go +++ b/addrbook.go @@ -15,7 +15,7 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" ) const ( @@ -73,7 +73,12 @@ const ( serializationVersion = 1 ) -/* AddrBook - concurrency safe peer address manager */ +const ( + bucketTypeNew = 0x01 + bucketTypeOld = 0x02 +) + +// AddrBook - concurrency safe peer address manager. type AddrBook struct { BaseService @@ -91,11 +96,7 @@ type AddrBook struct { nNew int } -const ( - bucketTypeNew = 0x01 - bucketTypeOld = 0x02 -) - +// NewAddrBook creates a new address book. // Use Start to begin processing asynchronous address updates. func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook { am := &AddrBook{ @@ -125,6 +126,7 @@ func (a *AddrBook) init() { } } +// OnStart implements Service. func (a *AddrBook) OnStart() error { a.BaseService.OnStart() a.loadFromFile(a.filePath) @@ -133,6 +135,7 @@ func (a *AddrBook) OnStart() error { return nil } +// OnStop implements Service. func (a *AddrBook) OnStop() { a.BaseService.OnStop() } @@ -254,15 +257,21 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) { ka.markAttempt() } +// MarkBad currently just ejects the address. In the future, consider +// blacklisting. func (a *AddrBook) MarkBad(addr *NetAddress) { + a.RemoveAddress(addr) +} + +// RemoveAddress removes the address from the book. +func (a *AddrBook) RemoveAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.String()] if ka == nil { return } - // We currently just eject the address. - // In the future, consider blacklisting. + log.Info("Remove address from book", "addr", addr) a.removeFromAllBuckets(ka) } @@ -309,6 +318,10 @@ type addrBookJSON struct { } func (a *AddrBook) saveToFile(filePath string) { + log.Info("Saving AddrBook to file", "size", a.Size()) + + a.mtx.Lock() + defer a.mtx.Unlock() // Compile Addrs addrs := []*knownAddress{} for _, ka := range a.addrLookup { @@ -386,7 +399,6 @@ out: for { select { case <-dumpAddressTicker.C: - log.Info("Saving AddrBook to file", "size", a.Size()) a.saveToFile(a.filePath) case <-a.Quit: break out diff --git a/addrbook_test.go b/addrbook_test.go index 7e8cb8d76..16aea8ef9 100644 --- a/addrbook_test.go +++ b/addrbook_test.go @@ -148,3 +148,19 @@ func randIPv4Address(t *testing.T) *NetAddress { } } } + +func TestAddrBookRemoveAddress(t *testing.T) { + fname := createTempFileName("addrbook_test") + book := NewAddrBook(fname, true) + + addr := randIPv4Address(t) + book.AddAddress(addr, addr) + assert.Equal(t, 1, book.Size()) + + book.RemoveAddress(addr) + assert.Equal(t, 0, book.Size()) + + nonExistingAddr := randIPv4Address(t) + book.RemoveAddress(nonExistingAddr) + assert.Equal(t, 0, book.Size()) +} diff --git a/config.go b/config.go index 60b69e367..a8b7e343b 100644 --- a/config.go +++ b/config.go @@ -17,7 +17,6 @@ const ( // Fuzz params configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn - configFuzzActive = "fuzz_active" // toggle fuzzing configFuzzMode = "fuzz_mode" // eg. drop, delay configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds" configFuzzProbDropRW = "fuzz_prob_drop_rw" @@ -38,7 +37,6 @@ func setConfigDefaults(config cfg.Config) { // Fuzz defaults config.SetDefault(configFuzzEnable, false) - config.SetDefault(configFuzzActive, false) config.SetDefault(configFuzzMode, FuzzModeDrop) config.SetDefault(configFuzzMaxDelayMilliseconds, 3000) config.SetDefault(configFuzzProbDropRW, 0.2) diff --git a/connection.go b/connection.go index 4428e0da7..a14fa5ee4 100644 --- a/connection.go +++ b/connection.go @@ -10,25 +10,25 @@ import ( "sync/atomic" "time" - . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" + cmn "github.com/tendermint/go-common" flow "github.com/tendermint/go-flowrate/flowrate" - "github.com/tendermint/go-wire" //"github.com/tendermint/log15" + wire "github.com/tendermint/go-wire" ) const ( numBatchMsgPackets = 10 minReadBufferSize = 1024 minWriteBufferSize = 65536 - idleTimeoutMinutes = 5 - updateStatsSeconds = 2 - pingTimeoutSeconds = 40 - flushThrottleMS = 100 + updateState = 2 * time.Second + pingTimeout = 40 * time.Second + flushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 + defaultSendRate = int64(512000) // 500KB/s defaultRecvBufferCapacity = 4096 - defaultRecvMessageCapacity = 22020096 // 21MB - defaultSendTimeoutSeconds = 10 + defaultRecvMessageCapacity = 22020096 // 21MB + defaultRecvRate = int64(512000) // 500KB/s + defaultSendTimeout = 10 * time.Second ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -60,15 +60,13 @@ queue is full. Inbound message bytes are handled with an onReceive callback function. */ type MConnection struct { - BaseService + cmn.BaseService conn net.Conn bufReader *bufio.Reader bufWriter *bufio.Writer sendMonitor *flow.Monitor recvMonitor *flow.Monitor - sendRate int64 - recvRate int64 send chan struct{} pong chan struct{} channels []*Channel @@ -76,35 +74,54 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 + config *MConnConfig quit chan struct{} - flushTimer *ThrottleTimer // flush writes as necessary but throttled. - pingTimer *RepeatTimer // send pings periodically - chStatsTimer *RepeatTimer // update channel stats periodically + flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. + pingTimer *cmn.RepeatTimer // send pings periodically + chStatsTimer *cmn.RepeatTimer // update channel stats periodically LocalAddress *NetAddress RemoteAddress *NetAddress } -func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { +// MConnConfig is a MConnection configuration. +type MConnConfig struct { + SendRate int64 + RecvRate int64 +} + +// DefaultMConnConfig returns the default config. +func DefaultMConnConfig() *MConnConfig { + return &MConnConfig{ + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + } +} + +// NewMConnection wraps net.Conn and creates multiplex connection +func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { + return NewMConnectionWithConfig( + conn, + chDescs, + onReceive, + onError, + DefaultMConnConfig()) +} + +// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), - sendRate: int64(config.GetInt(configKeySendRate)), - recvRate: int64(config.GetInt(configKeyRecvRate)), send: make(chan struct{}, 1), pong: make(chan struct{}), onReceive: onReceive, onError: onError, - - // Initialized in Start() - quit: nil, - flushTimer: nil, - pingTimer: nil, - chStatsTimer: nil, + config: config, LocalAddress: NewNetAddress(conn.LocalAddr()), RemoteAddress: NewNetAddress(conn.RemoteAddr()), @@ -123,7 +140,7 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript mconn.channels = channels mconn.channelsIdx = channelsIdx - mconn.BaseService = *NewBaseService(log, "MConnection", mconn) + mconn.BaseService = *cmn.NewBaseService(log, "MConnection", mconn) return mconn } @@ -131,9 +148,9 @@ func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescript func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) - c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) - c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second) - c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) + c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) + c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) + c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) go c.sendRoutine() go c.recvRoutine() return nil @@ -171,7 +188,7 @@ func (c *MConnection) flush() { func (c *MConnection) _recover() { if r := recover(); r != nil { stack := debug.Stack() - err := StackError{r, stack} + err := cmn.StackError{r, stack} c.stopForError(err) } } @@ -196,7 +213,7 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) + log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -225,7 +242,7 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Cannot send bytes, unknown channel %X", chID)) + log.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) return false } @@ -241,6 +258,8 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { return ok } +// CanSend returns true if you can send more data onto the chID, false +// otherwise. Use only as a heuristic. func (c *MConnection) CanSend(chID byte) bool { if !c.IsRunning() { return false @@ -248,7 +267,7 @@ func (c *MConnection) CanSend(chID byte) bool { channel, ok := c.channelsIdx[chID] if !ok { - log.Error(Fmt("Unknown channel %X", chID)) + log.Error(cmn.Fmt("Unknown channel %X", chID)) return false } return channel.canSend() @@ -314,7 +333,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -372,7 +391,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) /* // Peek into bufReader for debugging @@ -424,7 +443,7 @@ FOR_LOOP: } channel, ok := c.channelsIdx[pkt.ChannelID] if !ok || channel == nil { - PanicQ(Fmt("Unknown channel %X", pkt.ChannelID)) + cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID)) } msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { @@ -439,7 +458,7 @@ FOR_LOOP: c.onReceive(pkt.ChannelID, msgBytes) } default: - PanicSanity(Fmt("Unknown message type %X", pktType)) + cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType)) } // TODO: shouldn't this go in the sendRoutine? @@ -524,7 +543,7 @@ type Channel struct { func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { desc.FillDefaults() if desc.Priority <= 0 { - PanicSanity("Channel default priority must be a postive integer") + cmn.PanicSanity("Channel default priority must be a postive integer") } return &Channel{ conn: conn, @@ -538,16 +557,14 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { // Queues message to send to this channel. // Goroutine-safe -// Times out (and returns false) after defaultSendTimeoutSeconds +// Times out (and returns false) after defaultSendTimeout func (ch *Channel) sendBytes(bytes []byte) bool { - timeout := time.NewTimer(defaultSendTimeoutSeconds * time.Second) select { - case <-timeout.C: - // timeout - return false case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true + case <-time.After(defaultSendTimeout): + return false } } @@ -593,14 +610,14 @@ func (ch *Channel) isSendPending() bool { func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) - packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))] + packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) - ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] + ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet } diff --git a/connection_test.go b/connection_test.go new file mode 100644 index 000000000..33d8adfd1 --- /dev/null +++ b/connection_test.go @@ -0,0 +1,139 @@ +package p2p_test + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + p2p "github.com/tendermint/go-p2p" +) + +func createMConnection(conn net.Conn) *p2p.MConnection { + onReceive := func(chID byte, msgBytes []byte) { + } + onError := func(r interface{}) { + } + return createMConnectionWithCallbacks(conn, onReceive, onError) +} + +func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { + chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} + return p2p.NewMConnection(conn, chDescs, onReceive, onError) +} + +func TestMConnectionSend(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + mconn := createMConnection(client) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + msg := "Ant-Man" + assert.True(mconn.Send(0x01, msg)) + // Note: subsequent Send/TrySend calls could pass because we are reading from + // the send queue in a separate goroutine. + server.Read(make([]byte, len(msg))) + assert.True(mconn.CanSend(0x01)) + + msg = "Spider-Man" + assert.True(mconn.TrySend(0x01, msg)) + server.Read(make([]byte, len(msg))) + + assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown") + assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown") +} + +func TestMConnectionReceive(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) + _, err := mconn1.Start() + require.Nil(err) + defer mconn1.Stop() + + mconn2 := createMConnection(server) + _, err = mconn2.Start() + require.Nil(err) + defer mconn2.Stop() + + msg := "Cyclops" + assert.True(mconn2.Send(0x01, msg)) + + select { + case receivedBytes := <-receivedCh: + assert.Equal([]byte(msg), receivedBytes[2:]) // first 3 bytes are internal + case err := <-errorsCh: + t.Fatalf("Expected %s, got %+v", msg, err) + case <-time.After(500 * time.Millisecond): + t.Fatalf("Did not receive %s message in 500ms", msg) + } +} + +func TestMConnectionStatus(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + mconn := createMConnection(client) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + status := mconn.Status() + assert.NotNil(status) + assert.Zero(status.Channels[0].SendQueueSize) +} + +func TestMConnectionStopsAndReturnsError(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + client.Close() + + select { + case receivedBytes := <-receivedCh: + t.Fatalf("Expected error, got %v", receivedBytes) + case err := <-errorsCh: + assert.NotNil(err) + assert.False(mconn.IsRunning()) + case <-time.After(500 * time.Millisecond): + t.Fatal("Did not receive error in 500ms") + } +} diff --git a/fuzz.go b/fuzz.go index ee8f43ccf..aefac986a 100644 --- a/fuzz.go +++ b/fuzz.go @@ -5,86 +5,147 @@ import ( "net" "sync" "time" - - cfg "github.com/tendermint/go-config" ) -//-------------------------------------------------------- -// delay reads/writes -// randomly drop reads/writes -// randomly drop connections - const ( - FuzzModeDrop = "drop" - FuzzModeDelay = "delay" + // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep + FuzzModeDrop = iota + // FuzzModeDelay is a mode in which we randomly sleep + FuzzModeDelay ) -func FuzzConn(config cfg.Config, conn net.Conn) net.Conn { - return &FuzzedConnection{ - conn: conn, - start: time.After(time.Second * 10), // so we have time to do peer handshakes and get set up - params: config, +// FuzzedConnection wraps any net.Conn and depending on the mode either delays +// reads/writes or randomly drops reads/writes/connections. +type FuzzedConnection struct { + conn net.Conn + + mtx sync.Mutex + start <-chan time.Time + active bool + + config *FuzzConnConfig +} + +// FuzzConnConfig is a FuzzedConnection configuration. +type FuzzConnConfig struct { + Mode int + MaxDelay time.Duration + ProbDropRW float64 + ProbDropConn float64 + ProbSleep float64 +} + +// DefaultFuzzConnConfig returns the default config. +func DefaultFuzzConnConfig() *FuzzConnConfig { + return &FuzzConnConfig{ + Mode: FuzzModeDrop, + MaxDelay: 3 * time.Second, + ProbDropRW: 0.2, + ProbDropConn: 0.00, + ProbSleep: 0.00, } } -type FuzzedConnection struct { - conn net.Conn +// FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately. +func FuzzConn(conn net.Conn) net.Conn { + return FuzzConnFromConfig(conn, DefaultFuzzConnConfig()) +} - mtx sync.Mutex - fuzz bool // we don't start fuzzing right away - start <-chan time.Time +// FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing +// starts immediately. +func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { + return &FuzzedConnection{ + conn: conn, + start: make(<-chan time.Time), + active: true, + config: config, + } +} - // fuzz params - params cfg.Config +// FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the +// duration elapses. +func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn { + return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig()) } -func (fc *FuzzedConnection) randomDuration() time.Duration { - return time.Millisecond * time.Duration(rand.Int()%fc.MaxDelayMilliseconds()) +// FuzzConnAfterFromConfig creates a new FuzzedConnection from a config. +// Fuzzing starts when the duration elapses. +func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn { + return &FuzzedConnection{ + conn: conn, + start: time.After(d), + active: false, + config: config, + } } -func (fc *FuzzedConnection) Active() bool { - return fc.params.GetBool(configFuzzActive) +// Config returns the connection's config. +func (fc *FuzzedConnection) Config() *FuzzConnConfig { + return fc.config } -func (fc *FuzzedConnection) Mode() string { - return fc.params.GetString(configFuzzMode) +// Read implements net.Conn. +func (fc *FuzzedConnection) Read(data []byte) (n int, err error) { + if fc.fuzz() { + return 0, nil + } + return fc.conn.Read(data) } -func (fc *FuzzedConnection) ProbDropRW() float64 { - return fc.params.GetFloat64(configFuzzProbDropRW) +// Write implements net.Conn. +func (fc *FuzzedConnection) Write(data []byte) (n int, err error) { + if fc.fuzz() { + return 0, nil + } + return fc.conn.Write(data) } -func (fc *FuzzedConnection) ProbDropConn() float64 { - return fc.params.GetFloat64(configFuzzProbDropConn) +// Close implements net.Conn. +func (fc *FuzzedConnection) Close() error { return fc.conn.Close() } + +// LocalAddr implements net.Conn. +func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() } + +// RemoteAddr implements net.Conn. +func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() } + +// SetDeadline implements net.Conn. +func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) } + +// SetReadDeadline implements net.Conn. +func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error { + return fc.conn.SetReadDeadline(t) } -func (fc *FuzzedConnection) ProbSleep() float64 { - return fc.params.GetFloat64(configFuzzProbSleep) +// SetWriteDeadline implements net.Conn. +func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error { + return fc.conn.SetWriteDeadline(t) } -func (fc *FuzzedConnection) MaxDelayMilliseconds() int { - return fc.params.GetInt(configFuzzMaxDelayMilliseconds) +func (fc *FuzzedConnection) randomDuration() time.Duration { + maxDelayMillis := int(fc.config.MaxDelay.Nanoseconds() / 1000) + return time.Millisecond * time.Duration(rand.Int()%maxDelayMillis) } // implements the fuzz (delay, kill conn) // and returns whether or not the read/write should be ignored -func (fc *FuzzedConnection) Fuzz() bool { +func (fc *FuzzedConnection) fuzz() bool { if !fc.shouldFuzz() { return false } - switch fc.Mode() { + switch fc.config.Mode { case FuzzModeDrop: // randomly drop the r/w, drop the conn, or sleep r := rand.Float64() - if r <= fc.ProbDropRW() { + if r <= fc.config.ProbDropRW { return true - } else if r < fc.ProbDropRW()+fc.ProbDropConn() { + } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn { // XXX: can't this fail because machine precision? // XXX: do we need an error? fc.Close() return true - } else if r < fc.ProbDropRW()+fc.ProbDropConn()+fc.ProbSleep() { + } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep { time.Sleep(fc.randomDuration()) } case FuzzModeDelay: @@ -94,48 +155,19 @@ func (fc *FuzzedConnection) Fuzz() bool { return false } -// we don't fuzz until start chan fires func (fc *FuzzedConnection) shouldFuzz() bool { - if !fc.Active() { - return false + if fc.active { + return true } fc.mtx.Lock() defer fc.mtx.Unlock() - if fc.fuzz { - return true - } select { case <-fc.start: - fc.fuzz = true + fc.active = true + return true default: + return false } - return false -} - -func (fc *FuzzedConnection) Read(data []byte) (n int, err error) { - if fc.Fuzz() { - return 0, nil - } - return fc.conn.Read(data) -} - -func (fc *FuzzedConnection) Write(data []byte) (n int, err error) { - if fc.Fuzz() { - return 0, nil - } - return fc.conn.Write(data) -} - -// Implements net.Conn -func (fc *FuzzedConnection) Close() error { return fc.conn.Close() } -func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() } -func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() } -func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) } -func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error { - return fc.conn.SetReadDeadline(t) -} -func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error { - return fc.conn.SetWriteDeadline(t) } diff --git a/glide.lock b/glide.lock new file mode 100644 index 000000000..71505ea05 --- /dev/null +++ b/glide.lock @@ -0,0 +1,73 @@ +hash: f3d76bef9548cc37ad6038cb55f0812bac7e64735a99995c9da85010eef27f50 +updated: 2017-04-19T00:00:50.949249104-04:00 +imports: +- name: github.com/btcsuite/btcd + version: b8df516b4b267acf2de46be593a9d948d1d2c420 + subpackages: + - btcec +- name: github.com/btcsuite/fastsha256 + version: 637e656429416087660c84436a2a035d69d54e2e +- name: github.com/BurntSushi/toml + version: 99064174e013895bbd9b025c31100bd1d9b590ca +- name: github.com/go-stack/stack + version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 +- name: github.com/mattn/go-colorable + version: 9fdad7c47650b7d2e1da50644c1f4ba7f172f252 +- name: github.com/mattn/go-isatty + version: 56b76bdf51f7708750eac80fa38b952bb9f32639 +- name: github.com/pkg/errors + version: 645ef00459ed84a119197bfb8d8205042c6df63d +- name: github.com/tendermint/ed25519 + version: 1f52c6f8b8a5c7908aff4497c186af344b428925 + subpackages: + - edwards25519 + - extra25519 +- name: github.com/tendermint/go-common + version: f9e3db037330c8a8d61d3966de8473eaf01154fa +- name: github.com/tendermint/go-config + version: 620dcbbd7d587cf3599dedbf329b64311b0c307a +- name: github.com/tendermint/go-crypto + version: 0ca2c6fdb0706001ca4c4b9b80c9f428e8cf39da +- name: github.com/tendermint/go-data + version: e7fcc6d081ec8518912fcdc103188275f83a3ee5 +- name: github.com/tendermint/go-flowrate + version: a20c98e61957faa93b4014fbd902f20ab9317a6a + subpackages: + - flowrate +- name: github.com/tendermint/go-logger + version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 +- name: github.com/tendermint/go-wire + version: c1c9a57ab8038448ddea1714c0698f8051e5748c +- name: github.com/tendermint/log15 + version: ae0f3d6450da9eac7074b439c8e1c3cabf0d5ce6 + subpackages: + - term +- name: golang.org/x/crypto + version: 1f22c0103821b9390939b6776727195525381532 + subpackages: + - curve25519 + - nacl/box + - nacl/secretbox + - openpgp/armor + - openpgp/errors + - poly1305 + - ripemd160 + - salsa20/salsa +- name: golang.org/x/sys + version: 50c6bc5e4292a1d4e65c6e9be5f53be28bcbe28e + subpackages: + - unix +testImports: +- name: github.com/davecgh/go-spew + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib +- name: github.com/stretchr/testify + version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 + subpackages: + - assert + - require diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 000000000..e7edc80ae --- /dev/null +++ b/glide.yaml @@ -0,0 +1,29 @@ +package: github.com/tendermint/go-p2p +import: +- package: github.com/tendermint/go-common + version: develop +- package: github.com/tendermint/go-config + version: develop +- package: github.com/tendermint/go-crypto + version: develop +- package: github.com/tendermint/go-data + version: develop +- package: github.com/tendermint/go-flowrate + subpackages: + - flowrate +- package: github.com/tendermint/go-logger + version: develop +- package: github.com/tendermint/go-wire + version: develop +- package: github.com/tendermint/log15 +- package: golang.org/x/crypto + subpackages: + - nacl/box + - nacl/secretbox + - ripemd160 +- package: github.com/pkg/errors +testImport: +- package: github.com/stretchr/testify + subpackages: + - assert + - require diff --git a/netaddress.go b/netaddress.go index 90fcf6a43..263ec9037 100644 --- a/netaddress.go +++ b/netaddress.go @@ -6,6 +6,7 @@ package p2p import ( "errors" + "flag" "net" "strconv" "time" @@ -13,28 +14,36 @@ import ( cmn "github.com/tendermint/go-common" ) +// NetAddress defines information about a peer on the network +// including its IP address, and port. type NetAddress struct { IP net.IP Port uint16 str string } +// NewNetAddress returns a new NetAddress using the provided TCP +// address. When testing, other net.Addr (except TCP) will result in +// using 0.0.0.0:0. When normal run, other net.Addr (except TCP) will +// panic. // TODO: socks proxies? func NewNetAddress(addr net.Addr) *NetAddress { tcpAddr, ok := addr.(*net.TCPAddr) if !ok { - log.Warn(`Only TCPAddrs are supported. If used for anything but testing, - may result in undefined behaviour!`, "addr", addr) - return NewNetAddressIPPort(net.IP("0.0.0.0"), 0) - // NOTE: it would be nice to only not panic if we're in testing ... - // PanicSanity(Fmt("Only TCPAddrs are supported. Got: %v", addr)) + if flag.Lookup("test.v") == nil { // normal run + cmn.PanicSanity(cmn.Fmt("Only TCPAddrs are supported. Got: %v", addr)) + } else { // in testing + return NewNetAddressIPPort(net.IP("0.0.0.0"), 0) + } } ip := tcpAddr.IP port := uint16(tcpAddr.Port) return NewNetAddressIPPort(ip, port) } -// Also resolves the host if host is not an IP. +// NewNetAddressString returns a new NetAddress using the provided +// address in the form of "IP:Port". Also resolves the host if host +// is not an IP. func NewNetAddressString(addr string) (*NetAddress, error) { host, portStr, err := net.SplitHostPort(addr) @@ -62,6 +71,8 @@ func NewNetAddressString(addr string) (*NetAddress, error) { return na, nil } +// NewNetAddressStrings returns an array of NetAddress'es build using +// the provided strings. func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) { netAddrs := make([]*NetAddress, len(addrs)) for i, addr := range addrs { @@ -74,6 +85,8 @@ func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) { return netAddrs, nil } +// NewNetAddressIPPort returns a new NetAddress using the provided IP +// and port number. func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { na := &NetAddress{ IP: ip, @@ -86,23 +99,25 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { return na } +// Equals reports whether na and other are the same addresses. func (na *NetAddress) Equals(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() == o.String() - } else { - return false } + + return false } func (na *NetAddress) Less(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() < o.String() - } else { - cmn.PanicSanity("Cannot compare unequal types") - return false } + + cmn.PanicSanity("Cannot compare unequal types") + return false } +// String representation. func (na *NetAddress) String() string { if na.str == "" { na.str = net.JoinHostPort( @@ -113,6 +128,7 @@ func (na *NetAddress) String() string { return na.str } +// Dial calls net.Dial on the address. func (na *NetAddress) Dial() (net.Conn, error) { conn, err := net.Dial("tcp", na.String()) if err != nil { @@ -121,6 +137,7 @@ func (na *NetAddress) Dial() (net.Conn, error) { return conn, nil } +// DialTimeout calls net.DialTimeout on the address. func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { conn, err := net.DialTimeout("tcp", na.String(), timeout) if err != nil { @@ -129,6 +146,7 @@ func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { return conn, nil } +// Routable returns true if the address is routable. func (na *NetAddress) Routable() bool { // TODO(oga) bitcoind doesn't include RFC3849 here, but should we? return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() || @@ -142,10 +160,12 @@ func (na *NetAddress) Valid() bool { na.IP.Equal(net.IPv4bcast)) } +// Local returns true if it is a local address. func (na *NetAddress) Local() bool { return na.IP.IsLoopback() || zero4.Contains(na.IP) } +// ReachabilityTo checks whenever o can be reached from na. func (na *NetAddress) ReachabilityTo(o *NetAddress) int { const ( Unreachable = 0 diff --git a/netaddress_test.go b/netaddress_test.go new file mode 100644 index 000000000..db871fdec --- /dev/null +++ b/netaddress_test.go @@ -0,0 +1,113 @@ +package p2p + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewNetAddress(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:8080") + require.Nil(err) + addr := NewNetAddress(tcpAddr) + + assert.Equal("127.0.0.1:8080", addr.String()) + + assert.NotPanics(func() { + NewNetAddress(&net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8000}) + }, "Calling NewNetAddress with UDPAddr should not panic in testing") +} + +func TestNewNetAddressString(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + tests := []struct { + addr string + correct bool + }{ + {"127.0.0.1:8080", true}, + {"127.0.0:8080", false}, + {"a", false}, + {"127.0.0.1:a", false}, + {"a:8080", false}, + {"8082", false}, + {"127.0.0:8080000", false}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + if t.correct { + require.Nil(err) + assert.Equal(t.addr, addr.String()) + } else { + require.NotNil(err) + } + } +} + +func TestNewNetAddressStrings(t *testing.T) { + assert, require := assert.New(t), require.New(t) + addrs, err := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"}) + require.Nil(err) + + assert.Equal(2, len(addrs)) +} + +func TestNewNetAddressIPPort(t *testing.T) { + assert := assert.New(t) + addr := NewNetAddressIPPort(net.ParseIP("127.0.0.1"), 8080) + + assert.Equal("127.0.0.1:8080", addr.String()) +} + +func TestNetAddressProperties(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // TODO add more test cases + tests := []struct { + addr string + valid bool + local bool + routable bool + }{ + {"127.0.0.1:8080", true, true, false}, + {"ya.ru:80", true, false, true}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + require.Nil(err) + + assert.Equal(t.valid, addr.Valid()) + assert.Equal(t.local, addr.Local()) + assert.Equal(t.routable, addr.Routable()) + } +} + +func TestNetAddressReachabilityTo(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // TODO add more test cases + tests := []struct { + addr string + other string + reachability int + }{ + {"127.0.0.1:8080", "127.0.0.1:8081", 0}, + {"ya.ru:80", "127.0.0.1:8080", 1}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + require.Nil(err) + + other, err := NewNetAddressString(t.other) + require.Nil(err) + + assert.Equal(t.reachability, addr.ReachabilityTo(other)) + } +} diff --git a/peer.go b/peer.go index 7a2d647f8..5461d7e8a 100644 --- a/peer.go +++ b/peer.go @@ -4,101 +4,237 @@ import ( "fmt" "io" "net" + "time" - . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-wire" + "github.com/pkg/errors" + cmn "github.com/tendermint/go-common" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) +// Peer could be marked as persistent, in which case you can use +// Redial function to reconnect. Note that inbound peers can't be +// made persistent. They should be made persistent on the other end. +// +// Before using a peer, you will need to perform a handshake on connection. type Peer struct { - BaseService + cmn.BaseService outbound bool - mconn *MConnection + + conn net.Conn // source connection + mconn *MConnection // multiplex connection + + persistent bool + config *PeerConfig *NodeInfo Key string - Data *CMap // User data. + Data *cmn.CMap // User data. +} + +// PeerConfig is a Peer configuration. +type PeerConfig struct { + AuthEnc bool // authenticated encryption + + HandshakeTimeout time.Duration + DialTimeout time.Duration + + MConfig *MConnConfig + + Fuzz bool // fuzz connection (for testing) + FuzzConfig *FuzzConnConfig +} + +// DefaultPeerConfig returns the default config. +func DefaultPeerConfig() *PeerConfig { + return &PeerConfig{ + AuthEnc: true, + HandshakeTimeout: 2 * time.Second, + DialTimeout: 3 * time.Second, + MConfig: DefaultMConnConfig(), + Fuzz: false, + FuzzConfig: DefaultFuzzConnConfig(), + } } +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { + return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) +} + +func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn, err := dial(addr, config) + if err != nil { + return nil, errors.Wrap(err, "Error creating peer") + } + + peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) + if err != nil { + conn.Close() + return nil, err + } + return peer, nil +} + +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { + return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) +} + +func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) +} + +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn := rawConn + + // Fuzz connection + if config.Fuzz { + // so we have time to do peer handshakes and get set up + conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig) + } + + // Encrypt connection + if config.AuthEnc { + conn.SetDeadline(time.Now().Add(config.HandshakeTimeout)) + + var err error + conn, err = MakeSecretConnection(conn, ourNodePrivKey) + if err != nil { + return nil, errors.Wrap(err, "Error creating peer") + } + } + + // Key and NodeInfo are set after Handshake + p := &Peer{ + outbound: outbound, + conn: conn, + config: config, + Data: cmn.NewCMap(), + } + + p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig) + + p.BaseService = *cmn.NewBaseService(log, "Peer", p) + + return p, nil +} + +// CloseConn should be used when the peer was created, but never started. +func (p *Peer) CloseConn() { + p.conn.Close() +} + +// makePersistent marks the peer as persistent. +func (p *Peer) makePersistent() { + if !p.outbound { + panic("inbound peers can't be made persistent") + } + + p.persistent = true +} + +// IsPersistent returns true if the peer is persitent, false otherwise. +func (p *Peer) IsPersistent() bool { + return p.persistent +} + +// HandshakeTimeout performs a handshake between a given node and the peer. // NOTE: blocking -// Before creating a peer with newPeer(), perform a handshake on connection. -func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) { +func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { + // Set deadline for handshake so we don't block forever on conn.ReadFull + p.conn.SetDeadline(time.Now().Add(timeout)) + var peerNodeInfo = new(NodeInfo) var err1 error var err2 error - Parallel( + cmn.Parallel( func() { var n int - wire.WriteBinary(ourNodeInfo, conn, &n, &err1) + wire.WriteBinary(ourNodeInfo, p.conn, &n, &err1) }, func() { var n int - wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { - return nil, err1 + return errors.Wrap(err1, "Error during handshake/write") } if err2 != nil { - return nil, err2 + return errors.Wrap(err2, "Error during handshake/read") } - peerNodeInfo.RemoteAddr = conn.RemoteAddr().String() - return peerNodeInfo, nil -} -// NOTE: call peerHandshake on conn before calling newPeer(). -func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { - var p *Peer - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - PanicSanity(Fmt("Unknown channel %X", chID)) + if p.config.AuthEnc { + // Check that the professed PubKey matches the sconn's. + if !peerNodeInfo.PubKey.Equals(p.PubKey()) { + return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", + peerNodeInfo.PubKey, p.PubKey()) } - reactor.Receive(chID, p, msgBytes) } - onError := func(r interface{}) { - p.Stop() - onPeerError(p, r) + + // Remove deadline + p.conn.SetDeadline(time.Time{}) + + peerNodeInfo.RemoteAddr = p.Addr().String() + + p.NodeInfo = peerNodeInfo + p.Key = peerNodeInfo.PubKey.KeyString() + + return nil +} + +// Addr returns peer's network address. +func (p *Peer) Addr() net.Addr { + return p.conn.RemoteAddr() +} + +// PubKey returns peer's public key. +func (p *Peer) PubKey() crypto.PubKeyEd25519 { + if p.config.AuthEnc { + return p.conn.(*SecretConnection).RemotePubKey() } - mconn := NewMConnection(config, conn, chDescs, onReceive, onError) - p = &Peer{ - outbound: outbound, - mconn: mconn, - NodeInfo: peerNodeInfo, - Key: peerNodeInfo.PubKey.KeyString(), - Data: NewCMap(), + if p.NodeInfo == nil { + panic("Attempt to get peer's PubKey before calling Handshake") } - p.BaseService = *NewBaseService(log, "Peer", p) - return p + return p.PubKey() } +// OnStart implements BaseService. func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err } +// OnStop implements BaseService. func (p *Peer) OnStop() { p.BaseService.OnStop() p.mconn.Stop() } +// Connection returns underlying MConnection. func (p *Peer) Connection() *MConnection { return p.mconn } +// IsOutbound returns true if the connection is outbound, false otherwise. func (p *Peer) IsOutbound() bool { return p.outbound } +// Send msg to the channel identified by chID byte. Returns false if the send +// queue is full after timeout, specified by MConnection. func (p *Peer) Send(chID byte, msg interface{}) bool { if !p.IsRunning() { + // see Switch#Broadcast, where we fetch the list of peers and loop over + // them - while we're looping, one peer may be removed and stopped. return false } return p.mconn.Send(chID, msg) } +// TrySend msg to the channel identified by chID byte. Immediately returns +// false if the send queue is full. func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false @@ -106,6 +242,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool { return p.mconn.TrySend(chID, msg) } +// CanSend returns true if the send queue is not full, false otherwise. func (p *Peer) CanSend(chID byte) bool { if !p.IsRunning() { return false @@ -113,6 +250,7 @@ func (p *Peer) CanSend(chID byte) bool { return p.mconn.CanSend(chID) } +// WriteTo writes the peer's public key to w. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { var n_ int wire.WriteString(p.Key, w, &n_, &err) @@ -120,18 +258,47 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { return } +// String representation. func (p *Peer) String() string { if p.outbound { return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12]) - } else { - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } + + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) } +// Equals reports whenever 2 peers are actually represent the same node. func (p *Peer) Equals(other *Peer) bool { return p.Key == other.Key } +// Get the data for a given key. func (p *Peer) Get(key string) interface{} { return p.Data.Get(key) } + +func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { + log.Info("Dialing address", "address", addr) + conn, err := addr.DialTimeout(config.DialTimeout) + if err != nil { + log.Info("Failed dialing address", "address", addr, "error", err) + return nil, err + } + return conn, nil +} + +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { + onReceive := func(chID byte, msgBytes []byte) { + reactor := reactorsByCh[chID] + if reactor == nil { + cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) + } + reactor.Receive(chID, p, msgBytes) + } + + onError := func(r interface{}) { + onPeerError(p, r) + } + + return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) +} diff --git a/peer_test.go b/peer_test.go new file mode 100644 index 000000000..5f3ed0e23 --- /dev/null +++ b/peer_test.go @@ -0,0 +1,156 @@ +package p2p + +import ( + golog "log" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + crypto "github.com/tendermint/go-crypto" +) + +func TestPeerBasic(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.IsRunning()) + assert.True(p.IsOutbound()) + assert.False(p.IsPersistent()) + p.makePersistent() + assert.True(p.IsPersistent()) + assert.Equal(rp.Addr().String(), p.Addr().String()) + assert.Equal(rp.PubKey(), p.PubKey()) +} + +func TestPeerWithoutAuthEnc(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + config := DefaultPeerConfig() + config.AuthEnc = false + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.IsRunning()) +} + +func TestPeerSend(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + config := DefaultPeerConfig() + config.AuthEnc = false + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.CanSend(0x01)) + assert.True(p.Send(0x01, "Asylum")) +} + +func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) { + chDescs := []*ChannelDescriptor{ + &ChannelDescriptor{ID: 0x01, Priority: 1}, + } + reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} + pk := crypto.GenPrivKeyEd25519() + p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) + if err != nil { + return nil, err + } + err = p.HandshakeTimeout(&NodeInfo{ + PubKey: pk.PubKey().(crypto.PubKeyEd25519), + Moniker: "host_peer", + Network: "testing", + Version: "123.123.123", + }, 1*time.Second) + if err != nil { + return nil, err + } + return p, nil +} + +type remotePeer struct { + PrivKey crypto.PrivKeyEd25519 + Config *PeerConfig + addr *NetAddress + quit chan struct{} +} + +func (p *remotePeer) Addr() *NetAddress { + return p.addr +} + +func (p *remotePeer) PubKey() crypto.PubKeyEd25519 { + return p.PrivKey.PubKey().(crypto.PubKeyEd25519) +} + +func (p *remotePeer) Start() { + l, e := net.Listen("tcp", "127.0.0.1:0") // any available address + if e != nil { + golog.Fatalf("net.Listen tcp :0: %+v", e) + } + p.addr = NewNetAddress(l.Addr()) + p.quit = make(chan struct{}) + go p.accept(l) +} + +func (p *remotePeer) Stop() { + close(p.quit) +} + +func (p *remotePeer) accept(l net.Listener) { + for { + conn, err := l.Accept() + if err != nil { + golog.Fatalf("Failed to accept conn: %+v", err) + } + peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) + if err != nil { + golog.Fatalf("Failed to create a peer: %+v", err) + } + err = peer.HandshakeTimeout(&NodeInfo{ + PubKey: p.PrivKey.PubKey().(crypto.PubKeyEd25519), + Moniker: "remote_peer", + Network: "testing", + Version: "123.123.123", + }, 1*time.Second) + if err != nil { + golog.Fatalf("Failed to perform handshake: %+v", err) + } + select { + case <-p.quit: + conn.Close() + return + default: + } + } +} diff --git a/pex_reactor.go b/pex_reactor.go index 45c4c96d8..4b6129762 100644 --- a/pex_reactor.go +++ b/pex_reactor.go @@ -2,58 +2,86 @@ package p2p import ( "bytes" - "errors" "fmt" "math/rand" "reflect" "time" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" + cmn "github.com/tendermint/go-common" + wire "github.com/tendermint/go-wire" ) -var pexErrInvalidMessage = errors.New("Invalid PEX message") - const ( - PexChannel = byte(0x00) - ensurePeersPeriodSeconds = 30 + // PexChannel is a channel for PEX messages + PexChannel = byte(0x00) + + // period to ensure peers connected + defaultEnsurePeersPeriod = 30 * time.Second minNumOutboundPeers = 10 maxPexMessageSize = 1048576 // 1MB + + // maximum messages one peer can send to us during `msgCountByPeerFlushInterval` + defaultMaxMsgCountByPeer = 1000 + msgCountByPeerFlushInterval = 1 * time.Hour ) -/* -PEXReactor handles PEX (peer exchange) and ensures that an -adequate number of peers are connected to the switch. -*/ +// PEXReactor handles PEX (peer exchange) and ensures that an +// adequate number of peers are connected to the switch. +// +// It uses `AddrBook` (address book) to store `NetAddress`es of the peers. +// +// ## Preventing abuse +// +// For now, it just limits the number of messages from one peer to +// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000 +// msg/hour). +// +// NOTE [2017-01-17]: +// Limiting is fine for now. Maybe down the road we want to keep track of the +// quality of peer messages so if peerA keeps telling us about peers we can't +// connect to then maybe we should care less about peerA. But I don't think +// that kind of complexity is priority right now. type PEXReactor struct { BaseReactor - sw *Switch - book *AddrBook + sw *Switch + book *AddrBook + ensurePeersPeriod time.Duration + + // tracks message count by peer, so we can prevent abuse + msgCountByPeer *cmn.CMap + maxMsgCountByPeer uint16 } -func NewPEXReactor(book *AddrBook) *PEXReactor { - pexR := &PEXReactor{ - book: book, +// NewPEXReactor creates new PEX reactor. +func NewPEXReactor(b *AddrBook) *PEXReactor { + r := &PEXReactor{ + book: b, + ensurePeersPeriod: defaultEnsurePeersPeriod, + msgCountByPeer: cmn.NewCMap(), + maxMsgCountByPeer: defaultMaxMsgCountByPeer, } - pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR) - return pexR + r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) + return r } -func (pexR *PEXReactor) OnStart() error { - pexR.BaseReactor.OnStart() - pexR.book.Start() - go pexR.ensurePeersRoutine() +// OnStart implements BaseService +func (r *PEXReactor) OnStart() error { + r.BaseReactor.OnStart() + r.book.Start() + go r.ensurePeersRoutine() + go r.flushMsgCountByPeer() return nil } -func (pexR *PEXReactor) OnStop() { - pexR.BaseReactor.OnStop() - pexR.book.Stop() +// OnStop implements BaseService +func (r *PEXReactor) OnStop() { + r.BaseReactor.OnStop() + r.book.Stop() } -// Implements Reactor -func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { +// GetChannels implements Reactor +func (r *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ ID: PexChannel, @@ -63,37 +91,45 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { } } -// Implements Reactor -func (pexR *PEXReactor) AddPeer(peer *Peer) { - // Add the peer to the address book - netAddr, err := NewNetAddressString(peer.ListenAddr) - if err != nil { - // this should never happen - log.Error("Error in AddPeer: invalid peer address", "addr", peer.ListenAddr, "error", err) - return - } - - if peer.IsOutbound() { - if pexR.book.NeedMoreAddrs() { - pexR.RequestPEX(peer) +// AddPeer implements Reactor by adding peer to the address book (if inbound) +// or by requesting more addresses (if outbound). +func (r *PEXReactor) AddPeer(p *Peer) { + if p.IsOutbound() { + // For outbound peers, the address is already in the books. + // Either it was added in DialSeeds or when we + // received the peer's address in r.Receive + if r.book.NeedMoreAddrs() { + r.RequestPEX(p) + } + } else { // For inbound connections, the peer is its own source + addr, err := NewNetAddressString(p.ListenAddr) + if err != nil { + // this should never happen + log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) + return } - } else { - // For inbound connections, the peer is its own source - // (For outbound peers, the address is already in the books) - pexR.book.AddAddress(netAddr, netAddr) + r.book.AddAddress(addr, addr) } } -// Implements Reactor -func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) { - // TODO +// RemovePeer implements Reactor. +func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { + // If we aren't keeping track of local temp data for each peer here, then we + // don't have to do anything. } -// Implements Reactor -// Handles incoming PEX messages. -func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { +// Receive implements Reactor by handling incoming PEX messages. +func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { + srcAddr := src.Connection().RemoteAddress + srcAddrStr := srcAddr.String() + + r.IncrementMsgCountForPeer(srcAddrStr) + if r.ReachedMaxMsgCountForPeer(srcAddrStr) { + log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr) + // TODO remove src from peers? + return + } - // decode message _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) @@ -104,87 +140,127 @@ func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. - // TODO: prevent abuse. - pexR.SendAddrs(src, pexR.book.GetSelection()) + r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. - // TODO: prevent abuse. // (We don't want to get spammed with bad peers) - srcAddr := src.Connection().RemoteAddress for _, addr := range msg.Addrs { if addr != nil { - pexR.book.AddAddress(addr, srcAddr) + r.book.AddAddress(addr, srcAddr) } } default: - log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } -// Asks peer for more addresses. -func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) +// RequestPEX asks peer for more addresses. +func (r *PEXReactor) RequestPEX(p *Peer) { + p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) +} + +// SendAddrs sends addrs to the peer. +func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { + p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) +} + +// SetEnsurePeersPeriod sets period to ensure peers connected. +func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { + r.ensurePeersPeriod = d +} + +// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'. +func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) { + r.maxMsgCountByPeer = v } -func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) +// ReachedMaxMsgCountForPeer returns true if we received too many +// messages from peer with address `addr`. +// NOTE: assumes the value in the CMap is non-nil +func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool { + return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer +} + +// Increment or initialize the msg count for the peer in the CMap +func (r *PEXReactor) IncrementMsgCountForPeer(addr string) { + var count uint16 + countI := r.msgCountByPeer.Get(addr) + if countI != nil { + count = countI.(uint16) + } + count++ + r.msgCountByPeer.Set(addr, count) } // Ensures that sufficient peers are connected. (continuous) -func (pexR *PEXReactor) ensurePeersRoutine() { +func (r *PEXReactor) ensurePeersRoutine() { // Randomize when routine starts - time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond) + ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 + time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) // fire once immediately. - pexR.ensurePeers() + r.ensurePeers() + // fire periodically - timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second) -FOR_LOOP: + ticker := time.NewTicker(r.ensurePeersPeriod) + for { select { - case <-timer.Ch: - pexR.ensurePeers() - case <-pexR.Quit: - break FOR_LOOP + case <-ticker.C: + r.ensurePeers() + case <-r.Quit: + ticker.Stop() + return } } - - // Cleanup - timer.Stop() } -// Ensures that sufficient peers are connected. (once) -func (pexR *PEXReactor) ensurePeers() { - numOutPeers, _, numDialing := pexR.Switch.NumPeers() +// ensurePeers ensures that sufficient peers are connected. (once) +// +// Old bucket / New bucket are arbitrary categories to denote whether an +// address is vetted or not, and this needs to be determined over time via a +// heuristic that we haven't perfected yet, or, perhaps is manually edited by +// the node operator. It should not be used to compute what addresses are +// already connected or not. +// +// TODO Basically, we need to work harder on our good-peer/bad-peer marking. +// What we're currently doing in terms of marking good/bad peers is just a +// placeholder. It should not be the case that an address becomes old/vetted +// upon a single successful connection. +func (r *PEXReactor) ensurePeers() { + numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { return } - toDial := NewCMap() + + toDial := make(map[string]*NetAddress) // Try to pick numToDial addresses to dial. - // TODO: improve logic. for i := 0; i < numToDial; i++ { - newBias := MinInt(numOutPeers, 8)*10 + 10 + // The purpose of newBias is to first prioritize old (more vetted) peers + // when we have few connections, but to allow for new (less vetted) peers + // if we already have many connections. This algorithm isn't perfect, but + // it somewhat ensures that we prioritize connecting to more-vetted + // peers. + newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; j < 3; j++ { - try := pexR.book.PickAddress(newBias) + try := r.book.PickAddress(newBias) if try == nil { break } - alreadySelected := toDial.Has(try.IP.String()) - alreadyDialing := pexR.Switch.IsDialing(try) - alreadyConnected := pexR.Switch.Peers().Has(try.IP.String()) + _, alreadySelected := toDial[try.IP.String()] + alreadyDialing := r.Switch.IsDialing(try) + alreadyConnected := r.Switch.Peers().Has(try.IP.String()) if alreadySelected || alreadyDialing || alreadyConnected { - /* - log.Info("Cannot dial address", "addr", try, - "alreadySelected", alreadySelected, - "alreadyDialing", alreadyDialing, - "alreadyConnected", alreadyConnected) - */ + // log.Info("Cannot dial address", "addr", try, + // "alreadySelected", alreadySelected, + // "alreadyDialing", alreadyDialing, + // "alreadyConnected", alreadyConnected) continue } else { log.Info("Will dial address", "addr", try) @@ -195,26 +271,40 @@ func (pexR *PEXReactor) ensurePeers() { if picked == nil { continue } - toDial.Set(picked.IP.String(), picked) + toDial[picked.IP.String()] = picked } // Dial picked addresses - for _, item := range toDial.Values() { + for _, item := range toDial { go func(picked *NetAddress) { - _, err := pexR.Switch.DialPeerWithAddress(picked) + _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { - pexR.book.MarkAttempt(picked) + r.book.MarkAttempt(picked) } - }(item.(*NetAddress)) + }(item) } // If we need more addresses, pick a random peer and ask for more. - if pexR.book.NeedMoreAddrs() { - if peers := pexR.Switch.Peers().List(); len(peers) > 0 { + if r.book.NeedMoreAddrs() { + if peers := r.Switch.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) peer := peers[i] log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) - pexR.RequestPEX(peer) + r.RequestPEX(peer) + } + } +} + +func (r *PEXReactor) flushMsgCountByPeer() { + ticker := time.NewTicker(msgCountByPeerFlushInterval) + + for { + select { + case <-ticker.C: + r.msgCountByPeer.Clear() + case <-r.Quit: + ticker.Stop() + return } } } @@ -227,6 +317,8 @@ const ( msgTypeAddrs = byte(0x02) ) +// PexMessage is a primary type for PEX messages. Underneath, it could contain +// either pexRequestMessage, or pexAddrsMessage messages. type PexMessage interface{} var _ = wire.RegisterInterface( @@ -235,6 +327,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, ) +// DecodeMessage implements interface registered above. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { msgType = bz[0] n := new(int) diff --git a/pex_reactor_test.go b/pex_reactor_test.go new file mode 100644 index 000000000..13f2fa208 --- /dev/null +++ b/pex_reactor_test.go @@ -0,0 +1,163 @@ +package p2p + +import ( + "io/ioutil" + "math/rand" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + cmn "github.com/tendermint/go-common" + wire "github.com/tendermint/go-wire" +) + +func TestPEXReactorBasic(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + + r := NewPEXReactor(book) + + assert.NotNil(r) + assert.NotEmpty(r.GetChannels()) +} + +func TestPEXReactorAddRemovePeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + + r := NewPEXReactor(book) + + size := book.Size() + peer := createRandomPeer(false) + + r.AddPeer(peer) + assert.Equal(size+1, book.Size()) + + r.RemovePeer(peer, "peer not available") + assert.Equal(size+1, book.Size()) + + outboundPeer := createRandomPeer(true) + + r.AddPeer(outboundPeer) + assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") + + r.RemovePeer(outboundPeer, "peer not available") + assert.Equal(size+1, book.Size()) +} + +func TestPEXReactorRunning(t *testing.T) { + require := require.New(t) + + N := 3 + switches := make([]*Switch, N) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", false) + + // create switches + for i := 0; i < N; i++ { + switches[i] = makeSwitch(i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + r := NewPEXReactor(book) + r.SetEnsurePeersPeriod(250 * time.Millisecond) + sw.AddReactor("pex", r) + return sw + }) + } + + // fill the address book and add listeners + for _, s := range switches { + addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr) + book.AddAddress(addr, addr) + s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true)) + } + + // start switches + for _, s := range switches { + _, err := s.Start() // start switch and reactors + require.Nil(err) + } + + time.Sleep(1 * time.Second) + + // check peers are connected after some time + for _, s := range switches { + outbound, inbound, _ := s.NumPeers() + if outbound+inbound == 0 { + t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr) + } + } + + // stop them + for _, s := range switches { + s.Stop() + } +} + +func TestPEXReactorReceive(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + + r := NewPEXReactor(book) + + peer := createRandomPeer(false) + + size := book.Size() + netAddr, _ := NewNetAddressString(peer.ListenAddr) + addrs := []*NetAddress{netAddr} + msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) + r.Receive(PexChannel, peer, msg) + assert.Equal(size+1, book.Size()) + + msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) + r.Receive(PexChannel, peer, msg) +} + +func TestPEXReactorAbuseFromPeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + dir, err := ioutil.TempDir("", "pex_reactor") + require.Nil(err) + defer os.RemoveAll(dir) + book := NewAddrBook(dir+"addrbook.json", true) + + r := NewPEXReactor(book) + r.SetMaxMsgCountByPeer(5) + + peer := createRandomPeer(false) + + msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}}) + for i := 0; i < 10; i++ { + r.Receive(PexChannel, peer, msg) + } + + assert.True(r.ReachedMaxMsgCountForPeer(peer.ListenAddr)) +} + +func createRandomPeer(outbound bool) *Peer { + addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, _ := NewNetAddressString(addr) + return &Peer{ + Key: cmn.RandStr(12), + NodeInfo: &NodeInfo{ + ListenAddr: addr, + }, + outbound: outbound, + mconn: &MConnection{RemoteAddress: netAddr}, + } +} diff --git a/switch.go b/switch.go index eed8ceea5..6835e0a4b 100644 --- a/switch.go +++ b/switch.go @@ -9,10 +9,15 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/log15" ) +const ( + reconnectAttempts = 30 + reconnectInterval = 3 * time.Second +) + type Reactor interface { Service // Start, Stop @@ -193,79 +198,45 @@ func (sw *Switch) OnStop() { } // NOTE: This performs a blocking handshake before the peer is added. -// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. -func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - - // Filter by addr (ie. ip:port) - if err := sw.FilterConnByAddr(conn.RemoteAddr()); err != nil { - conn.Close() - return nil, err +// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. +func (sw *Switch) AddPeer(peer *Peer) error { + if err := sw.FilterConnByAddr(peer.Addr()); err != nil { + return err } - // Set deadline for handshake so we don't block forever on conn.ReadFull - conn.SetDeadline(time.Now().Add( - time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second)) - - // First, encrypt the connection. - var sconn net.Conn = conn - if sw.config.GetBool(configKeyAuthEnc) { - var err error - sconn, err = MakeSecretConnection(conn, sw.nodePrivKey) - if err != nil { - conn.Close() - return nil, err - } + if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + return err } - // Filter by p2p-key - if err := sw.FilterConnByPubKey(sconn.(*SecretConnection).RemotePubKey()); err != nil { - sconn.Close() - return nil, err + if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil { + return err } - // Then, perform node handshake - peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo) - if err != nil { - sconn.Close() - return nil, err - } - if sw.config.GetBool(configKeyAuthEnc) { - // Check that the professed PubKey matches the sconn's. - if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", - peerNodeInfo.PubKey, sconn.(*SecretConnection).RemotePubKey()) - } - } // Avoid self - if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) { - sconn.Close() - return nil, fmt.Errorf("Ignoring connection from self") + if sw.nodeInfo.PubKey.Equals(peer.PubKey()) { + return errors.New("Ignoring connection from self") } + // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil { - sconn.Close() - return nil, err + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil { + return err } - peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) - // Add the peer to .peers // ignore if duplicate or if we already have too many for that IP range if err := sw.peers.Add(peer); err != nil { log.Notice("Ignoring peer", "error", err, "peer", peer) peer.Stop() - return nil, err + return err } - // remove deadline and start peer - conn.SetDeadline(time.Time{}) + // Start peer if sw.IsRunning() { sw.startInitPeer(peer) } log.Notice("Added peer", "peer", peer) - return peer, nil + return nil } func (sw *Switch) FilterConnByAddr(addr net.Addr) error { @@ -292,8 +263,10 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { } func (sw *Switch) startInitPeer(peer *Peer) { - peer.Start() // spawn send/recv routines - sw.addPeerToReactors(peer) // run AddPeer on each reactor + peer.Start() // spawn send/recv routines + for _, reactor := range sw.reactors { + reactor.AddPeer(peer) + } } // Dial a list of seeds asynchronously in random order @@ -331,7 +304,7 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { } func (sw *Switch) dialSeed(addr *NetAddress) { - peer, err := sw.DialPeerWithAddress(addr) + peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { log.Error("Error dialing seed", "error", err) return @@ -340,22 +313,22 @@ func (sw *Switch) dialSeed(addr *NetAddress) { } } -func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - log.Info("Dialing address", "address", addr) +func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { sw.dialing.Set(addr.IP.String(), addr) - conn, err := addr.DialTimeout(time.Duration( - sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) - sw.dialing.Delete(addr.IP.String()) + defer sw.dialing.Delete(addr.IP.String()) + + peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config)) if err != nil { - log.Info("Failed dialing address", "address", addr, "error", err) + log.Info("Failed dialing peer", "address", addr, "error", err) return nil, err } - if sw.config.GetBool(configFuzzEnable) { - conn = FuzzConn(sw.config, conn) + if persistent { + peer.makePersistent() } - peer, err := sw.AddPeerWithConnection(conn, true) + err = sw.AddPeer(peer) if err != nil { - log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err) + log.Info("Failed adding peer", "address", addr, "error", err) + peer.CloseConn() return nil, err } log.Notice("Dialed and added peer", "address", addr, "peer", peer) @@ -400,31 +373,49 @@ func (sw *Switch) Peers() IPeerSet { return sw.peers } -// Disconnect from a peer due to external error. +// Disconnect from a peer due to external error, retry if it is a persistent peer. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { + addr := NewNetAddress(peer.Addr()) log.Notice("Stopping peer for error", "peer", peer, "error", reason) - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, reason) + sw.stopAndRemovePeer(peer, reason) + + if peer.IsPersistent() { + go func() { + log.Notice("Reconnecting to peer", "peer", peer) + for i := 1; i < reconnectAttempts; i++ { + if !sw.IsRunning() { + return + } + + peer, err := sw.DialPeerWithAddress(addr, true) + if err != nil { + if i == reconnectAttempts { + log.Notice("Error reconnecting to peer. Giving up", "tries", i, "error", err) + return + } + log.Notice("Error reconnecting to peer. Trying again", "tries", i, "error", err) + time.Sleep(reconnectInterval) + continue + } + + log.Notice("Reconnected to peer", "peer", peer) + return + } + }() + } } // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { log.Notice("Stopping peer gracefully") - sw.peers.Remove(peer) - peer.Stop() - sw.removePeerFromReactors(peer, nil) + sw.stopAndRemovePeer(peer, nil) } -func (sw *Switch) addPeerToReactors(peer *Peer) { - for _, reactor := range sw.reactors { - reactor.AddPeer(peer) - } -} - -func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) { +func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { + sw.peers.Remove(peer) + peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) } @@ -444,14 +435,10 @@ func (sw *Switch) listenerRoutine(l Listener) { continue } - if sw.config.GetBool(configFuzzEnable) { - inConn = FuzzConn(sw.config, inConn) - } - // New inbound connection! - _, err := sw.AddPeerWithConnection(inConn, false) + err := sw.addPeerWithConnectionAndConfig(inConn, peerConfigFromGoConfig(sw.config)) if err != nil { - log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err) + log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) continue } @@ -511,14 +498,14 @@ func Connect2Switches(switches []*Switch, i, j int) { c1, c2 := net.Pipe() doneCh := make(chan struct{}) go func() { - _, err := switchI.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. + err := switchI.addPeerWithConnection(c1) if PanicOnAddPeerErr && err != nil { panic(err) } doneCh <- struct{}{} }() go func() { - _, err := switchJ.AddPeerWithConnection(c2, true) + err := switchJ.addPeerWithConnection(c2) if PanicOnAddPeerErr && err != nil { panic(err) } @@ -544,11 +531,63 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S // TODO: let the config be passed in? s := initSwitch(i, NewSwitch(cfg.NewMapConfig(nil))) s.SetNodeInfo(&NodeInfo{ - PubKey: privKey.PubKey().(crypto.PubKeyEd25519), - Moniker: Fmt("switch%d", i), - Network: network, - Version: version, + PubKey: privKey.PubKey().(crypto.PubKeyEd25519), + Moniker: Fmt("switch%d", i), + Network: network, + Version: version, + RemoteAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), + ListenAddr: Fmt("%v:%v", network, rand.Intn(64512)+1023), }) s.SetNodePrivKey(privKey) return s } + +func (sw *Switch) addPeerWithConnection(conn net.Conn) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + if err != nil { + conn.Close() + return err + } + + if err = sw.AddPeer(peer); err != nil { + conn.Close() + return err + } + + return nil +} + +func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { + peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) + if err != nil { + conn.Close() + return err + } + + if err = sw.AddPeer(peer); err != nil { + conn.Close() + return err + } + + return nil +} + +func peerConfigFromGoConfig(config cfg.Config) *PeerConfig { + return &PeerConfig{ + AuthEnc: config.GetBool(configKeyAuthEnc), + Fuzz: config.GetBool(configFuzzEnable), + HandshakeTimeout: time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second, + DialTimeout: time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second, + MConfig: &MConnConfig{ + SendRate: int64(config.GetInt(configKeySendRate)), + RecvRate: int64(config.GetInt(configKeyRecvRate)), + }, + FuzzConfig: &FuzzConnConfig{ + Mode: config.GetInt(configFuzzMode), + MaxDelay: time.Duration(config.GetInt(configFuzzMaxDelayMilliseconds)) * time.Millisecond, + ProbDropRW: config.GetFloat64(configFuzzProbDropRW), + ProbDropConn: config.GetFloat64(configFuzzProbDropConn), + ProbSleep: config.GetFloat64(configFuzzProbSleep), + }, + } +} diff --git a/switch_test.go b/switch_test.go index 1b2ccd743..a81bb4ac0 100644 --- a/switch_test.go +++ b/switch_test.go @@ -8,10 +8,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" + crypto "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" ) var ( @@ -21,7 +23,6 @@ var ( func init() { config = cfg.NewMapConfig(nil) setConfigDefaults(config) - } type PeerMessage struct { @@ -174,8 +175,12 @@ func TestConnAddrFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.addPeerWithConnection(c1) + }() + go func() { + s2.addPeerWithConnection(c2) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -205,8 +210,12 @@ func TestConnPubKeyFilter(t *testing.T) { }) // connect to good peer - go s1.AddPeerWithConnection(c1, false) // AddPeer is blocking, requires handshake. - go s2.AddPeerWithConnection(c2, true) + go func() { + s1.addPeerWithConnection(c1) + }() + go func() { + s2.addPeerWithConnection(c2) + }() // Wait for things to happen, peers to get added... time.Sleep(100 * time.Millisecond * time.Duration(4)) @@ -221,6 +230,59 @@ func TestConnPubKeyFilter(t *testing.T) { } } +func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() + + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.Zero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + +func TestSwitchReconnectsToPersistentPeer(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc) + sw.Start() + defer sw.Stop() + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() + + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer.makePersistent() + require.Nil(err) + err = sw.AddPeer(peer) + require.Nil(err) + + // simulate failure by closing connection + peer.CloseConn() + + time.Sleep(100 * time.Millisecond) + + assert.NotZero(sw.Peers().Size()) + assert.False(peer.IsRunning()) +} + func BenchmarkSwitches(b *testing.B) { b.StopTimer() @@ -252,9 +314,9 @@ func BenchmarkSwitches(b *testing.B) { successChan := s1.Broadcast(chID, "test data") for s := range successChan { if s { - numSuccess += 1 + numSuccess++ } else { - numFailure += 1 + numFailure++ } } } diff --git a/version.go b/version.go index c13ec447b..9a4c7bbaf 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package p2p -const Version = "0.4.0" // DialSeeds returns an error +const Version = "0.5.0"