You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

312 lines
7.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package crawler
  2. import (
  3. "fmt"
  4. "time"
  5. . "github.com/tendermint/tendermint/common"
  6. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  7. cclient "github.com/tendermint/tendermint/rpc/core_client"
  8. "github.com/tendermint/tendermint/types"
  9. )
  // Sizes and intervals used by the crawler.
  const (
  	// CheckQueueBufferSize is the buffer capacity of Crawler.checkQueue.
  	CheckQueueBufferSize = 100

  	// NodeQueueBufferSize is the buffer capacity of Crawler.nodeQueue.
  	NodeQueueBufferSize = 100

  	// GetPeersTickerSeconds is the number of seconds between two polls
  	// of a node in readLoop.
  	GetPeersTickerSeconds = 5
  )
  15. //---------------------------------------------------------------------------------------
  16. // crawler.Node
  17. // A node is a peer on the network
  18. type Node struct {
  19. Host string
  20. P2PPort uint16
  21. RPCPort uint16
  22. failed int
  23. connected bool
  24. client *NodeClient
  25. LastSeen time.Time
  26. ChainID string
  27. BlockHeight int
  28. BlockHistory map[int]time.Time // when we saw each block
  29. NetInfo *ctypes.ResultNetInfo
  30. Validator bool
  31. // other peers we heard about this peer from
  32. heardFrom map[string]struct{}
  33. }
  34. func (n *Node) Address() string {
  35. return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
  36. }
  37. // Set the basic status and chain_id info for a node from RPC responses
  38. func (n *Node) SetInfo(status *ctypes.ResultStatus, netinfo *ctypes.ResultNetInfo) {
  39. n.LastSeen = time.Now()
  40. n.ChainID = status.NodeInfo.ChainID
  41. n.BlockHeight = status.LatestBlockHeight
  42. n.NetInfo = netinfo
  43. // n.Validator
  44. }
  45. // A node client is used to talk to a node over rpc and websockets
  46. type NodeClient struct {
  47. rpc cclient.Client
  48. ws *cclient.WSClient
  49. }
  50. // Create a new client for the node at the given addr
  51. func NewNodeClient(addr string) *NodeClient {
  52. return &NodeClient{
  53. rpc: cclient.NewClient("http://"+addr, "JSONRPC"),
  54. ws: cclient.NewWSClient("ws://" + addr + "/events"),
  55. }
  56. }
  // nodeInfo is the message sent on the checkQueue; it is a simple wrapper
  // for mediating access to the node maps.
  type nodeInfo struct {
  	host string // the new nodes address
  	port uint16 // node's listening port
  	from string // the peer that told us about this node

  	connected    bool // move node from nodePool to nodes
  	disconnected bool // move node from nodes to nodePool
  }

  // unpack returns the fields of ni in declaration order:
  // host, port, from, connected, disconnected.
  func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
  	host, port, from := ni.host, ni.port, ni.from
  	return host, port, from, ni.connected, ni.disconnected
  }
  68. // crawler.Node
  69. //---------------------------------------------------------------------------------------
  70. // crawler.Crawler
  71. // A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
  72. // Maps are only accessed by one go-routine, mediated by the checkQueue
  73. type Crawler struct {
  74. QuitService
  75. self *Node
  76. client *NodeClient
  77. checkQueue chan nodeInfo
  78. nodePool map[string]*Node
  79. nodes map[string]*Node
  80. nodeQueue chan *Node
  81. }
  82. // Create a new Crawler using the local RPC server at addr
  83. func NewCrawler(host string, port uint16) *Crawler {
  84. crawler := &Crawler{
  85. self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
  86. checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
  87. nodePool: make(map[string]*Node),
  88. nodes: make(map[string]*Node),
  89. nodeQueue: make(chan *Node, NodeQueueBufferSize),
  90. }
  91. crawler.QuitService = *NewQuitService(log, "Crawler", crawler)
  92. return crawler
  93. }
  94. func (c *Crawler) OnStart() error {
  95. // connect to local node first, set info,
  96. // and fire peers onto the checkQueue
  97. if err := c.pollNode(c.self); err != nil {
  98. return err
  99. }
  100. // connect to weboscket, subscribe to local events
  101. // and run the read loop to listen for new blocks
  102. _, err := c.self.client.ws.Start()
  103. if err != nil {
  104. return err
  105. }
  106. if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
  107. return err
  108. }
  109. go c.readLoop(c.self)
  110. // add ourselves to the nodes list
  111. c.nodes[c.self.Address()] = c.self
  112. // nodes we hear about get put on the checkQueue
  113. // by pollNode and are handled in the checkLoop.
  114. // if its a node we're not already connected to,
  115. // it gets put on the nodeQueue and
  116. // we attempt to connect in the connectLoop
  117. go c.checkLoop()
  118. go c.connectLoop()
  119. return nil
  120. }
  121. // listen for events from the node and ping it for peers on a ticker
  122. func (c *Crawler) readLoop(node *Node) {
  123. eventsCh := node.client.ws.EventsCh
  124. getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
  125. for {
  126. select {
  127. case eventMsg := <-eventsCh:
  128. // update the node with his new info
  129. if err := c.consumeMessage(eventMsg, node); err != nil {
  130. // lost the node, put him back on the checkQueu
  131. c.checkNode(nodeInfo{
  132. host: node.Host,
  133. port: node.RPCPort,
  134. disconnected: true,
  135. })
  136. }
  137. case <-getPeersTicker:
  138. if err := c.pollNode(node); err != nil {
  139. // lost the node, put him back on the checkQueu
  140. c.checkNode(nodeInfo{
  141. host: node.Host,
  142. port: node.RPCPort,
  143. disconnected: true,
  144. })
  145. }
  146. case <-c.Quit:
  147. return
  148. }
  149. }
  150. }
  151. func (c *Crawler) consumeMessage(eventMsg ctypes.ResultEvent, node *Node) error {
  152. block := eventMsg.Data.(*types.EventDataNewBlock).Block
  153. node.LastSeen = time.Now()
  154. node.BlockHeight = block.Height
  155. node.BlockHistory[block.Height] = node.LastSeen
  156. return nil
  157. }
  158. // check nodes against the nodePool map one at a time
  159. // acts as a mutex on nodePool and nodes
  160. func (c *Crawler) checkLoop() {
  161. for {
  162. select {
  163. case ni := <-c.checkQueue:
  164. host, port, from, connected, disconnected := ni.unpack()
  165. addr := fmt.Sprintf("%s:%d", host, port)
  166. // check if we need to swap node between maps (eg. its connected or disconnected)
  167. // NOTE: once we hear about a node, we never forget ...
  168. if connected {
  169. n, _ := c.nodePool[addr]
  170. c.nodes[addr] = n
  171. delete(c.nodePool, addr)
  172. continue
  173. } else if disconnected {
  174. n, _ := c.nodes[addr]
  175. c.nodePool[addr] = n
  176. delete(c.nodes, addr)
  177. continue
  178. }
  179. // TODO: if address is badly formed
  180. // we should punish ni.from
  181. _ = from
  182. n, ok := c.nodePool[addr]
  183. // create the node if unknown
  184. if !ok {
  185. n = &Node{Host: host, RPCPort: port}
  186. c.nodePool[addr] = n
  187. } else if n.connected {
  188. // should be removed soon
  189. continue
  190. }
  191. // queue it for connecting to
  192. c.nodeQueue <- n
  193. case <-c.Quit:
  194. return
  195. }
  196. }
  197. }
  198. // read off the nodeQueue and attempt to connect to nodes
  199. func (c *Crawler) connectLoop() {
  200. for {
  201. select {
  202. case node := <-c.nodeQueue:
  203. go c.connectToNode(node)
  204. case <-c.Quit:
  205. // close all connections
  206. for addr, node := range c.nodes {
  207. _, _ = addr, node
  208. // TODO: close conn
  209. }
  210. return
  211. }
  212. }
  213. }
  214. func (c *Crawler) connectToNode(node *Node) {
  215. addr := node.Address()
  216. node.client = NewNodeClient(addr)
  217. _, err := node.client.ws.Start()
  218. if err != nil {
  219. fmt.Println("err on ws start:", err)
  220. // set failed, return
  221. }
  222. // remove from nodePool, add to nodes
  223. c.checkNode(nodeInfo{
  224. host: node.Host,
  225. port: node.RPCPort,
  226. connected: true,
  227. })
  228. if err := c.pollNode(node); err != nil {
  229. // TODO: we had a good ws con
  230. // but failed on rpc?!
  231. // try again or something ...
  232. // if we still fail, report and disconnect
  233. }
  234. fmt.Println("Successfully connected to node", node.Address())
  235. // blocks (until quit or err)
  236. c.readLoop(node)
  237. }
  238. func (c *Crawler) checkNode(ni nodeInfo) {
  239. c.checkQueue <- ni
  240. }
  241. func (c *Crawler) pollNode(node *Node) error {
  242. // get the status info
  243. status, err := node.client.rpc.Status()
  244. if err != nil {
  245. return err
  246. }
  247. // get peers and net info
  248. netinfo, err := node.client.rpc.NetInfo()
  249. if err != nil {
  250. return err
  251. }
  252. // set the info for the node
  253. node.SetInfo(status, netinfo)
  254. // fire each peer on the checkQueue
  255. for _, p := range netinfo.Peers {
  256. c.checkNode(nodeInfo{
  257. host: p.Host,
  258. port: p.RPCPort,
  259. from: node.Address(),
  260. })
  261. }
  262. return nil
  263. }