You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

337 lines
9.1 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 
+0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 
+0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "time"
  8. wire "github.com/tendermint/go-wire"
  9. )
  10. const (
  11. // PexChannel is a channel for PEX messages
  12. PexChannel = byte(0x00)
  13. // period to ensure peers connected
  14. defaultEnsurePeersPeriod = 30 * time.Second
  15. minNumOutboundPeers = 10
  16. maxPexMessageSize = 1048576 // 1MB
  17. // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
  18. defaultMaxMsgCountByPeer = 1000
  19. msgCountByPeerFlushInterval = 1 * time.Hour
  20. )
  21. // PEXReactor handles PEX (peer exchange) and ensures that an
  22. // adequate number of peers are connected to the switch.
  23. //
  24. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  25. //
  26. // ## Preventing abuse
  27. //
  28. // For now, it just limits the number of messages from one peer to
  29. // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
  30. // msg/hour).
  31. //
  32. // NOTE [2017-01-17]:
  33. // Limiting is fine for now. Maybe down the road we want to keep track of the
  34. // quality of peer messages so if peerA keeps telling us about peers we can't
  35. // connect to then maybe we should care less about peerA. But I don't think
  36. // that kind of complexity is priority right now.
  37. type PEXReactor struct {
  38. BaseReactor
  39. sw *Switch
  40. book *AddrBook
  41. ensurePeersPeriod time.Duration
  42. // tracks message count by peer, so we can prevent abuse
  43. msgCountByPeer map[string]uint16
  44. maxMsgCountByPeer uint16
  45. }
  46. // NewPEXReactor creates new PEX reactor.
  47. func NewPEXReactor(b *AddrBook) *PEXReactor {
  48. r := &PEXReactor{
  49. book: b,
  50. ensurePeersPeriod: defaultEnsurePeersPeriod,
  51. msgCountByPeer: make(map[string]uint16),
  52. maxMsgCountByPeer: defaultMaxMsgCountByPeer,
  53. }
  54. r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
  55. return r
  56. }
  57. // OnStart implements BaseService
  58. func (r *PEXReactor) OnStart() error {
  59. r.BaseReactor.OnStart()
  60. r.book.Start()
  61. go r.ensurePeersRoutine()
  62. go r.flushMsgCountByPeer()
  63. return nil
  64. }
  65. // OnStop implements BaseService
  66. func (r *PEXReactor) OnStop() {
  67. r.BaseReactor.OnStop()
  68. r.book.Stop()
  69. }
  70. // GetChannels implements Reactor
  71. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  72. return []*ChannelDescriptor{
  73. &ChannelDescriptor{
  74. ID: PexChannel,
  75. Priority: 1,
  76. SendQueueCapacity: 10,
  77. },
  78. }
  79. }
  80. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  81. // or by requesting more addresses (if outbound).
  82. func (r *PEXReactor) AddPeer(p *Peer) {
  83. if p.IsOutbound() { // For outbound peers, the address is already in the books
  84. if r.book.NeedMoreAddrs() {
  85. r.RequestPEX(p)
  86. }
  87. } else { // For inbound connections, the peer is its own source
  88. addr, err := NewNetAddressString(p.ListenAddr)
  89. if err != nil {
  90. // this should never happen
  91. log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
  92. return
  93. }
  94. r.book.AddAddress(addr, addr)
  95. }
  96. }
  97. // RemovePeer implements Reactor by removing peer from the address book.
  98. //
  99. // The peer will be proposed to us by other peers (PexAddrsMessage) or himself
  100. // and we will add him again upon successful connection. Note that other peers
  101. // will remove him too. The peer will need to send first requests to others by
  102. // himself (he will have an addrbook or the seeds).
  103. func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
  104. addr, err := NewNetAddressString(p.ListenAddr)
  105. if err != nil {
  106. // this should never happen
  107. log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
  108. return
  109. }
  110. r.book.RemoveAddress(addr)
  111. }
  112. // Receive implements Reactor by handling incoming PEX messages.
  113. func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
  114. srcAddr := src.Connection().RemoteAddress
  115. srcAddrStr := srcAddr.String()
  116. r.msgCountByPeer[srcAddrStr]++
  117. if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
  118. log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr)
  119. // TODO remove src from peers?
  120. return
  121. }
  122. _, msg, err := DecodeMessage(msgBytes)
  123. if err != nil {
  124. log.Warn("Error decoding message", "error", err)
  125. return
  126. }
  127. log.Notice("Received message", "msg", msg)
  128. switch msg := msg.(type) {
  129. case *pexRequestMessage:
  130. // src requested some peers.
  131. r.SendAddrs(src, r.book.GetSelection())
  132. case *pexAddrsMessage:
  133. // We received some peer addresses from src.
  134. // (We don't want to get spammed with bad peers)
  135. for _, addr := range msg.Addrs {
  136. if addr != nil {
  137. r.book.AddAddress(addr, srcAddr)
  138. }
  139. }
  140. default:
  141. log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  142. }
  143. }
  144. // RequestPEX asks peer for more addresses.
  145. func (r *PEXReactor) RequestPEX(p *Peer) {
  146. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  147. }
  148. // SendAddrs sends addrs to the peer.
  149. func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
  150. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  151. }
  152. // SetEnsurePeersPeriod sets period to ensure peers connected.
  153. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  154. r.ensurePeersPeriod = d
  155. }
  156. // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
  157. func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
  158. r.maxMsgCountByPeer = v
  159. }
  160. // ReachedMaxMsgCountForPeer returns true if we received too many
  161. // messages from peer with address `addr`.
  162. func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
  163. return r.msgCountByPeer[addr] >= r.maxMsgCountByPeer
  164. }
  165. // Ensures that sufficient peers are connected. (continuous)
  166. func (r *PEXReactor) ensurePeersRoutine() {
  167. // Randomize when routine starts
  168. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  169. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  170. // fire once immediately.
  171. r.ensurePeers()
  172. // fire periodically
  173. ticker := time.NewTicker(r.ensurePeersPeriod)
  174. for {
  175. select {
  176. case <-ticker.C:
  177. r.ensurePeers()
  178. case <-r.Quit:
  179. ticker.Stop()
  180. return
  181. }
  182. }
  183. }
  184. // ensurePeers ensures that sufficient peers are connected. (once)
  185. func (r *PEXReactor) ensurePeers() {
  186. numOutPeers, _, numDialing := r.Switch.NumPeers()
  187. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  188. log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  189. if numToDial <= 0 {
  190. return
  191. }
  192. toDial := make(map[string]*NetAddress)
  193. // Try to pick numToDial addresses to dial.
  194. for i := 0; i < numToDial; i++ {
  195. var picked *NetAddress
  196. // Try to fetch a new peer 3 times.
  197. // This caps the maximum number of tries to 3 * numToDial.
  198. for j := 0; j < 3; j++ {
  199. // NOTE always picking from the new group because old one stores already
  200. // connected peers.
  201. try := r.book.PickAddress(100)
  202. if try == nil {
  203. break
  204. }
  205. _, alreadySelected := toDial[try.IP.String()]
  206. alreadyDialing := r.Switch.IsDialing(try)
  207. if alreadySelected || alreadyDialing {
  208. // log.Info("Cannot dial address", "addr", try,
  209. // "alreadySelected", alreadySelected,
  210. // "alreadyDialing", alreadyDialing)
  211. continue
  212. } else {
  213. log.Info("Will dial address", "addr", try)
  214. picked = try
  215. break
  216. }
  217. }
  218. if picked == nil {
  219. continue
  220. }
  221. toDial[picked.IP.String()] = picked
  222. }
  223. // Dial picked addresses
  224. for _, item := range toDial {
  225. go func(picked *NetAddress) {
  226. _, err := r.Switch.DialPeerWithAddress(picked, false)
  227. if err != nil {
  228. r.book.MarkAttempt(picked)
  229. } else {
  230. // move address to the old group
  231. r.book.MarkGood(picked)
  232. }
  233. }(item)
  234. }
  235. // If we need more addresses, pick a random peer and ask for more.
  236. if r.book.NeedMoreAddrs() {
  237. if peers := r.Switch.Peers().List(); len(peers) > 0 {
  238. i := rand.Int() % len(peers)
  239. peer := peers[i]
  240. log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
  241. r.RequestPEX(peer)
  242. }
  243. }
  244. }
  245. func (r *PEXReactor) flushMsgCountByPeer() {
  246. ticker := time.NewTicker(msgCountByPeerFlushInterval)
  247. for {
  248. select {
  249. case <-ticker.C:
  250. r.msgCountByPeer = make(map[string]uint16)
  251. case <-r.Quit:
  252. ticker.Stop()
  253. return
  254. }
  255. }
  256. }
  257. //-----------------------------------------------------------------------------
  258. // Messages
  259. const (
  260. msgTypeRequest = byte(0x01)
  261. msgTypeAddrs = byte(0x02)
  262. )
  263. // PexMessage is a primary type for PEX messages. Underneath, it could contain
  264. // either pexRequestMessage, or pexAddrsMessage messages.
  265. type PexMessage interface{}
  266. var _ = wire.RegisterInterface(
  267. struct{ PexMessage }{},
  268. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  269. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  270. )
  271. // DecodeMessage implements interface registered above.
  272. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  273. msgType = bz[0]
  274. n := new(int)
  275. r := bytes.NewReader(bz)
  276. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  277. return
  278. }
  279. /*
  280. A pexRequestMessage requests additional peer addresses.
  281. */
  282. type pexRequestMessage struct {
  283. }
  284. func (m *pexRequestMessage) String() string {
  285. return "[pexRequest]"
  286. }
  287. /*
  288. A message with announced peer addresses.
  289. */
  290. type pexAddrsMessage struct {
  291. Addrs []*NetAddress
  292. }
  293. func (m *pexAddrsMessage) String() string {
  294. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  295. }