You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

318 lines
6.7 KiB

  1. package crawler
  2. import (
  3. "fmt"
  4. rpctypes "github.com/tendermint/tendermint/rpc/core/types"
  5. rpcclient "github.com/tendermint/tendermint/rpc/core_client"
  6. types "github.com/tendermint/tendermint/types"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. CheckQueueBufferSize = 100
  12. NodeQueueBufferSize = 100
  13. )
  14. //---------------------------------------------------------------------------------------
  15. // crawler.Node
  16. // A node is a peer on the network.
  17. type Node struct {
  18. Host string
  19. P2PPort uint16
  20. RPCPort uint16
  21. failed int
  22. connected bool
  23. client *NodeClient
  24. LastSeen time.Time
  25. GenesisHash []byte
  26. BlockHeight uint
  27. NetInfo *rpctypes.ResponseNetInfo
  28. Validator bool
  29. // other peers we heard about this peer from
  30. heardFrom map[string]struct{}
  31. }
  32. func (n *Node) Address() string {
  33. return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
  34. }
  35. // Set the basic status and network info for a node from RPC responses
  36. func (n *Node) SetInfo(status *rpctypes.ResponseStatus, netinfo *rpctypes.ResponseNetInfo) {
  37. n.LastSeen = time.Now()
  38. n.GenesisHash = status.GenesisHash
  39. n.BlockHeight = status.LatestBlockHeight
  40. n.NetInfo = netinfo
  41. // n.Validator
  42. }
  43. // A node client is used to talk to a node over rpc and websockets
  44. type NodeClient struct {
  45. rpc rpcclient.Client
  46. ws *rpcclient.WSClient
  47. }
  48. // Create a new client for the node at the given addr
  49. func NewNodeClient(addr string) *NodeClient {
  50. return &NodeClient{
  51. rpc: rpcclient.NewClient(addr, "JSONRPC"),
  52. ws: rpcclient.NewWSClient(addr),
  53. }
  54. }
  55. // A simple wrapper for mediating access to the maps
  56. type nodeInfo struct {
  57. host string // the new nodes address
  58. port uint16
  59. from string // the peer that told us about this node
  60. removed bool // whether to remove from nodePool
  61. }
  62. func (ni nodeInfo) unpack() (string, uint16, string, bool) {
  63. return ni.host, ni.port, ni.from, ni.removed
  64. }
  65. // crawler.Node
  66. //---------------------------------------------------------------------------------------
  67. // crawler.Crawler
  68. // A crawler has a local node, a set of potential nodes in the nodePool
  69. // and connected nodes. Maps are only accessed by one go-routine, mediated through channels
  70. type Crawler struct {
  71. self *Node
  72. client *NodeClient
  73. checkQueue chan nodeInfo
  74. nodePool map[string]*Node
  75. nodes map[string]*Node
  76. nodeQueue chan *Node
  77. quit chan struct{}
  78. // waits for checkQueue to empty
  79. // so we can re-poll all nodes
  80. wg sync.WaitGroup
  81. }
  82. // Create a new Crawler using the local RPC server at addr
  83. func NewCrawler(host string, port uint16) *Crawler {
  84. return &Crawler{
  85. self: &Node{Host: host, RPCPort: port},
  86. client: NewNodeClient(fmt.Sprintf("%s:%d", host, port)),
  87. checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
  88. nodePool: make(map[string]*Node),
  89. nodes: make(map[string]*Node),
  90. nodeQueue: make(chan *Node, NodeQueueBufferSize),
  91. quit: make(chan struct{}),
  92. }
  93. }
  94. func (c *Crawler) checkNode(ni nodeInfo) {
  95. c.wg.Add(1)
  96. c.checkQueue <- ni
  97. }
  98. func (c *Crawler) Start() error {
  99. // make sure we can connect
  100. // to our local node first
  101. status, err := c.client.rpc.Status()
  102. if err != nil {
  103. return err
  104. }
  105. // connect to weboscket and subscribe to local events
  106. if err = c.client.ws.Dial(); err != nil {
  107. return err
  108. }
  109. if err = c.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
  110. return err
  111. }
  112. go c.readLocalEvents()
  113. // add ourselves to the nodes list
  114. c.nodes[c.self.Address()] = c.self
  115. // get peers from local node
  116. netinfo, err := c.client.rpc.NetInfo()
  117. if err != nil {
  118. return err
  119. }
  120. // set the info for ourselves
  121. c.self.SetInfo(status, netinfo)
  122. // fire each peer on the checkQueue
  123. for _, p := range netinfo.Peers {
  124. c.checkNode(nodeInfo{
  125. host: p.Host,
  126. port: p.RPCPort,
  127. })
  128. }
  129. // nodes we hear about get put on the
  130. // checkQueue and are handled in the checkLoop
  131. // if its a node we're not already connected to,
  132. // it gets put it on the nodeQueue and
  133. // we attempt to connect in the connectLoop
  134. go c.checkLoop()
  135. go c.connectLoop()
  136. // finally, a routine with a ticker to poll nodes for peers
  137. go c.pollNodesRoutine()
  138. return nil
  139. }
  140. func (c *Crawler) Stop() {
  141. close(c.quit)
  142. }
  143. func (c *Crawler) readLocalEvents() {
  144. // read on our ws for NewBlocks
  145. }
  146. // check nodes against the nodePool map one at a time
  147. // acts as a mutex on nodePool
  148. func (c *Crawler) checkLoop() {
  149. c.wg.Add(1)
  150. for {
  151. // every time the loop restarts
  152. // it means we processed from the checkQueue
  153. // (except the first time, hence the extra wg.Add)
  154. c.wg.Done()
  155. select {
  156. case ni := <-c.checkQueue:
  157. host, port, from, removed := ni.unpack()
  158. addr := fmt.Sprintf("%s:%d", host, port)
  159. // check if the node should be removed
  160. // (eg. if its been connected to or abandoned)
  161. if removed {
  162. n, _ := c.nodePool[addr]
  163. if n.connected {
  164. c.nodes[addr] = n
  165. }
  166. delete(c.nodePool, addr)
  167. continue
  168. }
  169. // TODO: if address is badly formed
  170. // we should punish ni.from
  171. _ = from
  172. n, ok := c.nodePool[addr]
  173. // create the node if unknown
  174. if !ok {
  175. n = &Node{Host: host, RPCPort: port}
  176. c.nodes[addr] = n
  177. } else if n.connected {
  178. // should be removed soon
  179. continue
  180. }
  181. // queue it for connecting to
  182. c.nodeQueue <- n
  183. case <-c.quit:
  184. return
  185. }
  186. }
  187. }
  188. // read off the nodeQueue and attempt to connect to nodes
  189. func (c *Crawler) connectLoop() {
  190. for {
  191. select {
  192. case node := <-c.nodeQueue:
  193. go c.connectToNode(node)
  194. case <-c.quit:
  195. // close all connections
  196. for addr, node := range c.nodes {
  197. _, _ = addr, node
  198. // TODO: close conn
  199. }
  200. return
  201. }
  202. }
  203. }
  204. func (c *Crawler) connectToNode(node *Node) {
  205. addr := node.Address()
  206. node.client = NewNodeClient(addr)
  207. if err := node.client.ws.Dial(); err != nil {
  208. // set failed, return
  209. }
  210. // remove from nodePool, add to nodes
  211. c.checkNode(nodeInfo{
  212. host: node.Host,
  213. port: node.RPCPort,
  214. removed: true,
  215. })
  216. c.pollNode(node)
  217. // TODO: read loop
  218. }
  219. func (c *Crawler) pollNode(node *Node) error {
  220. status, err := node.client.rpc.Status()
  221. if err != nil {
  222. return err
  223. }
  224. // get peers
  225. netinfo, err := node.client.rpc.NetInfo()
  226. if err != nil {
  227. return err
  228. }
  229. node.SetInfo(status, netinfo)
  230. // fire each peer on the checkQueue
  231. for _, p := range netinfo.Peers {
  232. c.checkNode(nodeInfo{
  233. host: p.Host,
  234. port: p.RPCPort,
  235. from: node.Address(),
  236. })
  237. }
  238. return nil
  239. }
  240. // wait for the checkQueue to empty and poll all the nodes
  241. func (c *Crawler) pollNodesRoutine() {
  242. for {
  243. c.wg.Wait()
  244. ticker := time.Tick(time.Second)
  245. // wait a few seconds to make sure we really have nothing
  246. time.Sleep(time.Second * 5)
  247. ch := make(chan struct{})
  248. go func() {
  249. c.wg.Wait()
  250. ch <- struct{}{}
  251. }()
  252. //
  253. select {
  254. case <-ticker:
  255. // the checkQueue has filled up again, move on
  256. continue
  257. case <-ch:
  258. // the checkQueue is legit empty,
  259. // TODO: poll the nodes!
  260. case <-c.quit:
  261. return
  262. }
  263. }
  264. }