You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

599 lines
16 KiB

9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
p2p: introduce peerConn to simplify peer creation (#1226) * expose AuthEnc in the P2P config if AuthEnc is true, dialed peers must have a node ID in the address and it must match the persistent pubkey from the secret handshake. Refs #1157 * fixes after my own review * fix docs * fix build failure ``` p2p/pex/pex_reactor_test.go:288:88: cannot use seed.NodeInfo().NetAddress() (type *p2p.NetAddress) as type string in array or slice literal ``` * p2p: introduce peerConn to simplify peer creation * Introduce `peerConn` containing the known fields of `peer` * `peer` only created in `sw.addPeer` once handshake is complete and NodeInfo is checked * Eliminates some mutable variables and makes the code flow better * Simplifies the `newXxxPeer` funcs * Use ID instead of PubKey where possible. * SetPubKeyFilter -> SetIDFilter * nodeInfo.Validate takes ID * remove peer.PubKey() * persistent node ids * fixes from review * test: use ip_plus_id.sh more * fix invalid memory panic during fast_sync test ``` 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: panic: runtime error: invalid memory address or nil pointer dereference 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x98dd3e] 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: goroutine 3432 [running]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.newOutboundPeerConn(0xc423fd1380, 0xc420933e00, 0x1, 0x1239a60, 0 xc420128c40, 0x2, 0x42caf6, 0xc42001f300, 0xc422831d98, 0xc4227951c0, ...) 
2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/peer.go:123 +0x31e 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).addOutboundPeerWithConfig(0xc4200ad040, 0xc423fd1380, 0 xc420933e00, 0xc423f48801, 0x28, 0x2) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:455 +0x12b 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).DialPeerWithAddress(0xc4200ad040, 0xc423fd1380, 0x1, 0x 0, 0x0) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:371 +0xdc 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).reconnectToPeer(0xc4200ad040, 0x123e000, 0xc42007bb00) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:290 +0x25f 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: created by github.com/tendermint/tendermint/p2p.(*Switch).StopPeerForError 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:256 +0x1b7 ```
7 years ago
p2p: introduce peerConn to simplify peer creation (#1226) * expose AuthEnc in the P2P config if AuthEnc is true, dialed peers must have a node ID in the address and it must match the persistent pubkey from the secret handshake. Refs #1157 * fixes after my own review * fix docs * fix build failure ``` p2p/pex/pex_reactor_test.go:288:88: cannot use seed.NodeInfo().NetAddress() (type *p2p.NetAddress) as type string in array or slice literal ``` * p2p: introduce peerConn to simplify peer creation * Introduce `peerConn` containing the known fields of `peer` * `peer` only created in `sw.addPeer` once handshake is complete and NodeInfo is checked * Eliminates some mutable variables and makes the code flow better * Simplifies the `newXxxPeer` funcs * Use ID instead of PubKey where possible. * SetPubKeyFilter -> SetIDFilter * nodeInfo.Validate takes ID * remove peer.PubKey() * persistent node ids * fixes from review * test: use ip_plus_id.sh more * fix invalid memory panic during fast_sync test ``` 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: panic: runtime error: invalid memory address or nil pointer dereference 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x98dd3e] 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: goroutine 3432 [running]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.newOutboundPeerConn(0xc423fd1380, 0xc420933e00, 0x1, 0x1239a60, 0 xc420128c40, 0x2, 0x42caf6, 0xc42001f300, 0xc422831d98, 0xc4227951c0, ...) 
2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/peer.go:123 +0x31e 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).addOutboundPeerWithConfig(0xc4200ad040, 0xc423fd1380, 0 xc420933e00, 0xc423f48801, 0x28, 0x2) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:455 +0x12b 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).DialPeerWithAddress(0xc4200ad040, 0xc423fd1380, 0x1, 0x 0, 0x0) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:371 +0xdc 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).reconnectToPeer(0xc4200ad040, 0x123e000, 0xc42007bb00) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:290 +0x25f 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: created by github.com/tendermint/tendermint/p2p.(*Switch).StopPeerForError 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:256 +0x1b7 ```
7 years ago
p2p: introduce peerConn to simplify peer creation (#1226) * expose AuthEnc in the P2P config if AuthEnc is true, dialed peers must have a node ID in the address and it must match the persistent pubkey from the secret handshake. Refs #1157 * fixes after my own review * fix docs * fix build failure ``` p2p/pex/pex_reactor_test.go:288:88: cannot use seed.NodeInfo().NetAddress() (type *p2p.NetAddress) as type string in array or slice literal ``` * p2p: introduce peerConn to simplify peer creation * Introduce `peerConn` containing the known fields of `peer` * `peer` only created in `sw.addPeer` once handshake is complete and NodeInfo is checked * Eliminates some mutable variables and makes the code flow better * Simplifies the `newXxxPeer` funcs * Use ID instead of PubKey where possible. * SetPubKeyFilter -> SetIDFilter * nodeInfo.Validate takes ID * remove peer.PubKey() * persistent node ids * fixes from review * test: use ip_plus_id.sh more * fix invalid memory panic during fast_sync test ``` 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: panic: runtime error: invalid memory address or nil pointer dereference 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: [signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x98dd3e] 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: goroutine 3432 [running]: 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.newOutboundPeerConn(0xc423fd1380, 0xc420933e00, 0x1, 0x1239a60, 0 xc420128c40, 0x2, 0x42caf6, 0xc42001f300, 0xc422831d98, 0xc4227951c0, ...) 
2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/peer.go:123 +0x31e 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).addOutboundPeerWithConfig(0xc4200ad040, 0xc423fd1380, 0 xc420933e00, 0xc423f48801, 0x28, 0x2) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:455 +0x12b 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).DialPeerWithAddress(0xc4200ad040, 0xc423fd1380, 0x1, 0x 0, 0x0) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:371 +0xdc 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: github.com/tendermint/tendermint/p2p.(*Switch).reconnectToPeer(0xc4200ad040, 0x123e000, 0xc42007bb00) 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:290 +0x25f 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: created by github.com/tendermint/tendermint/p2p.(*Switch).StopPeerForError 2018-02-21T06:30:05Z box887.localdomain docker/local_testnet_4[14907]: #011/go/src/github.com/tendermint/tendermint/p2p/switch.go:256 +0x1b7 ```
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package pex
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/pkg/errors"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. "github.com/tendermint/tendermint/p2p"
  14. "github.com/tendermint/tendermint/p2p/conn"
  15. )
  // Peer is an alias for p2p.Peer, letting this package name the type without the p2p qualifier.
  16. type Peer = p2p.Peer
  const (
  	// PexChannel identifies the channel used for PEX messages.
  	PexChannel byte = 0x00

  	// maxPexMessageSize bounds the size of a single PEX message (1MB).
  	maxPexMessageSize = 1 << 20

  	// Defaults used to make sure we keep enough peers.
  	defaultEnsurePeersPeriod   = time.Second * 30
  	defaultMinNumOutboundPeers = 10

  	// Seed/Crawler constants.
  	//
  	// TODO:
  	// Seeds should only advertise good peers, and peers are marked good by
  	// external mechanisms. A config value is needed that is on the order of
  	// how long it takes before a good peer gets marked good.
  	defaultSeedDisconnectWaitPeriod = time.Minute * 2  // disconnect after this
  	defaultCrawlPeerInterval        = time.Minute * 2  // dont redial for this. TODO: back-off
  	defaultCrawlPeersPeriod         = time.Second * 30 // check some peers every this
  )
  35. // PEXReactor handles PEX (peer exchange) and ensures that an
  36. // adequate number of peers are connected to the switch.
  37. //
  38. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  39. //
  40. // ## Preventing abuse
  41. //
  42. // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
  43. // Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
  44. type PEXReactor struct {
  45. p2p.BaseReactor
  46. book AddrBook
  47. config *PEXReactorConfig
  48. ensurePeersPeriod time.Duration
  49. // maps to prevent abuse
  50. requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
  51. lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
  52. attemptsToDial sync.Map // dial addr -> number of attempts to dial (for exponential backoff)
  53. }
  // PEXReactorConfig carries the configuration specific to the PEX reactor.
  type PEXReactorConfig struct {
  	// SeedMode switches the reactor into Seed/Crawler mode.
  	SeedMode bool

  	// Seeds lists the addresses the reactor may fall back to when it
  	// cannot connect to peers from the addrbook.
  	Seeds []string
  }
  62. // NewPEXReactor creates new PEX reactor.
  63. func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
  64. r := &PEXReactor{
  65. book: b,
  66. config: config,
  67. ensurePeersPeriod: defaultEnsurePeersPeriod,
  68. requestsSent: cmn.NewCMap(),
  69. lastReceivedRequests: cmn.NewCMap(),
  70. }
  71. r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
  72. return r
  73. }
  74. // OnStart implements BaseService
  75. func (r *PEXReactor) OnStart() error {
  76. if err := r.BaseReactor.OnStart(); err != nil {
  77. return err
  78. }
  79. err := r.book.Start()
  80. if err != nil && err != cmn.ErrAlreadyStarted {
  81. return err
  82. }
  83. // return err if user provided a bad seed address
  84. if err := r.checkSeeds(); err != nil {
  85. return err
  86. }
  87. // Check if this node should run
  88. // in seed/crawler mode
  89. if r.config.SeedMode {
  90. go r.crawlPeersRoutine()
  91. } else {
  92. go r.ensurePeersRoutine()
  93. }
  94. return nil
  95. }
  96. // OnStop implements BaseService
  97. func (r *PEXReactor) OnStop() {
  98. r.BaseReactor.OnStop()
  99. r.book.Stop()
  100. }
  101. // GetChannels implements Reactor
  102. func (r *PEXReactor) GetChannels() []*conn.ChannelDescriptor {
  103. return []*conn.ChannelDescriptor{
  104. {
  105. ID: PexChannel,
  106. Priority: 1,
  107. SendQueueCapacity: 10,
  108. },
  109. }
  110. }
  111. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  112. // or by requesting more addresses (if outbound).
  113. func (r *PEXReactor) AddPeer(p Peer) {
  114. if p.IsOutbound() {
  115. // For outbound peers, the address is already in the books -
  116. // either via DialPeersAsync or r.Receive.
  117. // Ask it for more peers if we need.
  118. if r.book.NeedMoreAddrs() {
  119. r.RequestAddrs(p)
  120. }
  121. } else {
  122. // For inbound peers, the peer is its own source,
  123. // and its NodeInfo has already been validated.
  124. // Let the ensurePeersRoutine handle asking for more
  125. // peers when we need - we don't trust inbound peers as much.
  126. addr := p.NodeInfo().NetAddress()
  127. r.book.AddAddress(addr, addr)
  128. }
  129. }
  130. // RemovePeer implements Reactor.
  131. func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
  132. id := string(p.ID())
  133. r.requestsSent.Delete(id)
  134. r.lastReceivedRequests.Delete(id)
  135. }
  136. // Receive implements Reactor by handling incoming PEX messages.
  137. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
  138. _, msg, err := DecodeMessage(msgBytes)
  139. if err != nil {
  140. r.Logger.Error("Error decoding message", "err", err)
  141. return
  142. }
  143. r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
  144. switch msg := msg.(type) {
  145. case *pexRequestMessage:
  146. // Check we're not receiving too many requests
  147. if err := r.receiveRequest(src); err != nil {
  148. r.Switch.StopPeerForError(src, err)
  149. return
  150. }
  151. // Seeds disconnect after sending a batch of addrs
  152. if r.config.SeedMode {
  153. // TODO: should we be more selective ?
  154. r.SendAddrs(src, r.book.GetSelection())
  155. r.Switch.StopPeerGracefully(src)
  156. } else {
  157. r.SendAddrs(src, r.book.GetSelection())
  158. }
  159. case *pexAddrsMessage:
  160. // If we asked for addresses, add them to the book
  161. if err := r.ReceiveAddrs(msg.Addrs, src); err != nil {
  162. r.Switch.StopPeerForError(src, err)
  163. return
  164. }
  165. default:
  166. r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  167. }
  168. }
  169. func (r *PEXReactor) receiveRequest(src Peer) error {
  170. id := string(src.ID())
  171. v := r.lastReceivedRequests.Get(id)
  172. if v == nil {
  173. // initialize with empty time
  174. lastReceived := time.Time{}
  175. r.lastReceivedRequests.Set(id, lastReceived)
  176. return nil
  177. }
  178. lastReceived := v.(time.Time)
  179. if lastReceived.Equal(time.Time{}) {
  180. // first time gets a free pass. then we start tracking the time
  181. lastReceived = time.Now()
  182. r.lastReceivedRequests.Set(id, lastReceived)
  183. return nil
  184. }
  185. now := time.Now()
  186. if now.Sub(lastReceived) < r.ensurePeersPeriod/3 {
  187. return fmt.Errorf("Peer (%v) is sending too many PEX requests. Disconnecting", src.ID())
  188. }
  189. r.lastReceivedRequests.Set(id, now)
  190. return nil
  191. }
  192. // RequestAddrs asks peer for more addresses if we do not already
  193. // have a request out for this peer.
  194. func (r *PEXReactor) RequestAddrs(p Peer) {
  195. id := string(p.ID())
  196. if r.requestsSent.Has(id) {
  197. return
  198. }
  199. r.requestsSent.Set(id, struct{}{})
  200. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  201. }
  202. // ReceiveAddrs adds the given addrs to the addrbook if theres an open
  203. // request for this peer and deletes the open request.
  204. // If there's no open request for the src peer, it returns an error.
  205. func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
  206. id := string(src.ID())
  207. if !r.requestsSent.Has(id) {
  208. return errors.New("Received unsolicited pexAddrsMessage")
  209. }
  210. r.requestsSent.Delete(id)
  211. srcAddr := src.NodeInfo().NetAddress()
  212. for _, netAddr := range addrs {
  213. if netAddr != nil {
  214. r.book.AddAddress(netAddr, srcAddr)
  215. }
  216. }
  217. return nil
  218. }
  219. // SendAddrs sends addrs to the peer.
  220. func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
  221. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}})
  222. }
  223. // SetEnsurePeersPeriod sets period to ensure peers connected.
  224. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  225. r.ensurePeersPeriod = d
  226. }
  227. // Ensures that sufficient peers are connected. (continuous)
  228. func (r *PEXReactor) ensurePeersRoutine() {
  229. var (
  230. seed = rand.New(rand.NewSource(time.Now().UnixNano()))
  231. jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
  232. )
  233. // Randomize first round of communication to avoid thundering herd.
  234. // If no potential peers are present directly start connecting so we guarantee
  235. // swift setup with the help of configured seeds.
  236. if r.hasPotentialPeers() {
  237. time.Sleep(time.Duration(jitter))
  238. }
  239. // fire once immediately.
  240. // ensures we dial the seeds right away if the book is empty
  241. r.ensurePeers()
  242. // fire periodically
  243. ticker := time.NewTicker(r.ensurePeersPeriod)
  244. for {
  245. select {
  246. case <-ticker.C:
  247. r.ensurePeers()
  248. case <-r.Quit():
  249. ticker.Stop()
  250. return
  251. }
  252. }
  253. }
  254. // ensurePeers ensures that sufficient peers are connected. (once)
  255. //
  256. // heuristic that we haven't perfected yet, or, perhaps is manually edited by
  257. // the node operator. It should not be used to compute what addresses are
  258. // already connected or not.
  259. func (r *PEXReactor) ensurePeers() {
  260. var (
  261. out, in, dial = r.Switch.NumPeers()
  262. numToDial = defaultMinNumOutboundPeers - (out + dial)
  263. )
  264. r.Logger.Info(
  265. "Ensure peers",
  266. "numOutPeers", out,
  267. "numInPeers", in,
  268. "numDialing", dial,
  269. "numToDial", numToDial,
  270. )
  271. if numToDial <= 0 {
  272. return
  273. }
  274. // bias to prefer more vetted peers when we have fewer connections.
  275. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted
  276. // NOTE: range here is [10, 90]. Too high ?
  277. newBias := cmn.MinInt(out, 8)*10 + 10
  278. toDial := make(map[p2p.ID]*p2p.NetAddress)
  279. // Try maxAttempts times to pick numToDial addresses to dial
  280. maxAttempts := numToDial * 3
  281. for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
  282. try := r.book.PickAddress(newBias)
  283. if try == nil {
  284. continue
  285. }
  286. if _, selected := toDial[try.ID]; selected {
  287. continue
  288. }
  289. if dialling := r.Switch.IsDialing(try.ID); dialling {
  290. continue
  291. }
  292. if connected := r.Switch.Peers().Has(try.ID); connected {
  293. continue
  294. }
  295. r.Logger.Info("Will dial address", "addr", try)
  296. toDial[try.ID] = try
  297. }
  298. // Dial picked addresses
  299. for _, addr := range toDial {
  300. go r.dialPeer(addr)
  301. }
  302. // If we need more addresses, pick a random peer and ask for more.
  303. if r.book.NeedMoreAddrs() {
  304. peers := r.Switch.Peers().List()
  305. peersCount := len(peers)
  306. if peersCount > 0 {
  307. peer := peers[rand.Int()%peersCount] // nolint: gas
  308. r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
  309. r.RequestAddrs(peer)
  310. }
  311. }
  312. // If we are not connected to nor dialing anybody, fallback to dialing a seed.
  313. if out+in+dial+len(toDial) == 0 {
  314. r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
  315. r.dialSeeds()
  316. }
  317. }
  318. func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
  319. // 1s == (1e9 ns) == (1 Billion ns)
  320. billionNs := float64(time.Second.Nanoseconds())
  321. // exponential backoff if it's not our first attempt to dial given address
  322. var attempts int
  323. if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted {
  324. attempts = lAttempts.(int)
  325. jitterSeconds := time.Duration(rand.Float64() * billionNs)
  326. backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
  327. r.Logger.Debug(fmt.Sprintf("Dialing %v", addr), "attempts", attempts, "backoff_duration", backoffDuration)
  328. time.Sleep(backoffDuration)
  329. }
  330. err := r.Switch.DialPeerWithAddress(addr, false)
  331. if err != nil {
  332. r.Logger.Error("Dialing failed", "err", err)
  333. // TODO: detect more "bad peer" scenarios
  334. if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
  335. r.book.MarkBad(addr)
  336. } else {
  337. r.book.MarkAttempt(addr)
  338. }
  339. // record attempt
  340. r.attemptsToDial.Store(addr.DialString(), attempts+1)
  341. } else {
  342. // cleanup any history
  343. r.attemptsToDial.Delete(addr.DialString())
  344. }
  345. }
  346. // check seed addresses are well formed
  347. func (r *PEXReactor) checkSeeds() error {
  348. lSeeds := len(r.config.Seeds)
  349. if lSeeds == 0 {
  350. return nil
  351. }
  352. _, errs := p2p.NewNetAddressStrings(r.config.Seeds)
  353. for _, err := range errs {
  354. if err != nil {
  355. return err
  356. }
  357. }
  358. return nil
  359. }
  360. // randomly dial seeds until we connect to one or exhaust them
  361. func (r *PEXReactor) dialSeeds() {
  362. lSeeds := len(r.config.Seeds)
  363. if lSeeds == 0 {
  364. return
  365. }
  366. seedAddrs, _ := p2p.NewNetAddressStrings(r.config.Seeds)
  367. perm := rand.Perm(lSeeds)
  368. // perm := r.Switch.rng.Perm(lSeeds)
  369. for _, i := range perm {
  370. // dial a random seed
  371. seedAddr := seedAddrs[i]
  372. err := r.Switch.DialPeerWithAddress(seedAddr, false)
  373. if err == nil {
  374. return
  375. }
  376. r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr)
  377. }
  378. r.Switch.Logger.Error("Couldn't connect to any seeds")
  379. }
  380. //----------------------------------------------------------
  381. // Explores the network searching for more peers. (continuous)
  382. // Seed/Crawler Mode causes this node to quickly disconnect
  383. // from peers, except other seed nodes.
  384. func (r *PEXReactor) crawlPeersRoutine() {
  385. // Do an initial crawl
  386. r.crawlPeers()
  387. // Fire periodically
  388. ticker := time.NewTicker(defaultCrawlPeersPeriod)
  389. for {
  390. select {
  391. case <-ticker.C:
  392. r.attemptDisconnects()
  393. r.crawlPeers()
  394. case <-r.Quit():
  395. return
  396. }
  397. }
  398. }
  399. // hasPotentialPeers indicates if there is a potential peer to connect to, by
  400. // consulting the Switch as well as the AddrBook.
  401. func (r *PEXReactor) hasPotentialPeers() bool {
  402. out, in, dial := r.Switch.NumPeers()
  403. return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0
  404. }
  405. // crawlPeerInfo handles temporary data needed for the
  406. // network crawling performed during seed/crawler mode.
  407. type crawlPeerInfo struct {
  408. // The listening address of a potential peer we learned about
  409. Addr *p2p.NetAddress
  410. // The last time we attempt to reach this address
  411. LastAttempt time.Time
  412. // The last time we successfully reached this address
  413. LastSuccess time.Time
  414. }
  415. // oldestFirst implements sort.Interface for []crawlPeerInfo
  416. // based on the LastAttempt field.
  417. type oldestFirst []crawlPeerInfo
  418. func (of oldestFirst) Len() int { return len(of) }
  419. func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] }
  420. func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
  421. // getPeersToCrawl returns addresses of potential peers that we wish to validate.
  422. // NOTE: The status information is ordered as described above.
  423. func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo {
  424. var of oldestFirst
  425. // TODO: be more selective
  426. addrs := r.book.ListOfKnownAddresses()
  427. for _, addr := range addrs {
  428. if len(addr.ID()) == 0 {
  429. continue // dont use peers without id
  430. }
  431. of = append(of, crawlPeerInfo{
  432. Addr: addr.Addr,
  433. LastAttempt: addr.LastAttempt,
  434. LastSuccess: addr.LastSuccess,
  435. })
  436. }
  437. sort.Sort(of)
  438. return of
  439. }
  440. // crawlPeers will crawl the network looking for new peer addresses. (once)
  441. func (r *PEXReactor) crawlPeers() {
  442. peerInfos := r.getPeersToCrawl()
  443. now := time.Now()
  444. // Use addresses we know of to reach additional peers
  445. for _, pi := range peerInfos {
  446. // Do not attempt to connect with peers we recently dialed
  447. if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
  448. continue
  449. }
  450. // Otherwise, attempt to connect with the known address
  451. err := r.Switch.DialPeerWithAddress(pi.Addr, false)
  452. if err != nil {
  453. r.book.MarkAttempt(pi.Addr)
  454. continue
  455. }
  456. }
  457. // Crawl the connected peers asking for more addresses
  458. for _, pi := range peerInfos {
  459. // We will wait a minimum period of time before crawling peers again
  460. if now.Sub(pi.LastAttempt) >= defaultCrawlPeerInterval {
  461. peer := r.Switch.Peers().Get(pi.Addr.ID)
  462. if peer != nil {
  463. r.RequestAddrs(peer)
  464. }
  465. }
  466. }
  467. }
  468. // attemptDisconnects checks if we've been with each peer long enough to disconnect
  469. func (r *PEXReactor) attemptDisconnects() {
  470. for _, peer := range r.Switch.Peers().List() {
  471. status := peer.Status()
  472. if status.Duration < defaultSeedDisconnectWaitPeriod {
  473. continue
  474. }
  475. if peer.IsPersistent() {
  476. continue
  477. }
  478. r.Switch.StopPeerGracefully(peer)
  479. }
  480. }
  481. //-----------------------------------------------------------------------------
  482. // Messages
  // Type bytes identifying each concrete PEX message.
  const (
  	msgTypeRequest byte = 0x01
  	msgTypeAddrs   byte = 0x02
  )
  487. // PexMessage is a primary type for PEX messages. Underneath, it could contain
  488. // either pexRequestMessage, or pexAddrsMessage messages.
  489. type PexMessage interface{}
  490. var _ = wire.RegisterInterface(
  491. struct{ PexMessage }{},
  492. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  493. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  494. )
  495. // DecodeMessage implements interface registered above.
  496. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  497. msgType = bz[0]
  498. n := new(int)
  499. r := bytes.NewReader(bz)
  500. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  501. return
  502. }
  // pexRequestMessage is a request for additional peer addresses. It
  // carries no payload.
  type pexRequestMessage struct{}

  // String returns a fixed, human-readable tag for the request.
  func (m *pexRequestMessage) String() string {
  	const label = "[pexRequest]"
  	return label
  }
  511. /*
  512. A message with announced peer addresses.
  513. */
  514. type pexAddrsMessage struct {
  515. Addrs []*p2p.NetAddress
  516. }
  517. func (m *pexAddrsMessage) String() string {
  518. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  519. }