diff --git a/blockchain/pool.go b/blockchain/pool.go index 3de2c2969..3e3313bfb 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,7 +1,7 @@ package blockchain import ( - "math/rand" + "sync" "sync/atomic" "time" @@ -11,347 +11,368 @@ import ( const ( maxOutstandingRequestsPerPeer = 10 - eventsChannelCapacity = 100 + inputsChannelCapacity = 100 maxTries = 3 requestIntervalMS = 500 requestBatchSize = 50 maxPendingRequests = 50 maxTotalRequests = 100 - maxPeersPerRequest = 1 + maxRequestsPerPeer = 20 ) var ( - requestTimeoutSeconds = time.Duration(10) + requestTimeoutSeconds = time.Duration(1) ) -type BlockRequest struct { - Height uint - PeerId string -} - type BlockPool struct { - peers map[string]*bpPeer - blockInfos map[uint]*bpBlockInfo - height uint // the lowest key in blockInfos. - started int32 // atomic - stopped int32 // atomic - numPending int32 - numTotal int32 - eventsCh chan interface{} // internal events. - requestsCh chan<- BlockRequest // output of new requests to make. - timeoutsCh chan<- string // output of peers that timed out. - blocksCh chan<- *types.Block // output of ordered blocks. - repeater *RepeatTimer // for requesting more bocks. - quit chan struct{} + // block requests + requestsMtx sync.Mutex + requests map[uint]*bpRequest + height uint // the lowest key in requests. + numPending int32 + numTotal int32 + + // peers + peersMtx sync.Mutex + peers map[string]*bpPeer + + requestsCh chan<- BlockRequest + timeoutsCh chan<- string + repeater *RepeatTimer + + running int32 // atomic } -func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *types.Block) *BlockPool { +func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { return &BlockPool{ - peers: make(map[string]*bpPeer), - blockInfos: make(map[uint]*bpBlockInfo), + peers: make(map[string]*bpPeer), + + requests: make(map[uint]*bpRequest), height: start, - started: 0, - stopped: 0, numPending: 0, numTotal: 0, - quit: make(chan struct{}), - eventsCh: make(chan interface{}, eventsChannelCapacity), requestsCh: requestsCh, timeoutsCh: timeoutsCh, - blocksCh: blocksCh, repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), + + running: 0, } } func (bp *BlockPool) Start() { - if atomic.CompareAndSwapInt32(&bp.started, 0, 1) { + if atomic.CompareAndSwapInt32(&bp.running, 0, 1) { log.Info("Starting BlockPool") go bp.run() } } func (bp *BlockPool) Stop() { - if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) { + if atomic.CompareAndSwapInt32(&bp.running, 1, 0) { log.Info("Stopping BlockPool") - close(bp.quit) - close(bp.eventsCh) - close(bp.requestsCh) - close(bp.timeoutsCh) - close(bp.blocksCh) bp.repeater.Stop() } } -// AddBlock should be called when a block is received. -func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { - bp.eventsCh <- bpBlockResponse{block, peerId} -} - -func (bp *BlockPool) SetPeerStatus(peerId string, height uint) { - bp.eventsCh <- bpPeerStatus{peerId, height} +func (bp *BlockPool) IsRunning() bool { + return atomic.LoadInt32(&bp.running) == 1 } -// Runs in a goroutine and processes messages. +// Run spawns requests as needed. func (bp *BlockPool) run() { -FOR_LOOP: +RUN_LOOP: for { - select { - case msg := <-bp.eventsCh: - bp.handleEvent(msg) - case <-bp.repeater.Ch: - bp.makeMoreBlockInfos() - bp.requestBlocksFromRandomPeers(10) - case <-bp.quit: - break FOR_LOOP + if atomic.LoadInt32(&bp.running) == 0 { + break RUN_LOOP + } + height, numPending, numTotal := bp.GetStatus() + log.Debug("BlockPool.run", "height", height, "numPending", numPending, + "numTotal", numTotal) + if numPending >= maxPendingRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + } else if numTotal >= maxTotalRequests { + // sleep for a bit. + time.Sleep(requestIntervalMS * time.Millisecond) + } else { + // request for more blocks. + height := bp.nextHeight() + bp.makeRequest(height) } } } -func (bp *BlockPool) handleEvent(event_ interface{}) { - switch event := event_.(type) { - case bpBlockResponse: - peer := bp.peers[event.peerId] - blockInfo := bp.blockInfos[event.block.Height] - if blockInfo == nil { - // block was unwanted. - if peer != nil { - peer.bad++ - } - } else { - // block was wanted. - if peer != nil { - peer.good++ - } - delete(peer.requests, event.block.Height) - if blockInfo.block == nil { - // peer is the first to give it to us. - blockInfo.block = event.block - blockInfo.blockBy = peer.id - bp.numPending-- - if event.block.Height == bp.height { - go bp.pushBlocksFromStart() - } - } - } - case bpPeerStatus: // updated or new status from peer - // request blocks if possible. - peer := bp.peers[event.peerId] - if peer == nil { - peer = bpNewPeer(event.peerId, event.height) - bp.peers[peer.id] = peer - } - bp.requestBlocksFromPeer(peer) - case bpRequestTimeout: // unconditional timeout for each peer's request. - peer := bp.peers[event.peerId] - if peer == nil { - // cleanup was already handled. - return - } - height := event.height - request := peer.requests[height] - if request == nil || request.block != nil { - // the request was fulfilled by some peer or this peer. - return - } +func (bp *BlockPool) GetStatus() (uint, int32, int32) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() - // A request for peer timed out. - peer.bad++ - if request.tries < maxTries { - log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id) - // try again. - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just delete the request. - delete(peer.requests, height) - } - } else { - log.Warn("Timeout: Deleting request") - // delete the request. - delete(peer.requests, height) - blockInfo := bp.blockInfos[height] - if blockInfo != nil { - delete(blockInfo.requests, peer.id) - } - select { - case bp.timeoutsCh <- peer.id: - default: - } + return bp.height, bp.numPending, bp.numTotal +} - } +// We need to see the second block's Validation to validate the first block. +// So we peek two blocks at a time. +func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + if r := bp.requests[bp.height]; r != nil { + first = r.block + } + if r := bp.requests[bp.height+1]; r != nil { + second = r.block } + return } -// NOTE: This function is sufficient, but we should find pending blocks -// and sample the peers in one go rather than the current O(n^2) impl. -func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) { - chosen := bp.pickAvailablePeers(maxPeers) - log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen)) - for _, peer := range chosen { - bp.requestBlocksFromPeer(peer) +// Pop the first block at bp.height +// It must have been validated by 'second'.Validation from PeekTwoBlocks(). +func (bp *BlockPool) PopRequest() { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + if r := bp.requests[bp.height]; r == nil || r.block == nil { + panic("PopRequest() requires a valid block") } + + delete(bp.requests, bp.height) + bp.height++ + bp.numTotal-- } -func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { - // If peer is available and can provide something... - for height := bp.height; peer.available(); height++ { - blockInfo := bp.blockInfos[height] - if blockInfo == nil { - // We're out of range. - return - } - needsMorePeers := blockInfo.needsMorePeers() - alreadyAskedPeer := blockInfo.requests[peer.id] != nil - if needsMorePeers && !alreadyAskedPeer { - select { - case bp.requestsCh <- BlockRequest{height, peer.id}: - // Create a new request and start the timer. - request := &bpBlockRequest{ - height: height, - peer: peer, - } - blockInfo.requests[peer.id] = request - peer.requests[height] = request - request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries - default: - // The request cannot be made because requestCh is full. - // Just stop. - return - } - } +// Invalidates the block at bp.height. +// Remove the peer and request from others. +func (bp *BlockPool) RedoRequest(height uint) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + if request.block == nil { + panic("Expected block to be non-nil") } + bp.removePeer(request.peerId) + request.block = nil + request.peerId = "" + bp.numPending++ + + go requestRoutine(bp, height) } -func (bp *BlockPool) makeMoreBlockInfos() { - // make more requests if necessary. - for i := 0; i < requestBatchSize; i++ { - //log.Debug("Confused?", - // "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests) - if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { - // Make a request for the next block height - requestHeight := bp.height + uint(bp.numTotal) - log.Debug("New blockInfo", "height", requestHeight) - blockInfo := bpNewBlockInfo(requestHeight) - bp.blockInfos[requestHeight] = blockInfo - bp.numPending++ - bp.numTotal++ - } else { - break - } +func (bp *BlockPool) hasBlock(height uint) bool { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + return request != nil && request.block != nil +} + +func (bp *BlockPool) setPeerForRequest(height uint, peerId string) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[height] + if request == nil { + return } + request.peerId = peerId } -func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer { - available := []*bpPeer{} - for _, peer := range bp.peers { - if peer.available() { - available = append(available, peer) - } +func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + request := bp.requests[block.Height] + if request == nil { + return } - perm := rand.Perm(MinInt(choose, len(available))) - chosen := make([]*bpPeer, len(perm)) - for i, idx := range perm { - chosen[i] = available[idx] + if request.peerId != peerId { + return } - return chosen + if request.block != nil { + return + } + request.block = block + bp.numPending-- +} + +func (bp *BlockPool) getPeer(peerId string) *bpPeer { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() + + peer := bp.peers[peerId] + return peer } -// blocking -func (bp *BlockPool) pushBlocksFromStart() { - for height := bp.height; ; height++ { - // push block to blocksCh. - blockInfo := bp.blockInfos[height] - if blockInfo == nil || blockInfo.block == nil { - break +// Sets the peer's blockchain height. +func (bp *BlockPool) SetPeerHeight(peerId string, height uint) { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() + + peer := bp.peers[peerId] + if peer != nil { + peer.height = height + } else { + peer = &bpPeer{ + height: height, + id: peerId, + numRequests: 0, } - bp.numTotal-- - bp.height++ - delete(bp.blockInfos, height) - bp.blocksCh <- blockInfo.block + bp.peers[peerId] = peer } } -//----------------------------------------------------------------------------- +func (bp *BlockPool) RemovePeer(peerId string) { + bp.peersMtx.Lock() // Lock + defer bp.peersMtx.Unlock() -type bpBlockInfo struct { - height uint - requests map[string]*bpBlockRequest - block *types.Block // first block received - blockBy string // peerId of source + delete(bp.peers, peerId) } -func bpNewBlockInfo(height uint) *bpBlockInfo { - return &bpBlockInfo{ - height: height, - requests: make(map[string]*bpBlockRequest), +// Pick an available peer with at least the given minHeight. +// If no peers are available, returns nil. +func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer { + bp.peersMtx.Lock() + defer bp.peersMtx.Unlock() + + for _, peer := range bp.peers { + if peer.numRequests >= maxRequestsPerPeer { + continue + } + if peer.height < minHeight { + continue + } + peer.numRequests++ + return peer } -} -func (blockInfo *bpBlockInfo) needsMorePeers() bool { - return len(blockInfo.requests) < maxPeersPerRequest + return nil } -//------------------------------------- +func (bp *BlockPool) decrPeer(peerId string) { + bp.peersMtx.Lock() + defer bp.peersMtx.Unlock() -type bpBlockRequest struct { - peer *bpPeer - height uint - block *types.Block - tries int + peer := bp.peers[peerId] + if peer == nil { + return + } + peer.numRequests-- } -// bump tries++ and set timeout. -// NOTE: the timer is unconditional. -func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) { - request.tries++ - time.AfterFunc(requestTimeoutSeconds*time.Second, func() { - eventsCh <- bpRequestTimeout{ - peerId: request.peer.id, - height: request.height, - } - }) +func (bp *BlockPool) nextHeight() uint { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + return bp.height + uint(bp.numTotal) } -//------------------------------------- +func (bp *BlockPool) makeRequest(height uint) { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() -type bpPeer struct { - id string - height uint - requests map[uint]*bpBlockRequest - // Count good/bad events from peer. - good uint - bad uint + request := &bpRequest{ + height: height, + peerId: "", + block: nil, + } + bp.requests[height] = request + + nextHeight := bp.height + uint(bp.numTotal) + if nextHeight == height { + bp.numTotal++ + bp.numPending++ + } + + go requestRoutine(bp, height) } -func bpNewPeer(peerId string, height uint) *bpPeer { - return &bpPeer{ - id: peerId, - height: height, - requests: make(map[uint]*bpBlockRequest), +func (bp *BlockPool) sendRequest(height uint, peerId string) { + if atomic.LoadInt32(&bp.running) == 0 { + return } + bp.requestsCh <- BlockRequest{height, peerId} } -func (peer *bpPeer) available() bool { - return len(peer.requests) < maxOutstandingRequestsPerPeer +func (bp *BlockPool) sendTimeout(peerId string) { + if atomic.LoadInt32(&bp.running) == 0 { + return + } + bp.timeoutsCh <- peerId +} + +func (bp *BlockPool) debug() string { + bp.requestsMtx.Lock() // Lock + defer bp.requestsMtx.Unlock() + + str := "" + for h := bp.height; h < bp.height+uint(bp.numTotal); h++ { + if bp.requests[h] == nil { + str += Fmt("H(%v):X ", h) + } else { + str += Fmt("H(%v):", h) + str += Fmt("B?(%v) ", bp.requests[h].block != nil) + } + } + return str } //------------------------------------- -// bp.eventsCh messages -type bpBlockResponse struct { - block *types.Block - peerId string +type bpPeer struct { + id string + height uint + numRequests int32 } -type bpPeerStatus struct { +type bpRequest struct { + height uint peerId string - height uint // blockchain tip of peer + block *types.Block } -type bpRequestTimeout struct { - peerId string - height uint +//------------------------------------- + +// Responsible for making more requests as necessary +// Returns when a block is found (e.g. AddBlock() is called) +func requestRoutine(bp *BlockPool, height uint) { + for { + var peer *bpPeer = nil + PICK_LOOP: + for { + if !bp.IsRunning() { + return + } + peer = bp.pickIncrAvailablePeer(height) + if peer == nil { + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_LOOP + } + break PICK_LOOP + } + + bp.setPeerForRequest(height, peer.id) + + for try := 0; try < maxTries; try++ { + bp.sendRequest(height, peer.id) + time.Sleep(requestTimeoutSeconds * time.Second) + if bp.hasBlock(height) { + bp.decrPeer(peer.id) + return + } + bpHeight, _, _ := bp.GetStatus() + if height < bpHeight { + bp.decrPeer(peer.id) + return + } + } + + bp.RemovePeer(peer.id) + bp.sendTimeout(peer.id) + } +} + +//------------------------------------- + +type BlockRequest struct { + Height uint + PeerId string } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index a2578aa13..c07a11d85 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -25,26 +25,34 @@ func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer { } func TestBasic(t *testing.T) { - // 100 peers anywhere at height 0 to 1000. - peers := makePeers(100, 0, 1000) - + peers := makePeers(10, 0, 1000) start := uint(42) - maxHeight := uint(300) timeoutsCh := make(chan string, 100) requestsCh := make(chan BlockRequest, 100) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) } }() - lastSeenBlock := uint(41) + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } + } + }() // Pull from channels for { @@ -53,21 +61,15 @@ func TestBasic(t *testing.T) { t.Errorf("timeout: %v", peerId) case request := <-requestsCh: log.Debug("TEST: Pulled new BlockRequest", "request", request) - // After a while, pretend like we got a block from the peer. + if request.Height == 300 { + return // Done! + } + // Request desired, pretend like we got the block immediately. go func() { block := &types.Block{Header: &types.Header{Height: request.Height}} pool.AddBlock(block, request.PeerId) log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId) }() - case block := <-blocksCh: - log.Debug("TEST: Pulled new Block", "height", block.Height) - if block.Height != lastSeenBlock+1 { - t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) - } - lastSeenBlock++ - if block.Height == maxHeight { - return // Done! - } } } @@ -75,43 +77,52 @@ func TestBasic(t *testing.T) { } func TestTimeout(t *testing.T) { - origRequestTimeoutSeconds := requestTimeoutSeconds - requestTimeoutSeconds = time.Duration(0) - defer func() { requestTimeoutSeconds = origRequestTimeoutSeconds }() - - peers := makePeers(100, 0, 1000) + peers := makePeers(10, 0, 1000) start := uint(42) - timeoutsCh := make(chan string, 10) - requestsCh := make(chan BlockRequest, 10) - blocksCh := make(chan *types.Block, 100) - - pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + timeoutsCh := make(chan string, 100) + requestsCh := make(chan BlockRequest, 100) + pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() // Introduce each peer. go func() { for _, peer := range peers { - pool.SetPeerStatus(peer.id, peer.height) + pool.SetPeerHeight(peer.id, peer.height) + } + }() + + // Start a goroutine to pull blocks + go func() { + for { + if !pool.IsRunning() { + return + } + first, second := pool.PeekTwoBlocks() + if first != nil && second != nil { + pool.PopRequest() + } else { + time.Sleep(1 * time.Second) + } } }() // Pull from channels + counter := 0 + timedOut := map[string]struct{}{} for { select { case peerId := <-timeoutsCh: - // Timed out. Done! - if peers[peerId].id != peerId { - t.Errorf("Unexpected peer from timeoutsCh") + log.Debug("Timeout", "peerId", peerId) + if _, ok := timedOut[peerId]; !ok { + counter++ + if counter == len(peers) { + return // Done! + } } - return - case _ = <-requestsCh: - // Don't do anything, let it time out. - case _ = <-blocksCh: - t.Errorf("Got block when none expected") - return + case request := <-requestsCh: + log.Debug("TEST: Pulled new BlockRequest", "request", request) } } pool.Stop() - }