diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 568b928a5..6bca8d4a9 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -65,8 +65,8 @@ func setup( blockSync: true, } - chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel} - rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf)) + chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)} + rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc) i := 0 for nodeID := range rts.network.Nodes { diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 246178db4..16fa13969 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -50,9 +50,11 @@ type reactorTestSuite struct { voteSetBitsChannels map[types.NodeID]*p2p.Channel } -func chDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor { +func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor { return &p2p.ChannelDescriptor{ - ID: chID, + ID: chID, + MessageType: new(tmcons.Message), + RecvBufferCapacity: size, } } @@ -67,10 +69,10 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu blocksyncSubs: make(map[types.NodeID]types.Subscription, numNodes), } - rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size) - rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel), new(tmcons.Message), size) - rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size) - rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size) + rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel, size)) + rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel, size)) + rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel, size)) + rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel, size)) _, cancel := context.WithCancel(context.Background()) diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go index 50db47012..cf8f840ea 100644 --- a/internal/evidence/reactor_test.go +++ b/internal/evidence/reactor_test.go @@ -62,11 +62,8 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite { peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores), } - chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel} - rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, - chDesc, - new(tmproto.EvidenceList), - int(chBuf)) + chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel, MessageType: new(tmproto.EvidenceList)} + rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc) require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0) idx := 0 diff --git a/internal/mempool/v0/reactor_test.go b/internal/mempool/v0/reactor_test.go index 2964018d6..104e1d4bd 100644 --- a/internal/mempool/v0/reactor_test.go +++ b/internal/mempool/v0/reactor_test.go @@ -50,8 +50,9 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint) peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel} - rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) + chDesc := GetChannelShims(config)[0] + chDesc.RecvBufferCapacity = int(chBuf) + rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc) for nodeID := range rts.network.Nodes { rts.kvstores[nodeID] = kvstore.NewApplication() diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go index bdb66f436..f004e75a9 100644 --- a/internal/mempool/v1/reactor_test.go +++ b/internal/mempool/v1/reactor_test.go @@ -10,11 +10,9 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/config" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/libs/log" - protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/types" ) @@ -52,8 +50,8 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite { peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), } - chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel} - rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) + chDesc := GetChannelShims(cfg.Mempool)[0] + rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc) for nodeID := range rts.network.Nodes { rts.kvstores[nodeID] = kvstore.NewApplication() diff --git a/internal/p2p/p2p_test.go b/internal/p2p/p2p_test.go index 15e561d9b..642114a1d 100644 --- a/internal/p2p/p2p_test.go +++ b/internal/p2p/p2p_test.go @@ -6,6 +6,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/types" ) @@ -16,6 +17,7 @@ var ( chID = p2p.ChannelID(1) chDesc = &p2p.ChannelDescriptor{ ID: chID, + MessageType: &p2ptest.Message{}, Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index dff5b6a75..c808ad3e0 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" @@ -138,12 +137,10 @@ func (n *Network) NodeIDs() []types.NodeID { func (n *Network) MakeChannels( t *testing.T, chDesc *p2p.ChannelDescriptor, - messageType proto.Message, - size int, ) map[types.NodeID]*p2p.Channel { channels := map[types.NodeID]*p2p.Channel{} for _, node := range n.Nodes { - channels[node.NodeID] = node.MakeChannel(t, chDesc, messageType, size) + channels[node.NodeID] = node.MakeChannel(t, chDesc) } return channels } @@ -154,12 +151,10 @@ func (n *Network) MakeChannels( func (n *Network) MakeChannelsNoCleanup( t *testing.T, chDesc *p2p.ChannelDescriptor, - messageType proto.Message, - size int, ) map[types.NodeID]*p2p.Channel { channels := map[types.NodeID]*p2p.Channel{} for _, node := range n.Nodes { - channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc, messageType, size) + channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc) } return channels } @@ -281,9 +276,11 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { // MakeChannel opens a channel, with automatic error handling and cleanup. On // test cleanup, it also checks that the channel is empty, to make sure // all expected messages have been asserted. -func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor, - messageType proto.Message, size int) *p2p.Channel { - channel, err := n.Router.OpenChannel(chDesc, messageType, size) +func (n *Node) MakeChannel( + t *testing.T, + chDesc *p2p.ChannelDescriptor, +) *p2p.Channel { + channel, err := n.Router.OpenChannel(chDesc) require.NoError(t, err) require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID)) t.Cleanup(func() { @@ -298,11 +295,8 @@ func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor, func (n *Node) MakeChannelNoCleanup( t *testing.T, chDesc *p2p.ChannelDescriptor, - messageType proto.Message, - size int, ) *p2p.Channel { - - channel, err := n.Router.OpenChannel(chDesc, messageType, size) + channel, err := n.Router.OpenChannel(chDesc) require.NoError(t, err) return channel } @@ -336,6 +330,7 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates { func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor { return &p2p.ChannelDescriptor{ ID: chID, + MessageType: &Message{}, Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: 10, diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index 3598baba0..a9fc16a34 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -24,6 +24,8 @@ func RequireEmpty(t *testing.T, channels ...*p2p.Channel) { // RequireReceive requires that the given envelope is received on the channel. func RequireReceive(t *testing.T, channel *p2p.Channel, expect p2p.Envelope) { + t.Helper() + timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks defer timer.Stop() diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index 28476e67d..b7e1a01c3 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -380,9 +380,7 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite { // NOTE: we don't assert that the channels get drained after stopping the // reactor - rts.pexChannels = rts.network.MakeChannelsNoCleanup( - t, pex.ChannelDescriptor(), new(p2pproto.PexMessage), chBuf, - ) + rts.pexChannels = rts.network.MakeChannelsNoCleanup(t, pex.ChannelDescriptor()) idx := 0 for nodeID := range rts.network.Nodes { @@ -446,9 +444,7 @@ func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) { }) r.network.Nodes[node.NodeID] = node nodeID := node.NodeID - r.pexChannels[nodeID] = node.MakeChannelNoCleanup( - t, pex.ChannelDescriptor(), new(p2pproto.PexMessage), r.opts.BufferSize, - ) + r.pexChannels[nodeID] = node.MakeChannelNoCleanup(t, pex.ChannelDescriptor()) r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize) r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize) r.network.Nodes[nodeID].PeerManager.Register(r.peerUpdates[nodeID]) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index fbf2eea6b..6c4694624 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -354,7 +354,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { // implement Wrapper to automatically (un)wrap multiple message types in a // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. -func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { +func (r *Router) OpenChannel(chDesc *ChannelDescriptor) (*Channel, error) { r.channelMtx.Lock() defer r.channelMtx.Unlock() @@ -364,9 +364,11 @@ func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Messag } r.chDescs = append(r.chDescs, chDesc) - queue := r.queueFactory(size) - outCh := make(chan Envelope, size) - errCh := make(chan PeerError, size) + messageType := chDesc.MessageType + + queue := r.queueFactory(chDesc.RecvBufferCapacity) + outCh := make(chan Envelope, chDesc.RecvBufferCapacity) + errCh := make(chan PeerError, chDesc.RecvBufferCapacity) channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh) var wrapper Wrapper diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 7ba42b9b0..997f02a06 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -50,7 +50,7 @@ func TestRouter_Network(t *testing.T) { network := p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: 8}) local := network.RandomNode() peers := network.Peers(local.NodeID) - channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc) network.Start(t) @@ -119,17 +119,18 @@ func TestRouter_Channel_Basic(t *testing.T) { }) // Opening a channel should work. - channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0) + channel, err := router.OpenChannel(chDesc) require.NoError(t, err) require.Contains(t, router.NodeInfo().Channels, byte(chDesc.ID)) // Opening the same channel again should fail. - _, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) + _, err = router.OpenChannel(chDesc) require.Error(t, err) // Opening a different channel should work. - chDesc2 := &p2p.ChannelDescriptor{ID: 2} - _, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0) + chDesc2 := &p2p.ChannelDescriptor{ID: 2, MessageType: &p2ptest.Message{}} + _, err = router.OpenChannel(chDesc2) + require.NoError(t, err) require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID)) @@ -137,7 +138,7 @@ func TestRouter_Channel_Basic(t *testing.T) { channel.Close() time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async... - channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0) + channel, err = router.OpenChannel(chDesc) require.NoError(t, err) // We should be able to send on the channel, even though there are no peers. @@ -163,9 +164,9 @@ func TestRouter_Channel_SendReceive(t *testing.T) { ids := network.NodeIDs() aID, bID, cID := ids[0], ids[1], ids[2] - channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc) a, b, c := channels[aID], channels[bID], channels[cID] - otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9), &p2ptest.Message{}, 0) + otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9)) network.Start(t) @@ -222,7 +223,7 @@ func TestRouter_Channel_Broadcast(t *testing.T) { ids := network.NodeIDs() aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3] - channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc) a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID] network.Start(t) @@ -250,7 +251,15 @@ func TestRouter_Channel_Wrapper(t *testing.T) { ids := network.NodeIDs() aID, bID := ids[0], ids[1] - channels := network.MakeChannels(t, chDesc, &wrapperMessage{}, 0) + chDesc := &p2p.ChannelDescriptor{ + ID: chID, + MessageType: &wrapperMessage{}, + Priority: 5, + SendQueueCapacity: 10, + RecvMessageCapacity: 10, + } + + channels := network.MakeChannels(t, chDesc) a, b := channels[aID], channels[bID] network.Start(t) @@ -310,7 +319,7 @@ func TestRouter_Channel_Error(t *testing.T) { ids := network.NodeIDs() aID, bID := ids[0], ids[1] - channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0) + channels := network.MakeChannels(t, chDesc) a := channels[aID] // Erroring b should cause it to be disconnected. It will reconnect shortly after. @@ -897,7 +906,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { Status: p2p.PeerStatusUp, }) - channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0) + channel, err := router.OpenChannel(chDesc) require.NoError(t, err) channel.Out <- p2p.Envelope{ diff --git a/node/node.go b/node/node.go index cf2d104dc..0a48bf0b7 100644 --- a/node/node.go +++ b/node/node.go @@ -1098,7 +1098,7 @@ func makeChannelsFromShims( channels := map[p2p.ChannelID]*p2p.Channel{} for idx := range chDescs { chDesc := chDescs[idx] - ch, err := router.OpenChannel(chDesc, chDesc.MessageType, chDesc.RecvBufferCapacity) + ch, err := router.OpenChannel(chDesc) if err != nil { panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err)) } diff --git a/node/setup.go b/node/setup.go index fb5599dfc..fc611d946 100644 --- a/node/setup.go +++ b/node/setup.go @@ -30,7 +30,6 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" tmstrings "github.com/tendermint/tendermint/libs/strings" - protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -485,7 +484,7 @@ func createPEXReactor( router *p2p.Router, ) (service.Service, error) { - channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128) + channel, err := router.OpenChannel(pex.ChannelDescriptor()) if err != nil { return nil, err }