Browse Source

BlockchainReactor syncs first before ConsensusReactor.

pull/43/merge
Jae Kwon 9 years ago
parent
commit
cebfae60c7
9 changed files with 232 additions and 116 deletions
  1. +107
    -107
      blockchain/pool.go
  2. +59
    -1
      blockchain/reactor.go
  3. +2
    -0
      consensus/reactor.go
  4. +2
    -0
      consensus/state.go
  5. +1
    -1
      consensus/vote_set.go
  6. +5
    -5
      daemon/daemon.go
  7. +6
    -1
      p2p/peer_set.go
  8. +47
    -0
      state/validator_set.go
  9. +3
    -1
      types/block.go

+ 107
- 107
blockchain/pool.go View File

@ -60,32 +60,32 @@ func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<-
}
}
func (bp *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&bp.running, 0, 1) {
func (pool *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
log.Info("Starting BlockPool")
go bp.run()
go pool.run()
}
}
func (bp *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&bp.running, 1, 0) {
func (pool *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
log.Info("Stopping BlockPool")
bp.repeater.Stop()
pool.repeater.Stop()
}
}
func (bp *BlockPool) IsRunning() bool {
return atomic.LoadInt32(&bp.running) == 1
func (pool *BlockPool) IsRunning() bool {
return atomic.LoadInt32(&pool.running) == 1
}
// Run spawns requests as needed.
func (bp *BlockPool) run() {
func (pool *BlockPool) run() {
RUN_LOOP:
for {
if atomic.LoadInt32(&bp.running) == 0 {
if atomic.LoadInt32(&pool.running) == 0 {
break RUN_LOOP
}
height, numPending, numTotal := bp.GetStatus()
height, numPending, numTotal := pool.GetStatus()
log.Debug("BlockPool.run", "height", height, "numPending", numPending,
"numTotal", numTotal)
if numPending >= maxPendingRequests {
@ -96,91 +96,91 @@ RUN_LOOP:
time.Sleep(requestIntervalMS * time.Millisecond)
} else {
// request for more blocks.
height := bp.nextHeight()
bp.makeRequest(height)
height := pool.nextHeight()
pool.makeRequest(height)
}
}
}
func (bp *BlockPool) GetStatus() (uint, int32, int32) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) GetStatus() (uint, int32, int32) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
return bp.height, bp.numPending, bp.numTotal
return pool.height, pool.numPending, pool.numTotal
}
// We need to see the second block's Validation to validate the first block.
// So we peek two blocks at a time.
func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
if r := bp.requests[bp.height]; r != nil {
if r := pool.requests[pool.height]; r != nil {
first = r.block
}
if r := bp.requests[bp.height+1]; r != nil {
if r := pool.requests[pool.height+1]; r != nil {
second = r.block
}
return
}
// Pop the first block at bp.height
// Pop the first block at pool.height
// It must have been validated by 'second'.Validation from PeekTwoBlocks().
func (bp *BlockPool) PopRequest() {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) PopRequest() {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
if r := bp.requests[bp.height]; r == nil || r.block == nil {
if r := pool.requests[pool.height]; r == nil || r.block == nil {
panic("PopRequest() requires a valid block")
}
delete(bp.requests, bp.height)
bp.height++
bp.numTotal--
delete(pool.requests, pool.height)
pool.height++
pool.numTotal--
}
// Invalidates the block at bp.height.
// Invalidates the block at pool.height.
// Remove the peer and request from others.
func (bp *BlockPool) RedoRequest(height uint) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) RedoRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := bp.requests[height]
request := pool.requests[height]
if request.block == nil {
panic("Expected block to be non-nil")
}
bp.RemovePeer(request.peerId) // Lock on peersMtx.
pool.RemovePeer(request.peerId) // Lock on peersMtx.
request.block = nil
request.peerId = ""
bp.numPending++
pool.numPending++
go requestRoutine(bp, height)
go requestRoutine(pool, height)
}
func (bp *BlockPool) hasBlock(height uint) bool {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) hasBlock(height uint) bool {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := bp.requests[height]
request := pool.requests[height]
return request != nil && request.block != nil
}
func (bp *BlockPool) setPeerForRequest(height uint, peerId string) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := bp.requests[height]
request := pool.requests[height]
if request == nil {
return
}
request.peerId = peerId
}
func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := bp.requests[block.Height]
request := pool.requests[block.Height]
if request == nil {
return
}
@ -191,23 +191,23 @@ func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
return
}
request.block = block
bp.numPending--
pool.numPending--
}
func (bp *BlockPool) getPeer(peerId string) *bpPeer {
bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
func (pool *BlockPool) getPeer(peerId string) *bpPeer {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
peer := bp.peers[peerId]
peer := pool.peers[peerId]
return peer
}
// Sets the peer's blockchain height.
func (bp *BlockPool) SetPeerHeight(peerId string, height uint) {
bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
peer := bp.peers[peerId]
peer := pool.peers[peerId]
if peer != nil {
peer.height = height
} else {
@ -216,24 +216,24 @@ func (bp *BlockPool) SetPeerHeight(peerId string, height uint) {
id: peerId,
numRequests: 0,
}
bp.peers[peerId] = peer
pool.peers[peerId] = peer
}
}
func (bp *BlockPool) RemovePeer(peerId string) {
bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
func (pool *BlockPool) RemovePeer(peerId string) {
pool.peersMtx.Lock() // Lock
defer pool.peersMtx.Unlock()
delete(bp.peers, peerId)
delete(pool.peers, peerId)
}
// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
bp.peersMtx.Lock()
defer bp.peersMtx.Unlock()
func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
for _, peer := range bp.peers {
for _, peer := range pool.peers {
if peer.numRequests >= maxRequestsPerPeer {
continue
}
@ -247,69 +247,69 @@ func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
return nil
}
func (bp *BlockPool) decrPeer(peerId string) {
bp.peersMtx.Lock()
defer bp.peersMtx.Unlock()
func (pool *BlockPool) decrPeer(peerId string) {
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()
peer := bp.peers[peerId]
peer := pool.peers[peerId]
if peer == nil {
return
}
peer.numRequests--
}
func (bp *BlockPool) nextHeight() uint {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) nextHeight() uint {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
return bp.height + uint(bp.numTotal)
return pool.height + uint(pool.numTotal)
}
func (bp *BlockPool) makeRequest(height uint) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) makeRequest(height uint) {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
request := &bpRequest{
height: height,
peerId: "",
block: nil,
}
bp.requests[height] = request
pool.requests[height] = request
nextHeight := bp.height + uint(bp.numTotal)
nextHeight := pool.height + uint(pool.numTotal)
if nextHeight == height {
bp.numTotal++
bp.numPending++
pool.numTotal++
pool.numPending++
}
go requestRoutine(bp, height)
go requestRoutine(pool, height)
}
func (bp *BlockPool) sendRequest(height uint, peerId string) {
if atomic.LoadInt32(&bp.running) == 0 {
func (pool *BlockPool) sendRequest(height uint, peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
return
}
bp.requestsCh <- BlockRequest{height, peerId}
pool.requestsCh <- BlockRequest{height, peerId}
}
func (bp *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&bp.running) == 0 {
func (pool *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
return
}
bp.timeoutsCh <- peerId
pool.timeoutsCh <- peerId
}
func (bp *BlockPool) debug() string {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
func (pool *BlockPool) debug() string {
pool.requestsMtx.Lock() // Lock
defer pool.requestsMtx.Unlock()
str := ""
for h := bp.height; h < bp.height+uint(bp.numTotal); h++ {
if bp.requests[h] == nil {
for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
if pool.requests[h] == nil {
str += Fmt("H(%v):X ", h)
} else {
str += Fmt("H(%v):", h)
str += Fmt("B?(%v) ", bp.requests[h].block != nil)
str += Fmt("B?(%v) ", pool.requests[h].block != nil)
}
}
return str
@ -333,15 +333,15 @@ type bpRequest struct {
// Responsible for making more requests as necessary
// Returns when a block is found (e.g. AddBlock() is called)
func requestRoutine(bp *BlockPool, height uint) {
func requestRoutine(pool *BlockPool, height uint) {
for {
var peer *bpPeer = nil
PICK_LOOP:
for {
if !bp.IsRunning() {
if !pool.IsRunning() {
return
}
peer = bp.pickIncrAvailablePeer(height)
peer = pool.pickIncrAvailablePeer(height)
if peer == nil {
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_LOOP
@ -349,24 +349,24 @@ func requestRoutine(bp *BlockPool, height uint) {
break PICK_LOOP
}
bp.setPeerForRequest(height, peer.id)
pool.setPeerForRequest(height, peer.id)
for try := 0; try < maxTries; try++ {
bp.sendRequest(height, peer.id)
pool.sendRequest(height, peer.id)
time.Sleep(requestTimeoutSeconds * time.Second)
if bp.hasBlock(height) {
bp.decrPeer(peer.id)
if pool.hasBlock(height) {
pool.decrPeer(peer.id)
return
}
bpHeight, _, _ := bp.GetStatus()
bpHeight, _, _ := pool.GetStatus()
if height < bpHeight {
bp.decrPeer(peer.id)
pool.decrPeer(peer.id)
return
}
}
bp.RemovePeer(peer.id)
bp.sendTimeout(peer.id)
pool.RemovePeer(peer.id)
pool.sendTimeout(peer.id)
}
}


+ 59
- 1
blockchain/reactor.go View File

@ -8,7 +8,9 @@ import (
"time"
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -16,11 +18,17 @@ const (
BlockchainChannel = byte(0x40)
defaultChannelCapacity = 100
defaultSleepIntervalMS = 500
trySyncIntervalMS = 100
// stop syncing when last block's time is
// within this much of the system time.
stopSyncingDurationMinutes = 10
)
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
sw *p2p.Switch
state *sm.State
store *BlockStore
pool *BlockPool
requestsCh chan BlockRequest
@ -31,7 +39,10 @@ type BlockchainReactor struct {
stopped uint32
}
func NewBlockchainReactor(store *BlockStore) *BlockchainReactor {
func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
}
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
pool := NewBlockPool(
@ -40,6 +51,7 @@ func NewBlockchainReactor(store *BlockStore) *BlockchainReactor {
timeoutsCh,
)
bcR := &BlockchainReactor{
state: state,
store: store,
pool: pool,
requestsCh: requestsCh,
@ -129,7 +141,11 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
}
}
// Handle messages from the poolReactor telling the reactor what to do.
func (bcR *BlockchainReactor) poolRoutine() {
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
FOR_LOOP:
for {
select {
@ -150,6 +166,48 @@ FOR_LOOP:
// Peer timed out.
peer := bcR.sw.Peers().Get(peerId)
bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
case _ = <-trySyncTicker.C: // chan time
var lastValidatedBlock *types.Block
SYNC_LOOP:
for i := 0; i < 10; i++ {
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
if first == nil || second == nil {
// We need both to sync the first block.
break SYNC_LOOP
}
firstParts := first.MakePartSet().Header()
// Finally, verify the first block using the second's validation.
err := bcR.state.BondedValidators.VerifyValidation(
first.Hash(), firstParts, first.Height, second.Validation)
if err != nil {
bcR.pool.RedoRequest(first.Height)
break SYNC_LOOP
} else {
bcR.pool.PopRequest()
err := bcR.state.AppendBlock(first, firstParts)
if err != nil {
// TODO This is bad, are we zombie?
panic(Fmt("Failed to process committed block: %v", err))
}
lastValidatedBlock = first
}
}
// We're done syncing for now (will do again shortly)
// See if we want to stop syncing and turn on the
// consensus reactor.
// TODO: use other heuristics too besides blocktime.
// It's not a security concern, as it only needs to happen
// upon node sync, and there's also a second (slower)
// method of syncing in the consensus reactor.
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
go func() {
bcR.sw.Reactor("BLOCKCHAIN").Stop()
bcR.sw.Reactor("CONSENSUS").Start(bcR.sw)
}()
break FOR_LOOP
}
continue FOR_LOOP
case <-bcR.quit:
break FOR_LOOP
}


+ 2
- 0
consensus/reactor.go View File

@ -29,6 +29,8 @@ const (
//-----------------------------------------------------------------------------
// The reactor's underlying ConsensusState may change state at any time.
// We atomically copy the RoundState struct before using it.
type ConsensusReactor struct {
sw *p2p.Switch
started uint32


+ 2
- 0
consensus/state.go View File

@ -467,6 +467,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// Reset fields based on state.
validators := state.BondedValidators
height := state.LastBlockHeight + 1 // next desired block height
// RoundState fields
cs.Height = height
cs.Round = 0
cs.Step = RoundStepNewHeight


+ 1
- 1
consensus/vote_set.go View File

@ -34,7 +34,7 @@ type VoteSet struct {
maj23Exists bool
}
// Constructs a new VoteSet struct used to accumulate votes for each round.
// Constructs a new VoteSet struct used to accumulate votes for given height/round.
func NewVoteSet(height uint, round uint, type_ byte, valSet *sm.ValidatorSet) *VoteSet {
if height == 0 {
panic("Cannot make VoteSet for height == 0, doesn't make sense.")


+ 5
- 5
daemon/daemon.go View File

@ -55,7 +55,7 @@ func NewNode() *Node {
pexReactor := p2p.NewPEXReactor(book)
// Get BlockchainReactor
bcReactor := bc.NewBlockchainReactor(blockStore)
bcReactor := bc.NewBlockchainReactor(state, blockStore)
// Get MempoolReactor
mempool := mempl.NewMempool(state.Copy())
@ -70,10 +70,10 @@ func NewNode() *Node {
sw := p2p.NewSwitch()
sw.SetChainId(state.Hash(), config.App().GetString("Network"))
sw.AddReactor("PEX", pexReactor)
//sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("PEX", pexReactor).Start(sw)
sw.AddReactor("BLOCKCHAIN", bcReactor).Start(sw)
sw.AddReactor("MEMPOOL", mempoolReactor).Start(sw)
sw.AddReactor("CONSENSUS", consensusReactor) // Do not start yet.
return &Node{
sw: sw,


+ 6
- 1
p2p/peer_set.go View File

@ -59,7 +59,12 @@ func (ps *PeerSet) Has(peerKey string) bool {
func (ps *PeerSet) Get(peerKey string) *Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.lookup[peerKey].peer
item, ok := ps.lookup[peerKey]
if ok {
return item.peer
} else {
return nil
}
}
func (ps *PeerSet) Remove(peer *Peer) {


+ 47
- 0
state/validator_set.go View File

@ -2,12 +2,15 @@ package state
import (
"bytes"
"errors"
"fmt"
"sort"
"strings"
"github.com/tendermint/tendermint/account"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/merkle"
"github.com/tendermint/tendermint/types"
)
// ValidatorSet represent a set of *Validator at a given height.
@ -198,6 +201,50 @@ func (valSet *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) {
}
}
// Verify that +2/3 of the set had signed the given signBytes
func (valSet *ValidatorSet) VerifyValidation(hash []byte, parts types.PartSetHeader, height uint, v *types.Validation) error {
if valSet.Size() != uint(len(v.Commits)) {
return errors.New(Fmt("Invalid validation -- wrong set size: %v vs %v",
valSet.Size(), len(v.Commits)))
}
talliedVotingPower := uint64(0)
seenValidators := map[string]struct{}{}
for idx, commit := range v.Commits {
// may be zero, in which case skip.
if commit.Signature.IsZero() {
continue
}
_, val := valSet.GetByIndex(uint(idx))
commitSignBytes := account.SignBytes(&types.Vote{
Height: height, Round: commit.Round, Type: types.VoteTypeCommit,
BlockHash: hash,
BlockParts: parts,
})
// Validate
if _, seen := seenValidators[string(val.Address)]; seen {
return Errorf("Duplicate validator for commit %v for Validation %v", commit, v)
}
if !val.PubKey.VerifyBytes(commitSignBytes, commit.Signature) {
return Errorf("Invalid signature for commit %v for Validation %v", commit, v)
}
// Tally
seenValidators[string(val.Address)] = struct{}{}
talliedVotingPower += val.VotingPower
}
if talliedVotingPower > valSet.TotalVotingPower()*2/3 {
return nil
} else {
return Errorf("insufficient voting power %v, needed %v",
talliedVotingPower, (valSet.TotalVotingPower()*2/3 + 1))
}
}
func (valSet *ValidatorSet) String() string {
return valSet.StringIndented("")
}


+ 3
- 1
types/block.go View File

@ -39,7 +39,9 @@ func (b *Block) ValidateBasic(lastBlockHeight uint, lastBlockHash []byte,
if !b.LastBlockParts.Equals(lastBlockParts) {
return errors.New("Wrong Block.Header.LastBlockParts")
}
/* TODO: Determine bounds.
/* TODO: Determine bounds
See blockchain/reactor "stopSyncingDurationMinutes"
if !b.Time.After(lastBlockTime) {
return errors.New("Invalid Block.Header.Time")
}


Loading…
Cancel
Save