|
|
@ -13,6 +13,7 @@ import ( |
|
|
|
tmsync "github.com/tendermint/tendermint/internal/libs/sync" |
|
|
|
"github.com/tendermint/tendermint/internal/p2p" |
|
|
|
"github.com/tendermint/tendermint/libs/log" |
|
|
|
"github.com/tendermint/tendermint/libs/sync" |
|
|
|
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" |
|
|
|
"github.com/tendermint/tendermint/state" |
|
|
|
"github.com/tendermint/tendermint/types" |
|
|
@ -34,8 +35,8 @@ type blockStore interface { |
|
|
|
type BlockchainReactor struct { |
|
|
|
p2p.BaseReactor |
|
|
|
|
|
|
|
fastSync bool // if true, enable fast sync on start
|
|
|
|
stateSynced bool // set to true when SwitchToFastSync is called by state sync
|
|
|
|
fastSync *sync.AtomicBool // enable fast sync on start when it's been Set
|
|
|
|
stateSynced bool // set to true when SwitchToFastSync is called by state sync
|
|
|
|
scheduler *Routine |
|
|
|
processor *Routine |
|
|
|
logger log.Logger |
|
|
@ -48,6 +49,10 @@ type BlockchainReactor struct { |
|
|
|
reporter behavior.Reporter |
|
|
|
io iIO |
|
|
|
store blockStore |
|
|
|
|
|
|
|
syncStartTime time.Time |
|
|
|
syncStartHeight int64 |
|
|
|
lastSyncRate float64 // # blocks sync per sec base on the last 100 blocks
|
|
|
|
} |
|
|
|
|
|
|
|
type blockApplier interface { |
|
|
@ -68,12 +73,15 @@ func newReactor(state state.State, store blockStore, reporter behavior.Reporter, |
|
|
|
processor := newPcState(pContext) |
|
|
|
|
|
|
|
return &BlockchainReactor{ |
|
|
|
scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), |
|
|
|
processor: newRoutine("processor", processor.handle, chBufferSize), |
|
|
|
store: store, |
|
|
|
reporter: reporter, |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
fastSync: fastSync, |
|
|
|
scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), |
|
|
|
processor: newRoutine("processor", processor.handle, chBufferSize), |
|
|
|
store: store, |
|
|
|
reporter: reporter, |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
fastSync: sync.NewBool(fastSync), |
|
|
|
syncStartHeight: initHeight, |
|
|
|
syncStartTime: time.Time{}, |
|
|
|
lastSyncRate: 0, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -129,7 +137,7 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) { |
|
|
|
// Start implements cmn.Service interface
|
|
|
|
func (r *BlockchainReactor) Start() error { |
|
|
|
r.reporter = behavior.NewSwitchReporter(r.BaseReactor.Switch) |
|
|
|
if r.fastSync { |
|
|
|
if r.fastSync.IsSet() { |
|
|
|
err := r.startSync(nil) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to start fast sync: %w", err) |
|
|
@ -175,7 +183,13 @@ func (r *BlockchainReactor) endSync() { |
|
|
|
func (r *BlockchainReactor) SwitchToFastSync(state state.State) error { |
|
|
|
r.stateSynced = true |
|
|
|
state = state.Copy() |
|
|
|
return r.startSync(&state) |
|
|
|
|
|
|
|
err := r.startSync(&state) |
|
|
|
if err == nil { |
|
|
|
r.syncStartTime = time.Now() |
|
|
|
} |
|
|
|
|
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// reactor generated ticker events:
|
|
|
@ -283,7 +297,6 @@ func (e bcResetState) String() string { |
|
|
|
|
|
|
|
// Takes the channel as a parameter to avoid race conditions on r.events.
|
|
|
|
func (r *BlockchainReactor) demux(events <-chan Event) { |
|
|
|
var lastRate = 0.0 |
|
|
|
var lastHundred = time.Now() |
|
|
|
|
|
|
|
var ( |
|
|
@ -414,10 +427,15 @@ func (r *BlockchainReactor) demux(events <-chan Event) { |
|
|
|
switch event := event.(type) { |
|
|
|
case pcBlockProcessed: |
|
|
|
r.setSyncHeight(event.height) |
|
|
|
if r.syncHeight%100 == 0 { |
|
|
|
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) |
|
|
|
if (r.syncHeight-r.syncStartHeight)%100 == 0 { |
|
|
|
newSyncRate := 100 / time.Since(lastHundred).Seconds() |
|
|
|
if r.lastSyncRate == 0 { |
|
|
|
r.lastSyncRate = newSyncRate |
|
|
|
} else { |
|
|
|
r.lastSyncRate = 0.9*r.lastSyncRate + 0.1*newSyncRate |
|
|
|
} |
|
|
|
r.logger.Info("Fast Sync Rate", "height", r.syncHeight, |
|
|
|
"max_peer_height", r.maxPeerHeight, "blocks/s", lastRate) |
|
|
|
"max_peer_height", r.maxPeerHeight, "blocks/s", r.lastSyncRate) |
|
|
|
lastHundred = time.Now() |
|
|
|
} |
|
|
|
r.scheduler.send(event) |
|
|
@ -429,6 +447,7 @@ func (r *BlockchainReactor) demux(events <-chan Event) { |
|
|
|
r.logger.Error("Failed to switch to consensus reactor") |
|
|
|
} |
|
|
|
r.endSync() |
|
|
|
r.fastSync.UnSet() |
|
|
|
return |
|
|
|
case noOpEvent: |
|
|
|
default: |
|
|
@ -596,3 +615,29 @@ func (r *BlockchainReactor) GetMaxPeerBlockHeight() int64 { |
|
|
|
defer r.mtx.RUnlock() |
|
|
|
return r.maxPeerHeight |
|
|
|
} |
|
|
|
|
|
|
|
func (r *BlockchainReactor) GetTotalSyncedTime() time.Duration { |
|
|
|
if !r.fastSync.IsSet() || r.syncStartTime.IsZero() { |
|
|
|
return time.Duration(0) |
|
|
|
} |
|
|
|
return time.Since(r.syncStartTime) |
|
|
|
} |
|
|
|
|
|
|
|
func (r *BlockchainReactor) GetRemainingSyncTime() time.Duration { |
|
|
|
if !r.fastSync.IsSet() { |
|
|
|
return time.Duration(0) |
|
|
|
} |
|
|
|
|
|
|
|
r.mtx.RLock() |
|
|
|
defer r.mtx.RUnlock() |
|
|
|
|
|
|
|
targetSyncs := r.maxPeerHeight - r.syncStartHeight |
|
|
|
currentSyncs := r.syncHeight - r.syncStartHeight + 1 |
|
|
|
if currentSyncs < 0 || r.lastSyncRate < 0.001 { |
|
|
|
return time.Duration(0) |
|
|
|
} |
|
|
|
|
|
|
|
remain := float64(targetSyncs-currentSyncs) / r.lastSyncRate |
|
|
|
|
|
|
|
return time.Duration(int64(remain * float64(time.Second))) |
|
|
|
} |