Browse Source

statesync/event: emit statesync start/end event (#6700)

pull/6752/head
JayT106 3 years ago
committed by GitHub
parent
commit
e70445f942
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 374 additions and 65 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +7
    -0
      docs/tendermint-core/state-sync.md
  3. +28
    -0
      internal/consensus/mocks/cons_sync_reactor.go
  4. +71
    -0
      internal/consensus/mocks/fast_sync_reactor.go
  5. +16
    -0
      internal/consensus/reactor.go
  6. +50
    -0
      internal/statesync/mock_sync_reactor.go
  7. +11
    -0
      internal/statesync/reactor.go
  8. +83
    -54
      node/node.go
  9. +75
    -0
      node/node_test.go
  10. +8
    -0
      types/event_bus.go
  11. +3
    -0
      types/event_bus_test.go
  12. +21
    -11
      types/events.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -99,6 +99,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- Transactions are gossiped in FIFO order as they are in `v0`.
- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106)
- [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106)
- [statesync/event] \#6700 Emit statesync status start/end event (@JayT106)
### IMPROVEMENTS
- [libs/log] Console log formatting changes as a result of \#6534 and \#6589. (@tychoish)


+ 7
- 0
docs/tendermint-core/state-sync.md View File

@ -9,3 +9,10 @@ With state sync your node will download data related to the head or near the hea
This leads to drastically shorter times for joining a network.
Information on how to configure state sync is located in the [nodes section](../nodes/state-sync.md)
## Events
When a node starts with the statesync flag enabled in the config file, it will emit two events: one upon starting statesync and the other upon completion.
The user can query the events by subscribing `EventQueryStateSyncStatus`
Please check [types](https://pkg.go.dev/github.com/tendermint/tendermint/types?utm_source=godoc#pkg-constants) for the details.

+ 28
- 0
internal/consensus/mocks/cons_sync_reactor.go View File

@ -0,0 +1,28 @@
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
)
// ConsSyncReactor is an autogenerated mock type for the ConsSyncReactor type
type ConsSyncReactor struct {
mock.Mock
}
// SetFastSyncingMetrics provides a mock function with given fields: _a0
func (_m *ConsSyncReactor) SetFastSyncingMetrics(_a0 float64) {
_m.Called(_a0)
}
// SetStateSyncingMetrics provides a mock function with given fields: _a0
func (_m *ConsSyncReactor) SetStateSyncingMetrics(_a0 float64) {
_m.Called(_a0)
}
// SwitchToConsensus provides a mock function with given fields: _a0, _a1
func (_m *ConsSyncReactor) SwitchToConsensus(_a0 state.State, _a1 bool) {
_m.Called(_a0, _a1)
}

+ 71
- 0
internal/consensus/mocks/fast_sync_reactor.go View File

@ -0,0 +1,71 @@
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
time "time"
)
// FastSyncReactor is an autogenerated mock type for the FastSyncReactor type
type FastSyncReactor struct {
mock.Mock
}
// GetMaxPeerBlockHeight provides a mock function with given fields:
func (_m *FastSyncReactor) GetMaxPeerBlockHeight() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// GetRemainingSyncTime provides a mock function with given fields:
func (_m *FastSyncReactor) GetRemainingSyncTime() time.Duration {
ret := _m.Called()
var r0 time.Duration
if rf, ok := ret.Get(0).(func() time.Duration); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Duration)
}
return r0
}
// GetTotalSyncedTime provides a mock function with given fields:
func (_m *FastSyncReactor) GetTotalSyncedTime() time.Duration {
ret := _m.Called()
var r0 time.Duration
if rf, ok := ret.Get(0).(func() time.Duration); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Duration)
}
return r0
}
// SwitchToFastSync provides a mock function with given fields: _a0
func (_m *FastSyncReactor) SwitchToFastSync(_a0 state.State) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(state.State) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}

+ 16
- 0
internal/consensus/reactor.go View File

