Browse Source

Merge pull request #55 from tendermint/sync

Sync
pull/57/head
Jae Kwon 10 years ago
parent
commit
4bcfc1e4bb
9 changed files with 540 additions and 22 deletions
  1. +5
    -0
      Makefile
  2. +43
    -4
      blockchain/pool.go
  3. +63
    -13
      blockchain/reactor.go
  4. +9
    -1
      consensus/reactor.go
  5. +336
    -0
      crawler/crawl.go
  6. +5
    -4
      node/node.go
  7. +6
    -0
      p2p/peer.go
  8. +69
    -0
      rpc/core_client/ws_client.go
  9. +4
    -0
      rpc/handlers.go

+ 5
- 0
Makefile View File

@ -7,6 +7,11 @@ build: get_deps
go build -o build/barak github.com/tendermint/tendermint/cmd/barak go build -o build/barak github.com/tendermint/tendermint/cmd/barak
go build -o build/debora github.com/tendermint/tendermint/cmd/debora go build -o build/debora github.com/tendermint/tendermint/cmd/debora
no_get:
go build -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint
go build -o build/barak github.com/tendermint/tendermint/cmd/barak
go build -o build/debora github.com/tendermint/tendermint/cmd/debora
build_race: get_deps build_race: get_deps
go build -race -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint go build -race -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint
go build -race -o build/barak github.com/tendermint/tendermint/cmd/barak go build -race -o build/barak github.com/tendermint/tendermint/cmd/barak


+ 43
- 4
blockchain/pool.go View File

