From ea896865a740eaa43076b9867745cb5d2b67635f Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Sat, 2 Jun 2018 17:30:55 +0200 Subject: [PATCH] Collapse PeerConfig into P2PConfig As both configs are concerned with the p2p packaage and PeerConfig is only used inside of the package there is no good reason to keep the couple of fields separate, therefore it is collapsed into the more general P2PConifg. This is a stepping stone towards a setup where the components inside of p2p do not have any knowledge about the config. follow-up to #1325 --- config/config.go | 50 +++++++++++- p2p/conn/connection.go | 8 +- p2p/fuzz.go | 48 +++-------- p2p/peer.go | 159 +++++++++++++++++++++--------------- p2p/peer_test.go | 14 ++-- p2p/pex/pex_reactor_test.go | 27 +++--- p2p/switch.go | 49 ++++++----- p2p/switch_test.go | 46 +++++------ p2p/test_util.go | 8 +- 9 files changed, 239 insertions(+), 170 deletions(-) diff --git a/config/config.go b/config/config.go index 576554809..a5a212f59 100644 --- a/config/config.go +++ b/config/config.go @@ -5,6 +5,15 @@ import ( "os" "path/filepath" "time" + + tmconn "github.com/tendermint/tendermint/p2p/conn" +) + +const ( + // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep + FuzzModeDrop = iota + // FuzzModeDelay is a mode in which we randomly sleep + FuzzModeDelay ) // NOTE: Most of the structs & relevant comments + the @@ -287,11 +296,24 @@ type P2PConfig struct { // Does not work if the peer-exchange reactor is disabled. SeedMode bool `mapstructure:"seed_mode"` - // Comma separated list of peer IDs to keep private (will not be gossiped to other peers) + // Comma separated list of peer IDs to keep private (will not be gossiped to + // other peers) PrivatePeerIDs string `mapstructure:"private_peer_ids"` // Toggle to disable guard against peers connecting from the same ip. AllowDuplicateIP bool `mapstructure:"allow_duplicate_ip"` + + // Peer connection configuration. + HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MConfig tmconn.MConnConfig `mapstructure:"connection"` + + // Testing params. + // Force dial to fail + TestDialFail bool `mapstructure:"test_dial_fail"` + // FUzz connection + TestFuzz bool `mapstructure:"test_fuzz"` + TestFuzzConfig *FuzzConnConfig `mapstructure:"test_fuzz_config"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer @@ -308,6 +330,12 @@ func DefaultP2PConfig() *P2PConfig { PexReactor: true, SeedMode: false, AllowDuplicateIP: true, // so non-breaking yet + HandshakeTimeout: 20 * time.Second, + DialTimeout: 3 * time.Second, + MConfig: tmconn.DefaultMConnConfig(), + TestDialFail: false, + TestFuzz: false, + TestFuzzConfig: DefaultFuzzConnConfig(), } } @@ -326,6 +354,26 @@ func (cfg *P2PConfig) AddrBookFile() string { return rootify(cfg.AddrBook, cfg.RootDir) } +// FuzzConnConfig is a FuzzedConnection configuration. +type FuzzConnConfig struct { + Mode int + MaxDelay time.Duration + ProbDropRW float64 + ProbDropConn float64 + ProbSleep float64 +} + +// DefaultFuzzConnConfig returns the default config. +func DefaultFuzzConnConfig() *FuzzConnConfig { + return &FuzzConnConfig{ + Mode: FuzzModeDrop, + MaxDelay: 3 * time.Second, + ProbDropRW: 0.2, + ProbDropConn: 0.00, + ProbSleep: 0.00, + } +} + //----------------------------------------------------------------------------- // MempoolConfig diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 6e08c67f9..94856134b 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -83,7 +83,7 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 - config *MConnConfig + config MConnConfig quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. @@ -121,8 +121,8 @@ func (cfg *MConnConfig) maxPacketMsgTotalSize() int { } // DefaultMConnConfig returns the default config. -func DefaultMConnConfig() *MConnConfig { - return &MConnConfig{ +func DefaultMConnConfig() MConnConfig { + return MConnConfig{ SendRate: defaultSendRate, RecvRate: defaultRecvRate, MaxPacketMsgPayloadSize: maxPacketMsgPayloadSizeDefault, @@ -143,7 +143,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei } // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config -func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection { if config.PongTimeout >= config.PingInterval { panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") } diff --git a/p2p/fuzz.go b/p2p/fuzz.go index 6bfadc29d..8d00ba40d 100644 --- a/p2p/fuzz.go +++ b/p2p/fuzz.go @@ -5,16 +5,10 @@ import ( "sync" "time" + "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" ) -const ( - // FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep - FuzzModeDrop = iota - // FuzzModeDelay is a mode in which we randomly sleep - FuzzModeDelay -) - // FuzzedConnection wraps any net.Conn and depending on the mode either delays // reads/writes or randomly drops reads/writes/connections. type FuzzedConnection struct { @@ -24,37 +18,17 @@ type FuzzedConnection struct { start <-chan time.Time active bool - config *FuzzConnConfig -} - -// FuzzConnConfig is a FuzzedConnection configuration. -type FuzzConnConfig struct { - Mode int - MaxDelay time.Duration - ProbDropRW float64 - ProbDropConn float64 - ProbSleep float64 -} - -// DefaultFuzzConnConfig returns the default config. -func DefaultFuzzConnConfig() *FuzzConnConfig { - return &FuzzConnConfig{ - Mode: FuzzModeDrop, - MaxDelay: 3 * time.Second, - ProbDropRW: 0.2, - ProbDropConn: 0.00, - ProbSleep: 0.00, - } + config *config.FuzzConnConfig } // FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately. func FuzzConn(conn net.Conn) net.Conn { - return FuzzConnFromConfig(conn, DefaultFuzzConnConfig()) + return FuzzConnFromConfig(conn, config.DefaultFuzzConnConfig()) } // FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing // starts immediately. -func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { +func FuzzConnFromConfig(conn net.Conn, config *config.FuzzConnConfig) net.Conn { return &FuzzedConnection{ conn: conn, start: make(<-chan time.Time), @@ -66,12 +40,16 @@ func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn { // FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the // duration elapses. func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn { - return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig()) + return FuzzConnAfterFromConfig(conn, d, config.DefaultFuzzConnConfig()) } // FuzzConnAfterFromConfig creates a new FuzzedConnection from a config. // Fuzzing starts when the duration elapses. -func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn { +func FuzzConnAfterFromConfig( + conn net.Conn, + d time.Duration, + config *config.FuzzConnConfig, +) net.Conn { return &FuzzedConnection{ conn: conn, start: time.After(d), @@ -81,7 +59,7 @@ func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnCon } // Config returns the connection's config. -func (fc *FuzzedConnection) Config() *FuzzConnConfig { +func (fc *FuzzedConnection) Config() *config.FuzzConnConfig { return fc.config } @@ -136,7 +114,7 @@ func (fc *FuzzedConnection) fuzz() bool { } switch fc.config.Mode { - case FuzzModeDrop: + case config.FuzzModeDrop: // randomly drop the r/w, drop the conn, or sleep r := cmn.RandFloat64() if r <= fc.config.ProbDropRW { @@ -149,7 +127,7 @@ func (fc *FuzzedConnection) fuzz() bool { } else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep { time.Sleep(fc.randomDuration()) } - case FuzzModeDelay: + case config.FuzzModeDelay: // sleep a bit time.Sleep(fc.randomDuration()) } diff --git a/p2p/peer.go b/p2p/peer.go index 29f424653..73e2eac20 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -10,10 +10,11 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tendermint/config" tmconn "github.com/tendermint/tendermint/p2p/conn" ) -var testIPSuffix uint32 = 0 +var testIPSuffix uint32 // Peer is an interface representing a peer connected on a reactor. type Peer interface { @@ -39,7 +40,7 @@ type Peer interface { type peerConn struct { outbound bool persistent bool - config *PeerConfig + config *config.P2PConfig conn net.Conn // source connection ip net.IP } @@ -99,94 +100,95 @@ type peer struct { Data *cmn.CMap } -func newPeer(pc peerConn, nodeInfo NodeInfo, - reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{})) *peer { - +func newPeer( + pc peerConn, + nodeInfo NodeInfo, + reactorsByCh map[byte]Reactor, + chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), +) *peer { p := &peer{ peerConn: pc, nodeInfo: nodeInfo, channels: nodeInfo.Channels, Data: cmn.NewCMap(), } - p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig) - p.BaseService = *cmn.NewBaseService(nil, "Peer", p) - return p -} - -// PeerConfig is a Peer configuration. -type PeerConfig struct { - // times are in seconds - HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MConfig *tmconn.MConnConfig `mapstructure:"connection"` - - DialFail bool `mapstructure:"dial_fail"` // for testing - Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing) - FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"` -} + p.mconn = createMConnection( + pc.conn, + p, + reactorsByCh, + chDescs, + onPeerError, + pc.config.MConfig, + ) + p.BaseService = *cmn.NewBaseService(nil, "Peer", p) -// DefaultPeerConfig returns the default config. -func DefaultPeerConfig() *PeerConfig { - return &PeerConfig{ - HandshakeTimeout: 20, // * time.Second, - DialTimeout: 3, // * time.Second, - MConfig: tmconn.DefaultMConnConfig(), - DialFail: false, - Fuzz: false, - FuzzConfig: DefaultFuzzConnConfig(), - } + return p } -func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) { - var pc peerConn - +func newOutboundPeerConn( + addr *NetAddress, + config *config.P2PConfig, + persistent bool, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { conn, err := dial(addr, config) if err != nil { - return pc, cmn.ErrorWrap(err, "Error creating peer") + return peerConn{}, cmn.ErrorWrap(err, "Error creating peer") } - pc, err = newPeerConn(conn, config, true, persistent, ourNodePrivKey) + pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey) if err != nil { - if err2 := conn.Close(); err2 != nil { - return pc, cmn.ErrorWrap(err, err2.Error()) + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) } - return pc, err + return peerConn{}, err } // ensure dialed ID matches connection ID if addr.ID != pc.ID() { - if err2 := conn.Close(); err2 != nil { - return pc, cmn.ErrorWrap(err, err2.Error()) + if cerr := conn.Close(); cerr != nil { + return peerConn{}, cmn.ErrorWrap(err, cerr.Error()) } - return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()} + return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()} } + return pc, nil } -func newInboundPeerConn(conn net.Conn, config *PeerConfig, ourNodePrivKey crypto.PrivKey) (peerConn, error) { +func newInboundPeerConn( + conn net.Conn, + config *config.P2PConfig, + ourNodePrivKey crypto.PrivKey, +) (peerConn, error) { // TODO: issue PoW challenge return newPeerConn(conn, config, false, false, ourNodePrivKey) } -func newPeerConn(rawConn net.Conn, - config *PeerConfig, outbound, persistent bool, - ourNodePrivKey crypto.PrivKey) (pc peerConn, err error) { - +func newPeerConn( + rawConn net.Conn, + cfg *config.P2PConfig, + outbound, persistent bool, + ourNodePrivKey crypto.PrivKey, +) (pc peerConn, err error) { conn := rawConn // Fuzz connection - if config.Fuzz { + if cfg.TestFuzz { // so we have time to do peer handshakes and get set up - conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig) + conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig) } // Set deadline for secret handshake - if err := conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second)); err != nil { - return pc, cmn.ErrorWrap(err, "Error setting deadline while encrypting connection") + dl := time.Now().Add(cfg.HandshakeTimeout) + if err := conn.SetDeadline(dl); err != nil { + return pc, cmn.ErrorWrap( + err, + "Error setting deadline while encrypting connection", + ) } // Encrypt connection @@ -197,7 +199,7 @@ func newPeerConn(rawConn net.Conn, // Only the information we already have return peerConn{ - config: config, + config: cfg, outbound: outbound, persistent: persistent, conn: conn, @@ -300,22 +302,33 @@ func (p *peer) hasChannel(chID byte) bool { } // NOTE: probably will want to remove this // but could be helpful while the feature is new - p.Logger.Debug("Unknown channel for peer", "channel", chID, "channels", p.channels) + p.Logger.Debug( + "Unknown channel for peer", + "channel", + chID, + "channels", + p.channels, + ) return false } //--------------------------------------------------- // methods used by the Switch -// CloseConn should be called by the Switch if the peer was created but never started. +// CloseConn should be called by the Switch if the peer was created but never +// started. func (pc *peerConn) CloseConn() { pc.conn.Close() // nolint: errcheck } -// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer -// by exchanging their NodeInfo. It sets the received nodeInfo on the peer. +// HandshakeTimeout performs the Tendermint P2P handshake between a given node +// and the peer by exchanging their NodeInfo. It sets the received nodeInfo on +// the peer. // NOTE: blocking -func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) (peerNodeInfo NodeInfo, err error) { +func (pc *peerConn) HandshakeTimeout( + ourNodeInfo NodeInfo, + timeout time.Duration, +) (peerNodeInfo NodeInfo, err error) { // Set deadline for handshake so we don't block forever on conn.ReadFull if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil { return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline") @@ -327,7 +340,11 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration return }, func(_ int) (val interface{}, err error, abort bool) { - _, err = cdc.UnmarshalBinaryReader(pc.conn, &peerNodeInfo, int64(MaxNodeInfoSize())) + _, err = cdc.UnmarshalBinaryReader( + pc.conn, + &peerNodeInfo, + int64(MaxNodeInfoSize()), + ) return }, ) @@ -368,20 +385,26 @@ func (p *peer) String() string { //------------------------------------------------------------------ // helper funcs -func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { - if config.DialFail { +func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { + if cfg.TestDialFail { return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)") } - conn, err := addr.DialTimeout(config.DialTimeout * time.Second) + conn, err := addr.DialTimeout(cfg.DialTimeout) if err != nil { return nil, err } return conn, nil } -func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection { +func createMConnection( + conn net.Conn, + p *peer, + reactorsByCh map[byte]Reactor, + chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), + config tmconn.MConnConfig, +) *tmconn.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] @@ -397,5 +420,11 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) + return tmconn.NewMConnectionWithConfig( + conn, + chDescs, + onReceive, + onError, + config, + ) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 435c941fe..d4781c658 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -10,9 +10,11 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - tmconn "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/config" + tmconn "github.com/tendermint/tendermint/p2p/conn" ) const testCh = 0x01 @@ -21,11 +23,11 @@ func TestPeerBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) + p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), cfg) require.Nil(err) err = p.Start() @@ -44,7 +46,7 @@ func TestPeerBasic(t *testing.T) { func TestPeerSend(t *testing.T) { assert, require := assert.New(t), require.New(t) - config := DefaultPeerConfig() + config := cfg // simulate remote peer rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config} @@ -63,7 +65,7 @@ func TestPeerSend(t *testing.T) { assert.True(p.Send(testCh, []byte("Asylum"))) } -func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) { +func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *config.P2PConfig) (*peer, error) { chDescs := []*tmconn.ChannelDescriptor{ {ID: testCh, Priority: 1}, } @@ -91,7 +93,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) type remotePeer struct { PrivKey crypto.PrivKey - Config *PeerConfig + Config *config.P2PConfig addr *NetAddress quit chan struct{} channels cmn.HexBytes diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 307427b5a..03769cf07 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -13,21 +13,22 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" ) var ( - config *cfg.P2PConfig + cfg *config.P2PConfig ) func init() { - config = cfg.DefaultP2PConfig() - config.PexReactor = true - config.AllowDuplicateIP = true + cfg = config.DefaultP2PConfig() + cfg.PexReactor = true + cfg.AllowDuplicateIP = true } func TestPEXReactorBasic(t *testing.T) { @@ -84,7 +85,7 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch(config, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false) books[i].SetLogger(logger.With("pex", i)) sw.SetAddrBook(books[i]) @@ -212,7 +213,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { // 1. create seed seed := p2p.MakeSwitch( - config, + cfg, 0, "127.0.0.1", "123.123.123", @@ -242,7 +243,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { // 2. create usual peer with only seed configured. peer := p2p.MakeSwitch( - config, + cfg, 1, "127.0.0.1", "123.123.123", @@ -428,7 +429,7 @@ func assertPeersWithTimeout( } } -func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { +func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) { // directory to store address book dir, err := ioutil.TempDir("", "pex_reactor") if err != nil { @@ -437,7 +438,7 @@ func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) { book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true) book.SetLogger(log.TestingLogger()) - r = NewPEXReactor(book, config) + r = NewPEXReactor(book, conf) r.SetLogger(log.TestingLogger()) return } @@ -450,7 +451,7 @@ func teardownReactor(book *addrBook) { } func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { - sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) for _, r := range reactors { sw.AddReactor(r.String(), r) diff --git a/p2p/switch.go b/p2p/switch.go index 939af0bbf..9068aa113 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -7,7 +7,7 @@ import ( "sync" "time" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" cmn "github.com/tendermint/tmlibs/common" ) @@ -55,8 +55,7 @@ type AddrBook interface { type Switch struct { cmn.BaseService - config *cfg.P2PConfig - peerConfig *PeerConfig + config *config.P2PConfig listeners []Listener reactors map[string]Reactor chDescs []*conn.ChannelDescriptor @@ -75,10 +74,9 @@ type Switch struct { } // NewSwitch creates a new Switch with the given config. -func NewSwitch(config *cfg.P2PConfig) *Switch { +func NewSwitch(cfg *config.P2PConfig) *Switch { sw := &Switch{ - config: config, - peerConfig: DefaultPeerConfig(), + config: cfg, reactors: make(map[string]Reactor), chDescs: make([]*conn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), @@ -90,11 +88,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { // Ensure we have a completely undeterministic PRNG. sw.rng = cmn.NewRand() - // TODO: collapse the peerConfig into the config ? - sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond - sw.peerConfig.MConfig.SendRate = config.SendRate - sw.peerConfig.MConfig.RecvRate = config.RecvRate - sw.peerConfig.MConfig.MaxPacketMsgPayloadSize = config.MaxPacketMsgPayloadSize + sw.config.MConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond + sw.config.MConfig.SendRate = cfg.SendRate + sw.config.MConfig.RecvRate = cfg.RecvRate + sw.config.MConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw @@ -419,7 +416,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) - return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent) + return sw.addOutboundPeerWithConfig(addr, sw.config, persistent) } // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds] @@ -476,7 +473,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // New inbound connection! - err := sw.addInboundPeerWithConfig(inConn, sw.peerConfig) + err := sw.addInboundPeerWithConfig(inConn, sw.config) if err != nil { sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err) continue @@ -486,7 +483,10 @@ func (sw *Switch) listenerRoutine(l Listener) { // cleanup } -func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error { +func (sw *Switch) addInboundPeerWithConfig( + conn net.Conn, + config *config.P2PConfig, +) error { peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey) if err != nil { conn.Close() // peer is nil @@ -503,10 +503,20 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er // dial the peer; make secret connection; authenticate against the dialed ID; // add the peer. // if dialing fails, start the reconnect loop. If handhsake fails, its over. -// If peer is started succesffuly, reconnectLoop will start when StopPeerForError is called -func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) error { +// If peer is started succesffuly, reconnectLoop will start when +// StopPeerForError is called +func (sw *Switch) addOutboundPeerWithConfig( + addr *NetAddress, + config *config.P2PConfig, + persistent bool, +) error { sw.Logger.Info("Dialing peer", "address", addr) - peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey) + peerConn, err := newOutboundPeerConn( + addr, + config, + persistent, + sw.nodeKey.PrivKey, + ) if err != nil { if persistent { go sw.reconnectToPeer(addr) @@ -525,7 +535,8 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig // that already has a SecretConnection. If all goes well, // it starts the peer and adds it to the switch. // NOTE: This performs a blocking handshake before the peer is added. -// NOTE: If error is returned, caller is responsible for calling peer.CloseConn() +// NOTE: If error is returned, caller is responsible for calling +// peer.CloseConn() func (sw *Switch) addPeer(pc peerConn) error { addr := pc.conn.RemoteAddr() @@ -534,7 +545,7 @@ func (sw *Switch) addPeer(pc peerConn) error { } // Exchange NodeInfo on the conn - peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)) + peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout)) if err != nil { return err } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index d33797a2b..6157f45c6 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -14,18 +14,18 @@ import ( crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" ) var ( - config *cfg.P2PConfig + cfg *config.P2PConfig ) func init() { - config = cfg.DefaultP2PConfig() - config.PexReactor = true - config.AllowDuplicateIP = true + cfg = config.DefaultP2PConfig() + cfg.PexReactor = true + cfg.AllowDuplicateIP = true } type PeerMessage struct { @@ -85,7 +85,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { // XXX: note this uses net.Pipe and not a proper TCP conn func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. - switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches) + switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches) return switches[0], switches[1] } @@ -152,8 +152,8 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r } func TestConnAddrFilter(t *testing.T) { - s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -181,14 +181,14 @@ func TestConnAddrFilter(t *testing.T) { } func TestSwitchFiltersOutItself(t *testing.T) { - s1 := MakeSwitch(config, 1, "127.0.0.1", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc) // addr := s1.NodeInfo().NetAddress() // // add ourselves like we do in node.go#427 // s1.addrBook.AddOurAddress(addr) // simulate s1 having a public IP by creating a remote peer with the same ID - rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg} rp.Start() // addr should be rejected in addPeer based on the same ID @@ -214,8 +214,8 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) } func TestConnIDFilter(t *testing.T) { - s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -251,7 +251,7 @@ func TestConnIDFilter(t *testing.T) { func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -259,11 +259,11 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { defer sw.Stop() // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), false, sw.nodeKey.PrivKey) + pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey) require.Nil(err) err = sw.addPeer(pc) require.Nil(err) @@ -281,7 +281,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -289,11 +289,11 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { defer sw.Stop() // simulate remote peer - rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()} + rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg} rp.Start() defer rp.Stop() - pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), true, sw.nodeKey.PrivKey) + pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey) // sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, require.Nil(err) @@ -320,7 +320,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { // simulate another remote peer rp = &remotePeer{ PrivKey: crypto.GenPrivKeyEd25519(), - Config: DefaultPeerConfig(), + Config: cfg, // Use different interface to prevent duplicate IP filter, this will break // beyond two peers. listenAddr: "127.0.0.1:0", @@ -329,9 +329,9 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { defer rp.Stop() // simulate first time dial failure - peerConfig := DefaultPeerConfig() - peerConfig.DialFail = true - err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true) + conf := config.DefaultP2PConfig() + conf.TestDialFail = true + err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true) require.NotNil(err) // DialPeerWithAddres - sw.peerConfig resets the dialer @@ -348,7 +348,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { } func TestSwitchFullConnectivity(t *testing.T) { - switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches) + switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches) defer func() { for _, sw := range switches { sw.Stop() diff --git a/p2p/test_util.go b/p2p/test_util.go index b5b739af9..bea815f51 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -8,7 +8,7 @@ import ( cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p/conn" ) @@ -56,7 +56,7 @@ const TEST_HOST = "localhost" // If connect==Connect2Switches, the switches will be fully connected. // initSwitch defines how the i'th switch should be initialized (ie. with what reactors). // NOTE: panics if any switch fails to start. -func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { +func MakeConnectedSwitches(cfg *config.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { switches := make([]*Switch, n) for i := 0; i < n; i++ { switches[i] = MakeSwitch(cfg, i, TEST_HOST, "123.123.123", initSwitch) @@ -104,7 +104,7 @@ func Connect2Switches(switches []*Switch, i, j int) { } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - pc, err := newInboundPeerConn(conn, sw.peerConfig, sw.nodeKey.PrivKey) + pc, err := newInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) @@ -131,7 +131,7 @@ func StartSwitches(switches []*Switch) error { return nil } -func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { +func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { // new switch, add reactors // TODO: let the config be passed in? nodeKey := &NodeKey{