Browse Source

p2p: simplify open channel interface (#7133)

A fourth #7075 component patch to simplify the channel creation interface
pull/7136/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
7143f14a63
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 61 additions and 58 deletions
  1. +2
    -2
      internal/blocksync/reactor_test.go
  2. +8
    -6
      internal/consensus/reactor_test.go
  3. +2
    -5
      internal/evidence/reactor_test.go
  4. +3
    -2
      internal/mempool/v0/reactor_test.go
  5. +2
    -4
      internal/mempool/v1/reactor_test.go
  6. +2
    -0
      internal/p2p/p2p_test.go
  7. +9
    -14
      internal/p2p/p2ptest/network.go
  8. +2
    -0
      internal/p2p/p2ptest/require.go
  9. +2
    -6
      internal/p2p/pex/reactor_test.go
  10. +6
    -4
      internal/p2p/router.go
  11. +21
    -12
      internal/p2p/router_test.go
  12. +1
    -1
      node/node.go
  13. +1
    -2
      node/setup.go

+ 2
- 2
internal/blocksync/reactor_test.go View File

@ -65,8 +65,8 @@ func setup(
blockSync: true,
}
chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
i := 0
for nodeID := range rts.network.Nodes {


+ 8
- 6
internal/consensus/reactor_test.go View File

@ -50,9 +50,11 @@ type reactorTestSuite struct {
voteSetBitsChannels map[types.NodeID]*p2p.Channel
}
func chDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
ID: chID,
MessageType: new(tmcons.Message),
RecvBufferCapacity: size,
}
}
@ -67,10 +69,10 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
blocksyncSubs: make(map[types.NodeID]types.Subscription, numNodes),
}
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size)
rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel), new(tmcons.Message), size)
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size)
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size)
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel, size))
rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel, size))
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel, size))
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel, size))
_, cancel := context.WithCancel(context.Background())


+ 2
- 5
internal/evidence/reactor_test.go View File

@ -62,11 +62,8 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores),
}
chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t,
chDesc,
new(tmproto.EvidenceList),
int(chBuf))
chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel, MessageType: new(tmproto.EvidenceList)}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0)
idx := 0


+ 3
- 2
internal/mempool/v0/reactor_test.go View File

@ -50,8 +50,9 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint)
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
chDesc := GetChannelShims(config)[0]
chDesc.RecvBufferCapacity = int(chBuf)
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc)
for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()


+ 2
- 4
internal/mempool/v1/reactor_test.go View File

@ -10,11 +10,9 @@ import (
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/config"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
"github.com/tendermint/tendermint/libs/log"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)
@ -52,8 +50,8 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
chDesc := GetChannelShims(cfg.Mempool)[0]
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()


+ 2
- 0
internal/p2p/p2p_test.go View File

