You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

299 lines
7.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "sync/atomic"
  7. "time"
  8. "github.com/ebuchman/debora"
  9. "github.com/tendermint/tendermint2/binary"
  10. . "github.com/tendermint/tendermint2/common"
  11. )
  // pexErrInvalidMessage is the error value for a malformed PEX message.
  var pexErrInvalidMessage = errors.New("Invalid PEX message")

  // PexChannel is the channel id the PEX reactor registers in GetChannels and
  // sends its messages on.
  const PexChannel byte = 0x00

  const (
  	// ensurePeersPeriodSeconds is the interval, in seconds, between
  	// periodic ensurePeers runs.
  	ensurePeersPeriodSeconds = 30

  	// minNumOutboundPeers is the outbound-plus-dialing peer count that
  	// ensurePeers tries to reach.
  	minNumOutboundPeers = 10

  	// maxNumPeers is not referenced in the visible part of this file.
  	// NOTE(review): presumably an upper bound on total peers; confirm usage.
  	maxNumPeers = 50
  )
  19. /*
  20. PEXReactor handles PEX (peer exchange) and ensures that an
  21. adequate number of peers are connected to the switch.
  22. */
  23. type PEXReactor struct {
  24. sw *Switch
  25. quit chan struct{}
  26. started uint32
  27. stopped uint32
  28. book *AddrBook
  29. }
  30. func NewPEXReactor(book *AddrBook) *PEXReactor {
  31. pexR := &PEXReactor{
  32. quit: make(chan struct{}),
  33. book: book,
  34. }
  35. return pexR
  36. }
  37. // Implements Reactor
  38. func (pexR *PEXReactor) Start(sw *Switch) {
  39. if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) {
  40. log.Info("Starting PEXReactor")
  41. pexR.sw = sw
  42. go pexR.ensurePeersRoutine()
  43. }
  44. }
  45. // Implements Reactor
  46. func (pexR *PEXReactor) Stop() {
  47. if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) {
  48. log.Info("Stopping PEXReactor")
  49. close(pexR.quit)
  50. }
  51. }
  52. // Implements Reactor
  53. func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
  54. return []*ChannelDescriptor{
  55. &ChannelDescriptor{
  56. Id: PexChannel,
  57. Priority: 1,
  58. SendQueueCapacity: 10,
  59. },
  60. }
  61. }
  62. // Implements Reactor
  63. func (pexR *PEXReactor) AddPeer(peer *Peer) {
  64. if peer.IsOutbound() {
  65. pexR.SendAddrs(peer, pexR.book.OurAddresses())
  66. if pexR.book.NeedMoreAddrs() {
  67. pexR.RequestPEX(peer)
  68. }
  69. }
  70. }
  71. // Implements Reactor
  72. func (pexR *PEXReactor) RemovePeer(peer *Peer, reason interface{}) {
  73. // TODO
  74. }
  75. // Implements Reactor
  76. // Handles incoming PEX messages.
  77. func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
  78. // decode message
  79. msg, err := DecodeMessage(msgBytes)
  80. if err != nil {
  81. log.Warn("Error decoding message", "error", err)
  82. return
  83. }
  84. log.Info("Received message", "msg", msg)
  85. switch msg.(type) {
  86. case *pexHandshakeMessage:
  87. network := msg.(*pexHandshakeMessage).Network
  88. if network != pexR.sw.network {
  89. err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network)
  90. pexR.sw.StopPeerForError(src, err)
  91. }
  92. case *pexRequestMessage:
  93. // src requested some peers.
  94. // TODO: prevent abuse.
  95. pexR.SendAddrs(src, pexR.book.GetSelection())
  96. case *pexAddrsMessage:
  97. // We received some peer addresses from src.
  98. // TODO: prevent abuse.
  99. // (We don't want to get spammed with bad peers)
  100. srcAddr := src.Connection().RemoteAddress
  101. for _, addr := range msg.(*pexAddrsMessage).Addrs {
  102. pexR.book.AddAddress(addr, srcAddr)
  103. }
  104. case *PexDeboraMessage:
  105. srcAddr := src.Connection().RemoteAddress.String()
  106. payload := msg.(*PexDeboraMessage).Payload
  107. log.Info(fmt.Sprintf("Received debora msg with payload %s or %x", payload, payload))
  108. if err := debora.Call(srcAddr, payload); err != nil {
  109. log.Info("Debora upgrade call failed.", "error", err)
  110. }
  111. default:
  112. // Ignore unknown message.
  113. }
  114. }
  115. // Asks peer for more addresses.
  116. func (pexR *PEXReactor) RequestPEX(peer *Peer) {
  117. peer.Send(PexChannel, &pexRequestMessage{})
  118. }
  119. func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
  120. peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs})
  121. }
  122. // Ensures that sufficient peers are connected. (continuous)
  123. func (pexR *PEXReactor) ensurePeersRoutine() {
  124. // fire once immediately.
  125. pexR.ensurePeers()
  126. // fire periodically
  127. timer := NewRepeatTimer("pex", ensurePeersPeriodSeconds*time.Second)
  128. FOR_LOOP:
  129. for {
  130. select {
  131. case <-timer.Ch:
  132. pexR.ensurePeers()
  133. case <-pexR.quit:
  134. break FOR_LOOP
  135. }
  136. }
  137. // Cleanup
  138. timer.Stop()
  139. }
  140. // Ensures that sufficient peers are connected. (once)
  141. func (pexR *PEXReactor) ensurePeers() {
  142. numOutPeers, _, numDialing := pexR.sw.NumPeers()
  143. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  144. log.Debug("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  145. if numToDial <= 0 {
  146. return
  147. }
  148. toDial := NewCMap()
  149. // Try to pick numToDial addresses to dial.
  150. // TODO: improve logic.
  151. for i := 0; i < numToDial; i++ {
  152. newBias := MinInt(numOutPeers, 8)*10 + 10
  153. var picked *NetAddress
  154. // Try to fetch a new peer 3 times.
  155. // This caps the maximum number of tries to 3 * numToDial.
  156. for j := 0; j < 3; j++ {
  157. try := pexR.book.PickAddress(newBias)
  158. if try == nil {
  159. break
  160. }
  161. alreadySelected := toDial.Has(try.String())
  162. alreadyDialing := pexR.sw.IsDialing(try)
  163. alreadyConnected := pexR.sw.Peers().Has(try.String())
  164. if alreadySelected || alreadyDialing || alreadyConnected {
  165. /*
  166. log.Debug("Cannot dial address", "addr", try,
  167. "alreadySelected", alreadySelected,
  168. "alreadyDialing", alreadyDialing,
  169. "alreadyConnected", alreadyConnected)
  170. */
  171. continue
  172. } else {
  173. log.Debug("Will dial address", "addr", try)
  174. picked = try
  175. break
  176. }
  177. }
  178. if picked == nil {
  179. continue
  180. }
  181. toDial.Set(picked.String(), picked)
  182. }
  183. // Dial picked addresses
  184. for _, item := range toDial.Values() {
  185. picked := item.(*NetAddress)
  186. go func() {
  187. _, err := pexR.sw.DialPeerWithAddress(picked)
  188. if err != nil {
  189. pexR.book.MarkAttempt(picked)
  190. }
  191. }()
  192. }
  193. }
  //-----------------------------------------------------------------------------
  // Messages

  // Type bytes identifying each PEX message kind. DecodeMessage switches on
  // the first byte of a message against these values.
  const (
  	msgTypeUnknown   byte = 0x00
  	msgTypeRequest   byte = 0x01
  	msgTypeAddrs     byte = 0x02
  	msgTypeHandshake byte = 0x03
  	msgTypeDebora    byte = 0x04
  )
  203. // TODO: check for unnecessary extra bytes at the end.
  204. func DecodeMessage(bz []byte) (msg interface{}, err error) {
  205. n := new(int64)
  206. msgType := bz[0]
  207. r := bytes.NewReader(bz)
  208. // log.Debug(Fmt("decoding msg bytes: %X", bz))
  209. switch msgType {
  210. case msgTypeHandshake:
  211. msg = binary.ReadBinary(&pexHandshakeMessage{}, r, n, &err)
  212. case msgTypeRequest:
  213. msg = &pexRequestMessage{}
  214. case msgTypeAddrs:
  215. msg = binary.ReadBinary(&pexAddrsMessage{}, r, n, &err)
  216. case msgTypeDebora:
  217. msg = binary.ReadBinary(&PexDeboraMessage{}, r, n, &err)
  218. default:
  219. msg = nil
  220. }
  221. return
  222. }
  223. /*
  224. A pexHandshakeMessage contains the network identifier.
  225. */
  226. type pexHandshakeMessage struct {
  227. Network string
  228. }
  229. func (m *pexHandshakeMessage) TypeByte() byte { return msgTypeHandshake }
  230. func (m *pexHandshakeMessage) String() string {
  231. return "[pexHandshake]"
  232. }
  233. /*
  234. A pexRequestMessage requests additional peer addresses.
  235. */
  236. type pexRequestMessage struct {
  237. }
  238. func (m *pexRequestMessage) TypeByte() byte { return msgTypeRequest }
  239. func (m *pexRequestMessage) String() string {
  240. return "[pexRequest]"
  241. }
  242. /*
  243. A message with announced peer addresses.
  244. */
  245. type pexAddrsMessage struct {
  246. Addrs []*NetAddress
  247. }
  248. func (m *pexAddrsMessage) TypeByte() byte { return msgTypeAddrs }
  249. func (m *pexAddrsMessage) String() string {
  250. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  251. }
  252. /*
  253. A pexDeboraMessage requests the node to upgrade its source code
  254. */
  255. type PexDeboraMessage struct {
  256. Payload []byte
  257. }
  258. func (m *PexDeboraMessage) TypeByte() byte { return msgTypeDebora }
  259. func (m *PexDeboraMessage) String() string {
  260. return "[pexDebora]"
  261. }