You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

692 lines
19 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "sync"
  6. "time"
  7. "github.com/tendermint/tendermint/config"
  8. cmn "github.com/tendermint/tendermint/libs/common"
  9. "github.com/tendermint/tendermint/p2p/conn"
  10. )
  11. const (
  12. // wait a random amount of time from this interval
  13. // before dialing peers or reconnecting to help prevent DoS
  // (used by randomSleep as the bound, in milliseconds, of the random delay)
  14. dialRandomizerIntervalMilliseconds = 3000
  15. // repeatedly try to reconnect for a few minutes
  16. // ie. 5 * 20 = 100s (not counting the random delay added to every sleep)
  17. reconnectAttempts = 20
  18. reconnectInterval = 5 * time.Second
  19. // then move into exponential backoff mode for ~1day
  20. // ie. 3**10 = 16hrs
  // NOTE(review): reconnectToPeer sleeps base**i seconds for i = 0..attempts-1,
  // i.e. (3**10 - 1) / 2 = 29524s (~8h) in total plus jitter, so the "~1day" and
  // "16hrs" figures above look like over-estimates - confirm the intended budget.
  21. reconnectBackOffAttempts = 10
  22. reconnectBackOffBaseSeconds = 3
  23. )
  24. // MConnConfig returns an MConnConfig with fields updated
  25. // from the P2PConfig.
  26. func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig {
  27. mConfig := conn.DefaultMConnConfig()
  28. mConfig.FlushThrottle = cfg.FlushThrottleTimeout
  29. mConfig.SendRate = cfg.SendRate
  30. mConfig.RecvRate = cfg.RecvRate
  31. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  32. return mConfig
  33. }
  34. //-----------------------------------------------------------------------------
  35. // An AddrBook represents an address book from the pex package, which is used
  36. // to store peer addresses.
  //
  // The switch treats the address book as optional: callers such as
  // MarkPeerAsGood and DialPeersAsync check for nil before using it.
  37. type AddrBook interface {
  // AddAddress records addr; src is the address it was learned from
  // (DialPeersAsync passes the switch's own address as src).
  38. AddAddress(addr *NetAddress, src *NetAddress) error
  // AddOurAddress registers an address as belonging to this node; the accept
  // routine calls it after a self-connection has been rejected.
  39. AddOurAddress(*NetAddress)
  // OurAddress presumably reports whether the address is one of ours - confirm
  // against the pex implementation.
  40. OurAddress(*NetAddress) bool
  // MarkGood is invoked through Switch.MarkPeerAsGood for peers that did
  // something useful.
  41. MarkGood(*NetAddress)
  42. RemoveAddress(*NetAddress)
  43. HasAddress(*NetAddress) bool
  // Save is called by DialPeersAsync right after adding addresses so that they
  // are persisted immediately.
  44. Save()
  45. }
  46. // PeerFilterFunc to be implemented by filter hooks after a new Peer has been
  47. // fully setup.
  // It receives the current peer set and the candidate peer. Filters are
  // installed with SwitchPeerFilters; presumably a non-nil error rejects the
  // peer - the code applying the filters is outside this part of the file.
  48. type PeerFilterFunc func(IPeerSet, Peer) error
  49. //-----------------------------------------------------------------------------
  50. // Switch handles peer connections and exposes an API to receive incoming messages
  51. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  52. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  53. // incoming messages are received on the reactor.
  54. type Switch struct {
  55. cmn.BaseService
  56. config *config.P2PConfig
  // reactors maps the name given to AddReactor to the registered reactor.
  57. reactors map[string]Reactor
  // chDescs accumulates the channel descriptors of every registered reactor.
  58. chDescs []*conn.ChannelDescriptor
  // reactorsByCh maps a channel ID to the single reactor that owns it.
  59. reactorsByCh map[byte]Reactor
  // peers is the set of currently connected peers.
  60. peers *PeerSet
  // dialing and reconnecting are keyed by string(addr.ID) and hold the
  // addresses currently being dialed / reconnected to.
  61. dialing *cmn.CMap
  62. reconnecting *cmn.CMap
  63. nodeInfo NodeInfo // our node info
  64. nodeKey *NodeKey // our node privkey
  // addrBook is optional; it stays nil until SetAddrBook is called.
  65. addrBook AddrBook
  // transport accepts inbound peers and cleans up after removed ones.
  66. transport Transport
  // filterTimeout and peerFilters are configured through the
  // SwitchFilterTimeout and SwitchPeerFilters options.
  67. filterTimeout time.Duration
  68. peerFilters []PeerFilterFunc
  69. rng *cmn.Rand // seed for randomizing dial times and orders
  // metrics defaults to NopMetrics() and can be replaced via WithMetrics.
  70. metrics *Metrics
  71. }
  72. // NetAddress returns the address the switch is listening on.
  73. func (sw *Switch) NetAddress() *NetAddress {
  74. addr := sw.transport.NetAddress()
  75. return &addr
  76. }
  77. // SwitchOption sets an optional parameter on the Switch.
  // Options are applied by NewSwitch, in the order given, after the switch's
  // defaults have been populated.
  78. type SwitchOption func(*Switch)
  79. // NewSwitch creates a new Switch with the given config.
  80. func NewSwitch(
  81. cfg *config.P2PConfig,
  82. transport Transport,
  83. options ...SwitchOption,
  84. ) *Switch {
  85. sw := &Switch{
  86. config: cfg,
  87. reactors: make(map[string]Reactor),
  88. chDescs: make([]*conn.ChannelDescriptor, 0),
  89. reactorsByCh: make(map[byte]Reactor),
  90. peers: NewPeerSet(),
  91. dialing: cmn.NewCMap(),
  92. reconnecting: cmn.NewCMap(),
  93. metrics: NopMetrics(),
  94. transport: transport,
  95. filterTimeout: defaultFilterTimeout,
  96. }
  97. // Ensure we have a completely undeterministic PRNG.
  98. sw.rng = cmn.NewRand()
  99. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  100. for _, option := range options {
  101. option(sw)
  102. }
  103. return sw
  104. }
  105. // SwitchFilterTimeout sets the timeout used for peer filters.
  106. func SwitchFilterTimeout(timeout time.Duration) SwitchOption {
  107. return func(sw *Switch) { sw.filterTimeout = timeout }
  108. }
  109. // SwitchPeerFilters sets the filters for rejection of new peers.
  110. func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption {
  111. return func(sw *Switch) { sw.peerFilters = filters }
  112. }
  113. // WithMetrics sets the metrics.
  114. func WithMetrics(metrics *Metrics) SwitchOption {
  115. return func(sw *Switch) { sw.metrics = metrics }
  116. }
  117. //---------------------------------------------------------------------
  118. // Switch setup
  119. // AddReactor adds the given reactor to the switch.
  120. // NOTE: Not goroutine safe.
  121. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  122. // Validate the reactor.
  123. // No two reactors can share the same channel.
  124. reactorChannels := reactor.GetChannels()
  125. for _, chDesc := range reactorChannels {
  126. chID := chDesc.ID
  127. if sw.reactorsByCh[chID] != nil {
  128. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  129. }
  130. sw.chDescs = append(sw.chDescs, chDesc)
  131. sw.reactorsByCh[chID] = reactor
  132. }
  133. sw.reactors[name] = reactor
  134. reactor.SetSwitch(sw)
  135. return reactor
  136. }
  137. // Reactors returns a map of reactors registered on the switch.
  138. // NOTE: Not goroutine safe.
  139. func (sw *Switch) Reactors() map[string]Reactor {
  140. return sw.reactors
  141. }
  142. // Reactor returns the reactor with the given name.
  143. // NOTE: Not goroutine safe.
  144. func (sw *Switch) Reactor(name string) Reactor {
  145. return sw.reactors[name]
  146. }
  147. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  148. // NOTE: Not goroutine safe.
  149. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  150. sw.nodeInfo = nodeInfo
  151. }
  152. // NodeInfo returns the switch's NodeInfo.
  153. // NOTE: Not goroutine safe.
  154. func (sw *Switch) NodeInfo() NodeInfo {
  155. return sw.nodeInfo
  156. }
  157. // SetNodeKey sets the switch's private key for authenticated encryption.
  158. // NOTE: Not goroutine safe.
  159. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  160. sw.nodeKey = nodeKey
  161. }
  162. //---------------------------------------------------------------------
  163. // Service start/stop
  164. // OnStart implements BaseService. It starts all the reactors and peers.
  165. func (sw *Switch) OnStart() error {
  166. // Start reactors
  167. for _, reactor := range sw.reactors {
  168. err := reactor.Start()
  169. if err != nil {
  170. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  171. }
  172. }
  173. // Start accepting Peers.
  174. go sw.acceptRoutine()
  175. return nil
  176. }
  177. // OnStop implements BaseService. It stops all peers and reactors.
  178. func (sw *Switch) OnStop() {
  179. // Stop peers
  180. for _, p := range sw.peers.List() {
  181. sw.transport.Cleanup(p)
  182. p.Stop()
  183. if sw.peers.Remove(p) {
  184. sw.metrics.Peers.Add(float64(-1))
  185. }
  186. }
  187. // Stop reactors
  188. sw.Logger.Debug("Switch: Stopping reactors")
  189. for _, reactor := range sw.reactors {
  190. reactor.Stop()
  191. }
  192. }
  193. //---------------------------------------------------------------------
  194. // Peers
  195. // Broadcast runs a go routine for each attempted send, which will block trying
  196. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  197. // success values for each attempted send (false if times out). Channel will be
  198. // closed once msg bytes are sent to all peers (or time out).
  199. //
  200. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  201. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  202. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  203. peers := sw.peers.List()
  204. var wg sync.WaitGroup
  205. wg.Add(len(peers))
  206. successChan := make(chan bool, len(peers))
  207. for _, peer := range peers {
  208. go func(p Peer) {
  209. defer wg.Done()
  210. success := p.Send(chID, msgBytes)
  211. successChan <- success
  212. }(peer)
  213. }
  214. go func() {
  215. wg.Wait()
  216. close(successChan)
  217. }()
  218. return successChan
  219. }
  220. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  221. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  222. peers := sw.peers.List()
  223. for _, peer := range peers {
  224. if peer.IsOutbound() {
  225. outbound++
  226. } else {
  227. inbound++
  228. }
  229. }
  230. dialing = sw.dialing.Size()
  231. return
  232. }
  233. // MaxNumOutboundPeers returns a maximum number of outbound peers.
  234. func (sw *Switch) MaxNumOutboundPeers() int {
  235. return sw.config.MaxNumOutboundPeers
  236. }
  237. // Peers returns the set of peers that are connected to the switch.
  238. func (sw *Switch) Peers() IPeerSet {
  239. return sw.peers
  240. }
  241. // StopPeerForError disconnects from a peer due to external error.
  242. // If the peer is persistent, it will attempt to reconnect.
  243. // TODO: make record depending on reason.
  244. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  245. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  246. sw.stopAndRemovePeer(peer, reason)
  247. if peer.IsPersistent() {
  248. go sw.reconnectToPeer(peer.SocketAddr())
  249. }
  250. }
  251. // StopPeerGracefully disconnects from a peer gracefully.
  252. // TODO: handle graceful disconnects.
  253. func (sw *Switch) StopPeerGracefully(peer Peer) {
  254. sw.Logger.Info("Stopping peer gracefully")
  255. sw.stopAndRemovePeer(peer, nil)
  256. }
  257. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  258. if sw.peers.Remove(peer) {
  259. sw.metrics.Peers.Add(float64(-1))
  260. }
  261. sw.transport.Cleanup(peer)
  262. peer.Stop()
  263. for _, reactor := range sw.reactors {
  264. reactor.RemovePeer(peer, reason)
  265. }
  266. }
  267. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  268. // with a fixed interval, then with exponential backoff.
  269. // If no success after all that, it stops trying, and leaves it
  270. // to the PEX/Addrbook to find the peer with the addr again
  271. // NOTE: this will keep trying even if the handshake or auth fails.
  272. // TODO: be more explicit with error types so we only retry on certain failures
  273. // - ie. if we're getting ErrDuplicatePeer we can stop
  274. // because the addrbook got us the peer back already
  275. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  276. if sw.reconnecting.Has(string(addr.ID)) {
  277. return
  278. }
  279. sw.reconnecting.Set(string(addr.ID), addr)
  280. defer sw.reconnecting.Delete(string(addr.ID))
  281. start := time.Now()
  282. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  283. for i := 0; i < reconnectAttempts; i++ {
  284. if !sw.IsRunning() {
  285. return
  286. }
  287. if sw.IsDialingOrExistingAddress(addr) {
  288. sw.Logger.Debug("Peer connection has been established or dialed while we waiting next try", "addr", addr)
  289. return
  290. }
  291. err := sw.DialPeerWithAddress(addr, true)
  292. if err == nil {
  293. return // success
  294. }
  295. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  296. // sleep a set amount
  297. sw.randomSleep(reconnectInterval)
  298. continue
  299. }
  300. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  301. "addr", addr, "elapsed", time.Since(start))
  302. for i := 0; i < reconnectBackOffAttempts; i++ {
  303. if !sw.IsRunning() {
  304. return
  305. }
  306. // sleep an exponentially increasing amount
  307. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  308. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  309. err := sw.DialPeerWithAddress(addr, true)
  310. if err == nil {
  311. return // success
  312. }
  313. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  314. }
  315. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  316. }
  317. // SetAddrBook allows to set address book on Switch.
  318. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  319. sw.addrBook = addrBook
  320. }
  321. // MarkPeerAsGood marks the given peer as good when it did something useful
  322. // like contributed to consensus.
  323. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  324. if sw.addrBook != nil {
  325. sw.addrBook.MarkGood(peer.SocketAddr())
  326. }
  327. }
  328. //---------------------------------------------------------------------
  329. // Dialing
  330. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  331. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  332. // TODO: remove addrBook arg since it's now set on the switch
  333. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  334. netAddrs, errs := NewNetAddressStrings(peers)
  335. // only log errors, dial correct addresses
  336. for _, err := range errs {
  337. sw.Logger.Error("Error in peer's address", "err", err)
  338. }
  339. ourAddr := sw.NetAddress()
  340. // TODO: this code feels like it's in the wrong place.
  341. // The integration tests depend on the addrBook being saved
  342. // right away but maybe we can change that. Recall that
  343. // the addrBook is only written to disk every 2min
  344. if addrBook != nil {
  345. // add peers to `addrBook`
  346. for _, netAddr := range netAddrs {
  347. // do not add our address or ID
  348. if !netAddr.Same(ourAddr) {
  349. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  350. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  351. }
  352. }
  353. }
  354. // Persist some peers to disk right away.
  355. // NOTE: integration tests depend on this
  356. addrBook.Save()
  357. }
  358. // permute the list, dial them in random order.
  359. perm := sw.rng.Perm(len(netAddrs))
  360. for i := 0; i < len(perm); i++ {
  361. go func(i int) {
  362. j := perm[i]
  363. addr := netAddrs[j]
  364. if addr.Same(ourAddr) {
  365. sw.Logger.Debug("Ignore attempt to connect to ourselves", "addr", addr, "ourAddr", ourAddr)
  366. return
  367. }
  368. sw.randomSleep(0)
  369. if sw.IsDialingOrExistingAddress(addr) {
  370. sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr)
  371. return
  372. }
  373. err := sw.DialPeerWithAddress(addr, persistent)
  374. if err != nil {
  375. switch err.(type) {
  376. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  377. sw.Logger.Debug("Error dialing peer", "err", err)
  378. default:
  379. sw.Logger.Error("Error dialing peer", "err", err)
  380. }
  381. }
  382. }(i)
  383. }
  384. return nil
  385. }
  386. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  387. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  388. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  389. sw.dialing.Set(string(addr.ID), addr)
  390. defer sw.dialing.Delete(string(addr.ID))
  391. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  392. }
  393. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  394. func (sw *Switch) randomSleep(interval time.Duration) {
  395. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  396. time.Sleep(r + interval)
  397. }
  398. // IsDialingOrExistingAddress returns true if switch has a peer with the given
  399. // address or dialing it at the moment.
  400. func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool {
  401. return sw.dialing.Has(string(addr.ID)) ||
  402. sw.peers.Has(addr.ID) ||
  403. (!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP))
  404. }
  405. func (sw *Switch) acceptRoutine() {
  406. for {
  407. p, err := sw.transport.Accept(peerConfig{
  408. chDescs: sw.chDescs,
  409. onPeerError: sw.StopPeerForError,
  410. reactorsByCh: sw.reactorsByCh,
  411. metrics: sw.metrics,
  412. })
  413. if err != nil {
  414. switch err := err.(type) {
  415. case ErrRejected:
  416. if err.IsSelf() {
  417. // Remove the given address from the address book and add to our addresses
  418. // to avoid dialing in the future.
  419. addr := err.Addr()
  420. sw.addrBook.RemoveAddress(&addr)
  421. sw.addrBook.AddOurAddress(&addr)
  422. }
  423. sw.Logger.Info(
  424. "Inbound Peer rejected",
  425. "err", err,
  426. "numPeers", sw.peers.Size(),
  427. )
  428. continue
  429. case ErrFilterTimeout:
  430. sw.Logger.Error(
  431. "Peer filter timed out",
  432. "err", err,
  433. )
  434. continue
  435. case ErrTransportClosed:
  436. sw.Logger.Error(
  437. "Stopped accept routine, as transport is closed",
  438. "numPeers", sw.peers.Size(),
  439. )
  440. default:
  441. sw.Logger.Error(
  442. "Accept on transport errored",
  443. "err", err,
  444. "numPeers", sw.peers.Size(),
  445. )
  446. // We could instead have a retry loop around the acceptRoutine,
  447. // but that would need to stop and let the node shutdown eventually.
  448. // So might as well panic and let process managers restart the node.
  449. // There's no point in letting the node run without the acceptRoutine,
  450. // since it won't be able to accept new connections.
  451. panic(fmt.Errorf("accept routine exited: %v", err))
  452. }
  453. break
  454. }
  455. // Ignore connection if we already have enough peers.
  456. _, in, _ := sw.NumPeers()
  457. if in >= sw.config.MaxNumInboundPeers {
  458. sw.Logger.Info(
  459. "Ignoring inbound connection: already have enough inbound peers",
  460. "address", p.SocketAddr(),
  461. "have", in,
  462. "max", sw.config.MaxNumInboundPeers,
  463. )
  464. sw.transport.Cleanup(p)
  465. continue
  466. }
  467. if err := sw.addPeer(p); err != nil {
  468. sw.transport.Cleanup(p)
  469. if p.IsRunning() {
  470. _ = p.Stop()
  471. }
  472. sw.Logger.Info(
  473. "Ignoring inbound connection: error while adding peer",
  474. "err", err,
  475. "id", p.ID(),
  476. )
  477. }
  478. }
  479. }
  480. // dial the peer; make secret connection; authenticate against the dialed ID;
  481. // add the peer.
  482. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  483. // If peer is started succesffuly, reconnectLoop will start when
  484. // StopPeerForError is called
  485. func (sw *Switch) addOutboundPeerWithConfig(
  486. addr *NetAddress,
  487. cfg *config.P2PConfig,
  488. persistent bool,
  489. ) error {
  490. sw.Logger.Info("Dialing peer", "address", addr)
  491. // XXX(xla): Remove the leakage of test concerns in implementation.
  492. if cfg.TestDialFail {
  493. go sw.reconnectToPeer(addr)
  494. return fmt.Errorf("dial err (peerConfig.DialFail == true)")
  495. }
  496. p, err := sw.transport.Dial(*addr, peerConfig{
  497. chDescs: sw.chDescs,
  498. onPeerError: sw.StopPeerForError,
  499. persistent: persistent,
  500. reactorsByCh: sw.reactorsByCh,
  501. metrics: sw.metrics,
  502. })
  503. if err != nil {
  504. switch e := err.(type) {
  505. case ErrRejected:
  506. if e.IsSelf() {
  507. // Remove the given address from the address book and add to our addresses
  508. // to avoid dialing in the future.
  509. sw.addrBook.RemoveAddress(addr)
  510. sw.addrBook.AddOurAddress(addr)
  511. return err
  512. }
  513. }
  514. // retry persistent peers after
  515. // any dial error besides IsSelf()
  516. if persistent {
  517. go sw.reconnectToPeer(addr)
  518. }
  519. return err
  520. }
  521. if err := sw.addPeer(p); err != nil {
  522. sw.transport.Cleanup(p)
  523. if p.IsRunning() {
  524. _ = p.Stop()
  525. }
  526. return err
  527. }
  528. return nil
  529. }
  530. func (sw *Switch) filterPeer(p Peer) error {
  531. // Avoid duplicate
  532. if sw.peers.Has(p.ID()) {
  533. return ErrRejected{id: p.ID(), isDuplicate: true}
  534. }
  535. errc := make(chan error, len(sw.peerFilters))
  536. for _, f := range sw.peerFilters {
  537. go func(f PeerFilterFunc, p Peer, errc chan<- error) {
  538. errc <- f(sw.peers, p)
  539. }(f, p, errc)
  540. }
  541. for i := 0; i < cap(errc); i++ {
  542. select {
  543. case err := <-errc:
  544. if err != nil {
  545. return ErrRejected{id: p.ID(), err: err, isFiltered: true}
  546. }
  547. case <-time.After(sw.filterTimeout):
  548. return ErrFilterTimeout{}
  549. }
  550. }
  551. return nil
  552. }
  553. // addPeer starts up the Peer and adds it to the Switch. Error is returned if
  554. // the peer is filtered out or failed to start or can't be added.
  555. func (sw *Switch) addPeer(p Peer) error {
  556. if err := sw.filterPeer(p); err != nil {
  557. return err
  558. }
  559. p.SetLogger(sw.Logger.With("peer", p.SocketAddr()))
  560. // Handle the shut down case where the switch has stopped but we're
  561. // concurrently trying to add a peer.
  562. if !sw.IsRunning() {
  563. // XXX should this return an error or just log and terminate?
  564. sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
  565. return nil
  566. }
  567. // Start the peer's send/recv routines.
  568. // Must start it before adding it to the peer set
  569. // to prevent Start and Stop from being called concurrently.
  570. err := p.Start()
  571. if err != nil {
  572. // Should never happen
  573. sw.Logger.Error("Error starting peer", "err", err, "peer", p)
  574. return err
  575. }
  576. // Add the peer to PeerSet. Do this before starting the reactors
  577. // so that if Receive errors, we will find the peer and remove it.
  578. // Add should not err since we already checked peers.Has().
  579. if err := sw.peers.Add(p); err != nil {
  580. return err
  581. }
  582. sw.metrics.Peers.Add(float64(1))
  583. // Start all the reactor protocols on the peer.
  584. for _, reactor := range sw.reactors {
  585. reactor.AddPeer(p)
  586. }
  587. sw.Logger.Info("Added peer", "peer", p)
  588. return nil
  589. }