You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

336 lines
7.8 KiB

  1. package crawler
  2. import (
  3. "fmt"
  4. "github.com/tendermint/tendermint/binary"
  5. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  6. rpcclient "github.com/tendermint/tendermint/rpc/core_client"
  7. "github.com/tendermint/tendermint/types"
  8. "time"
  9. "io/ioutil"
  10. )
// Tunables for the crawler's internal plumbing.
const (
	// CheckQueueBufferSize bounds the checkQueue channel; senders block
	// once this many nodeInfo messages are pending.
	CheckQueueBufferSize = 100
	// NodeQueueBufferSize bounds the nodeQueue channel of nodes awaiting
	// a connection attempt.
	NodeQueueBufferSize = 100
	// GetPeersTickerSeconds is how often (in seconds) readLoop polls a
	// node for its status and peers.
	GetPeersTickerSeconds = 5
)
//---------------------------------------------------------------------------------------
// crawler.Node

// Node is a peer on the network.
type Node struct {
	Host    string // hostname or IP, without a port
	P2PPort uint16 // peer-to-peer listen port
	RPCPort uint16 // RPC listen port; Host:RPCPort is Address()

	failed    int         // NOTE(review): declared but not visibly incremented in this file — confirm intent
	connected bool        // true once the crawler holds a live connection (see checkLoop)
	client    *NodeClient // rpc/websocket client; set by connectToNode

	LastSeen     time.Time         // last time we received data from this node
	ChainID      string            // chain id reported by the node's status
	BlockHeight  int               // latest block height reported by the node
	BlockHistory map[int]time.Time // when we saw each block
	NetInfo      *ctypes.ResponseNetInfo

	Validator bool // NOTE(review): never assigned in this file — confirm where it should be set

	// other peers we heard about this peer from
	heardFrom map[string]struct{}
}
  35. func (n *Node) Address() string {
  36. return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
  37. }
  38. // Set the basic status and chain_id info for a node from RPC responses
  39. func (n *Node) SetInfo(status *ctypes.ResponseStatus, netinfo *ctypes.ResponseNetInfo) {
  40. n.LastSeen = time.Now()
  41. n.ChainID = status.NodeInfo.ChainID
  42. n.BlockHeight = status.LatestBlockHeight
  43. n.NetInfo = netinfo
  44. // n.Validator
  45. }
// NodeClient is used to talk to a node over JSON-RPC (HTTP) and websockets.
type NodeClient struct {
	rpc rpcclient.Client    // HTTP JSON-RPC client for Status/NetInfo queries
	ws  *rpcclient.WSClient // websocket client for event subscriptions
}
  51. // Create a new client for the node at the given addr
  52. func NewNodeClient(addr string) *NodeClient {
  53. return &NodeClient{
  54. rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
  55. ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
  56. }
  57. }
// nodeInfo is a message on the checkQueue: it either introduces a peer
// address (host/port/from) or asks checkLoop to move an existing node
// between the nodes and nodePool maps. It is a simple wrapper for
// mediating access to those maps.
type nodeInfo struct {
	host string // the new node's address
	port uint16 // node's listening port
	from string // the peer that told us about this node

	connected    bool // move node from nodePool to nodes
	disconnected bool // move node from nodes to nodePool
}
  66. func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
  67. return ni.host, ni.port, ni.from, ni.connected, ni.disconnected
  68. }
// crawler.Node
//---------------------------------------------------------------------------------------
// crawler.Crawler

// Crawler has a local node, a set of potential nodes in the nodePool,
// and connected nodes. Maps are only accessed by one go-routine
// (checkLoop), mediated by the checkQueue.
type Crawler struct {
	self   *Node       // the local node we crawl from
	client *NodeClient // NOTE(review): appears unused here — self.client is used instead; confirm

	checkQueue chan nodeInfo    // serializes all map mutations through checkLoop
	nodePool   map[string]*Node // known-but-not-connected nodes, keyed by "host:port"
	nodes      map[string]*Node // connected nodes, keyed by "host:port"

	nodeQueue chan *Node    // nodes queued for connection attempts (connectLoop)
	quit      chan struct{} // closed by Stop to terminate all loops
}
  83. // Create a new Crawler using the local RPC server at addr
  84. func NewCrawler(host string, port uint16) *Crawler {
  85. return &Crawler{
  86. self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
  87. checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
  88. nodePool: make(map[string]*Node),
  89. nodes: make(map[string]*Node),
  90. nodeQueue: make(chan *Node, NodeQueueBufferSize),
  91. quit: make(chan struct{}),
  92. }
  93. }
  94. func (c *Crawler) Start() error {
  95. // connect to local node first, set info,
  96. // and fire peers onto the checkQueue
  97. if err := c.pollNode(c.self); err != nil {
  98. return err
  99. }
  100. // connect to weboscket, subscribe to local events
  101. // and run the read loop to listen for new blocks
  102. if r, err := c.self.client.ws.Dial(); err != nil {
  103. fmt.Println(r)
  104. b, _ := ioutil.ReadAll(r.Body)
  105. fmt.Println(string(b))
  106. return err
  107. }
  108. if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
  109. return err
  110. }
  111. go c.readLoop(c.self)
  112. // add ourselves to the nodes list
  113. c.nodes[c.self.Address()] = c.self
  114. // nodes we hear about get put on the checkQueue
  115. // by pollNode and are handled in the checkLoop.
  116. // if its a node we're not already connected to,
  117. // it gets put on the nodeQueue and
  118. // we attempt to connect in the connectLoop
  119. go c.checkLoop()
  120. go c.connectLoop()
  121. return nil
  122. }
