Browse Source

node: collapse initialization internals (#7567)

pull/7572/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
e07c4cdcf2
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 87 additions and 110 deletions
  1. +12
    -1
      internal/blocksync/reactor.go
  2. +3
    -1
      internal/blocksync/reactor_test.go
  3. +5
    -18
      internal/inspect/rpc/rpc.go
  4. +1
    -3
      internal/rpc/core/consensus.go
  5. +10
    -10
      internal/rpc/core/env.go
  6. +12
    -0
      internal/statesync/reactor.go
  7. +1
    -0
      internal/statesync/reactor_test.go
  8. +40
    -75
      node/node.go
  9. +3
    -2
      rpc/client/rpc_test.go

+ 12
- 1
internal/blocksync/reactor.go View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
@ -96,7 +97,8 @@ type Reactor struct {
// stopping the p2p Channel(s).
poolWG sync.WaitGroup
metrics *consensus.Metrics
metrics *consensus.Metrics
eventBus *eventbus.EventBus
syncStartTime time.Time
}
@ -113,6 +115,7 @@ func NewReactor(
peerUpdates *p2p.PeerUpdates,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
if state.LastBlockHeight != store.Height() {
@ -146,6 +149,7 @@ func NewReactor(
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
syncStartTime: time.Time{},
}
@ -638,6 +642,13 @@ func (r *Reactor) GetRemainingSyncTime() time.Duration {
return time.Duration(int64(remain * float64(time.Second)))
}
func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataBlockSyncStatus) error {
if r.eventBus == nil {
return errors.New("event bus is not configured")
}
return r.eventBus.PublishEventBlockSyncStatus(ctx, event)
}
// atomicBool is an atomic Boolean, safe for concurrent use by multiple
// goroutines.
type atomicBool int32


+ 3
- 1
internal/blocksync/reactor_test.go View File

@ -181,7 +181,9 @@ func (rts *reactorTestSuite) addNode(
chCreator,
rts.peerUpdates[nodeID],
rts.blockSync,
consensus.NopMetrics())
consensus.NopMetrics(),
nil, // eventbus, can be nil
)
require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))


+ 5
- 18
internal/inspect/rpc/rpc.go View File

@ -8,14 +8,12 @@ import (
"github.com/rs/cors"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/internal/rpc/core"
"github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/rpc/jsonrpc/server"
"github.com/tendermint/tendermint/types"
)
// Server defines parameters for running an Inspector rpc server.
@ -33,12 +31,11 @@ type eventBusUnsubscriber interface {
// Routes returns the set of routes used by the Inspector server.
func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink, logger log.Logger) core.RoutesMap {
env := &core.Environment{
Config: cfg,
EventSinks: es,
StateStore: s,
BlockStore: bs,
ConsensusReactor: waitSyncCheckerImpl{},
Logger: logger,
Config: cfg,
EventSinks: es,
StateStore: s,
BlockStore: bs,
Logger: logger,
}
return core.RoutesMap{
"blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight"),
@ -93,16 +90,6 @@ func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler {
return h
}
type waitSyncCheckerImpl struct{}
func (waitSyncCheckerImpl) WaitSync() bool {
return false
}
func (waitSyncCheckerImpl) GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool) {
return nil, false
}
// ListenAndServe listens on the address specified in srv.Addr and handles any
// incoming requests over HTTP using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServe(ctx context.Context) error {


+ 1
- 3
internal/rpc/core/consensus.go View File