@ -112,6 +112,14 @@ type FastSyncReactor interface {
GetRemainingSyncTime() time.Duration
}
//go:generate mockery --case underscore --name ConsSyncReactor
// ConsSyncReactor defines an interface used for testing abilities of node.startStateSync.
type ConsSyncReactor interface {
SwitchToConsensus(sm.State, bool)
SetStateSyncingMetrics(float64)
SetFastSyncingMetrics(float64)
}
// Reactor defines a reactor for the consensus service.
type Reactor struct {
service.BaseService
@ -1429,3 +1437,11 @@ func (r *Reactor) peerStatsRoutine() {
func (r *Reactor) GetConsensusState() *State {
return r.state
}
func (r *Reactor) SetStateSyncingMetrics(v float64) {
r.Metrics.StateSyncing.Set(v)
}
func (r *Reactor) SetFastSyncingMetrics(v float64) {
r.Metrics.FastSyncing.Set(v)
}

+ 50
- 0
internal/statesync/mock_sync_reactor.go View File

@ -0,0 +1,50 @@
package statesync
import (
"context"
"time"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
)
// MockSyncReactor is an autogenerated mock type for the SyncReactor type.
// Because of the stateprovider uses in Sync(), we use package statesync instead of mocks.
type MockSyncReactor struct {
mock.Mock
}
// Backfill provides a mock function with given fields: _a0
func (_m *MockSyncReactor) Backfill(_a0 state.State) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(state.State) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Sync provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockSyncReactor) Sync(_a0 context.Context, _a1 StateProvider, _a2 time.Duration) (state.State, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 state.State
if rf, ok := ret.Get(0).(func(context.Context, StateProvider, time.Duration) state.State); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(state.State)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, StateProvider, time.Duration) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

+ 11
- 0
internal/statesync/reactor.go View File

@ -102,6 +102,12 @@ const (
maxLightBlockRequestRetries = 20
)
// SyncReactor defines an interface used for testing abilities of node.startStateSync.
type SyncReactor interface {
Sync(context.Context, StateProvider, time.Duration) (sm.State, error)
Backfill(sm.State) error
}
// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
@ -222,6 +228,11 @@ func (r *Reactor) Sync(
return sm.State{}, errors.New("a state sync is already in progress")
}
if stateProvider == nil {
r.mtx.Unlock()
return sm.State{}, errors.New("the stateProvider should not be nil when doing the state sync")
}
r.syncer = newSyncer(
r.cfg,
r.Logger,


+ 83
- 54
node/node.go View File

@ -63,26 +63,25 @@ type nodeImpl struct {
isListening bool
// services
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor service.Service // for fast-syncing
mempoolReactor service.Service // for gossipping transactions
mempool mempool.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
eventSinks []indexer.EventSink
indexerService *indexer.Service
prometheusSrv *http.Server
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor service.Service // for fast-syncing
mempoolReactor service.Service // for gossipping transactions
mempool mempool.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
eventSinks []indexer.EventSink
indexerService *indexer.Service
prometheusSrv *http.Server
}
// newDefaultNode returns a Tendermint node with default settings for the
@ -663,9 +662,15 @@ func (n *nodeImpl) OnStart() error {
return fmt.Errorf("unable to derive state: %w", err)
}
err = startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, state, n.eventBus)
ssc := n.config.StateSync
sp, err := constructStateProvider(ssc, state, n.Logger.With("module", "light"))
if err != nil {
return fmt.Errorf("failed to set up light client state provider: %w", err)
}
if err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, sp,
ssc, n.config.FastSyncMode, state.InitialHeight, n.eventBus); err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
}
@ -1027,54 +1032,57 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo {
}
// startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
func startStateSync(ssR *statesync.Reactor, bcR cs.FastSyncReactor, conR *cs.Reactor,
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
stateStore sm.Store, blockStore *store.BlockStore, state sm.State, eventbus *types.EventBus) error {
ssR.Logger.Info("starting state sync...")
if stateProvider == nil {
var err error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stateProvider, err = statesync.NewLightClientStateProvider(
ctx,
state.ChainID, state.Version, state.InitialHeight,
config.RPCServers, light.TrustOptions{
Period: config.TrustPeriod,
Height: config.TrustHeight,
Hash: config.TrustHashBytes(),
}, ssR.Logger.With("module", "light"))
if err != nil {
return fmt.Errorf("failed to set up light client state provider: %w", err)
}
func startStateSync(
ssR statesync.SyncReactor,
bcR cs.FastSyncReactor,
conR cs.ConsSyncReactor,
sp statesync.StateProvider,
config *cfg.StateSyncConfig,
fastSync bool,
stateInitHeight int64,
eb *types.EventBus,
) error {
stateSyncLogger := eb.Logger.With("module", "statesync")
stateSyncLogger.Info("starting state sync...")
// at the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: stateInitHeight}
if err := eb.PublishEventStateSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
}
go func() {
state, err := ssR.Sync(context.TODO(), stateProvider, config.DiscoveryTime)
state, err := ssR.Sync(context.TODO(), sp, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("state sync failed", "err", err)
stateSyncLogger.Error("state sync failed", "err", err)
return
}
err = ssR.Backfill(state)
if err != nil {
ssR.Logger.Error("backfill failed; node has insufficient history to verify all evidence;"+
if err := ssR.Backfill(state); err != nil {
stateSyncLogger.Error("backfill failed; node has insufficient history to verify all evidence;"+
" proceeding optimistically...", "err", err)
}
conR.Metrics.StateSyncing.Set(0)
conR.SetStateSyncingMetrics(0)
d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := eb.PublishEventStateSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
}
if fastSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.Metrics.FastSyncing.Set(1)
err = bcR.SwitchToFastSync(state)
if err != nil {
ssR.Logger.Error("failed to switch to fast sync", "err", err)
conR.SetFastSyncingMetrics(1)
if err := bcR.SwitchToFastSync(state); err != nil {
stateSyncLogger.Error("failed to switch to fast sync", "err", err)
return
}
d := types.EventDataFastSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := eventbus.PublishEventFastSyncStatus(d); err != nil {
ssR.Logger.Error("failed to emit the fastsync starting event", "err", err)
if err := eb.PublishEventFastSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the fastsync starting event", "err", err)
}
} else {
@ -1266,3 +1274,24 @@ func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Ch
return channels
}
func constructStateProvider(
ssc *cfg.StateSyncConfig,
state sm.State,
logger log.Logger,
) (statesync.StateProvider, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
to := light.TrustOptions{
Period: ssc.TrustPeriod,
Height: ssc.TrustHeight,
Hash: ssc.TrustHashBytes(),
}
return statesync.NewLightClientStateProvider(
ctx,
state.ChainID, state.Version, state.InitialHeight,
ssc.RPCServers, to, logger,
)
}

+ 75
- 0
node/node_test.go View File

@ -21,11 +21,16 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
consmocks "github.com/tendermint/tendermint/internal/consensus/mocks"
ssmocks "github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
statesync "github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmrand "github.com/tendermint/tendermint/libs/rand"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/privval"
@ -626,6 +631,12 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
}
func TestLoadStateFromGenesis(t *testing.T) {
_ = loadStatefromGenesis(t)
}
func loadStatefromGenesis(t *testing.T) sm.State {
t.Helper()
stateDB := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB)
config := cfg.ResetTestRoot("load_state_from_genesis")
@ -642,4 +653,68 @@ func TestLoadStateFromGenesis(t *testing.T) {
)
require.NoError(t, err)
require.NotNil(t, state)
return state
}
func TestNodeStartStateSync(t *testing.T) {
mockSSR := &statesync.MockSyncReactor{}
mockFSR := &consmocks.FastSyncReactor{}
mockCSR := &consmocks.ConsSyncReactor{}
mockSP := &ssmocks.StateProvider{}
state := loadStatefromGenesis(t)
config := cfg.ResetTestRoot("load_state_from_genesis")
eventBus, err := createAndStartEventBus(log.TestingLogger())
defer func() {
err := eventBus.Stop()
require.NoError(t, err)
}()
require.NoError(t, err)
require.NotNil(t, eventBus)
sub, err := eventBus.Subscribe(context.Background(), "test-client", types.EventQueryStateSyncStatus, 10)
require.NoError(t, err)
require.NotNil(t, sub)
cfgSS := config.StateSync
mockSSR.On("Sync", context.TODO(), mockSP, cfgSS.DiscoveryTime).Return(state, nil).
On("Backfill", state).Return(nil)
mockCSR.On("SetStateSyncingMetrics", float64(0)).Return().
On("SwitchToConsensus", state, true).Return()
require.NoError(t,
startStateSync(mockSSR, mockFSR, mockCSR, mockSP, config.StateSync, false, state.InitialHeight, eventBus))
for cnt := 0; cnt < 2; {
select {
case <-time.After(3 * time.Second):
t.Errorf("StateSyncStatus timeout")
case msg := <-sub.Out():
if cnt == 0 {
ensureStateSyncStatus(t, msg, false, state.InitialHeight)
cnt++
} else {
// the state height = 0 because we are not actually update the state in this test
ensureStateSyncStatus(t, msg, true, 0)
cnt++
}
}
}
mockSSR.AssertNumberOfCalls(t, "Sync", 1)
mockSSR.AssertNumberOfCalls(t, "Backfill", 1)
mockCSR.AssertNumberOfCalls(t, "SetStateSyncingMetrics", 1)
mockCSR.AssertNumberOfCalls(t, "SwitchToConsensus", 1)
}
func ensureStateSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) {
t.Helper()
status, ok := msg.Data().(types.EventDataStateSyncStatus)
require.True(t, ok)
require.Equal(t, complete, status.Complete)
require.Equal(t, height, status.Height)
}

+ 8
- 0
types/event_bus.go View File

@ -157,6 +157,10 @@ func (b *EventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) erro
return b.Publish(EventFastSyncStatusValue, data)
}
func (b *EventBus) PublishEventStateSyncStatus(data EventDataStateSyncStatus) error {
return b.Publish(EventStateSyncStatusValue, data)
}
// PublishEventTx publishes tx event with events from Result. Note it will add
// predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys
// will be overwritten.
@ -316,3 +320,7 @@ func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpd
func (NopEventBus) PublishEventFastSyncStatus(data EventDataFastSyncStatus) error {
return nil
}
func (NopEventBus) PublishEventStateSyncStatus(data EventDataStateSyncStatus) error {
return nil
}

