Browse Source

statesync: reactor and channel construction (#7529)

pull/7530/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
fc36c7782f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 31 deletions
  1. +30
    -9
      internal/statesync/reactor.go
  2. +20
    -5
      internal/statesync/reactor_test.go
  3. +6
    -17
      node/node.go

+ 30
- 9
internal/statesync/reactor.go View File

@ -71,9 +71,9 @@ const (
maxLightBlockRequestRetries = 20
)
func GetChannelDescriptors() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
SnapshotChannel: {
ID: SnapshotChannel,
MessageType: new(ssproto.Message),
@ -82,7 +82,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128,
},
{
ChunkChannel: {
ID: ChunkChannel,
Priority: 3,
MessageType: new(ssproto.Message),
@ -90,7 +90,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
},
{
LightBlockChannel: {
ID: LightBlockChannel,
MessageType: new(ssproto.Message),
Priority: 5,
@ -98,7 +98,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor {
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
},
{
ParamsChannel: {
ID: ParamsChannel,
MessageType: new(ssproto.Message),
Priority: 2,
@ -166,19 +166,40 @@ type Reactor struct {
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
chainID string,
initialHeight int64,
cfg config.StateSyncConfig,
logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
snapshotCh, chunkCh, blockCh, paramsCh *p2p.Channel,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
stateStore sm.Store,
blockStore *store.BlockStore,
tempDir string,
ssMetrics *Metrics,
) *Reactor {
) (*Reactor, error) {
chDesc := getChannelDescriptors()
snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel])
if err != nil {
return nil, err
}
chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel])
if err != nil {
return nil, err
}
blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel])
if err != nil {
return nil, err
}
paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel])
if err != nil {
return nil, err
}
r := &Reactor{
logger: logger,
chainID: chainID,
@ -201,7 +222,7 @@ func NewReactor(
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
return r
return r, nil
}
// OnStart starts separate go routines for each p2p Channel and listens for


+ 20
- 5
internal/statesync/reactor_test.go View File

@ -146,23 +146,38 @@ func setup(
cfg := config.DefaultStateSyncConfig()
rts.reactor = NewReactor(
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
switch desc.ID {
case SnapshotChannel:
return rts.snapshotChannel, nil
case ChunkChannel:
return rts.chunkChannel, nil
case LightBlockChannel:
return rts.blockChannel, nil
case ParamsChannel:
return rts.paramsChannel, nil
default:
return nil, fmt.Errorf("invalid channel; %v", desc.ID)
}
}
var err error
rts.reactor, err = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
*cfg,
log.TestingLogger(),
conn,
connQuery,
rts.snapshotChannel,
rts.chunkChannel,
rts.blockChannel,
rts.paramsChannel,
chCreator,
rts.peerUpdates,
rts.stateStore,
rts.blockStore,
"",
m,
)
require.NoError(t, err)
rts.syncer = newSyncer(
*cfg,


+ 6
- 17
node/node.go View File

@ -349,35 +349,24 @@ func makeNode(
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
ssChDesc := statesync.GetChannelDescriptors()
channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc))
for idx := range ssChDesc {
chd := ssChDesc[idx]
ch, err := router.OpenChannel(ctx, chd)
if err != nil {
return nil, err
}
channels[ch.ID] = ch
}
stateSyncReactor := statesync.NewReactor(
stateSyncReactor, err := statesync.NewReactor(
ctx,
genDoc.ChainID,
genDoc.InitialHeight,
*cfg.StateSync,
logger.With("module", "statesync"),
proxyApp.Snapshot(),
proxyApp.Query(),
channels[statesync.SnapshotChannel],
channels[statesync.ChunkChannel],
channels[statesync.LightBlockChannel],
channels[statesync.ParamsChannel],
router.OpenChannel,
peerManager.Subscribe(ctx),
stateStore,
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
var pexReactor service.Service
if cfg.P2P.PexReactor {


Loading…
Cancel
Save