You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

358 lines
10 KiB

9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "time"
  8. cmn "github.com/tendermint/tmlibs/common"
  9. wire "github.com/tendermint/go-wire"
  10. )
  11. const (
  12. // PexChannel is a channel for PEX messages
  13. PexChannel = byte(0x00)
  14. // period to ensure peers connected
  15. defaultEnsurePeersPeriod = 30 * time.Second
  16. minNumOutboundPeers = 10
  17. maxPexMessageSize = 1048576 // 1MB
  18. // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
  19. defaultMaxMsgCountByPeer = 1000
  20. msgCountByPeerFlushInterval = 1 * time.Hour
  21. )
  22. // PEXReactor handles PEX (peer exchange) and ensures that an
  23. // adequate number of peers are connected to the switch.
  24. //
  25. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  26. //
  27. // ## Preventing abuse
  28. //
  29. // For now, it just limits the number of messages from one peer to
  30. // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
  31. // msg/hour).
  32. //
  33. // NOTE [2017-01-17]:
  34. // Limiting is fine for now. Maybe down the road we want to keep track of the
  35. // quality of peer messages so if peerA keeps telling us about peers we can't
  36. // connect to then maybe we should care less about peerA. But I don't think
  37. // that kind of complexity is priority right now.
  38. type PEXReactor struct {
  39. BaseReactor
  40. sw *Switch
  41. book *AddrBook
  42. ensurePeersPeriod time.Duration
  43. // tracks message count by peer, so we can prevent abuse
  44. msgCountByPeer *cmn.CMap
  45. maxMsgCountByPeer uint16
  46. }
  47. // NewPEXReactor creates new PEX reactor.
  48. func NewPEXReactor(b *AddrBook) *PEXReactor {
  49. r := &PEXReactor{
  50. book: b,
  51. ensurePeersPeriod: defaultEnsurePeersPeriod,
  52. msgCountByPeer: cmn.NewCMap(),
  53. maxMsgCountByPeer: defaultMaxMsgCountByPeer,
  54. }
  55. r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
  56. return r
  57. }
  58. // OnStart implements BaseService
  59. func (r *PEXReactor) OnStart() error {
  60. r.BaseReactor.OnStart()
  61. r.book.Start()
  62. go r.ensurePeersRoutine()
  63. go r.flushMsgCountByPeer()
  64. return nil
  65. }
  66. // OnStop implements BaseService
  67. func (r *PEXReactor) OnStop() {
  68. r.BaseReactor.OnStop()
  69. r.book.Stop()
  70. }
  71. // GetChannels implements Reactor
  72. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  73. return []*ChannelDescriptor{
  74. &ChannelDescriptor{
  75. ID: PexChannel,
  76. Priority: 1,
  77. SendQueueCapacity: 10,
  78. },
  79. }
  80. }
  81. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  82. // or by requesting more addresses (if outbound).
  83. func (r *PEXReactor) AddPeer(p *Peer) {
  84. if p.IsOutbound() {
  85. // For outbound peers, the address is already in the books.
  86. // Either it was added in DialSeeds or when we
  87. // received the peer's address in r.Receive
  88. if r.book.NeedMoreAddrs() {
  89. r.RequestPEX(p)
  90. }
  91. } else { // For inbound connections, the peer is its own source
  92. addr, err := NewNetAddressString(p.ListenAddr)
  93. if err != nil {
  94. // this should never happen
  95. log.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
  96. return
  97. }
  98. r.book.AddAddress(addr, addr)
  99. }
  100. }
  101. // RemovePeer implements Reactor.
  102. func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
  103. // If we aren't keeping track of local temp data for each peer here, then we
  104. // don't have to do anything.
  105. }
  106. // Receive implements Reactor by handling incoming PEX messages.
  107. func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
  108. srcAddr := src.Connection().RemoteAddress
  109. srcAddrStr := srcAddr.String()
  110. r.IncrementMsgCountForPeer(srcAddrStr)
  111. if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
  112. log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr)
  113. // TODO remove src from peers?
  114. return
  115. }
  116. _, msg, err := DecodeMessage(msgBytes)
  117. if err != nil {
  118. log.Warn("Error decoding message", "error", err)
  119. return
  120. }
  121. log.Notice("Received message", "msg", msg)
  122. switch msg := msg.(type) {
  123. case *pexRequestMessage:
  124. // src requested some peers.
  125. r.SendAddrs(src, r.book.GetSelection())
  126. case *pexAddrsMessage:
  127. // We received some peer addresses from src.
  128. // (We don't want to get spammed with bad peers)
  129. for _, addr := range msg.Addrs {
  130. if addr != nil {
  131. r.book.AddAddress(addr, srcAddr)
  132. }
  133. }
  134. default:
  135. log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  136. }
  137. }
  138. // RequestPEX asks peer for more addresses.
  139. func (r *PEXReactor) RequestPEX(p *Peer) {
  140. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  141. }
  142. // SendAddrs sends addrs to the peer.
  143. func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
  144. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  145. }
  146. // SetEnsurePeersPeriod sets period to ensure peers connected.
  147. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  148. r.ensurePeersPeriod = d
  149. }
  150. // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
  151. func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
  152. r.maxMsgCountByPeer = v
  153. }
  154. // ReachedMaxMsgCountForPeer returns true if we received too many
  155. // messages from peer with address `addr`.
  156. // NOTE: assumes the value in the CMap is non-nil
  157. func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
  158. return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
  159. }
  160. // Increment or initialize the msg count for the peer in the CMap
  161. func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
  162. var count uint16
  163. countI := r.msgCountByPeer.Get(addr)
  164. if countI != nil {
  165. count = countI.(uint16)
  166. }
  167. count++
  168. r.msgCountByPeer.Set(addr, count)
  169. }
  170. // Ensures that sufficient peers are connected. (continuous)
  171. func (r *PEXReactor) ensurePeersRoutine() {
  172. // Randomize when routine starts
  173. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  174. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  175. // fire once immediately.
  176. r.ensurePeers()
  177. // fire periodically
  178. ticker := time.NewTicker(r.ensurePeersPeriod)
  179. for {
  180. select {
  181. case <-ticker.C:
  182. r.ensurePeers()
  183. case <-r.Quit:
  184. ticker.Stop()
  185. return
  186. }
  187. }
  188. }
  189. // ensurePeers ensures that sufficient peers are connected. (once)
  190. //
  191. // Old bucket / New bucket are arbitrary categories to denote whether an
  192. // address is vetted or not, and this needs to be determined over time via a
  193. // heuristic that we haven't perfected yet, or, perhaps is manually edited by
  194. // the node operator. It should not be used to compute what addresses are
  195. // already connected or not.
  196. //
  197. // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
  198. // What we're currently doing in terms of marking good/bad peers is just a
  199. // placeholder. It should not be the case that an address becomes old/vetted
  200. // upon a single successful connection.
  201. func (r *PEXReactor) ensurePeers() {
  202. numOutPeers, _, numDialing := r.Switch.NumPeers()
  203. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  204. log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  205. if numToDial <= 0 {
  206. return
  207. }
  208. toDial := make(map[string]*NetAddress)
  209. // Try to pick numToDial addresses to dial.
  210. for i := 0; i < numToDial; i++ {
  211. // The purpose of newBias is to first prioritize old (more vetted) peers
  212. // when we have few connections, but to allow for new (less vetted) peers
  213. // if we already have many connections. This algorithm isn't perfect, but
  214. // it somewhat ensures that we prioritize connecting to more-vetted
  215. // peers.
  216. newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
  217. var picked *NetAddress
  218. // Try to fetch a new peer 3 times.
  219. // This caps the maximum number of tries to 3 * numToDial.
  220. for j := 0; j < 3; j++ {
  221. try := r.book.PickAddress(newBias)
  222. if try == nil {
  223. break
  224. }
  225. _, alreadySelected := toDial[try.IP.String()]
  226. alreadyDialing := r.Switch.IsDialing(try)
  227. alreadyConnected := r.Switch.Peers().Has(try.IP.String())
  228. if alreadySelected || alreadyDialing || alreadyConnected {
  229. // log.Info("Cannot dial address", "addr", try,
  230. // "alreadySelected", alreadySelected,
  231. // "alreadyDialing", alreadyDialing,
  232. // "alreadyConnected", alreadyConnected)
  233. continue
  234. } else {
  235. log.Info("Will dial address", "addr", try)
  236. picked = try
  237. break
  238. }
  239. }
  240. if picked == nil {
  241. continue
  242. }
  243. toDial[picked.IP.String()] = picked
  244. }
  245. // Dial picked addresses
  246. for _, item := range toDial {
  247. go func(picked *NetAddress) {
  248. _, err := r.Switch.DialPeerWithAddress(picked, false)
  249. if err != nil {
  250. r.book.MarkAttempt(picked)
  251. }
  252. }(item)
  253. }
  254. // If we need more addresses, pick a random peer and ask for more.
  255. if r.book.NeedMoreAddrs() {
  256. if peers := r.Switch.Peers().List(); len(peers) > 0 {
  257. i := rand.Int() % len(peers)
  258. peer := peers[i]
  259. log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
  260. r.RequestPEX(peer)
  261. }
  262. }
  263. }
  264. func (r *PEXReactor) flushMsgCountByPeer() {
  265. ticker := time.NewTicker(msgCountByPeerFlushInterval)
  266. for {
  267. select {
  268. case <-ticker.C:
  269. r.msgCountByPeer.Clear()
  270. case <-r.Quit:
  271. ticker.Stop()
  272. return
  273. }
  274. }
  275. }
  276. //-----------------------------------------------------------------------------
  277. // Messages
  278. const (
  279. msgTypeRequest = byte(0x01)
  280. msgTypeAddrs = byte(0x02)
  281. )
  282. // PexMessage is a primary type for PEX messages. Underneath, it could contain
  283. // either pexRequestMessage, or pexAddrsMessage messages.
  284. type PexMessage interface{}
  285. var _ = wire.RegisterInterface(
  286. struct{ PexMessage }{},
  287. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  288. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  289. )
  290. // DecodeMessage implements interface registered above.
  291. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  292. msgType = bz[0]
  293. n := new(int)
  294. r := bytes.NewReader(bz)
  295. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  296. return
  297. }
  298. /*
  299. A pexRequestMessage requests additional peer addresses.
  300. */
  301. type pexRequestMessage struct {
  302. }
  303. func (m *pexRequestMessage) String() string {
  304. return "[pexRequest]"
  305. }
  306. /*
  307. A message with announced peer addresses.
  308. */
  309. type pexAddrsMessage struct {
  310. Addrs []*NetAddress
  311. }
  312. func (m *pexAddrsMessage) String() string {
  313. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  314. }