From 6c7d85c64c2a7e4176049eff48d0b3b80c95fa63 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 22 Mar 2015 12:46:53 -0700 Subject: [PATCH] Basic test for block_pool --- block/block.go | 15 +++++ block/{block_pool.go => pool.go} | 107 +++++++++++++++++++++---------- block/pool_test.go | 74 +++++++++++++++++++++ 3 files changed, 163 insertions(+), 33 deletions(-) rename block/{block_pool.go => pool.go} (74%) create mode 100644 block/pool_test.go diff --git a/block/block.go b/block/block.go index bfa8c28f4..fa100f4e0 100644 --- a/block/block.go +++ b/block/block.go @@ -54,6 +54,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte, } func (b *Block) Hash() []byte { + if b.Header == nil || b.Validation == nil || b.Data == nil { + return nil + } hashes := [][]byte{ b.Header.Hash(), b.Validation.Hash(), @@ -81,6 +84,9 @@ func (b *Block) String() string { } func (b *Block) StringIndented(indent string) string { + if b == nil { + return "nil-Block" + } return fmt.Sprintf(`Block{ %s %v %s %v @@ -126,6 +132,9 @@ func (h *Header) Hash() []byte { } func (h *Header) StringIndented(indent string) string { + if h == nil { + return "nil-Header" + } return fmt.Sprintf(`Header{ %s Network: %v %s Height: %v @@ -212,6 +221,9 @@ func (v *Validation) Hash() []byte { } func (v *Validation) StringIndented(indent string) string { + if v == nil { + return "nil-Validation" + } commitStrings := make([]string, len(v.Commits)) for i, commit := range v.Commits { commitStrings[i] = commit.String() @@ -254,6 +266,9 @@ func (data *Data) Hash() []byte { } func (data *Data) StringIndented(indent string) string { + if data == nil { + return "nil-Data" + } txStrings := make([]string, len(data.Txs)) for i, tx := range data.Txs { txStrings[i] = fmt.Sprintf("Tx:%v", tx) diff --git a/block/block_pool.go b/block/pool.go similarity index 74% rename from block/block_pool.go rename to block/pool.go index 1326c6e2e..a50bc7ed1 100644 --- a/block/block_pool.go +++ b/block/pool.go @@ -1,6 +1,7 @@ package block import ( + "math/rand" "sync/atomic" "time" @@ -27,18 +28,17 @@ type BlockRequest struct { type BlockPool struct { peers map[string]*bpPeer blockInfos map[uint]*bpBlockInfo - height uint // the lowest key in blockInfos. - started int32 - stopped int32 + height uint // the lowest key in blockInfos. + started int32 // atomic + stopped int32 // atomic numPending int32 numTotal int32 - quit chan struct{} - - eventsCh chan interface{} + eventsCh chan interface{} // internal events. requestsCh chan<- BlockRequest // output of new requests to make. timeoutsCh chan<- string // output of peers that timed out. blocksCh chan<- *Block // output of ordered blocks. repeater *RepeatTimer // for requesting more bocks. + quit chan struct{} } func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *Block) *BlockPool { @@ -55,6 +55,7 @@ func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockR eventsCh: make(chan interface{}, eventsChannelCapacity), requestsCh: requestsCh, timeoutsCh: timeoutsCh, + blocksCh: blocksCh, repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), } } @@ -95,19 +96,8 @@ FOR_LOOP: case msg := <-bp.eventsCh: bp.handleEvent(msg) case <-bp.repeater.Ch: - // make more requests if necessary. - for i := 0; i < requestBatchSize; i++ { - if atomic.LoadInt32(&bp.numPending) < maxPendingRequests && - atomic.LoadInt32(&bp.numTotal) < maxTotalRequests { - atomic.AddInt32(&bp.numPending, 1) - atomic.AddInt32(&bp.numTotal, 1) - requestHeight := bp.height + uint(bp.numTotal) - blockInfo := bpNewBlockInfo(requestHeight) - bp.blockInfos[requestHeight] = blockInfo - } else { - break - } - } + bp.makeMoreBlockInfos() + bp.requestBlocksFromRandomPeers(10) case <-bp.quit: break FOR_LOOP } @@ -129,24 +119,26 @@ func (bp *BlockPool) handleEvent(event_ interface{}) { if peer != nil { peer.good++ } + delete(peer.requests, event.block.Height) if blockInfo.block == nil { // peer is the first to give it to us. blockInfo.block = event.block blockInfo.blockBy = peer.id - atomic.AddInt32(&bp.numPending, -1) + bp.numPending-- if event.block.Height == bp.height { - // push block to blocksCh. - atomic.AddInt32(&bp.numTotal, -1) - delete(peer.requests, bp.height) - delete(blockInfo.requests, peer.id) - go func() { bp.blocksCh <- event.block }() + go bp.pushBlocksFromStart() } } } case bpPeerStatus: // we have updated (or new) status from peer, // request blocks if possible. - bp.requestBlocksFromPeer(event.peerId, event.height) + peer := bp.peers[event.peerId] + if peer == nil { + peer = bpNewPeer(event.peerId, event.height) + bp.peers[peer.id] = peer + } + bp.requestBlocksFromPeer(peer) case bpRequestTimeout: peer := bp.peers[event.peerId] request := peer.requests[event.height] @@ -158,8 +150,8 @@ func (bp *BlockPool) handleEvent(event_ interface{}) { if request.tries < maxTries { // try again, start timer again. request.start(bp.eventsCh) - event := BlockRequest{event.height, peer.id} - go func() { bp.requestsCh <- event }() + msg := BlockRequest{event.height, peer.id} + go func() { bp.requestsCh <- msg }() } else { // delete the request. if peer != nil { @@ -175,11 +167,15 @@ func (bp *BlockPool) handleEvent(event_ interface{}) { } } -func (bp *BlockPool) requestBlocksFromPeer(peerId string, height uint) { - peer := bp.peers[peerId] - if peer == nil { - bp.peers[peerId] = bpNewPeer(peerId, height) +func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) { + chosen := bp.pickAvailablePeers(maxPeers) + log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen)) + for _, peer := range chosen { + bp.requestBlocksFromPeer(peer) } +} + +func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { // If peer is available and can provide something... for height := bp.height; peer.available(); height++ { blockInfo := bp.blockInfos[height] @@ -187,7 +183,6 @@ func (bp *BlockPool) requestBlocksFromPeer(peerId string, height uint) { // We're out of range. return } - needsMorePeers := blockInfo.needsMorePeers() alreadyAskedPeer := blockInfo.requests[peer.id] != nil if needsMorePeers && !alreadyAskedPeer { @@ -205,6 +200,52 @@ func (bp *BlockPool) requestBlocksFromPeer(peerId string, height uint) { } } +func (bp *BlockPool) makeMoreBlockInfos() { + // make more requests if necessary. + for i := 0; i < requestBatchSize; i++ { + if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests { + // Make a request for the next block height + requestHeight := bp.height + uint(bp.numTotal) + log.Debug("New blockInfo", "height", requestHeight) + blockInfo := bpNewBlockInfo(requestHeight) + bp.blockInfos[requestHeight] = blockInfo + bp.numPending++ + bp.numTotal++ + } else { + break + } + } +} + +func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer { + available := []*bpPeer{} + for _, peer := range bp.peers { + if peer.available() { + available = append(available, peer) + } + } + perm := rand.Perm(MinInt(choose, len(available))) + chosen := make([]*bpPeer, len(perm)) + for i, idx := range perm { + chosen[i] = available[idx] + } + return chosen +} + +func (bp *BlockPool) pushBlocksFromStart() { + for height := bp.height; ; height++ { + // push block to blocksCh. + blockInfo := bp.blockInfos[height] + if blockInfo == nil || blockInfo.block == nil { + break + } + bp.numTotal-- + bp.height++ + delete(bp.blockInfos, height) + go func() { bp.blocksCh <- blockInfo.block }() + } +} + //----------------------------------------------------------------------------- type bpBlockInfo struct { diff --git a/block/pool_test.go b/block/pool_test.go new file mode 100644 index 000000000..2b8aa0868 --- /dev/null +++ b/block/pool_test.go @@ -0,0 +1,74 @@ +package block + +import ( + "math/rand" + "testing" + + . "github.com/tendermint/tendermint/common" +) + +type testPeer struct { + id string + height uint +} + +func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer { + peers := make(map[string]testPeer, numPeers) + for i := 0; i < numPeers; i++ { + peerId := RandStr(12) + height := minHeight + uint(rand.Intn(int(maxHeight-minHeight))) + peers[peerId] = testPeer{peerId, height} + } + return peers +} + +func TestBasic(t *testing.T) { + // 100 peers anywhere at height 0 to 1000. + peers := makePeers(100, 0, 1000) + + start := uint(42) + maxHeight := uint(300) + timeoutsCh := make(chan string) + requestsCh := make(chan BlockRequest) + blocksCh := make(chan *Block) + + pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh) + pool.Start() + + // Introduce each peer. + go func() { + for _, peer := range peers { + pool.SetPeerStatus(peer.id, peer.height) + } + }() + + lastSeenBlock := uint(41) + + // Pull from channels + for { + select { + case peerId := <-timeoutsCh: + t.Errorf("timeout: %v", peerId) + case request := <-requestsCh: + log.Debug("Pulled new BlockRequest", "request", request) + // After a while, pretend like we got a block from the peer. + go func() { + block := &Block{Header: &Header{Height: request.Height}} + pool.AddBlock(block, request.PeerId) + log.Debug("Added block", "block", request.Height, "peer", request.PeerId) + }() + case block := <-blocksCh: + log.Debug("Pulled new Block", "height", block.Height) + if block.Height != lastSeenBlock+1 { + t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height) + } + lastSeenBlock++ + if block.Height == maxHeight { + return // Done! + } + } + } + + pool.Stop() + +}