+ 3
- 0
types/event_bus_test.go View File

@ -372,6 +372,8 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
err = eventBus.PublishEventFastSyncStatus(EventDataFastSyncStatus{})
require.NoError(t, err)
err = eventBus.PublishEventStateSyncStatus(EventDataStateSyncStatus{})
require.NoError(t, err)
select {
case <-done:
@ -479,6 +481,7 @@ var events = []string{
EventTimeoutWaitValue,
EventVoteValue,
EventFastSyncStatusValue,
EventStateSyncStatusValue,
}
func randEventValue() string {


+ 21
- 11
types/events.go View File

@ -29,17 +29,18 @@ const (
EventCompleteProposalValue = "CompleteProposal"
// The FastSyncStatus event will be emitted when the node switching
// state sync mechanism between the consensus reactor and the fastsync reactor.
EventFastSyncStatusValue = "FastSyncStatus"
EventLockValue = "Lock"
EventNewRoundValue = "NewRound"
EventNewRoundStepValue = "NewRoundStep"
EventPolkaValue = "Polka"
EventRelockValue = "Relock"
EventTimeoutProposeValue = "TimeoutPropose"
EventTimeoutWaitValue = "TimeoutWait"
EventUnlockValue = "Unlock"
EventValidBlockValue = "ValidBlock"
EventVoteValue = "Vote"
EventFastSyncStatusValue = "FastSyncStatus"
EventLockValue = "Lock"
EventNewRoundValue = "NewRound"
EventNewRoundStepValue = "NewRoundStep"
EventPolkaValue = "Polka"
EventRelockValue = "Relock"
EventStateSyncStatusValue = "StateSyncStatus"
EventTimeoutProposeValue = "TimeoutPropose"
EventTimeoutWaitValue = "TimeoutWait"
EventUnlockValue = "Unlock"
EventValidBlockValue = "ValidBlock"
EventVoteValue = "Vote"
)
// Pre-populated ABCI Tendermint-reserved events
@ -104,6 +105,7 @@ func init() {
tmjson.RegisterType(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates")
tmjson.RegisterType(EventDataString(""), "tendermint/event/ProposalString")
tmjson.RegisterType(EventDataFastSyncStatus{}, "tendermint/event/FastSyncStatus")
tmjson.RegisterType(EventDataStateSyncStatus{}, "tendermint/event/StateSyncStatus")
}
// Most event messages are basic types (a block, a transaction)
@ -181,6 +183,13 @@ type EventDataFastSyncStatus struct {
Height int64 `json:"height"`
}
// EventDataStateSyncStatus shows the statesync status and the
// height when the node state sync mechanism changes.
type EventDataStateSyncStatus struct {
Complete bool `json:"complete"`
Height int64 `json:"height"`
}
// PUBSUB
const (
@ -219,6 +228,7 @@ var (
EventQueryValidBlock = QueryForEvent(EventValidBlockValue)
EventQueryVote = QueryForEvent(EventVoteValue)
EventQueryFastSyncStatus = QueryForEvent(EventFastSyncStatusValue)
EventQueryStateSyncStatus = QueryForEvent(EventStateSyncStatusValue)
)
func EventQueryTxFor(tx Tx) tmpubsub.Query {


Loading…
Cancel
Save