You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
6.6 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. wire "github.com/tendermint/go-wire"
  10. )
  11. const (
  12. PexChannel = byte(0x00)
  13. defaultEnsurePeersPeriod = 30 * time.Second
  14. minNumOutboundPeers = 10
  15. maxPexMessageSize = 1048576 // 1MB
  16. )
  17. // PEXReactor handles PEX (peer exchange) and ensures that an
  18. // adequate number of peers are connected to the switch.
  19. type PEXReactor struct {
  20. BaseReactor
  21. sw *Switch
  22. book *AddrBook
  23. ensurePeersPeriod time.Duration
  24. }
  25. func NewPEXReactor(b *AddrBook) *PEXReactor {
  26. r := &PEXReactor{
  27. book: b,
  28. ensurePeersPeriod: defaultEnsurePeersPeriod,
  29. }
  30. r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
  31. return r
  32. }
  33. func (r *PEXReactor) OnStart() error {
  34. r.BaseReactor.OnStart()
  35. r.book.Start()
  36. go r.ensurePeersRoutine()
  37. return nil
  38. }
  39. func (r *PEXReactor) OnStop() {
  40. r.BaseReactor.OnStop()
  41. r.book.Stop()
  42. }
  43. // GetChannels implements Reactor
  44. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  45. return []*ChannelDescriptor{
  46. &ChannelDescriptor{
  47. ID: PexChannel,
  48. Priority: 1,
  49. SendQueueCapacity: 10,
  50. },
  51. }
  52. }
  53. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  54. // or by requesting more addresses (if outbound).
  55. func (r *PEXReactor) AddPeer(p *Peer) {
  56. netAddr, err := NewNetAddressString(p.ListenAddr)
  57. if err != nil {
  58. // this should never happen
  59. log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
  60. return
  61. }
  62. if p.IsOutbound() { // For outbound peers, the address is already in the books
  63. if r.book.NeedMoreAddrs() {
  64. r.RequestPEX(p)
  65. }
  66. } else { // For inbound connections, the peer is its own source
  67. addr := NewNetAddressString(p.ListenAddr)
  68. r.book.AddAddress(addr, addr)
  69. }
  70. }
  71. // RemovePeer implements Reactor
  72. func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
  73. addr := NewNetAddressString(p.ListenAddr)
  74. // addr will be ejected from the book
  75. r.book.MarkBad(addr)
  76. }
  77. // Receive implements Reactor by handling incoming PEX messages.
  78. func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
  79. _, msg, err := DecodeMessage(msgBytes)
  80. if err != nil {
  81. log.Warn("Error decoding message", "error", err)
  82. return
  83. }
  84. log.Notice("Received message", "msg", msg)
  85. switch msg := msg.(type) {
  86. case *pexRequestMessage:
  87. // src requested some peers.
  88. // TODO: prevent abuse.
  89. r.SendAddrs(src, r.book.GetSelection())
  90. case *pexAddrsMessage:
  91. // We received some peer addresses from src.
  92. // TODO: prevent abuse.
  93. // (We don't want to get spammed with bad peers)
  94. srcAddr := src.Connection().RemoteAddress
  95. for _, addr := range msg.Addrs {
  96. if addr != nil {
  97. r.book.AddAddress(addr, srcAddr)
  98. }
  99. }
  100. default:
  101. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  102. }
  103. }
  104. // RequestPEX asks peer for more addresses.
  105. func (r *PEXReactor) RequestPEX(p *Peer) {
  106. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  107. }
  108. // SendAddrs sends addrs to the peer.
  109. func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
  110. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  111. }
  112. // SetEnsurePeersPeriod sets period to ensure peers connected.
  113. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  114. r.ensurePeersPeriod = d
  115. }
  116. // Ensures that sufficient peers are connected. (continuous)
  117. func (r *PEXReactor) ensurePeersRoutine() {
  118. // Randomize when routine starts
  119. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  120. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  121. // fire once immediately.
  122. r.ensurePeers()
  123. // fire periodically
  124. ticker := time.NewTicker(r.ensurePeersPeriod)
  125. FOR_LOOP:
  126. for {
  127. select {
  128. case <-ticker.C:
  129. r.ensurePeers()
  130. case <-r.Quit:
  131. break FOR_LOOP
  132. }
  133. }
  134. ticker.Stop()
  135. }
  136. // ensurePeers ensures that sufficient peers are connected. (once)
  137. func (r *PEXReactor) ensurePeers() {
  138. numOutPeers, _, numDialing := r.Switch.NumPeers()
  139. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  140. log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  141. if numToDial <= 0 {
  142. return
  143. }
  144. toDial := NewCMap()
  145. // Try to pick numToDial addresses to dial.
  146. // TODO: improve logic.
  147. for i := 0; i < numToDial; i++ {
  148. newBias := MinInt(numOutPeers, 8)*10 + 10
  149. var picked *NetAddress
  150. // Try to fetch a new peer 3 times.
  151. // This caps the maximum number of tries to 3 * numToDial.
  152. for j := 0; j < 3; j++ {
  153. try := r.book.PickAddress(newBias)
  154. if try == nil {
  155. break
  156. }
  157. alreadySelected := toDial.Has(try.IP.String())
  158. alreadyDialing := r.Switch.IsDialing(try)
  159. alreadyConnected := r.Switch.Peers().Has(try.IP.String())
  160. if alreadySelected || alreadyDialing || alreadyConnected {
  161. /*
  162. log.Info("Cannot dial address", "addr", try,
  163. "alreadySelected", alreadySelected,
  164. "alreadyDialing", alreadyDialing,
  165. "alreadyConnected", alreadyConnected)
  166. */
  167. continue
  168. } else {
  169. log.Info("Will dial address", "addr", try)
  170. picked = try
  171. break
  172. }
  173. }
  174. if picked == nil {
  175. continue
  176. }
  177. toDial.Set(picked.IP.String(), picked)
  178. }
  179. // Dial picked addresses
  180. for _, item := range toDial.Values() {
  181. go func(picked *NetAddress) {
  182. _, err := r.Switch.DialPeerWithAddress(picked, false)
  183. if err != nil {
  184. r.book.MarkAttempt(picked)
  185. }
  186. }(item.(*NetAddress))
  187. }
  188. // If we need more addresses, pick a random peer and ask for more.
  189. if r.book.NeedMoreAddrs() {
  190. if peers := r.Switch.Peers().List(); len(peers) > 0 {
  191. i := rand.Int() % len(peers)
  192. peer := peers[i]
  193. log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
  194. r.RequestPEX(peer)
  195. }
  196. }
  197. }
  198. //-----------------------------------------------------------------------------
  199. // Messages
  200. const (
  201. msgTypeRequest = byte(0x01)
  202. msgTypeAddrs = byte(0x02)
  203. )
  204. type PexMessage interface{}
  205. var _ = wire.RegisterInterface(
  206. struct{ PexMessage }{},
  207. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  208. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  209. )
  210. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  211. msgType = bz[0]
  212. n := new(int)
  213. r := bytes.NewReader(bz)
  214. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  215. return
  216. }
  217. /*
  218. A pexRequestMessage requests additional peer addresses.
  219. */
  220. type pexRequestMessage struct {
  221. }
  222. func (m *pexRequestMessage) String() string {
  223. return "[pexRequest]"
  224. }
  225. /*
  226. A message with announced peer addresses.
  227. */
  228. type pexAddrsMessage struct {
  229. Addrs []*NetAddress
  230. }
  231. func (m *pexAddrsMessage) String() string {
  232. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  233. }