Browse Source

more crawler work

pull/55/head
Ethan Buchman 10 years ago
parent
commit
882a82bad4
3 changed files with 165 additions and 117 deletions
  1. +131
    -113
      crawler/crawl.go
  2. +8
    -0
      p2p/peer.go
  3. +26
    -4
      rpc/core_client/ws_client.go

+ 131
- 113
crawler/crawl.go View File

@ -2,16 +2,19 @@ package crawler
import ( import (
"fmt" "fmt"
"github.com/tendermint/tendermint/binary"
rpctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/core_client" rpcclient "github.com/tendermint/tendermint/rpc/core_client"
types "github.com/tendermint/tendermint/types"
"sync"
"github.com/tendermint/tendermint/types"
"time" "time"
"io/ioutil"
) )
const ( const (
CheckQueueBufferSize = 100
NodeQueueBufferSize = 100
CheckQueueBufferSize = 100
NodeQueueBufferSize = 100
GetPeersTickerSeconds = 5
) )
//--------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------
@ -28,10 +31,11 @@ type Node struct {
client *NodeClient client *NodeClient
LastSeen time.Time
GenesisHash []byte
BlockHeight uint
NetInfo *rpctypes.ResponseNetInfo
LastSeen time.Time
GenesisHash []byte
BlockHeight uint
BlockHistory map[uint]time.Time // when we saw each block
NetInfo *rpctypes.ResponseNetInfo
Validator bool Validator bool
@ -61,29 +65,30 @@ type NodeClient struct {
// Create a new client for the node at the given addr // Create a new client for the node at the given addr
func NewNodeClient(addr string) *NodeClient { func NewNodeClient(addr string) *NodeClient {
return &NodeClient{ return &NodeClient{
rpc: rpcclient.NewClient(addr, "JSONRPC"),
ws: rpcclient.NewWSClient(addr),
rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
} }
} }
// A simple wrapper for mediating access to the maps // A simple wrapper for mediating access to the maps
type nodeInfo struct { type nodeInfo struct {
host string // the new nodes address
port uint16
from string // the peer that told us about this node
removed bool // whether to remove from nodePool
host string // the new nodes address
port uint16 // node's listening port
from string // the peer that told us about this node
connected bool // move node from nodePool to nodes
disconnected bool // move node from nodes to nodePool
} }
func (ni nodeInfo) unpack() (string, uint16, string, bool) {
return ni.host, ni.port, ni.from, ni.removed
func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
return ni.host, ni.port, ni.from, ni.connected, ni.disconnected
} }
// crawler.Node // crawler.Node
//--------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------
// crawler.Crawler // crawler.Crawler
// A crawler has a local node, a set of potential nodes in the nodePool
// and connected nodes. Maps are only accessed by one go-routine, mediated through channels
// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
// Maps are only accessed by one go-routine, mediated by the checkQueue
type Crawler struct { type Crawler struct {
self *Node self *Node
client *NodeClient client *NodeClient
@ -94,17 +99,12 @@ type Crawler struct {
nodeQueue chan *Node nodeQueue chan *Node
quit chan struct{} quit chan struct{}
// waits for checkQueue to empty
// so we can re-poll all nodes
wg sync.WaitGroup
} }
// Create a new Crawler using the local RPC server at addr // Create a new Crawler using the local RPC server at addr
func NewCrawler(host string, port uint16) *Crawler { func NewCrawler(host string, port uint16) *Crawler {
return &Crawler{ return &Crawler{
self: &Node{Host: host, RPCPort: port},
client: NewNodeClient(fmt.Sprintf("%s:%d", host, port)),
self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
checkQueue: make(chan nodeInfo, CheckQueueBufferSize), checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
nodePool: make(map[string]*Node), nodePool: make(map[string]*Node),
nodes: make(map[string]*Node), nodes: make(map[string]*Node),
@ -113,59 +113,37 @@ func NewCrawler(host string, port uint16) *Crawler {
} }
} }
// checkNode (the version removed by this commit) counts one pending item on
// the crawler's WaitGroup and then sends ni on the checkQueue.
func (c *Crawler) checkNode(ni nodeInfo) {
// account for the item before it is queued
c.wg.Add(1)
c.checkQueue <- ni
}
func (c *Crawler) Start() error { func (c *Crawler) Start() error {
// make sure we can connect
// to our local node first
status, err := c.client.rpc.Status()
if err != nil {
// connect to local node first, set info,
// and fire peers onto the checkQueue
if err := c.pollNode(c.self); err != nil {
return err return err
} }
// connect to websocket and subscribe to local events
if err = c.client.ws.Dial(); err != nil {
// connect to websocket, subscribe to local events
// and run the read loop to listen for new blocks
if r, err := c.self.client.ws.Dial(); err != nil {
fmt.Println(r)
b, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(b))
return err return err
} }
if err = c.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
return err return err
} }
go c.readLocalEvents()
go c.readLoop(c.self)
// add ourselves to the nodes list // add ourselves to the nodes list
c.nodes[c.self.Address()] = c.self c.nodes[c.self.Address()] = c.self
// get peers from local node
netinfo, err := c.client.rpc.NetInfo()
if err != nil {
return err
}
// set the info for ourselves
c.self.SetInfo(status, netinfo)
// fire each peer on the checkQueue
for _, p := range netinfo.Peers {
c.checkNode(nodeInfo{
host: p.Host,
port: p.RPCPort,
})
}
// nodes we hear about get put on the
// checkQueue and are handled in the checkLoop
// nodes we hear about get put on the checkQueue
// by pollNode and are handled in the checkLoop.
// if its a node we're not already connected to, // if its a node we're not already connected to,
// it gets put it on the nodeQueue and
// it gets put on the nodeQueue and
// we attempt to connect in the connectLoop // we attempt to connect in the connectLoop
go c.checkLoop() go c.checkLoop()
go c.connectLoop() go c.connectLoop()
// finally, a routine with a ticker to poll nodes for peers
go c.pollNodesRoutine()
return nil return nil
} }
@ -173,32 +151,87 @@ func (c *Crawler) Stop() {
close(c.quit) close(c.quit)
} }
func (c *Crawler) readLocalEvents() {
// read on our ws for NewBlocks
// readLoop listens for websocket events from node and polls it for peers
// every GetPeersTickerSeconds seconds. It blocks until the crawler quits or
// the node is lost.
//
// The node counts as lost when a websocket message cannot be consumed, when
// the websocket channel is closed, or when polling the node fails. In that
// case the node is put back on the checkQueue as disconnected exactly once
// and the loop returns. Reporting the same node as disconnected repeatedly
// would make the checkLoop move an already-removed (nil) entry into the
// nodePool.
func (c *Crawler) readLoop(node *Node) {
	wsChan := node.client.ws.Read()

	// NewTicker instead of time.Tick so the ticker can be released on return.
	getPeersTicker := time.NewTicker(time.Second * GetPeersTickerSeconds)
	defer getPeersTicker.Stop()

	for {
		lost := false
		select {
		case wsMsg, ok := <-wsChan:
			// a closed channel yields a nil message; treat it as a lost node
			// update the node with his new info otherwise
			lost = !ok || c.consumeMessage(wsMsg, node) != nil
		case <-getPeersTicker.C:
			lost = c.pollNode(node) != nil
		case <-c.quit:
			return
		}

		if lost {
			// lost the node, put him back on the checkQueue
			// NOTE(review): if the websocket itself is still healthy (e.g. only
			// the RPC poll failed), its reader goroutine is left without a
			// consumer; the connection should be closed here once the client
			// exposes a way to do so.
			c.checkNode(nodeInfo{
				host:         node.Host,
				port:         node.RPCPort,
				disconnected: true,
			})
			return
		}
	}
}
func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
if wsMsg.Error != nil {
return wsMsg.Error
}
// unmarshal block event
var response struct {
Event string
Data *types.Block
Error string
}
var err error
binary.ReadJSON(&response, wsMsg.Data, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
block := response.Data
node.LastSeen = time.Now()
node.BlockHeight = block.Height
node.BlockHistory[block.Height] = node.LastSeen
return nil
} }
// check nodes against the nodePool map one at a time // check nodes against the nodePool map one at a time
// acts as a mutex on nodePool
// acts as a mutex on nodePool and nodes
func (c *Crawler) checkLoop() { func (c *Crawler) checkLoop() {
c.wg.Add(1)
for { for {
// every time the loop restarts
// it means we processed from the checkQueue
// (except the first time, hence the extra wg.Add)
c.wg.Done()
select { select {
case ni := <-c.checkQueue: case ni := <-c.checkQueue:
host, port, from, removed := ni.unpack()
host, port, from, connected, disconnected := ni.unpack()
addr := fmt.Sprintf("%s:%d", host, port) addr := fmt.Sprintf("%s:%d", host, port)
// check if the node should be removed
// (eg. if its been connected to or abandoned)
if removed {
// check if we need to swap node between maps (eg. its connected or disconnected)
// NOTE: once we hear about a node, we never forget ...
if connected {
n, _ := c.nodePool[addr] n, _ := c.nodePool[addr]
if n.connected {
c.nodes[addr] = n
}
c.nodes[addr] = n
delete(c.nodePool, addr) delete(c.nodePool, addr)
continue continue
} else if disconnected {
n, _ := c.nodes[addr]
c.nodePool[addr] = n
delete(c.nodes, addr)
continue
} }
// TODO: if address is badly formed // TODO: if address is badly formed
@ -209,7 +242,7 @@ func (c *Crawler) checkLoop() {
// create the node if unknown // create the node if unknown
if !ok { if !ok {
n = &Node{Host: host, RPCPort: port} n = &Node{Host: host, RPCPort: port}
c.nodes[addr] = n
c.nodePool[addr] = n
} else if n.connected { } else if n.connected {
// should be removed soon // should be removed soon
continue continue
@ -246,34 +279,49 @@ func (c *Crawler) connectToNode(node *Node) {
addr := node.Address() addr := node.Address()
node.client = NewNodeClient(addr) node.client = NewNodeClient(addr)
if err := node.client.ws.Dial(); err != nil {
if b, err := node.client.ws.Dial(); err != nil {
fmt.Println("err on ws dial:", b, err)
// set failed, return // set failed, return
} }
// remove from nodePool, add to nodes // remove from nodePool, add to nodes
c.checkNode(nodeInfo{ c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
removed: true,
host: node.Host,
port: node.RPCPort,
connected: true,
}) })
c.pollNode(node)
if err := c.pollNode(node); err != nil {
// TODO: we had a good ws con
// but failed on rpc?!
// try again or something ...
// if we still fail, report and disconnect
}
// TODO: read loop
fmt.Println("Successfully connected to node", node.Address())
// blocks (until quit or err)
c.readLoop(node)
}
func (c *Crawler) checkNode(ni nodeInfo) {
c.checkQueue <- ni
} }
func (c *Crawler) pollNode(node *Node) error { func (c *Crawler) pollNode(node *Node) error {
// get the status info
status, err := node.client.rpc.Status() status, err := node.client.rpc.Status()
if err != nil { if err != nil {
return err return err
} }
// get peers
// get peers and net info
netinfo, err := node.client.rpc.NetInfo() netinfo, err := node.client.rpc.NetInfo()
if err != nil { if err != nil {
return err return err
} }
// set the info for the node
node.SetInfo(status, netinfo) node.SetInfo(status, netinfo)
// fire each peer on the checkQueue // fire each peer on the checkQueue
@ -286,33 +334,3 @@ func (c *Crawler) pollNode(node *Node) error {
} }
return nil return nil
} }
// pollNodesRoutine (removed by this commit) loops forever: it waits for the
// crawler's WaitGroup to drain, sleeps five seconds, and then selects between
// a one-second ticker, a second drain of the WaitGroup, and c.quit. The
// actual polling of nodes is still a TODO. It returns only when the quit
// case is selected.
func (c *Crawler) pollNodesRoutine() {
for {
// block until everything counted on the WaitGroup has been processed
c.wg.Wait()
ticker := time.Tick(time.Second)
// wait a few seconds to make sure we really have nothing
time.Sleep(time.Second * 5)
ch := make(chan struct{})
// signal on ch once the WaitGroup has drained again
go func() {
c.wg.Wait()
ch <- struct{}{}
}()
// NOTE(review): the ticker is created before the 5s sleep, so a tick is
// presumably already pending when this select runs. If a case other than
// ch is taken, the goroutine above stays blocked on its unbuffered send.
// Confirm whether that was intended.
select {
case <-ticker:
// the checkQueue has filled up again, move on
continue
case <-ch:
// the checkQueue is legit empty,
// TODO: poll the nodes!
case <-c.quit:
return
}
}
}

+ 8
- 0
p2p/peer.go View File

@ -10,11 +10,19 @@ import (
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
) )
// nodeInfo groups a host with an RPC port and a P2P port.
// NOTE(review): presumably the address details a peer advertises about
// itself; confirm against the code that fills Peer.Nodeinfo.
type nodeInfo struct {
Host string
RPCPort uint16
P2PPort uint16
}
type Peer struct { type Peer struct {
outbound bool outbound bool
mconn *MConnection mconn *MConnection
running uint32 running uint32
Nodeinfo *nodeInfo
Key string Key string
Data *CMap // User data. Data *CMap // User data.
} }


+ 26
- 4
rpc/core_client/ws_client.go View File

@ -19,15 +19,15 @@ func NewWSClient(addr string) *WSClient {
} }
} }
func (wsc *WSClient) Dial() error {
func (wsc *WSClient) Dial() (*http.Response, error) {
dialer := websocket.DefaultDialer dialer := websocket.DefaultDialer
rHeader := http.Header{} rHeader := http.Header{}
conn, _, err := dialer.Dial(wsc.host, rHeader)
conn, r, err := dialer.Dial(wsc.host, rHeader)
if err != nil { if err != nil {
return err
return r, err
} }
wsc.conn = conn wsc.conn = conn
return nil
return r, nil
} }
// subscribe to an event // subscribe to an event
@ -45,3 +45,25 @@ func (wsc *WSClient) Unsubscribe(eventid string) error {
Event: eventid, Event: eventid,
}) })
} }
// WSMsg is one result of reading the websocket: the raw message bytes in
// Data and the error returned by that read, if any, in Error.
type WSMsg struct {
Data []byte
Error error
}
// Read starts a goroutine that reads messages from the websocket connection
// and returns the unbuffered channel on which each result is delivered as a
// *WSMsg. A read error (eg. the socket closing) is delivered as a final
// message with Error set, after which the goroutine exits. The channel is
// never closed.
func (wsc *WSClient) Read() chan *WSMsg {
	out := make(chan *WSMsg)

	pump := func() {
		for {
			_, payload, readErr := wsc.conn.ReadMessage()
			// forward the result first so the reader also sees the error
			out <- &WSMsg{Data: payload, Error: readErr}
			if readErr != nil {
				break
			}
		}
	}
	go pump()

	return out
}

Loading…
Cancel
Save