You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

263 lines
6.2 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "reflect"
  8. "time"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-wire"
  11. )
  12. var pexErrInvalidMessage = errors.New("Invalid PEX message")
  13. const (
  14. PexChannel = byte(0x00)
  15. ensurePeersPeriodSeconds = 30
  16. minNumOutboundPeers = 10
  17. maxPexMessageSize = 1048576 // 1MB
  18. )
  19. /*
  20. PEXReactor handles PEX (peer exchange) and ensures that an
  21. adequate number of peers are connected to the switch.
  22. */
  23. type PEXReactor struct {
  24. BaseReactor
  25. sw *Switch
  26. book *AddrBook
  27. }
  28. func NewPEXReactor(book *AddrBook) *PEXReactor {
  29. pexR := &PEXReactor{
  30. book: book,
  31. }
  32. pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR)
  33. return pexR
  34. }
  35. func (pexR *PEXReactor) OnStart() error {
  36. pexR.BaseReactor.OnStart()
  37. pexR.book.OnStart()
  38. go pexR.ensurePeersRoutine()
  39. return nil
  40. }
  41. func (pexR *PEXReactor) OnStop() {
  42. pexR.BaseReactor.OnStop()
  43. pexR.book.OnStop()
  44. }
  45. // Implements Reactor
  46. func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
  47. return []*ChannelDescriptor{
  48. &ChannelDescriptor{
  49. ID: PexChannel,
  50. Priority: 1,
  51. SendQueueCapacity: 10,
  52. },
  53. }
  54. }
  55. // Implements Reactor
  56. func (pexR *PEXReactor) AddPeer(peer *Peer) {
  57. // Add the peer to the address book
  58. netAddr, err := NewNetAddressString(peer.ListenAddr)
  59. if err != nil {
  60. // this should never happen
  61. log.Error("Error in AddPeer: invalid peer address", "addr", peer.ListenAddr, "error", err)
  62. return
  63. }
  64. if peer.IsOutbound() {
  65. if pexR.book.NeedMoreAddrs() {
  66. pexR.RequestPEX(peer)
  67. }
  68. } else {
  69. // For inbound connections, the peer is its own source
  70. // (For outbound peers, the address is already in the books)
  71. pexR.book.AddAddress(netAddr, netAddr)
  72. }
  73. }
  74. // Implements Reactor
  75. func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
  76. // TODO
  77. }
  78. // Implements Reactor
  79. // Handles incoming PEX messages.
  80. func (pexR *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
  81. // decode message
  82. _, msg, err := DecodeMessage(msgBytes)
  83. if err != nil {
  84. log.Warn("Error decoding message", "error", err)
  85. return
  86. }
  87. log.Notice("Received message", "msg", msg)
  88. switch msg := msg.(type) {
  89. case *pexRequestMessage:
  90. // src requested some peers.
  91. // TODO: prevent abuse.
  92. pexR.SendAddrs(src, pexR.book.GetSelection())
  93. case *pexAddrsMessage:
  94. // We received some peer addresses from src.
  95. // TODO: prevent abuse.
  96. // (We don't want to get spammed with bad peers)
  97. srcAddr := src.Connection().RemoteAddress
  98. for _, addr := range msg.Addrs {
  99. pexR.book.AddAddress(addr, srcAddr)
  100. }
  101. default:
  102. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  103. }
  104. }
  105. // Asks peer for more addresses.
  106. func (pexR *PEXReactor) RequestPEX(peer *Peer) {
  107. peer.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  108. }
  109. func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
  110. peer.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  111. }
  112. // Ensures that sufficient peers are connected. (continuous)
  113. func (pexR *PEXReactor) ensurePeersRoutine() {
  114. // Randomize when routine starts
  115. time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond)
  116. // fire once immediately.
  117. pexR.ensurePeers()
  118. // fire periodically
  119. timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second)
  120. FOR_LOOP:
  121. for {
  122. select {
  123. case <-timer.Ch:
  124. pexR.ensurePeers()
  125. case <-pexR.Quit:
  126. break FOR_LOOP
  127. }
  128. }
  129. // Cleanup
  130. timer.Stop()
  131. }
  132. // Ensures that sufficient peers are connected. (once)
  133. func (pexR *PEXReactor) ensurePeers() {
  134. numOutPeers, _, numDialing := pexR.Switch.NumPeers()
  135. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  136. log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  137. if numToDial <= 0 {
  138. return
  139. }
  140. toDial := NewCMap()
  141. // Try to pick numToDial addresses to dial.
  142. // TODO: improve logic.
  143. for i := 0; i < numToDial; i++ {
  144. newBias := MinInt(numOutPeers, 8)*10 + 10
  145. var picked *NetAddress
  146. // Try to fetch a new peer 3 times.
  147. // This caps the maximum number of tries to 3 * numToDial.
  148. for j := 0; j < 3; j++ {
  149. try := pexR.book.PickAddress(newBias)
  150. if try == nil {
  151. break
  152. }
  153. alreadySelected := toDial.Has(try.IP.String())
  154. alreadyDialing := pexR.Switch.IsDialing(try)
  155. alreadyConnected := pexR.Switch.Peers().Has(try.IP.String())
  156. if alreadySelected || alreadyDialing || alreadyConnected {
  157. /*
  158. log.Info("Cannot dial address", "addr", try,
  159. "alreadySelected", alreadySelected,
  160. "alreadyDialing", alreadyDialing,
  161. "alreadyConnected", alreadyConnected)
  162. */
  163. continue
  164. } else {
  165. log.Info("Will dial address", "addr", try)
  166. picked = try
  167. break
  168. }
  169. }
  170. if picked == nil {
  171. continue
  172. }
  173. toDial.Set(picked.IP.String(), picked)
  174. }
  175. // Dial picked addresses
  176. for _, item := range toDial.Values() {
  177. go func(picked *NetAddress) {
  178. _, err := pexR.Switch.DialPeerWithAddress(picked)
  179. if err != nil {
  180. pexR.book.MarkAttempt(picked)
  181. }
  182. }(item.(*NetAddress))
  183. }
  184. // If we need more addresses, pick a random peer and ask for more.
  185. if pexR.book.NeedMoreAddrs() {
  186. if peers := pexR.Switch.Peers().List(); len(peers) > 0 {
  187. i := rand.Int() % len(peers)
  188. peer := peers[i]
  189. log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
  190. pexR.RequestPEX(peer)
  191. }
  192. }
  193. }
  194. //-----------------------------------------------------------------------------
  195. // Messages
  196. const (
  197. msgTypeRequest = byte(0x01)
  198. msgTypeAddrs = byte(0x02)
  199. )
  200. type PexMessage interface{}
  201. var _ = wire.RegisterInterface(
  202. struct{ PexMessage }{},
  203. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  204. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  205. )
  206. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  207. msgType = bz[0]
  208. n := new(int)
  209. r := bytes.NewReader(bz)
  210. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  211. return
  212. }
  213. /*
  214. A pexRequestMessage requests additional peer addresses.
  215. */
  216. type pexRequestMessage struct {
  217. }
  218. func (m *pexRequestMessage) String() string {
  219. return "[pexRequest]"
  220. }
  221. /*
  222. A message with announced peer addresses.
  223. */
  224. type pexAddrsMessage struct {
  225. Addrs []*NetAddress
  226. }
  227. func (m *pexAddrsMessage) String() string {
  228. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  229. }