Browse Source

node: move seed node implementation to its own file (#7566)

pull/7568/merge
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
46f56fcea5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 171 additions and 152 deletions
  1. +0
    -152
      node/node.go
  2. +171
    -0
      node/seed.go

+ 0
- 152
node/node.go View File

@ -82,26 +82,6 @@ type nodeImpl struct {
prometheusSrv *http.Server prometheusSrv *http.Server
} }
type seedNodeImpl struct {
service.BaseService
logger log.Logger
// config
config *config.Config
genesisDoc *types.GenesisDoc // initial validator set
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
// services
pexReactor service.Service // for exchanging peer addresses
shutdownOps closer
}
// newDefaultNode returns a Tendermint node with default settings for the // newDefaultNode returns a Tendermint node with default settings for the
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
// It implements NodeProvider. // It implements NodeProvider.
@ -462,138 +442,6 @@ func makeNode(
return node, nil return node, nil
} }
// makeSeedNode returns a new seed node, containing only p2p, pex reactor
func makeSeedNode(
ctx context.Context,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
}
genDoc, err := genesisDocProvider()
if err != nil {
return nil, err
}
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return nil, err
}
nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport and Switch.
p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create peer manager: %w", err),
closer)
}
router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, closer)
}
node := &seedNodeImpl{
config: cfg,
logger: logger,
genesisDoc: genDoc,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,
shutdownOps: closer,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
return node, nil
}
// OnStart starts the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStart(ctx context.Context) error {
if n.config.RPC.PprofListenAddress != "" {
rpcCtx, rpcCancel := context.WithCancel(ctx)
srv := &http.Server{Addr: n.config.RPC.PprofListenAddress, Handler: nil}
go func() {
select {
case <-ctx.Done():
sctx, scancel := context.WithTimeout(context.Background(), time.Second)
defer scancel()
_ = srv.Shutdown(sctx)
case <-rpcCtx.Done():
}
}()
go func() {
n.logger.Info("Starting pprof server", "laddr", n.config.RPC.PprofListenAddress)
if err := srv.ListenAndServe(); err != nil {
n.logger.Error("pprof server error", "err", err)
rpcCancel()
}
}()
}
now := tmtime.Now()
genTime := n.genesisDoc.GenesisTime
if genTime.After(now) {
n.logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
time.Sleep(genTime.Sub(now))
}
// Start the transport.
if err := n.router.Start(ctx); err != nil {
return err
}
n.isListening = true
if n.config.P2P.PexReactor {
if err := n.pexReactor.Start(ctx); err != nil {
return err
}
}
return nil
}
// OnStop stops the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStop() {
n.logger.Info("Stopping Node")
n.pexReactor.Wait()
n.router.Wait()
n.isListening = false
if err := n.shutdownOps(); err != nil {
if strings.TrimSpace(err.Error()) != "" {
n.logger.Error("problem shutting down additional services", "err", err)
}
}
}
// OnStart starts the Node. It implements service.Service. // OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart(ctx context.Context) error { func (n *nodeImpl) OnStart(ctx context.Context) error {
if n.config.RPC.PprofListenAddress != "" { if n.config.RPC.PprofListenAddress != "" {


+ 171
- 0
node/seed.go View File

@ -0,0 +1,171 @@
package node
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/strings"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/types"
)
type seedNodeImpl struct {
service.BaseService
logger log.Logger
// config
config *config.Config
genesisDoc *types.GenesisDoc // initial validator set
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
// services
pexReactor service.Service // for exchanging peer addresses
shutdownOps closer
}
// makeSeedNode returns a new seed node, containing only p2p, pex reactor
func makeSeedNode(
ctx context.Context,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
}
genDoc, err := genesisDocProvider()
if err != nil {
return nil, err
}
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return nil, err
}
nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport and Switch.
p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create peer manager: %w", err),
closer)
}
router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, closer)
}
node := &seedNodeImpl{
config: cfg,
logger: logger,
genesisDoc: genDoc,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,
shutdownOps: closer,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
return node, nil
}
// OnStart starts the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStart(ctx context.Context) error {
if n.config.RPC.PprofListenAddress != "" {
rpcCtx, rpcCancel := context.WithCancel(ctx)
srv := &http.Server{Addr: n.config.RPC.PprofListenAddress, Handler: nil}
go func() {
select {
case <-ctx.Done():
sctx, scancel := context.WithTimeout(context.Background(), time.Second)
defer scancel()
_ = srv.Shutdown(sctx)
case <-rpcCtx.Done():
}
}()
go func() {
n.logger.Info("Starting pprof server", "laddr", n.config.RPC.PprofListenAddress)
if err := srv.ListenAndServe(); err != nil {
n.logger.Error("pprof server error", "err", err)
rpcCancel()
}
}()
}
now := tmtime.Now()
genTime := n.genesisDoc.GenesisTime
if genTime.After(now) {
n.logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
time.Sleep(genTime.Sub(now))
}
// Start the transport.
if err := n.router.Start(ctx); err != nil {
return err
}
n.isListening = true
if n.config.P2P.PexReactor {
if err := n.pexReactor.Start(ctx); err != nil {
return err
}
}
return nil
}
// OnStop stops the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStop() {
n.logger.Info("Stopping Node")
n.pexReactor.Wait()
n.router.Wait()
n.isListening = false
if err := n.shutdownOps(); err != nil {
if strings.TrimSpace(err.Error()) != "" {
n.logger.Error("problem shutting down additional services", "err", err)
}
}
}

Loading…
Cancel
Save