You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

336 lines
7.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package crawler
  2. import (
  3. "fmt"
  4. "github.com/tendermint/tendermint/wire"
  5. ctypes "github.com/tendermint/tendermint/rpc/core/types"
  6. rpcclient "github.com/tendermint/tendermint/rpc/core_client"
  7. "github.com/tendermint/tendermint/types"
  8. "time"
  9. "io/ioutil"
  10. )
  11. const (
  12. CheckQueueBufferSize = 100
  13. NodeQueueBufferSize = 100
  14. GetPeersTickerSeconds = 5
  15. )
  16. //---------------------------------------------------------------------------------------
  17. // crawler.Node
  18. // A node is a peer on the network
  19. type Node struct {
  20. Host string
  21. P2PPort uint16
  22. RPCPort uint16
  23. failed int
  24. connected bool
  25. client *NodeClient
  26. LastSeen time.Time
  27. ChainID string
  28. BlockHeight int
  29. BlockHistory map[int]time.Time // when we saw each block
  30. NetInfo *ctypes.ResponseNetInfo
  31. Validator bool
  32. // other peers we heard about this peer from
  33. heardFrom map[string]struct{}
  34. }
  35. func (n *Node) Address() string {
  36. return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
  37. }
  38. // Set the basic status and chain_id info for a node from RPC responses
  39. func (n *Node) SetInfo(status *ctypes.ResponseStatus, netinfo *ctypes.ResponseNetInfo) {
  40. n.LastSeen = time.Now()
  41. n.ChainID = status.NodeInfo.ChainID
  42. n.BlockHeight = status.LatestBlockHeight
  43. n.NetInfo = netinfo
  44. // n.Validator
  45. }
  46. // A node client is used to talk to a node over rpc and websockets
  47. type NodeClient struct {
  48. rpc rpcclient.Client
  49. ws *rpcclient.WSClient
  50. }
  51. // Create a new client for the node at the given addr
  52. func NewNodeClient(addr string) *NodeClient {
  53. return &NodeClient{
  54. rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
  55. ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
  56. }
  57. }
  58. // A simple wrapper for mediating access to the maps
  59. type nodeInfo struct {
  60. host string // the new nodes address
  61. port uint16 // node's listening port
  62. from string // the peer that told us about this node
  63. connected bool // move node from nodePool to nodes
  64. disconnected bool // move node from nodes to nodePool
  65. }
  66. func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
  67. return ni.host, ni.port, ni.from, ni.connected, ni.disconnected
  68. }
  69. // crawler.Node
  70. //---------------------------------------------------------------------------------------
  71. // crawler.Crawler
  72. // A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
  73. // Maps are only accessed by one go-routine, mediated by the checkQueue
  74. type Crawler struct {
  75. self *Node
  76. client *NodeClient
  77. checkQueue chan nodeInfo
  78. nodePool map[string]*Node
  79. nodes map[string]*Node
  80. nodeQueue chan *Node
  81. quit chan struct{}
  82. }
  83. // Create a new Crawler using the local RPC server at addr
  84. func NewCrawler(host string, port uint16) *Crawler {
  85. return &Crawler{
  86. self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
  87. checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
  88. nodePool: make(map[string]*Node),
  89. nodes: make(map[string]*Node),
  90. nodeQueue: make(chan *Node, NodeQueueBufferSize),
  91. quit: make(chan struct{}),
  92. }
  93. }
  94. func (c *Crawler) Start() error {
  95. // connect to local node first, set info,
  96. // and fire peers onto the checkQueue
  97. if err := c.pollNode(c.self); err != nil {
  98. return err
  99. }
  100. // connect to weboscket, subscribe to local events
  101. // and run the read loop to listen for new blocks
  102. if r, err := c.self.client.ws.Dial(); err != nil {
  103. fmt.Println(r)
  104. b, _ := ioutil.ReadAll(r.Body)
  105. fmt.Println(string(b))
  106. return err
  107. }
  108. if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
  109. return err
  110. }
  111. go c.readLoop(c.self)
  112. // add ourselves to the nodes list
  113. c.nodes[c.self.Address()] = c.self
  114. // nodes we hear about get put on the checkQueue
  115. // by pollNode and are handled in the checkLoop.
  116. // if its a node we're not already connected to,
  117. // it gets put on the nodeQueue and
  118. // we attempt to connect in the connectLoop
  119. go c.checkLoop()
  120. go c.connectLoop()
  121. return nil
  122. }
  123. func (c *Crawler) Stop() {
  124. close(c.quit)
  125. }
  126. // listen for events from the node and ping it for peers on a ticker
  127. func (c *Crawler) readLoop(node *Node) {
  128. wsChan := node.client.ws.Read()
  129. getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
  130. for {
  131. select {
  132. case wsMsg := <-wsChan:
  133. // update the node with his new info
  134. if err := c.consumeMessage(wsMsg, node); err != nil {
  135. // lost the node, put him back on the checkQueu
  136. c.checkNode(nodeInfo{
  137. host: node.Host,
  138. port: node.RPCPort,
  139. disconnected: true,
  140. })
  141. }
  142. case <-getPeersTicker:
  143. if err := c.pollNode(node); err != nil {
  144. // lost the node, put him back on the checkQueu
  145. c.checkNode(nodeInfo{
  146. host: node.Host,
  147. port: node.RPCPort,
  148. disconnected: true,
  149. })
  150. }
  151. case <-c.quit:
  152. return
  153. }
  154. }
  155. }
  156. func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
  157. if wsMsg.Error != nil {
  158. return wsMsg.Error
  159. }
  160. // unmarshal block event
  161. var response struct {
  162. Event string
  163. Data *types.Block
  164. Error string
  165. }
  166. var err error
  167. wire.ReadJSON(&response, wsMsg.Data, &err)
  168. if err != nil {
  169. return err
  170. }
  171. if response.Error != "" {
  172. return fmt.Errorf(response.Error)
  173. }
  174. block := response.Data
  175. node.LastSeen = time.Now()
  176. node.BlockHeight = block.Height
  177. node.BlockHistory[block.Height] = node.LastSeen
  178. return nil
  179. }
  180. // check nodes against the nodePool map one at a time
  181. // acts as a mutex on nodePool and nodes
  182. func (c *Crawler) checkLoop() {
  183. for {
  184. select {
  185. case ni := <-c.checkQueue:
  186. host, port, from, connected, disconnected := ni.unpack()
  187. addr := fmt.Sprintf("%s:%d", host, port)
  188. // check if we need to swap node between maps (eg. its connected or disconnected)
  189. // NOTE: once we hear about a node, we never forget ...
  190. if connected {
  191. n, _ := c.nodePool[addr]
  192. c.nodes[addr] = n
  193. delete(c.nodePool, addr)
  194. continue
  195. } else if disconnected {
  196. n, _ := c.nodes[addr]
  197. c.nodePool[addr] = n
  198. delete(c.nodes, addr)
  199. continue
  200. }
  201. // TODO: if address is badly formed
  202. // we should punish ni.from
  203. _ = from
  204. n, ok := c.nodePool[addr]
  205. // create the node if unknown
  206. if !ok {
  207. n = &Node{Host: host, RPCPort: port}
  208. c.nodePool[addr] = n
  209. } else if n.connected {
  210. // should be removed soon
  211. continue
  212. }
  213. // queue it for connecting to
  214. c.nodeQueue <- n
  215. case <-c.quit:
  216. return
  217. }
  218. }
  219. }
  220. // read off the nodeQueue and attempt to connect to nodes
  221. func (c *Crawler) connectLoop() {
  222. for {
  223. select {
  224. case node := <-c.nodeQueue:
  225. go c.connectToNode(node)
  226. case <-c.quit:
  227. // close all connections
  228. for addr, node := range c.nodes {
  229. _, _ = addr, node
  230. // TODO: close conn
  231. }
  232. return
  233. }
  234. }
  235. }
  236. func (c *Crawler) connectToNode(node *Node) {
  237. addr := node.Address()
  238. node.client = NewNodeClient(addr)
  239. if b, err := node.client.ws.Dial(); err != nil {
  240. fmt.Println("err on ws dial:", b, err)
  241. // set failed, return
  242. }
  243. // remove from nodePool, add to nodes
  244. c.checkNode(nodeInfo{
  245. host: node.Host,
  246. port: node.RPCPort,
  247. connected: true,
  248. })
  249. if err := c.pollNode(node); err != nil {
  250. // TODO: we had a good ws con
  251. // but failed on rpc?!
  252. // try again or something ...
  253. // if we still fail, report and disconnect
  254. }
  255. fmt.Println("Successfully connected to node", node.Address())
  256. // blocks (until quit or err)
  257. c.readLoop(node)
  258. }
  259. func (c *Crawler) checkNode(ni nodeInfo) {
  260. c.checkQueue <- ni
  261. }
  262. func (c *Crawler) pollNode(node *Node) error {
  263. // get the status info
  264. status, err := node.client.rpc.Status()
  265. if err != nil {
  266. return err
  267. }
  268. // get peers and net info
  269. netinfo, err := node.client.rpc.NetInfo()
  270. if err != nil {
  271. return err
  272. }
  273. // set the info for the node
  274. node.SetInfo(status, netinfo)
  275. // fire each peer on the checkQueue
  276. for _, p := range netinfo.Peers {
  277. c.checkNode(nodeInfo{
  278. host: p.Host,
  279. port: p.RPCPort,
  280. from: node.Address(),
  281. })
  282. }
  283. return nil
  284. }