From 224ea00917845a36d3e4c9ee593e581623febcdc Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Apr 2015 02:13:04 -0700 Subject: [PATCH 1/3] first draft of network crawler --- crawler/crawl.go | 318 +++++++++++++++++++++++++++++++++++ rpc/core/net.go | 5 +- rpc/core/types/responses.go | 4 +- rpc/core_client/ws_client.go | 47 ++++++ rpc/handlers.go | 4 + 5 files changed, 376 insertions(+), 2 deletions(-) create mode 100644 crawler/crawl.go create mode 100644 rpc/core_client/ws_client.go diff --git a/crawler/crawl.go b/crawler/crawl.go new file mode 100644 index 000000000..c61e0535d --- /dev/null +++ b/crawler/crawl.go @@ -0,0 +1,318 @@ +package crawler + +import ( + "fmt" + rpctypes "github.com/tendermint/tendermint/rpc/core/types" + rpcclient "github.com/tendermint/tendermint/rpc/core_client" + types "github.com/tendermint/tendermint/types" + "sync" + "time" +) + +const ( + CheckQueueBufferSize = 100 + NodeQueueBufferSize = 100 +) + +//--------------------------------------------------------------------------------------- +// crawler.Node + +// A node is a peer on the network. +type Node struct { + Host string + P2PPort uint16 + RPCPort uint16 + + failed int + connected bool + + client *NodeClient + + LastSeen time.Time + GenesisHash []byte + BlockHeight uint + NetInfo *rpctypes.ResponseNetInfo + + Validator bool + + // other peers we heard about this peer from + heardFrom map[string]struct{} +} + +func (n *Node) Address() string { + return fmt.Sprintf("%s:%d", n.Host, n.RPCPort) +} + +// Set the basic status and network info for a node from RPC responses +func (n *Node) SetInfo(status *rpctypes.ResponseStatus, netinfo *rpctypes.ResponseNetInfo) { + n.LastSeen = time.Now() + n.GenesisHash = status.GenesisHash + n.BlockHeight = status.LatestBlockHeight + n.NetInfo = netinfo + // n.Validator +} + +// A node client is used to talk to a node over rpc and websockets +type NodeClient struct { + rpc rpcclient.Client + ws *rpcclient.WSClient +} + +// Create a new client for the node at the given addr +func NewNodeClient(addr string) *NodeClient { + return &NodeClient{ + rpc: rpcclient.NewClient(addr, "JSONRPC"), + ws: rpcclient.NewWSClient(addr), + } +} + +// A simple wrapper for mediating access to the maps +type nodeInfo struct { + host string // the new nodes address + port uint16 + from string // the peer that told us about this node + removed bool // whether to remove from nodePool +} + +func (ni nodeInfo) unpack() (string, uint16, string, bool) { + return ni.host, ni.port, ni.from, ni.removed +} + +// crawler.Node +//--------------------------------------------------------------------------------------- +// crawler.Crawler + +// A crawler has a local node, a set of potential nodes in the nodePool +// and connected nodes. 
Maps are only accessed by one go-routine, mediated through channels
+type Crawler struct {
+	self   *Node
+	client *NodeClient
+
+	checkQueue chan nodeInfo
+	nodePool   map[string]*Node
+	nodes      map[string]*Node
+
+	nodeQueue chan *Node
+	quit      chan struct{}
+
+	// waits for checkQueue to empty
+	// so we can re-poll all nodes
+	wg sync.WaitGroup
+}
+
+// Create a new Crawler using the local RPC server at addr
+func NewCrawler(host string, port uint16) *Crawler {
+	return &Crawler{
+		self:       &Node{Host: host, RPCPort: port},
+		client:     NewNodeClient(fmt.Sprintf("%s:%d", host, port)),
+		checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
+		nodePool:   make(map[string]*Node),
+		nodes:      make(map[string]*Node),
+		nodeQueue:  make(chan *Node, NodeQueueBufferSize),
+		quit:       make(chan struct{}),
+	}
+}
+
+func (c *Crawler) checkNode(ni nodeInfo) {
+	c.wg.Add(1)
+	c.checkQueue <- ni
+}
+
+func (c *Crawler) Start() error {
+	// make sure we can connect
+	// to our local node first
+	status, err := c.client.rpc.Status()
+	if err != nil {
+		return err
+	}
+
+	// connect to websocket and subscribe to local events
+	if err = c.client.ws.Dial(); err != nil {
+		return err
+	}
+	if err = c.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
+		return err
+	}
+	go c.readLocalEvents()
+
+	// add ourselves to the nodes list
+	c.nodes[c.self.Address()] = c.self
+
+	// get peers from local node
+	netinfo, err := c.client.rpc.NetInfo()
+	if err != nil {
+		return err
+	}
+
+	// set the info for ourselves
+	c.self.SetInfo(status, netinfo)
+
+	// fire each peer on the checkQueue
+	for _, p := range netinfo.Peers {
+		c.checkNode(nodeInfo{
+			host: p.Host,
+			port: p.RPCPort,
+		})
+	}
+
+	// nodes we hear about get put on the
+	// checkQueue and are handled in the checkLoop
+	// if it's a node we're not already connected to,
+	// it gets put it on the nodeQueue and
+	// we attempt to connect in the connectLoop
+	go c.checkLoop()
+	go c.connectLoop()
+
+	// finally, a routine with a ticker to poll nodes for peers
+	go c.pollNodesRoutine()
+
+	return nil
+}
+
+func (c *Crawler) Stop() {
+	close(c.quit)
+}
+
+func (c *Crawler) readLocalEvents() {
+	// read on our ws for NewBlocks
+}
+
+// check nodes against the nodePool map one at a time
+// acts as a mutex on nodePool
+func (c *Crawler) checkLoop() {
+	c.wg.Add(1)
+	for {
+		// every time the loop restarts
+		// it means we processed from the checkQueue
+		// (except the first time, hence the extra wg.Add)
+		c.wg.Done()
+		select {
+		case ni := <-c.checkQueue:
+			host, port, from, removed := ni.unpack()
+			addr := fmt.Sprintf("%s:%d", host, port)
+			// check if the node should be removed
+			// (eg. if it's been connected to or abandoned)
+			if removed {
+				n, _ := c.nodePool[addr]
+				if n.connected {
+					c.nodes[addr] = n
+				}
+				delete(c.nodePool, addr)
+				continue
+			}
+
+			// TODO: if address is badly formed
+			// we should punish ni.from
+			_ = from
+
+			n, ok := c.nodePool[addr]
+			// create the node if unknown
+			if !ok {
+				n = &Node{Host: host, RPCPort: port}
+				c.nodes[addr] = n
+			} else if n.connected {
+				// should be removed soon
+				continue
+			}
+
+			// queue it for connecting to
+			c.nodeQueue <- n
+
+		case <-c.quit:
+			return
+		}
+	}
+}
+
+// read off the nodeQueue and attempt to connect to nodes
+func (c *Crawler) connectLoop() {
+	for {
+		select {
+		case node := <-c.nodeQueue:
+			go c.connectToNode(node)
+		case <-c.quit:
+			// close all connections
+			for addr, node := range c.nodes {
+				_, _ = addr, node
+				// TODO: close conn
+			}
+			return
+		}
+	}
+}
+
+func (c *Crawler) connectToNode(node *Node) {
+
+	addr := node.Address()
+	node.client = NewNodeClient(addr)
+
+	if err := node.client.ws.Dial(); err != nil {
+		// set failed, return
+	}
+
+	// remove from nodePool, add to nodes
+	c.checkNode(nodeInfo{
+		host:    node.Host,
+		port:    node.RPCPort,
+		removed: true,
+	})
+
+	c.pollNode(node)
+
+	// TODO: read loop
+}
+
+func (c *Crawler) pollNode(node *Node) error {
+	status, err := node.client.rpc.Status()
+	if err != nil {
+		return err
+	}
+
+	// get peers
+	netinfo, err := node.client.rpc.NetInfo()
+	if err != nil {
+		return err
+	}
+
+	node.SetInfo(status, netinfo)
+
+	// fire each peer on the checkQueue
+	for _, p := range netinfo.Peers {
+		c.checkNode(nodeInfo{
+			host: p.Host,
+			port: p.RPCPort,
+			from: node.Address(),
+		})
+	}
+	return nil
+}
+
+// wait for the checkQueue to empty and poll all the nodes
+func (c *Crawler) pollNodesRoutine() {
+	for {
+		c.wg.Wait()
+
+		ticker := time.Tick(time.Second)
+		// wait a few seconds to make sure we really have nothing
+		time.Sleep(time.Second * 5)
+		ch := make(chan struct{})
+		go func() {
+			c.wg.Wait()
+			ch <- struct{}{}
+		}()
+
+		//
+		select {
+		case <-ticker:
+			// the checkQueue has filled up again, move on
+			continue
+		case <-ch:
+			// the checkQueue is legit empty,
+			// TODO: poll the nodes!
+ + case <-c.quit: + return + } + } + +} diff --git a/rpc/core/net.go b/rpc/core/net.go index 3fcb37e13..62973a680 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -42,8 +42,11 @@ func NetInfo() (*ctypes.ResponseNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pSwitch.Peers().List() { peers = append(peers, ctypes.Peer{ - Address: peer.Connection().RemoteAddress.String(), + //Address: peer.Connection().RemoteAddress.String(), + Host: peer.Nodeinfo.Host, IsOutbound: peer.IsOutbound(), + P2PPort: peer.Nodeinfo.P2PPort, + RPCPort: peer.Nodeinfo.RPCPort, }) } return &ctypes.ResponseNetInfo{ diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index e7c71e259..ee662105d 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -78,7 +78,9 @@ type ResponseNetInfo struct { } type Peer struct { - Address string + Host string // ip + P2PPort uint16 + RPCPort uint16 IsOutbound bool } diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go new file mode 100644 index 000000000..ce0824469 --- /dev/null +++ b/rpc/core_client/ws_client.go @@ -0,0 +1,47 @@ +package core_client + +import ( + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/rpc" + "net/http" +) + +// A websocket client subscribes and unsubscribes to events +type WSClient struct { + host string + conn *websocket.Conn +} + +// create a new connection +func NewWSClient(addr string) *WSClient { + return &WSClient{ + host: addr, + } +} + +func (wsc *WSClient) Dial() error { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + conn, _, err := dialer.Dial(wsc.host, rHeader) + if err != nil { + return err + } + wsc.conn = conn + return nil +} + +// subscribe to an event +func (wsc *WSClient) Subscribe(eventid string) error { + return wsc.conn.WriteJSON(rpc.WSRequest{ + Type: "subscribe", + Event: eventid, + }) +} + +// unsubscribe from an event +func (wsc *WSClient) Unsubscribe(eventid string) error { + return wsc.conn.WriteJSON(rpc.WSRequest{ + Type: "unsubscribe", + Event: eventid, + }) +} diff --git a/rpc/handlers.go b/rpc/handlers.go index 5d2064b89..fc6f7b019 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -258,6 +258,7 @@ func (con *WSConnection) Start(evsw *events.EventSwitch) { // close the connection func (con *WSConnection) Stop() { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { + con.evsw.RemoveListener(con.id) close(con.quitChan) // the write loop closes the websocket connection // when it exits its loop, and the read loop @@ -285,6 +286,9 @@ func (con *WSConnection) read() { reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { + // TODO: this actually doesn't work + // since ReadMessage blocks. Really it needs its own + // go routine case <-reaper: if con.failedSends > MaxFailedSends { // sending has failed too many times. 
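Aside: a minimal sketch of how the crawler introduced in the patch above might be driven from a main package. This is illustrative only; the import path github.com/tendermint/tendermint/crawler and the local RPC port 46657 are assumptions, since the patch pins down neither:

    package main

    import (
        "fmt"
        "os"
        "os/signal"

        "github.com/tendermint/tendermint/crawler" // import path assumed from the diff above
    )

    func main() {
        // NewCrawler/Start/Stop are the entry points added in crawler/crawl.go;
        // the host and port here are placeholders, not taken from the patch.
        c := crawler.NewCrawler("127.0.0.1", 46657)
        if err := c.Start(); err != nil {
            fmt.Println("crawler failed to start:", err)
            os.Exit(1)
        }

        // crawl until interrupted, then shut the loops down via the quit channel
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, os.Interrupt)
        <-sig
        c.Stop()
    }
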
From 882a82bad4ff538a7f7d4a50a1f301ce66a7a0e8 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Apr 2015 14:00:19 -0700 Subject: [PATCH 2/3] more crawler work --- crawler/crawl.go | 244 +++++++++++++++++++---------------- p2p/peer.go | 8 ++ rpc/core_client/ws_client.go | 30 ++++- 3 files changed, 165 insertions(+), 117 deletions(-) diff --git a/crawler/crawl.go b/crawler/crawl.go index c61e0535d..a44f360e6 100644 --- a/crawler/crawl.go +++ b/crawler/crawl.go @@ -2,16 +2,19 @@ package crawler import ( "fmt" + "github.com/tendermint/tendermint/binary" rpctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/core_client" - types "github.com/tendermint/tendermint/types" - "sync" + "github.com/tendermint/tendermint/types" "time" + + "io/ioutil" ) const ( - CheckQueueBufferSize = 100 - NodeQueueBufferSize = 100 + CheckQueueBufferSize = 100 + NodeQueueBufferSize = 100 + GetPeersTickerSeconds = 5 ) //--------------------------------------------------------------------------------------- @@ -28,10 +31,11 @@ type Node struct { client *NodeClient - LastSeen time.Time - GenesisHash []byte - BlockHeight uint - NetInfo *rpctypes.ResponseNetInfo + LastSeen time.Time + GenesisHash []byte + BlockHeight uint + BlockHistory map[uint]time.Time // when we saw each block + NetInfo *rpctypes.ResponseNetInfo Validator bool @@ -61,29 +65,30 @@ type NodeClient struct { // Create a new client for the node at the given addr func NewNodeClient(addr string) *NodeClient { return &NodeClient{ - rpc: rpcclient.NewClient(addr, "JSONRPC"), - ws: rpcclient.NewWSClient(addr), + rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"), + ws: rpcclient.NewWSClient("ws://" + addr + "/events"), } } // A simple wrapper for mediating access to the maps type nodeInfo struct { - host string // the new nodes address - port uint16 - from string // the peer that told us about this node - removed bool // whether to remove from nodePool + host string // the new nodes address + port uint16 // node's listening port + from string // the peer that told us about this node + connected bool // move node from nodePool to nodes + disconnected bool // move node from nodes to nodePool } -func (ni nodeInfo) unpack() (string, uint16, string, bool) { - return ni.host, ni.port, ni.from, ni.removed +func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) { + return ni.host, ni.port, ni.from, ni.connected, ni.disconnected } // crawler.Node //--------------------------------------------------------------------------------------- // crawler.Crawler -// A crawler has a local node, a set of potential nodes in the nodePool -// and connected nodes. Maps are only accessed by one go-routine, mediated through channels +// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes. 
+// Maps are only accessed by one go-routine, mediated by the checkQueue
 type Crawler struct {
 	self   *Node
 	client *NodeClient
@@ -94,17 +99,12 @@ type Crawler struct {
 
 	nodeQueue chan *Node
 	quit      chan struct{}
-
-	// waits for checkQueue to empty
-	// so we can re-poll all nodes
-	wg sync.WaitGroup
 }
 
 // Create a new Crawler using the local RPC server at addr
 func NewCrawler(host string, port uint16) *Crawler {
 	return &Crawler{
-		self:       &Node{Host: host, RPCPort: port},
-		client:     NewNodeClient(fmt.Sprintf("%s:%d", host, port)),
+		self:       &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
 		checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
 		nodePool:   make(map[string]*Node),
 		nodes:      make(map[string]*Node),
@@ -113,59 +113,37 @@ func NewCrawler(host string, port uint16) *Crawler {
 	}
 }
 
-func (c *Crawler) checkNode(ni nodeInfo) {
-	c.wg.Add(1)
-	c.checkQueue <- ni
-}
-
 func (c *Crawler) Start() error {
-	// make sure we can connect
-	// to our local node first
-	status, err := c.client.rpc.Status()
-	if err != nil {
+	// connect to local node first, set info,
+	// and fire peers onto the checkQueue
+	if err := c.pollNode(c.self); err != nil {
 		return err
 	}
 
-	// connect to websocket and subscribe to local events
-	if err = c.client.ws.Dial(); err != nil {
+	// connect to websocket, subscribe to local events
+	// and run the read loop to listen for new blocks
+	if r, err := c.self.client.ws.Dial(); err != nil {
+		fmt.Println(r)
+		b, _ := ioutil.ReadAll(r.Body)
+		fmt.Println(string(b))
 		return err
 	}
-	if err = c.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
+	if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
 		return err
 	}
-	go c.readLocalEvents()
+	go c.readLoop(c.self)
 
 	// add ourselves to the nodes list
 	c.nodes[c.self.Address()] = c.self
 
-	// get peers from local node
-	netinfo, err := c.client.rpc.NetInfo()
-	if err != nil {
-		return err
-	}
-
-	// set the info for ourselves
-	c.self.SetInfo(status, netinfo)
-
-	// fire each peer on the checkQueue
-	for _, p := range netinfo.Peers {
-		c.checkNode(nodeInfo{
-			host: p.Host,
-			port: p.RPCPort,
-		})
-	}
-
-	// nodes we hear about get put on the
-	// checkQueue and are handled in the checkLoop
+	// nodes we hear about get put on the checkQueue
+	// by pollNode and are handled in the checkLoop.
 	// if it's a node we're not already connected to,
-	// it gets put it on the nodeQueue and
+	// it gets put on the nodeQueue and
 	// we attempt to connect in the connectLoop
 	go c.checkLoop()
 	go c.connectLoop()
 
-	// finally, a routine with a ticker to poll nodes for peers
-	go c.pollNodesRoutine()
-
 	return nil
 }
 
@@ -173,32 +151,87 @@ func (c *Crawler) Stop() {
 	close(c.quit)
 }
 
-func (c *Crawler) readLocalEvents() {
-	// read on our ws for NewBlocks
+// listen for events from the node and ping it for peers on a ticker
+func (c *Crawler) readLoop(node *Node) {
+	wsChan := node.client.ws.Read()
+	getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
+
+	for {
+		select {
+		case wsMsg := <-wsChan:
+			// update the node with his new info
+			if err := c.consumeMessage(wsMsg, node); err != nil {
+				// lost the node, put him back on the checkQueue
+				c.checkNode(nodeInfo{
+					host:         node.Host,
+					port:         node.RPCPort,
+					disconnected: true,
+				})
+
+			}
+		case <-getPeersTicker:
+			if err := c.pollNode(node); err != nil {
+				// lost the node, put him back on the checkQueue
+				c.checkNode(nodeInfo{
+					host:         node.Host,
+					port:         node.RPCPort,
+					disconnected: true,
+				})
+			}
+		case <-c.quit:
+			return
+
+		}
+	}
+}
+
+func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
+	if wsMsg.Error != nil {
+		return wsMsg.Error
+	}
+	// unmarshal block event
+	var response struct {
+		Event string
+		Data  *types.Block
+		Error string
+	}
+	var err error
+	binary.ReadJSON(&response, wsMsg.Data, &err)
+	if err != nil {
+		return err
+	}
+	if response.Error != "" {
+		return fmt.Errorf(response.Error)
+	}
+	block := response.Data
+
+	node.LastSeen = time.Now()
+	node.BlockHeight = block.Height
+	node.BlockHistory[block.Height] = node.LastSeen
+
+	return nil
 }
 
 // check nodes against the nodePool map one at a time
-// acts as a mutex on nodePool
+// acts as a mutex on nodePool and nodes
 func (c *Crawler) checkLoop() {
-	c.wg.Add(1)
 	for {
-		// every time the loop restarts
-		// it means we processed from the checkQueue
-		// (except the first time, hence the extra wg.Add)
-		c.wg.Done()
 		select {
 		case ni := <-c.checkQueue:
-			host, port, from, removed := ni.unpack()
+			host, port, from, connected, disconnected := ni.unpack()
 			addr := fmt.Sprintf("%s:%d", host, port)
-			// check if the node should be removed
-			// (eg. if it's been connected to or abandoned)
-			if removed {
+			// check if we need to swap node between maps (eg. it's connected or disconnected)
+			// NOTE: once we hear about a node, we never forget ...
+ if connected { n, _ := c.nodePool[addr] - if n.connected { - c.nodes[addr] = n - } + c.nodes[addr] = n delete(c.nodePool, addr) continue + } else if disconnected { + n, _ := c.nodes[addr] + c.nodePool[addr] = n + delete(c.nodes, addr) + continue } // TODO: if address is badly formed @@ -209,7 +242,7 @@ func (c *Crawler) checkLoop() { // create the node if unknown if !ok { n = &Node{Host: host, RPCPort: port} - c.nodes[addr] = n + c.nodePool[addr] = n } else if n.connected { // should be removed soon continue @@ -246,34 +279,49 @@ func (c *Crawler) connectToNode(node *Node) { addr := node.Address() node.client = NewNodeClient(addr) - if err := node.client.ws.Dial(); err != nil { + if b, err := node.client.ws.Dial(); err != nil { + fmt.Println("err on ws dial:", b, err) // set failed, return } // remove from nodePool, add to nodes c.checkNode(nodeInfo{ - host: node.Host, - port: node.RPCPort, - removed: true, + host: node.Host, + port: node.RPCPort, + connected: true, }) - c.pollNode(node) + if err := c.pollNode(node); err != nil { + // TODO: we had a good ws con + // but failed on rpc?! + // try again or something ... + // if we still fail, report and disconnect + } - // TODO: read loop + fmt.Println("Successfully connected to node", node.Address()) + + // blocks (until quit or err) + c.readLoop(node) +} + +func (c *Crawler) checkNode(ni nodeInfo) { + c.checkQueue <- ni } func (c *Crawler) pollNode(node *Node) error { + // get the status info status, err := node.client.rpc.Status() if err != nil { return err } - // get peers + // get peers and net info netinfo, err := node.client.rpc.NetInfo() if err != nil { return err } + // set the info for the node node.SetInfo(status, netinfo) // fire each peer on the checkQueue @@ -286,33 +334,3 @@ func (c *Crawler) pollNode(node *Node) error { } return nil } - -// wait for the checkQueue to empty and poll all the nodes -func (c *Crawler) pollNodesRoutine() { - for { - c.wg.Wait() - - ticker := time.Tick(time.Second) - // wait a few seconds to make sure we really have nothing - time.Sleep(time.Second * 5) - ch := make(chan struct{}) - go func() { - c.wg.Wait() - ch <- struct{}{} - }() - - // - select { - case <-ticker: - // the checkQueue has filled up again, move on - continue - case <-ch: - // the checkQueue is legit empty, - // TODO: poll the nodes! - - case <-c.quit: - return - } - } - -} diff --git a/p2p/peer.go b/p2p/peer.go index 173297eb0..5340366fd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -10,11 +10,19 @@ import ( . "github.com/tendermint/tendermint/common" ) +type nodeInfo struct { + Host string + RPCPort uint16 + P2PPort uint16 +} + type Peer struct { outbound bool mconn *MConnection running uint32 + Nodeinfo *nodeInfo + Key string Data *CMap // User data. 
 }
diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go
index ce0824469..71f24bf32 100644
--- a/rpc/core_client/ws_client.go
+++ b/rpc/core_client/ws_client.go
@@ -19,15 +19,15 @@ func NewWSClient(addr string) *WSClient {
 	}
 }
 
-func (wsc *WSClient) Dial() error {
+func (wsc *WSClient) Dial() (*http.Response, error) {
 	dialer := websocket.DefaultDialer
 	rHeader := http.Header{}
-	conn, _, err := dialer.Dial(wsc.host, rHeader)
+	conn, r, err := dialer.Dial(wsc.host, rHeader)
 	if err != nil {
-		return err
+		return r, err
 	}
 	wsc.conn = conn
-	return nil
+	return r, nil
 }
 
 // subscribe to an event
@@ -45,3 +45,25 @@ func (wsc *WSClient) Unsubscribe(eventid string) error {
 		Event: eventid,
 	})
 }
+
+type WSMsg struct {
+	Data  []byte
+	Error error
+}
+
+// returns a channel from which messages can be pulled
+// from a goroutine that reads the socket.
+// if the ws returns an error (eg. closes), we return
+func (wsc *WSClient) Read() chan *WSMsg {
+	ch := make(chan *WSMsg)
+	go func() {
+		for {
+			_, p, err := wsc.conn.ReadMessage()
+			ch <- &WSMsg{p, err}
+			if err != nil {
+				return
+			}
+		}
+	}()
+	return ch
+}

From d54bf6bcd547b978565368a799e5e5e54e955f88 Mon Sep 17 00:00:00 2001
From: Ethan Buchman
Date: Tue, 21 Apr 2015 19:51:23 -0700
Subject: [PATCH 3/3] blockchain reactor to consensus reactor transition on catchup

---
 Makefile              |  5 +++
 blockchain/pool.go    | 47 +++++++++++++++++++++++---
 blockchain/reactor.go | 78 +++++++++++++++++++++++++++++++++++--------
 consensus/reactor.go  | 10 +++++-
 node/node.go          |  9 ++---
 5 files changed, 126 insertions(+), 23 deletions(-)

diff --git a/Makefile b/Makefile
index f79cfeed4..4ede2e46b 100644
--- a/Makefile
+++ b/Makefile
@@ -7,6 +7,11 @@ build: get_deps
 	go build -o build/barak github.com/tendermint/tendermint/cmd/barak
 	go build -o build/debora github.com/tendermint/tendermint/cmd/debora
 
+no_get:
+	go build -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint
+	go build -o build/barak github.com/tendermint/tendermint/cmd/barak
+	go build -o build/debora github.com/tendermint/tendermint/cmd/debora
+
 build_race: get_deps
 	go build -race -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint
 	go build -race -o build/barak github.com/tendermint/tendermint/cmd/barak
diff --git a/blockchain/pool.go b/blockchain/pool.go
index 0bd620463..07d4a7705 100644
--- a/blockchain/pool.go
+++ b/blockchain/pool.go
@@ -18,15 +18,29 @@ const (
 	maxRequestsPerPeer = 300
 )
 
+// numTotal = numPending + blocks in the pool we haven't synced yet
+
 var (
 	requestTimeoutSeconds = time.Duration(1)
 )
 
+/*
+	Peers self-report their heights when a new peer joins the block pool.
+	Starting from whatever we've got (pool.height), we request blocks
+	in sequence from peers that reported higher heights than ours.
+	Every so often we ask peers what height they're on so we can keep going.
+
+	Requests are continuously made for blocks of higher heights until
+	we reach the limits. If most of the requests have no available peers, and
+	we are not at peer limits, we can probably switch to the consensus reactor.
+*/
+
 type BlockPool struct {
 	// block requests
 	requestsMtx sync.Mutex
 	requests    map[uint]*bpRequest
+	peerless    int32 // number of requests without peers
 	height      uint  // the lowest key in requests.
numPending int32 numTotal int32 @@ -145,10 +159,13 @@ func (pool *BlockPool) RedoRequest(height uint) { if request.block == nil { panic("Expected block to be non-nil") } + // TODO: record this malfeasance + // maybe punish peer on switch (an invalid block!) pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" pool.numPending++ + pool.peerless++ go requestRoutine(pool, height) } @@ -169,9 +186,22 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { if request == nil { return } + pool.peerless-- request.peerId = peerId } +func (pool *BlockPool) removePeerForRequest(height uint, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + if request == nil { + return + } + pool.peerless++ + request.peerId = "" +} + func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() @@ -198,7 +228,7 @@ func (pool *BlockPool) getPeer(peerId string) *bpPeer { return peer } -// Sets the peer's blockchain height. +// Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { pool.peersMtx.Lock() // Lock defer pool.peersMtx.Unlock() @@ -239,7 +269,6 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { peer.numRequests++ return peer } - return nil } @@ -258,6 +287,7 @@ func (pool *BlockPool) nextHeight() uint { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() + // we make one request per height. return pool.height + uint(pool.numTotal) } @@ -272,6 +302,8 @@ func (pool *BlockPool) makeRequest(height uint) { } pool.requests[height] = request + pool.peerless++ + nextHeight := pool.height + uint(pool.numTotal) if nextHeight == height { pool.numTotal++ @@ -328,7 +360,7 @@ type bpRequest struct { //------------------------------------- // Responsible for making more requests as necessary -// Returns when a block is found (e.g. AddBlock() is called) +// Returns only when a block is found (e.g. AddBlock() is called) func requestRoutine(pool *BlockPool, height uint) { for { var peer *bpPeer = nil @@ -347,15 +379,18 @@ func requestRoutine(pool *BlockPool, height uint) { break PICK_LOOP } + // set the peer, decrement peerless pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { pool.sendRequest(height, peer.id) time.Sleep(requestTimeoutSeconds * time.Second) + // if successful the block is either in the pool, if pool.hasBlock(height) { pool.decrPeer(peer.id) return } + // or already processed and we've moved past it bpHeight, _, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) @@ -363,6 +398,10 @@ func requestRoutine(pool *BlockPool, height uint) { } } + // unset the peer, increment peerless + pool.removePeerForRequest(height, peer.id) + + // this peer failed us, try again pool.RemovePeer(peer.id) pool.sendTimeout(peer.id) } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 9bdfe27cf..c5947b0f5 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,6 +10,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + dbm "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" @@ -24,9 +25,14 @@ const ( // stop syncing when last block's time is // within this much of the system time. 
stopSyncingDurationMinutes = 10 + // ask for best height every 10s + statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor + switchToConsensusIntervalSeconds = 10 ) -type stateResetter interface { +type consensusReactor interface { + SetSyncing(bool) ResetToState(*sm.State) } @@ -76,8 +82,8 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { log.Info("Starting BlockchainReactor") bcR.sw = sw - bcR.pool.Start() if bcR.sync { + bcR.pool.Start() go bcR.poolRoutine() } } @@ -106,7 +112,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { // Send peer our state. - peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) + peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) } // Implements Reactor @@ -141,8 +147,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) case *bcBlockResponseMessage: // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) - case *bcPeerStatusMessage: - // Got a peer status. + case *bcStatusRequestMessage: + // Send peer our state. + queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + if !queued { + // sorry + } + case *bcStatusResponseMessage: + // Got a peer status. Unverified. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) @@ -153,6 +165,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) FOR_LOOP: for { @@ -176,6 +190,24 @@ FOR_LOOP: if peer != nil { bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } + case _ = <-statusUpdateTicker.C: + // ask for status updates + go bcR.BroadcastStatusRequest() + case _ = <-switchToConsensusTicker.C: + // not thread safe access for peerless and numPending but should be fine + log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) + // NOTE: this condition is very strict right now. may need to weaken + if bcR.pool.numPending == maxPendingRequests && bcR.pool.peerless == bcR.pool.numPending { + log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height) + bcR.pool.Stop() + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + + bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state) + bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false) + + break FOR_LOOP + } case _ = <-trySyncTicker.C: // chan time //var lastValidatedBlock *types.Block SYNC_LOOP: @@ -215,6 +247,7 @@ FOR_LOOP: // TODO: use other heuristics too besides blocktime. // It's not a security concern, as it only needs to happen // upon node sync, and there's also a second (slower) + // this peer failed us // method of syncing in the consensus reactor. 
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { @@ -238,8 +271,13 @@ FOR_LOOP: } } -func (bcR *BlockchainReactor) BroadcastStatus() error { - bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) +func (bcR *BlockchainReactor) BroadcastStatusResponse() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + return nil +} + +func (bcR *BlockchainReactor) BroadcastStatusRequest() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()}) return nil } @@ -252,9 +290,10 @@ func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { // Messages const ( - msgTypeBlockRequest = byte(0x10) - msgTypeBlockResponse = byte(0x11) - msgTypePeerStatus = byte(0x20) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypeStatusResponse = byte(0x20) + msgTypeStatusRequest = byte(0x21) ) type BlockchainMessage interface{} @@ -263,7 +302,8 @@ var _ = binary.RegisterInterface( struct{ BlockchainMessage }{}, binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, - binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus}, + binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse}, + binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { @@ -296,10 +336,20 @@ func (m *bcBlockResponseMessage) String() string { //------------------------------------- -type bcPeerStatusMessage struct { +type bcStatusRequestMessage struct { + Height uint +} + +func (m *bcStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStatusResponseMessage struct { Height uint } -func (m *bcPeerStatusMessage) String() string { - return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height) +func (m *bcStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) } diff --git a/consensus/reactor.go b/consensus/reactor.go index ffc1c9219..2b6af88b2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -41,6 +41,9 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState + // if fast sync is running we don't really do anything + syncing bool + evsw events.Fireable } @@ -124,7 +127,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { - if !conR.IsRunning() { + if conR.syncing || !conR.IsRunning() { return } @@ -224,6 +227,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } +// Sets whether or not we're using the blockchain reactor for syncing +func (conR *ConsensusReactor) SetSyncing(syncing bool) { + conR.syncing = syncing +} + // Sets our private validator account for signing votes. 
func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) diff --git a/node/node.go b/node/node.go index cab2c397a..415796367 100644 --- a/node/node.go +++ b/node/node.go @@ -81,6 +81,11 @@ func NewNode() *Node { consensusReactor.SetPrivValidator(privValidator) } + // so the consensus reactor won't do anything until we're synced + if config.App().GetBool("FastSync") { + consensusReactor.SetSyncing(true) + } + sw := p2p.NewSwitch() sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -112,10 +117,6 @@ func (n *Node) Start() { nodeInfo := makeNodeInfo(n.sw) n.sw.SetNodeInfo(nodeInfo) n.sw.Start() - if config.App().GetBool("FastSync") { - // TODO: When FastSync is done, start CONSENSUS. - n.sw.Reactor("CONSENSUS").Stop() - } } func (n *Node) Stop() {
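
As a closing usage note: after this series, WSClient.Dial returns the handshake *http.Response so a failed upgrade can be inspected, and WSClient.Read feeds socket messages onto a channel. A minimal standalone consumer might look like the sketch below; the endpoint string mirrors NewNodeClient in crawler/crawl.go, and the "NewBlock" event id stands in for types.EventStringNewBlock(), so treat both as assumptions:

    package main

    import (
        "fmt"

        rpcclient "github.com/tendermint/tendermint/rpc/core_client"
    )

    func main() {
        // endpoint format copied from NewNodeClient in crawler/crawl.go; host/port assumed
        wsc := rpcclient.NewWSClient("ws://127.0.0.1:46657/events")

        // Dial surfaces the handshake response so a failed upgrade can be inspected
        r, err := wsc.Dial()
        if err != nil {
            if r != nil {
                fmt.Println("dial failed:", err, "handshake status:", r.Status)
            } else {
                fmt.Println("dial failed:", err)
            }
            return
        }
        if err := wsc.Subscribe("NewBlock"); err != nil { // event string assumed
            fmt.Println("subscribe failed:", err)
            return
        }

        // Read starts a goroutine that owns the socket; a WSMsg with a
        // non-nil Error means the connection is gone and the goroutine exited
        for msg := range wsc.Read() {
            if msg.Error != nil {
                fmt.Println("ws read error:", msg.Error)
                return
            }
            fmt.Printf("event: %s\n", msg.Data)
        }
    }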