From 882a82bad4ff538a7f7d4a50a1f301ce66a7a0e8 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Apr 2015 14:00:19 -0700 Subject: [PATCH] more crawler work --- crawler/crawl.go | 244 +++++++++++++++++++---------------- p2p/peer.go | 8 ++ rpc/core_client/ws_client.go | 30 ++++- 3 files changed, 165 insertions(+), 117 deletions(-) diff --git a/crawler/crawl.go b/crawler/crawl.go index c61e0535d..a44f360e6 100644 --- a/crawler/crawl.go +++ b/crawler/crawl.go @@ -2,16 +2,19 @@ package crawler import ( "fmt" + "github.com/tendermint/tendermint/binary" rpctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/core_client" - types "github.com/tendermint/tendermint/types" - "sync" + "github.com/tendermint/tendermint/types" "time" + + "io/ioutil" ) const ( - CheckQueueBufferSize = 100 - NodeQueueBufferSize = 100 + CheckQueueBufferSize = 100 + NodeQueueBufferSize = 100 + GetPeersTickerSeconds = 5 ) //--------------------------------------------------------------------------------------- @@ -28,10 +31,11 @@ type Node struct { client *NodeClient - LastSeen time.Time - GenesisHash []byte - BlockHeight uint - NetInfo *rpctypes.ResponseNetInfo + LastSeen time.Time + GenesisHash []byte + BlockHeight uint + BlockHistory map[uint]time.Time // when we saw each block + NetInfo *rpctypes.ResponseNetInfo Validator bool @@ -61,29 +65,30 @@ type NodeClient struct { // Create a new client for the node at the given addr func NewNodeClient(addr string) *NodeClient { return &NodeClient{ - rpc: rpcclient.NewClient(addr, "JSONRPC"), - ws: rpcclient.NewWSClient(addr), + rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"), + ws: rpcclient.NewWSClient("ws://" + addr + "/events"), } } // A simple wrapper for mediating access to the maps type nodeInfo struct { - host string // the new nodes address - port uint16 - from string // the peer that told us about this node - removed bool // whether to remove from nodePool + host string // the new nodes address + port uint16 // node's listening port + from string // the peer that told us about this node + connected bool // move node from nodePool to nodes + disconnected bool // move node from nodes to nodePool } -func (ni nodeInfo) unpack() (string, uint16, string, bool) { - return ni.host, ni.port, ni.from, ni.removed +func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) { + return ni.host, ni.port, ni.from, ni.connected, ni.disconnected } // crawler.Node //--------------------------------------------------------------------------------------- // crawler.Crawler -// A crawler has a local node, a set of potential nodes in the nodePool -// and connected nodes. Maps are only accessed by one go-routine, mediated through channels +// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes. +// Maps are only accessed by one go-routine, mediated by the checkQueue type Crawler struct { self *Node client *NodeClient @@ -94,17 +99,12 @@ type Crawler struct { nodeQueue chan *Node quit chan struct{} - - // waits for checkQueue to empty - // so we can re-poll all nodes - wg sync.WaitGroup } // Create a new Crawler using the local RPC server at addr func NewCrawler(host string, port uint16) *Crawler { return &Crawler{ - self: &Node{Host: host, RPCPort: port}, - client: NewNodeClient(fmt.Sprintf("%s:%d", host, port)), + self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))}, checkQueue: make(chan nodeInfo, CheckQueueBufferSize), nodePool: make(map[string]*Node), nodes: make(map[string]*Node), @@ -113,59 +113,37 @@ func NewCrawler(host string, port uint16) *Crawler { } } -func (c *Crawler) checkNode(ni nodeInfo) { - c.wg.Add(1) - c.checkQueue <- ni -} - func (c *Crawler) Start() error { - // make sure we can connect - // to our local node first - status, err := c.client.rpc.Status() - if err != nil { + // connect to local node first, set info, + // and fire peers onto the checkQueue + if err := c.pollNode(c.self); err != nil { return err } - // connect to weboscket and subscribe to local events - if err = c.client.ws.Dial(); err != nil { + // connect to weboscket, subscribe to local events + // and run the read loop to listen for new blocks + if r, err := c.self.client.ws.Dial(); err != nil { + fmt.Println(r) + b, _ := ioutil.ReadAll(r.Body) + fmt.Println(string(b)) return err } - if err = c.client.ws.Subscribe(types.EventStringNewBlock()); err != nil { + if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil { return err } - go c.readLocalEvents() + go c.readLoop(c.self) // add ourselves to the nodes list c.nodes[c.self.Address()] = c.self - // get peers from local node - netinfo, err := c.client.rpc.NetInfo() - if err != nil { - return err - } - - // set the info for ourselves - c.self.SetInfo(status, netinfo) - - // fire each peer on the checkQueue - for _, p := range netinfo.Peers { - c.checkNode(nodeInfo{ - host: p.Host, - port: p.RPCPort, - }) - } - - // nodes we hear about get put on the - // checkQueue and are handled in the checkLoop + // nodes we hear about get put on the checkQueue + // by pollNode and are handled in the checkLoop. // if its a node we're not already connected to, - // it gets put it on the nodeQueue and + // it gets put on the nodeQueue and // we attempt to connect in the connectLoop go c.checkLoop() go c.connectLoop() - // finally, a routine with a ticker to poll nodes for peers - go c.pollNodesRoutine() - return nil } @@ -173,32 +151,87 @@ func (c *Crawler) Stop() { close(c.quit) } -func (c *Crawler) readLocalEvents() { - // read on our ws for NewBlocks +// listen for events from the node and ping it for peers on a ticker +func (c *Crawler) readLoop(node *Node) { + wsChan := node.client.ws.Read() + getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds) + + for { + select { + case wsMsg := <-wsChan: + // update the node with his new info + if err := c.consumeMessage(wsMsg, node); err != nil { + // lost the node, put him back on the checkQueu + c.checkNode(nodeInfo{ + host: node.Host, + port: node.RPCPort, + disconnected: true, + }) + + } + case <-getPeersTicker: + if err := c.pollNode(node); err != nil { + // lost the node, put him back on the checkQueu + c.checkNode(nodeInfo{ + host: node.Host, + port: node.RPCPort, + disconnected: true, + }) + } + case <-c.quit: + return + + } + } +} + +func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error { + if wsMsg.Error != nil { + return wsMsg.Error + } + // unmarshal block event + var response struct { + Event string + Data *types.Block + Error string + } + var err error + binary.ReadJSON(&response, wsMsg.Data, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + block := response.Data + + node.LastSeen = time.Now() + node.BlockHeight = block.Height + node.BlockHistory[block.Height] = node.LastSeen + + return nil } // check nodes against the nodePool map one at a time -// acts as a mutex on nodePool +// acts as a mutex on nodePool and nodes func (c *Crawler) checkLoop() { - c.wg.Add(1) for { - // every time the loop restarts - // it means we processed from the checkQueue - // (except the first time, hence the extra wg.Add) - c.wg.Done() select { case ni := <-c.checkQueue: - host, port, from, removed := ni.unpack() + host, port, from, connected, disconnected := ni.unpack() addr := fmt.Sprintf("%s:%d", host, port) - // check if the node should be removed - // (eg. if its been connected to or abandoned) - if removed { + // check if we need to swap node between maps (eg. its connected or disconnected) + // NOTE: once we hear about a node, we never forget ... + if connected { n, _ := c.nodePool[addr] - if n.connected { - c.nodes[addr] = n - } + c.nodes[addr] = n delete(c.nodePool, addr) continue + } else if disconnected { + n, _ := c.nodes[addr] + c.nodePool[addr] = n + delete(c.nodes, addr) + continue } // TODO: if address is badly formed @@ -209,7 +242,7 @@ func (c *Crawler) checkLoop() { // create the node if unknown if !ok { n = &Node{Host: host, RPCPort: port} - c.nodes[addr] = n + c.nodePool[addr] = n } else if n.connected { // should be removed soon continue @@ -246,34 +279,49 @@ func (c *Crawler) connectToNode(node *Node) { addr := node.Address() node.client = NewNodeClient(addr) - if err := node.client.ws.Dial(); err != nil { + if b, err := node.client.ws.Dial(); err != nil { + fmt.Println("err on ws dial:", b, err) // set failed, return } // remove from nodePool, add to nodes c.checkNode(nodeInfo{ - host: node.Host, - port: node.RPCPort, - removed: true, + host: node.Host, + port: node.RPCPort, + connected: true, }) - c.pollNode(node) + if err := c.pollNode(node); err != nil { + // TODO: we had a good ws con + // but failed on rpc?! + // try again or something ... + // if we still fail, report and disconnect + } - // TODO: read loop + fmt.Println("Successfully connected to node", node.Address()) + + // blocks (until quit or err) + c.readLoop(node) +} + +func (c *Crawler) checkNode(ni nodeInfo) { + c.checkQueue <- ni } func (c *Crawler) pollNode(node *Node) error { + // get the status info status, err := node.client.rpc.Status() if err != nil { return err } - // get peers + // get peers and net info netinfo, err := node.client.rpc.NetInfo() if err != nil { return err } + // set the info for the node node.SetInfo(status, netinfo) // fire each peer on the checkQueue @@ -286,33 +334,3 @@ func (c *Crawler) pollNode(node *Node) error { } return nil } - -// wait for the checkQueue to empty and poll all the nodes -func (c *Crawler) pollNodesRoutine() { - for { - c.wg.Wait() - - ticker := time.Tick(time.Second) - // wait a few seconds to make sure we really have nothing - time.Sleep(time.Second * 5) - ch := make(chan struct{}) - go func() { - c.wg.Wait() - ch <- struct{}{} - }() - - // - select { - case <-ticker: - // the checkQueue has filled up again, move on - continue - case <-ch: - // the checkQueue is legit empty, - // TODO: poll the nodes! - - case <-c.quit: - return - } - } - -} diff --git a/p2p/peer.go b/p2p/peer.go index 173297eb0..5340366fd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -10,11 +10,19 @@ import ( . "github.com/tendermint/tendermint/common" ) +type nodeInfo struct { + Host string + RPCPort uint16 + P2PPort uint16 +} + type Peer struct { outbound bool mconn *MConnection running uint32 + Nodeinfo *nodeInfo + Key string Data *CMap // User data. } diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go index ce0824469..71f24bf32 100644 --- a/rpc/core_client/ws_client.go +++ b/rpc/core_client/ws_client.go @@ -19,15 +19,15 @@ func NewWSClient(addr string) *WSClient { } } -func (wsc *WSClient) Dial() error { +func (wsc *WSClient) Dial() (*http.Response, error) { dialer := websocket.DefaultDialer rHeader := http.Header{} - conn, _, err := dialer.Dial(wsc.host, rHeader) + conn, r, err := dialer.Dial(wsc.host, rHeader) if err != nil { - return err + return r, err } wsc.conn = conn - return nil + return r, nil } // subscribe to an event @@ -45,3 +45,25 @@ func (wsc *WSClient) Unsubscribe(eventid string) error { Event: eventid, }) } + +type WSMsg struct { + Data []byte + Error error +} + +// returns a channel from which messages can be pulled +// from a go routine that reads the socket. +// if the ws returns an error (eg. closes), we return +func (wsc *WSClient) Read() chan *WSMsg { + ch := make(chan *WSMsg) + go func() { + for { + _, p, err := wsc.conn.ReadMessage() + ch <- &WSMsg{p, err} + if err != nil { + return + } + } + }() + return ch +}