From cbe6ad6cd5261963b0d05722f5ba0c549130d96f Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Fri, 15 Oct 2021 13:03:10 -0400 Subject: [PATCH] p2p: flatten channel descriptor (#7132) --- internal/blocksync/reactor.go | 18 ++++---- internal/blocksync/reactor_test.go | 2 +- internal/consensus/reactor.go | 72 +++++++++++++---------------- internal/consensus/reactor_test.go | 4 +- internal/evidence/reactor.go | 16 +++---- internal/evidence/reactor_test.go | 2 +- internal/mempool/v0/reactor.go | 18 ++++---- internal/mempool/v0/reactor_test.go | 2 +- internal/mempool/v1/reactor.go | 18 ++++---- internal/mempool/v1/reactor_test.go | 2 +- internal/p2p/p2p_test.go | 2 +- internal/p2p/p2ptest/network.go | 12 ++--- internal/p2p/pex/reactor.go | 7 +-- internal/p2p/pqueue.go | 6 +-- internal/p2p/pqueue_test.go | 2 +- internal/p2p/router.go | 8 ++-- internal/p2p/router_test.go | 2 +- internal/p2p/shim.go | 9 ---- internal/statesync/reactor.go | 66 ++++++++++++-------------- node/node.go | 11 +++-- 20 files changed, 124 insertions(+), 155 deletions(-) delete mode 100644 internal/p2p/shim.go diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 86aaf79d3..eebfda764 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -27,16 +27,14 @@ var ( // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - BlockSyncChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: BlockSyncChannel, - MessageType: new(bcproto.Message), - Priority: 5, - SendQueueCapacity: 1000, - RecvBufferCapacity: 1024, - RecvMessageCapacity: MaxMsgSize, - }, + ChannelShims = []*p2p.ChannelDescriptor{ + { + ID: BlockSyncChannel, + MessageType: new(bcproto.Message), + Priority: 5, + SendQueueCapacity: 1000, + RecvBufferCapacity: 1024, + RecvMessageCapacity: MaxMsgSize, }, } ) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 59889eec4..568b928a5 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -65,7 +65,7 @@ func setup( blockSync: true, } - chDesc := p2p.ChannelDescriptor{ID: BlockSyncChannel} + chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel} rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) i := 0 diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index dde36306c..eb8345b72 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -29,49 +29,41 @@ var ( // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - StateChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: StateChannel, - MessageType: new(tmcons.Message), - Priority: 8, - SendQueueCapacity: 64, - RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 128, - }, + ChannelShims = []*p2p.ChannelDescriptor{ + { + ID: StateChannel, + MessageType: new(tmcons.Message), + Priority: 8, + SendQueueCapacity: 64, + RecvMessageCapacity: maxMsgSize, + RecvBufferCapacity: 128, }, - DataChannel: { - Descriptor: &p2p.ChannelDescriptor{ - // TODO: Consider a split between gossiping current block and catchup - // stuff. Once we gossip the whole block there is nothing left to send - // until next height or round. - ID: DataChannel, - MessageType: new(tmcons.Message), - Priority: 12, - SendQueueCapacity: 64, - RecvBufferCapacity: 512, - RecvMessageCapacity: maxMsgSize, - }, + { + // TODO: Consider a split between gossiping current block and catchup + // stuff. Once we gossip the whole block there is nothing left to send + // until next height or round. + ID: DataChannel, + MessageType: new(tmcons.Message), + Priority: 12, + SendQueueCapacity: 64, + RecvBufferCapacity: 512, + RecvMessageCapacity: maxMsgSize, }, - VoteChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: VoteChannel, - MessageType: new(tmcons.Message), - Priority: 10, - SendQueueCapacity: 64, - RecvBufferCapacity: 128, - RecvMessageCapacity: maxMsgSize, - }, + { + ID: VoteChannel, + MessageType: new(tmcons.Message), + Priority: 10, + SendQueueCapacity: 64, + RecvBufferCapacity: 128, + RecvMessageCapacity: maxMsgSize, }, - VoteSetBitsChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: VoteSetBitsChannel, - MessageType: new(tmcons.Message), - Priority: 5, - SendQueueCapacity: 8, - RecvBufferCapacity: 128, - RecvMessageCapacity: maxMsgSize, - }, + { + ID: VoteSetBitsChannel, + MessageType: new(tmcons.Message), + Priority: 5, + SendQueueCapacity: 8, + RecvBufferCapacity: 128, + RecvMessageCapacity: maxMsgSize, }, } ) diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 04bc3708e..246178db4 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -50,8 +50,8 @@ type reactorTestSuite struct { voteSetBitsChannels map[types.NodeID]*p2p.Channel } -func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { - return p2p.ChannelDescriptor{ +func chDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor { + return &p2p.ChannelDescriptor{ ID: chID, } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 0cf261441..6f96b3906 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -25,15 +25,13 @@ var ( // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - EvidenceChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: EvidenceChannel, - MessageType: new(tmproto.EvidenceList), - Priority: 6, - RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 32, - }, + ChannelShims = []*p2p.ChannelDescriptor{ + { + ID: EvidenceChannel, + MessageType: new(tmproto.EvidenceList), + Priority: 6, + RecvMessageCapacity: maxMsgSize, + RecvBufferCapacity: 32, }, } ) diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 7963ba959..50db47012 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -62,7 +62,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite { peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores), } - chDesc := p2p.ChannelDescriptor{ID: evidence.EvidenceChannel} + chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel} rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(tmproto.EvidenceList), diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index 010f98f5d..d2a621829 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -90,7 +90,7 @@ func NewReactor( // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 -func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim { +func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := protomem.Message{ Sum: &protomem.Message_Txs{ @@ -98,15 +98,13 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe }, } - return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - mempool.MempoolChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: mempool.MempoolChannel, - MessageType: new(protomem.Message), - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - RecvBufferCapacity: 128, - }, + return []*p2p.ChannelDescriptor{ + { + ID: mempool.MempoolChannel, + MessageType: new(protomem.Message), + Priority: 5, + RecvMessageCapacity: batchMsg.Size(), + RecvBufferCapacity: 128, }, } } diff --git a/internal/mempool/v0/reactor_test.go b/internal/mempool/v0/reactor_test.go index 4ae2523a1..2964018d6 100644 --- a/internal/mempool/v0/reactor_test.go +++ b/internal/mempool/v0/reactor_test.go @@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint) peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel} + chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel} rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) for nodeID := range rts.network.Nodes { diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 72154b4a8..e4b04c931 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -97,7 +97,7 @@ func defaultObservePanic(r interface{}) {} // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 -func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim { +func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor { largestTx := make([]byte, cfg.MaxTxBytes) batchMsg := protomem.Message{ Sum: &protomem.Message_Txs{ @@ -105,15 +105,13 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe }, } - return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - mempool.MempoolChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: mempool.MempoolChannel, - MessageType: new(protomem.Message), - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - RecvBufferCapacity: 128, - }, + return []*p2p.ChannelDescriptor{ + { + ID: mempool.MempoolChannel, + MessageType: new(protomem.Message), + Priority: 5, + RecvMessageCapacity: batchMsg.Size(), + RecvBufferCapacity: 128, }, } } diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go index 1449b20b1..bdb66f436 100644 --- a/internal/mempool/v1/reactor_test.go +++ b/internal/mempool/v1/reactor_test.go @@ -52,7 +52,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel} + chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel} rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) for nodeID := range rts.network.Nodes { diff --git a/internal/p2p/p2p_test.go b/internal/p2p/p2p_test.go index f5ed5706c..15e561d9b 100644 --- a/internal/p2p/p2p_test.go +++ b/internal/p2p/p2p_test.go @@ -14,7 +14,7 @@ import ( var ( ctx = context.Background() chID = p2p.ChannelID(1) - chDesc = p2p.ChannelDescriptor{ + chDesc = &p2p.ChannelDescriptor{ ID: chID, Priority: 5, SendQueueCapacity: 10, diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 2ed888764..dff5b6a75 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -137,7 +137,7 @@ func (n *Network) NodeIDs() []types.NodeID { // doing error checks and cleanups. func (n *Network) MakeChannels( t *testing.T, - chDesc p2p.ChannelDescriptor, + chDesc *p2p.ChannelDescriptor, messageType proto.Message, size int, ) map[types.NodeID]*p2p.Channel { @@ -153,7 +153,7 @@ func (n *Network) MakeChannels( // all the channels. func (n *Network) MakeChannelsNoCleanup( t *testing.T, - chDesc p2p.ChannelDescriptor, + chDesc *p2p.ChannelDescriptor, messageType proto.Message, size int, ) map[types.NodeID]*p2p.Channel { @@ -281,7 +281,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { // MakeChannel opens a channel, with automatic error handling and cleanup. On // test cleanup, it also checks that the channel is empty, to make sure // all expected messages have been asserted. -func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor, +func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor, messageType proto.Message, size int) *p2p.Channel { channel, err := n.Router.OpenChannel(chDesc, messageType, size) require.NoError(t, err) @@ -297,7 +297,7 @@ func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor, // caller must ensure proper cleanup of the channel. func (n *Node) MakeChannelNoCleanup( t *testing.T, - chDesc p2p.ChannelDescriptor, + chDesc *p2p.ChannelDescriptor, messageType proto.Message, size int, ) *p2p.Channel { @@ -333,8 +333,8 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates { return sub } -func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor { - return p2p.ChannelDescriptor{ +func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor { + return &p2p.ChannelDescriptor{ ID: chID, Priority: 5, SendQueueCapacity: 10, diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index d43d836ce..645cc19e1 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -62,13 +62,14 @@ const ( // within each reactor (as they are now) or, considering that the reactor doesn't // really need to care about the channel descriptors, if they should be housed // in the node module. -func ChannelDescriptor() conn.ChannelDescriptor { - return conn.ChannelDescriptor{ +func ChannelDescriptor() *conn.ChannelDescriptor { + return &conn.ChannelDescriptor{ ID: PexChannel, + MessageType: new(protop2p.PexMessage), Priority: 1, SendQueueCapacity: 10, RecvMessageCapacity: maxMsgSize, - RecvBufferCapacity: 32, + RecvBufferCapacity: 128, } } diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go index fd0a43db6..e0e812cf5 100644 --- a/internal/p2p/pqueue.go +++ b/internal/p2p/pqueue.go @@ -71,7 +71,7 @@ type pqScheduler struct { size uint sizes map[uint]uint // cumulative priority sizes pq *priorityQueue - chDescs []ChannelDescriptor + chDescs []*ChannelDescriptor capacity uint chPriorities map[ChannelID]uint @@ -84,12 +84,12 @@ type pqScheduler struct { func newPQScheduler( logger log.Logger, m *Metrics, - chDescs []ChannelDescriptor, + chDescs []*ChannelDescriptor, enqueueBuf, dequeueBuf, capacity uint, ) *pqScheduler { // copy each ChannelDescriptor and sort them by ascending channel priority - chDescsCopy := make([]ChannelDescriptor, len(chDescs)) + chDescsCopy := make([]*ChannelDescriptor, len(chDescs)) copy(chDescsCopy, chDescs) sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority }) diff --git a/internal/p2p/pqueue_test.go b/internal/p2p/pqueue_test.go index 18f0c02e3..ffa7e39a8 100644 --- a/internal/p2p/pqueue_test.go +++ b/internal/p2p/pqueue_test.go @@ -12,7 +12,7 @@ type testMessage = gogotypes.StringValue func TestCloseWhileDequeueFull(t *testing.T) { enqueueLength := 5 - chDescs := []ChannelDescriptor{ + chDescs := []*ChannelDescriptor{ {ID: 0x01, Priority: 1}, } pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, uint(enqueueLength), 1, 120) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 11d0514bb..fbf2eea6b 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -249,7 +249,7 @@ type Router struct { nodeInfo types.NodeInfo privKey crypto.PrivKey peerManager *PeerManager - chDescs []ChannelDescriptor + chDescs []*ChannelDescriptor transports []Transport connTracker connectionTracker protocolTransports map[Protocol]Transport @@ -295,7 +295,7 @@ func NewRouter( options.MaxIncomingConnectionAttempts, options.IncomingConnectionWindow, ), - chDescs: make([]ChannelDescriptor, 0), + chDescs: make([]*ChannelDescriptor, 0), transports: transports, protocolTransports: map[Protocol]Transport{}, peerManager: peerManager, @@ -354,7 +354,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { // implement Wrapper to automatically (un)wrap multiple message types in a // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. -func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { +func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { r.channelMtx.Lock() defer r.channelMtx.Unlock() @@ -381,7 +381,7 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message r.nodeInfo.AddChannel(uint16(chDesc.ID)) for _, t := range r.transports { - t.AddChannelDescriptors([]*ChannelDescriptor{&chDesc}) + t.AddChannelDescriptors([]*ChannelDescriptor{chDesc}) } go func() { diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 7f922c29d..7ba42b9b0 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -128,7 +128,7 @@ func TestRouter_Channel_Basic(t *testing.T) { require.Error(t, err) // Opening a different channel should work. - chDesc2 := p2p.ChannelDescriptor{ID: 2} + chDesc2 := &p2p.ChannelDescriptor{ID: 2} _, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0) require.NoError(t, err) require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID)) diff --git a/internal/p2p/shim.go b/internal/p2p/shim.go deleted file mode 100644 index 8de0b71c7..000000000 --- a/internal/p2p/shim.go +++ /dev/null @@ -1,9 +0,0 @@ -package p2p - -// ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel -// and the proto.Message the new p2p Channel is responsible for handling. -// A ChannelDescriptorShim is not contained in ReactorShim, but is rather -// used to construct a ReactorShim. -type ChannelDescriptorShim struct { - Descriptor *ChannelDescriptor -} diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 86a46ea63..99cb4e6ab 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -36,46 +36,38 @@ var ( // // TODO: Remove once p2p refactor is complete. // ref: https://github.com/tendermint/tendermint/issues/5670 - ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ - SnapshotChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: SnapshotChannel, - MessageType: new(ssproto.Message), - Priority: 6, - SendQueueCapacity: 10, - RecvMessageCapacity: snapshotMsgSize, - RecvBufferCapacity: 128, - }, + ChannelShims = []*p2p.ChannelDescriptor{ + { + ID: SnapshotChannel, + MessageType: new(ssproto.Message), + Priority: 6, + SendQueueCapacity: 10, + RecvMessageCapacity: snapshotMsgSize, + RecvBufferCapacity: 128, }, - ChunkChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: ChunkChannel, - Priority: 3, - MessageType: new(ssproto.Message), - SendQueueCapacity: 4, - RecvMessageCapacity: chunkMsgSize, - RecvBufferCapacity: 128, - }, + { + ID: ChunkChannel, + Priority: 3, + MessageType: new(ssproto.Message), + SendQueueCapacity: 4, + RecvMessageCapacity: chunkMsgSize, + RecvBufferCapacity: 128, }, - LightBlockChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: LightBlockChannel, - MessageType: new(ssproto.Message), - Priority: 5, - SendQueueCapacity: 10, - RecvMessageCapacity: lightBlockMsgSize, - RecvBufferCapacity: 128, - }, + { + ID: LightBlockChannel, + MessageType: new(ssproto.Message), + Priority: 5, + SendQueueCapacity: 10, + RecvMessageCapacity: lightBlockMsgSize, + RecvBufferCapacity: 128, }, - ParamsChannel: { - Descriptor: &p2p.ChannelDescriptor{ - ID: ParamsChannel, - MessageType: new(ssproto.Message), - Priority: 2, - SendQueueCapacity: 10, - RecvMessageCapacity: paramMsgSize, - RecvBufferCapacity: 128, - }, + { + ID: ParamsChannel, + MessageType: new(ssproto.Message), + Priority: 2, + SendQueueCapacity: 10, + RecvMessageCapacity: paramMsgSize, + RecvBufferCapacity: 128, }, } ) diff --git a/node/node.go b/node/node.go index 61ba25749..cf2d104dc 100644 --- a/node/node.go +++ b/node/node.go @@ -1092,17 +1092,18 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt // FIXME: Temporary helper function, shims should be removed. func makeChannelsFromShims( router *p2p.Router, - chShims map[p2p.ChannelID]*p2p.ChannelDescriptorShim, + chDescs []*p2p.ChannelDescriptor, ) map[p2p.ChannelID]*p2p.Channel { channels := map[p2p.ChannelID]*p2p.Channel{} - for chID, chShim := range chShims { - ch, err := router.OpenChannel(*chShim.Descriptor, chShim.Descriptor.MessageType, chShim.Descriptor.RecvBufferCapacity) + for idx := range chDescs { + chDesc := chDescs[idx] + ch, err := router.OpenChannel(chDesc, chDesc.MessageType, chDesc.RecvBufferCapacity) if err != nil { - panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) + panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err)) } - channels[chID] = ch + channels[chDesc.ID] = ch } return channels