Browse Source

waiting back to pending

pull/80/head
Jae Kwon 10 years ago
parent
commit
057c8ef400
2 changed files with 26 additions and 37 deletions
  1. +15
    -27
      blockchain/pool.go
  2. +11
    -10
      blockchain/reactor.go

+ 15
- 27
blockchain/pool.go View File

@ -13,7 +13,7 @@ const (
maxTries = 3
inputsChannelCapacity = 200
requestIntervalMS = 500
maxWaitingRequests = 200
maxPendingRequests = 200
maxTotalRequests = 300
maxRequestsPerPeer = 300
)
@ -39,7 +39,7 @@ type BlockPool struct {
requests map[uint]*bpRequest
height uint // the lowest key in requests.
numUnassigned int32 // number of requests not yet assigned to a peer
numWaiting int32 // number of requests awaiting response from a peer
numPending int32 // number of requests pending assignment or block response
// peers
peersMtx sync.Mutex
@ -59,7 +59,7 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<-
requests: make(map[uint]*bpRequest),
height: start,
numUnassigned: 0,
numWaiting: 0,
numPending: 0,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
@ -94,8 +94,8 @@ RUN_LOOP:
if atomic.LoadInt32(&pool.running) == 0 {
break RUN_LOOP
}
_, numWaiting := pool.GetStatus()
if numWaiting >= maxWaitingRequests {
_, numPending := pool.GetStatus()
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else if len(pool.requests) >= maxTotalRequests {
@ -103,8 +103,7 @@ RUN_LOOP:
time.Sleep(requestIntervalMS * time.Millisecond)
} else {
// request for more blocks.
height := pool.nextHeight()
pool.makeRequest(height)
pool.makeNextRequest()
}
}
}
@ -113,7 +112,7 @@ func (pool *BlockPool) GetStatus() (uint, int32) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
return pool.height, pool.numWaiting
return pool.height, pool.numPending
}
// We need to see the second block's Validation to validate the first block.
@ -160,7 +159,7 @@ func (pool *BlockPool) RedoRequest(height uint) {
pool.RemovePeer(request.peerId) // Lock on peersMtx.
request.block = nil
request.peerId = ""
pool.numWaiting++
pool.numPending++
pool.numUnassigned++
go requestRoutine(pool, height)
@ -213,7 +212,7 @@ func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
return
}
request.block = block
pool.numWaiting--
pool.numPending--
}
func (pool *BlockPool) getPeer(peerId string) *bpPeer {
@ -279,33 +278,22 @@ func (pool *BlockPool) decrPeer(peerId string) {
peer.numRequests--
}
func (pool *BlockPool) nextHeight() uint {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
// we make one request per height.
return pool.height + uint(len(pool.requests))
}
func (pool *BlockPool) makeRequest(height uint) {
func (pool *BlockPool) makeNextRequest() {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
nextHeight := pool.height + uint(len(pool.requests)) + 1
request := &bpRequest{
height: height,
height: nextHeight,
peerId: "",
block: nil,
}
pool.requests[height] = request
pool.requests[nextHeight] = request
pool.numUnassigned++
pool.numPending++
nextHeight := pool.height + uint(len(pool.requests))
if nextHeight == height {
pool.numWaiting++
}
go requestRoutine(pool, height)
go requestRoutine(pool, nextHeight)
}
func (pool *BlockPool) sendRequest(height uint, peerId string) {


+ 11
- 10
blockchain/reactor.go View File

@ -197,18 +197,19 @@ FOR_LOOP:
// ask for status updates
go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C:
// not thread safe access for numUnassigned and numWaiting but should be fine
// not thread safe access for numUnassigned and numPending but should be fine
// TODO make threadsafe and use exposed functions
log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numWaiting", bcR.pool.numWaiting, "total", len(bcR.pool.requests))
outbound, inbound, _ := bcR.sw.NumPeers()
log.Debug("Consensus ticker", "numUnassigned", bcR.pool.numUnassigned, "numPending", bcR.pool.numPending,
"total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
// NOTE: this condition is very strict right now. may need to weaken
// if the max amount of requests are waiting and numUnassigned
// and we have some peers (say > 5), then we're caught up
maxWaiting := bcR.pool.numWaiting == maxWaitingRequests
peersUnavailable := bcR.pool.numWaiting == bcR.pool.numUnassigned
o, i, _ := bcR.sw.NumPeers()
enoughPeers := o+i >= 5
if maxWaiting && peersUnavailable && enoughPeers {
log.Warn("Time to switch to consensus reactor!", "height", bcR.pool.height)
// If all `maxPendingRequests` requests are unassigned
// and we have some peers (say >= 3), then we're caught up
maxPending := bcR.pool.numPending == maxPendingRequests
allUnassigned := bcR.pool.numPending == bcR.pool.numUnassigned
enoughPeers := outbound+inbound >= 3
if maxPending && allUnassigned && enoughPeers {
log.Info("Time to switch to consensus reactor!", "height", bcR.pool.height)
bcR.pool.Stop()
stateDB := dbm.GetDB("state")
state := sm.LoadState(stateDB)


Loading…
Cancel
Save