@ -6,6 +6,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
"github.com/tendermint/tendermint/types"
)
@ -16,6 +17,7 @@ var (
chID = p2p.ChannelID(1)
chDesc = &p2p.ChannelDescriptor{
ID: chID,
MessageType: &p2ptest.Message{},
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,


+ 9
- 14
internal/p2p/p2ptest/network.go View File

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@ -138,12 +137,10 @@ func (n *Network) NodeIDs() []types.NodeID {
func (n *Network) MakeChannels(
t *testing.T,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[types.NodeID]*p2p.Channel {
channels := map[types.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannel(t, chDesc, messageType, size)
channels[node.NodeID] = node.MakeChannel(t, chDesc)
}
return channels
}
@ -154,12 +151,10 @@ func (n *Network) MakeChannels(
func (n *Network) MakeChannelsNoCleanup(
t *testing.T,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[types.NodeID]*p2p.Channel {
channels := map[types.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc, messageType, size)
channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc)
}
return channels
}
@ -281,9 +276,11 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
// MakeChannel opens a channel, with automatic error handling and cleanup. On
// test cleanup, it also checks that the channel is empty, to make sure
// all expected messages have been asserted.
func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor,
messageType proto.Message, size int) *p2p.Channel {
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
func (n *Node) MakeChannel(
t *testing.T,
chDesc *p2p.ChannelDescriptor,
) *p2p.Channel {
channel, err := n.Router.OpenChannel(chDesc)
require.NoError(t, err)
require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID))
t.Cleanup(func() {
@ -298,11 +295,8 @@ func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor,
func (n *Node) MakeChannelNoCleanup(
t *testing.T,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) *p2p.Channel {
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
channel, err := n.Router.OpenChannel(chDesc)
require.NoError(t, err)
return channel
}
@ -336,6 +330,7 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates {
func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
MessageType: &Message{},
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,


+ 2
- 0
internal/p2p/p2ptest/require.go View File

@ -24,6 +24,8 @@ func RequireEmpty(t *testing.T, channels ...*p2p.Channel) {
// RequireReceive requires that the given envelope is received on the channel.
func RequireReceive(t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
t.Helper()
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()


+ 2
- 6
internal/p2p/pex/reactor_test.go View File

@ -380,9 +380,7 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite {
// NOTE: we don't assert that the channels get drained after stopping the
// reactor
rts.pexChannels = rts.network.MakeChannelsNoCleanup(
t, pex.ChannelDescriptor(), new(p2pproto.PexMessage), chBuf,
)
rts.pexChannels = rts.network.MakeChannelsNoCleanup(t, pex.ChannelDescriptor())
idx := 0
for nodeID := range rts.network.Nodes {
@ -446,9 +444,7 @@ func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) {
})
r.network.Nodes[node.NodeID] = node
nodeID := node.NodeID
r.pexChannels[nodeID] = node.MakeChannelNoCleanup(
t, pex.ChannelDescriptor(), new(p2pproto.PexMessage), r.opts.BufferSize,
)
r.pexChannels[nodeID] = node.MakeChannelNoCleanup(t, pex.ChannelDescriptor())
r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize)
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
r.network.Nodes[nodeID].PeerManager.Register(r.peerUpdates[nodeID])


+ 6
- 4
internal/p2p/router.go View File

@ -354,7 +354,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) {
func (r *Router) OpenChannel(chDesc *ChannelDescriptor) (*Channel, error) {
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
@ -364,9 +364,11 @@ func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Messag
}
r.chDescs = append(r.chDescs, chDesc)
queue := r.queueFactory(size)
outCh := make(chan Envelope, size)
errCh := make(chan PeerError, size)
messageType := chDesc.MessageType
queue := r.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
var wrapper Wrapper


+ 21
- 12
internal/p2p/router_test.go View File

@ -50,7 +50,7 @@ func TestRouter_Network(t *testing.T) {
network := p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: 8})
local := network.RandomNode()
peers := network.Peers(local.NodeID)
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc)
network.Start(t)
@ -119,17 +119,18 @@ func TestRouter_Channel_Basic(t *testing.T) {
})
// Opening a channel should work.
channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
channel, err := router.OpenChannel(chDesc)
require.NoError(t, err)
require.Contains(t, router.NodeInfo().Channels, byte(chDesc.ID))
// Opening the same channel again should fail.
_, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
_, err = router.OpenChannel(chDesc)
require.Error(t, err)
// Opening a different channel should work.
chDesc2 := &p2p.ChannelDescriptor{ID: 2}
_, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0)
chDesc2 := &p2p.ChannelDescriptor{ID: 2, MessageType: &p2ptest.Message{}}
_, err = router.OpenChannel(chDesc2)
require.NoError(t, err)
require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID))
@ -137,7 +138,7 @@ func TestRouter_Channel_Basic(t *testing.T) {
channel.Close()
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
channel, err = router.OpenChannel(chDesc)
require.NoError(t, err)
// We should be able to send on the channel, even though there are no peers.
@ -163,9 +164,9 @@ func TestRouter_Channel_SendReceive(t *testing.T) {
ids := network.NodeIDs()
aID, bID, cID := ids[0], ids[1], ids[2]
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc)
a, b, c := channels[aID], channels[bID], channels[cID]
otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9), &p2ptest.Message{}, 0)
otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9))
network.Start(t)
@ -222,7 +223,7 @@ func TestRouter_Channel_Broadcast(t *testing.T) {
ids := network.NodeIDs()
aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3]
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc)
a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID]
network.Start(t)
@ -250,7 +251,15 @@ func TestRouter_Channel_Wrapper(t *testing.T) {
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, chDesc, &wrapperMessage{}, 0)
chDesc := &p2p.ChannelDescriptor{
ID: chID,
MessageType: &wrapperMessage{},
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,
}
channels := network.MakeChannels(t, chDesc)
a, b := channels[aID], channels[bID]
network.Start(t)
@ -310,7 +319,7 @@ func TestRouter_Channel_Error(t *testing.T) {
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc)
a := channels[aID]
// Erroring b should cause it to be disconnected. It will reconnect shortly after.
@ -897,7 +906,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
Status: p2p.PeerStatusUp,
})
channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
channel, err := router.OpenChannel(chDesc)
require.NoError(t, err)
channel.Out <- p2p.Envelope{


+ 1
- 1
node/node.go View File

@ -1098,7 +1098,7 @@ func makeChannelsFromShims(
channels := map[p2p.ChannelID]*p2p.Channel{}
for idx := range chDescs {
chDesc := chDescs[idx]
ch, err := router.OpenChannel(chDesc, chDesc.MessageType, chDesc.RecvBufferCapacity)
ch, err := router.OpenChannel(chDesc)
if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err))
}


+ 1
- 2
node/setup.go View File

@ -30,7 +30,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmstrings "github.com/tendermint/tendermint/libs/strings"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@ -485,7 +484,7 @@ func createPEXReactor(
router *p2p.Router,
) (service.Service, error) {
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
channel, err := router.OpenChannel(pex.ChannelDescriptor())
if err != nil {
return nil, err
}


Loading…
Cancel
Save