You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

273 lines
6.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "reflect"
  8. "sync/atomic"
  9. "time"
  10. "github.com/tendermint/tendermint/binary"
  11. . "github.com/tendermint/tendermint/common"
  12. "github.com/tendermint/tendermint/events"
  13. )
  14. var pexErrInvalidMessage = errors.New("Invalid PEX message")
  15. const (
  16. PexChannel = byte(0x00)
  17. ensurePeersPeriodSeconds = 30
  18. minNumOutboundPeers = 10
  19. )
  20. /*
  21. PEXReactor handles PEX (peer exchange) and ensures that an
  22. adequate number of peers are connected to the switch.
  23. */
  24. type PEXReactor struct {
  25. sw *Switch
  26. quit chan struct{}
  27. started uint32
  28. stopped uint32
  29. book *AddrBook
  30. evsw events.Fireable
  31. }
  32. func NewPEXReactor(book *AddrBook) *PEXReactor {
  33. pexR := &PEXReactor{
  34. quit: make(chan struct{}),
  35. book: book,
  36. }
  37. return pexR
  38. }
  39. // Implements Reactor
  40. func (pexR *PEXReactor) Start(sw *Switch) {
  41. if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) {
  42. log.Notice("Starting PEXReactor")
  43. pexR.sw = sw
  44. go pexR.ensurePeersRoutine()
  45. }
  46. }
  47. // Implements Reactor
  48. func (pexR *PEXReactor) Stop() {
  49. if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) {
  50. log.Notice("Stopping PEXReactor")
  51. close(pexR.quit)
  52. }
  53. }
  54. // Implements Reactor
  55. func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
  56. return []*ChannelDescriptor{
  57. &ChannelDescriptor{
  58. Id: PexChannel,
  59. Priority: 1,
  60. SendQueueCapacity: 10,
  61. },
  62. }
  63. }
  64. // Implements Reactor
  65. func (pexR *PEXReactor) AddPeer(peer *Peer) {
  66. // Add the peer to the address book
  67. netAddr := NewNetAddressString(fmt.Sprintf("%s:%d", peer.Host, peer.P2PPort))
  68. if peer.IsOutbound() {
  69. if pexR.book.NeedMoreAddrs() {
  70. pexR.RequestPEX(peer)
  71. }
  72. } else {
  73. // For inbound connections, the peer is its own source
  74. // (For outbound peers, the address is already in the books)
  75. pexR.book.AddAddress(netAddr, netAddr)
  76. }
  77. }
  78. // Implements Reactor
  79. func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
  80. // TODO
  81. }
  82. // Implements Reactor
  83. // Handles incoming PEX messages.
  84. func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
  85. // decode message
  86. _, msg, err := DecodeMessage(msgBytes)
  87. if err != nil {
  88. log.Warn("Error decoding message", "error", err)
  89. return
  90. }
  91. log.Notice("Received message", "msg", msg)
  92. switch msg := msg.(type) {
  93. case *pexRequestMessage:
  94. // src requested some peers.
  95. // TODO: prevent abuse.
  96. pexR.SendAddrs(src, pexR.book.GetSelection())
  97. case *pexAddrsMessage:
  98. // We received some peer addresses from src.
  99. // TODO: prevent abuse.
  100. // (We don't want to get spammed with bad peers)
  101. srcAddr := src.Connection().RemoteAddress
  102. for _, addr := range msg.Addrs {
  103. pexR.book.AddAddress(addr, srcAddr)
  104. }
  105. default:
  106. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  107. }
  108. }
  109. // Asks peer for more addresses.
  110. func (pexR *PEXReactor) RequestPEX(peer *Peer) {
  111. peer.Send(PexChannel, &pexRequestMessage{})
  112. }
  113. func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
  114. peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs})
  115. }
  116. // Ensures that sufficient peers are connected. (continuous)
  117. func (pexR *PEXReactor) ensurePeersRoutine() {
  118. // Randomize when routine starts
  119. time.Sleep(time.Duration(rand.Int63n(500*ensurePeersPeriodSeconds)) * time.Millisecond)
  120. // fire once immediately.
  121. pexR.ensurePeers()
  122. // fire periodically
  123. timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second)
  124. FOR_LOOP:
  125. for {
  126. select {
  127. case <-timer.Ch:
  128. pexR.ensurePeers()
  129. case <-pexR.quit:
  130. break FOR_LOOP
  131. }
  132. }
  133. // Cleanup
  134. timer.Stop()
  135. }
  136. // Ensures that sufficient peers are connected. (once)
  137. func (pexR *PEXReactor) ensurePeers() {
  138. numOutPeers, _, numDialing := pexR.sw.NumPeers()
  139. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  140. log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  141. if numToDial <= 0 {
  142. return
  143. }
  144. toDial := NewCMap()
  145. // Try to pick numToDial addresses to dial.
  146. // TODO: improve logic.
  147. for i := 0; i < numToDial; i++ {
  148. newBias := MinInt(numOutPeers, 8)*10 + 10
  149. var picked *NetAddress
  150. // Try to fetch a new peer 3 times.
  151. // This caps the maximum number of tries to 3 * numToDial.
  152. for j := 0; j < 3; j++ {
  153. try := pexR.book.PickAddress(newBias)
  154. if try == nil {
  155. break
  156. }
  157. alreadySelected := toDial.Has(try.IP.String())
  158. alreadyDialing := pexR.sw.IsDialing(try)
  159. alreadyConnected := pexR.sw.Peers().Has(try.IP.String())
  160. if alreadySelected || alreadyDialing || alreadyConnected {
  161. /*
  162. log.Info("Cannot dial address", "addr", try,
  163. "alreadySelected", alreadySelected,
  164. "alreadyDialing", alreadyDialing,
  165. "alreadyConnected", alreadyConnected)
  166. */
  167. continue
  168. } else {
  169. log.Info("Will dial address", "addr", try)
  170. picked = try
  171. break
  172. }
  173. }
  174. if picked == nil {
  175. continue
  176. }
  177. toDial.Set(picked.IP.String(), picked)
  178. }
  179. // Dial picked addresses
  180. for _, item := range toDial.Values() {
  181. go func(picked *NetAddress) {
  182. _, err := pexR.sw.DialPeerWithAddress(picked)
  183. if err != nil {
  184. pexR.book.MarkAttempt(picked)
  185. }
  186. }(item.(*NetAddress))
  187. }
  188. // If we need more addresses, pick a random peer and ask for more.
  189. if pexR.book.NeedMoreAddrs() {
  190. if peers := pexR.sw.Peers().List(); len(peers) > 0 {
  191. i := rand.Int() % len(peers)
  192. peer := peers[i]
  193. log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
  194. pexR.RequestPEX(peer)
  195. }
  196. }
  197. }
  198. // implements events.Eventable
  199. func (pexR *PEXReactor) SetFireable(evsw events.Fireable) {
  200. pexR.evsw = evsw
  201. }
  202. //-----------------------------------------------------------------------------
  203. // Messages
  204. const (
  205. msgTypeRequest = byte(0x01)
  206. msgTypeAddrs = byte(0x02)
  207. )
  208. type PexMessage interface{}
  209. var _ = binary.RegisterInterface(
  210. struct{ PexMessage }{},
  211. binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  212. binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  213. )
  214. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  215. msgType = bz[0]
  216. n := new(int64)
  217. r := bytes.NewReader(bz)
  218. msg = binary.ReadBinary(struct{ PexMessage }{}, r, n, &err).(struct{ PexMessage }).PexMessage
  219. return
  220. }
  221. /*
  222. A pexRequestMessage requests additional peer addresses.
  223. */
  224. type pexRequestMessage struct {
  225. }
  226. func (m *pexRequestMessage) String() string {
  227. return "[pexRequest]"
  228. }
  229. /*
  230. A message with announced peer addresses.
  231. */
  232. type pexAddrsMessage struct {
  233. Addrs []*NetAddress
  234. }
  235. func (m *pexAddrsMessage) String() string {
  236. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  237. }