// Stop terminates the crawler's loops by closing the quit channel.
// It must be called at most once (a second close would panic).
func (c *Crawler) Stop() {
	close(c.quit)
}
  126. // listen for events from the node and ping it for peers on a ticker
  127. func (c *Crawler) readLoop(node *Node) {
  128. wsChan := node.client.ws.Read()
  129. getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
  130. for {
  131. select {
  132. case wsMsg := <-wsChan:
  133. // update the node with his new info
  134. if err := c.consumeMessage(wsMsg, node); err != nil {
  135. // lost the node, put him back on the checkQueu
  136. c.checkNode(nodeInfo{
  137. host: node.Host,
  138. port: node.RPCPort,
  139. disconnected: true,
  140. })
  141. }
  142. case <-getPeersTicker:
  143. if err := c.pollNode(node); err != nil {
  144. // lost the node, put him back on the checkQueu
  145. c.checkNode(nodeInfo{
  146. host: node.Host,
  147. port: node.RPCPort,
  148. disconnected: true,
  149. })
  150. }
  151. case <-c.quit:
  152. return
  153. }
  154. }
  155. }
  156. func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
  157. if wsMsg.Error != nil {
  158. return wsMsg.Error
  159. }
  160. // unmarshal block event
  161. var response struct {
  162. Event string
  163. Data *types.Block
  164. Error string
  165. }
  166. var err error
  167. binary.ReadJSON(&response, wsMsg.Data, &err)
  168. if err != nil {
  169. return err
  170. }
  171. if response.Error != "" {
  172. return fmt.Errorf(response.Error)
  173. }
  174. block := response.Data
  175. node.LastSeen = time.Now()
  176. node.BlockHeight = block.Height
  177. node.BlockHistory[block.Height] = node.LastSeen
  178. return nil
  179. }
  180. // check nodes against the nodePool map one at a time
  181. // acts as a mutex on nodePool and nodes
  182. func (c *Crawler) checkLoop() {
  183. for {
  184. select {
  185. case ni := <-c.checkQueue:
  186. host, port, from, connected, disconnected := ni.unpack()
  187. addr := fmt.Sprintf("%s:%d", host, port)
  188. // check if we need to swap node between maps (eg. its connected or disconnected)
  189. // NOTE: once we hear about a node, we never forget ...
  190. if connected {
  191. n, _ := c.nodePool[addr]
  192. c.nodes[addr] = n
  193. delete(c.nodePool, addr)
  194. continue
  195. } else if disconnected {
  196. n, _ := c.nodes[addr]
  197. c.nodePool[addr] = n
  198. delete(c.nodes, addr)
  199. continue
  200. }
  201. // TODO: if address is badly formed
  202. // we should punish ni.from
  203. _ = from
  204. n, ok := c.nodePool[addr]
  205. // create the node if unknown
  206. if !ok {
  207. n = &Node{Host: host, RPCPort: port}
  208. c.nodePool[addr] = n
  209. } else if n.connected {
  210. // should be removed soon
  211. continue
  212. }
  213. // queue it for connecting to
  214. c.nodeQueue <- n
  215. case <-c.quit:
  216. return
  217. }
  218. }
  219. }
  220. // read off the nodeQueue and attempt to connect to nodes
  221. func (c *Crawler) connectLoop() {
  222. for {
  223. select {
  224. case node := <-c.nodeQueue:
  225. go c.connectToNode(node)
  226. case <-c.quit:
  227. // close all connections
  228. for addr, node := range c.nodes {
  229. _, _ = addr, node
  230. // TODO: close conn
  231. }
  232. return
  233. }
  234. }
  235. }
  236. func (c *Crawler) connectToNode(node *Node) {
  237. addr := node.Address()
  238. node.client = NewNodeClient(addr)
  239. if b, err := node.client.ws.Dial(); err != nil {
  240. fmt.Println("err on ws dial:", b, err)
  241. // set failed, return
  242. }
  243. // remove from nodePool, add to nodes
  244. c.checkNode(nodeInfo{
  245. host: node.Host,
  246. port: node.RPCPort,
  247. connected: true,
  248. })
  249. if err := c.pollNode(node); err != nil {
  250. // TODO: we had a good ws con
  251. // but failed on rpc?!
  252. // try again or something ...
  253. // if we still fail, report and disconnect
  254. }
  255. fmt.Println("Successfully connected to node", node.Address())
  256. // blocks (until quit or err)
  257. c.readLoop(node)
  258. }
// checkNode submits ni to the checkQueue for processing by checkLoop.
// Blocks if the queue is full.
func (c *Crawler) checkNode(ni nodeInfo) {
	c.checkQueue <- ni
}
  262. func (c *Crawler) pollNode(node *Node) error {
  263. // get the status info
  264. status, err := node.client.rpc.Status()
  265. if err != nil {
  266. return err
  267. }
  268. // get peers and net info
  269. netinfo, err := node.client.rpc.NetInfo()
  270. if err != nil {
  271. return err
  272. }
  273. // set the info for the node
  274. node.SetInfo(status, netinfo)
  275. // fire each peer on the checkQueue
  276. for _, p := range netinfo.Peers {
  277. c.checkNode(nodeInfo{
  278. host: p.Host,
  279. port: p.RPCPort,
  280. from: node.Address(),
  281. })
  282. }
  283. return nil
  284. }