Browse Source

fastsync/event: emit fastsync status event when switching consensus/fastsync (#6619)

closes #2498 
solves part of #3365

Note: difficult to test the event emit in SwitchToFastSync part, might need to change `stateSyncReactor` to an interface in the `nodeImpl` struct
pull/6742/head
JayT106 3 years ago
committed by GitHub
parent
commit
c4f77ab6d1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 97 additions and 18 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +10
    -0
      docs/tendermint-core/fast-sync.md
  3. +5
    -0
      internal/consensus/reactor.go
  4. +37
    -5
      internal/consensus/reactor_test.go
  5. +8
    -2
      node/node.go
  6. +8
    -0
      types/event_bus.go
  7. +6
    -1
      types/event_bus_test.go
  8. +22
    -10
      types/events.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -98,6 +98,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- Applications that do not specify a priority, i.e. zero, will have transactions reaped by the order in which they are received by the node.
- Transactions are gossiped in FIFO order as they are in `v0`.
- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106)
- [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106)
### IMPROVEMENTS
- [libs/log] Console log formatting changes as a result of \#6534 and \#6589. (@tychoish)


+ 10
- 0
docs/tendermint-core/fast-sync.md View File

@ -45,3 +45,13 @@ version = "v0"
If we're lagging sufficiently, we should go back to fast syncing, but
this is an [open issue](https://github.com/tendermint/tendermint/issues/129).
## The Fast Sync event
When the tendermint blockchain core launches, it might switch to the `fast-sync`
mode to catch up the states to the current network best height. the core will emits
a fast-sync event to expose the current status and the sync height. Once it catched
the network best height, it will switches to the state sync mechanism and then emit
another event for exposing the fast-sync `complete` status and the state `height`.
The user can query the events by subscribing `EventQueryFastSyncStatus`
Please check [types](https://pkg.go.dev/github.com/tendermint/tendermint/types?utm_source=godoc#pkg-constants) for the details.

+ 5
- 0
internal/consensus/reactor.go View File

@ -304,6 +304,11 @@ conS:
conR:
%+v`, err, r.state, r))
}
d := types.EventDataFastSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := r.eventBus.PublishEventFastSyncStatus(d); err != nil {
r.Logger.Error("failed to emit the fastsync complete event", "err", err)
}
}
// String returns a string representation of the Reactor.


+ 37
- 5
internal/consensus/reactor_test.go View File

@ -25,6 +25,7 @@ import (
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@ -42,6 +43,7 @@ type reactorTestSuite struct {
states map[types.NodeID]*State
reactors map[types.NodeID]*Reactor
subs map[types.NodeID]types.Subscription
fastsyncSubs map[types.NodeID]types.Subscription
stateChannels map[types.NodeID]*p2p.Channel
dataChannels map[types.NodeID]*p2p.Channel
voteChannels map[types.NodeID]*p2p.Channel
@ -58,10 +60,11 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
t.Helper()
rts := &reactorTestSuite{
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
states: make(map[types.NodeID]*State),
reactors: make(map[types.NodeID]*Reactor, numNodes),
subs: make(map[types.NodeID]types.Subscription, numNodes),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
states: make(map[types.NodeID]*State),
reactors: make(map[types.NodeID]*Reactor, numNodes),
subs: make(map[types.NodeID]types.Subscription, numNodes),
fastsyncSubs: make(map[types.NodeID]types.Subscription, numNodes),
}
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size)
@ -69,6 +72,8 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size)
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size)
_, cancel := context.WithCancel(context.Background())
i := 0
for nodeID, node := range rts.network.Nodes {
state := states[i]
@ -89,9 +94,13 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
blocksSub, err := state.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, size)
require.NoError(t, err)
fsSub, err := state.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryFastSyncStatus, size)
require.NoError(t, err)
rts.states[nodeID] = state
rts.subs[nodeID] = blocksSub
rts.reactors[nodeID] = reactor
rts.fastsyncSubs[nodeID] = fsSub
// simulate handle initChain in handshake
if state.state.LastBlockHeight == 0 {
@ -117,6 +126,7 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
}
leaktest.Check(t)
cancel()
})
return rts
@ -253,6 +263,15 @@ func waitForBlockWithUpdatedValsAndValidateIt(
wg.Wait()
}
func ensureFastSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) {
t.Helper()
status, ok := msg.Data().(types.EventDataFastSyncStatus)
require.True(t, ok)
require.Equal(t, complete, status.Complete)
require.Equal(t, height, status.Height)
}
func TestReactorBasic(t *testing.T) {
config := configSetup(t)
@ -275,8 +294,21 @@ func TestReactorBasic(t *testing.T) {
// wait till everyone makes the first new block
go func(s types.Subscription) {
defer wg.Done()
<-s.Out()
wg.Done()
}(sub)
}
wg.Wait()
for _, sub := range rts.fastsyncSubs {
wg.Add(1)
// wait till everyone makes the consensus switch
go func(s types.Subscription) {
defer wg.Done()
msg := <-s.Out()
ensureFastSyncStatus(t, msg, true, 0)
}(sub)
}


+ 8
- 2
node/node.go View File

@ -664,7 +664,7 @@ func (n *nodeImpl) OnStart() error {
}
err = startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state)
n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state, n.eventBus)
if err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
@ -1029,7 +1029,7 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo {
// startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Reactor,
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
stateStore sm.Store, blockStore *store.BlockStore, state sm.State) error {
stateStore sm.Store, blockStore *store.BlockStore, state sm.State, eventbus *types.EventBus) error {
ssR.Logger.Info("starting state sync...")
if stateProvider == nil {
@ -1071,6 +1071,12 @@ func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Rea
ssR.Logger.Error("failed to switch to fast sync", "err", err)
return
}
d := types.EventDataFastSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := eventbus.PublishEventFastSyncStatus(d); err != nil {
ssR.Logger.Error("failed to emit the fastsync starting event", "err", err)
}
} else {
conR.SwitchToConsensus(state, true)
}


+ 8
- 0
types/event_bus.go View File

@ -153,6 +153,10 @@ func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error {
return b.Publish(EventValidBlockValue, data)
}
func (b *EventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error {
return b.Publish(EventFastSyncStatusValue, data)
}
// PublishEventTx publishes tx event with events from Result. Note it will add
// predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys
// will be overwritten.
@ -308,3 +312,7 @@ func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
return nil
}
func (NopEventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error {
return nil
}

+ 6
- 1
types/event_bus_test.go View File

@ -370,6 +370,8 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
err = eventBus.PublishEventValidatorSetUpdates(EventDataValidatorSetUpdates{})
require.NoError(t, err)
err = eventBus.PublishEventFastSyncStatus(EventDataFastSyncStatus{})
require.NoError(t, err)
select {
case <-done:
@ -476,9 +478,11 @@ var events = []string{
EventRelockValue,
EventTimeoutWaitValue,
EventVoteValue,
EventFastSyncStatusValue,
}
func randEventValue() string {
return events[mrand.Intn(len(events))]
}
@ -494,7 +498,8 @@ var queries = []tmpubsub.Query{
EventQueryLock,
EventQueryRelock,
EventQueryTimeoutWait,
EventQueryVote}
EventQueryVote,
EventQueryFastSyncStatus}
func randQuery() tmpubsub.Query {
return queries[mrand.Intn(len(queries))]


+ 22
- 10
types/events.go View File

@ -27,16 +27,19 @@ const (
// These are used for testing the consensus state machine.
// They can also be used to build real-time consensus visualizers.
EventCompleteProposalValue = "CompleteProposal"
EventLockValue = "Lock"
EventNewRoundValue = "NewRound"
EventNewRoundStepValue = "NewRoundStep"
EventPolkaValue = "Polka"
EventRelockValue = "Relock"
EventTimeoutProposeValue = "TimeoutPropose"
EventTimeoutWaitValue = "TimeoutWait"
EventUnlockValue = "Unlock"
EventValidBlockValue = "ValidBlock"
EventVoteValue = "Vote"
// The FastSyncStatus event will be emitted when the node switching
// state sync mechanism between the consensus reactor and the fastsync reactor.
EventFastSyncStatusValue = "FastSyncStatus"
EventLockValue = "Lock"
EventNewRoundValue = "NewRound"
EventNewRoundStepValue = "NewRoundStep"
EventPolkaValue = "Polka"
EventRelockValue = "Relock"
EventTimeoutProposeValue = "TimeoutPropose"
EventTimeoutWaitValue = "TimeoutWait"
EventUnlockValue = "Unlock"
EventValidBlockValue = "ValidBlock"
EventVoteValue = "Vote"
)
// Pre-populated ABCI Tendermint-reserved events
@ -100,6 +103,7 @@ func init() {
tmjson.RegisterType(EventDataVote{}, "tendermint/event/Vote")
tmjson.RegisterType(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates")
tmjson.RegisterType(EventDataString(""), "tendermint/event/ProposalString")
tmjson.RegisterType(EventDataFastSyncStatus{}, "tendermint/event/FastSyncStatus")
}
// Most event messages are basic types (a block, a transaction)
@ -170,6 +174,13 @@ type EventDataValidatorSetUpdates struct {
ValidatorUpdates []*Validator `json:"validator_updates"`
}
// EventDataFastSyncStatus shows the fastsync status and the
// height when the node state sync mechanism changes.
type EventDataFastSyncStatus struct {
Complete bool `json:"complete"`
Height int64 `json:"height"`
}
// PUBSUB
const (
@ -207,6 +218,7 @@ var (
EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdatesValue)
EventQueryValidBlock = QueryForEvent(EventValidBlockValue)
EventQueryVote = QueryForEvent(EventVoteValue)
EventQueryFastSyncStatus = QueryForEvent(EventFastSyncStatusValue)
)
func EventQueryTxFor(tx Tx) tmpubsub.Query {


Loading…
Cancel
Save