You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

540 lines
14 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "sort"
  8. "time"
  9. "github.com/pkg/errors"
  10. wire "github.com/tendermint/go-wire"
  11. cmn "github.com/tendermint/tmlibs/common"
  12. )
  // PexChannel is a channel for PEX messages.
  const PexChannel = byte(0x00)

  // maxPexMessageSize is the size limit handed to the wire decoder
  // when reading a PEX message (1MB).
  const maxPexMessageSize = 1048576

  // Peer-count maintenance (regular, non-seed mode).
  const (
  	// how often ensurePeersRoutine re-checks that we have enough peers
  	defaultEnsurePeersPeriod = 30 * time.Second
  	// ensurePeers keeps dialing while outbound+dialing is below this
  	defaultMinNumOutboundPeers = 10
  )

  // Seed/Crawler constants.
  //
  // TODO:
  // We want seeds to only advertise good peers.
  // Peers are marked by external mechanisms.
  // We need a config value that can be set to be
  // on the order of how long it would take before a good
  // peer is marked good.
  const (
  	// disconnect from a peer once its status duration reaches this
  	defaultSeedDisconnectWaitPeriod = 2 * time.Minute
  	// do not redial an address attempted more recently than this. TODO: back-off
  	defaultCrawlPeerInterval = 2 * time.Minute
  	// how often the crawler checks some peers
  	defaultCrawlPeersPeriod = 30 * time.Second
  )
  31. // PEXReactor handles PEX (peer exchange) and ensures that an
  32. // adequate number of peers are connected to the switch.
  33. //
  34. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  35. //
  36. // ## Preventing abuse
  37. //
  38. // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
  39. // Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
  40. type PEXReactor struct {
  41. BaseReactor
  42. book *AddrBook
  43. config *PEXReactorConfig
  44. ensurePeersPeriod time.Duration
  45. // maps to prevent abuse
  46. requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
  47. lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
  48. }
  // PEXReactorConfig holds reactor specific configuration data.
  type PEXReactorConfig struct {
  	// SeedMode switches the reactor to Seed/Crawler mode: OnStart launches
  	// the crawl routine instead of the ensure-peers routine.
  	SeedMode bool

  	// Seeds is a list of addresses reactor may use
  	// if it can't connect to peers in the addrbook.
  	Seeds []string
  }
  57. // NewPEXReactor creates new PEX reactor.
  58. func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
  59. r := &PEXReactor{
  60. book: b,
  61. config: config,
  62. ensurePeersPeriod: defaultEnsurePeersPeriod,
  63. requestsSent: cmn.NewCMap(),
  64. lastReceivedRequests: cmn.NewCMap(),
  65. }
  66. r.BaseReactor = *NewBaseReactor("PEXReactor", r)
  67. return r
  68. }
  69. // OnStart implements BaseService
  70. func (r *PEXReactor) OnStart() error {
  71. if err := r.BaseReactor.OnStart(); err != nil {
  72. return err
  73. }
  74. err := r.book.Start()
  75. if err != nil && err != cmn.ErrAlreadyStarted {
  76. return err
  77. }
  78. // return err if user provided a bad seed address
  79. if err := r.checkSeeds(); err != nil {
  80. return err
  81. }
  82. // Check if this node should run
  83. // in seed/crawler mode
  84. if r.config.SeedMode {
  85. go r.crawlPeersRoutine()
  86. } else {
  87. go r.ensurePeersRoutine()
  88. }
  89. return nil
  90. }
  91. // OnStop implements BaseService
  92. func (r *PEXReactor) OnStop() {
  93. r.BaseReactor.OnStop()
  94. r.book.Stop()
  95. }
  96. // GetChannels implements Reactor
  97. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  98. return []*ChannelDescriptor{
  99. {
  100. ID: PexChannel,
  101. Priority: 1,
  102. SendQueueCapacity: 10,
  103. },
  104. }
  105. }
  106. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  107. // or by requesting more addresses (if outbound).
  108. func (r *PEXReactor) AddPeer(p Peer) {
  109. if p.IsOutbound() {
  110. // For outbound peers, the address is already in the books -
  111. // either via DialPeersAsync or r.Receive.
  112. // Ask it for more peers if we need.
  113. if r.book.NeedMoreAddrs() {
  114. r.RequestAddrs(p)
  115. }
  116. } else {
  117. // For inbound peers, the peer is its own source,
  118. // and its NodeInfo has already been validated.
  119. // Let the ensurePeersRoutine handle asking for more
  120. // peers when we need - we don't trust inbound peers as much.
  121. addr := p.NodeInfo().NetAddress()
  122. r.book.AddAddress(addr, addr)
  123. }
  124. }
  125. // RemovePeer implements Reactor.
  126. func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
  127. id := string(p.ID())
  128. r.requestsSent.Delete(id)
  129. r.lastReceivedRequests.Delete(id)
  130. }
  131. // Receive implements Reactor by handling incoming PEX messages.
  132. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
  133. _, msg, err := DecodeMessage(msgBytes)
  134. if err != nil {
  135. r.Logger.Error("Error decoding message", "err", err)
  136. return
  137. }
  138. r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
  139. switch msg := msg.(type) {
  140. case *pexRequestMessage:
  141. // Check we're not receiving too many requests
  142. if err := r.receiveRequest(src); err != nil {
  143. r.Switch.StopPeerForError(src, err)
  144. return
  145. }
  146. // Seeds disconnect after sending a batch of addrs
  147. if r.config.SeedMode {
  148. // TODO: should we be more selective ?
  149. r.SendAddrs(src, r.book.GetSelection())
  150. r.Switch.StopPeerGracefully(src)
  151. } else {
  152. r.SendAddrs(src, r.book.GetSelection())
  153. }
  154. case *pexAddrsMessage:
  155. // If we asked for addresses, add them to the book
  156. if err := r.ReceiveAddrs(msg.Addrs, src); err != nil {
  157. r.Switch.StopPeerForError(src, err)
  158. return
  159. }
  160. default:
  161. r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  162. }
  163. }
  164. func (r *PEXReactor) receiveRequest(src Peer) error {
  165. id := string(src.ID())
  166. v := r.lastReceivedRequests.Get(id)
  167. if v == nil {
  168. // initialize with empty time
  169. lastReceived := time.Time{}
  170. r.lastReceivedRequests.Set(id, lastReceived)
  171. return nil
  172. }
  173. lastReceived := v.(time.Time)
  174. if lastReceived.Equal(time.Time{}) {
  175. // first time gets a free pass. then we start tracking the time
  176. lastReceived = time.Now()
  177. r.lastReceivedRequests.Set(id, lastReceived)
  178. return nil
  179. }
  180. now := time.Now()
  181. if now.Sub(lastReceived) < r.ensurePeersPeriod/3 {
  182. return fmt.Errorf("Peer (%v) is sending too many PEX requests. Disconnecting", src.ID())
  183. }
  184. r.lastReceivedRequests.Set(id, now)
  185. return nil
  186. }
  187. // RequestAddrs asks peer for more addresses if we do not already
  188. // have a request out for this peer.
  189. func (r *PEXReactor) RequestAddrs(p Peer) {
  190. id := string(p.ID())
  191. if r.requestsSent.Has(id) {
  192. return
  193. }
  194. r.requestsSent.Set(id, struct{}{})
  195. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  196. }
  197. // ReceiveAddrs adds the given addrs to the addrbook if theres an open
  198. // request for this peer and deletes the open request.
  199. // If there's no open request for the src peer, it returns an error.
  200. func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error {
  201. id := string(src.ID())
  202. if !r.requestsSent.Has(id) {
  203. return errors.New("Received unsolicited pexAddrsMessage")
  204. }
  205. r.requestsSent.Delete(id)
  206. srcAddr := src.NodeInfo().NetAddress()
  207. for _, netAddr := range addrs {
  208. if netAddr != nil {
  209. r.book.AddAddress(netAddr, srcAddr)
  210. }
  211. }
  212. return nil
  213. }
  214. // SendAddrs sends addrs to the peer.
  215. func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) {
  216. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}})
  217. }
  218. // SetEnsurePeersPeriod sets period to ensure peers connected.
  219. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  220. r.ensurePeersPeriod = d
  221. }
  222. // Ensures that sufficient peers are connected. (continuous)
  223. func (r *PEXReactor) ensurePeersRoutine() {
  224. // Randomize when routine starts
  225. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  226. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  227. // fire once immediately.
  228. // ensures we dial the seeds right away if the book is empty
  229. r.ensurePeers()
  230. // fire periodically
  231. ticker := time.NewTicker(r.ensurePeersPeriod)
  232. for {
  233. select {
  234. case <-ticker.C:
  235. r.ensurePeers()
  236. case <-r.Quit:
  237. ticker.Stop()
  238. return
  239. }
  240. }
  241. }
  242. // ensurePeers ensures that sufficient peers are connected. (once)
  243. //
  244. // heuristic that we haven't perfected yet, or, perhaps is manually edited by
  245. // the node operator. It should not be used to compute what addresses are
  246. // already connected or not.
  247. func (r *PEXReactor) ensurePeers() {
  248. numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
  249. numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing)
  250. r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  251. if numToDial <= 0 {
  252. return
  253. }
  254. // bias to prefer more vetted peers when we have fewer connections.
  255. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted
  256. // NOTE: range here is [10, 90]. Too high ?
  257. newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
  258. toDial := make(map[ID]*NetAddress)
  259. // Try maxAttempts times to pick numToDial addresses to dial
  260. maxAttempts := numToDial * 3
  261. for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
  262. try := r.book.PickAddress(newBias)
  263. if try == nil {
  264. continue
  265. }
  266. if _, selected := toDial[try.ID]; selected {
  267. continue
  268. }
  269. if dialling := r.Switch.IsDialing(try.ID); dialling {
  270. continue
  271. }
  272. if connected := r.Switch.Peers().Has(try.ID); connected {
  273. continue
  274. }
  275. r.Logger.Info("Will dial address", "addr", try)
  276. toDial[try.ID] = try
  277. }
  278. // Dial picked addresses
  279. for _, item := range toDial {
  280. go func(picked *NetAddress) {
  281. _, err := r.Switch.DialPeerWithAddress(picked, false)
  282. if err != nil {
  283. r.book.MarkAttempt(picked)
  284. }
  285. }(item)
  286. }
  287. // If we need more addresses, pick a random peer and ask for more.
  288. if r.book.NeedMoreAddrs() {
  289. peers := r.Switch.Peers().List()
  290. peersCount := len(peers)
  291. if peersCount > 0 {
  292. peer := peers[rand.Int()%peersCount] // nolint: gas
  293. r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
  294. r.RequestAddrs(peer)
  295. }
  296. }
  297. // If we are not connected to nor dialing anybody, fallback to dialing a seed.
  298. if numOutPeers+numInPeers+numDialing+len(toDial) == 0 {
  299. r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
  300. r.dialSeeds()
  301. }
  302. }
  303. // check seed addresses are well formed
  304. func (r *PEXReactor) checkSeeds() error {
  305. lSeeds := len(r.config.Seeds)
  306. if lSeeds == 0 {
  307. return nil
  308. }
  309. _, errs := NewNetAddressStrings(r.config.Seeds)
  310. for _, err := range errs {
  311. if err != nil {
  312. return err
  313. }
  314. }
  315. return nil
  316. }
  317. // randomly dial seeds until we connect to one or exhaust them
  318. func (r *PEXReactor) dialSeeds() {
  319. lSeeds := len(r.config.Seeds)
  320. if lSeeds == 0 {
  321. return
  322. }
  323. seedAddrs, _ := NewNetAddressStrings(r.config.Seeds)
  324. perm := r.Switch.rng.Perm(lSeeds)
  325. for _, i := range perm {
  326. // dial a random seed
  327. seedAddr := seedAddrs[i]
  328. peer, err := r.Switch.DialPeerWithAddress(seedAddr, false)
  329. if err != nil {
  330. r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr)
  331. } else {
  332. r.Switch.Logger.Info("Connected to seed", "peer", peer)
  333. return
  334. }
  335. }
  336. r.Switch.Logger.Error("Couldn't connect to any seeds")
  337. }
  338. //----------------------------------------------------------
  339. // Explores the network searching for more peers. (continuous)
  340. // Seed/Crawler Mode causes this node to quickly disconnect
  341. // from peers, except other seed nodes.
  342. func (r *PEXReactor) crawlPeersRoutine() {
  343. // Do an initial crawl
  344. r.crawlPeers()
  345. // Fire periodically
  346. ticker := time.NewTicker(defaultCrawlPeersPeriod)
  347. for {
  348. select {
  349. case <-ticker.C:
  350. r.attemptDisconnects()
  351. r.crawlPeers()
  352. case <-r.Quit:
  353. return
  354. }
  355. }
  356. }
  357. // crawlPeerInfo handles temporary data needed for the
  358. // network crawling performed during seed/crawler mode.
  359. type crawlPeerInfo struct {
  360. // The listening address of a potential peer we learned about
  361. Addr *NetAddress
  362. // The last time we attempt to reach this address
  363. LastAttempt time.Time
  364. // The last time we successfully reached this address
  365. LastSuccess time.Time
  366. }
  367. // oldestFirst implements sort.Interface for []crawlPeerInfo
  368. // based on the LastAttempt field.
  369. type oldestFirst []crawlPeerInfo
  370. func (of oldestFirst) Len() int { return len(of) }
  371. func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] }
  372. func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
  373. // getPeersToCrawl returns addresses of potential peers that we wish to validate.
  374. // NOTE: The status information is ordered as described above.
  375. func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo {
  376. var of oldestFirst
  377. // TODO: be more selective
  378. addrs := r.book.ListOfKnownAddresses()
  379. for _, addr := range addrs {
  380. if len(addr.ID()) == 0 {
  381. continue // dont use peers without id
  382. }
  383. of = append(of, crawlPeerInfo{
  384. Addr: addr.Addr,
  385. LastAttempt: addr.LastAttempt,
  386. LastSuccess: addr.LastSuccess,
  387. })
  388. }
  389. sort.Sort(of)
  390. return of
  391. }
  392. // crawlPeers will crawl the network looking for new peer addresses. (once)
  393. func (r *PEXReactor) crawlPeers() {
  394. peerInfos := r.getPeersToCrawl()
  395. now := time.Now()
  396. // Use addresses we know of to reach additional peers
  397. for _, pi := range peerInfos {
  398. // Do not attempt to connect with peers we recently dialed
  399. if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
  400. continue
  401. }
  402. // Otherwise, attempt to connect with the known address
  403. _, err := r.Switch.DialPeerWithAddress(pi.Addr, false)
  404. if err != nil {
  405. r.book.MarkAttempt(pi.Addr)
  406. continue
  407. }
  408. }
  409. // Crawl the connected peers asking for more addresses
  410. for _, pi := range peerInfos {
  411. // We will wait a minimum period of time before crawling peers again
  412. if now.Sub(pi.LastAttempt) >= defaultCrawlPeerInterval {
  413. peer := r.Switch.Peers().Get(pi.Addr.ID)
  414. if peer != nil {
  415. r.RequestAddrs(peer)
  416. }
  417. }
  418. }
  419. }
  420. // attemptDisconnects checks if we've been with each peer long enough to disconnect
  421. func (r *PEXReactor) attemptDisconnects() {
  422. for _, peer := range r.Switch.Peers().List() {
  423. status := peer.Status()
  424. if status.Duration < defaultSeedDisconnectWaitPeriod {
  425. continue
  426. }
  427. if peer.IsPersistent() {
  428. continue
  429. }
  430. r.Switch.StopPeerGracefully(peer)
  431. }
  432. }
  433. //-----------------------------------------------------------------------------
  434. // Messages
  // Type bytes under which the concrete PEX messages are registered with wire.
  const (
  	msgTypeRequest byte = 0x01 // pexRequestMessage
  	msgTypeAddrs   byte = 0x02 // pexAddrsMessage
  )
  439. // PexMessage is a primary type for PEX messages. Underneath, it could contain
  440. // either pexRequestMessage, or pexAddrsMessage messages.
  441. type PexMessage interface{}
  442. var _ = wire.RegisterInterface(
  443. struct{ PexMessage }{},
  444. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  445. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  446. )
  447. // DecodeMessage implements interface registered above.
  448. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  449. msgType = bz[0]
  450. n := new(int)
  451. r := bytes.NewReader(bz)
  452. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  453. return
  454. }
  // pexRequestMessage requests additional peer addresses.
  // It carries no payload.
  type pexRequestMessage struct{}

  // String returns a fixed textual tag for the request.
  func (m *pexRequestMessage) String() string {
  	const tag = "[pexRequest]"
  	return tag
  }
  463. /*
  464. A message with announced peer addresses.
  465. */
  466. type pexAddrsMessage struct {
  467. Addrs []*NetAddress
  468. }
  469. func (m *pexAddrsMessage) String() string {
  470. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  471. }