diff --git a/Makefile b/Makefile index 15947cf00..321dc44c5 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,11 @@ build: get_deps go build -o build/barak github.com/tendermint/tendermint/cmd/barak go build -o build/debora github.com/tendermint/tendermint/cmd/debora +no_get: + go build -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint + go build -o build/barak github.com/tendermint/tendermint/cmd/barak + go build -o build/debora github.com/tendermint/tendermint/cmd/debora + build_race: get_deps go build -race -o build/tendermint github.com/tendermint/tendermint/cmd/tendermint go build -race -o build/barak github.com/tendermint/tendermint/cmd/barak diff --git a/blockchain/pool.go b/blockchain/pool.go index 0bd620463..07d4a7705 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -18,15 +18,29 @@ const ( maxRequestsPerPeer = 300 ) +// numTotal = numPending + blocks in the pool we havnt synced yet + var ( requestTimeoutSeconds = time.Duration(1) ) +/* + Peers self report their heights when a new peer joins the block pool. + Starting from whatever we've got (pool.height), we request blocks + in sequence from peers that reported higher heights than ours. + Every so often we ask peers what height they're on so we can keep going. + + Requests are continuously made for blocks of heigher heights until + the limits. If most of the requests have no available peers, and we + are not at peer limits, we can probably switch to consensus reactor +*/ + type BlockPool struct { // block requests requestsMtx sync.Mutex requests map[uint]*bpRequest - height uint // the lowest key in requests. + peerless int32 // number of requests without peers + height uint // the lowest key in requests. numPending int32 numTotal int32 @@ -145,10 +159,13 @@ func (pool *BlockPool) RedoRequest(height uint) { if request.block == nil { panic("Expected block to be non-nil") } + // TODO: record this malfeasance + // maybe punish peer on switch (an invalid block!) pool.RemovePeer(request.peerId) // Lock on peersMtx. request.block = nil request.peerId = "" pool.numPending++ + pool.peerless++ go requestRoutine(pool, height) } @@ -169,9 +186,22 @@ func (pool *BlockPool) setPeerForRequest(height uint, peerId string) { if request == nil { return } + pool.peerless-- request.peerId = peerId } +func (pool *BlockPool) removePeerForRequest(height uint, peerId string) { + pool.requestsMtx.Lock() // Lock + defer pool.requestsMtx.Unlock() + + request := pool.requests[height] + if request == nil { + return + } + pool.peerless++ + request.peerId = "" +} + func (pool *BlockPool) AddBlock(block *types.Block, peerId string) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() @@ -198,7 +228,7 @@ func (pool *BlockPool) getPeer(peerId string) *bpPeer { return peer } -// Sets the peer's blockchain height. +// Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerId string, height uint) { pool.peersMtx.Lock() // Lock defer pool.peersMtx.Unlock() @@ -239,7 +269,6 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { peer.numRequests++ return peer } - return nil } @@ -258,6 +287,7 @@ func (pool *BlockPool) nextHeight() uint { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() + // we make one request per height. return pool.height + uint(pool.numTotal) } @@ -272,6 +302,8 @@ func (pool *BlockPool) makeRequest(height uint) { } pool.requests[height] = request + pool.peerless++ + nextHeight := pool.height + uint(pool.numTotal) if nextHeight == height { pool.numTotal++ @@ -328,7 +360,7 @@ type bpRequest struct { //------------------------------------- // Responsible for making more requests as necessary -// Returns when a block is found (e.g. AddBlock() is called) +// Returns only when a block is found (e.g. AddBlock() is called) func requestRoutine(pool *BlockPool, height uint) { for { var peer *bpPeer = nil @@ -347,15 +379,18 @@ func requestRoutine(pool *BlockPool, height uint) { break PICK_LOOP } + // set the peer, decrement peerless pool.setPeerForRequest(height, peer.id) for try := 0; try < maxTries; try++ { pool.sendRequest(height, peer.id) time.Sleep(requestTimeoutSeconds * time.Second) + // if successful the block is either in the pool, if pool.hasBlock(height) { pool.decrPeer(peer.id) return } + // or already processed and we've moved past it bpHeight, _, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) @@ -363,6 +398,10 @@ func requestRoutine(pool *BlockPool, height uint) { } } + // unset the peer, increment peerless + pool.removePeerForRequest(height, peer.id) + + // this peer failed us, try again pool.RemovePeer(peer.id) pool.sendTimeout(peer.id) } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 1d6694e13..c5947b0f5 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,6 +10,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + dbm "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" @@ -24,9 +25,14 @@ const ( // stop syncing when last block's time is // within this much of the system time. stopSyncingDurationMinutes = 10 + // ask for best height every 10s + statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor + switchToConsensusIntervalSeconds = 10 ) -type stateResetter interface { +type consensusReactor interface { + SetSyncing(bool) ResetToState(*sm.State) } @@ -106,7 +112,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { // Send peer our state. - peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) + peer.Send(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) } // Implements Reactor @@ -141,8 +147,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) case *bcBlockResponseMessage: // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) - case *bcPeerStatusMessage: - // Got a peer status. + case *bcStatusRequestMessage: + // Send peer our state. + queued := src.TrySend(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + if !queued { + // sorry + } + case *bcStatusResponseMessage: + // Got a peer status. Unverified. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) @@ -153,6 +165,8 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) FOR_LOOP: for { @@ -176,6 +190,24 @@ FOR_LOOP: if peer != nil { bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } + case _ = <-statusUpdateTicker.C: + // ask for status updates + go bcR.BroadcastStatusRequest() + case _ = <-switchToConsensusTicker.C: + // not thread safe access for peerless and numPending but should be fine + log.Debug("Consensus ticker", "peerless", bcR.pool.peerless, "pending", bcR.pool.numPending, "total", bcR.pool.numTotal) + // NOTE: this condition is very strict right now. may need to weaken + if bcR.pool.numPending == maxPendingRequests && bcR.pool.peerless == bcR.pool.numPending { + log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height) + bcR.pool.Stop() + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + + bcR.sw.Reactor("CONSENSUS").(consensusReactor).ResetToState(state) + bcR.sw.Reactor("CONSENSUS").(consensusReactor).SetSyncing(false) + + break FOR_LOOP + } case _ = <-trySyncTicker.C: // chan time //var lastValidatedBlock *types.Block SYNC_LOOP: @@ -215,6 +247,7 @@ FOR_LOOP: // TODO: use other heuristics too besides blocktime. // It's not a security concern, as it only needs to happen // upon node sync, and there's also a second (slower) + // this peer failed us // method of syncing in the consensus reactor. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { @@ -238,8 +271,13 @@ FOR_LOOP: } } -func (bcR *BlockchainReactor) BroadcastStatus() error { - bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) +func (bcR *BlockchainReactor) BroadcastStatusResponse() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + return nil +} + +func (bcR *BlockchainReactor) BroadcastStatusRequest() error { + bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()}) return nil } @@ -252,9 +290,10 @@ func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { // Messages const ( - msgTypeBlockRequest = byte(0x10) - msgTypeBlockResponse = byte(0x11) - msgTypePeerStatus = byte(0x20) + msgTypeBlockRequest = byte(0x10) + msgTypeBlockResponse = byte(0x11) + msgTypeStatusResponse = byte(0x20) + msgTypeStatusRequest = byte(0x21) ) type BlockchainMessage interface{} @@ -263,7 +302,8 @@ var _ = binary.RegisterInterface( struct{ BlockchainMessage }{}, binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, - binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus}, + binary.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse}, + binary.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest}, ) func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { @@ -296,10 +336,20 @@ func (m *bcBlockResponseMessage) String() string { //------------------------------------- -type bcPeerStatusMessage struct { +type bcStatusRequestMessage struct { + Height uint +} + +func (m *bcStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStatusResponseMessage struct { Height uint } -func (m *bcPeerStatusMessage) String() string { - return fmt.Sprintf("[bcPeerStatusMessage %v]", m.Height) +func (m *bcStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 04c9cbc2d..548ef9510 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -41,6 +41,9 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState + // if fast sync is running we don't really do anything + syncing bool + evsw events.Fireable } @@ -123,7 +126,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { - if !conR.IsRunning() { + if conR.syncing || !conR.IsRunning() { return } @@ -224,6 +227,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } +// Sets whether or not we're using the blockchain reactor for syncing +func (conR *ConsensusReactor) SetSyncing(syncing bool) { + conR.syncing = syncing +} + // Sets our private validator account for signing votes. func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) diff --git a/crawler/crawl.go b/crawler/crawl.go new file mode 100644 index 000000000..a44f360e6 --- /dev/null +++ b/crawler/crawl.go @@ -0,0 +1,336 @@ +package crawler + +import ( + "fmt" + "github.com/tendermint/tendermint/binary" + rpctypes "github.com/tendermint/tendermint/rpc/core/types" + rpcclient "github.com/tendermint/tendermint/rpc/core_client" + "github.com/tendermint/tendermint/types" + "time" + + "io/ioutil" +) + +const ( + CheckQueueBufferSize = 100 + NodeQueueBufferSize = 100 + GetPeersTickerSeconds = 5 +) + +//--------------------------------------------------------------------------------------- +// crawler.Node + +// A node is a peer on the network. +type Node struct { + Host string + P2PPort uint16 + RPCPort uint16 + + failed int + connected bool + + client *NodeClient + + LastSeen time.Time + GenesisHash []byte + BlockHeight uint + BlockHistory map[uint]time.Time // when we saw each block + NetInfo *rpctypes.ResponseNetInfo + + Validator bool + + // other peers we heard about this peer from + heardFrom map[string]struct{} +} + +func (n *Node) Address() string { + return fmt.Sprintf("%s:%d", n.Host, n.RPCPort) +} + +// Set the basic status and network info for a node from RPC responses +func (n *Node) SetInfo(status *rpctypes.ResponseStatus, netinfo *rpctypes.ResponseNetInfo) { + n.LastSeen = time.Now() + n.GenesisHash = status.GenesisHash + n.BlockHeight = status.LatestBlockHeight + n.NetInfo = netinfo + // n.Validator +} + +// A node client is used to talk to a node over rpc and websockets +type NodeClient struct { + rpc rpcclient.Client + ws *rpcclient.WSClient +} + +// Create a new client for the node at the given addr +func NewNodeClient(addr string) *NodeClient { + return &NodeClient{ + rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"), + ws: rpcclient.NewWSClient("ws://" + addr + "/events"), + } +} + +// A simple wrapper for mediating access to the maps +type nodeInfo struct { + host string // the new nodes address + port uint16 // node's listening port + from string // the peer that told us about this node + connected bool // move node from nodePool to nodes + disconnected bool // move node from nodes to nodePool +} + +func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) { + return ni.host, ni.port, ni.from, ni.connected, ni.disconnected +} + +// crawler.Node +//--------------------------------------------------------------------------------------- +// crawler.Crawler + +// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes. +// Maps are only accessed by one go-routine, mediated by the checkQueue +type Crawler struct { + self *Node + client *NodeClient + + checkQueue chan nodeInfo + nodePool map[string]*Node + nodes map[string]*Node + + nodeQueue chan *Node + quit chan struct{} +} + +// Create a new Crawler using the local RPC server at addr +func NewCrawler(host string, port uint16) *Crawler { + return &Crawler{ + self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))}, + checkQueue: make(chan nodeInfo, CheckQueueBufferSize), + nodePool: make(map[string]*Node), + nodes: make(map[string]*Node), + nodeQueue: make(chan *Node, NodeQueueBufferSize), + quit: make(chan struct{}), + } +} + +func (c *Crawler) Start() error { + // connect to local node first, set info, + // and fire peers onto the checkQueue + if err := c.pollNode(c.self); err != nil { + return err + } + + // connect to weboscket, subscribe to local events + // and run the read loop to listen for new blocks + if r, err := c.self.client.ws.Dial(); err != nil { + fmt.Println(r) + b, _ := ioutil.ReadAll(r.Body) + fmt.Println(string(b)) + return err + } + if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil { + return err + } + go c.readLoop(c.self) + + // add ourselves to the nodes list + c.nodes[c.self.Address()] = c.self + + // nodes we hear about get put on the checkQueue + // by pollNode and are handled in the checkLoop. + // if its a node we're not already connected to, + // it gets put on the nodeQueue and + // we attempt to connect in the connectLoop + go c.checkLoop() + go c.connectLoop() + + return nil +} + +func (c *Crawler) Stop() { + close(c.quit) +} + +// listen for events from the node and ping it for peers on a ticker +func (c *Crawler) readLoop(node *Node) { + wsChan := node.client.ws.Read() + getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds) + + for { + select { + case wsMsg := <-wsChan: + // update the node with his new info + if err := c.consumeMessage(wsMsg, node); err != nil { + // lost the node, put him back on the checkQueu + c.checkNode(nodeInfo{ + host: node.Host, + port: node.RPCPort, + disconnected: true, + }) + + } + case <-getPeersTicker: + if err := c.pollNode(node); err != nil { + // lost the node, put him back on the checkQueu + c.checkNode(nodeInfo{ + host: node.Host, + port: node.RPCPort, + disconnected: true, + }) + } + case <-c.quit: + return + + } + } +} + +func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error { + if wsMsg.Error != nil { + return wsMsg.Error + } + // unmarshal block event + var response struct { + Event string + Data *types.Block + Error string + } + var err error + binary.ReadJSON(&response, wsMsg.Data, &err) + if err != nil { + return err + } + if response.Error != "" { + return fmt.Errorf(response.Error) + } + block := response.Data + + node.LastSeen = time.Now() + node.BlockHeight = block.Height + node.BlockHistory[block.Height] = node.LastSeen + + return nil +} + +// check nodes against the nodePool map one at a time +// acts as a mutex on nodePool and nodes +func (c *Crawler) checkLoop() { + for { + select { + case ni := <-c.checkQueue: + host, port, from, connected, disconnected := ni.unpack() + addr := fmt.Sprintf("%s:%d", host, port) + // check if we need to swap node between maps (eg. its connected or disconnected) + // NOTE: once we hear about a node, we never forget ... + if connected { + n, _ := c.nodePool[addr] + c.nodes[addr] = n + delete(c.nodePool, addr) + continue + } else if disconnected { + n, _ := c.nodes[addr] + c.nodePool[addr] = n + delete(c.nodes, addr) + continue + } + + // TODO: if address is badly formed + // we should punish ni.from + _ = from + + n, ok := c.nodePool[addr] + // create the node if unknown + if !ok { + n = &Node{Host: host, RPCPort: port} + c.nodePool[addr] = n + } else if n.connected { + // should be removed soon + continue + } + + // queue it for connecting to + c.nodeQueue <- n + + case <-c.quit: + return + } + } +} + +// read off the nodeQueue and attempt to connect to nodes +func (c *Crawler) connectLoop() { + for { + select { + case node := <-c.nodeQueue: + go c.connectToNode(node) + case <-c.quit: + // close all connections + for addr, node := range c.nodes { + _, _ = addr, node + // TODO: close conn + } + return + } + } +} + +func (c *Crawler) connectToNode(node *Node) { + + addr := node.Address() + node.client = NewNodeClient(addr) + + if b, err := node.client.ws.Dial(); err != nil { + fmt.Println("err on ws dial:", b, err) + // set failed, return + } + + // remove from nodePool, add to nodes + c.checkNode(nodeInfo{ + host: node.Host, + port: node.RPCPort, + connected: true, + }) + + if err := c.pollNode(node); err != nil { + // TODO: we had a good ws con + // but failed on rpc?! + // try again or something ... + // if we still fail, report and disconnect + } + + fmt.Println("Successfully connected to node", node.Address()) + + // blocks (until quit or err) + c.readLoop(node) +} + +func (c *Crawler) checkNode(ni nodeInfo) { + c.checkQueue <- ni +} + +func (c *Crawler) pollNode(node *Node) error { + // get the status info + status, err := node.client.rpc.Status() + if err != nil { + return err + } + + // get peers and net info + netinfo, err := node.client.rpc.NetInfo() + if err != nil { + return err + } + + // set the info for the node + node.SetInfo(status, netinfo) + + // fire each peer on the checkQueue + for _, p := range netinfo.Peers { + c.checkNode(nodeInfo{ + host: p.Host, + port: p.RPCPort, + from: node.Address(), + }) + } + return nil +} diff --git a/node/node.go b/node/node.go index cab2c397a..415796367 100644 --- a/node/node.go +++ b/node/node.go @@ -81,6 +81,11 @@ func NewNode() *Node { consensusReactor.SetPrivValidator(privValidator) } + // so the consensus reactor won't do anything until we're synced + if config.App().GetBool("FastSync") { + consensusReactor.SetSyncing(true) + } + sw := p2p.NewSwitch() sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -112,10 +117,6 @@ func (n *Node) Start() { nodeInfo := makeNodeInfo(n.sw) n.sw.SetNodeInfo(nodeInfo) n.sw.Start() - if config.App().GetBool("FastSync") { - // TODO: When FastSync is done, start CONSENSUS. - n.sw.Reactor("CONSENSUS").Stop() - } } func (n *Node) Stop() { diff --git a/p2p/peer.go b/p2p/peer.go index 549200e24..1a63b52e8 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,6 +12,12 @@ import ( "github.com/tendermint/tendermint/types" ) +type nodeInfo struct { + Host string + RPCPort uint16 + P2PPort uint16 +} + type Peer struct { outbound bool mconn *MConnection diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go new file mode 100644 index 000000000..71f24bf32 --- /dev/null +++ b/rpc/core_client/ws_client.go @@ -0,0 +1,69 @@ +package core_client + +import ( + "github.com/gorilla/websocket" + "github.com/tendermint/tendermint/rpc" + "net/http" +) + +// A websocket client subscribes and unsubscribes to events +type WSClient struct { + host string + conn *websocket.Conn +} + +// create a new connection +func NewWSClient(addr string) *WSClient { + return &WSClient{ + host: addr, + } +} + +func (wsc *WSClient) Dial() (*http.Response, error) { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + conn, r, err := dialer.Dial(wsc.host, rHeader) + if err != nil { + return r, err + } + wsc.conn = conn + return r, nil +} + +// subscribe to an event +func (wsc *WSClient) Subscribe(eventid string) error { + return wsc.conn.WriteJSON(rpc.WSRequest{ + Type: "subscribe", + Event: eventid, + }) +} + +// unsubscribe from an event +func (wsc *WSClient) Unsubscribe(eventid string) error { + return wsc.conn.WriteJSON(rpc.WSRequest{ + Type: "unsubscribe", + Event: eventid, + }) +} + +type WSMsg struct { + Data []byte + Error error +} + +// returns a channel from which messages can be pulled +// from a go routine that reads the socket. +// if the ws returns an error (eg. closes), we return +func (wsc *WSClient) Read() chan *WSMsg { + ch := make(chan *WSMsg) + go func() { + for { + _, p, err := wsc.conn.ReadMessage() + ch <- &WSMsg{p, err} + if err != nil { + return + } + } + }() + return ch +} diff --git a/rpc/handlers.go b/rpc/handlers.go index 5d2064b89..fc6f7b019 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -258,6 +258,7 @@ func (con *WSConnection) Start(evsw *events.EventSwitch) { // close the connection func (con *WSConnection) Stop() { if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { + con.evsw.RemoveListener(con.id) close(con.quitChan) // the write loop closes the websocket connection // when it exits its loop, and the read loop @@ -285,6 +286,9 @@ func (con *WSConnection) read() { reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { + // TODO: this actually doesn't work + // since ReadMessage blocks. Really it needs its own + // go routine case <-reaper: if con.failedSends > MaxFailedSends { // sending has failed too many times.