@ -18,15 +18,29 @@ const (
maxRequestsPerPeer = 300 maxRequestsPerPeer = 300
) )
// numTotal = numPending + blocks in the pool we havnt synced yet
var ( var (
requestTimeoutSeconds = time.Duration(1) requestTimeoutSeconds = time.Duration(1)
) )
/*
Peers self report their heights when a new peer joins the block pool.
Starting from whatever we've got (pool.height), we request blocks
in sequence from peers that reported higher heights than ours.
Every so often we ask peers what height they're on so we can keep going.
Requests are continuously made for blocks of heigher heights until
the limits. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to consensus reactor
*/
type BlockPool struct { type BlockPool struct {
// block requests // block requests
requestsMtx sync.Mutex requestsMtx sync.Mutex
requests map[uint]*bpRequest requests map[uint]*bpRequest
height uint // the lowest key in requests.
peerless int32 // number of requests without peers
height uint // the lowest key in requests.
numPending int32 numPending int32
numTotal int32 numTotal int32
@ -145,10 +159,13 @@ func (pool *BlockPool) RedoRequest(height uint) {
if request.block == nil { if request.block == nil {
panic("Expected block to be non-nil") panic("Expected block to be non-nil")
} }
// TODO: record this malfeasance
// maybe punish peer on switch (an invalid block!)
pool.RemovePeer(request.peerId) // Lock on peersMtx. pool.RemovePeer(request.peerId) // Lock on peersMtx.
request.block = nil request.block = nil
request.peerId = "" request.peerId = ""
pool.numPending++ pool.numPending++
pool.peerless++
go requestRoutine(pool, height) go requestRoutine(pool, height)
} }
@ -169,9 +186,22 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
if request == nil { if request == nil {
return return
} }
pool.peerless--
request.peerId = peerId request.peerId = peerId
} }
func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := pool.requests[height]
if request == nil {
return
}
pool.peerless++
request.peerId = ""
}
func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
pool.requestsMtx.Lock() // Lock pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
@ -198,7 +228,7 @@ func (pool *BlockPool) getPeer(peerId string) *bpPeer {
return peer return peer
} }
// Sets the peer's blockchain height.
// Sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
pool.peersMtx.Lock() // Lock pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock() defer pool.peersMtx.Unlock()
@ -239,7 +269,6 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
peer.numRequests++ peer.numRequests++
return peer return peer
} }
return nil return nil
} }
@ -258,6 +287,7 @@ func (pool *BlockPool) nextHeight() uint {
pool.requestsMtx.Lock() // Lock pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
// we make one request per height.
return pool.height + uint(pool.numTotal) return pool.height + uint(pool.numTotal)
} }
@ -272,6 +302,8 @@ func (pool *BlockPool) makeRequest(height uint) {
} }
pool.requests[height] = request pool.requests[height] = request
pool.peerless++
nextHeight := pool.height + uint(pool.numTotal) nextHeight := pool.height + uint(pool.numTotal)
if nextHeight == height { if nextHeight == height {
pool.numTotal++ pool.numTotal++
@ -328,7 +360,7 @@ type bpRequest struct {
//------------------------------------- //-------------------------------------
// Responsible for making more requests as necessary // Responsible for making more requests as necessary
// Returns when a block is found (e.g. AddBlock() is called)
// Returns only when a block is found (e.g. AddBlock() is called)
func requestRoutine(pool *BlockPool, height uint) { func requestRoutine(pool *BlockPool, height uint) {
for { for {
var peer *bpPeer = nil var peer *bpPeer = nil
@ -347,15 +379,18 @@ func requestRoutine(pool *BlockPool, height uint) {
break PICK_LOOP break PICK_LOOP
} }
// set the peer, decrement peerless
pool.setPeerForRequest(height, peer.id) pool.setPeerForRequest(height, peer.id)
for try := 0; try < maxTries; try++ { for try := 0; try < maxTries; try++ {
pool.sendRequest(height, peer.id) pool.sendRequest(height, peer.id)
time.Sleep(requestTimeoutSeconds * time.Second) time.Sleep(requestTimeoutSeconds * time.Second)
// if successful the block is either in the pool,
if pool.hasBlock(height) { if pool.hasBlock(height) {
pool.decrPeer(peer.id) pool.decrPeer(peer.id)
return return
} }
// or already processed and we've moved past it
bpHeight, _, _ := pool.GetStatus() bpHeight, _, _ := pool.GetStatus()
if height < bpHeight { if height < bpHeight {
pool.decrPeer(peer.id) pool.decrPeer(peer.id)
@ -363,6 +398,10 @@ func requestRoutine(pool *BlockPool, height uint) {
} }
} }
// unset the peer, increment peerless
pool.removePeerForRequest(height, peer.id)
// this peer failed us, try again
pool.RemovePeer(peer.id) pool.RemovePeer(peer.id)
pool.sendTimeout(peer.id) pool.sendTimeout(peer.id)
} }


+ 63
- 13
blockchain/reactor.go View File

@ -10,6 +10,7 @@ import (
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
dbm "github.com/tendermint/tendermint/db"
"github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
@ -24,9 +25,14 @@ const (
// stop syncing when last block's time is // stop syncing when last block's time is
// within this much of the system time. // within this much of the system time.
stopSyncingDurationMinutes = 10 stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 10
) )
type stateResetter interface {
type consensusReactor interface {
SetSyncing(bool)
ResetToState(*sm.State) ResetToState(*sm.State)
} }
@ -106,7 +112,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
// Implements Reactor // Implements Reactor
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
// Send peer our state. // Send peer our state.
peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
} }
// Implements Reactor // Implements Reactor
@ -141,8 +147,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
case *bcBlockResponseMessage: case *bcBlockResponseMessage:
// Got a block. // Got a block.
bcR.pool.AddBlock(msg.Block, src.Key) bcR.pool.AddBlock(msg.Block, src.Key)
case *bcPeerStatusMessage:
// Got a peer status.
case *bcStatusRequestMessage:
// Send peer our state.
queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
if !queued {
// sorry
}
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.Key, msg.Height) bcR.pool.SetPeerHeight(src.Key, msg.Height)
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
@ -153,6 +165,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
func (bcR *BlockchainReactor) poolRoutine() { func (bcR *BlockchainReactor) poolRoutine() {
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
FOR_LOOP: FOR_LOOP:
for { for {
@ -176,6 +190,24 @@ FOR_LOOP:
if peer != nil { if peer != nil {
bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
} }
case _ = <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C:
// not thread safe access for peerless and numPending but should be fine
log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal)
// NOTE: this condition is very strict right now. may need to weaken
if bcR.pool.numPending == maxPendingRequests && bcR.pool.peerless == bcR.pool.numPending {
log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
bcR.pool.Stop()
stateDB := dbm.GetDB("state")
state := sm.LoadState(stateDB)
bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state)
bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false)
break FOR_LOOP
}
case _ = <-trySyncTicker.C: // chan time case _ = <-trySyncTicker.C: // chan time
//var lastValidatedBlock *types.Block //var lastValidatedBlock *types.Block
SYNC_LOOP: SYNC_LOOP:
@ -215,6 +247,7 @@ FOR_LOOP:
// TODO: use other heuristics too besides blocktime. // TODO: use other heuristics too besides blocktime.
// It's not a security concern, as it only needs to happen // It's not a security concern, as it only needs to happen
// upon node sync, and there's also a second (slower) // upon node sync, and there's also a second (slower)
// this peer failed us
// method of syncing in the consensus reactor. // method of syncing in the consensus reactor.
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
@ -238,8 +271,13 @@ FOR_LOOP:
} }
} }
func (bcR *BlockchainReactor) BroadcastStatus() error {
bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
return nil
}
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
return nil return nil
} }
@ -252,9 +290,10 @@ func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
// Messages // Messages
const ( const (
msgTypeBlockRequest = byte(0x10)
msgTypeBlockResponse = byte(0x11)
msgTypePeerStatus = byte(0x20)
msgTypeBlockRequest = byte(0x10)
msgTypeBlockResponse = byte(0x11)
msgTypeStatusResponse = byte(0x20)
msgTypeStatusRequest = byte(0x21)
) )
type BlockchainMessage interface{} type BlockchainMessage interface{}
@ -263,7 +302,8 @@ var _ = binary.RegisterInterface(
struct{ BlockchainMessage }{}, struct{ BlockchainMessage }{},
binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus},
binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
) )
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
@ -296,10 +336,20 @@ func (m *bcBlockResponseMessage) String() string {
//------------------------------------- //-------------------------------------
type bcPeerStatusMessage struct {
type bcStatusRequestMessage struct {
Height uint
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height uint Height uint
} }
func (m *bcPeerStatusMessage) String() string {
return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height)
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
} }

