Browse Source

Merge pull request #1676 from tendermint/xla/collapse-peerconfig

Collapse PeerConfig into P2PConfig
pull/1638/merge
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
fd4db8dfdc
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 239 additions and 170 deletions
  1. +49
    -1
      config/config.go
  2. +4
    -4
      p2p/conn/connection.go
  3. +13
    -35
      p2p/fuzz.go
  4. +94
    -65
      p2p/peer.go
  5. +8
    -6
      p2p/peer_test.go
  6. +14
    -13
      p2p/pex/pex_reactor_test.go
  7. +30
    -19
      p2p/switch.go
  8. +23
    -23
      p2p/switch_test.go
  9. +4
    -4
      p2p/test_util.go

+ 49
- 1
config/config.go View File

@ -5,6 +5,15 @@ import (
"os"
"path/filepath"
"time"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
const (
// FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep
FuzzModeDrop = iota
// FuzzModeDelay is a mode in which we randomly sleep
FuzzModeDelay
)
// NOTE: Most of the structs & relevant comments + the
@ -287,11 +296,24 @@ type P2PConfig struct {
// Does not work if the peer-exchange reactor is disabled.
SeedMode bool `mapstructure:"seed_mode"`
// Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
// Comma separated list of peer IDs to keep private (will not be gossiped to
// other peers)
PrivatePeerIDs string `mapstructure:"private_peer_ids"`
// Toggle to disable guard against peers connecting from the same ip.
AllowDuplicateIP bool `mapstructure:"allow_duplicate_ip"`
// Peer connection configuration.
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
MConfig tmconn.MConnConfig `mapstructure:"connection"`
// Testing params.
// Force dial to fail
TestDialFail bool `mapstructure:"test_dial_fail"`
// FUzz connection
TestFuzz bool `mapstructure:"test_fuzz"`
TestFuzzConfig *FuzzConnConfig `mapstructure:"test_fuzz_config"`
}
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
@ -308,6 +330,12 @@ func DefaultP2PConfig() *P2PConfig {
PexReactor: true,
SeedMode: false,
AllowDuplicateIP: true, // so non-breaking yet
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
MConfig: tmconn.DefaultMConnConfig(),
TestDialFail: false,
TestFuzz: false,
TestFuzzConfig: DefaultFuzzConnConfig(),
}
}
@ -326,6 +354,26 @@ func (cfg *P2PConfig) AddrBookFile() string {
return rootify(cfg.AddrBook, cfg.RootDir)
}
// FuzzConnConfig is a FuzzedConnection configuration.
type FuzzConnConfig struct {
Mode int
MaxDelay time.Duration
ProbDropRW float64
ProbDropConn float64
ProbSleep float64
}
// DefaultFuzzConnConfig returns the default config.
func DefaultFuzzConnConfig() *FuzzConnConfig {
return &FuzzConnConfig{
Mode: FuzzModeDrop,
MaxDelay: 3 * time.Second,
ProbDropRW: 0.2,
ProbDropConn: 0.00,
ProbSleep: 0.00,
}
}
//-----------------------------------------------------------------------------
// MempoolConfig


+ 4
- 4
p2p/conn/connection.go View File