@ -101,9 +101,7 @@ func (env *Environment) GetConsensusState(ctx context.Context) (*coretypes.Resul
// ConsensusParams gets the consensus parameters at the given block height.
// If no height is provided, it will fetch the latest consensus params.
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_params
func (env *Environment) ConsensusParams(
ctx context.Context,
heightPtr *int64) (*coretypes.ResultConsensusParams, error) {
func (env *Environment) ConsensusParams(ctx context.Context, heightPtr *int64) (*coretypes.ResultConsensusParams, error) {
// The latest consensus params that we know is the consensus params after the
// last block.


+ 10
- 10
internal/rpc/core/env.go View File

@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool"
@ -52,11 +53,6 @@ type transport interface {
NodeInfo() types.NodeInfo
}
type consensusReactor interface {
WaitSync() bool
GetPeerState(peerID types.NodeID) (*consensus.PeerState, bool)
}
type peerManager interface {
Peers() []types.NodeID
Addresses(types.NodeID) []p2p.NodeAddress
@ -75,7 +71,8 @@ type Environment struct {
BlockStore sm.BlockStore
EvidencePool sm.EvidencePool
ConsensusState consensusState
ConsensusReactor consensusReactor
ConsensusReactor *consensus.Reactor
BlockSyncReactor *blocksync.Reactor
// Legacy p2p stack
P2PTransport transport
@ -89,7 +86,6 @@ type Environment struct {
EventSinks []indexer.EventSink
EventBus *eventbus.EventBus // thread safe
Mempool mempool.Mempool
BlockSyncReactor consensus.BlockSyncReactor
StateSyncMetricer statesync.Metricer
Logger log.Logger
@ -199,9 +195,13 @@ func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64,
}
func (env *Environment) latestUncommittedHeight() int64 {
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
if env.ConsensusReactor != nil {
// consensus reactor can be nil in inspect mode.
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
}
}
return env.BlockStore.Height() + 1
}

+ 12
- 0
internal/statesync/reactor.go View File

@ -13,6 +13,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
@ -156,6 +157,7 @@ type Reactor struct {
providers map[types.NodeID]*BlockProvider
stateProvider StateProvider
eventBus *eventbus.EventBus
metrics *Metrics
backfillBlockTotal int64
backfilledBlocks int64
@ -179,6 +181,7 @@ func NewReactor(
blockStore *store.BlockStore,
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
chDesc := getChannelDescriptors()
@ -219,6 +222,7 @@ func NewReactor(
dispatcher: NewDispatcher(blockCh),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@ -248,6 +252,14 @@ func (r *Reactor) OnStop() {
r.dispatcher.Close()
}
func (r *Reactor) PublishStatus(ctx context.Context, event types.EventDataStateSyncStatus) error {
if r.eventBus == nil {
return errors.New("event system is not configured")
}
return r.eventBus.PublishEventStateSyncStatus(ctx, event)
}
// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. At the close of the operation, Sync will bootstrap the state
// store and persist the commit at that height so that either consensus or


+ 1
- 0
internal/statesync/reactor_test.go View File

@ -178,6 +178,7 @@ func setup(
rts.blockStore,
"",
m,
nil, // eventbus can be nil
)
require.NoError(t, err)


+ 40
- 75
node/node.go View File

@ -63,23 +63,17 @@ type nodeImpl struct {
isListening bool
// services
eventBus *eventbus.EventBus // pub/sub for services
eventSinks []indexer.EventSink
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor service.Service // for block-syncing
mempoolReactor service.Service // for gossipping transactions
mempool mempool.Mempool
blockStore *store.BlockStore // store the blockchain to disk
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
consensusReactor *consensus.Reactor // for participating in the consensus
pexReactor service.Service // for exchanging peer addresses
evidenceReactor service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
indexerService service.Service
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
services []service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
// newDefaultNode returns a Tendermint node with default settings for the
@ -339,6 +333,7 @@ func makeNode(
peerManager.Subscribe(ctx),
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
if err != nil {
return nil, combineCloseError(
@ -372,6 +367,7 @@ func makeNode(
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
@ -384,7 +380,6 @@ func makeNode(
return nil, combineCloseError(err, makeCloser(closers))
}
}
node := &nodeImpl{
config: cfg,
logger: logger,
@ -396,19 +391,22 @@ func makeNode(
nodeInfo: nodeInfo,
nodeKey: nodeKey,
eventSinks: eventSinks,
services: []service.Service{
eventBus,
indexerService,
evReactor,
mpReactor,
csReactor,
bcReactor,
pexReactor,
},
stateStore: stateStore,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mpReactor,
mempool: mp,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
pexReactor: pexReactor,
evidenceReactor: evReactor,
indexerService: indexerService,
eventBus: eventBus,
eventSinks: eventSinks,
shutdownOps: makeCloser(closers),
@ -494,40 +492,25 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
}
n.isListening = true
if err := n.bcReactor.Start(ctx); err != nil {
return err
}
for _, reactor := range n.services {
if err := reactor.Start(ctx); err != nil {
if errors.Is(err, service.ErrAlreadyStarted) {
continue
}
if err := n.consensusReactor.Start(ctx); err != nil {
return err
return fmt.Errorf("problem starting service '%T': %w ", reactor, err)
}
}
if err := n.stateSyncReactor.Start(ctx); err != nil {
return err
}
if err := n.mempoolReactor.Start(ctx); err != nil {
return err
}
if err := n.evidenceReactor.Start(ctx); err != nil {
return err
}
if n.config.P2P.PexReactor {
if err := n.pexReactor.Start(ctx); err != nil {
return err
}
}
// Run state sync
// TODO: We shouldn't run state sync if we already have state that has a
// LastBlockHeight that is not InitialHeight
if n.stateSync {
bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
if !ok {
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
}
bcR := n.rpcEnv.BlockSyncReactor
// we need to get the genesis state to get parameters such as
state, err := sm.MakeGenesisState(n.genesisDoc)
@ -540,7 +523,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
// At the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
if err := n.eventBus.PublishEventStateSyncStatus(ctx, d); err != nil {
if err := n.stateSyncReactor.PublishStatus(ctx, d); err != nil {
n.logger.Error("failed to emit the statesync start event", "err", err)
}
@ -559,9 +542,9 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
return err
}
n.consensusReactor.SetStateSyncingMetrics(0)
n.rpcEnv.ConsensusReactor.SetStateSyncingMetrics(0)
if err := n.eventBus.PublishEventStateSyncStatus(ctx,
if err := n.stateSyncReactor.PublishStatus(ctx,
types.EventDataStateSyncStatus{
Complete: true,
Height: ssState.LastBlockHeight,
@ -574,13 +557,13 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
// advancing reactors to be able to control which one of the three
// is running
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
n.rpcEnv.ConsensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(ctx, ssState); err != nil {
n.logger.Error("failed to switch to block sync", "err", err)
return err
}
if err := n.eventBus.PublishEventBlockSyncStatus(ctx,
if err := bcR.PublishStatus(ctx,
types.EventDataBlockSyncStatus{
Complete: false,
Height: ssState.LastBlockHeight,
@ -597,25 +580,17 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
func (n *nodeImpl) OnStop() {
n.logger.Info("Stopping Node")
if n.eventBus != nil {
n.eventBus.Wait()
}
if n.indexerService != nil {
n.indexerService.Wait()
}
for _, es := range n.eventSinks {
if err := es.Stop(); err != nil {
n.logger.Error("failed to stop event sink", "err", err)
}
}
n.bcReactor.Wait()
n.consensusReactor.Wait()
for _, reactor := range n.services {
reactor.Wait()
}
n.stateSyncReactor.Wait()
n.mempoolReactor.Wait()
n.evidenceReactor.Wait()
n.pexReactor.Wait()
n.router.Wait()
n.isListening = false
@ -693,7 +668,7 @@ func (n *nodeImpl) startRPC(ctx context.Context) ([]net.Listener, error) {
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(routes,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
err := n.rpcEnv.EventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
@ -788,19 +763,9 @@ func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http
return srv
}
// ConsensusReactor returns the Node's ConsensusReactor.
func (n *nodeImpl) ConsensusReactor() *consensus.Reactor {
return n.consensusReactor
}
// Mempool returns the Node's mempool.
func (n *nodeImpl) Mempool() mempool.Mempool {
return n.mempool
}
// EventBus returns the Node's EventBus.
func (n *nodeImpl) EventBus() *eventbus.EventBus {
return n.eventBus
return n.rpcEnv.EventBus
}
// PrivValidator returns the Node's PrivValidator.


+ 3
- 2
rpc/client/rpc_test.go View File

@ -20,6 +20,7 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/mempool"
rpccore "github.com/tendermint/tendermint/internal/rpc/core"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
@ -576,10 +577,10 @@ func TestClientMethodCalls(t *testing.T) {
func getMempool(t *testing.T, srv service.Service) mempool.Mempool {
t.Helper()
n, ok := srv.(interface {
Mempool() mempool.Mempool
RPCEnvironment() *rpccore.Environment
})
require.True(t, ok)
return n.Mempool()
return n.RPCEnvironment().Mempool
}
// these cases are roughly the same as the TestClientMethodCalls, but


Loading…
Cancel
Save