
p2p: make persistent prop independent of conn direction (#3593)

## Description

Previously, only outbound peers could be persistent.

Now, even if the peer is inbound, if it's marked as persistent, when/if conn is lost,
Tendermint will try to reconnect. This part is actually optional and can be reverted.

Plus, seed won't disconnect from inbound peer if it's marked as
persistent. Fixes #3362

## Commits

* make persistent prop independent of conn direction

Previously, only outbound peers could be persistent. Now, even if the peer
is inbound, if it's marked as persistent, when/if conn is lost,
Tendermint will try to reconnect.

Plus, seed won't disconnect from inbound peer if it's marked as
persistent. Fixes #3362

* fix TestPEXReactorDialPeer test

* add a changelog entry

* update changelog

* add two tests

* reformat code

* test UnsafeDialPeers and UnsafeDialSeeds

* add TestSwitchDialPeersAsync

* spec: update p2p/config spec

* fixes after Ismail's review

* Apply suggestions from code review

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* fix merge conflict

* remove sleep from TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode

We don't need it actually.
pull/3621/head
Anton Kaliaev 6 years ago
committed by GitHub
parent
commit
8711af608f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 323 additions and 104 deletions
  1. CHANGELOG_PENDING.md (+5, -0)
  2. docs/spec/p2p/config.md (+6, -6)
  3. node/node.go (+7, -6)
  4. p2p/pex/pex_reactor.go (+2, -3)
  5. p2p/pex/pex_reactor_test.go (+39, -3)
  6. p2p/switch.go (+89, -32)
  7. p2p/switch_test.go (+72, -39)
  8. p2p/transport.go (+22, -6)
  9. rpc/core/net.go (+6, -8)
  10. rpc/core/net_test.go (+73, -0)
  11. rpc/core/pipe.go (+2, -1)

CHANGELOG_PENDING.md (+5, -0)

@@ -23,11 +23,16 @@
 - [cli] \#3585 Add option to not clear address book with unsafe reset (@climber73)
 - [cli] [\#3160](https://github.com/tendermint/tendermint/issues/3160) Add `-config=<path-to-config>` option to `testnet` cmd (@gregdhill)
 - [cs/replay] \#3460 check appHash for each block
+- [rpc] \#3362 `/dial_seeds` & `/dial_peers` return errors if addresses are incorrect (except when IP lookup fails)
+- [node] \#3362 returns an error if `persistent_peers` list is invalid (except when IP lookup fails)
 - [p2p] \#3531 Terminate session on nonce wrapping (@climber73)
 ### BUG FIXES:
 - [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode
   to 16 (as a result, the node will stop retrying after a 35 hours time window)
 - [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests
+- [p2p] \#3362 make persistent prop independent of conn direction
+  * `Switch#DialPeersAsync` now only takes a list of peers
+  * `Switch#DialPeerWithAddress` now only takes an address
 - [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray)
 - [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie)

docs/spec/p2p/config.md (+6, -6)

@@ -12,14 +12,14 @@ and upon incoming connection shares some peers and disconnects.
 ## Seeds
-`--p2p.seeds “1.2.3.4:26656,2.3.4.5:4444”`
+`--p2p.seeds “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:4444”`
 Dials these seeds when we need more peers. They should return a list of peers and then disconnect.
 If we already have enough peers in the address book, we may never need to dial them.
 ## Persistent Peers
-`--p2p.persistent_peers “1.2.3.4:26656,2.3.4.5:26656”`
+`--p2p.persistent_peers “id100000000000000000000000000000000@1.2.3.4:26656,id200000000000000000000000000000000@2.3.4.5:26656”`
 Dial these peers and auto-redial them if the connection fails.
 These are intended to be trusted persistent peers that can help
@@ -30,9 +30,9 @@ backoff and will give up after a day of trying to connect.
 the user will be warned that seeds may auto-close connections
 and that the node may not be able to keep the connection persistent.
-## Private Persistent Peers
+## Private Peers
-`--p2p.private_persistent_peers “1.2.3.4:26656,2.3.4.5:26656”`
+`--p2p.private_peer_ids “id100000000000000000000000000000000,id200000000000000000000000000000000”`
-These are persistent peers that we do not add to the address book or
-gossip to other peers. They stay private to us.
+These are IDs of the peers that we do not add to the address book or gossip to
+other peers. They stay private to us.
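
The spec change above moves seed and persistent-peer entries from bare `host:port` to `id@host:port`. As a rough illustration only, the sketch below shows one way such a string could be validated. `peerAddr` and `parsePeerAddr` are hypothetical names and not part of Tendermint's API, and the 20-byte hex ID length is an assumption made for this example.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"net"
	"strings"
)

// peerAddr is a hypothetical, simplified stand-in for p2p.NetAddress.
type peerAddr struct {
	ID   string
	Host string
	Port string
}

// parsePeerAddr splits "id@host:port" and rejects entries without a valid ID.
func parsePeerAddr(s string) (peerAddr, error) {
	parts := strings.SplitN(s, "@", 2)
	if len(parts) != 2 {
		return peerAddr{}, errors.New("address must be of the form id@host:port")
	}
	id := parts[0]
	// Assumption: an ID is 20 bytes, hex-encoded (40 characters).
	if raw, err := hex.DecodeString(id); err != nil || len(raw) != 20 {
		return peerAddr{}, fmt.Errorf("invalid peer ID %q", id)
	}
	host, port, err := net.SplitHostPort(parts[1])
	if err != nil {
		return peerAddr{}, err
	}
	return peerAddr{ID: id, Host: host, Port: port}, nil
}

func main() {
	for _, s := range []string{
		"d51fb70907db1c6c2d5237e78379b25cf1a37ab4@127.0.0.1:41198", // ID present
		"127.0.0.1:41198", // no ID: rejected
	} {
		addr, err := parsePeerAddr(s)
		fmt.Println(addr, err)
	}
}
```

The two inputs mirror the accepted and rejected cases exercised by `TestUnsafeDialSeeds` and `TestUnsafeDialPeers` in this commit.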

node/node.go (+7, -6)

@@ -579,6 +579,11 @@ func NewNode(config *cfg.Config,
 		consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
 	)
+	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
+	if err != nil {
+		return nil, errors.Wrap(err, "could not add peers from persistent_peers field")
+	}
 	addrBook := createAddrBookAndSetOnSwitch(config, sw, p2pLogger)
 	// Optionally, start the pex reactor
@@ -675,12 +680,8 @@ func (n *Node) OnStart() error {
 	}
 	// Always connect to persistent peers
-	if n.config.P2P.PersistentPeers != "" {
-		err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
-		if err != nil {
-			return err
-		}
-	}
+	// parsing errors are handled above by AddPersistentPeers
+	_ = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
 	return nil
 }


p2p/pex/pex_reactor.go (+2, -3)

@@ -531,8 +531,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error {
 		}
 	}
-	err := r.Switch.DialPeerWithAddress(addr, false)
+	err := r.Switch.DialPeerWithAddress(addr)
 	if err != nil {
 		if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
 			return err
@@ -584,7 +583,7 @@ func (r *PEXReactor) dialSeeds() {
 	for _, i := range perm {
 		// dial a random seed
 		seedAddr := r.seedAddrs[i]
-		err := r.Switch.DialPeerWithAddress(seedAddr, false)
+		err := r.Switch.DialPeerWithAddress(seedAddr)
 		if err == nil {
 			return
 		}


p2p/pex/pex_reactor_test.go (+39, -3)

@@ -291,7 +291,8 @@ func TestPEXReactorSeedMode(t *testing.T) {
 	require.Nil(t, err)
 	defer os.RemoveAll(dir) // nolint: errcheck
-	pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond})
+	pexRConfig := &PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond}
+	pexR, book := createReactor(pexRConfig)
 	defer teardownReactor(book)
 	sw := createSwitchAndAddReactors(pexR)
@@ -315,13 +316,48 @@ func TestPEXReactorSeedMode(t *testing.T) {
 	pexR.attemptDisconnects()
 	assert.Equal(t, 1, sw.Peers().Size())
-	time.Sleep(100 * time.Millisecond)
+	// sleep for SeedDisconnectWaitPeriod
+	time.Sleep(pexRConfig.SeedDisconnectWaitPeriod + 1*time.Millisecond)
 	// 3. attemptDisconnects should disconnect after wait period
 	pexR.attemptDisconnects()
 	assert.Equal(t, 0, sw.Peers().Size())
 }
+func TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode(t *testing.T) {
+	// directory to store address books
+	dir, err := ioutil.TempDir("", "pex_reactor")
+	require.Nil(t, err)
+	defer os.RemoveAll(dir) // nolint: errcheck
+	pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 1 * time.Millisecond})
+	defer teardownReactor(book)
+	sw := createSwitchAndAddReactors(pexR)
+	sw.SetAddrBook(book)
+	err = sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+	assert.Zero(t, sw.Peers().Size())
+	peerSwitch := testCreateDefaultPeer(dir, 1)
+	require.NoError(t, peerSwitch.Start())
+	defer peerSwitch.Stop()
+	err = sw.AddPersistentPeers([]string{peerSwitch.NetAddress().String()})
+	require.NoError(t, err)
+	// 1. Test crawlPeers dials the peer
+	pexR.crawlPeers([]*p2p.NetAddress{peerSwitch.NetAddress()})
+	assert.Equal(t, 1, sw.Peers().Size())
+	assert.True(t, sw.Peers().Has(peerSwitch.NodeInfo().ID()))
+	// 2. attemptDisconnects should not disconnect because the peer is persistent
+	pexR.attemptDisconnects()
+	assert.Equal(t, 1, sw.Peers().Size())
+}
 func TestPEXReactorDialsPeerUpToMaxAttemptsInSeedMode(t *testing.T) {
 	// directory to store address books
 	dir, err := ioutil.TempDir("", "pex_reactor")
@@ -398,7 +434,7 @@ func TestPEXReactorSeedModeFlushStop(t *testing.T) {
 	reactor := switches[0].Reactors()["pex"].(*PEXReactor)
 	peerID := switches[1].NodeInfo().ID()
-	err = switches[1].DialPeerWithAddress(switches[0].NetAddress(), false)
+	err = switches[1].DialPeerWithAddress(switches[0].NetAddress())
 	assert.NoError(t, err)
 	// sleep up to a second while waiting for the peer to send us a message.


p2p/switch.go (+89, -32)

@@ -77,6 +77,8 @@ type Switch struct {
 	nodeInfo NodeInfo // our node info
 	nodeKey *NodeKey // our node privkey
 	addrBook AddrBook
+	// peers addresses with whom we'll maintain constant connection
+	persistentPeersAddrs []*NetAddress
 	transport Transport
@@ -104,16 +106,17 @@ func NewSwitch(
 	options ...SwitchOption,
 ) *Switch {
 	sw := &Switch{
-		config: cfg,
-		reactors: make(map[string]Reactor),
-		chDescs: make([]*conn.ChannelDescriptor, 0),
-		reactorsByCh: make(map[byte]Reactor),
-		peers: NewPeerSet(),
-		dialing: cmn.NewCMap(),
-		reconnecting: cmn.NewCMap(),
-		metrics: NopMetrics(),
-		transport: transport,
-		filterTimeout: defaultFilterTimeout,
+		config: cfg,
+		reactors: make(map[string]Reactor),
+		chDescs: make([]*conn.ChannelDescriptor, 0),
+		reactorsByCh: make(map[byte]Reactor),
+		peers: NewPeerSet(),
+		dialing: cmn.NewCMap(),
+		reconnecting: cmn.NewCMap(),
+		metrics: NopMetrics(),
+		transport: transport,
+		filterTimeout: defaultFilterTimeout,
+		persistentPeersAddrs: make([]*NetAddress, 0),
 	}
 	// Ensure we have a completely undeterministic PRNG.
@@ -297,7 +300,19 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
 	sw.stopAndRemovePeer(peer, reason)
 	if peer.IsPersistent() {
-		go sw.reconnectToPeer(peer.SocketAddr())
+		var addr *NetAddress
+		if peer.IsOutbound() { // socket address for outbound peers
+			addr = peer.SocketAddr()
+		} else { // self-reported address for inbound peers
+			var err error
+			addr, err = peer.NodeInfo().NetAddress()
+			if err != nil {
+				sw.Logger.Error("Wanted to reconnect to inbound peer, but self-reported address is wrong",
+					"peer", peer, "addr", addr, "err", err)
+				return
+			}
+		}
+		go sw.reconnectToPeer(addr)
 	}
 }
@@ -341,7 +356,7 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
 			return
 		}
-		err := sw.DialPeerWithAddress(addr, true)
+		err := sw.DialPeerWithAddress(addr)
 		if err == nil {
 			return // success
 		} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
@@ -365,7 +380,7 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
 		sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
 		sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
-		err := sw.DialPeerWithAddress(addr, true)
+		err := sw.DialPeerWithAddress(addr)
 		if err == nil {
 			return // success
 		} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
@@ -401,28 +416,41 @@ func isPrivateAddr(err error) bool {
 	return ok && te.PrivateAddr()
 }
-// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
+// DialPeersAsync dials a list of peers asynchronously in random order.
 // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
-// TODO: remove addrBook arg since it's now set on the switch
-func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
+// It ignores ErrNetAddressLookup. However, if there are other errors, first
+// encounter is returned.
+// Nop if there are no peers.
+func (sw *Switch) DialPeersAsync(peers []string) error {
 	netAddrs, errs := NewNetAddressStrings(peers)
-	// only log errors, dial correct addresses
+	// report all the errors
 	for _, err := range errs {
 		sw.Logger.Error("Error in peer's address", "err", err)
 	}
+	// return first non-ErrNetAddressLookup error
+	for _, err := range errs {
+		if _, ok := err.(ErrNetAddressLookup); ok {
+			continue
+		}
+		return err
+	}
+	sw.dialPeersAsync(netAddrs)
+	return nil
+}
+func (sw *Switch) dialPeersAsync(netAddrs []*NetAddress) {
 	ourAddr := sw.NetAddress()
 	// TODO: this code feels like it's in the wrong place.
 	// The integration tests depend on the addrBook being saved
 	// right away but maybe we can change that. Recall that
 	// the addrBook is only written to disk every 2min
-	if addrBook != nil {
+	if sw.addrBook != nil {
 		// add peers to `addrBook`
 		for _, netAddr := range netAddrs {
 			// do not add our address or ID
 			if !netAddr.Same(ourAddr) {
-				if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
+				if err := sw.addrBook.AddAddress(netAddr, ourAddr); err != nil {
 					if isPrivateAddr(err) {
 						sw.Logger.Debug("Won't add peer's address to addrbook", "err", err)
 					} else {
@@ -433,7 +461,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
 		}
 		// Persist some peers to disk right away.
 		// NOTE: integration tests depend on this
-		addrBook.Save()
+		sw.addrBook.Save()
 	}
 	// permute the list, dial them in random order.
@@ -450,7 +478,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
 			sw.randomSleep(0)
-			err := sw.DialPeerWithAddress(addr, persistent)
+			err := sw.DialPeerWithAddress(addr)
 			if err != nil {
 				switch err.(type) {
 				case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress:
@@ -461,16 +489,13 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
 			}
 		}(i)
 	}
-	return nil
 }
 // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects
 // and authenticates successfully.
-// If `persistent == true`, the switch will always try to reconnect to this
-// peer if the connection ever fails.
 // If we're currently dialing this address or it belongs to an existing peer,
 // ErrCurrentlyDialingOrExistingAddress is returned.
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
 	if sw.IsDialingOrExistingAddress(addr) {
 		return ErrCurrentlyDialingOrExistingAddress{addr.String()}
 	}
@@ -478,7 +503,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
 	sw.dialing.Set(string(addr.ID), addr)
 	defer sw.dialing.Delete(string(addr.ID))
-	return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
+	return sw.addOutboundPeerWithConfig(addr, sw.config)
 }
 // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
@@ -495,6 +520,38 @@ func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool {
 		(!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP))
 }
+// AddPersistentPeers allows you to set persistent peers. It ignores
+// ErrNetAddressLookup. However, if there are other errors, first encounter is
+// returned.
+func (sw *Switch) AddPersistentPeers(addrs []string) error {
+	sw.Logger.Info("Adding persistent peers", "addrs", addrs)
+	netAddrs, errs := NewNetAddressStrings(addrs)
+	// report all the errors
+	for _, err := range errs {
+		sw.Logger.Error("Error in peer's address", "err", err)
+	}
+	// return first non-ErrNetAddressLookup error
+	for _, err := range errs {
+		if _, ok := err.(ErrNetAddressLookup); ok {
+			continue
+		}
+		return err
+	}
+	sw.persistentPeersAddrs = netAddrs
+	return nil
+}
+func (sw *Switch) isPeerPersistentFn() func(*NetAddress) bool {
+	return func(na *NetAddress) bool {
+		for _, pa := range sw.persistentPeersAddrs {
+			if pa.Equals(na) {
+				return true
+			}
+		}
+		return false
+	}
+}
 func (sw *Switch) acceptRoutine() {
 	for {
 		p, err := sw.transport.Accept(peerConfig{
@@ -502,6 +559,7 @@ func (sw *Switch) acceptRoutine() {
 			onPeerError: sw.StopPeerForError,
 			reactorsByCh: sw.reactorsByCh,
 			metrics: sw.metrics,
+			isPersistent: sw.isPeerPersistentFn(),
 		})
 		if err != nil {
 			switch err := err.(type) {
@@ -581,13 +639,12 @@ func (sw *Switch) acceptRoutine() {
 // dial the peer; make secret connection; authenticate against the dialed ID;
 // add the peer.
-// if dialing fails, start the reconnect loop. If handhsake fails, its over.
-// If peer is started succesffuly, reconnectLoop will start when
-// StopPeerForError is called
+// if dialing fails, start the reconnect loop. If handshake fails, it's over.
+// If peer is started successfully, reconnectLoop will start when
+// StopPeerForError is called.
 func (sw *Switch) addOutboundPeerWithConfig(
 	addr *NetAddress,
 	cfg *config.P2PConfig,
-	persistent bool,
 ) error {
 	sw.Logger.Info("Dialing peer", "address", addr)
@@ -600,7 +657,7 @@ func (sw *Switch) addOutboundPeerWithConfig(
 	p, err := sw.transport.Dial(*addr, peerConfig{
 		chDescs: sw.chDescs,
 		onPeerError: sw.StopPeerForError,
-		persistent: persistent,
+		isPersistent: sw.isPeerPersistentFn(),
 		reactorsByCh: sw.reactorsByCh,
 		metrics: sw.metrics,
 	})
@@ -619,7 +676,7 @@ func (sw *Switch) addOutboundPeerWithConfig(
 		// retry persistent peers after
 		// any dial error besides IsSelf()
-		if persistent {
+		if sw.isPeerPersistentFn()(addr) {
 			go sw.reconnectToPeer(addr)
 		}


p2p/switch_test.go (+72, -39)

@@ -167,7 +167,7 @@ func TestSwitchFiltersOutItself(t *testing.T) {
 	rp.Start()
 	// addr should be rejected in addPeer based on the same ID
-	err := s1.DialPeerWithAddress(rp.Addr(), false)
+	err := s1.DialPeerWithAddress(rp.Addr())
 	if assert.Error(t, err) {
 		if err, ok := err.(ErrRejected); ok {
 			if !err.IsSelf() {
@@ -212,6 +212,7 @@ func TestSwitchPeerFilter(t *testing.T) {
 	p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
 		chDescs: sw.chDescs,
 		onPeerError: sw.StopPeerForError,
+		isPersistent: sw.isPeerPersistentFn(),
 		reactorsByCh: sw.reactorsByCh,
 	})
 	if err != nil {
@@ -256,6 +257,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
 	p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
 		chDescs: sw.chDescs,
 		onPeerError: sw.StopPeerForError,
+		isPersistent: sw.isPeerPersistentFn(),
 		reactorsByCh: sw.reactorsByCh,
 	})
 	if err != nil {
@@ -281,6 +283,7 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
 	p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
 		chDescs: sw.chDescs,
 		onPeerError: sw.StopPeerForError,
+		isPersistent: sw.isPeerPersistentFn(),
 		reactorsByCh: sw.reactorsByCh,
 	})
 	if err != nil {
@@ -326,6 +329,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
 	p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
 		chDescs: sw.chDescs,
 		onPeerError: sw.StopPeerForError,
+		isPersistent: sw.isPeerPersistentFn(),
 		reactorsByCh: sw.reactorsByCh,
 	})
 	require.Nil(err)
@@ -390,49 +394,33 @@ func TestSwitchStopPeerForError(t *testing.T) {
 	assert.EqualValues(t, 0, peersMetricValue())
 }
-func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
-	assert, require := assert.New(t), require.New(t)
+func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
 	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
 	err := sw.Start()
-	if err != nil {
-		t.Error(err)
-	}
+	require.NoError(t, err)
 	defer sw.Stop()
-	// simulate remote peer
+	// 1. simulate failure by closing connection
 	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
 	rp.Start()
 	defer rp.Stop()
-	p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
-		chDescs: sw.chDescs,
-		onPeerError: sw.StopPeerForError,
-		persistent: true,
-		reactorsByCh: sw.reactorsByCh,
-	})
-	require.Nil(err)
-	require.Nil(sw.addPeer(p))
+	err = sw.AddPersistentPeers([]string{rp.Addr().String()})
+	require.NoError(t, err)
-	require.NotNil(sw.Peers().Get(rp.ID()))
+	err = sw.DialPeerWithAddress(rp.Addr())
+	require.Nil(t, err)
+	time.Sleep(50 * time.Millisecond)
+	require.NotNil(t, sw.Peers().Get(rp.ID()))
-	// simulate failure by closing connection
+	p := sw.Peers().List()[0]
 	p.(*peer).CloseConn()
-	// TODO: remove sleep, detect the disconnection, wait for reconnect
-	npeers := sw.Peers().Size()
-	for i := 0; i < 20; i++ {
-		time.Sleep(250 * time.Millisecond)
-		npeers = sw.Peers().Size()
-		if npeers > 0 {
-			break
-		}
-	}
-	assert.NotZero(npeers)
-	assert.False(p.IsRunning())
+	waitUntilSwitchHasAtLeastNPeers(sw, 1)
+	assert.False(t, p.IsRunning()) // old peer instance
+	assert.Equal(t, 1, sw.Peers().Size()) // new peer instance
-	// simulate another remote peer
+	// 2. simulate first time dial failure
 	rp = &remotePeer{
 		PrivKey: ed25519.GenPrivKey(),
 		Config: cfg,
@@ -443,23 +431,68 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
 	rp.Start()
 	defer rp.Stop()
-	// simulate first time dial failure
 	conf := config.DefaultP2PConfig()
 	conf.TestDialFail = true
-	err = sw.addOutboundPeerWithConfig(rp.Addr(), conf, true)
-	require.NotNil(err)
+	err = sw.addOutboundPeerWithConfig(rp.Addr(), conf)
+	require.NotNil(t, err)
 	// DialPeerWithAddres - sw.peerConfig resets the dialer
+	waitUntilSwitchHasAtLeastNPeers(sw, 2)
+	assert.Equal(t, 2, sw.Peers().Size())
+}
+func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
+	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
+	err := sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+	// 1. simulate failure by closing the connection
+	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
+	rp.Start()
+	defer rp.Stop()
+	err = sw.AddPersistentPeers([]string{rp.Addr().String()})
+	require.NoError(t, err)
+	conn, err := rp.Dial(sw.NetAddress())
+	require.NoError(t, err)
+	time.Sleep(50 * time.Millisecond)
+	require.NotNil(t, sw.Peers().Get(rp.ID()))
+	conn.Close()
+	waitUntilSwitchHasAtLeastNPeers(sw, 1)
+	assert.Equal(t, 1, sw.Peers().Size())
+}
+func TestSwitchDialPeersAsync(t *testing.T) {
+	if testing.Short() {
+		return
+	}
+	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
+	err := sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
+	rp.Start()
+	defer rp.Stop()
+	err = sw.DialPeersAsync([]string{rp.Addr().String()})
+	require.NoError(t, err)
+	time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond)
+	require.NotNil(t, sw.Peers().Get(rp.ID()))
+}
-	// TODO: same as above
+func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) {
 	for i := 0; i < 20; i++ {
 		time.Sleep(250 * time.Millisecond)
-		npeers = sw.Peers().Size()
-		if npeers > 1 {
+		has := sw.Peers().Size()
+		if has >= n {
 			break
 		}
 	}
-	assert.EqualValues(2, npeers)
 }
 func TestSwitchFullConnectivity(t *testing.T) {


p2p/transport.go (+22, -6)

@@ -37,11 +37,15 @@ type accept struct {
 // events.
 // TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
 type peerConfig struct {
-	chDescs []*conn.ChannelDescriptor
-	onPeerError func(Peer, interface{})
-	outbound, persistent bool
-	reactorsByCh map[byte]Reactor
-	metrics *Metrics
+	chDescs []*conn.ChannelDescriptor
+	onPeerError func(Peer, interface{})
+	outbound bool
+	// isPersistent allows you to set a function, which, given socket address
+	// (for outbound peers) OR self-reported address (for inbound peers), tells
+	// if the peer is persistent or not.
+	isPersistent func(*NetAddress) bool
+	reactorsByCh map[byte]Reactor
+	metrics *Metrics
 }
 // Transport emits and connects to Peers. The implementation of Peer is left to
@@ -446,9 +450,21 @@ func (mt *MultiplexTransport) wrapPeer(
 	socketAddr *NetAddress,
 ) Peer {
+	persistent := false
+	if cfg.isPersistent != nil {
+		if cfg.outbound {
+			persistent = cfg.isPersistent(socketAddr)
+		} else {
+			selfReportedAddr, err := ni.NetAddress()
+			if err == nil {
+				persistent = cfg.isPersistent(selfReportedAddr)
+			}
+		}
+	}
 	peerConn := newPeerConn(
 		cfg.outbound,
-		cfg.persistent,
+		persistent,
 		c,
 		socketAddr,
 	)
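
The `wrapPeer` change above checks persistence against a different address depending on connection direction. The sketch below isolates that decision with simplified types. `addr`, `persistentSet`, and `isPersistent` are hypothetical names for illustration, and the remark about ephemeral source ports is a presumed rationale rather than something the commit states.

```go
package main

import "fmt"

// addr is a hypothetical, simplified stand-in for p2p.NetAddress.
type addr struct {
	ID   string
	Host string
	Port int
}

// persistentSet is the configured list of persistent peer addresses.
type persistentSet []addr

// has reports whether a is in the configured list.
func (ps persistentSet) has(a addr) bool {
	for _, p := range ps {
		if p == a {
			return true
		}
	}
	return false
}

// isPersistent picks the address to check by connection direction:
// the dialed socket address for outbound peers, and the self-reported
// listen address for inbound peers (presumably because an inbound
// socket carries the remote's ephemeral source port).
func isPersistent(ps persistentSet, outbound bool, socket, selfReported addr) bool {
	if outbound {
		return ps.has(socket)
	}
	return ps.has(selfReported)
}

func main() {
	listen := addr{ID: "d51f", Host: "10.0.0.2", Port: 26656}
	ps := persistentSet{listen}

	// Inbound connection: the socket address has an ephemeral port.
	socket := addr{ID: "d51f", Host: "10.0.0.2", Port: 51234}
	fmt.Println("inbound persistent:", isPersistent(ps, false, socket, listen))
	fmt.Println("outbound check on same socket:", isPersistent(ps, true, socket, listen))
}
```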


rpc/core/net.go (+6, -8)

@@ -184,10 +184,8 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS
 	if len(seeds) == 0 {
 		return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided")
 	}
-	// starts go routines to dial each peer after random delays
-	logger.Info("DialSeeds", "addrBook", addrBook, "seeds", seeds)
-	err := p2pPeers.DialPeersAsync(addrBook, seeds, false)
-	if err != nil {
+	logger.Info("DialSeeds", "seeds", seeds)
+	if err := p2pPeers.DialPeersAsync(seeds); err != nil {
 		return &ctypes.ResultDialSeeds{}, err
 	}
 	return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil
@@ -197,12 +195,12 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*c
 	if len(peers) == 0 {
 		return &ctypes.ResultDialPeers{}, errors.New("No peers provided")
 	}
-	// starts go routines to dial each peer after random delays
-	logger.Info("DialPeers", "addrBook", addrBook, "peers", peers, "persistent", persistent)
-	err := p2pPeers.DialPeersAsync(addrBook, peers, persistent)
-	if err != nil {
+	logger.Info("DialPeers", "peers", peers, "persistent", persistent)
+	if err := p2pPeers.AddPersistentPeers(peers); err != nil {
 		return &ctypes.ResultDialPeers{}, err
 	}
+	// parsing errors are handled above by AddPersistentPeers
+	_ = p2pPeers.DialPeersAsync(peers)
 	return &ctypes.ResultDialPeers{Log: "Dialing peers in progress. See /net_info for details"}, nil
 }


rpc/core/net_test.go (+73, -0)

@@ -0,0 +1,73 @@
+package core
+import (
+	"testing"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	cfg "github.com/tendermint/tendermint/config"
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/p2p"
+	rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
+)
+func TestUnsafeDialSeeds(t *testing.T) {
+	sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123",
+		func(n int, sw *p2p.Switch) *p2p.Switch { return sw })
+	err := sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+	logger = log.TestingLogger()
+	p2pPeers = sw
+	testCases := []struct {
+		seeds []string
+		isErr bool
+	}{
+		{[]string{}, true},
+		{[]string{"d51fb70907db1c6c2d5237e78379b25cf1a37ab4@127.0.0.1:41198"}, false},
+		{[]string{"127.0.0.1:41198"}, true},
+	}
+	for _, tc := range testCases {
+		res, err := UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds)
+		if tc.isErr {
+			assert.Error(t, err)
+		} else {
+			assert.NoError(t, err)
+			assert.NotNil(t, res)
+		}
+	}
+}
+func TestUnsafeDialPeers(t *testing.T) {
+	sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123",
+		func(n int, sw *p2p.Switch) *p2p.Switch { return sw })
+	err := sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+	logger = log.TestingLogger()
+	p2pPeers = sw
+	testCases := []struct {
+		peers []string
+		isErr bool
+	}{
+		{[]string{}, true},
+		{[]string{"d51fb70907db1c6c2d5237e78379b25cf1a37ab4@127.0.0.1:41198"}, false},
+		{[]string{"127.0.0.1:41198"}, true},
+	}
+	for _, tc := range testCases {
+		res, err := UnsafeDialPeers(&rpctypes.Context{}, tc.peers, false)
+		if tc.isErr {
+			assert.Error(t, err)
+		} else {
+			assert.NoError(t, err)
+			assert.NotNil(t, res)
+		}
+	}
+}

rpc/core/pipe.go (+2, -1)

@@ -44,7 +44,8 @@ type transport interface {
 }
 type peers interface {
-	DialPeersAsync(p2p.AddrBook, []string, bool) error
+	AddPersistentPeers([]string) error
+	DialPeersAsync([]string) error
 	NumPeers() (outbound, inbound, dialig int)
 	Peers() p2p.IPeerSet
 }

