diff --git a/config/config.go b/config/config.go index 576554809..a5a212f59 100644 --- a/config/config.go +++ b/config/config.go @@ -5,6 +5,15 @@ import ( "os" "path/filepath" "time" + + tmconn "github.com/tendermint/tendermint/p2p/conn" +) + +const ( + // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep + FuzzModeDrop = iota + // FuzzModeDelay is a mode in which we randomly sleep + FuzzModeDelay ) // NOTE: Most of the structs & relevant comments + the @@ -287,11 +296,24 @@ type P2PConfig struct { // Does not work if the peer-exchange reactor is disabled. SeedMode bool `mapstructure:"seed_mode"` - // Comma separated list of peer IDs to keep private (will not be gossiped to other peers) + // Comma separated list of peer IDs to keep private (will not be gossiped to + // other peers) PrivatePeerIDs string `mapstructure:"private_peer_ids"` // Toggle to disable guard against peers connecting from the same ip. AllowDuplicateIP bool `mapstructure:"allow_duplicate_ip"` + + // Peer connection configuration. + HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MConfig tmconn.MConnConfig `mapstructure:"connection"` + + // Testing params. + // Force dial to fail + TestDialFail bool `mapstructure:"test_dial_fail"` + // FUzz connection + TestFuzz bool `mapstructure:"test_fuzz"` + TestFuzzConfig *FuzzConnConfig `mapstructure:"test_fuzz_config"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer @@ -308,6 +330,12 @@ func DefaultP2PConfig() *P2PConfig { PexReactor: true, SeedMode: false, AllowDuplicateIP: true, // so non-breaking yet + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + MConfig: tmconn.DefaultMConnConfig(), + TestDialFail: false, + TestFuzz: false, + TestFuzzConfig: DefaultFuzzConnConfig(), } } @@ -326,6 +354,26 @@ func (cfg *P2PConfig) AddrBookFile() string { return rootify(cfg.AddrBook, cfg.RootDir) } +// FuzzConnConfig is a FuzzedConnection configuration. +type FuzzConnConfig struct { + Mode int + MaxDelay time.Duration + ProbDropRW float64 + ProbDropConn float64 + ProbSleep float64 +} + +// DefaultFuzzConnConfig returns the default config. +func DefaultFuzzConnConfig() *FuzzConnConfig { + return &FuzzConnConfig{ + Mode: FuzzModeDrop, + MaxDelay: 3 * time.Second, + ProbDropRW: 0.2, + ProbDropConn: 0.00, + ProbSleep: 0.00, + } +} + //----------------------------------------------------------------------------- // MempoolConfig diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 6e08c67f9..94856134b 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -83,7 +83,7 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 - config *MConnConfig + config MConnConfig quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. @@ -121,8 +121,8 @@ func (cfg *MConnConfig) maxPacketMsgTotalSize() int { } // DefaultMConnConfig returns the default config. -func DefaultMConnConfig() *MConnConfig { - return &MConnConfig{ +func DefaultMConnConfig() MConnConfig { + return MConnConfig{ SendRate: defaultSendRate, RecvRate: defaultRecvRate, MaxPacketMsgPayloadSize: maxPacketMsgPayloadSizeDefault, @@ -143,7 +143,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei } // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config -func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection { if config.PongTimeout >= config.PingInterval { panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") } diff --git a/p2p/fuzz.go b/p2p/fuzz.go index 6bfadc29d..8d00ba40d 100644 --- a/p2p/fuzz.go +++ b/p2p/fuzz.go @@ -5,16 +5,10 @@ import ( "sync" "time" + "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" ) -const ( - // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep - FuzzModeDrop = iota - // FuzzModeDelay is a mode in which we randomly sleep - FuzzModeDelay -) - // FuzzedConnection wraps any net.Conn and depending on the mode either delays // reads/writes or randomly drops reads/writes/connections. type FuzzedConnection struct { @@ -24,37 +18,17 @@ type FuzzedConnection struct { start <-chan time.Time active bool - config *FuzzConnConfig -} - -// FuzzConnConfig is a FuzzedConnection configuration. -type FuzzConnConfig struct { - Mode int - MaxDelay time.Duration - ProbDropRW float64 - ProbDropConn float64 - ProbSleep float64 -} - -// DefaultFuzzConnConfig returns the default config. -func DefaultFuzzConnConfig() *FuzzConnConfig { - return &FuzzConnConfig{ - Mode: FuzzModeDrop, - MaxDelay: 3 * time.Second, - ProbDropRW: 0.2, - ProbDropConn: 0.00, - ProbSleep: 0.00, - } + config *config.FuzzConnConfig } // FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately. func FuzzConn(conn net.Conn) net.Conn { - return FuzzConnFromConfig(conn, DefaultFuzzConnConfig()) + return FuzzConnFromConfig(conn, config.DefaultFuzzConnConfig()) } // FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing // starts immediately. -func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { +func FuzzConnFromConfig(conn net.Conn, config *config.FuzzConnConfig) net.Conn { return &FuzzedConnection{ conn: conn, start: make(<-chan time.Time), @@ -66,12 +40,16 @@ func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { // FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the // duration elapses. func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn { - return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig()) + return FuzzConnAfterFromConfig(conn, d, config.DefaultFuzzConnConfig()) } // FuzzConnAfterFromConfig creates a new FuzzedConnection from a config. // Fuzzing starts when the duration elapses. -func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn { +func FuzzConnAfterFromConfig( + conn net.Conn, + d time.Duration, + config *config.FuzzConnConfig, +) net.Conn { return &FuzzedConnection{ conn: conn, start: time.After(d), @@ -81,7 +59,7 @@ func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnCon } // Config returns the connection's config. -func (fc *FuzzedConnection) Config() *FuzzConnConfig { +func (fc *FuzzedConnection) Config() *config.FuzzConnConfig { return fc.config } @@ -136,7 +114,7 @@ func (fc *FuzzedConnection) fuzz() bool { } switch fc.config.Mode { - case FuzzModeDrop: + case config.FuzzModeDrop: // randomly drop the r/w, drop the conn, or sleep r := cmn.RandFloat64() if r <= fc.config.ProbDropRW { @@ -149,7 +127,7 @@ func (fc *FuzzedConnection) fuzz() bool { } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep { time.Sleep(fc.randomDuration()) } - case FuzzModeDelay: + case config.FuzzModeDelay: // sleep a bit time.Sleep(fc.randomDuration()) } diff --git a/p2p/peer.go b/p2p/peer.go index 29f424653..73e2eac20 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -10,10 +10,11 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tendermint/config" tmconn "github.com/tendermint/tendermint/p2p/conn" ) -var testIPSuffix uint32 = 0 +var testIPSuffix uint32 // Peer is an interface representing a peer connected on a reactor. type Peer interface { @@ -39,7 +40,7 @@ type Peer interface { type peerConn struct { outbound bool persistent bool - config *PeerConfig + config *config.P2PConfig conn net.Conn // source connection ip net.IP } @@ -99,94 +100,95 @@ type peer struct { Data *cmn.CMap } -func newPeer(pc peerConn, nodeInfo NodeInfo, - reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{})) *peer { - +func newPeer( + pc peerConn, + nodeInfo NodeInfo, + reactorsByCh map[byte]Reactor, + chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), +) *peer { p := &peer{ peerConn: pc, nodeInfo: nodeInfo, channels: nodeInfo.Channels, Data: cmn.NewCMap(), } - p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig) - p.BaseService = *cmn.NewBaseService(nil, "Peer", p) - return p -} - -// PeerConfig is a Peer configuration. -type PeerConfig struct { - // times are in seconds - HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MConfig *tmconn.MConnConfig `mapstructure:"connection"` - - DialFail bool `mapstructure:"dial_fail"` // for testing - Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing) - FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"` -} + p.mconn = createMConnection( + pc.conn, + p, + reactorsByCh, + chDescs, + onPeerError, + pc.config.MConfig, + ) + p.BaseService = *cmn.NewBaseService(nil, "Peer", p) -// DefaultPeerConfig returns the default config. -func DefaultPeerConfig() *PeerConfig { - return &PeerConfig{ - HandshakeTimeout: 20, // * time.Second, - DialTimeout: 3, // * time.Second, - MConfig: tmconn.DefaultMConnConfig(), - DialFail: false, - Fuzz: false, - FuzzConfig: DefaultFuzzConnConfig(), - } + return p } -func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) { - var pc peerConn - +func newOutboundPeerConn( + addr *NetAddress, + config *config.P2PConfig, + persistent bool, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { conn, err := dial(addr, config) if err != nil { - return pc, cmn.ErrorWrap(err, "Error creating peer") + return peerConn{}, cmn.ErrorWrap(err, "Error creating peer") } - pc, err = newPeerConn(conn, config, true, persistent, ourNodePrivKey) + pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey) if err != nil { - if err2 := conn.Close(); err2 != nil { - return pc, cmn.ErrorWrap(err, err2.Error()) + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) } - return pc, err + return peerConn{}, err } // ensure dialed ID matches connection ID if addr.ID != pc.ID() { - if err2 := conn.Close(); err2 != nil { - return pc, cmn.ErrorWrap(err, err2.Error()) + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) } - return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()} + return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()} } + return pc, nil } -func newInboundPeerConn(conn net.Conn, config *PeerConfig, ourNodePrivKey crypto.PrivKey) (peerConn, error) { +func newInboundPeerConn( + conn net.Conn, + config *config.P2PConfig, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { // TODO: issue PoW challenge return newPeerConn(conn, config, false, false, ourNodePrivKey) } -func newPeerConn(rawConn net.Conn, - config *PeerConfig, outbound, persistent bool, - ourNodePrivKey crypto.PrivKey) (pc peerConn, err error) { - +func newPeerConn( + rawConn net.Conn, + cfg *config.P2PConfig, + outbound, persistent bool, + ourNodePrivKey crypto.PrivKey, +) (pc peerConn, err error) { conn := rawConn // Fuzz connection - if config.Fuzz { + if cfg.TestFuzz { // so we have time to do peer handshakes and get set up - conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig) + conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig) } // Set deadline for secret handshake - if err := conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second)); err != nil { - return pc, cmn.ErrorWrap(err, "Error setting deadline while encrypting connection") + dl := time.Now().Add(cfg.HandshakeTimeout) + if err := conn.SetDeadline(dl); err != nil { + return pc, cmn.ErrorWrap( + err, + "Error setting deadline while encrypting connection", + ) } // Encrypt connection @@ -197,7 +199,7 @@ func newPeerConn(rawConn net.Conn, // Only the information we already have return peerConn{ - config: config, + config: cfg, outbound: outbound, persistent: persistent, conn: conn, @@ -300,22 +302,33 @@ func (p *peer) hasChannel(chID byte) bool { } // NOTE: probably will want to remove this // but could be helpful while the feature is new - p.Logger.Debug("Unknown channel for peer", "channel", chID, "channels", p.channels) + p.Logger.Debug( + "Unknown channel for peer", + "channel", + chID, + "channels", + p.channels, + ) return false } //--------------------------------------------------- // methods used by the Switch -// CloseConn should be called by the Switch if the peer was created but never started. +// CloseConn should be called by the Switch if the peer was created but never +// started. func (pc *peerConn) CloseConn() { pc.conn.Close() // nolint: errcheck } -// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer -// by exchanging their NodeInfo. It sets the received nodeInfo on the peer. +// HandshakeTimeout performs the Tendermint P2P handshake between a given node +// and the peer by exchanging their NodeInfo. It sets the received nodeInfo on +// the peer. // NOTE: blocking -func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) (peerNodeInfo NodeInfo, err error) { +func (pc *peerConn) HandshakeTimeout( + ourNodeInfo NodeInfo, + timeout time.Duration, +) (peerNodeInfo NodeInfo, err error) { // Set deadline for handshake so we don't block forever on conn.ReadFull if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil { return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline") @@ -327,7 +340,11 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration return }, func(_ int) (val interface{}, err error, abort bool) { - _, err = cdc.UnmarshalBinaryReader(pc.conn, &peerNodeInfo, int64(MaxNodeInfoSize())) + _, err = cdc.UnmarshalBinaryReader( + pc.conn, + &peerNodeInfo, + int64(MaxNodeInfoSize()), + ) return }, ) @@ -368,20 +385,26 @@ func (p *peer) String() string { //------------------------------------------------------------------ // helper funcs -func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { - if config.DialFail { +func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { + if cfg.TestDialFail { return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)") } - conn, err := addr.DialTimeout(config.DialTimeout * time.Second) + conn, err := addr.DialTimeout(cfg.DialTimeout) if err != nil { return nil, err } return conn, nil } -func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection { +func createMConnection( + conn net.Conn, + p *peer, + reactorsByCh map[byte]Reactor, + chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), + config tmconn.MConnConfig, +) *tmconn.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] @@ -397,5 +420,11 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) + return tmconn.NewMConnectionWithConfig( + conn, + chDescs, + onReceive, + onError, + config, + ) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 435c941fe..d4781c658 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -10,9 +10,11 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - tmconn "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/config" + tmconn "github.com/tendermint/tendermint/p2p/conn" ) const testCh = 0x01 @@ -21,11 +23,11 @@ func TestPeerBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), cfg) require.Nil(err) err = p.Start() @@ -44,7 +46,7 @@ func TestPeerBasic(t *testing.T) { func TestPeerSend(t *testing.T) { assert, require := assert.New(t), require.New(t) - config := DefaultPeerConfig() + config := cfg // simulate remote peer rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} @@ -63,7 +65,7 @@ func TestPeerSend(t *testing.T) { assert.True(p.Send(testCh, []byte("Asylum"))) } -func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) { +func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *config.P2PConfig) (*peer, error) { chDescs := []*tmconn.ChannelDescriptor{ {ID: testCh, Priority: 1}, } @@ -91,7 +93,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) type remotePeer struct { PrivKey crypto.PrivKey - Config *PeerConfig + Config *config.P2PConfig addr *NetAddress quit chan struct{} channels cmn.HexBytes diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 307427b5a..03769cf07 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -13,21 +13,22 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" ) var ( - config *cfg.P2PConfig + cfg *config.P2PConfig ) func init() { - config = cfg.DefaultP2PConfig() - config.PexReactor = true - config.AllowDuplicateIP = true + cfg = config.DefaultP2PConfig() + cfg.PexReactor = true + cfg.AllowDuplicateIP = true } func TestPEXReactorBasic(t *testing.T) { @@ -84,7 +85,7 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch(config, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) books[i].SetLogger(logger.With("pex", i)) sw.SetAddrBook(books[i]) @@ -212,7 +213,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { // 1. create seed seed := p2p.MakeSwitch( - config, + cfg, 0, "127.0.0.1", "123.123.123", @@ -242,7 +243,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { // 2. create usual peer with only seed configured. peer := p2p.MakeSwitch( - config, + cfg, 1, "127.0.0.1", "123.123.123", @@ -428,7 +429,7 @@ func assertPeersWithTimeout( } } -func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { +func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) { // directory to store address book dir, err := ioutil.TempDir("", "pex_reactor") if err != nil { @@ -437,7 +438,7 @@ func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true) book.SetLogger(log.TestingLogger()) - r = NewPEXReactor(book, config) + r = NewPEXReactor(book, conf) r.SetLogger(log.TestingLogger()) return } @@ -450,7 +451,7 @@ func teardownReactor(book *addrBook) { } func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) for _, r := range reactors { sw.AddReactor(r.String(), r) diff --git a/p2p/switch.go b/p2p/switch.go index 939af0bbf..9068aa113 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -7,7 +7,7 @@ import ( "sync" "time" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" ) @@ -55,8 +55,7 @@ type AddrBook interface { type Switch struct { cmn.BaseService - config *cfg.P2PConfig - peerConfig *PeerConfig + config *config.P2PConfig listeners []Listener reactors map[string]Reactor chDescs []*conn.ChannelDescriptor @@ -75,10 +74,9 @@ type Switch struct { } // NewSwitch creates a new Switch with the given config. -func NewSwitch(config *cfg.P2PConfig) *Switch { +func NewSwitch(cfg *config.P2PConfig) *Switch { sw := &Switch{ - config: config, - peerConfig: DefaultPeerConfig(), + config: cfg, reactors: make(map[string]Reactor), chDescs: make([]*conn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), @@ -90,11 +88,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { // Ensure we have a completely undeterministic PRNG. sw.rng = cmn.NewRand() - // TODO: collapse the peerConfig into the config ? - sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond - sw.peerConfig.MConfig.SendRate = config.SendRate - sw.peerConfig.MConfig.RecvRate = config.RecvRate - sw.peerConfig.MConfig.MaxPacketMsgPayloadSize = config.MaxPacketMsgPayloadSize + sw.config.MConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond + sw.config.MConfig.SendRate = cfg.SendRate + sw.config.MConfig.RecvRate = cfg.RecvRate + sw.config.MConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw @@ -419,7 +416,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) - return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent) + return sw.addOutboundPeerWithConfig(addr, sw.config, persistent) } // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] @@ -476,7 +473,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - err := sw.addInboundPeerWithConfig(inConn, sw.peerConfig) + err := sw.addInboundPeerWithConfig(inConn, sw.config) if err != nil { sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) continue @@ -486,7 +483,10 @@ func (sw *Switch) listenerRoutine(l Listener) { // cleanup } -func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { +func (sw *Switch) addInboundPeerWithConfig( + conn net.Conn, + config *config.P2PConfig, +) error { peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey) if err != nil { conn.Close() // peer is nil @@ -503,10 +503,20 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er // dial the peer; make secret connection; authenticate against the dialed ID; // add the peer. // if dialing fails, start the reconnect loop. If handhsake fails, its over. -// If peer is started succesffuly, reconnectLoop will start when StopPeerForError is called -func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) error { +// If peer is started succesffuly, reconnectLoop will start when +// StopPeerForError is called +func (sw *Switch) addOutboundPeerWithConfig( + addr *NetAddress, + config *config.P2PConfig, + persistent bool, +) error { sw.Logger.Info("Dialing peer", "address", addr) - peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey) + peerConn, err := newOutboundPeerConn( + addr, + config, + persistent, + sw.nodeKey.PrivKey, + ) if err != nil { if persistent { go sw.reconnectToPeer(addr) @@ -525,7 +535,8 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig // that already has a SecretConnection. If all goes well, // it starts the peer and adds it to the switch. // NOTE: This performs a blocking handshake before the peer is added. -// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() +// NOTE: If error is returned, caller is responsible for calling +// peer.CloseConn() func (sw *Switch) addPeer(pc peerConn) error { addr := pc.conn.RemoteAddr() @@ -534,7 +545,7 @@ func (sw *Switch) addPeer(pc peerConn) error { } // Exchange NodeInfo on the conn - peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)) + peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout)) if err != nil { return err } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index d33797a2b..6157f45c6 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -14,18 +14,18 @@ import ( crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" ) var ( - config *cfg.P2PConfig + cfg *config.P2PConfig ) func init() { - config = cfg.DefaultP2PConfig() - config.PexReactor = true - config.AllowDuplicateIP = true + cfg = config.DefaultP2PConfig() + cfg.PexReactor = true + cfg.AllowDuplicateIP = true } type PeerMessage struct { @@ -85,7 +85,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { // XXX: note this uses net.Pipe and not a proper TCP conn func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. - switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches) + switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches) return switches[0], switches[1] } @@ -152,8 +152,8 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r } func TestConnAddrFilter(t *testing.T) { - s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -181,14 +181,14 @@ func TestConnAddrFilter(t *testing.T) { } func TestSwitchFiltersOutItself(t *testing.T) { - s1 := MakeSwitch(config, 1, "127.0.0.1", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc) // addr := s1.NodeInfo().NetAddress() // // add ourselves like we do in node.go#427 // s1.addrBook.AddOurAddress(addr) // simulate s1 having a public IP by creating a remote peer with the same ID - rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg} rp.Start() // addr should be rejected in addPeer based on the same ID @@ -214,8 +214,8 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) } func TestConnIDFilter(t *testing.T) { - s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -251,7 +251,7 @@ func TestConnIDFilter(t *testing.T) { func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -259,11 +259,11 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { defer sw.Stop() // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), false, sw.nodeKey.PrivKey) + pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey) require.Nil(err) err = sw.addPeer(pc) require.Nil(err) @@ -281,7 +281,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -289,11 +289,11 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { defer sw.Stop() // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), true, sw.nodeKey.PrivKey) + pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey) // sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, require.Nil(err) @@ -320,7 +320,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { // simulate another remote peer rp = &remotePeer{ PrivKey: crypto.GenPrivKeyEd25519(), - Config: DefaultPeerConfig(), + Config: cfg, // Use different interface to prevent duplicate IP filter, this will break // beyond two peers. listenAddr: "127.0.0.1:0", @@ -329,9 +329,9 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { defer rp.Stop() // simulate first time dial failure - peerConfig := DefaultPeerConfig() - peerConfig.DialFail = true - err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true) + conf := config.DefaultP2PConfig() + conf.TestDialFail = true + err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true) require.NotNil(err) // DialPeerWithAddres - sw.peerConfig resets the dialer @@ -348,7 +348,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { } func TestSwitchFullConnectivity(t *testing.T) { - switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches) + switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches) defer func() { for _, sw := range switches { sw.Stop() diff --git a/p2p/test_util.go b/p2p/test_util.go index b5b739af9..bea815f51 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -8,7 +8,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" ) @@ -56,7 +56,7 @@ const TEST_HOST = "localhost" // If connect==Connect2Switches, the switches will be fully connected. // initSwitch defines how the i'th switch should be initialized (ie. with what reactors). // NOTE: panics if any switch fails to start. -func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { +func MakeConnectedSwitches(cfg *config.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { switches := make([]*Switch, n) for i := 0; i < n; i++ { switches[i] = MakeSwitch(cfg, i, TEST_HOST, "123.123.123", initSwitch) @@ -104,7 +104,7 @@ func Connect2Switches(switches []*Switch, i, j int) { } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - pc, err := newInboundPeerConn(conn, sw.peerConfig, sw.nodeKey.PrivKey) + pc, err := newInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) @@ -131,7 +131,7 @@ func StartSwitches(switches []*Switch) error { return nil } -func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { +func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { // new switch, add reactors // TODO: let the config be passed in? nodeKey := &NodeKey{