diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 8f39f54bc..b9113cc21 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -71,9 +71,9 @@ const ( maxLightBlockRequestRetries = 20 ) -func GetChannelDescriptors() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { +func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor { + return map[p2p.ChannelID]*p2p.ChannelDescriptor{ + SnapshotChannel: { ID: SnapshotChannel, MessageType: new(ssproto.Message), @@ -82,7 +82,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvMessageCapacity: snapshotMsgSize, RecvBufferCapacity: 128, }, - { + ChunkChannel: { ID: ChunkChannel, Priority: 3, MessageType: new(ssproto.Message), @@ -90,7 +90,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvMessageCapacity: chunkMsgSize, RecvBufferCapacity: 128, }, - { + LightBlockChannel: { ID: LightBlockChannel, MessageType: new(ssproto.Message), Priority: 5, @@ -98,7 +98,7 @@ func GetChannelDescriptors() []*p2p.ChannelDescriptor { RecvMessageCapacity: lightBlockMsgSize, RecvBufferCapacity: 128, }, - { + ParamsChannel: { ID: ParamsChannel, MessageType: new(ssproto.Message), Priority: 2, @@ -166,19 +166,40 @@ type Reactor struct { // and querying, references to p2p Channels and a channel to listen for peer // updates on. Note, the reactor will close all p2p Channels when stopping. func NewReactor( + ctx context.Context, chainID string, initialHeight int64, cfg config.StateSyncConfig, logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, - snapshotCh, chunkCh, blockCh, paramsCh *p2p.Channel, + channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, stateStore sm.Store, blockStore *store.BlockStore, tempDir string, ssMetrics *Metrics, -) *Reactor { +) (*Reactor, error) { + + chDesc := getChannelDescriptors() + + snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel]) + if err != nil { + return nil, err + } + chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel]) + if err != nil { + return nil, err + } + blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel]) + if err != nil { + return nil, err + } + paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel]) + if err != nil { + return nil, err + } + r := &Reactor{ logger: logger, chainID: chainID, @@ -201,7 +222,7 @@ func NewReactor( } r.BaseService = *service.NewBaseService(logger, "StateSync", r) - return r + return r, nil } // OnStart starts separate go routines for each p2p Channel and listens for diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 127814493..a53cd3a68 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -146,23 +146,38 @@ func setup( cfg := config.DefaultStateSyncConfig() - rts.reactor = NewReactor( + chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + switch desc.ID { + case SnapshotChannel: + return rts.snapshotChannel, nil + case ChunkChannel: + return rts.chunkChannel, nil + case LightBlockChannel: + return rts.blockChannel, nil + case ParamsChannel: + return rts.paramsChannel, nil + default: + return nil, fmt.Errorf("invalid channel; %v", desc.ID) + } + } + + var err error + rts.reactor, err = NewReactor( + ctx, factory.DefaultTestChainID, 1, *cfg, log.TestingLogger(), conn, connQuery, - rts.snapshotChannel, - rts.chunkChannel, - rts.blockChannel, - rts.paramsChannel, + chCreator, rts.peerUpdates, rts.stateStore, rts.blockStore, "", m, ) + require.NoError(t, err) rts.syncer = newSyncer( *cfg, diff --git a/node/node.go b/node/node.go index 9874f6dab..704bf7faa 100644 --- a/node/node.go +++ b/node/node.go @@ -349,35 +349,24 @@ func makeNode( // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 - ssChDesc := statesync.GetChannelDescriptors() - channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc)) - for idx := range ssChDesc { - chd := ssChDesc[idx] - ch, err := router.OpenChannel(ctx, chd) - if err != nil { - return nil, err - } - - channels[ch.ID] = ch - } - - stateSyncReactor := statesync.NewReactor( + stateSyncReactor, err := statesync.NewReactor( + ctx, genDoc.ChainID, genDoc.InitialHeight, *cfg.StateSync, logger.With("module", "statesync"), proxyApp.Snapshot(), proxyApp.Query(), - channels[statesync.SnapshotChannel], - channels[statesync.ChunkChannel], - channels[statesync.LightBlockChannel], - channels[statesync.ParamsChannel], + router.OpenChannel, peerManager.Subscribe(ctx), stateStore, blockStore, cfg.StateSync.TempDir, nodeMetrics.statesync, ) + if err != nil { + return nil, combineCloseError(err, makeCloser(closers)) + } var pexReactor service.Service if cfg.P2P.PexReactor {