|
|
- package blockchain
-
- import (
- "sync"
- "sync/atomic"
- "time"
-
- . "github.com/tendermint/tendermint/common"
- "github.com/tendermint/tendermint/types"
- )
-
- const (
- maxTries = 3
- inputsChannelCapacity = 200
- requestIntervalMS = 500
- maxPendingRequests = 200
- maxTotalRequests = 300
- maxRequestsPerPeer = 300
- )
-
- var (
- requestTimeoutSeconds = time.Duration(3)
- )
-
- /*
- Peers self report their heights when a new peer joins the block pool.
- Starting from pool.height (inclusive), we request blocks
- in sequence from peers that reported higher heights than ours.
- Every so often we ask peers what height they're on so we can keep going.
-
- Requests are continuously made for blocks of heigher heights until
- the limits. If most of the requests have no available peers, and we
- are not at peer limits, we can probably switch to consensus reactor
- */
-
- type BlockPool struct {
- // block requests
- requestsMtx sync.Mutex
- requests map[int]*bpRequest
- height int // the lowest key in requests.
- numUnassigned int32 // number of requests not yet assigned to a peer
- numPending int32 // number of requests pending assignment or block response
-
- // peers
- peersMtx sync.Mutex
- peers map[string]*bpPeer
-
- requestsCh chan<- BlockRequest
- timeoutsCh chan<- string
- repeater *RepeatTimer
-
- running int32 // atomic
- }
-
- func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
- return &BlockPool{
- peers: make(map[string]*bpPeer),
-
- requests: make(map[int]*bpRequest),
- height: start,
- numUnassigned: 0,
- numPending: 0,
-
- requestsCh: requestsCh,
- timeoutsCh: timeoutsCh,
- repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
-
- running: 0,
- }
- }
-
- func (pool *BlockPool) Start() {
- if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
- log.Info("Starting BlockPool")
- go pool.run()
- }
- }
-
- func (pool *BlockPool) Stop() {
- if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
- log.Info("Stopping BlockPool")
- pool.repeater.Stop()
- }
- }
-
- func (pool *BlockPool) IsRunning() bool {
- return atomic.LoadInt32(&pool.running) == 1
- }
-
- // Run spawns requests as needed.
- func (pool *BlockPool) run() {
- RUN_LOOP:
- for {
- if atomic.LoadInt32(&pool.running) == 0 {
- break RUN_LOOP
- }
- _, numPending := pool.GetStatus()
- if numPending >= maxPendingRequests {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- } else if len(pool.requests) >= maxTotalRequests {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- } else {
- // request for more blocks.
- pool.makeNextRequest()
- }
- }
- }
-
- func (pool *BlockPool) GetStatus() (int, int32) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- return pool.height, pool.numPending
- }
-
- // We need to see the second block's Validation to validate the first block.
- // So we peek two blocks at a time.
- func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- if r := pool.requests[pool.height]; r != nil {
- first = r.block
- }
- if r := pool.requests[pool.height+1]; r != nil {
- second = r.block
- }
- return
- }
-
- // Pop the first block at pool.height
- // It must have been validated by 'second'.Validation from PeekTwoBlocks().
- func (pool *BlockPool) PopRequest() {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- // SANITY CHECK
- if r := pool.requests[pool.height]; r == nil || r.block == nil {
- panic("PopRequest() requires a valid block")
- }
- // SANITY CHECK END
-
- delete(pool.requests, pool.height)
- pool.height++
- }
-
- // Invalidates the block at pool.height.
- // Remove the peer and request from others.
- func (pool *BlockPool) RedoRequest(height int) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- request := pool.requests[height]
- // SANITY CHECK
- if request.block == nil {
- panic("Expected block to be non-nil")
- }
- // SANITY CHECK END
- // TODO: record this malfeasance
- // maybe punish peer on switch (an invalid block!)
- pool.RemovePeer(request.peerId) // Lock on peersMtx.
- request.block = nil
- request.peerId = ""
- pool.numPending++
- pool.numUnassigned++
-
- go requestRoutine(pool, height)
- }
-
- func (pool *BlockPool) hasBlock(height int) bool {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- request := pool.requests[height]
- return request != nil && request.block != nil
- }
-
- func (pool *BlockPool) setPeerForRequest(height int, peerId string) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- request := pool.requests[height]
- if request == nil {
- return
- }
- pool.numUnassigned--
- request.peerId = peerId
- }
-
- func (pool *BlockPool) removePeerForRequest(height int, peerId string) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- request := pool.requests[height]
- if request == nil {
- return
- }
- pool.numUnassigned++
- request.peerId = ""
- }
-
- func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- request := pool.requests[block.Height]
- if request == nil {
- return
- }
- if request.peerId != peerId {
- return
- }
- if request.block != nil {
- return
- }
- request.block = block
- pool.numPending--
- }
-
- func (pool *BlockPool) getPeer(peerId string) *bpPeer {
- pool.peersMtx.Lock() // Lock
- defer pool.peersMtx.Unlock()
-
- peer := pool.peers[peerId]
- return peer
- }
-
- // Sets the peer's alleged blockchain height.
- func (pool *BlockPool) SetPeerHeight(peerId string, height int) {
- pool.peersMtx.Lock() // Lock
- defer pool.peersMtx.Unlock()
-
- peer := pool.peers[peerId]
- if peer != nil {
- peer.height = height
- } else {
- peer = &bpPeer{
- height: height,
- id: peerId,
- numRequests: 0,
- }
- pool.peers[peerId] = peer
- }
- }
-
- func (pool *BlockPool) RemovePeer(peerId string) {
- pool.peersMtx.Lock() // Lock
- defer pool.peersMtx.Unlock()
-
- delete(pool.peers, peerId)
- }
-
- // Pick an available peer with at least the given minHeight.
- // If no peers are available, returns nil.
- func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
- pool.peersMtx.Lock()
- defer pool.peersMtx.Unlock()
-
- for _, peer := range pool.peers {
- if peer.numRequests >= maxRequestsPerPeer {
- continue
- }
- if peer.height < minHeight {
- continue
- }
- peer.numRequests++
- return peer
- }
- return nil
- }
-
- func (pool *BlockPool) decrPeer(peerId string) {
- pool.peersMtx.Lock()
- defer pool.peersMtx.Unlock()
-
- peer := pool.peers[peerId]
- if peer == nil {
- return
- }
- peer.numRequests--
- }
-
- func (pool *BlockPool) makeNextRequest() {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- nextHeight := pool.height + len(pool.requests)
- request := &bpRequest{
- height: nextHeight,
- peerId: "",
- block: nil,
- }
-
- pool.requests[nextHeight] = request
- pool.numUnassigned++
- pool.numPending++
-
- go requestRoutine(pool, nextHeight)
- }
-
- func (pool *BlockPool) sendRequest(height int, peerId string) {
- if atomic.LoadInt32(&pool.running) == 0 {
- return
- }
- pool.requestsCh <- BlockRequest{height, peerId}
- }
-
- func (pool *BlockPool) sendTimeout(peerId string) {
- if atomic.LoadInt32(&pool.running) == 0 {
- return
- }
- pool.timeoutsCh <- peerId
- }
-
- func (pool *BlockPool) debug() string {
- pool.requestsMtx.Lock() // Lock
- defer pool.requestsMtx.Unlock()
-
- str := ""
- for h := pool.height; h < pool.height+len(pool.requests); h++ {
- if pool.requests[h] == nil {
- str += Fmt("H(%v):X ", h)
- } else {
- str += Fmt("H(%v):", h)
- str += Fmt("B?(%v) ", pool.requests[h].block != nil)
- }
- }
- return str
- }
-
- //-------------------------------------
-
- type bpPeer struct {
- id string
- height int
- numRequests int32
- }
-
- type bpRequest struct {
- height int
- peerId string
- block *types.Block
- }
-
- //-------------------------------------
-
- // Responsible for making more requests as necessary
- // Returns only when a block is found (e.g. AddBlock() is called)
- func requestRoutine(pool *BlockPool, height int) {
- for {
- var peer *bpPeer = nil
- PICK_LOOP:
- for {
- if !pool.IsRunning() {
- log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
- return
- }
- peer = pool.pickIncrAvailablePeer(height)
- if peer == nil {
- //log.Debug("No peers available", "height", height)
- time.Sleep(requestIntervalMS * time.Millisecond)
- continue PICK_LOOP
- }
- break PICK_LOOP
- }
-
- // set the peer, decrement numUnassigned
- pool.setPeerForRequest(height, peer.id)
-
- for try := 0; try < maxTries; try++ {
- pool.sendRequest(height, peer.id)
- time.Sleep(requestTimeoutSeconds * time.Second)
- // if successful the block is either in the pool,
- if pool.hasBlock(height) {
- pool.decrPeer(peer.id)
- return
- }
- // or already processed and we've moved past it
- bpHeight, _ := pool.GetStatus()
- if height < bpHeight {
- pool.decrPeer(peer.id)
- return
- }
- }
-
- // unset the peer, increment numUnassigned
- pool.removePeerForRequest(height, peer.id)
-
- // this peer failed us, try again
- pool.RemovePeer(peer.id)
- pool.sendTimeout(peer.id)
- }
- }
-
- //-------------------------------------
-
- type BlockRequest struct {
- Height int
- PeerId string
- }
|