Browse Source

cleanup, comments

pull/110/head
Ethan Buchman 10 years ago
parent
commit
751b892cba
6 changed files with 26 additions and 21 deletions
  1. +6
    -6
      blockchain/pool.go
  2. +5
    -6
      blockchain/reactor.go
  3. +1
    -0
      consensus/reactor.go
  4. +12
    -7
      node/node.go
  5. +1
    -1
      rpc/server/handlers.go
  6. +1
    -1
      state/genesis.go

+ 6
- 6
blockchain/pool.go View File

@ -23,8 +23,8 @@ var (
) )
/* /*
Peers self report their heights when a new peer joins the block pool.
Starting from pool.height (inclusive), we request blocks
Peers self report their heights when we join the block pool.
Starting from our latest pool.height, we request blocks
in sequence from peers that reported higher heights than ours. in sequence from peers that reported higher heights than ours.
Every so often we ask peers what height they're on so we can keep going. Every so often we ask peers what height they're on so we can keep going.
@ -94,7 +94,7 @@ RUN_LOOP:
if atomic.LoadInt32(&pool.running) == 0 { if atomic.LoadInt32(&pool.running) == 0 {
break RUN_LOOP break RUN_LOOP
} }
_, numPending := pool.GetStatus()
_, numPending, _ := pool.GetStatus()
if numPending >= maxPendingRequests { if numPending >= maxPendingRequests {
// sleep for a bit. // sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond) time.Sleep(requestIntervalMS * time.Millisecond)
@ -108,11 +108,11 @@ RUN_LOOP:
} }
} }
func (pool *BlockPool) GetStatus() (int, int32) {
func (pool *BlockPool) GetStatus() (int, int32, int32) {
pool.requestsMtx.Lock() // Lock pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock() defer pool.requestsMtx.Unlock()
return pool.height, pool.numPending
return pool.height, pool.numPending, pool.numUnassigned
} }
// We need to see the second block's Validation to validate the first block. // We need to see the second block's Validation to validate the first block.
@ -378,7 +378,7 @@ func requestRoutine(pool *BlockPool, height int) {
return return
} }
// or already processed and we've moved past it // or already processed and we've moved past it
bpHeight, _ := pool.GetStatus()
bpHeight, _, _ := pool.GetStatus()
if height < bpHeight { if height < bpHeight {
pool.decrPeer(peer.id) pool.decrPeer(peer.id)
return return


+ 5
- 6
blockchain/reactor.go View File

@ -199,19 +199,18 @@ FOR_LOOP:
// ask for status updates // ask for status updates
go bcR.BroadcastStatusRequest() go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C: case _ = <-switchToConsensusTicker.C:
// not thread safe access for numUnassigned and numPending but should be fine
// TODO make threadsafe and use exposed functions
height, numUnassigned, numPending := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.sw.NumPeers() outbound, inbound, _ := bcR.sw.NumPeers()
log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending,
log.Debug("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending,
"total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
// NOTE: this condition is very strict right now. may need to weaken // NOTE: this condition is very strict right now. may need to weaken
// If all `maxPendingRequests` requests are unassigned // If all `maxPendingRequests` requests are unassigned
// and we have some peers (say >= 3), then we're caught up // and we have some peers (say >= 3), then we're caught up
maxPending := bcR.pool.numPending == maxPendingRequests
allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned
maxPending := numPending == maxPendingRequests
allUnassigned := numPending == numUnassigned
enoughPeers := outbound+inbound >= 3 enoughPeers := outbound+inbound >= 3
if maxPending && allUnassigned && enoughPeers { if maxPending && allUnassigned && enoughPeers {
log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height)
log.Info("Time to switch to consensus reactor!", "height", height)
bcR.pool.Stop() bcR.pool.Stop()
conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor) conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor)


+ 1
- 0
consensus/reactor.go View File

@ -126,6 +126,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
if !conR.IsRunning() { if !conR.IsRunning() {
return return
} }
// TODO
//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
} }


+ 12
- 7
node/node.go View File

@ -141,8 +141,7 @@ func NewNode() *Node {
func (n *Node) Start() { func (n *Node) Start() {
log.Info("Starting Node") log.Info("Starting Node")
n.book.Start() n.book.Start()
nodeInfo := makeNodeInfo(n.sw)
n.sw.SetNodeInfo(nodeInfo)
n.sw.SetNodeInfo(makeNodeInfo(n.sw))
n.sw.Start() n.sw.Start()
} }
@ -169,7 +168,8 @@ func (n *Node) AddListener(l p2p.Listener) {
n.book.AddOurAddress(l.ExternalAddress()) n.book.AddOurAddress(l.ExternalAddress())
} }
// NOTE: Blocking
// Dial a list of seeds in random order
// Spawns a go routine for each dial
func (n *Node) DialSeed() { func (n *Node) DialSeed() {
// permute the list, dial them in random order. // permute the list, dial them in random order.
seeds := strings.Split(config.GetString("seeds"), ",") seeds := strings.Split(config.GetString("seeds"), ",")
@ -196,7 +196,7 @@ func (n *Node) dialSeed(addr *p2p.NetAddress) {
} }
} }
func (n *Node) StartRPC() {
func (n *Node) StartRPC() net.Listener {
core.SetBlockStore(n.blockStore) core.SetBlockStore(n.blockStore)
core.SetConsensusState(n.consensusState) core.SetConsensusState(n.consensusState)
core.SetConsensusReactor(n.consensusReactor) core.SetConsensusReactor(n.consensusReactor)
@ -209,7 +209,11 @@ func (n *Node) StartRPC() {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcserver.RegisterEventsHandler(mux, n.evsw) rpcserver.RegisterEventsHandler(mux, n.evsw)
rpcserver.RegisterRPCFuncs(mux, core.Routes) rpcserver.RegisterRPCFuncs(mux, core.Routes)
rpcserver.StartHTTPServer(listenAddr, mux)
listener, err := rpcserver.StartHTTPServer(listenAddr, mux)
if err != nil {
panic(err)
}
return listener
} }
func (n *Node) Switch() *p2p.Switch { func (n *Node) Switch() *p2p.Switch {
@ -252,7 +256,8 @@ func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
} }
// We assume that the rpcListener has the same ExternalAddress. // We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
nodeInfo.Host = p2pHost nodeInfo.Host = p2pHost
nodeInfo.P2PPort = p2pPort nodeInfo.P2PPort = p2pPort
nodeInfo.RPCPort = uint16(rpcPort) nodeInfo.RPCPort = uint16(rpcPort)
@ -269,7 +274,7 @@ func RunNode() {
n.Start() n.Start()
// If seedNode is provided by config, dial out. // If seedNode is provided by config, dial out.
if len(config.GetString("seeds")) > 0 {
if config.GetString("seeds") != "" {
n.DialSeed() n.DialSeed()
} }


+ 1
- 1
rpc/server/handlers.go View File

@ -346,7 +346,7 @@ func (con *WSConnection) write() {
log.Error("Failed to marshal WSResponse to JSON", "error", err) log.Error("Failed to marshal WSResponse to JSON", "error", err)
} else { } else {
if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { if err := con.wsConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
log.Error("Failed to write response on websocket", "error", err)
log.Warn("Failed to write response on websocket", "error", err)
con.Stop() con.Stop()
return return
} }


+ 1
- 1
state/genesis.go View File

@ -154,7 +154,7 @@ func MakeGenesisState(db dbm.DB, genDoc *GenesisDoc) *State {
// Make namereg tree // Make namereg tree
nameReg := merkle.NewIAVLTree(binary.BasicCodec, NameRegCodec, 0, db) nameReg := merkle.NewIAVLTree(binary.BasicCodec, NameRegCodec, 0, db)
// TODO: add names to genesis.json
// TODO: add names, contracts to genesis.json
// IAVLTrees must be persisted before copy operations. // IAVLTrees must be persisted before copy operations.
accounts.Save() accounts.Save()


Loading…
Cancel
Save