You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

406 lines
11 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
8 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "time"
  8. "github.com/pkg/errors"
  9. wire "github.com/tendermint/go-wire"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  // Tunables and identifiers used by the PEX reactor.
  const (
  	// PexChannel is a channel for PEX messages
  	PexChannel byte = 0x00

  	// defaultEnsurePeersPeriod is the interval a new reactor uses between
  	// runs of ensurePeers.
  	defaultEnsurePeersPeriod = 30 * time.Second

  	// minNumOutboundPeers is the number of outbound (connected or dialing)
  	// peers ensurePeers tries to reach.
  	minNumOutboundPeers = 10

  	// maxPexMessageSize is the size limit handed to the wire decoder.
  	maxPexMessageSize = 1 << 20 // 1MB
  )
  20. // PEXReactor handles PEX (peer exchange) and ensures that an
  21. // adequate number of peers are connected to the switch.
  22. //
  23. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  24. //
  25. // ## Preventing abuse
  26. //
  27. // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
  28. // Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
  29. type PEXReactor struct {
  30. BaseReactor
  31. book *AddrBook
  32. config *PEXReactorConfig
  33. ensurePeersPeriod time.Duration
  34. // maps to prevent abuse
  35. requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
  36. lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
  37. }
  // PEXReactorConfig carries the configuration specific to the PEX reactor.
  type PEXReactorConfig struct {
  	// Seeds lists the addresses the reactor falls back to when it has no
  	// connected peers and nothing from the addrbook to dial.
  	Seeds []string
  }
  44. // NewPEXReactor creates new PEX reactor.
  45. func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
  46. r := &PEXReactor{
  47. book: b,
  48. config: config,
  49. ensurePeersPeriod: defaultEnsurePeersPeriod,
  50. requestsSent: cmn.NewCMap(),
  51. lastReceivedRequests: cmn.NewCMap(),
  52. }
  53. r.BaseReactor = *NewBaseReactor("PEXReactor", r)
  54. return r
  55. }
  56. // OnStart implements BaseService
  57. func (r *PEXReactor) OnStart() error {
  58. if err := r.BaseReactor.OnStart(); err != nil {
  59. return err
  60. }
  61. err := r.book.Start()
  62. if err != nil && err != cmn.ErrAlreadyStarted {
  63. return err
  64. }
  65. // return err if user provided a bad seed address
  66. if err := r.checkSeeds(); err != nil {
  67. return err
  68. }
  69. go r.ensurePeersRoutine()
  70. return nil
  71. }
  72. // OnStop implements BaseService
  73. func (r *PEXReactor) OnStop() {
  74. r.BaseReactor.OnStop()
  75. r.book.Stop()
  76. }
  77. // GetChannels implements Reactor
  78. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  79. return []*ChannelDescriptor{
  80. {
  81. ID: PexChannel,
  82. Priority: 1,
  83. SendQueueCapacity: 10,
  84. },
  85. }
  86. }
  87. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  88. // or by requesting more addresses (if outbound).
  89. func (r *PEXReactor) AddPeer(p Peer) {
  90. if p.IsOutbound() {
  91. // For outbound peers, the address is already in the books -
  92. // either via DialPeersAsync or r.Receive.
  93. // Ask it for more peers if we need.
  94. if r.book.NeedMoreAddrs() {
  95. r.RequestPEX(p)
  96. }
  97. } else {
  98. // For inbound peers, the peer is its own source,
  99. // and its NodeInfo has already been validated.
  100. // Let the ensurePeersRoutine handle asking for more
  101. // peers when we need - we don't trust inbound peers as much.
  102. addr := p.NodeInfo().NetAddress()
  103. r.book.AddAddress(addr, addr)
  104. }
  105. }
  106. // RemovePeer implements Reactor.
  107. func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
  108. id := string(p.ID())
  109. r.requestsSent.Delete(id)
  110. r.lastReceivedRequests.Delete(id)
  111. }
  112. // Receive implements Reactor by handling incoming PEX messages.
  113. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
  114. _, msg, err := DecodeMessage(msgBytes)
  115. if err != nil {
  116. r.Logger.Error("Error decoding message", "err", err)
  117. return
  118. }
  119. r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
  120. switch msg := msg.(type) {
  121. case *pexRequestMessage:
  122. // We received a request for peers from src.
  123. if err := r.receiveRequest(src); err != nil {
  124. r.Switch.StopPeerForError(src, err)
  125. return
  126. }
  127. r.SendAddrs(src, r.book.GetSelection())
  128. case *pexAddrsMessage:
  129. // We received some peer addresses from src.
  130. if err := r.ReceivePEX(msg.Addrs, src); err != nil {
  131. r.Switch.StopPeerForError(src, err)
  132. return
  133. }
  134. default:
  135. r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  136. }
  137. }
  138. func (r *PEXReactor) receiveRequest(src Peer) error {
  139. id := string(src.ID())
  140. v := r.lastReceivedRequests.Get(id)
  141. if v == nil {
  142. // initialize with empty time
  143. lastReceived := time.Time{}
  144. r.lastReceivedRequests.Set(id, lastReceived)
  145. return nil
  146. }
  147. lastReceived := v.(time.Time)
  148. if lastReceived.Equal(time.Time{}) {
  149. // first time gets a free pass. then we start tracking the time
  150. lastReceived = time.Now()
  151. r.lastReceivedRequests.Set(id, lastReceived)
  152. return nil
  153. }
  154. now := time.Now()
  155. if now.Sub(lastReceived) < r.ensurePeersPeriod/3 {
  156. return fmt.Errorf("Peer (%v) is sending too many PEX requests. Disconnecting", src.ID())
  157. }
  158. r.lastReceivedRequests.Set(id, now)
  159. return nil
  160. }
  161. // RequestPEX asks peer for more addresses if we do not already
  162. // have a request out for this peer.
  163. func (r *PEXReactor) RequestPEX(p Peer) {
  164. id := string(p.ID())
  165. if r.requestsSent.Has(id) {
  166. return
  167. }
  168. r.requestsSent.Set(id, struct{}{})
  169. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  170. }
  171. // ReceivePEX adds the given addrs to the addrbook if theres an open
  172. // request for this peer and deletes the open request.
  173. // If there's no open request for the src peer, it returns an error.
  174. func (r *PEXReactor) ReceivePEX(addrs []*NetAddress, src Peer) error {
  175. id := string(src.ID())
  176. if !r.requestsSent.Has(id) {
  177. return errors.New("Received unsolicited pexAddrsMessage")
  178. }
  179. r.requestsSent.Delete(id)
  180. srcAddr := src.NodeInfo().NetAddress()
  181. for _, netAddr := range addrs {
  182. if netAddr != nil {
  183. r.book.AddAddress(netAddr, srcAddr)
  184. }
  185. }
  186. return nil
  187. }
  188. // SendAddrs sends addrs to the peer.
  189. func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) {
  190. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}})
  191. }
  192. // SetEnsurePeersPeriod sets period to ensure peers connected.
  193. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  194. r.ensurePeersPeriod = d
  195. }
  196. // Ensures that sufficient peers are connected. (continuous)
  197. func (r *PEXReactor) ensurePeersRoutine() {
  198. // Randomize when routine starts
  199. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  200. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  201. // fire once immediately.
  202. // ensures we dial the seeds right away if the book is empty
  203. r.ensurePeers()
  204. // fire periodically
  205. ticker := time.NewTicker(r.ensurePeersPeriod)
  206. for {
  207. select {
  208. case <-ticker.C:
  209. r.ensurePeers()
  210. case <-r.Quit:
  211. ticker.Stop()
  212. return
  213. }
  214. }
  215. }
  216. // ensurePeers ensures that sufficient peers are connected. (once)
  217. //
  218. // Old bucket / New bucket are arbitrary categories to denote whether an
  219. // address is vetted or not, and this needs to be determined over time via a
  220. // heuristic that we haven't perfected yet, or, perhaps is manually edited by
  221. // the node operator. It should not be used to compute what addresses are
  222. // already connected or not.
  223. //
  224. // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
  225. // What we're currently doing in terms of marking good/bad peers is just a
  226. // placeholder. It should not be the case that an address becomes old/vetted
  227. // upon a single successful connection.
  228. func (r *PEXReactor) ensurePeers() {
  229. numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
  230. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  231. r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  232. if numToDial <= 0 {
  233. return
  234. }
  235. // bias to prefer more vetted peers when we have fewer connections.
  236. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted
  237. // NOTE: range here is [10, 90]. Too high ?
  238. newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
  239. toDial := make(map[ID]*NetAddress)
  240. // Try maxAttempts times to pick numToDial addresses to dial
  241. maxAttempts := numToDial * 3
  242. for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
  243. try := r.book.PickAddress(newBias)
  244. if try == nil {
  245. continue
  246. }
  247. if _, selected := toDial[try.ID]; selected {
  248. continue
  249. }
  250. if dialling := r.Switch.IsDialing(try.ID); dialling {
  251. continue
  252. }
  253. if connected := r.Switch.Peers().Has(try.ID); connected {
  254. continue
  255. }
  256. r.Logger.Info("Will dial address", "addr", try)
  257. toDial[try.ID] = try
  258. }
  259. // Dial picked addresses
  260. for _, item := range toDial {
  261. go func(picked *NetAddress) {
  262. _, err := r.Switch.DialPeerWithAddress(picked, false)
  263. if err != nil {
  264. r.book.MarkAttempt(picked)
  265. }
  266. }(item)
  267. }
  268. // If we need more addresses, pick a random peer and ask for more.
  269. if r.book.NeedMoreAddrs() {
  270. peers := r.Switch.Peers().List()
  271. peersCount := len(peers)
  272. if peersCount > 0 {
  273. peer := peers[rand.Int()%peersCount] // nolint: gas
  274. r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
  275. r.RequestPEX(peer)
  276. }
  277. }
  278. // If we are not connected to nor dialing anybody, fallback to dialing a seed.
  279. if numOutPeers+numInPeers+numDialing+len(toDial) == 0 {
  280. r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
  281. r.dialSeed()
  282. }
  283. }
  284. // check seed addresses are well formed
  285. func (r *PEXReactor) checkSeeds() error {
  286. lSeeds := len(r.config.Seeds)
  287. if lSeeds == 0 {
  288. return nil
  289. }
  290. _, errs := NewNetAddressStrings(r.config.Seeds)
  291. for _, err := range errs {
  292. if err != nil {
  293. return err
  294. }
  295. }
  296. return nil
  297. }
  298. // randomly dial seeds until we connect to one or exhaust them
  299. func (r *PEXReactor) dialSeed() {
  300. lSeeds := len(r.config.Seeds)
  301. if lSeeds == 0 {
  302. return
  303. }
  304. seedAddrs, _ := NewNetAddressStrings(r.config.Seeds)
  305. perm := r.Switch.rng.Perm(lSeeds)
  306. for _, i := range perm {
  307. // dial a random seed
  308. seedAddr := seedAddrs[i]
  309. peer, err := r.Switch.DialPeerWithAddress(seedAddr, false)
  310. if err != nil {
  311. r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", seedAddr)
  312. } else {
  313. r.Switch.Logger.Info("Connected to seed", "peer", peer)
  314. return
  315. }
  316. }
  317. r.Switch.Logger.Error("Couldn't connect to any seeds")
  318. }
  319. //-----------------------------------------------------------------------------
  320. // Messages
  // Type bytes under which the concrete PEX messages are registered.
  const (
  	msgTypeRequest byte = 0x01
  	msgTypeAddrs   byte = 0x02
  )
  // PexMessage is a primary type for PEX messages. Underneath, it could contain
  // either pexRequestMessage, or pexAddrsMessage messages.
  type PexMessage interface{}

  // Register the PexMessage wrapper with go-wire, pairing each concrete
  // message type with its type byte (msgTypeRequest / msgTypeAddrs).
  // DecodeMessage reads messages through this same wrapper struct.
  var _ = wire.RegisterInterface(
  	struct{ PexMessage }{},
  	wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  	wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  )
  333. // DecodeMessage implements interface registered above.
  334. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  335. msgType = bz[0]
  336. n := new(int)
  337. r := bytes.NewReader(bz)
  338. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  339. return
  340. }
  // pexRequestMessage requests additional peer addresses. It carries no
  // payload.
  type pexRequestMessage struct{}

  // String returns a fixed, human-readable tag for the request message.
  func (m *pexRequestMessage) String() string {
  	return "[pexRequest]"
  }
  349. /*
  350. A message with announced peer addresses.
  351. */
  352. type pexAddrsMessage struct {
  353. Addrs []*NetAddress
  354. }
  355. func (m *pexAddrsMessage) String() string {
  356. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  357. }