@ -83,7 +83,7 @@ type MConnection struct {
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config *MConnConfig
config MConnConfig
quit chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
@ -121,8 +121,8 @@ func (cfg *MConnConfig) maxPacketMsgTotalSize() int {
}
// DefaultMConnConfig returns the default config.
func DefaultMConnConfig() *MConnConfig {
return &MConnConfig{
func DefaultMConnConfig() MConnConfig {
return MConnConfig{
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
MaxPacketMsgPayloadSize: maxPacketMsgPayloadSizeDefault,
@ -143,7 +143,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
}
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config MConnConfig) *MConnection {
if config.PongTimeout >= config.PingInterval {
panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
}


+ 13
- 35
p2p/fuzz.go View File

@ -5,16 +5,10 @@ import (
"sync"
"time"
"github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tmlibs/common"
)
const (
// FuzzModeDrop is a mode in which we randomly drop reads/writes, connections or sleep
FuzzModeDrop = iota
// FuzzModeDelay is a mode in which we randomly sleep
FuzzModeDelay
)
// FuzzedConnection wraps any net.Conn and depending on the mode either delays
// reads/writes or randomly drops reads/writes/connections.
type FuzzedConnection struct {
@ -24,37 +18,17 @@ type FuzzedConnection struct {
start <-chan time.Time
active bool
config *FuzzConnConfig
}
// FuzzConnConfig is a FuzzedConnection configuration.
type FuzzConnConfig struct {
Mode int
MaxDelay time.Duration
ProbDropRW float64
ProbDropConn float64
ProbSleep float64
}
// DefaultFuzzConnConfig returns the default config.
func DefaultFuzzConnConfig() *FuzzConnConfig {
return &FuzzConnConfig{
Mode: FuzzModeDrop,
MaxDelay: 3 * time.Second,
ProbDropRW: 0.2,
ProbDropConn: 0.00,
ProbSleep: 0.00,
}
config *config.FuzzConnConfig
}
// FuzzConn creates a new FuzzedConnection. Fuzzing starts immediately.
func FuzzConn(conn net.Conn) net.Conn {
return FuzzConnFromConfig(conn, DefaultFuzzConnConfig())
return FuzzConnFromConfig(conn, config.DefaultFuzzConnConfig())
}
// FuzzConnFromConfig creates a new FuzzedConnection from a config. Fuzzing
// starts immediately.
func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn {
func FuzzConnFromConfig(conn net.Conn, config *config.FuzzConnConfig) net.Conn {
return &FuzzedConnection{
conn: conn,
start: make(<-chan time.Time),
@ -66,12 +40,16 @@ func FuzzConnFromConfig(conn net.Conn, config *FuzzConnConfig) net.Conn {
// FuzzConnAfter creates a new FuzzedConnection. Fuzzing starts when the
// duration elapses.
func FuzzConnAfter(conn net.Conn, d time.Duration) net.Conn {
return FuzzConnAfterFromConfig(conn, d, DefaultFuzzConnConfig())
return FuzzConnAfterFromConfig(conn, d, config.DefaultFuzzConnConfig())
}
// FuzzConnAfterFromConfig creates a new FuzzedConnection from a config.
// Fuzzing starts when the duration elapses.
func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnConfig) net.Conn {
func FuzzConnAfterFromConfig(
conn net.Conn,
d time.Duration,
config *config.FuzzConnConfig,
) net.Conn {
return &FuzzedConnection{
conn: conn,
start: time.After(d),
@ -81,7 +59,7 @@ func FuzzConnAfterFromConfig(conn net.Conn, d time.Duration, config *FuzzConnCon
}
// Config returns the connection's config.
func (fc *FuzzedConnection) Config() *FuzzConnConfig {
func (fc *FuzzedConnection) Config() *config.FuzzConnConfig {
return fc.config
}
@ -136,7 +114,7 @@ func (fc *FuzzedConnection) fuzz() bool {
}
switch fc.config.Mode {
case FuzzModeDrop:
case config.FuzzModeDrop:
// randomly drop the r/w, drop the conn, or sleep
r := cmn.RandFloat64()
if r <= fc.config.ProbDropRW {
@ -149,7 +127,7 @@ func (fc *FuzzedConnection) fuzz() bool {
} else if r < fc.config.ProbDropRW+fc.config.ProbDropConn+fc.config.ProbSleep {
time.Sleep(fc.randomDuration())
}
case FuzzModeDelay:
case config.FuzzModeDelay:
// sleep a bit
time.Sleep(fc.randomDuration())
}


+ 94
- 65
p2p/peer.go View File

@ -10,10 +10,11 @@ import (
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/config"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
var testIPSuffix uint32 = 0
var testIPSuffix uint32
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
@ -39,7 +40,7 @@ type Peer interface {
type peerConn struct {
outbound bool
persistent bool
config *PeerConfig
config *config.P2PConfig
conn net.Conn // source connection
ip net.IP
}
@ -99,94 +100,95 @@ type peer struct {
Data *cmn.CMap
}
func newPeer(pc peerConn, nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{})) *peer {
func newPeer(
pc peerConn,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
Data: cmn.NewCMap(),
}
p.mconn = createMConnection(pc.conn, p, reactorsByCh, chDescs, onPeerError, pc.config.MConfig)
p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
return p
}
// PeerConfig is a Peer configuration.
type PeerConfig struct {
// times are in seconds
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
MConfig *tmconn.MConnConfig `mapstructure:"connection"`
DialFail bool `mapstructure:"dial_fail"` // for testing
Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
}
p.mconn = createMConnection(
pc.conn,
p,
reactorsByCh,
chDescs,
onPeerError,
pc.config.MConfig,
)
p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
// DefaultPeerConfig returns the default config.
func DefaultPeerConfig() *PeerConfig {
return &PeerConfig{
HandshakeTimeout: 20, // * time.Second,
DialTimeout: 3, // * time.Second,
MConfig: tmconn.DefaultMConnConfig(),
DialFail: false,
Fuzz: false,
FuzzConfig: DefaultFuzzConnConfig(),
}
return p
}
func newOutboundPeerConn(addr *NetAddress, config *PeerConfig, persistent bool, ourNodePrivKey crypto.PrivKey) (peerConn, error) {
var pc peerConn
func newOutboundPeerConn(
addr *NetAddress,
config *config.P2PConfig,
persistent bool,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
conn, err := dial(addr, config)
if err != nil {
return pc, cmn.ErrorWrap(err, "Error creating peer")
return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
}
pc, err = newPeerConn(conn, config, true, persistent, ourNodePrivKey)
pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey)
if err != nil {
if err2 := conn.Close(); err2 != nil {
return pc, cmn.ErrorWrap(err, err2.Error())
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return pc, err
return peerConn{}, err
}
// ensure dialed ID matches connection ID
if addr.ID != pc.ID() {
if err2 := conn.Close(); err2 != nil {
return pc, cmn.ErrorWrap(err, err2.Error())
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()}
return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
}
return pc, nil
}
func newInboundPeerConn(conn net.Conn, config *PeerConfig, ourNodePrivKey crypto.PrivKey) (peerConn, error) {
func newInboundPeerConn(
conn net.Conn,
config *config.P2PConfig,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
// TODO: issue PoW challenge
return newPeerConn(conn, config, false, false, ourNodePrivKey)
}
func newPeerConn(rawConn net.Conn,
config *PeerConfig, outbound, persistent bool,
ourNodePrivKey crypto.PrivKey) (pc peerConn, err error) {
func newPeerConn(
rawConn net.Conn,
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
) (pc peerConn, err error) {
conn := rawConn
// Fuzz connection
if config.Fuzz {
if cfg.TestFuzz {
// so we have time to do peer handshakes and get set up
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, config.FuzzConfig)
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig)
}
// Set deadline for secret handshake
if err := conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second)); err != nil {
return pc, cmn.ErrorWrap(err, "Error setting deadline while encrypting connection")
dl := time.Now().Add(cfg.HandshakeTimeout)
if err := conn.SetDeadline(dl); err != nil {
return pc, cmn.ErrorWrap(
err,
"Error setting deadline while encrypting connection",
)
}
// Encrypt connection
@ -197,7 +199,7 @@ func newPeerConn(rawConn net.Conn,
// Only the information we already have
return peerConn{
config: config,
config: cfg,
outbound: outbound,
persistent: persistent,
conn: conn,
@ -300,22 +302,33 @@ func (p *peer) hasChannel(chID byte) bool {
}
// NOTE: probably will want to remove this
// but could be helpful while the feature is new
p.Logger.Debug("Unknown channel for peer", "channel", chID, "channels", p.channels)
p.Logger.Debug(
"Unknown channel for peer",
"channel",
chID,
"channels",
p.channels,
)
return false
}
//---------------------------------------------------
// methods used by the Switch
// CloseConn should be called by the Switch if the peer was created but never started.
// CloseConn should be called by the Switch if the peer was created but never
// started.
func (pc *peerConn) CloseConn() {
pc.conn.Close() // nolint: errcheck
}
// HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer
// by exchanging their NodeInfo. It sets the received nodeInfo on the peer.
// HandshakeTimeout performs the Tendermint P2P handshake between a given node
// and the peer by exchanging their NodeInfo. It sets the received nodeInfo on
// the peer.
// NOTE: blocking
func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) (peerNodeInfo NodeInfo, err error) {
func (pc *peerConn) HandshakeTimeout(
ourNodeInfo NodeInfo,
timeout time.Duration,
) (peerNodeInfo NodeInfo, err error) {
// Set deadline for handshake so we don't block forever on conn.ReadFull
if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline")
@ -327,7 +340,11 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration
return
},
func(_ int) (val interface{}, err error, abort bool) {
_, err = cdc.UnmarshalBinaryReader(pc.conn, &peerNodeInfo, int64(MaxNodeInfoSize()))
_, err = cdc.UnmarshalBinaryReader(
pc.conn,
&peerNodeInfo,
int64(MaxNodeInfoSize()),
)
return
},
)
@ -368,20 +385,26 @@ func (p *peer) String() string {
//------------------------------------------------------------------
// helper funcs
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
if config.DialFail {
func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
if cfg.TestDialFail {
return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
}
conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
conn, err := addr.DialTimeout(cfg.DialTimeout)
if err != nil {
return nil, err
}
return conn, nil
}
func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection {
func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
@ -397,5 +420,11 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch
onPeerError(p, r)
}
return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
return tmconn.NewMConnectionWithConfig(
conn,
chDescs,
onReceive,
onError,
config,
)
}

+ 8
- 6
p2p/peer_test.go View File

@ -10,9 +10,11 @@ import (
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
tmconn "github.com/tendermint/tendermint/p2p/conn"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/config"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
const testCh = 0x01
@ -21,11 +23,11 @@ func TestPeerBasic(t *testing.T) {
assert, require := assert.New(t), require.New(t)
// simulate remote peer
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig())
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), cfg)
require.Nil(err)
err = p.Start()
@ -44,7 +46,7 @@ func TestPeerBasic(t *testing.T) {
func TestPeerSend(t *testing.T) {
assert, require := assert.New(t), require.New(t)
config := DefaultPeerConfig()
config := cfg
// simulate remote peer
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config}
@ -63,7 +65,7 @@ func TestPeerSend(t *testing.T) {
assert.True(p.Send(testCh, []byte("Asylum")))
}
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) {
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *config.P2PConfig) (*peer, error) {
chDescs := []*tmconn.ChannelDescriptor{
{ID: testCh, Priority: 1},
}
@ -91,7 +93,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig)
type remotePeer struct {
PrivKey crypto.PrivKey
Config *PeerConfig
Config *config.P2PConfig
addr *NetAddress
quit chan struct{}
channels cmn.HexBytes


+ 14
- 13
p2p/pex/pex_reactor_test.go View File

@ -13,21 +13,22 @@ import (
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
)
var (
config *cfg.P2PConfig
cfg *config.P2PConfig
)
func init() {
config = cfg.DefaultP2PConfig()
config.PexReactor = true
config.AllowDuplicateIP = true
cfg = config.DefaultP2PConfig()
cfg.PexReactor = true
cfg.AllowDuplicateIP = true
}
func TestPEXReactorBasic(t *testing.T) {
@ -81,7 +82,7 @@ func TestPEXReactorRunning(t *testing.T) {
// create switches
for i := 0; i < N; i++ {
switches[i] = p2p.MakeSwitch(config, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
books[i].SetLogger(logger.With("pex", i))
sw.SetAddrBook(books[i])
@ -209,7 +210,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
// 1. create seed
seed := p2p.MakeSwitch(
config,
cfg,
0,
"127.0.0.1",
"123.123.123",
@ -239,7 +240,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
// 2. create usual peer with only seed configured.
peer := p2p.MakeSwitch(
config,
cfg,
1,
"127.0.0.1",
"123.123.123",
@ -425,7 +426,7 @@ func assertPeersWithTimeout(
}
}
func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
func createReactor(conf *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
// directory to store address book
dir, err := ioutil.TempDir("", "pex_reactor")
if err != nil {
@ -434,7 +435,7 @@ func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
book.SetLogger(log.TestingLogger())
r = NewPEXReactor(book, config)
r = NewPEXReactor(book, conf)
r.SetLogger(log.TestingLogger())
return
}
@ -447,7 +448,7 @@ func teardownReactor(book *addrBook) {
}
func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw.SetLogger(log.TestingLogger())
for _, r := range reactors {
sw.AddReactor(r.String(), r)


+ 30
- 19
p2p/switch.go View File

@ -7,7 +7,7 @@ import (
"sync"
"time"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
cmn "github.com/tendermint/tmlibs/common"
)
@ -55,8 +55,7 @@ type AddrBook interface {
type Switch struct {
cmn.BaseService
config *cfg.P2PConfig
peerConfig *PeerConfig
config *config.P2PConfig
listeners []Listener
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
@ -75,10 +74,9 @@ type Switch struct {
}
// NewSwitch creates a new Switch with the given config.
func NewSwitch(config *cfg.P2PConfig) *Switch {
func NewSwitch(cfg *config.P2PConfig) *Switch {
sw := &Switch{
config: config,
peerConfig: DefaultPeerConfig(),
config: cfg,
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
@ -90,11 +88,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
// Ensure we have a completely undeterministic PRNG.
sw.rng = cmn.NewRand()
// TODO: collapse the peerConfig into the config ?
sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.SendRate = config.SendRate
sw.peerConfig.MConfig.RecvRate = config.RecvRate
sw.peerConfig.MConfig.MaxPacketMsgPayloadSize = config.MaxPacketMsgPayloadSize
sw.config.MConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
sw.config.MConfig.SendRate = cfg.SendRate
sw.config.MConfig.RecvRate = cfg.RecvRate
sw.config.MConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
return sw
@ -419,7 +416,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
sw.dialing.Set(string(addr.ID), addr)
defer sw.dialing.Delete(string(addr.ID))
return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent)
return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
}
// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
@ -476,7 +473,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
}
// New inbound connection!
err := sw.addInboundPeerWithConfig(inConn, sw.peerConfig)
err := sw.addInboundPeerWithConfig(inConn, sw.config)
if err != nil {
sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
continue
@ -486,7 +483,10 @@ func (sw *Switch) listenerRoutine(l Listener) {
// cleanup
}
func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) error {
func (sw *Switch) addInboundPeerWithConfig(
conn net.Conn,
config *config.P2PConfig,
) error {
peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
if err != nil {
conn.Close() // peer is nil
@ -503,10 +503,20 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er
// dial the peer; make secret connection; authenticate against the dialed ID;
// add the peer.
// if dialing fails, start the reconnect loop. If handhsake fails, its over.
// If peer is started succesffuly, reconnectLoop will start when StopPeerForError is called
func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) error {
// If peer is started succesffuly, reconnectLoop will start when
// StopPeerForError is called
func (sw *Switch) addOutboundPeerWithConfig(
addr *NetAddress,
config *config.P2PConfig,
persistent bool,
) error {
sw.Logger.Info("Dialing peer", "address", addr)
peerConn, err := newOutboundPeerConn(addr, config, persistent, sw.nodeKey.PrivKey)
peerConn, err := newOutboundPeerConn(
addr,
config,
persistent,
sw.nodeKey.PrivKey,
)
if err != nil {
if persistent {
go sw.reconnectToPeer(addr)
@ -525,7 +535,8 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
// that already has a SecretConnection. If all goes well,
// it starts the peer and adds it to the switch.
// NOTE: This performs a blocking handshake before the peer is added.
// NOTE: If error is returned, caller is responsible for calling peer.CloseConn()
// NOTE: If error is returned, caller is responsible for calling
// peer.CloseConn()
func (sw *Switch) addPeer(pc peerConn) error {
addr := pc.conn.RemoteAddr()
@ -534,7 +545,7 @@ func (sw *Switch) addPeer(pc peerConn) error {
}
// Exchange NodeInfo on the conn
peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
if err != nil {
return err
}


+ 23
- 23
p2p/switch_test.go View File

@ -14,18 +14,18 @@ import (
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
)
var (
config *cfg.P2PConfig
cfg *config.P2PConfig
)
func init() {
config = cfg.DefaultP2PConfig()
config.PexReactor = true
config.AllowDuplicateIP = true
cfg = config.DefaultP2PConfig()
cfg.PexReactor = true
cfg.AllowDuplicateIP = true
}
type PeerMessage struct {
@ -85,7 +85,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
// XXX: note this uses net.Pipe and not a proper TCP conn
func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
// Create two switches that will be interconnected.
switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
return switches[0], switches[1]
}
@ -152,8 +152,8 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r
}
func TestConnAddrFilter(t *testing.T) {
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
@ -181,14 +181,14 @@ func TestConnAddrFilter(t *testing.T) {
}
func TestSwitchFiltersOutItself(t *testing.T) {
s1 := MakeSwitch(config, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
// addr := s1.NodeInfo().NetAddress()
// // add ourselves like we do in node.go#427
// s1.addrBook.AddOurAddress(addr)
// simulate s1 having a public IP by creating a remote peer with the same ID
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: DefaultPeerConfig()}
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
rp.Start()
// addr should be rejected in addPeer based on the same ID
@ -214,8 +214,8 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration)
}
func TestConnIDFilter(t *testing.T) {
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
@ -251,7 +251,7 @@ func TestConnIDFilter(t *testing.T) {
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -259,11 +259,11 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg}
rp.Start()
defer rp.Stop()
pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), false, sw.nodeKey.PrivKey)
pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey)
require.Nil(err)
err = sw.addPeer(pc)
require.Nil(err)
@ -281,7 +281,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -289,11 +289,11 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: cfg}
rp.Start()
defer rp.Stop()
pc, err := newOutboundPeerConn(rp.Addr(), DefaultPeerConfig(), true, sw.nodeKey.PrivKey)
pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey)
// sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey,
require.Nil(err)
@ -320,7 +320,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
// simulate another remote peer
rp = &remotePeer{
PrivKey: crypto.GenPrivKeyEd25519(),
Config: DefaultPeerConfig(),
Config: cfg,
// Use different interface to prevent duplicate IP filter, this will break
// beyond two peers.
listenAddr: "127.0.0.1:0",
@ -329,9 +329,9 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
defer rp.Stop()
// simulate first time dial failure
peerConfig := DefaultPeerConfig()
peerConfig.DialFail = true
err = sw.addOutboundPeerWithConfig(rp.Addr(), peerConfig, true)
conf := config.DefaultP2PConfig()
conf.TestDialFail = true
err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true)
require.NotNil(err)
// DialPeerWithAddres - sw.peerConfig resets the dialer
@ -348,7 +348,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
}
func TestSwitchFullConnectivity(t *testing.T) {
switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
defer func() {
for _, sw := range switches {
sw.Stop()


+ 4
- 4
p2p/test_util.go View File

@ -8,7 +8,7 @@ import (
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
)
@ -56,7 +56,7 @@ const TEST_HOST = "localhost"
// If connect==Connect2Switches, the switches will be fully connected.
// initSwitch defines how the i'th switch should be initialized (ie. with what reactors).
// NOTE: panics if any switch fails to start.
func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
func MakeConnectedSwitches(cfg *config.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
switches := make([]*Switch, n)
for i := 0; i < n; i++ {
switches[i] = MakeSwitch(cfg, i, TEST_HOST, "123.123.123", initSwitch)
@ -104,7 +104,7 @@ func Connect2Switches(switches []*Switch, i, j int) {
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
pc, err := newInboundPeerConn(conn, sw.peerConfig, sw.nodeKey.PrivKey)
pc, err := newInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
@ -131,7 +131,7 @@ func StartSwitches(switches []*Switch) error {
return nil
}
func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
// new switch, add reactors
// TODO: let the config be passed in?
nodeKey := &NodeKey{


Loading…
Cancel
Save