diff --git a/config.go b/config.go index 60b69e367..a8b7e343b 100644 --- a/config.go +++ b/config.go @@ -17,7 +17,6 @@ const ( // Fuzz params configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn - configFuzzActive = "fuzz_active" // toggle fuzzing configFuzzMode = "fuzz_mode" // eg. drop, delay configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds" configFuzzProbDropRW = "fuzz_prob_drop_rw" @@ -38,7 +37,6 @@ func setConfigDefaults(config cfg.Config) { // Fuzz defaults config.SetDefault(configFuzzEnable, false) - config.SetDefault(configFuzzActive, false) config.SetDefault(configFuzzMode, FuzzModeDrop) config.SetDefault(configFuzzMaxDelayMilliseconds, 3000) config.SetDefault(configFuzzProbDropRW, 0.2) diff --git a/connection.go b/connection.go index e61608896..a14fa5ee4 100644 --- a/connection.go +++ b/connection.go @@ -74,7 +74,7 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 - config *MConnectionConfig + config *MConnConfig quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. @@ -85,12 +85,20 @@ type MConnection struct { RemoteAddress *NetAddress } -// MConnectionConfig is a MConnection configuration -type MConnectionConfig struct { +// MConnConfig is a MConnection configuration. +type MConnConfig struct { SendRate int64 RecvRate int64 } +// DefaultMConnConfig returns the default config. +func DefaultMConnConfig() *MConnConfig { + return &MConnConfig{ + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + } +} + // NewMConnection wraps net.Conn and creates multiplex connection func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { return NewMConnectionWithConfig( @@ -98,14 +106,11 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei chDescs, onReceive, onError, - &MConnectionConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, - }) + DefaultMConnConfig()) } // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config -func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection { +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), @@ -253,6 +258,8 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { return ok } +// CanSend returns true if you can send more data onto the chID, false +// otherwise. Use only as a heuristic. func (c *MConnection) CanSend(chID byte) bool { if !c.IsRunning() { return false @@ -552,14 +559,12 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { // Goroutine-safe // Times out (and returns false) after defaultSendTimeout func (ch *Channel) sendBytes(bytes []byte) bool { - timeout := time.NewTimer(defaultSendTimeout) select { - case <-timeout.C: - // timeout - return false case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true + case <-time.After(defaultSendTimeout): + return false } } diff --git a/connection_test.go b/connection_test.go index cd6bb4a8f..84e20eee3 100644 --- a/connection_test.go +++ b/connection_test.go @@ -19,7 +19,7 @@ func createMConnection(conn net.Conn) *p2p.MConnection { } func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { - chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1}} + chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} return p2p.NewMConnection(conn, chDescs, onReceive, onError) } @@ -37,13 +37,18 @@ func TestMConnectionSend(t *testing.T) { msg := "Ant-Man" assert.True(mconn.Send(0x01, msg)) - assert.False(mconn.CanSend(0x01)) + // Note: subsequent Send/TrySend calls could pass because we are reading from + // the send queue in a separate goroutine. + assert.False(mconn.CanSend(0x01), "CanSend should return false because queue is full") server.Read(make([]byte, len(msg))) assert.True(mconn.CanSend(0x01)) msg = "Spider-Man" assert.True(mconn.TrySend(0x01, msg)) server.Read(make([]byte, len(msg))) + + assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown") + assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown") } func TestMConnectionReceive(t *testing.T) { diff --git a/fuzz.go b/fuzz.go index ee8f43ccf..aefac986a 100644 --- a/fuzz.go +++ b/fuzz.go @@ -5,86 +5,147 @@ import ( "net" "sync" "time" - - cfg "github.com/tendermint/go-config" ) -//-------------------------------------------------------- -// delay reads/writes -// randomly drop reads/writes -// randomly drop connections - const ( - FuzzModeDrop = "drop" - FuzzModeDelay = "delay" + // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep + FuzzModeDrop = iota + // FuzzModeDelay is a mode in which we randomly sleep + FuzzModeDelay ) -func FuzzConn(config cfg.Config, conn net.Conn) net.Conn { - return &FuzzedConnection{ - conn: conn, - start: time.After(time.Second * 10), // so we have time to do peer handshakes and get set up - params: config, +// FuzzedConnection wraps any net.Conn and depending on the mode either delays +// reads/writes or randomly drops reads/writes/connections. +type FuzzedConnection struct { + conn net.Conn + + mtx sync.Mutex + start <-chan time.Time + active bool + + config *FuzzConnConfig +} + +// FuzzConnConfig is a FuzzedConnection configuration. +type FuzzConnConfig struct { + Mode int + MaxDelay time.Duration + ProbDropRW float64 + ProbDropConn float64 + ProbSleep float64 +} + +// DefaultFuzzConnConfig returns the default config. +func DefaultFuzzConnConfig() *FuzzConnConfig { + return &FuzzConnConfig{ + Mode: FuzzModeDrop, + MaxDelay: 3 * time.Second, + ProbDropRW: 0.2, + ProbDropConn: 0.00, + ProbSleep: 0.00, } } -type FuzzedConnection struct { - conn net.Conn +// FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately. +func FuzzConn(conn net.Conn) net.Conn { + return FuzzConnFromConfig(conn, DefaultFuzzConnConfig()) +} - mtx sync.Mutex - fuzz bool // we don't start fuzzing right away - start <-chan time.Time +// FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing +// starts immediately. +func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { + return &FuzzedConnection{ + conn: conn, + start: make(<-chan time.Time), + active: true, + config: config, + } +} - // fuzz params - params cfg.Config +// FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the +// duration elapses. +func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn { + return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig()) } -func (fc *FuzzedConnection) randomDuration() time.Duration { - return time.Millisecond * time.Duration(rand.Int()%fc.MaxDelayMilliseconds()) +// FuzzConnAfterFromConfig creates a new FuzzedConnection from a config. +// Fuzzing starts when the duration elapses. +func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn { + return &FuzzedConnection{ + conn: conn, + start: time.After(d), + active: false, + config: config, + } } -func (fc *FuzzedConnection) Active() bool { - return fc.params.GetBool(configFuzzActive) +// Config returns the connection's config. +func (fc *FuzzedConnection) Config() *FuzzConnConfig { + return fc.config } -func (fc *FuzzedConnection) Mode() string { - return fc.params.GetString(configFuzzMode) +// Read implements net.Conn. +func (fc *FuzzedConnection) Read(data []byte) (n int, err error) { + if fc.fuzz() { + return 0, nil + } + return fc.conn.Read(data) } -func (fc *FuzzedConnection) ProbDropRW() float64 { - return fc.params.GetFloat64(configFuzzProbDropRW) +// Write implements net.Conn. +func (fc *FuzzedConnection) Write(data []byte) (n int, err error) { + if fc.fuzz() { + return 0, nil + } + return fc.conn.Write(data) } -func (fc *FuzzedConnection) ProbDropConn() float64 { - return fc.params.GetFloat64(configFuzzProbDropConn) +// Close implements net.Conn. +func (fc *FuzzedConnection) Close() error { return fc.conn.Close() } + +// LocalAddr implements net.Conn. +func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() } + +// RemoteAddr implements net.Conn. +func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() } + +// SetDeadline implements net.Conn. +func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) } + +// SetReadDeadline implements net.Conn. +func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error { + return fc.conn.SetReadDeadline(t) } -func (fc *FuzzedConnection) ProbSleep() float64 { - return fc.params.GetFloat64(configFuzzProbSleep) +// SetWriteDeadline implements net.Conn. +func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error { + return fc.conn.SetWriteDeadline(t) } -func (fc *FuzzedConnection) MaxDelayMilliseconds() int { - return fc.params.GetInt(configFuzzMaxDelayMilliseconds) +func (fc *FuzzedConnection) randomDuration() time.Duration { + maxDelayMillis := int(fc.config.MaxDelay.Nanoseconds() / 1000) + return time.Millisecond * time.Duration(rand.Int()%maxDelayMillis) } // implements the fuzz (delay, kill conn) // and returns whether or not the read/write should be ignored -func (fc *FuzzedConnection) Fuzz() bool { +func (fc *FuzzedConnection) fuzz() bool { if !fc.shouldFuzz() { return false } - switch fc.Mode() { + switch fc.config.Mode { case FuzzModeDrop: // randomly drop the r/w, drop the conn, or sleep r := rand.Float64() - if r <= fc.ProbDropRW() { + if r <= fc.config.ProbDropRW { return true - } else if r < fc.ProbDropRW()+fc.ProbDropConn() { + } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn { // XXX: can't this fail because machine precision? // XXX: do we need an error? fc.Close() return true - } else if r < fc.ProbDropRW()+fc.ProbDropConn()+fc.ProbSleep() { + } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep { time.Sleep(fc.randomDuration()) } case FuzzModeDelay: @@ -94,48 +155,19 @@ func (fc *FuzzedConnection) Fuzz() bool { return false } -// we don't fuzz until start chan fires func (fc *FuzzedConnection) shouldFuzz() bool { - if !fc.Active() { - return false + if fc.active { + return true } fc.mtx.Lock() defer fc.mtx.Unlock() - if fc.fuzz { - return true - } select { case <-fc.start: - fc.fuzz = true + fc.active = true + return true default: + return false } - return false -} - -func (fc *FuzzedConnection) Read(data []byte) (n int, err error) { - if fc.Fuzz() { - return 0, nil - } - return fc.conn.Read(data) -} - -func (fc *FuzzedConnection) Write(data []byte) (n int, err error) { - if fc.Fuzz() { - return 0, nil - } - return fc.conn.Write(data) -} - -// Implements net.Conn -func (fc *FuzzedConnection) Close() error { return fc.conn.Close() } -func (fc *FuzzedConnection) LocalAddr() net.Addr { return fc.conn.LocalAddr() } -func (fc *FuzzedConnection) RemoteAddr() net.Addr { return fc.conn.RemoteAddr() } -func (fc *FuzzedConnection) SetDeadline(t time.Time) error { return fc.conn.SetDeadline(t) } -func (fc *FuzzedConnection) SetReadDeadline(t time.Time) error { - return fc.conn.SetReadDeadline(t) -} -func (fc *FuzzedConnection) SetWriteDeadline(t time.Time) error { - return fc.conn.SetWriteDeadline(t) } diff --git a/glide.lock b/glide.lock index 797c86031..423f18a09 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 92a49cbcf88a339e4d29559fe291c30e61eacda1020fd04dfcd97de834e18b3e -updated: 2017-04-10T11:17:14.66226896Z +hash: ef8ea7b02d9a133bfbfcf3f4615d43be0956ad2bc9eb0050e0721fca12d09308 +updated: 2017-04-14T08:28:07.579629532Z imports: - name: github.com/btcsuite/btcd version: 4b348c1d33373d672edd83fc576892d0e46686d2 @@ -25,9 +25,9 @@ imports: - name: github.com/tendermint/go-config version: 620dcbbd7d587cf3599dedbf329b64311b0c307a - name: github.com/tendermint/go-crypto - version: 3f47cfac5fcd9e0f1727c7db980b3559913b3e3a + version: 750b25c47a5782f5f2b773ed9e706cb82b3ccef4 - name: github.com/tendermint/go-data - version: c955b191240568440ea902e14dad2ce19727543a + version: e7fcc6d081ec8518912fcdc103188275f83a3ee5 - name: github.com/tendermint/go-flowrate version: a20c98e61957faa93b4014fbd902f20ab9317a6a subpackages: @@ -41,7 +41,7 @@ imports: subpackages: - term - name: golang.org/x/crypto - version: 9ef620b9ca2f82b55030ffd4f41327fa9e77a92c + version: cbc3d0884eac986df6e78a039b8792e869bff863 subpackages: - curve25519 - nacl/box diff --git a/glide.yaml b/glide.yaml index cf71cc670..2020b0424 100644 --- a/glide.yaml +++ b/glide.yaml @@ -3,8 +3,9 @@ import: - package: github.com/tendermint/go-common - package: github.com/tendermint/go-config - package: github.com/tendermint/go-crypto + version: develop - package: github.com/tendermint/go-data - version: c955b191240568440ea902e14dad2ce19727543a + version: develop - package: github.com/tendermint/go-flowrate subpackages: - flowrate @@ -16,6 +17,7 @@ import: - nacl/box - nacl/secretbox - ripemd160 +- package: github.com/pkg/errors testImport: - package: github.com/stretchr/testify subpackages: diff --git a/netaddress.go b/netaddress.go index 90fcf6a43..263ec9037 100644 --- a/netaddress.go +++ b/netaddress.go @@ -6,6 +6,7 @@ package p2p import ( "errors" + "flag" "net" "strconv" "time" @@ -13,28 +14,36 @@ import ( cmn "github.com/tendermint/go-common" ) +// NetAddress defines information about a peer on the network +// including its IP address, and port. type NetAddress struct { IP net.IP Port uint16 str string } +// NewNetAddress returns a new NetAddress using the provided TCP +// address. When testing, other net.Addr (except TCP) will result in +// using 0.0.0.0:0. When normal run, other net.Addr (except TCP) will +// panic. // TODO: socks proxies? func NewNetAddress(addr net.Addr) *NetAddress { tcpAddr, ok := addr.(*net.TCPAddr) if !ok { - log.Warn(`Only TCPAddrs are supported. If used for anything but testing, - may result in undefined behaviour!`, "addr", addr) - return NewNetAddressIPPort(net.IP("0.0.0.0"), 0) - // NOTE: it would be nice to only not panic if we're in testing ... - // PanicSanity(Fmt("Only TCPAddrs are supported. Got: %v", addr)) + if flag.Lookup("test.v") == nil { // normal run + cmn.PanicSanity(cmn.Fmt("Only TCPAddrs are supported. Got: %v", addr)) + } else { // in testing + return NewNetAddressIPPort(net.IP("0.0.0.0"), 0) + } } ip := tcpAddr.IP port := uint16(tcpAddr.Port) return NewNetAddressIPPort(ip, port) } -// Also resolves the host if host is not an IP. +// NewNetAddressString returns a new NetAddress using the provided +// address in the form of "IP:Port". Also resolves the host if host +// is not an IP. func NewNetAddressString(addr string) (*NetAddress, error) { host, portStr, err := net.SplitHostPort(addr) @@ -62,6 +71,8 @@ func NewNetAddressString(addr string) (*NetAddress, error) { return na, nil } +// NewNetAddressStrings returns an array of NetAddress'es build using +// the provided strings. func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) { netAddrs := make([]*NetAddress, len(addrs)) for i, addr := range addrs { @@ -74,6 +85,8 @@ func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) { return netAddrs, nil } +// NewNetAddressIPPort returns a new NetAddress using the provided IP +// and port number. func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { na := &NetAddress{ IP: ip, @@ -86,23 +99,25 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { return na } +// Equals reports whether na and other are the same addresses. func (na *NetAddress) Equals(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() == o.String() - } else { - return false } + + return false } func (na *NetAddress) Less(other interface{}) bool { if o, ok := other.(*NetAddress); ok { return na.String() < o.String() - } else { - cmn.PanicSanity("Cannot compare unequal types") - return false } + + cmn.PanicSanity("Cannot compare unequal types") + return false } +// String representation. func (na *NetAddress) String() string { if na.str == "" { na.str = net.JoinHostPort( @@ -113,6 +128,7 @@ func (na *NetAddress) String() string { return na.str } +// Dial calls net.Dial on the address. func (na *NetAddress) Dial() (net.Conn, error) { conn, err := net.Dial("tcp", na.String()) if err != nil { @@ -121,6 +137,7 @@ func (na *NetAddress) Dial() (net.Conn, error) { return conn, nil } +// DialTimeout calls net.DialTimeout on the address. func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { conn, err := net.DialTimeout("tcp", na.String(), timeout) if err != nil { @@ -129,6 +146,7 @@ func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { return conn, nil } +// Routable returns true if the address is routable. func (na *NetAddress) Routable() bool { // TODO(oga) bitcoind doesn't include RFC3849 here, but should we? return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() || @@ -142,10 +160,12 @@ func (na *NetAddress) Valid() bool { na.IP.Equal(net.IPv4bcast)) } +// Local returns true if it is a local address. func (na *NetAddress) Local() bool { return na.IP.IsLoopback() || zero4.Contains(na.IP) } +// ReachabilityTo checks whenever o can be reached from na. func (na *NetAddress) ReachabilityTo(o *NetAddress) int { const ( Unreachable = 0 diff --git a/netaddress_test.go b/netaddress_test.go new file mode 100644 index 000000000..db871fdec --- /dev/null +++ b/netaddress_test.go @@ -0,0 +1,113 @@ +package p2p + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewNetAddress(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:8080") + require.Nil(err) + addr := NewNetAddress(tcpAddr) + + assert.Equal("127.0.0.1:8080", addr.String()) + + assert.NotPanics(func() { + NewNetAddress(&net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8000}) + }, "Calling NewNetAddress with UDPAddr should not panic in testing") +} + +func TestNewNetAddressString(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + tests := []struct { + addr string + correct bool + }{ + {"127.0.0.1:8080", true}, + {"127.0.0:8080", false}, + {"a", false}, + {"127.0.0.1:a", false}, + {"a:8080", false}, + {"8082", false}, + {"127.0.0:8080000", false}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + if t.correct { + require.Nil(err) + assert.Equal(t.addr, addr.String()) + } else { + require.NotNil(err) + } + } +} + +func TestNewNetAddressStrings(t *testing.T) { + assert, require := assert.New(t), require.New(t) + addrs, err := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"}) + require.Nil(err) + + assert.Equal(2, len(addrs)) +} + +func TestNewNetAddressIPPort(t *testing.T) { + assert := assert.New(t) + addr := NewNetAddressIPPort(net.ParseIP("127.0.0.1"), 8080) + + assert.Equal("127.0.0.1:8080", addr.String()) +} + +func TestNetAddressProperties(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // TODO add more test cases + tests := []struct { + addr string + valid bool + local bool + routable bool + }{ + {"127.0.0.1:8080", true, true, false}, + {"ya.ru:80", true, false, true}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + require.Nil(err) + + assert.Equal(t.valid, addr.Valid()) + assert.Equal(t.local, addr.Local()) + assert.Equal(t.routable, addr.Routable()) + } +} + +func TestNetAddressReachabilityTo(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // TODO add more test cases + tests := []struct { + addr string + other string + reachability int + }{ + {"127.0.0.1:8080", "127.0.0.1:8081", 0}, + {"ya.ru:80", "127.0.0.1:8080", 1}, + } + + for _, t := range tests { + addr, err := NewNetAddressString(t.addr) + require.Nil(err) + + other, err := NewNetAddressString(t.other) + require.Nil(err) + + assert.Equal(t.reachability, addr.ReachabilityTo(other)) + } +} diff --git a/peer.go b/peer.go index 1be50aff1..5461d7e8a 100644 --- a/peer.go +++ b/peer.go @@ -6,8 +6,8 @@ import ( "net" "time" + "github.com/pkg/errors" cmn "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" ) @@ -25,23 +25,50 @@ type Peer struct { conn net.Conn // source connection mconn *MConnection // multiplex connection - authEnc bool // authenticated encryption persistent bool - config cfg.Config + config *PeerConfig *NodeInfo Key string Data *cmn.CMap // User data. } -func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { +// PeerConfig is a Peer configuration. +type PeerConfig struct { + AuthEnc bool // authenticated encryption + + HandshakeTimeout time.Duration + DialTimeout time.Duration + + MConfig *MConnConfig + + Fuzz bool // fuzz connection (for testing) + FuzzConfig *FuzzConnConfig +} + +// DefaultPeerConfig returns the default config. +func DefaultPeerConfig() *PeerConfig { + return &PeerConfig{ + AuthEnc: true, + HandshakeTimeout: 2 * time.Second, + DialTimeout: 3 * time.Second, + MConfig: DefaultMConnConfig(), + Fuzz: false, + FuzzConfig: DefaultFuzzConnConfig(), + } +} + +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { + return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) +} + +func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn, err := dial(addr, config) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Error creating peer") } - // outbound = true - peer, err := newPeerFromExistingConn(conn, true, reactorsByCh, chDescs, onPeerError, config, privKey) + peer, err := newPeerFromConnAndConfig(conn, true, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) if err != nil { conn.Close() return nil, err @@ -49,31 +76,43 @@ func newPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*Channel return peer, nil } -func newPeerFromExistingConn(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config, privKey crypto.PrivKeyEd25519) (*Peer, error) { +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { + return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) +} + +func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) +} + +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn := rawConn + + // Fuzz connection + if config.Fuzz { + // so we have time to do peer handshakes and get set up + conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig) + } + // Encrypt connection - if config.GetBool(configKeyAuthEnc) { + if config.AuthEnc { + conn.SetDeadline(time.Now().Add(config.HandshakeTimeout)) + var err error - // Set deadline for handshake so we don't block forever on conn.ReadFull - timeout := time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second - conn.SetDeadline(time.Now().Add(timeout)) - conn, err = MakeSecretConnection(conn, privKey) + conn, err = MakeSecretConnection(conn, ourNodePrivKey) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Error creating peer") } - // remove deadline - conn.SetDeadline(time.Time{}) } // Key and NodeInfo are set after Handshake p := &Peer{ outbound: outbound, - authEnc: config.GetBool(configKeyAuthEnc), conn: conn, config: config, Data: cmn.NewCMap(), } - p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config) + p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig) p.BaseService = *cmn.NewBaseService(log, "Peer", p) @@ -119,13 +158,13 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { - return err1 + return errors.Wrap(err1, "Error during handshake/write") } if err2 != nil { - return err2 + return errors.Wrap(err2, "Error during handshake/read") } - if p.authEnc { + if p.config.AuthEnc { // Check that the professed PubKey matches the sconn's. if !peerNodeInfo.PubKey.Equals(p.PubKey()) { return fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v", @@ -136,7 +175,7 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er // Remove deadline p.conn.SetDeadline(time.Time{}) - peerNodeInfo.RemoteAddr = p.RemoteAddr().String() + peerNodeInfo.RemoteAddr = p.Addr().String() p.NodeInfo = peerNodeInfo p.Key = peerNodeInfo.PubKey.KeyString() @@ -144,14 +183,14 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er return nil } -// RemoteAddr returns the remote network address. -func (p *Peer) RemoteAddr() net.Addr { +// Addr returns peer's network address. +func (p *Peer) Addr() net.Addr { return p.conn.RemoteAddr() } -// PubKey returns the remote public key. +// PubKey returns peer's public key. func (p *Peer) PubKey() crypto.PubKeyEd25519 { - if p.authEnc { + if p.config.AuthEnc { return p.conn.(*SecretConnection).RemotePubKey() } if p.NodeInfo == nil { @@ -238,21 +277,17 @@ func (p *Peer) Get(key string) interface{} { return p.Data.Get(key) } -func dial(addr *NetAddress, config cfg.Config) (net.Conn, error) { +func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { log.Info("Dialing address", "address", addr) - conn, err := addr.DialTimeout(time.Duration( - config.GetInt(configKeyDialTimeoutSeconds)) * time.Second) + conn, err := addr.DialTimeout(config.DialTimeout) if err != nil { log.Info("Failed dialing address", "address", addr, "error", err) return nil, err } - if config.GetBool(configFuzzEnable) { - conn = FuzzConn(config, conn) - } return conn, nil } -func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config cfg.Config) *MConnection { +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { @@ -265,10 +300,5 @@ func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - mconnConfig := &MConnectionConfig{ - SendRate: int64(config.GetInt(configKeySendRate)), - RecvRate: int64(config.GetInt(configKeyRecvRate)), - } - - return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig) + return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) } diff --git a/peer_test.go b/peer_test.go new file mode 100644 index 000000000..5f3ed0e23 --- /dev/null +++ b/peer_test.go @@ -0,0 +1,156 @@ +package p2p + +import ( + golog "log" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + crypto "github.com/tendermint/go-crypto" +) + +func TestPeerBasic(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.IsRunning()) + assert.True(p.IsOutbound()) + assert.False(p.IsPersistent()) + p.makePersistent() + assert.True(p.IsPersistent()) + assert.Equal(rp.Addr().String(), p.Addr().String()) + assert.Equal(rp.PubKey(), p.PubKey()) +} + +func TestPeerWithoutAuthEnc(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + config := DefaultPeerConfig() + config.AuthEnc = false + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.IsRunning()) +} + +func TestPeerSend(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + config := DefaultPeerConfig() + config.AuthEnc = false + + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} + rp.Start() + defer rp.Stop() + + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) + require.Nil(err) + + p.Start() + defer p.Stop() + + assert.True(p.CanSend(0x01)) + assert.True(p.Send(0x01, "Asylum")) +} + +func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) { + chDescs := []*ChannelDescriptor{ + &ChannelDescriptor{ID: 0x01, Priority: 1}, + } + reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} + pk := crypto.GenPrivKeyEd25519() + p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) + if err != nil { + return nil, err + } + err = p.HandshakeTimeout(&NodeInfo{ + PubKey: pk.PubKey().(crypto.PubKeyEd25519), + Moniker: "host_peer", + Network: "testing", + Version: "123.123.123", + }, 1*time.Second) + if err != nil { + return nil, err + } + return p, nil +} + +type remotePeer struct { + PrivKey crypto.PrivKeyEd25519 + Config *PeerConfig + addr *NetAddress + quit chan struct{} +} + +func (p *remotePeer) Addr() *NetAddress { + return p.addr +} + +func (p *remotePeer) PubKey() crypto.PubKeyEd25519 { + return p.PrivKey.PubKey().(crypto.PubKeyEd25519) +} + +func (p *remotePeer) Start() { + l, e := net.Listen("tcp", "127.0.0.1:0") // any available address + if e != nil { + golog.Fatalf("net.Listen tcp :0: %+v", e) + } + p.addr = NewNetAddress(l.Addr()) + p.quit = make(chan struct{}) + go p.accept(l) +} + +func (p *remotePeer) Stop() { + close(p.quit) +} + +func (p *remotePeer) accept(l net.Listener) { + for { + conn, err := l.Accept() + if err != nil { + golog.Fatalf("Failed to accept conn: %+v", err) + } + peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) + if err != nil { + golog.Fatalf("Failed to create a peer: %+v", err) + } + err = peer.HandshakeTimeout(&NodeInfo{ + PubKey: p.PrivKey.PubKey().(crypto.PubKeyEd25519), + Moniker: "remote_peer", + Network: "testing", + Version: "123.123.123", + }, 1*time.Second) + if err != nil { + golog.Fatalf("Failed to perform handshake: %+v", err) + } + select { + case <-p.quit: + conn.Close() + return + default: + } + } +} diff --git a/switch.go b/switch.go index f38d4feb1..8ceb1ab70 100644 --- a/switch.go +++ b/switch.go @@ -200,7 +200,7 @@ func (sw *Switch) OnStop() { // NOTE: This performs a blocking handshake before the peer is added. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeer(peer *Peer) error { - if err := sw.FilterConnByAddr(peer.RemoteAddr()); err != nil { + if err := sw.FilterConnByAddr(peer.Addr()); err != nil { return err } @@ -317,7 +317,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) - peer, err := newPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config)) if err != nil { log.Info("Failed dialing peer", "address", addr, "error", err) return nil, err @@ -376,7 +376,7 @@ func (sw *Switch) Peers() IPeerSet { // Disconnect from a peer due to external error, retry if it is a persistent peer. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { - addr := NewNetAddress(peer.RemoteAddr()) + addr := NewNetAddress(peer.Addr()) log.Notice("Stopping peer for error", "peer", peer, "error", reason) sw.stopAndRemovePeer(peer, reason) @@ -435,12 +435,8 @@ func (sw *Switch) listenerRoutine(l Listener) { continue } - if sw.config.GetBool(configFuzzEnable) { - inConn = FuzzConn(sw.config, inConn) - } - // New inbound connection! - err := sw.AddPeerWithConnection(inConn, false) + err := sw.addPeerWithConnectionAndConfig(inConn, peerConfigFromGoConfig(sw.config)) if err != nil { log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) continue @@ -502,14 +498,14 @@ func Connect2Switches(switches []*Switch, i, j int) { c1, c2 := net.Pipe() doneCh := make(chan struct{}) go func() { - err := switchI.AddPeerWithConnection(c1, false) + err := switchI.addPeerWithConnection(c1) if PanicOnAddPeerErr && err != nil { panic(err) } doneCh <- struct{}{} }() go func() { - err := switchJ.AddPeerWithConnection(c2, false) + err := switchJ.addPeerWithConnection(c2) if PanicOnAddPeerErr && err != nil { panic(err) } @@ -544,18 +540,52 @@ func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *S return s } -// AddPeerWithConnection creates a newPeer from the connection, performs the handshake, and adds it to the switch. -func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) error { - peer, err := newPeerFromExistingConn(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) +func (sw *Switch) addPeerWithConnection(conn net.Conn) error { + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) if err != nil { conn.Close() return err } if err = sw.AddPeer(peer); err != nil { - peer.CloseConn() + conn.Close() + return err + } + + return nil +} + +func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { + peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) + if err != nil { + conn.Close() + return err + } + + if err = sw.AddPeer(peer); err != nil { + conn.Close() return err } return nil } + +func peerConfigFromGoConfig(config cfg.Config) *PeerConfig { + return &PeerConfig{ + AuthEnc: config.GetBool(configKeyAuthEnc), + Fuzz: config.GetBool(configFuzzEnable), + HandshakeTimeout: time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second, + DialTimeout: time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second, + MConfig: &MConnConfig{ + SendRate: int64(config.GetInt(configKeySendRate)), + RecvRate: int64(config.GetInt(configKeyRecvRate)), + }, + FuzzConfig: &FuzzConnConfig{ + Mode: config.GetInt(configFuzzMode), + MaxDelay: time.Duration(config.GetInt(configFuzzMaxDelayMilliseconds)) * time.Millisecond, + ProbDropRW: config.GetFloat64(configFuzzProbDropRW), + ProbDropConn: config.GetFloat64(configFuzzProbDropConn), + ProbSleep: config.GetFloat64(configFuzzProbSleep), + }, + } +} diff --git a/switch_test.go b/switch_test.go index a9abfa0a5..a81bb4ac0 100644 --- a/switch_test.go +++ b/switch_test.go @@ -3,7 +3,6 @@ package p2p import ( "bytes" "fmt" - golog "log" "net" "sync" "testing" @@ -12,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" . "github.com/tendermint/go-common" - cmn "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" @@ -178,10 +176,10 @@ func TestConnAddrFilter(t *testing.T) { // connect to good peer go func() { - s1.AddPeerWithConnection(c1, false) + s1.addPeerWithConnection(c1) }() go func() { - s2.AddPeerWithConnection(c2, true) + s2.addPeerWithConnection(c2) }() // Wait for things to happen, peers to get added... @@ -213,10 +211,10 @@ func TestConnPubKeyFilter(t *testing.T) { // connect to good peer go func() { - s1.AddPeerWithConnection(c1, false) + s1.addPeerWithConnection(c1) }() go func() { - s2.AddPeerWithConnection(c2, true) + s2.addPeerWithConnection(c2) }() // Wait for things to happen, peers to get added... @@ -239,14 +237,12 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { sw.Start() defer sw.Stop() - sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) - defer sw2.Stop() - l, serverAddr := listenTCP() - done := make(chan struct{}) - go accept(l, done, sw2) - defer close(done) + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() - peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) require.Nil(err) err = sw.AddPeer(peer) require.Nil(err) @@ -267,14 +263,12 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { sw.Start() defer sw.Stop() - sw2 := makeSwitch(2, "testing", "123.123.123", initSwitchFunc) - defer sw2.Stop() - l, serverAddr := listenTCP() - done := make(chan struct{}) - go accept(l, done, sw2) - defer close(done) + // simulate remote peer + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp.Start() + defer rp.Stop() - peer, err := newPeer(NewNetAddress(serverAddr), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.config, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) peer.makePersistent() require.Nil(err) err = sw.AddPeer(peer) @@ -334,48 +328,3 @@ func BenchmarkSwitches(b *testing.B) { time.Sleep(1000 * time.Millisecond) } - -func listenTCP() (net.Listener, net.Addr) { - l, e := net.Listen("tcp", "127.0.0.1:0") // any available address - if e != nil { - golog.Fatalf("net.Listen tcp :0: %+v", e) - } - return l, l.Addr() -} - -// simulate remote peer -func accept(l net.Listener, done <-chan struct{}, sw *Switch) { - for { - conn, err := l.Accept() - if err != nil { - golog.Fatalf("Failed to accept conn: %+v", err) - } - conn, err = MakeSecretConnection(conn, sw.nodePrivKey) - if err != nil { - golog.Fatalf("Failed to make secret conn: %+v", err) - } - var err1, err2 error - nodeInfo := new(NodeInfo) - cmn.Parallel( - func() { - var n int - wire.WriteBinary(sw.nodeInfo, conn, &n, &err1) - }, - func() { - var n int - wire.ReadBinary(nodeInfo, conn, maxNodeInfoSize, &n, &err2) - }) - if err1 != nil { - golog.Fatalf("Failed to do handshake: %+v", err1) - } - if err2 != nil { - golog.Fatalf("Failed to do handshake: %+v", err2) - } - select { - case <-done: - conn.Close() - return - default: - } - } -}