Browse Source

node: new concrete type for seed node implementation (#7521)

Defines a different concrete type that satisfies the service interface for a seed node.
update the seed node unit test to assert the new type.

Fixes #6775
pull/7552/head
Kene 3 years ago
committed by GitHub
parent
commit
2f858f1448
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 30 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +103
    -29
      node/node.go
  3. +1
    -1
      node/node_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -51,6 +51,7 @@ Special thanks to external contributors on this release:
- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em) - [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em)
- [pubsub] \#7319 Performance improvements for the event query API (@creachadair) - [pubsub] \#7319 Performance improvements for the event query API (@creachadair)
- [node] \#7521 Define concrete type for seed node implementation (@spacech1mp)
### BUG FIXES ### BUG FIXES


+ 103
- 29
node/node.go View File

@ -82,6 +82,26 @@ type nodeImpl struct {
prometheusSrv *http.Server prometheusSrv *http.Server
} }
type seedNodeImpl struct {
service.BaseService
logger log.Logger
// config
config *config.Config
genesisDoc *types.GenesisDoc // initial validator set
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
// services
pexReactor service.Service // for exchanging peer addresses
shutdownOps closer
}
// newDefaultNode returns a Tendermint node with default settings for the // newDefaultNode returns a Tendermint node with default settings for the
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider. // PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
// It implements NodeProvider. // It implements NodeProvider.
@ -493,7 +513,7 @@ func makeSeedNode(
return nil, combineCloseError(err, closer) return nil, combineCloseError(err, closer)
} }
node := &nodeImpl{
node := &seedNodeImpl{
config: cfg, config: cfg,
logger: logger, logger: logger,
genesisDoc: genDoc, genesisDoc: genDoc,
@ -512,6 +532,68 @@ func makeSeedNode(
return node, nil return node, nil
} }
// OnStart starts the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStart(ctx context.Context) error {
if n.config.RPC.PprofListenAddress != "" {
rpcCtx, rpcCancel := context.WithCancel(ctx)
srv := &http.Server{Addr: n.config.RPC.PprofListenAddress, Handler: nil}
go func() {
select {
case <-ctx.Done():
sctx, scancel := context.WithTimeout(context.Background(), time.Second)
defer scancel()
_ = srv.Shutdown(sctx)
case <-rpcCtx.Done():
}
}()
go func() {
n.logger.Info("Starting pprof server", "laddr", n.config.RPC.PprofListenAddress)
if err := srv.ListenAndServe(); err != nil {
n.logger.Error("pprof server error", "err", err)
rpcCancel()
}
}()
}
now := tmtime.Now()
genTime := n.genesisDoc.GenesisTime
if genTime.After(now) {
n.logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
time.Sleep(genTime.Sub(now))
}
// Start the transport.
if err := n.router.Start(ctx); err != nil {
return err
}
n.isListening = true
if n.config.P2P.PexReactor {
if err := n.pexReactor.Start(ctx); err != nil {
return err
}
}
return nil
}
// OnStop stops the Seed Node. It implements service.Service.
func (n *seedNodeImpl) OnStop() {
n.logger.Info("Stopping Node")
n.pexReactor.Wait()
n.router.Wait()
n.isListening = false
if err := n.shutdownOps(); err != nil {
if strings.TrimSpace(err.Error()) != "" {
n.logger.Error("problem shutting down additional services", "err", err)
}
}
}
// OnStart starts the Node. It implements service.Service. // OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart(ctx context.Context) error { func (n *nodeImpl) OnStart(ctx context.Context) error {
if n.config.RPC.PprofListenAddress != "" { if n.config.RPC.PprofListenAddress != "" {
@ -546,7 +628,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
// Start the RPC server before the P2P server // Start the RPC server before the P2P server
// so we can eg. receive txs for the first block // so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
if n.config.RPC.ListenAddress != "" {
listeners, err := n.startRPC(ctx) listeners, err := n.startRPC(ctx)
if err != nil { if err != nil {
return err return err
@ -564,30 +646,24 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
} }
n.isListening = true n.isListening = true
if n.config.Mode != config.ModeSeed {
if err := n.bcReactor.Start(ctx); err != nil {
return err
}
if err := n.bcReactor.Start(ctx); err != nil {
return err
}
// Start the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Start(ctx); err != nil {
return err
}
if err := n.consensusReactor.Start(ctx); err != nil {
return err
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(ctx); err != nil {
return err
}
if err := n.stateSyncReactor.Start(ctx); err != nil {
return err
}
// Start the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Start(ctx); err != nil {
return err
}
if err := n.mempoolReactor.Start(ctx); err != nil {
return err
}
// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(ctx); err != nil {
return err
}
if err := n.evidenceReactor.Start(ctx); err != nil {
return err
} }
if n.config.P2P.PexReactor { if n.config.P2P.PexReactor {
@ -686,13 +762,11 @@ func (n *nodeImpl) OnStop() {
} }
} }
if n.config.Mode != config.ModeSeed {
n.bcReactor.Wait()
n.consensusReactor.Wait()
n.stateSyncReactor.Wait()
n.mempoolReactor.Wait()
n.evidenceReactor.Wait()
}
n.bcReactor.Wait()
n.consensusReactor.Wait()
n.stateSyncReactor.Wait()
n.mempoolReactor.Wait()
n.evidenceReactor.Wait()
n.pexReactor.Wait() n.pexReactor.Wait()
n.router.Wait() n.router.Wait()
n.isListening = false n.isListening = false


+ 1
- 1
node/node_test.go View File

@ -556,7 +556,7 @@ func TestNodeNewSeedNode(t *testing.T) {
t.Cleanup(ns.Wait) t.Cleanup(ns.Wait)
require.NoError(t, err) require.NoError(t, err)
n, ok := ns.(*nodeImpl)
n, ok := ns.(*seedNodeImpl)
require.True(t, ok) require.True(t, ok)
err = n.Start(ctx) err = n.Start(ctx)


Loading…
Cancel
Save