diff --git a/blockchain/pool.go b/blockchain/pool.go index 1d1fd6f18..a4cbe6d24 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -23,8 +23,8 @@ var ( ) /* - Peers self report their heights when a new peer joins the block pool. - Starting from pool.height (inclusive), we request blocks + Peers self report their heights when we join the block pool. + Starting from our latest pool.height, we request blocks in sequence from peers that reported higher heights than ours. Every so often we ask peers what height they're on so we can keep going. @@ -94,7 +94,7 @@ RUN_LOOP: if atomic.LoadInt32(&pool.running) == 0 { break RUN_LOOP } - _, numPending := pool.GetStatus() + _, numPending, _ := pool.GetStatus() if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) @@ -108,11 +108,11 @@ RUN_LOOP: } } -func (pool *BlockPool) GetStatus() (int, int32) { +func (pool *BlockPool) GetStatus() (int, int32, int32) { pool.requestsMtx.Lock() // Lock defer pool.requestsMtx.Unlock() - return pool.height, pool.numPending + return pool.height, pool.numPending, pool.numUnassigned } // We need to see the second block's Validation to validate the first block. @@ -378,7 +378,7 @@ func requestRoutine(pool *BlockPool, height int) { return } // or already processed and we've moved past it - bpHeight, _ := pool.GetStatus() + bpHeight, _, _ := pool.GetStatus() if height < bpHeight { pool.decrPeer(peer.id) return diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 68053801f..754f4644a 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -199,19 +199,18 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: - // not thread safe access for numUnassigned and numPending but should be fine - // TODO make threadsafe and use exposed functions + height, numUnassigned, numPending := bcR.pool.GetStatus() outbound, inbound, _ := bcR.sw.NumPeers() - log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending, + log.Debug("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending, "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) // NOTE: this condition is very strict right now. may need to weaken // If all `maxPendingRequests` requests are unassigned // and we have some peers (say >= 3), then we're caught up - maxPending := bcR.pool.numPending == maxPendingRequests - allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned + maxPending := numPending == maxPendingRequests + allUnassigned := numPending == numUnassigned enoughPeers := outbound+inbound >= 3 if maxPending && allUnassigned && enoughPeers { - log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height) + log.Info("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor) diff --git a/consensus/reactor.go b/consensus/reactor.go index 2df09e5af..6f08a09a8 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -126,6 +126,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { if !conR.IsRunning() { return } + // TODO //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() } diff --git a/node/node.go b/node/node.go index aa96606c9..a7688095d 100644 --- a/node/node.go +++ b/node/node.go @@ -141,8 +141,7 @@ func NewNode() *Node { func (n *Node) Start() { log.Info("Starting Node") n.book.Start() - nodeInfo := makeNodeInfo(n.sw) - n.sw.SetNodeInfo(nodeInfo) + n.sw.SetNodeInfo(makeNodeInfo(n.sw)) n.sw.Start() } @@ -169,7 +168,8 @@ func (n *Node) AddListener(l p2p.Listener) { n.book.AddOurAddress(l.ExternalAddress()) } -// NOTE: Blocking +// Dial a list of seeds in random order +// Spawns a go routine for each dial func (n *Node) DialSeed() { // permute the list, dial them in random order. seeds := strings.Split(config.GetString("seeds"), ",") @@ -196,7 +196,7 @@ func (n *Node) dialSeed(addr *p2p.NetAddress) { } } -func (n *Node) StartRPC() { +func (n *Node) StartRPC() net.Listener { core.SetBlockStore(n.blockStore) core.SetConsensusState(n.consensusState) core.SetConsensusReactor(n.consensusReactor) @@ -209,7 +209,11 @@ func (n *Node) StartRPC() { mux := http.NewServeMux() rpcserver.RegisterEventsHandler(mux, n.evsw) rpcserver.RegisterRPCFuncs(mux, core.Routes) - rpcserver.StartHTTPServer(listenAddr, mux) + listener, err := rpcserver.StartHTTPServer(listenAddr, mux) + if err != nil { + panic(err) + } + return listener } func (n *Node) Switch() *p2p.Switch { @@ -252,7 +256,8 @@ func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo { } // We assume that the rpcListener has the same ExternalAddress. - // This is probably true because both P2P and RPC listeners use UPnP. + // This is probably true because both P2P and RPC listeners use UPnP, + // except of course if the rpc is only bound to localhost nodeInfo.Host = p2pHost nodeInfo.P2PPort = p2pPort nodeInfo.RPCPort = uint16(rpcPort) @@ -269,7 +274,7 @@ func RunNode() { n.Start() // If seedNode is provided by config, dial out. - if len(config.GetString("seeds")) > 0 { + if config.GetString("seeds") != "" { n.DialSeed() } diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index be4733fd9..58e8db081 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -346,7 +346,7 @@ func (con *WSConnection) write() { log.Error("Failed to marshal WSResponse to JSON", "error", err) } else { if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { - log.Error("Failed to write response on websocket", "error", err) + log.Warn("Failed to write response on websocket", "error", err) con.Stop() return } diff --git a/state/genesis.go b/state/genesis.go index c582cf5bb..b18e35daf 100644 --- a/state/genesis.go +++ b/state/genesis.go @@ -154,7 +154,7 @@ func MakeGenesisState(db dbm.DB, genDoc *GenesisDoc) *State { // Make namereg tree nameReg := merkle.NewIAVLTree(binary.BasicCodec, NameRegCodec, 0, db) - // TODO: add names to genesis.json + // TODO: add names, contracts to genesis.json // IAVLTrees must be persisted before copy operations. accounts.Save()