You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
11 KiB

9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
7 years ago
7 years ago
7 years ago
9 years ago
7 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
prevent nil addr Error: ``` Error: runtime error: invalid memoryaddress or nil pointer dereference\nStack: goroutine 549 [running]:\nruntime/debug.Stack(0x0, 0x0, 0x0)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x80\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection)._recover(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:173 +0x53\npanic(0xbe1500, 0xc820012080)\n\t/usr/local/go/src/runtime/panic.go:443 +0x4e9\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Valid(0x0, 0x0)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:125 +0x1c\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*NetAddress).Routable(0x0, 0xc8217bb740)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/netaddress.go:117 +0x25\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).addAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:524 +0x45\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*AddrBook).AddAddress(0xc820108380, 0x0, 0xc821739590)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/addrbook.go:160 +0x286\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*PEXReactor).Receive(0xc82000be60, 0xc820149f00, 0xc8218163f0, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/pex_reactor.go:109 +0x457\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.newPeer.func1(0xc82011d500, 0xc82184e000, 0x5b, 0x1000)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/peer.go:58 +0x202\ngithub.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).recvRoutine(0xc821723b00)\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:439 +0x1177\ncreated by github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p.(*MConnection).OnStart\n\t/go/src/github.com/tendermint/tendermint/vendor/github.com/tendermint/go-p2p/connection.go:138 +0x1a1\n ```
8 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "reflect"
  7. "time"
  8. wire "github.com/tendermint/go-wire"
  9. cmn "github.com/tendermint/tmlibs/common"
  10. )
  11. const (
  12. // PexChannel is a channel for PEX messages
  13. PexChannel = byte(0x00)
  14. // period to ensure peers connected
  15. defaultEnsurePeersPeriod = 30 * time.Second
  16. minNumOutboundPeers = 10
  17. maxPexMessageSize = 1048576 // 1MB
  18. // maximum pex messages one peer can send to us during `msgCountByPeerFlushInterval`
  19. defaultMaxMsgCountByPeer = 1000
  20. msgCountByPeerFlushInterval = 1 * time.Hour
  21. )
  22. // PEXReactor handles PEX (peer exchange) and ensures that an
  23. // adequate number of peers are connected to the switch.
  24. //
  25. // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
  26. //
  27. // ## Preventing abuse
  28. //
  29. // For now, it just limits the number of messages from one peer to
  30. // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
  31. // msg/hour).
  32. //
  33. // NOTE [2017-01-17]:
  34. // Limiting is fine for now. Maybe down the road we want to keep track of the
  35. // quality of peer messages so if peerA keeps telling us about peers we can't
  36. // connect to then maybe we should care less about peerA. But I don't think
  37. // that kind of complexity is priority right now.
  38. type PEXReactor struct {
  39. BaseReactor
  40. book *AddrBook
  41. config *PEXReactorConfig
  42. ensurePeersPeriod time.Duration
  43. // tracks message count by peer, so we can prevent abuse
  44. msgCountByPeer *cmn.CMap
  45. maxMsgCountByPeer uint16
  46. }
  47. // PEXReactorConfig holds reactor specific configuration data.
  48. type PEXReactorConfig struct {
  49. // Seeds is a list of addresses reactor may use if it can't connect to peers
  50. // in the addrbook.
  51. Seeds []string
  52. }
  53. // NewPEXReactor creates new PEX reactor.
  54. func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
  55. r := &PEXReactor{
  56. book: b,
  57. config: config,
  58. ensurePeersPeriod: defaultEnsurePeersPeriod,
  59. msgCountByPeer: cmn.NewCMap(),
  60. maxMsgCountByPeer: defaultMaxMsgCountByPeer,
  61. }
  62. r.BaseReactor = *NewBaseReactor("PEXReactor", r)
  63. return r
  64. }
  65. // OnStart implements BaseService
  66. func (r *PEXReactor) OnStart() error {
  67. if err := r.BaseReactor.OnStart(); err != nil {
  68. return err
  69. }
  70. err := r.book.Start()
  71. if err != nil && err != cmn.ErrAlreadyStarted {
  72. return err
  73. }
  74. go r.ensurePeersRoutine()
  75. go r.flushMsgCountByPeer()
  76. return nil
  77. }
  78. // OnStop implements BaseService
  79. func (r *PEXReactor) OnStop() {
  80. r.BaseReactor.OnStop()
  81. r.book.Stop()
  82. }
  83. // GetChannels implements Reactor
  84. func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
  85. return []*ChannelDescriptor{
  86. {
  87. ID: PexChannel,
  88. Priority: 1,
  89. SendQueueCapacity: 10,
  90. },
  91. }
  92. }
  93. // AddPeer implements Reactor by adding peer to the address book (if inbound)
  94. // or by requesting more addresses (if outbound).
  95. func (r *PEXReactor) AddPeer(p Peer) {
  96. if p.IsOutbound() {
  97. // For outbound peers, the address is already in the books.
  98. // Either it was added in DialPersistentPeers or when we
  99. // received the peer's address in r.Receive
  100. if r.book.NeedMoreAddrs() {
  101. r.RequestPEX(p)
  102. }
  103. } else { // For inbound connections, the peer is its own source
  104. addr, err := NewNetAddressString(p.NodeInfo().ListenAddr)
  105. if err != nil {
  106. // peer gave us a bad ListenAddr. TODO: punish
  107. r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.NodeInfo().ListenAddr, "err", err)
  108. return
  109. }
  110. r.book.AddAddress(addr, addr)
  111. }
  112. }
  113. // RemovePeer implements Reactor.
  114. func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
  115. // If we aren't keeping track of local temp data for each peer here, then we
  116. // don't have to do anything.
  117. }
  118. // Receive implements Reactor by handling incoming PEX messages.
  119. func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
  120. srcAddrStr := src.NodeInfo().RemoteAddr
  121. srcAddr, err := NewNetAddressString(srcAddrStr)
  122. if err != nil {
  123. // this should never happen. TODO: cancel conn
  124. r.Logger.Error("Error in Receive: invalid peer address", "addr", srcAddrStr, "err", err)
  125. return
  126. }
  127. r.IncrementMsgCountForPeer(srcAddrStr)
  128. if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
  129. r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddrStr)
  130. // TODO remove src from peers?
  131. return
  132. }
  133. _, msg, err := DecodeMessage(msgBytes)
  134. if err != nil {
  135. r.Logger.Error("Error decoding message", "err", err)
  136. return
  137. }
  138. r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
  139. switch msg := msg.(type) {
  140. case *pexRequestMessage:
  141. // src requested some peers.
  142. // NOTE: we might send an empty selection
  143. r.SendAddrs(src, r.book.GetSelection())
  144. case *pexAddrsMessage:
  145. // We received some peer addresses from src.
  146. // TODO: (We don't want to get spammed with bad peers)
  147. for _, addr := range msg.Addrs {
  148. if addr != nil {
  149. r.book.AddAddress(addr, srcAddr)
  150. }
  151. }
  152. default:
  153. r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  154. }
  155. }
  156. // RequestPEX asks peer for more addresses.
  157. func (r *PEXReactor) RequestPEX(p Peer) {
  158. p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
  159. }
  160. // SendAddrs sends addrs to the peer.
  161. func (r *PEXReactor) SendAddrs(p Peer, addrs []*NetAddress) {
  162. p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
  163. }
  164. // SetEnsurePeersPeriod sets period to ensure peers connected.
  165. func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
  166. r.ensurePeersPeriod = d
  167. }
  168. // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
  169. func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
  170. r.maxMsgCountByPeer = v
  171. }
  172. // ReachedMaxMsgCountForPeer returns true if we received too many
  173. // messages from peer with address `addr`.
  174. // NOTE: assumes the value in the CMap is non-nil
  175. func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
  176. return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
  177. }
  178. // Increment or initialize the msg count for the peer in the CMap
  179. func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
  180. var count uint16
  181. countI := r.msgCountByPeer.Get(addr)
  182. if countI != nil {
  183. count = countI.(uint16)
  184. }
  185. count++
  186. r.msgCountByPeer.Set(addr, count)
  187. }
  188. // Ensures that sufficient peers are connected. (continuous)
  189. func (r *PEXReactor) ensurePeersRoutine() {
  190. // Randomize when routine starts
  191. ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
  192. time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
  193. // fire once immediately.
  194. r.ensurePeers()
  195. // fire periodically
  196. ticker := time.NewTicker(r.ensurePeersPeriod)
  197. for {
  198. select {
  199. case <-ticker.C:
  200. r.ensurePeers()
  201. case <-r.Quit:
  202. ticker.Stop()
  203. return
  204. }
  205. }
  206. }
  207. // ensurePeers ensures that sufficient peers are connected. (once)
  208. //
  209. // Old bucket / New bucket are arbitrary categories to denote whether an
  210. // address is vetted or not, and this needs to be determined over time via a
  211. // heuristic that we haven't perfected yet, or, perhaps is manually edited by
  212. // the node operator. It should not be used to compute what addresses are
  213. // already connected or not.
  214. //
  215. // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
  216. // What we're currently doing in terms of marking good/bad peers is just a
  217. // placeholder. It should not be the case that an address becomes old/vetted
  218. // upon a single successful connection.
  219. func (r *PEXReactor) ensurePeers() {
  220. numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
  221. numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
  222. r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
  223. if numToDial <= 0 {
  224. return
  225. }
  226. // bias to prefer more vetted peers when we have fewer connections.
  227. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted
  228. // NOTE: range here is [10, 90]. Too high ?
  229. newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
  230. toDial := make(map[string]*NetAddress)
  231. // Try maxAttempts times to pick numToDial addresses to dial
  232. maxAttempts := numToDial * 3
  233. for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
  234. try := r.book.PickAddress(newBias)
  235. if try == nil {
  236. continue
  237. }
  238. if _, selected := toDial[try.IP.String()]; selected {
  239. continue
  240. }
  241. if dialling := r.Switch.IsDialing(try); dialling {
  242. continue
  243. }
  244. // XXX: Should probably use pubkey as peer key ...
  245. if connected := r.Switch.Peers().Has(try.String()); connected {
  246. continue
  247. }
  248. r.Logger.Info("Will dial address", "addr", try)
  249. toDial[try.IP.String()] = try
  250. }
  251. // Dial picked addresses
  252. for _, item := range toDial {
  253. go func(picked *NetAddress) {
  254. _, err := r.Switch.DialPeerWithAddress(picked, false)
  255. if err != nil {
  256. r.book.MarkAttempt(picked)
  257. }
  258. }(item)
  259. }
  260. // If we need more addresses, pick a random peer and ask for more.
  261. if r.book.NeedMoreAddrs() {
  262. peers := r.Switch.Peers().List()
  263. peersCount := len(peers)
  264. if peersCount > 0 {
  265. peer := peers[rand.Int()%peersCount] // nolint: gas
  266. r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
  267. r.RequestPEX(peer)
  268. }
  269. }
  270. // If we are not connected to nor dialing anybody, fallback to dialing seeds.
  271. if numOutPeers+numInPeers+numDialing+len(toDial) == 0 {
  272. r.Logger.Info("No addresses to dial nor connected peers. Will dial seeds", "seeds", r.config.Seeds)
  273. r.Switch.DialPeersAsync(r.book, r.config.Seeds, false)
  274. }
  275. }
  276. func (r *PEXReactor) flushMsgCountByPeer() {
  277. ticker := time.NewTicker(msgCountByPeerFlushInterval)
  278. for {
  279. select {
  280. case <-ticker.C:
  281. r.msgCountByPeer.Clear()
  282. case <-r.Quit:
  283. ticker.Stop()
  284. return
  285. }
  286. }
  287. }
  288. //-----------------------------------------------------------------------------
  289. // Messages
  290. const (
  291. msgTypeRequest = byte(0x01)
  292. msgTypeAddrs = byte(0x02)
  293. )
  294. // PexMessage is a primary type for PEX messages. Underneath, it could contain
  295. // either pexRequestMessage, or pexAddrsMessage messages.
  296. type PexMessage interface{}
  297. var _ = wire.RegisterInterface(
  298. struct{ PexMessage }{},
  299. wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
  300. wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
  301. )
  302. // DecodeMessage implements interface registered above.
  303. func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
  304. msgType = bz[0]
  305. n := new(int)
  306. r := bytes.NewReader(bz)
  307. msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
  308. return
  309. }
  310. /*
  311. A pexRequestMessage requests additional peer addresses.
  312. */
  313. type pexRequestMessage struct {
  314. }
  315. func (m *pexRequestMessage) String() string {
  316. return "[pexRequest]"
  317. }
  318. /*
  319. A message with announced peer addresses.
  320. */
  321. type pexAddrsMessage struct {
  322. Addrs []*NetAddress
  323. }
  324. func (m *pexAddrsMessage) String() string {
  325. return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
  326. }