diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 2815751fc..5a7b024cf 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -98,6 +98,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - Applications that do not specify a priority, i.e. zero, will have transactions reaped by the order in which they are received by the node. - Transactions are gossiped in FIFO order as they are in `v0`. - [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106) +- [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106) ### IMPROVEMENTS - [libs/log] Console log formatting changes as a result of \#6534 and \#6589. (@tychoish) diff --git a/docs/tendermint-core/fast-sync.md b/docs/tendermint-core/fast-sync.md index a36a158c8..afc668277 100644 --- a/docs/tendermint-core/fast-sync.md +++ b/docs/tendermint-core/fast-sync.md @@ -45,3 +45,13 @@ version = "v0" If we're lagging sufficiently, we should go back to fast syncing, but this is an [open issue](https://github.com/tendermint/tendermint/issues/129). + +## The Fast Sync event +When the tendermint blockchain core launches, it might switch to the `fast-sync` +mode to catch up the states to the current network best height. the core will emits +a fast-sync event to expose the current status and the sync height. Once it catched +the network best height, it will switches to the state sync mechanism and then emit +another event for exposing the fast-sync `complete` status and the state `height`. + +The user can query the events by subscribing `EventQueryFastSyncStatus` +Please check [types](https://pkg.go.dev/github.com/tendermint/tendermint/types?utm_source=godoc#pkg-constants) for the details. \ No newline at end of file diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 864338154..1e5399f52 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -304,6 +304,11 @@ conS: conR: %+v`, err, r.state, r)) } + + d := types.EventDataFastSyncStatus{Complete: true, Height: state.LastBlockHeight} + if err := r.eventBus.PublishEventFastSyncStatus(d); err != nil { + r.Logger.Error("failed to emit the fastsync complete event", "err", err) + } } // String returns a string representation of the Reactor. diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index bdfb333a1..900abc0ff 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -25,6 +25,7 @@ import ( "github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" sm "github.com/tendermint/tendermint/state" statemocks "github.com/tendermint/tendermint/state/mocks" @@ -42,6 +43,7 @@ type reactorTestSuite struct { states map[types.NodeID]*State reactors map[types.NodeID]*Reactor subs map[types.NodeID]types.Subscription + fastsyncSubs map[types.NodeID]types.Subscription stateChannels map[types.NodeID]*p2p.Channel dataChannels map[types.NodeID]*p2p.Channel voteChannels map[types.NodeID]*p2p.Channel @@ -58,10 +60,11 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu t.Helper() rts := &reactorTestSuite{ - network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), - states: make(map[types.NodeID]*State), - reactors: make(map[types.NodeID]*Reactor, numNodes), - subs: make(map[types.NodeID]types.Subscription, numNodes), + network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), + states: make(map[types.NodeID]*State), + reactors: make(map[types.NodeID]*Reactor, numNodes), + subs: make(map[types.NodeID]types.Subscription, numNodes), + fastsyncSubs: make(map[types.NodeID]types.Subscription, numNodes), } rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size) @@ -69,6 +72,8 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size) rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size) + _, cancel := context.WithCancel(context.Background()) + i := 0 for nodeID, node := range rts.network.Nodes { state := states[i] @@ -89,9 +94,13 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu blocksSub, err := state.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, size) require.NoError(t, err) + fsSub, err := state.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryFastSyncStatus, size) + require.NoError(t, err) + rts.states[nodeID] = state rts.subs[nodeID] = blocksSub rts.reactors[nodeID] = reactor + rts.fastsyncSubs[nodeID] = fsSub // simulate handle initChain in handshake if state.state.LastBlockHeight == 0 { @@ -117,6 +126,7 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu } leaktest.Check(t) + cancel() }) return rts @@ -253,6 +263,15 @@ func waitForBlockWithUpdatedValsAndValidateIt( wg.Wait() } +func ensureFastSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) { + t.Helper() + status, ok := msg.Data().(types.EventDataFastSyncStatus) + + require.True(t, ok) + require.Equal(t, complete, status.Complete) + require.Equal(t, height, status.Height) +} + func TestReactorBasic(t *testing.T) { config := configSetup(t) @@ -275,8 +294,21 @@ func TestReactorBasic(t *testing.T) { // wait till everyone makes the first new block go func(s types.Subscription) { + defer wg.Done() <-s.Out() - wg.Done() + }(sub) + } + + wg.Wait() + + for _, sub := range rts.fastsyncSubs { + wg.Add(1) + + // wait till everyone makes the consensus switch + go func(s types.Subscription) { + defer wg.Done() + msg := <-s.Out() + ensureFastSyncStatus(t, msg, true, 0) }(sub) } diff --git a/node/node.go b/node/node.go index 5bf13f8e4..2d994b189 100644 --- a/node/node.go +++ b/node/node.go @@ -664,7 +664,7 @@ func (n *nodeImpl) OnStart() error { } err = startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider, - n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state) + n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state, n.eventBus) if err != nil { return fmt.Errorf("failed to start state sync: %w", err) } @@ -1029,7 +1029,7 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo { // startStateSync starts an asynchronous state sync process, then switches to fast sync mode. func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Reactor, stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool, - stateStore sm.Store, blockStore *store.BlockStore, state sm.State) error { + stateStore sm.Store, blockStore *store.BlockStore, state sm.State, eventbus *types.EventBus) error { ssR.Logger.Info("starting state sync...") if stateProvider == nil { @@ -1071,6 +1071,12 @@ func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Rea ssR.Logger.Error("failed to switch to fast sync", "err", err) return } + + d := types.EventDataFastSyncStatus{Complete: false, Height: state.LastBlockHeight} + if err := eventbus.PublishEventFastSyncStatus(d); err != nil { + ssR.Logger.Error("failed to emit the fastsync starting event", "err", err) + } + } else { conR.SwitchToConsensus(state, true) } diff --git a/types/event_bus.go b/types/event_bus.go index b4ea4568b..e3a530d2e 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -153,6 +153,10 @@ func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error { return b.Publish(EventValidBlockValue, data) } +func (b *EventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error { + return b.Publish(EventFastSyncStatusValue, data) +} + // PublishEventTx publishes tx event with events from Result. Note it will add // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys // will be overwritten. @@ -308,3 +312,7 @@ func (NopEventBus) PublishEventLock(data EventDataRoundState) error { func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { return nil } + +func (NopEventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error { + return nil +} diff --git a/types/event_bus_test.go b/types/event_bus_test.go index e8a1667c6..0ea51cdfe 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -370,6 +370,8 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) err = eventBus.PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates{}) require.NoError(t, err) + err = eventBus.PublishEventFastSyncStatus(EventDataFastSyncStatus{}) + require.NoError(t, err) select { case <-done: @@ -476,9 +478,11 @@ var events = []string{ EventRelockValue, EventTimeoutWaitValue, EventVoteValue, + EventFastSyncStatusValue, } func randEventValue() string { + return events[mrand.Intn(len(events))] } @@ -494,7 +498,8 @@ var queries = []tmpubsub.Query{ EventQueryLock, EventQueryRelock, EventQueryTimeoutWait, - EventQueryVote} + EventQueryVote, + EventQueryFastSyncStatus} func randQuery() tmpubsub.Query { return queries[mrand.Intn(len(queries))] diff --git a/types/events.go b/types/events.go index 9df8e8507..326a1610d 100644 --- a/types/events.go +++ b/types/events.go @@ -27,16 +27,19 @@ const ( // These are used for testing the consensus state machine. // They can also be used to build real-time consensus visualizers. EventCompleteProposalValue = "CompleteProposal" - EventLockValue = "Lock" - EventNewRoundValue = "NewRound" - EventNewRoundStepValue = "NewRoundStep" - EventPolkaValue = "Polka" - EventRelockValue = "Relock" - EventTimeoutProposeValue = "TimeoutPropose" - EventTimeoutWaitValue = "TimeoutWait" - EventUnlockValue = "Unlock" - EventValidBlockValue = "ValidBlock" - EventVoteValue = "Vote" + // The FastSyncStatus event will be emitted when the node switching + // state sync mechanism between the consensus reactor and the fastsync reactor. + EventFastSyncStatusValue = "FastSyncStatus" + EventLockValue = "Lock" + EventNewRoundValue = "NewRound" + EventNewRoundStepValue = "NewRoundStep" + EventPolkaValue = "Polka" + EventRelockValue = "Relock" + EventTimeoutProposeValue = "TimeoutPropose" + EventTimeoutWaitValue = "TimeoutWait" + EventUnlockValue = "Unlock" + EventValidBlockValue = "ValidBlock" + EventVoteValue = "Vote" ) // Pre-populated ABCI Tendermint-reserved events @@ -100,6 +103,7 @@ func init() { tmjson.RegisterType(EventDataVote{}, "tendermint/event/Vote") tmjson.RegisterType(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates") tmjson.RegisterType(EventDataString(""), "tendermint/event/ProposalString") + tmjson.RegisterType(EventDataFastSyncStatus{}, "tendermint/event/FastSyncStatus") } // Most event messages are basic types (a block, a transaction) @@ -170,6 +174,13 @@ type EventDataValidatorSetUpdates struct { ValidatorUpdates []*Validator `json:"validator_updates"` } +// EventDataFastSyncStatus shows the fastsync status and the +// height when the node state sync mechanism changes. +type EventDataFastSyncStatus struct { + Complete bool `json:"complete"` + Height int64 `json:"height"` +} + // PUBSUB const ( @@ -207,6 +218,7 @@ var ( EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdatesValue) EventQueryValidBlock = QueryForEvent(EventValidBlockValue) EventQueryVote = QueryForEvent(EventVoteValue) + EventQueryFastSyncStatus = QueryForEvent(EventFastSyncStatusValue) ) func EventQueryTxFor(tx Tx) tmpubsub.Query {