diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index eebfda764..43c3e83cd 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -17,27 +17,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var ( - _ service.Service = (*Reactor)(nil) - - // ChannelShims contains a map of ChannelDescriptorShim objects, where each - // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding - // p2p proto.Message the new p2p Channel is responsible for handling. - // - // - // TODO: Remove once p2p refactor is complete. - // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = []*p2p.ChannelDescriptor{ - { - ID: BlockSyncChannel, - MessageType: new(bcproto.Message), - Priority: 5, - SendQueueCapacity: 1000, - RecvBufferCapacity: 1024, - RecvMessageCapacity: MaxMsgSize, - }, - } -) +var _ service.Service = (*Reactor)(nil) const ( // BlockSyncChannel is a channel for blocks and status updates @@ -55,6 +35,17 @@ const ( syncTimeout = 60 * time.Second ) +func GetChannelDescriptor() *p2p.ChannelDescriptor { + return &p2p.ChannelDescriptor{ + ID: BlockSyncChannel, + MessageType: new(bcproto.Message), + Priority: 5, + SendQueueCapacity: 1000, + RecvBufferCapacity: 1024, + RecvMessageCapacity: MaxMsgSize, + } +} + type consensusReactor interface { // For when we switch from block sync reactor to the consensus // machine. diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index eb8345b72..62517fd4f 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -21,15 +21,12 @@ import ( var ( _ service.Service = (*Reactor)(nil) _ p2p.Wrapper = (*tmcons.Message)(nil) +) - // ChannelShims contains a map of ChannelDescriptorShim objects, where each - // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding - // p2p proto.Message the new p2p Channel is responsible for handling. - // - // - // TODO: Remove once p2p refactor is complete. - // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = []*p2p.ChannelDescriptor{ +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func GetChannelDescriptors() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ { ID: StateChannel, MessageType: new(tmcons.Message), @@ -66,7 +63,7 @@ var ( RecvMessageCapacity: maxMsgSize, }, } -) +} const ( StateChannel = p2p.ChannelID(0x20) diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 6f96b3906..c2f25bd36 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -15,26 +15,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var ( - _ service.Service = (*Reactor)(nil) - - // ChannelShims contains a map of ChannelDescriptorShim objects, where each - // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding - // p2p proto.Message the new p2p Channel is responsible for handling. - // - // - // TODO: Remove once p2p refactor is complete. - // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = []*p2p.ChannelDescriptor{ - { - ID: EvidenceChannel, - MessageType: new(tmproto.EvidenceList), - Priority: 6, - RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 32, - }, - } -) +var _ service.Service = (*Reactor)(nil) const ( EvidenceChannel = p2p.ChannelID(0x38) @@ -48,6 +29,18 @@ const ( broadcastEvidenceIntervalS = 10 ) +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func GetChannelDescriptor() *p2p.ChannelDescriptor { + return &p2p.ChannelDescriptor{ + ID: EvidenceChannel, + MessageType: new(tmproto.EvidenceList), + Priority: 6, + RecvMessageCapacity: maxMsgSize, + RecvBufferCapacity: 32, + } +} + // Reactor handles evpool evidence broadcasting amongst peers. type Reactor struct { service.BaseService diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index d2a621829..118321645 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -83,14 +83,9 @@ func NewReactor( return r } -// GetChannelShims returns a map of ChannelDescriptorShim objects, where each -// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding -// p2p proto.Message the new p2p Channel is responsible for handling. -// -// -// TODO: Remove once p2p refactor is complete. -// ref: https://github.com/tendermint/tendermint/issues/5670 -func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := protomem.Message{ Sum: &protomem.Message_Txs{ @@ -98,14 +93,12 @@ func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { }, } - return []*p2p.ChannelDescriptor{ - { - ID: mempool.MempoolChannel, - MessageType: new(protomem.Message), - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - RecvBufferCapacity: 128, - }, + return &p2p.ChannelDescriptor{ + ID: mempool.MempoolChannel, + MessageType: new(protomem.Message), + Priority: 5, + RecvMessageCapacity: batchMsg.Size(), + RecvBufferCapacity: 128, } } diff --git a/internal/mempool/v0/reactor_test.go b/internal/mempool/v0/reactor_test.go index 104e1d4bd..69582284b 100644 --- a/internal/mempool/v0/reactor_test.go +++ b/internal/mempool/v0/reactor_test.go @@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint) peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := GetChannelShims(config)[0] + chDesc := GetChannelDescriptor(config) chDesc.RecvBufferCapacity = int(chBuf) rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc) diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index e4b04c931..8ef5a6bd8 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -90,14 +90,9 @@ func NewReactor( func defaultObservePanic(r interface{}) {} -// GetChannelShims returns a map of ChannelDescriptorShim objects, where each -// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding -// p2p proto.Message the new p2p Channel is responsible for handling. -// -// -// TODO: Remove once p2p refactor is complete. -// ref: https://github.com/tendermint/tendermint/issues/5670 -func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { +// GetChannelDescriptor produces an instance of a descriptor for this +// package's required channels. +func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := protomem.Message{ Sum: &protomem.Message_Txs{ @@ -105,14 +100,12 @@ func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { }, } - return []*p2p.ChannelDescriptor{ - { - ID: mempool.MempoolChannel, - MessageType: new(protomem.Message), - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - RecvBufferCapacity: 128, - }, + return &p2p.ChannelDescriptor{ + ID: mempool.MempoolChannel, + MessageType: new(protomem.Message), + Priority: 5, + RecvMessageCapacity: batchMsg.Size(), + RecvBufferCapacity: 128, } } diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go index f004e75a9..56e6057a1 100644 --- a/internal/mempool/v1/reactor_test.go +++ b/internal/mempool/v1/reactor_test.go @@ -50,7 +50,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := GetChannelShims(cfg.Mempool)[0] + chDesc := GetChannelDescriptor(cfg.Mempool) rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc) for nodeID := range rts.network.Nodes { diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 1e149a2e5..a99e83dc5 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -81,8 +81,8 @@ type MConnection struct { recvMonitor *flow.Monitor send chan struct{} pong chan struct{} - channels []*Channel - channelsIdx map[ChannelID]*Channel + channels []*channel + channelsIdx map[ChannelID]*channel onReceive receiveCbFunc onError errorCbFunc errored uint32 @@ -186,8 +186,8 @@ func NewMConnectionWithConfig( } // Create channels - var channelsIdx = map[ChannelID]*Channel{} - var channels = []*Channel{} + var channelsIdx = map[ChannelID]*channel{} + var channels = []*channel{} for _, desc := range chDescs { channel := newChannel(mconn, *desc) @@ -436,7 +436,7 @@ func (c *MConnection) sendPacketMsg() bool { // Choose a channel to create a PacketMsg from. // The chosen channel will be the one whose recentlySent/priority is the least. var leastRatio float32 = math.MaxFloat32 - var leastChannel *Channel + var leastChannel *channel for _, channel := range c.channels { // If nothing to send, skip this channel if !channel.isSendPending() { @@ -639,9 +639,8 @@ func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { return } -// TODO: lowercase. // NOTE: not goroutine-safe. -type Channel struct { +type channel struct { // Exponential moving average. // This field must be accessed atomically. // It is first in the struct to ensure correct alignment. @@ -660,12 +659,12 @@ type Channel struct { Logger log.Logger } -func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { +func newChannel(conn *MConnection, desc ChannelDescriptor) *channel { desc = desc.FillDefaults() if desc.Priority <= 0 { panic("Channel default priority must be a positive integer") } - return &Channel{ + return &channel{ conn: conn, desc: desc, sendQueue: make(chan []byte, desc.SendQueueCapacity), @@ -674,14 +673,14 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { } } -func (ch *Channel) SetLogger(l log.Logger) { +func (ch *channel) SetLogger(l log.Logger) { ch.Logger = l } // Queues message to send to this channel. // Goroutine-safe // Times out (and returns false) after defaultSendTimeout -func (ch *Channel) sendBytes(bytes []byte) bool { +func (ch *channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) @@ -694,7 +693,7 @@ func (ch *Channel) sendBytes(bytes []byte) bool { // Returns true if any PacketMsgs are pending to be sent. // Call before calling nextPacketMsg() // Goroutine-safe -func (ch *Channel) isSendPending() bool { +func (ch *channel) isSendPending() bool { if len(ch.sending) == 0 { if len(ch.sendQueue) == 0 { return false @@ -706,7 +705,7 @@ func (ch *Channel) isSendPending() bool { // Creates a new PacketMsg to send. // Not goroutine-safe -func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { +func (ch *channel) nextPacketMsg() tmp2p.PacketMsg { packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)} maxSize := ch.maxPacketMsgPayloadSize packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))] @@ -723,7 +722,7 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { // Writes next PacketMsg to w and updates c.recentlySent. // Not goroutine-safe -func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { +func (ch *channel) writePacketMsgTo(w io.Writer) (n int, err error) { packet := ch.nextPacketMsg() n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) atomic.AddInt64(&ch.recentlySent, int64(n)) @@ -733,7 +732,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { // Handles incoming PacketMsgs. It returns a message bytes if message is // complete, which is owned by the caller and will not be modified. // Not goroutine-safe -func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { +func (ch *channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data) if recvCap < recvReceived { @@ -750,7 +749,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { // Call this periodically to update stats for throttling purposes. // Not goroutine-safe -func (ch *Channel) updateStats() { +func (ch *channel) updateStats() { // Exponential decay of stats. // TODO: optimize. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8)) diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 99cb4e6ab..939fb409c 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -28,48 +28,6 @@ import ( var ( _ service.Service = (*Reactor)(nil) _ p2p.Wrapper = (*ssproto.Message)(nil) - - // ChannelShims contains a map of ChannelDescriptorShim objects, where each - // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding - // p2p proto.Message the new p2p Channel is responsible for handling. - // - // - // TODO: Remove once p2p refactor is complete. - // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = []*p2p.ChannelDescriptor{ - { - ID: SnapshotChannel, - MessageType: new(ssproto.Message), - Priority: 6, - SendQueueCapacity: 10, - RecvMessageCapacity: snapshotMsgSize, - RecvBufferCapacity: 128, - }, - { - ID: ChunkChannel, - Priority: 3, - MessageType: new(ssproto.Message), - SendQueueCapacity: 4, - RecvMessageCapacity: chunkMsgSize, - RecvBufferCapacity: 128, - }, - { - ID: LightBlockChannel, - MessageType: new(ssproto.Message), - Priority: 5, - SendQueueCapacity: 10, - RecvMessageCapacity: lightBlockMsgSize, - RecvBufferCapacity: 128, - }, - { - ID: ParamsChannel, - MessageType: new(ssproto.Message), - Priority: 2, - SendQueueCapacity: 10, - RecvMessageCapacity: paramMsgSize, - RecvBufferCapacity: 128, - }, - } ) const ( @@ -113,6 +71,45 @@ const ( maxLightBlockRequestRetries = 20 ) +func GetChannelDescriptors() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + + ID: SnapshotChannel, + MessageType: new(ssproto.Message), + Priority: 6, + SendQueueCapacity: 10, + RecvMessageCapacity: snapshotMsgSize, + RecvBufferCapacity: 128, + }, + { + ID: ChunkChannel, + Priority: 3, + MessageType: new(ssproto.Message), + SendQueueCapacity: 4, + RecvMessageCapacity: chunkMsgSize, + RecvBufferCapacity: 128, + }, + { + ID: LightBlockChannel, + MessageType: new(ssproto.Message), + Priority: 5, + SendQueueCapacity: 10, + RecvMessageCapacity: lightBlockMsgSize, + RecvBufferCapacity: 128, + }, + { + ID: ParamsChannel, + MessageType: new(ssproto.Message), + Priority: 2, + SendQueueCapacity: 10, + RecvMessageCapacity: paramMsgSize, + RecvBufferCapacity: 128, + }, + } + +} + // Metricer defines an interface used for the rpc sync info query, please see statesync.metrics // for the details. type Metricer interface { diff --git a/node/node.go b/node/node.go index 0a48bf0b7..727722279 100644 --- a/node/node.go +++ b/node/node.go @@ -303,11 +303,14 @@ func makeNode(cfg *config.Config, sm.BlockExecutorWithMetrics(nodeMetrics.state), ) - csReactor, csState := createConsensusReactor( + csReactor, csState, err := createConsensusReactor( cfg, state, blockExec, blockStore, mp, evPool, privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus, peerManager, router, consensusLogger, ) + if err != nil { + return nil, combineCloseError(err, makeCloser(closers)) + } // Create the blockchain reactor. Note, we do not start block sync if we're // doing a state sync first. @@ -334,7 +337,17 @@ func makeNode(cfg *config.Config, // we should clean this whole thing up. See: // https://github.com/tendermint/tendermint/issues/4644 ssLogger := logger.With("module", "statesync") - channels := makeChannelsFromShims(router, statesync.ChannelShims) + ssChDesc := statesync.GetChannelDescriptors() + channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc)) + for idx := range ssChDesc { + chd := ssChDesc[idx] + ch, err := router.OpenChannel(chd) + if err != nil { + return nil, err + } + + channels[ch.ID] = ch + } peerUpdates := peerManager.Subscribe() stateSyncReactor := statesync.NewReactor( @@ -1088,23 +1101,3 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt return opts } - -// FIXME: Temporary helper function, shims should be removed. -func makeChannelsFromShims( - router *p2p.Router, - chDescs []*p2p.ChannelDescriptor, -) map[p2p.ChannelID]*p2p.Channel { - - channels := map[p2p.ChannelID]*p2p.Channel{} - for idx := range chDescs { - chDesc := chDescs[idx] - ch, err := router.OpenChannel(chDesc) - if err != nil { - panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err)) - } - - channels[chDesc.ID] = ch - } - - return channels -} diff --git a/node/setup.go b/node/setup.go index fc611d946..40ced410c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -198,13 +198,15 @@ func createMempoolReactor( ) (service.Service, mempool.Mempool, error) { logger = logger.With("module", "mempool", "version", cfg.Mempool.Version) - channelShims := mempoolv0.GetChannelShims(cfg.Mempool) - - channels := makeChannelsFromShims(router, channelShims) peerUpdates := peerManager.Subscribe() switch cfg.Mempool.Version { case config.MempoolV0: + ch, err := router.OpenChannel(mempoolv0.GetChannelDescriptor(cfg.Mempool)) + if err != nil { + return nil, nil, err + } + mp := mempoolv0.NewCListMempool( cfg.Mempool, proxyApp.Mempool(), @@ -221,7 +223,7 @@ func createMempoolReactor( cfg.Mempool, peerManager, mp, - channels[mempool.MempoolChannel], + ch, peerUpdates, ) @@ -232,6 +234,11 @@ func createMempoolReactor( return reactor, mp, nil case config.MempoolV1: + ch, err := router.OpenChannel(mempoolv1.GetChannelDescriptor(cfg.Mempool)) + if err != nil { + return nil, nil, err + } + mp := mempoolv1.NewTxMempool( logger, cfg.Mempool, @@ -247,7 +254,7 @@ func createMempoolReactor( cfg.Mempool, peerManager, mp, - channels[mempool.MempoolChannel], + ch, peerUpdates, ) @@ -283,9 +290,14 @@ func createEvidenceReactor( return nil, nil, fmt.Errorf("creating evidence pool: %w", err) } + ch, err := router.OpenChannel(evidence.GetChannelDescriptor()) + if err != nil { + return nil, nil, fmt.Errorf("creating evidence channel: %w", err) + } + evidenceReactor := evidence.NewReactor( logger, - makeChannelsFromShims(router, evidence.ChannelShims)[evidence.EvidenceChannel], + ch, peerManager.Subscribe(), evidencePool, ) @@ -307,12 +319,16 @@ func createBlockchainReactor( logger = logger.With("module", "blockchain") - channels := makeChannelsFromShims(router, blocksync.ChannelShims) + ch, err := router.OpenChannel(blocksync.GetChannelDescriptor()) + if err != nil { + return nil, err + } + peerUpdates := peerManager.Subscribe() reactor, err := blocksync.NewReactor( logger, state.Copy(), blockExec, blockStore, csReactor, - channels[blocksync.BlockSyncChannel], peerUpdates, blockSync, + ch, peerUpdates, blockSync, metrics, ) if err != nil { @@ -336,7 +352,7 @@ func createConsensusReactor( peerManager *p2p.PeerManager, router *p2p.Router, logger log.Logger, -) (*consensus.Reactor, *consensus.State) { +) (*consensus.Reactor, *consensus.State, error) { consensusState := consensus.NewState( cfg.Consensus, @@ -352,13 +368,19 @@ func createConsensusReactor( consensusState.SetPrivValidator(privValidator) } - var ( - channels map[p2p.ChannelID]*p2p.Channel - peerUpdates *p2p.PeerUpdates - ) + csChDesc := consensus.GetChannelDescriptors() + channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc)) + for idx := range csChDesc { + chd := csChDesc[idx] + ch, err := router.OpenChannel(chd) + if err != nil { + return nil, nil, err + } - channels = makeChannelsFromShims(router, consensus.ChannelShims) - peerUpdates = peerManager.Subscribe() + channels[ch.ID] = ch + } + + peerUpdates := peerManager.Subscribe() reactor := consensus.NewReactor( logger, @@ -376,7 +398,7 @@ func createConsensusReactor( // consensusReactor will set it on consensusState and blockExecutor. reactor.SetEventBus(eventBus) - return reactor, consensusState + return reactor, consensusState, nil } func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {