From d4cda544aec6dd4eff3eed316594652bdf45738a Mon Sep 17 00:00:00 2001 From: JayT106 Date: Wed, 7 Jul 2021 07:26:01 -0400 Subject: [PATCH] fastsync/rpc: add TotalSyncedTime & RemainingTime to SyncInfo in /status RPC (#6620) --- CHANGELOG_PENDING.md | 1 + internal/blockchain/v0/pool.go | 44 ++++++++++++++-- internal/blockchain/v0/reactor.go | 42 +++++++++++++-- internal/blockchain/v0/reactor_test.go | 23 ++++++++ internal/blockchain/v2/reactor.go | 73 +++++++++++++++++++++----- internal/consensus/reactor.go | 8 +++ libs/sync/atomic_bool.go | 33 ++++++++++++ libs/sync/atomic_bool_test.go | 27 ++++++++++ rpc/client/mock/status_test.go | 7 +++ rpc/core/status.go | 2 + rpc/core/types/responses.go | 3 ++ rpc/openapi/openapi.yaml | 6 +++ 12 files changed, 245 insertions(+), 24 deletions(-) create mode 100644 libs/sync/atomic_bool.go create mode 100644 libs/sync/atomic_bool_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 55eb55b1a..5a742a779 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -22,6 +22,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [config] \#6462 Move `PrivValidator` configuration out of `BaseConfig` into its own section. (@tychoish) - [rpc] \#6610 Add MaxPeerBlockHeight into /status rpc call (@JayT106) - [libs/CList] \#6626 Automatically detach the prev/next elements in Remove function (@JayT106) + - [fastsync/rpc] \#6620 Add TotalSyncedTime & RemainingTime to SyncInfo in /status RPC (@JayT106) - Apps - [ABCI] \#6408 Change the `key` and `value` fields from `[]byte` to `string` in the `EventAttribute` type. 
(@alexanderbez) diff --git a/internal/blockchain/v0/pool.go b/internal/blockchain/v0/pool.go index 1ff216a00..c9c4030a8 100644 --- a/internal/blockchain/v0/pool.go +++ b/internal/blockchain/v0/pool.go @@ -83,6 +83,10 @@ type BlockPool struct { requestsCh chan<- BlockRequest errorsCh chan<- peerError + + startHeight int64 + lastHundredBlockTimeStamp time.Time + lastSyncRate float64 } // NewBlockPool returns a new BlockPool with the height equal to start. Block @@ -91,12 +95,14 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p bp := &BlockPool{ peers: make(map[types.NodeID]*bpPeer), - requesters: make(map[int64]*bpRequester), - height: start, - numPending: 0, + requesters: make(map[int64]*bpRequester), + height: start, + startHeight: start, + numPending: 0, - requestsCh: requestsCh, - errorsCh: errorsCh, + requestsCh: requestsCh, + errorsCh: errorsCh, + lastSyncRate: 0, } bp.BaseService = *service.NewBaseService(nil, "BlockPool", bp) return bp @@ -106,6 +112,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // pool's start time. func (pool *BlockPool) OnStart() error { pool.lastAdvance = time.Now() + pool.lastHundredBlockTimeStamp = pool.lastAdvance go pool.makeRequestersRoutine() return nil } @@ -216,6 +223,19 @@ func (pool *BlockPool) PopRequest() { delete(pool.requesters, pool.height) pool.height++ pool.lastAdvance = time.Now() + + // the lastSyncRate will be updated every 100 blocks, it uses the adaptive filter + // to smooth the block sync rate and the unit represents the number of blocks per second. 
+ if (pool.height-pool.startHeight)%100 == 0 { + newSyncRate := 100 / time.Since(pool.lastHundredBlockTimeStamp).Seconds() + if pool.lastSyncRate == 0 { + pool.lastSyncRate = newSyncRate + } else { + pool.lastSyncRate = 0.9*pool.lastSyncRate + 0.1*newSyncRate + } + pool.lastHundredBlockTimeStamp = time.Now() + } + } else { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } @@ -428,6 +448,20 @@ func (pool *BlockPool) debug() string { return str } +func (pool *BlockPool) targetSyncBlocks() int64 { + pool.mtx.RLock() + defer pool.mtx.RUnlock() + + return pool.maxPeerHeight - pool.startHeight + 1 +} + +func (pool *BlockPool) getLastSyncRate() float64 { + pool.mtx.RLock() + defer pool.mtx.RUnlock() + + return pool.lastSyncRate +} + //------------------------------------- type bpPeer struct { diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 9a9584857..6d0c76968 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" + tmSync "github.com/tendermint/tendermint/libs/sync" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" @@ -84,7 +85,7 @@ type Reactor struct { store *store.BlockStore pool *BlockPool consReactor consensusReactor - fastSync bool + fastSync *tmSync.AtomicBool blockchainCh *p2p.Channel peerUpdates *p2p.PeerUpdates @@ -100,6 +101,8 @@ type Reactor struct { poolWG sync.WaitGroup metrics *cons.Metrics + + syncStartTime time.Time } // NewReactor returns new reactor instance. 
@@ -132,7 +135,7 @@ func NewReactor( store: store, pool: NewBlockPool(startHeight, requestsCh, errorsCh), consReactor: consReactor, - fastSync: fastSync, + fastSync: tmSync.NewBool(fastSync), requestsCh: requestsCh, errorsCh: errorsCh, blockchainCh: blockchainCh, @@ -140,6 +143,7 @@ func NewReactor( peerUpdatesCh: make(chan p2p.Envelope), closeCh: make(chan struct{}), metrics: metrics, + syncStartTime: time.Time{}, } r.BaseService = *service.NewBaseService(logger, "Blockchain", r) @@ -154,7 +158,7 @@ func NewReactor( // If fastSync is enabled, we also start the pool and the pool processing // goroutine. If the pool fails to start, an error is returned. func (r *Reactor) OnStart() error { - if r.fastSync { + if r.fastSync.IsSet() { if err := r.pool.Start(); err != nil { return err } @@ -172,7 +176,7 @@ func (r *Reactor) OnStart() error { // OnStop stops the reactor by signaling to all spawned goroutines to exit and // blocking until they all exit. func (r *Reactor) OnStop() { - if r.fastSync { + if r.fastSync.IsSet() { if err := r.pool.Stop(); err != nil { r.Logger.Error("failed to stop pool", "err", err) } @@ -363,7 +367,7 @@ func (r *Reactor) processPeerUpdates() { // SwitchToFastSync is called by the state sync reactor when switching to fast // sync. 
func (r *Reactor) SwitchToFastSync(state sm.State) error { - r.fastSync = true + r.fastSync.Set() r.initialState = state r.pool.height = state.LastBlockHeight + 1 @@ -371,6 +375,8 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error { return err } + r.syncStartTime = time.Now() + r.poolWG.Add(1) go r.poolRoutine(true) @@ -483,6 +489,8 @@ FOR_LOOP: r.Logger.Error("failed to stop pool", "err", err) } + r.fastSync.UnSet() + if r.consReactor != nil { r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) } @@ -597,3 +605,27 @@ FOR_LOOP: func (r *Reactor) GetMaxPeerBlockHeight() int64 { return r.pool.MaxPeerHeight() } + +func (r *Reactor) GetTotalSyncedTime() time.Duration { + if !r.fastSync.IsSet() || r.syncStartTime.IsZero() { + return time.Duration(0) + } + return time.Since(r.syncStartTime) +} + +func (r *Reactor) GetRemainingSyncTime() time.Duration { + if !r.fastSync.IsSet() { + return time.Duration(0) + } + + targetSyncs := r.pool.targetSyncBlocks() + currentSyncs := r.store.Height() - r.pool.startHeight + 1 + lastSyncRate := r.pool.getLastSyncRate() + if currentSyncs < 0 || lastSyncRate < 0.001 { + return time.Duration(0) + } + + remain := float64(targetSyncs-currentSyncs) / lastSyncRate + + return time.Duration(int64(remain * float64(time.Second))) +} diff --git a/internal/blockchain/v0/reactor_test.go b/internal/blockchain/v0/reactor_test.go index cb163721b..1c9dc60c4 100644 --- a/internal/blockchain/v0/reactor_test.go +++ b/internal/blockchain/v0/reactor_test.go @@ -215,6 +215,29 @@ func TestReactor_AbruptDisconnect(t *testing.T) { rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0]) } +func TestReactor_SyncTime(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + + genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30) + maxBlockHeight := int64(101) + + rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0) + require.Equal(t, 
maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height()) + rts.start(t) + + require.Eventually( + t, + func() bool { + return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond && + rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001 + }, + 10*time.Second, + 10*time.Millisecond, + "expected node to be partially synced", + ) +} + func TestReactor_NoBlockResponse(t *testing.T) { config := cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) diff --git a/internal/blockchain/v2/reactor.go b/internal/blockchain/v2/reactor.go index 7e15d9675..af18f3c0a 100644 --- a/internal/blockchain/v2/reactor.go +++ b/internal/blockchain/v2/reactor.go @@ -13,6 +13,7 @@ import ( tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/sync" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -34,8 +35,8 @@ type blockStore interface { type BlockchainReactor struct { p2p.BaseReactor - fastSync bool // if true, enable fast sync on start - stateSynced bool // set to true when SwitchToFastSync is called by state sync + fastSync *sync.AtomicBool // enable fast sync on start when it's been Set + stateSynced bool // set to true when SwitchToFastSync is called by state sync scheduler *Routine processor *Routine logger log.Logger @@ -48,6 +49,10 @@ type BlockchainReactor struct { reporter behavior.Reporter io iIO store blockStore + + syncStartTime time.Time + syncStartHeight int64 + lastSyncRate float64 // number of blocks synced per second, based on the last 100 blocks } type blockApplier interface { @@ -68,12 +73,15 @@ func newReactor(state state.State, store blockStore, reporter behavior.Reporter, processor := newPcState(pContext) return &BlockchainReactor{ - scheduler: newRoutine("scheduler", scheduler.handle, 
chBufferSize), - processor: newRoutine("processor", processor.handle, chBufferSize), - store: store, - reporter: reporter, - logger: log.NewNopLogger(), - fastSync: fastSync, + scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), + processor: newRoutine("processor", processor.handle, chBufferSize), + store: store, + reporter: reporter, + logger: log.NewNopLogger(), + fastSync: sync.NewBool(fastSync), + syncStartHeight: initHeight, + syncStartTime: time.Time{}, + lastSyncRate: 0, } } @@ -129,7 +137,7 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) { // Start implements cmn.Service interface func (r *BlockchainReactor) Start() error { r.reporter = behavior.NewSwitchReporter(r.BaseReactor.Switch) - if r.fastSync { + if r.fastSync.IsSet() { err := r.startSync(nil) if err != nil { return fmt.Errorf("failed to start fast sync: %w", err) @@ -175,7 +183,13 @@ func (r *BlockchainReactor) endSync() { func (r *BlockchainReactor) SwitchToFastSync(state state.State) error { r.stateSynced = true state = state.Copy() - return r.startSync(&state) + + err := r.startSync(&state) + if err == nil { + r.syncStartTime = time.Now() + } + + return err } // reactor generated ticker events: @@ -283,7 +297,6 @@ func (e bcResetState) String() string { // Takes the channel as a parameter to avoid race conditions on r.events. 
func (r *BlockchainReactor) demux(events <-chan Event) { - var lastRate = 0.0 var lastHundred = time.Now() var ( @@ -414,10 +427,15 @@ func (r *BlockchainReactor) demux(events <-chan Event) { switch event := event.(type) { case pcBlockProcessed: r.setSyncHeight(event.height) - if r.syncHeight%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + if (r.syncHeight-r.syncStartHeight)%100 == 0 { + newSyncRate := 100 / time.Since(lastHundred).Seconds() + if r.lastSyncRate == 0 { + r.lastSyncRate = newSyncRate + } else { + r.lastSyncRate = 0.9*r.lastSyncRate + 0.1*newSyncRate + } r.logger.Info("Fast Sync Rate", "height", r.syncHeight, - "max_peer_height", r.maxPeerHeight, "blocks/s", lastRate) + "max_peer_height", r.maxPeerHeight, "blocks/s", r.lastSyncRate) lastHundred = time.Now() } r.scheduler.send(event) @@ -429,6 +447,7 @@ func (r *BlockchainReactor) demux(events <-chan Event) { r.logger.Error("Failed to switch to consensus reactor") } r.endSync() + r.fastSync.UnSet() return case noOpEvent: default: @@ -596,3 +615,29 @@ func (r *BlockchainReactor) GetMaxPeerBlockHeight() int64 { defer r.mtx.RUnlock() return r.maxPeerHeight } + +func (r *BlockchainReactor) GetTotalSyncedTime() time.Duration { + if !r.fastSync.IsSet() || r.syncStartTime.IsZero() { + return time.Duration(0) + } + return time.Since(r.syncStartTime) +} + +func (r *BlockchainReactor) GetRemainingSyncTime() time.Duration { + if !r.fastSync.IsSet() { + return time.Duration(0) + } + + r.mtx.RLock() + defer r.mtx.RUnlock() + + targetSyncs := r.maxPeerHeight - r.syncStartHeight + currentSyncs := r.syncHeight - r.syncStartHeight + 1 + if currentSyncs < 0 || r.lastSyncRate < 0.001 { + return time.Duration(0) + } + + remain := float64(targetSyncs-currentSyncs) / r.lastSyncRate + + return time.Duration(int64(remain * float64(time.Second))) +} diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index ef3494e32..864338154 100644 --- 
a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -102,6 +102,14 @@ type FastSyncReactor interface { SwitchToFastSync(sm.State) error GetMaxPeerBlockHeight() int64 + + // GetTotalSyncedTime returns the time elapsed since fastsync started. + GetTotalSyncedTime() time.Duration + + // GetRemainingSyncTime returns the estimated time until the node is fully synced; + // it returns 0 if fastsync is not running or the number of blocks synced is + // too small (less than 100). + GetRemainingSyncTime() time.Duration } // Reactor defines a reactor for the consensus service. diff --git a/libs/sync/atomic_bool.go b/libs/sync/atomic_bool.go new file mode 100644 index 000000000..1a530b596 --- /dev/null +++ b/libs/sync/atomic_bool.go @@ -0,0 +1,33 @@ +package sync + +import "sync/atomic" + +// AtomicBool is an atomic Boolean. +// Its methods are all atomic, thus safe to be called by multiple goroutines simultaneously. +// Note: When embedding into a struct one should always use *AtomicBool to avoid copy. +// It's a simple implementation from https://github.com/tevino/abool +type AtomicBool int32 + +// NewBool creates an AtomicBool with given default value. +func NewBool(ok bool) *AtomicBool { + ab := new(AtomicBool) + if ok { + ab.Set() + } + return ab +} + +// Set sets the Boolean to true. +func (ab *AtomicBool) Set() { + atomic.StoreInt32((*int32)(ab), 1) +} + +// UnSet sets the Boolean to false. +func (ab *AtomicBool) UnSet() { + atomic.StoreInt32((*int32)(ab), 0) +} + +// IsSet returns whether the Boolean is true. 
+func (ab *AtomicBool) IsSet() bool { + return atomic.LoadInt32((*int32)(ab))&1 == 1 +} diff --git a/libs/sync/atomic_bool_test.go b/libs/sync/atomic_bool_test.go new file mode 100644 index 000000000..9531815e8 --- /dev/null +++ b/libs/sync/atomic_bool_test.go @@ -0,0 +1,27 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultValue(t *testing.T) { + t.Parallel() + v := NewBool(false) + assert.False(t, v.IsSet()) + + v = NewBool(true) + assert.True(t, v.IsSet()) +} + +func TestSetUnSet(t *testing.T) { + t.Parallel() + v := NewBool(false) + + v.Set() + assert.True(t, v.IsSet()) + + v.UnSet() + assert.False(t, v.IsSet()) +} diff --git a/rpc/client/mock/status_test.go b/rpc/client/mock/status_test.go index 2a7474aa6..3933c33c9 100644 --- a/rpc/client/mock/status_test.go +++ b/rpc/client/mock/status_test.go @@ -3,6 +3,7 @@ package mock_test import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,6 +24,8 @@ func TestStatus(t *testing.T) { LatestAppHash: bytes.HexBytes("app"), LatestBlockHeight: 10, MaxPeerBlockHeight: 20, + TotalSyncedTime: time.Second, + RemainingTime: time.Minute, }, }}, } @@ -36,6 +39,8 @@ func TestStatus(t *testing.T) { assert.EqualValues("block", status.SyncInfo.LatestBlockHash) assert.EqualValues(10, status.SyncInfo.LatestBlockHeight) assert.EqualValues(20, status.SyncInfo.MaxPeerBlockHeight) + assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime) + assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime) // make sure recorder works properly require.Equal(1, len(r.Calls)) @@ -49,4 +54,6 @@ func TestStatus(t *testing.T) { assert.EqualValues("block", st.SyncInfo.LatestBlockHash) assert.EqualValues(10, st.SyncInfo.LatestBlockHeight) assert.EqualValues(20, st.SyncInfo.MaxPeerBlockHeight) + assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime) + assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime) } 
diff --git a/rpc/core/status.go b/rpc/core/status.go index 24424a5cf..9febef195 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -71,6 +71,8 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), MaxPeerBlockHeight: env.FastSyncReactor.GetMaxPeerBlockHeight(), CatchingUp: env.ConsensusReactor.WaitSync(), + TotalSyncedTime: env.FastSyncReactor.GetTotalSyncedTime(), + RemainingTime: env.FastSyncReactor.GetRemainingSyncTime(), }, ValidatorInfo: validatorInfo, } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7bf5eea09..e5ac55f04 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -98,6 +98,9 @@ type SyncInfo struct { MaxPeerBlockHeight int64 `json:"max_peer_block_height"` CatchingUp bool `json:"catching_up"` + + TotalSyncedTime time.Duration `json:"total_synced_time"` + RemainingTime time.Duration `json:"remaining_time"` } // Info about the node's validator diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index 6a2c16d19..a2cbd62da 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1330,6 +1330,12 @@ components: catching_up: type: boolean example: false + total_synced_time: + type: string + example: "1000000000" + remaining_time: + type: string + example: "0" ValidatorInfo: type: object properties: