|
|
@ -18,7 +18,8 @@ const ( |
|
|
|
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
|
|
|
|
BlockchainChannel = byte(0x40) |
|
|
|
|
|
|
|
trySyncIntervalMS = 50 |
|
|
|
trySyncIntervalMS = 10 |
|
|
|
|
|
|
|
// stop syncing when last block's time is
|
|
|
|
// within this much of the system time.
|
|
|
|
// stopSyncingDurationMinutes = 10
|
|
|
@ -76,8 +77,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl |
|
|
|
store.Height())) |
|
|
|
} |
|
|
|
|
|
|
|
const capacity = 1000 // must be bigger than peers count
|
|
|
|
requestsCh := make(chan BlockRequest, capacity) |
|
|
|
requestsCh := make(chan BlockRequest, maxTotalRequesters) |
|
|
|
|
|
|
|
const capacity = 1000 // must be bigger than peers count
|
|
|
|
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
|
|
|
|
|
|
|
|
pool := NewBlockPool( |
|
|
@ -209,7 +211,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) |
|
|
|
|
|
|
|
// Handle messages from the poolReactor telling the reactor what to do.
|
|
|
|
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
|
|
|
|
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
|
|
|
|
func (bcR *BlockchainReactor) poolRoutine() { |
|
|
|
|
|
|
|
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) |
|
|
@ -224,6 +225,8 @@ func (bcR *BlockchainReactor) poolRoutine() { |
|
|
|
lastHundred := time.Now() |
|
|
|
lastRate := 0.0 |
|
|
|
|
|
|
|
didProcessCh := make(chan struct{}, 1) |
|
|
|
|
|
|
|
FOR_LOOP: |
|
|
|
for { |
|
|
|
select { |
|
|
@ -239,14 +242,17 @@ FOR_LOOP: |
|
|
|
// The pool handles timeouts, just let it go.
|
|
|
|
continue FOR_LOOP |
|
|
|
} |
|
|
|
|
|
|
|
case err := <-bcR.errorsCh: |
|
|
|
peer := bcR.Switch.Peers().Get(err.peerID) |
|
|
|
if peer != nil { |
|
|
|
bcR.Switch.StopPeerForError(peer, err) |
|
|
|
} |
|
|
|
|
|
|
|
case <-statusUpdateTicker.C: |
|
|
|
// ask for status updates
|
|
|
|
go bcR.BroadcastStatusRequest() // nolint: errcheck
|
|
|
|
|
|
|
|
case <-switchToConsensusTicker.C: |
|
|
|
height, numPending, lenRequesters := bcR.pool.GetStatus() |
|
|
|
outbound, inbound, _ := bcR.Switch.NumPeers() |
|
|
@ -261,60 +267,78 @@ FOR_LOOP: |
|
|
|
|
|
|
|
break FOR_LOOP |
|
|
|
} |
|
|
|
|
|
|
|
case <-trySyncTicker.C: // chan time
|
|
|
|
// This loop can be slow as long as it's doing syncing work.
|
|
|
|
SYNC_LOOP: |
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
// See if there are any blocks to sync.
|
|
|
|
first, second := bcR.pool.PeekTwoBlocks() |
|
|
|
//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
|
|
|
|
if first == nil || second == nil { |
|
|
|
// We need both to sync the first block.
|
|
|
|
break SYNC_LOOP |
|
|
|
select { |
|
|
|
case didProcessCh <- struct{}{}: |
|
|
|
default: |
|
|
|
} |
|
|
|
|
|
|
|
case <-didProcessCh: |
|
|
|
// NOTE: It is a subtle mistake to process more than a single block
|
|
|
|
// at a time (e.g. 10) here, because we only TrySend 1 request per
|
|
|
|
// loop. The ratio mismatch can result in starving of blocks, a
|
|
|
|
// sudden burst of requests and responses, and repeat.
|
|
|
|
// Consequently, it is better to split these routines rather than
|
|
|
|
// coupling them as it's written here. TODO uncouple from request
|
|
|
|
// routine.
|
|
|
|
|
|
|
|
// See if there are any blocks to sync.
|
|
|
|
first, second := bcR.pool.PeekTwoBlocks() |
|
|
|
//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
|
|
|
|
if first == nil || second == nil { |
|
|
|
// We need both to sync the first block.
|
|
|
|
continue FOR_LOOP |
|
|
|
} else { |
|
|
|
// Try again quickly next loop.
|
|
|
|
didProcessCh <- struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) |
|
|
|
firstPartsHeader := firstParts.Header() |
|
|
|
firstID := types.BlockID{first.Hash(), firstPartsHeader} |
|
|
|
// Finally, verify the first block using the second's commit
|
|
|
|
// NOTE: we can probably make this more efficient, but note that calling
|
|
|
|
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
|
|
|
|
// currently necessary.
|
|
|
|
err := state.Validators.VerifyCommit( |
|
|
|
chainID, firstID, first.Height, second.LastCommit) |
|
|
|
if err != nil { |
|
|
|
bcR.Logger.Error("Error in validation", "err", err) |
|
|
|
peerID := bcR.pool.RedoRequest(first.Height) |
|
|
|
peer := bcR.Switch.Peers().Get(peerID) |
|
|
|
if peer != nil { |
|
|
|
// NOTE: we've already removed the peer's request, but we
|
|
|
|
// still need to clean up the rest.
|
|
|
|
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) |
|
|
|
} |
|
|
|
firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) |
|
|
|
firstPartsHeader := firstParts.Header() |
|
|
|
firstID := types.BlockID{first.Hash(), firstPartsHeader} |
|
|
|
// Finally, verify the first block using the second's commit
|
|
|
|
// NOTE: we can probably make this more efficient, but note that calling
|
|
|
|
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
|
|
|
|
// currently necessary.
|
|
|
|
err := state.Validators.VerifyCommit( |
|
|
|
chainID, firstID, first.Height, second.LastCommit) |
|
|
|
continue FOR_LOOP |
|
|
|
} else { |
|
|
|
bcR.pool.PopRequest() |
|
|
|
|
|
|
|
// TODO: batch saves so we dont persist to disk every block
|
|
|
|
bcR.store.SaveBlock(first, firstParts, second.LastCommit) |
|
|
|
|
|
|
|
// TODO: same thing for app - but we would need a way to
|
|
|
|
// get the hash without persisting the state
|
|
|
|
var err error |
|
|
|
state, err = bcR.blockExec.ApplyBlock(state, firstID, first) |
|
|
|
if err != nil { |
|
|
|
bcR.Logger.Error("Error in validation", "err", err) |
|
|
|
peerID := bcR.pool.RedoRequest(first.Height) |
|
|
|
peer := bcR.Switch.Peers().Get(peerID) |
|
|
|
if peer != nil { |
|
|
|
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) |
|
|
|
} |
|
|
|
break SYNC_LOOP |
|
|
|
} else { |
|
|
|
bcR.pool.PopRequest() |
|
|
|
|
|
|
|
// TODO: batch saves so we dont persist to disk every block
|
|
|
|
bcR.store.SaveBlock(first, firstParts, second.LastCommit) |
|
|
|
|
|
|
|
// TODO: same thing for app - but we would need a way to
|
|
|
|
// get the hash without persisting the state
|
|
|
|
var err error |
|
|
|
state, err = bcR.blockExec.ApplyBlock(state, firstID, first) |
|
|
|
if err != nil { |
|
|
|
// TODO This is bad, are we zombie?
|
|
|
|
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", |
|
|
|
first.Height, first.Hash(), err)) |
|
|
|
} |
|
|
|
blocksSynced++ |
|
|
|
|
|
|
|
if blocksSynced%100 == 0 { |
|
|
|
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) |
|
|
|
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, |
|
|
|
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) |
|
|
|
lastHundred = time.Now() |
|
|
|
} |
|
|
|
// TODO This is bad, are we zombie?
|
|
|
|
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", |
|
|
|
first.Height, first.Hash(), err)) |
|
|
|
} |
|
|
|
blocksSynced++ |
|
|
|
|
|
|
|
if blocksSynced%100 == 0 { |
|
|
|
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) |
|
|
|
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, |
|
|
|
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) |
|
|
|
lastHundred = time.Now() |
|
|
|
} |
|
|
|
} |
|
|
|
continue FOR_LOOP |
|
|
|
|
|
|
|
case <-bcR.Quit(): |
|
|
|
break FOR_LOOP |
|
|
|
} |
|
|
|