diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 5a7b024cf..9c3add9b8 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -99,6 +99,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - Transactions are gossiped in FIFO order as they are in `v0`. - [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106) - [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106) +- [statesync/event] \#6700 Emit statesync status start/end event (@JayT106) ### IMPROVEMENTS - [libs/log] Console log formatting changes as a result of \#6534 and \#6589. (@tychoish) diff --git a/docs/tendermint-core/state-sync.md b/docs/tendermint-core/state-sync.md index 38bf8bf33..623de4953 100644 --- a/docs/tendermint-core/state-sync.md +++ b/docs/tendermint-core/state-sync.md @@ -9,3 +9,10 @@ With state sync your node will download data related to the head or near the hea This leads to drastically shorter times for joining a network. Information on how to configure state sync is located in the [nodes section](../nodes/state-sync.md) + +## Events + +When a node starts with the statesync flag enabled in the config file, it will emit two events: one upon starting statesync and the other upon completion. + +The user can query the events by subscribing `EventQueryStateSyncStatus` +Please check [types](https://pkg.go.dev/github.com/tendermint/tendermint/types?utm_source=godoc#pkg-constants) for the details. \ No newline at end of file diff --git a/internal/consensus/mocks/cons_sync_reactor.go b/internal/consensus/mocks/cons_sync_reactor.go new file mode 100644 index 000000000..fcaa5696d --- /dev/null +++ b/internal/consensus/mocks/cons_sync_reactor.go @@ -0,0 +1,28 @@ +// Code generated by mockery 2.7.5. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" +) + +// ConsSyncReactor is an autogenerated mock type for the ConsSyncReactor type +type ConsSyncReactor struct { + mock.Mock +} + +// SetFastSyncingMetrics provides a mock function with given fields: _a0 +func (_m *ConsSyncReactor) SetFastSyncingMetrics(_a0 float64) { + _m.Called(_a0) +} + +// SetStateSyncingMetrics provides a mock function with given fields: _a0 +func (_m *ConsSyncReactor) SetStateSyncingMetrics(_a0 float64) { + _m.Called(_a0) +} + +// SwitchToConsensus provides a mock function with given fields: _a0, _a1 +func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) { + _m.Called(_a0, _a1) +} diff --git a/internal/consensus/mocks/fast_sync_reactor.go b/internal/consensus/mocks/fast_sync_reactor.go new file mode 100644 index 000000000..f6a7fabbc --- /dev/null +++ b/internal/consensus/mocks/fast_sync_reactor.go @@ -0,0 +1,71 @@ +// Code generated by mockery 2.7.5. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" + + time "time" +) + +// FastSyncReactor is an autogenerated mock type for the FastSyncReactor type +type FastSyncReactor struct { + mock.Mock +} + +// GetMaxPeerBlockHeight provides a mock function with given fields: +func (_m *FastSyncReactor) GetMaxPeerBlockHeight() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// GetRemainingSyncTime provides a mock function with given fields: +func (_m *FastSyncReactor) GetRemainingSyncTime() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// GetTotalSyncedTime provides a mock function with given fields: +func (_m *FastSyncReactor) GetTotalSyncedTime() time.Duration { + ret := _m.Called() + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// SwitchToFastSync provides a mock function with given fields: _a0 +func (_m *FastSyncReactor) SwitchToFastSync(_a0 state.State) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(state.State) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 1e5399f52..e01a6f329 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -112,6 +112,14 @@ type FastSyncReactor interface { GetRemainingSyncTime() time.Duration } +//go:generate mockery --case underscore --name ConsSyncReactor +// ConsSyncReactor defines an interface used for testing abilities of node.startStateSync. +type ConsSyncReactor interface { + SwitchToConsensus(sm.State, bool) + SetStateSyncingMetrics(float64) + SetFastSyncingMetrics(float64) +} + // Reactor defines a reactor for the consensus service. type Reactor struct { service.BaseService @@ -1429,3 +1437,11 @@ func (r *Reactor) peerStatsRoutine() { func (r *Reactor) GetConsensusState() *State { return r.state } + +func (r *Reactor) SetStateSyncingMetrics(v float64) { + r.Metrics.StateSyncing.Set(v) +} + +func (r *Reactor) SetFastSyncingMetrics(v float64) { + r.Metrics.FastSyncing.Set(v) +} diff --git a/internal/statesync/mock_sync_reactor.go b/internal/statesync/mock_sync_reactor.go new file mode 100644 index 000000000..6688ce4d2 --- /dev/null +++ b/internal/statesync/mock_sync_reactor.go @@ -0,0 +1,50 @@ +package statesync + +import ( + "context" + "time" + + mock "github.com/stretchr/testify/mock" + state "github.com/tendermint/tendermint/state" +) + +// MockSyncReactor is an autogenerated mock type for the SyncReactor type. +// Because of the stateprovider uses in Sync(), we use package statesync instead of mocks. +type MockSyncReactor struct { + mock.Mock +} + +// Backfill provides a mock function with given fields: _a0 +func (_m *MockSyncReactor) Backfill(_a0 state.State) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(state.State) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Sync provides a mock function with given fields: _a0, _a1, _a2 +func (_m *MockSyncReactor) Sync(_a0 context.Context, _a1 StateProvider, _a2 time.Duration) (state.State, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 state.State + if rf, ok := ret.Get(0).(func(context.Context, StateProvider, time.Duration) state.State); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Get(0).(state.State) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, StateProvider, time.Duration) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 3921e477e..59cbabd14 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -102,6 +102,12 @@ const ( maxLightBlockRequestRetries = 20 ) +// SyncReactor defines an interface used for testing abilities of node.startStateSync. +type SyncReactor interface { + Sync(context.Context, StateProvider, time.Duration) (sm.State, error) + Backfill(sm.State) error +} + // Reactor handles state sync, both restoring snapshots for the local node and // serving snapshots for other nodes. type Reactor struct { @@ -222,6 +228,11 @@ func (r *Reactor) Sync( return sm.State{}, errors.New("a state sync is already in progress") } + if stateProvider == nil { + r.mtx.Unlock() + return sm.State{}, errors.New("the stateProvider should not be nil when doing the state sync") + } + r.syncer = newSyncer( r.cfg, r.Logger, diff --git a/node/node.go b/node/node.go index 2d994b189..cb32c5ce6 100644 --- a/node/node.go +++ b/node/node.go @@ -63,26 +63,25 @@ type nodeImpl struct { isListening bool // services - eventBus *types.EventBus // pub/sub for services - stateStore sm.Store - blockStore *store.BlockStore // store the blockchain to disk - bcReactor service.Service // for fast-syncing - mempoolReactor service.Service // for gossipping transactions - mempool mempool.Mempool - stateSync bool // whether the node should state sync on startup - stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots - stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node - consensusState *cs.State // latest consensus state - consensusReactor *cs.Reactor // for participating in the consensus - pexReactor *pex.Reactor // for exchanging peer addresses - pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses - evidenceReactor *evidence.Reactor - evidencePool *evidence.Pool // tracking evidence - proxyApp proxy.AppConns // connection to the application - rpcListeners []net.Listener // rpc servers - eventSinks []indexer.EventSink - indexerService *indexer.Service - prometheusSrv *http.Server + eventBus *types.EventBus // pub/sub for services + stateStore sm.Store + blockStore *store.BlockStore // store the blockchain to disk + bcReactor service.Service // for fast-syncing + mempoolReactor service.Service // for gossipping transactions + mempool mempool.Mempool + stateSync bool // whether the node should state sync on startup + stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots + consensusState *cs.State // latest consensus state + consensusReactor *cs.Reactor // for participating in the consensus + pexReactor *pex.Reactor // for exchanging peer addresses + pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses + evidenceReactor *evidence.Reactor + evidencePool *evidence.Pool // tracking evidence + proxyApp proxy.AppConns // connection to the application + rpcListeners []net.Listener // rpc servers + eventSinks []indexer.EventSink + indexerService *indexer.Service + prometheusSrv *http.Server } // newDefaultNode returns a Tendermint node with default settings for the @@ -663,9 +662,15 @@ func (n *nodeImpl) OnStart() error { return fmt.Errorf("unable to derive state: %w", err) } - err = startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider, - n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state, n.eventBus) + ssc := n.config.StateSync + sp, err := constructStateProvider(ssc, state, n.Logger.With("module", "light")) + if err != nil { + return fmt.Errorf("failed to set up light client state provider: %w", err) + } + + if err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, sp, + ssc, n.config.FastSyncMode, state.InitialHeight, n.eventBus); err != nil { return fmt.Errorf("failed to start state sync: %w", err) } } @@ -1027,54 +1032,57 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo { } // startStateSync starts an asynchronous state sync process, then switches to fast sync mode. -func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Reactor, - stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool, - stateStore sm.Store, blockStore *store.BlockStore, state sm.State, eventbus *types.EventBus) error { - ssR.Logger.Info("starting state sync...") - - if stateProvider == nil { - var err error - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - stateProvider, err = statesync.NewLightClientStateProvider( - ctx, - state.ChainID, state.Version, state.InitialHeight, - config.RPCServers, light.TrustOptions{ - Period: config.TrustPeriod, - Height: config.TrustHeight, - Hash: config.TrustHashBytes(), - }, ssR.Logger.With("module", "light")) - if err != nil { - return fmt.Errorf("failed to set up light client state provider: %w", err) - } +func startStateSync( + ssR statesync.SyncReactor, + bcR cs.FastSyncReactor, + conR cs.ConsSyncReactor, + sp statesync.StateProvider, + config *cfg.StateSyncConfig, + fastSync bool, + stateInitHeight int64, + eb *types.EventBus, +) error { + stateSyncLogger := eb.Logger.With("module", "statesync") + + stateSyncLogger.Info("starting state sync...") + + // at the beginning of the statesync start, we use the initialHeight as the event height + // because of the statesync doesn't have the concreate state height before fetched the snapshot. + d := types.EventDataStateSyncStatus{Complete: false, Height: stateInitHeight} + if err := eb.PublishEventStateSyncStatus(d); err != nil { + stateSyncLogger.Error("failed to emit the statesync start event", "err", err) } go func() { - state, err := ssR.Sync(context.TODO(), stateProvider, config.DiscoveryTime) + state, err := ssR.Sync(context.TODO(), sp, config.DiscoveryTime) if err != nil { - ssR.Logger.Error("state sync failed", "err", err) + stateSyncLogger.Error("state sync failed", "err", err) return } - err = ssR.Backfill(state) - if err != nil { - ssR.Logger.Error("backfill failed; node has insufficient history to verify all evidence;"+ + if err := ssR.Backfill(state); err != nil { + stateSyncLogger.Error("backfill failed; node has insufficient history to verify all evidence;"+ " proceeding optimistically...", "err", err) } - conR.Metrics.StateSyncing.Set(0) + conR.SetStateSyncingMetrics(0) + + d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight} + if err := eb.PublishEventStateSyncStatus(d); err != nil { + stateSyncLogger.Error("failed to emit the statesync start event", "err", err) + } + if fastSync { // FIXME Very ugly to have these metrics bleed through here. - conR.Metrics.FastSyncing.Set(1) - err = bcR.SwitchToFastSync(state) - if err != nil { - ssR.Logger.Error("failed to switch to fast sync", "err", err) + conR.SetFastSyncingMetrics(1) + if err := bcR.SwitchToFastSync(state); err != nil { + stateSyncLogger.Error("failed to switch to fast sync", "err", err) return } d := types.EventDataFastSyncStatus{Complete: false, Height: state.LastBlockHeight} - if err := eventbus.PublishEventFastSyncStatus(d); err != nil { - ssR.Logger.Error("failed to emit the fastsync starting event", "err", err) + if err := eb.PublishEventFastSyncStatus(d); err != nil { + stateSyncLogger.Error("failed to emit the fastsync starting event", "err", err) } } else { @@ -1266,3 +1274,24 @@ func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Ch return channels } + +func constructStateProvider( + ssc *cfg.StateSyncConfig, + state sm.State, + logger log.Logger, +) (statesync.StateProvider, error) { + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + + to := light.TrustOptions{ + Period: ssc.TrustPeriod, + Height: ssc.TrustHeight, + Hash: ssc.TrustHashBytes(), + } + + return statesync.NewLightClientStateProvider( + ctx, + state.ChainID, state.Version, state.InitialHeight, + ssc.RPCServers, to, logger, + ) +} diff --git a/node/node_test.go b/node/node_test.go index d619d22d9..eca622bd3 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -21,11 +21,16 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" + consmocks "github.com/tendermint/tendermint/internal/consensus/mocks" + ssmocks "github.com/tendermint/tendermint/internal/statesync/mocks" + "github.com/tendermint/tendermint/internal/evidence" "github.com/tendermint/tendermint/internal/mempool" mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0" + statesync "github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmrand "github.com/tendermint/tendermint/libs/rand" tmtime "github.com/tendermint/tendermint/libs/time" "github.com/tendermint/tendermint/privval" @@ -626,6 +631,12 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) { } func TestLoadStateFromGenesis(t *testing.T) { + _ = loadStatefromGenesis(t) +} + +func loadStatefromGenesis(t *testing.T) sm.State { + t.Helper() + stateDB := dbm.NewMemDB() stateStore := sm.NewStore(stateDB) config := cfg.ResetTestRoot("load_state_from_genesis") @@ -642,4 +653,68 @@ func TestLoadStateFromGenesis(t *testing.T) { ) require.NoError(t, err) require.NotNil(t, state) + + return state +} + +func TestNodeStartStateSync(t *testing.T) { + mockSSR := &statesync.MockSyncReactor{} + mockFSR := &consmocks.FastSyncReactor{} + mockCSR := &consmocks.ConsSyncReactor{} + mockSP := &ssmocks.StateProvider{} + state := loadStatefromGenesis(t) + config := cfg.ResetTestRoot("load_state_from_genesis") + + eventBus, err := createAndStartEventBus(log.TestingLogger()) + defer func() { + err := eventBus.Stop() + require.NoError(t, err) + }() + + require.NoError(t, err) + require.NotNil(t, eventBus) + + sub, err := eventBus.Subscribe(context.Background(), "test-client", types.EventQueryStateSyncStatus, 10) + require.NoError(t, err) + require.NotNil(t, sub) + + cfgSS := config.StateSync + + mockSSR.On("Sync", context.TODO(), mockSP, cfgSS.DiscoveryTime).Return(state, nil). + On("Backfill", state).Return(nil) + mockCSR.On("SetStateSyncingMetrics", float64(0)).Return(). + On("SwitchToConsensus", state, true).Return() + + require.NoError(t, + startStateSync(mockSSR, mockFSR, mockCSR, mockSP, config.StateSync, false, state.InitialHeight, eventBus)) + + for cnt := 0; cnt < 2; { + select { + case <-time.After(3 * time.Second): + t.Errorf("StateSyncStatus timeout") + case msg := <-sub.Out(): + if cnt == 0 { + ensureStateSyncStatus(t, msg, false, state.InitialHeight) + cnt++ + } else { + // the state height = 0 because we are not actually update the state in this test + ensureStateSyncStatus(t, msg, true, 0) + cnt++ + } + } + } + + mockSSR.AssertNumberOfCalls(t, "Sync", 1) + mockSSR.AssertNumberOfCalls(t, "Backfill", 1) + mockCSR.AssertNumberOfCalls(t, "SetStateSyncingMetrics", 1) + mockCSR.AssertNumberOfCalls(t, "SwitchToConsensus", 1) +} + +func ensureStateSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) { + t.Helper() + status, ok := msg.Data().(types.EventDataStateSyncStatus) + + require.True(t, ok) + require.Equal(t, complete, status.Complete) + require.Equal(t, height, status.Height) } diff --git a/types/event_bus.go b/types/event_bus.go index e3a530d2e..5290181ee 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -157,6 +157,10 @@ func (b *EventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) erro return b.Publish(EventFastSyncStatusValue, data) } +func (b *EventBus) PublishEventStateSyncStatus(data EventDataStateSyncStatus) error { + return b.Publish(EventStateSyncStatusValue, data) +} + // PublishEventTx publishes tx event with events from Result. Note it will add // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys // will be overwritten. @@ -316,3 +320,7 @@ func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpd func (NopEventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error { return nil } + +func (NopEventBus) PublishEventStateSyncStatus(data EventDataStateSyncStatus) error { + return nil +} diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 0ea51cdfe..987a10eab 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -372,6 +372,8 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) err = eventBus.PublishEventFastSyncStatus(EventDataFastSyncStatus{}) require.NoError(t, err) + err = eventBus.PublishEventStateSyncStatus(EventDataStateSyncStatus{}) + require.NoError(t, err) select { case <-done: @@ -479,6 +481,7 @@ var events = []string{ EventTimeoutWaitValue, EventVoteValue, EventFastSyncStatusValue, + EventStateSyncStatusValue, } func randEventValue() string { diff --git a/types/events.go b/types/events.go index 326a1610d..2e234d3b7 100644 --- a/types/events.go +++ b/types/events.go @@ -29,17 +29,18 @@ const ( EventCompleteProposalValue = "CompleteProposal" // The FastSyncStatus event will be emitted when the node switching // state sync mechanism between the consensus reactor and the fastsync reactor. - EventFastSyncStatusValue = "FastSyncStatus" - EventLockValue = "Lock" - EventNewRoundValue = "NewRound" - EventNewRoundStepValue = "NewRoundStep" - EventPolkaValue = "Polka" - EventRelockValue = "Relock" - EventTimeoutProposeValue = "TimeoutPropose" - EventTimeoutWaitValue = "TimeoutWait" - EventUnlockValue = "Unlock" - EventValidBlockValue = "ValidBlock" - EventVoteValue = "Vote" + EventFastSyncStatusValue = "FastSyncStatus" + EventLockValue = "Lock" + EventNewRoundValue = "NewRound" + EventNewRoundStepValue = "NewRoundStep" + EventPolkaValue = "Polka" + EventRelockValue = "Relock" + EventStateSyncStatusValue = "StateSyncStatus" + EventTimeoutProposeValue = "TimeoutPropose" + EventTimeoutWaitValue = "TimeoutWait" + EventUnlockValue = "Unlock" + EventValidBlockValue = "ValidBlock" + EventVoteValue = "Vote" ) // Pre-populated ABCI Tendermint-reserved events @@ -104,6 +105,7 @@ func init() { tmjson.RegisterType(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates") tmjson.RegisterType(EventDataString(""), "tendermint/event/ProposalString") tmjson.RegisterType(EventDataFastSyncStatus{}, "tendermint/event/FastSyncStatus") + tmjson.RegisterType(EventDataStateSyncStatus{}, "tendermint/event/StateSyncStatus") } // Most event messages are basic types (a block, a transaction) @@ -181,6 +183,13 @@ type EventDataFastSyncStatus struct { Height int64 `json:"height"` } +// EventDataStateSyncStatus shows the statesync status and the +// height when the node state sync mechanism changes. +type EventDataStateSyncStatus struct { + Complete bool `json:"complete"` + Height int64 `json:"height"` +} + // PUBSUB const ( @@ -219,6 +228,7 @@ var ( EventQueryValidBlock = QueryForEvent(EventValidBlockValue) EventQueryVote = QueryForEvent(EventVoteValue) EventQueryFastSyncStatus = QueryForEvent(EventFastSyncStatusValue) + EventQueryStateSyncStatus = QueryForEvent(EventStateSyncStatusValue) ) func EventQueryTxFor(tx Tx) tmpubsub.Query {