+ 9
- 1
consensus/reactor.go View File

@ -41,6 +41,9 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore blockStore *bc.BlockStore
conS *ConsensusState conS *ConsensusState
// if fast sync is running we don't really do anything
syncing bool
evsw events.Fireable evsw events.Fireable
} }
@ -123,7 +126,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
if conR.syncing || !conR.IsRunning() {
return return
} }
@ -224,6 +227,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
} }
} }
// Sets whether or not we're using the blockchain reactor for syncing
func (conR *ConsensusReactor) SetSyncing(syncing bool) {
conR.syncing = syncing
}
// Sets our private validator account for signing votes. // Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
conR.conS.SetPrivValidator(priv) conR.conS.SetPrivValidator(priv)


+ 336
- 0
crawler/crawl.go View File

@ -0,0 +1,336 @@
package crawler
import (
"fmt"
"github.com/tendermint/tendermint/binary"
rpctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/core_client"
"github.com/tendermint/tendermint/types"
"time"
"io/ioutil"
)
const (
CheckQueueBufferSize = 100
NodeQueueBufferSize = 100
GetPeersTickerSeconds = 5
)
//---------------------------------------------------------------------------------------
// crawler.Node
// A node is a peer on the network.
type Node struct {
Host string
P2PPort uint16
RPCPort uint16
failed int
connected bool
client *NodeClient
LastSeen time.Time
GenesisHash []byte
BlockHeight uint
BlockHistory map[uint]time.Time // when we saw each block
NetInfo *rpctypes.ResponseNetInfo
Validator bool
// other peers we heard about this peer from
heardFrom map[string]struct{}
}
func (n *Node) Address() string {
return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
}
// Set the basic status and network info for a node from RPC responses
func (n *Node) SetInfo(status *rpctypes.ResponseStatus, netinfo *rpctypes.ResponseNetInfo) {
n.LastSeen = time.Now()
n.GenesisHash = status.GenesisHash
n.BlockHeight = status.LatestBlockHeight
n.NetInfo = netinfo
// n.Validator
}
// A node client is used to talk to a node over rpc and websockets
type NodeClient struct {
rpc rpcclient.Client
ws *rpcclient.WSClient
}
// Create a new client for the node at the given addr
func NewNodeClient(addr string) *NodeClient {
return &NodeClient{
rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
}
}
// A simple wrapper for mediating access to the maps
type nodeInfo struct {
host string // the new nodes address
port uint16 // node's listening port
from string // the peer that told us about this node
connected bool // move node from nodePool to nodes
disconnected bool // move node from nodes to nodePool
}
func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
return ni.host, ni.port, ni.from, ni.connected, ni.disconnected
}
// crawler.Node
//---------------------------------------------------------------------------------------
// crawler.Crawler
// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
// Maps are only accessed by one go-routine, mediated by the checkQueue
type Crawler struct {
self *Node
client *NodeClient
checkQueue chan nodeInfo
nodePool map[string]*Node
nodes map[string]*Node
nodeQueue chan *Node
quit chan struct{}
}
// Create a new Crawler using the local RPC server at addr
func NewCrawler(host string, port uint16) *Crawler {
return &Crawler{
self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
nodePool: make(map[string]*Node),
nodes: make(map[string]*Node),
nodeQueue: make(chan *Node, NodeQueueBufferSize),
quit: make(chan struct{}),
}
}
func (c *Crawler) Start() error {
// connect to local node first, set info,
// and fire peers onto the checkQueue
if err := c.pollNode(c.self); err != nil {
return err
}
// connect to weboscket, subscribe to local events
// and run the read loop to listen for new blocks
if r, err := c.self.client.ws.Dial(); err != nil {
fmt.Println(r)
b, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(b))
return err
}
if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
return err
}
go c.readLoop(c.self)
// add ourselves to the nodes list
c.nodes[c.self.Address()] = c.self
// nodes we hear about get put on the checkQueue
// by pollNode and are handled in the checkLoop.
// if its a node we're not already connected to,
// it gets put on the nodeQueue and
// we attempt to connect in the connectLoop
go c.checkLoop()
go c.connectLoop()
return nil
}
func (c *Crawler) Stop() {
close(c.quit)
}
// listen for events from the node and ping it for peers on a ticker
func (c *Crawler) readLoop(node *Node) {
wsChan := node.client.ws.Read()
getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
for {
select {
case wsMsg := <-wsChan:
// update the node with his new info
if err := c.consumeMessage(wsMsg, node); err != nil {
// lost the node, put him back on the checkQueu
c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
disconnected: true,
})
}
case <-getPeersTicker:
if err := c.pollNode(node); err != nil {
// lost the node, put him back on the checkQueu
c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
disconnected: true,
})
}
case <-c.quit:
return
}
}
}
func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
if wsMsg.Error != nil {
return wsMsg.Error
}
// unmarshal block event
var response struct {
Event string
Data *types.Block
Error string
}
var err error
binary.ReadJSON(&response, wsMsg.Data, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
block := response.Data
node.LastSeen = time.Now()
node.BlockHeight = block.Height
node.BlockHistory[block.Height] = node.LastSeen
return nil
}
// check nodes against the nodePool map one at a time
// acts as a mutex on nodePool and nodes
func (c *Crawler) checkLoop() {
for {
select {
case ni := <-c.checkQueue:
host, port, from, connected, disconnected := ni.unpack()
addr := fmt.Sprintf("%s:%d", host, port)
// check if we need to swap node between maps (eg. its connected or disconnected)
// NOTE: once we hear about a node, we never forget ...
if connected {
n, _ := c.nodePool[addr]
c.nodes[addr] = n
delete(c.nodePool, addr)
continue
} else if disconnected {
n, _ := c.nodes[addr]
c.nodePool[addr] = n
delete(c.nodes, addr)
continue
}
// TODO: if address is badly formed
// we should punish ni.from
_ = from
n, ok := c.nodePool[addr]
// create the node if unknown
if !ok {
n = &Node{Host: host, RPCPort: port}
c.nodePool[addr] = n
} else if n.connected {
// should be removed soon
continue
}
// queue it for connecting to
c.nodeQueue <- n
case <-c.quit:
return
}
}
}
// read off the nodeQueue and attempt to connect to nodes
func (c *Crawler) connectLoop() {
for {
select {
case node := <-c.nodeQueue:
go c.connectToNode(node)
case <-c.quit:
// close all connections
for addr, node := range c.nodes {
_, _ = addr, node
// TODO: close conn
}
return
}
}
}
func (c *Crawler) connectToNode(node *Node) {
addr := node.Address()
node.client = NewNodeClient(addr)
if b, err := node.client.ws.Dial(); err != nil {
fmt.Println("err on ws dial:", b, err)
// set failed, return
}
// remove from nodePool, add to nodes
c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
connected: true,
})
if err := c.pollNode(node); err != nil {
// TODO: we had a good ws con
// but failed on rpc?!
// try again or something ...
// if we still fail, report and disconnect
}
fmt.Println("Successfully connected to node", node.Address())
// blocks (until quit or err)
c.readLoop(node)
}
func (c *Crawler) checkNode(ni nodeInfo) {
c.checkQueue <- ni
}
func (c *Crawler) pollNode(node *Node) error {
// get the status info
status, err := node.client.rpc.Status()
if err != nil {
return err
}
// get peers and net info
netinfo, err := node.client.rpc.NetInfo()
if err != nil {
return err
}
// set the info for the node
node.SetInfo(status, netinfo)
// fire each peer on the checkQueue
for _, p := range netinfo.Peers {
c.checkNode(nodeInfo{
host: p.Host,
port: p.RPCPort,
from: node.Address(),
})
}
return nil
}

+ 5
- 4
node/node.go View File

@ -81,6 +81,11 @@ func NewNode() *Node {
consensusReactor.SetPrivValidator(privValidator) consensusReactor.SetPrivValidator(privValidator)
} }
// so the consensus reactor won't do anything until we're synced
if config.App().GetBool("FastSync") {
consensusReactor.SetSyncing(true)
}
sw := p2p.NewSwitch() sw := p2p.NewSwitch()
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("MEMPOOL", mempoolReactor)
@ -112,10 +117,6 @@ func (n *Node) Start() {
nodeInfo := makeNodeInfo(n.sw) nodeInfo := makeNodeInfo(n.sw)
n.sw.SetNodeInfo(nodeInfo) n.sw.SetNodeInfo(nodeInfo)
n.sw.Start() n.sw.Start()
if config.App().GetBool("FastSync") {
// TODO: When FastSync is done, start CONSENSUS.
n.sw.Reactor("CONSENSUS").Stop()
}
} }
func (n *Node) Stop() { func (n *Node) Stop() {


+ 6
- 0
p2p/peer.go View File

@ -12,6 +12,12 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
type nodeInfo struct {
Host string
RPCPort uint16
P2PPort uint16
}
type Peer struct { type Peer struct {
outbound bool outbound bool
mconn *MConnection mconn *MConnection


+ 69
- 0
rpc/core_client/ws_client.go View File

@ -0,0 +1,69 @@
package core_client
import (
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/rpc"
"net/http"
)
// A websocket client subscribes and unsubscribes to events
type WSClient struct {
host string
conn *websocket.Conn
}
// create a new connection
func NewWSClient(addr string) *WSClient {
return &WSClient{
host: addr,
}
}
func (wsc *WSClient) Dial() (*http.Response, error) {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
conn, r, err := dialer.Dial(wsc.host, rHeader)
if err != nil {
return r, err
}
wsc.conn = conn
return r, nil
}
// subscribe to an event
func (wsc *WSClient) Subscribe(eventid string) error {
return wsc.conn.WriteJSON(rpc.WSRequest{
Type: "subscribe",
Event: eventid,
})
}
// unsubscribe from an event
func (wsc *WSClient) Unsubscribe(eventid string) error {
return wsc.conn.WriteJSON(rpc.WSRequest{
Type: "unsubscribe",
Event: eventid,
})
}
type WSMsg struct {
Data []byte
Error error
}
// returns a channel from which messages can be pulled
// from a go routine that reads the socket.
// if the ws returns an error (eg. closes), we return
func (wsc *WSClient) Read() chan *WSMsg {
ch := make(chan *WSMsg)
go func() {
for {
_, p, err := wsc.conn.ReadMessage()
ch <- &WSMsg{p, err}
if err != nil {
return
}
}
}()
return ch
}

+ 4
- 0
rpc/handlers.go View File

@ -258,6 +258,7 @@ func (con *WSConnection) Start(evsw *events.EventSwitch) {
// close the connection // close the connection
func (con *WSConnection) Stop() { func (con *WSConnection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.evsw.RemoveListener(con.id)
close(con.quitChan) close(con.quitChan)
// the write loop closes the websocket connection // the write loop closes the websocket connection
// when it exits its loop, and the read loop // when it exits its loop, and the read loop
@ -285,6 +286,9 @@ func (con *WSConnection) read() {
reaper := time.Tick(time.Second * WSConnectionReaperSeconds) reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for { for {
select { select {
// TODO: this actually doesn't work
// since ReadMessage blocks. Really it needs its own
// go routine
case <-reaper: case <-reaper:
if con.failedSends > MaxFailedSends { if con.failedSends > MaxFailedSends {
// sending has failed too many times. // sending has failed too many times.


Loading…
Cancel
Save