Browse Source

p2p: add channel descriptors to open channel (#6440)

pull/6449/head
Callum Waters 3 years ago
committed by GitHub
parent
commit
64e7b5efea
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 78 additions and 62 deletions
  1. +2
    -1
      blockchain/v0/reactor_test.go
  2. +10
    -4
      consensus/reactor_test.go
  3. +2
    -1
      evidence/reactor_test.go
  4. +2
    -1
      mempool/reactor_test.go
  5. +10
    -14
      node/node.go
  6. +9
    -2
      p2p/p2p_test.go
  7. +19
    -8
      p2p/p2ptest/network.go
  8. +8
    -10
      p2p/pex/reactor.go
  9. +2
    -2
      p2p/pex/reactor_test.go
  10. +3
    -9
      p2p/router.go
  11. +11
    -10
      p2p/router_test.go

+ 2
- 1
blockchain/v0/reactor_test.go View File

@ -62,7 +62,8 @@ func setup(
fastSync: true,
}
rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, BlockchainChannel, new(bcproto.Message), int(chBuf))
chDesc := p2p.ChannelDescriptor{ID: byte(BlockchainChannel)}
rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
i := 0
for nodeID := range rts.network.Nodes {


+ 10
- 4
consensus/reactor_test.go View File

@ -47,6 +47,12 @@ type reactorTestSuite struct {
voteSetBitsChannels map[p2p.NodeID]*p2p.Channel
}
func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
return p2p.ChannelDescriptor{
ID: byte(chID),
}
}
func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSuite {
t.Helper()
@ -57,10 +63,10 @@ func setup(t *testing.T, numNodes int, states []*State, size int) *reactorTestSu
subs: make(map[p2p.NodeID]types.Subscription, numNodes),
}
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, StateChannel, new(tmcons.Message), size)
rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, DataChannel, new(tmcons.Message), size)
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, VoteChannel, new(tmcons.Message), size)
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, VoteSetBitsChannel, new(tmcons.Message), size)
rts.stateChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(StateChannel), new(tmcons.Message), size)
rts.dataChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(DataChannel), new(tmcons.Message), size)
rts.voteChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteChannel), new(tmcons.Message), size)
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel), new(tmcons.Message), size)
i := 0
for nodeID, node := range rts.network.Nodes {


+ 2
- 1
evidence/reactor_test.go View File

@ -62,8 +62,9 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, numStateStores),
}
chDesc := p2p.ChannelDescriptor{ID: byte(evidence.EvidenceChannel)}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t,
evidence.EvidenceChannel,
chDesc,
new(tmproto.EvidenceList),
int(chBuf))
require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0)


+ 2
- 1
mempool/reactor_test.go View File

@ -48,7 +48,8 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac
peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, numNodes),
}
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, MempoolChannel, new(protomem.Message), int(chBuf))
chDesc := p2p.ChannelDescriptor{ID: byte(MempoolChannel)}
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
for nodeID := range rts.network.Nodes {
rts.kvstores[nodeID] = kvstore.NewApplication()


+ 10
- 14
node/node.go View File

@ -829,7 +829,7 @@ func createPEXReactorV2(
router *p2p.Router,
) (*pex.ReactorV2, error) {
channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 4096)
channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096)
if err != nil {
return nil, err
}
@ -961,8 +961,8 @@ func NewSeedNode(config *cfg.Config,
// FIXME: we add channel descriptors to both the router and the transport but only the router
// should be aware of channel info. We should remove this from transport once the legacy
// p2p stack is removed.
router.AddChannelDescriptors(pex.ChannelDescriptors())
transport.AddChannelDescriptors(pex.ChannelDescriptors())
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
@ -1213,14 +1213,10 @@ func NewNode(config *cfg.Config,
config.StateSync.TempDir,
)
// add the channel descriptors to both the router and the underlying
// transports
router.AddChannelDescriptors(mpReactorShim.GetChannels())
router.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
router.AddChannelDescriptors(csReactorShim.GetChannels())
router.AddChannelDescriptors(evReactorShim.GetChannels())
router.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
// add the channel descriptors to both the transports
// FIXME: This should be removed when the legacy p2p stack is removed and
// transports can either be agnostic to channel descriptors or can be
// declared in the constructor.
transport.AddChannelDescriptors(mpReactorShim.GetChannels())
transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
transport.AddChannelDescriptors(csReactorShim.GetChannels())
@ -1266,8 +1262,8 @@ func NewNode(config *cfg.Config,
)
if config.P2P.PexReactor {
router.AddChannelDescriptors(pex.ChannelDescriptors())
transport.AddChannelDescriptors(pex.ChannelDescriptors())
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
@ -2058,7 +2054,7 @@ func makeChannelsFromShims(
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID, chShim := range chShims {
ch, err := router.OpenChannel(chID, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity)
ch, err := router.OpenChannel(*chShim.Descriptor, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity)
if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chID, err))
}


+ 9
- 2
p2p/p2p_test.go View File

@ -11,8 +11,15 @@ import (
// Common setup for P2P tests.
var (
ctx = context.Background()
chID = p2p.ChannelID(1)
ctx = context.Background()
chID = p2p.ChannelID(1)
chDesc = p2p.ChannelDescriptor{
ID: byte(chID),
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,
MaxSendBytes: 1000,
}
selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
selfID = p2p.NodeIDFromPubKey(selfKey.PubKey())


+ 19
- 8
p2p/p2ptest/network.go View File

@ -135,13 +135,13 @@ func (n *Network) NodeIDs() []p2p.NodeID {
// doing error checks and cleanups.
func (n *Network) MakeChannels(
t *testing.T,
chID p2p.ChannelID,
chDesc p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[p2p.NodeID]*p2p.Channel {
channels := map[p2p.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannel(t, chID, messageType, size)
channels[node.NodeID] = node.MakeChannel(t, chDesc, messageType, size)
}
return channels
}
@ -151,13 +151,13 @@ func (n *Network) MakeChannels(
// all the channels.
func (n *Network) MakeChannelsNoCleanup(
t *testing.T,
chID p2p.ChannelID,
chDesc p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[p2p.NodeID]*p2p.Channel {
channels := map[p2p.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannelNoCleanup(t, chID, messageType, size)
channels[node.NodeID] = node.MakeChannelNoCleanup(t, chDesc, messageType, size)
}
return channels
}
@ -279,8 +279,9 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
// MakeChannel opens a channel, with automatic error handling and cleanup. On
// test cleanup, it also checks that the channel is empty, to make sure
// all expected messages have been asserted.
func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.Message, size int) *p2p.Channel {
channel, err := n.Router.OpenChannel(chID, messageType, size)
func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor,
messageType proto.Message, size int) *p2p.Channel {
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
require.NoError(t, err)
t.Cleanup(func() {
RequireEmpty(t, channel)
@ -293,12 +294,12 @@ func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.M
// caller must ensure proper cleanup of the channel.
func (n *Node) MakeChannelNoCleanup(
t *testing.T,
chID p2p.ChannelID,
chDesc p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) *p2p.Channel {
channel, err := n.Router.OpenChannel(chID, messageType, size)
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
require.NoError(t, err)
return channel
}
@ -328,3 +329,13 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates {
return sub
}
func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
return p2p.ChannelDescriptor{
ID: byte(chID),
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: 10,
MaxSendBytes: 1000,
}
}

+ 8
- 10
p2p/pex/reactor.go View File

@ -45,16 +45,14 @@ const (
// within each reactor (as they are now) or, considering that the reactor doesn't
// really need to care about the channel descriptors, if they should be housed
// in the node module.
func ChannelDescriptors() []*conn.ChannelDescriptor {
return []*conn.ChannelDescriptor{
{
ID: PexChannel,
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 200,
},
func ChannelDescriptor() conn.ChannelDescriptor {
return conn.ChannelDescriptor{
ID: PexChannel,
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 200,
}
}


+ 2
- 2
p2p/pex/reactor_test.go View File

@ -350,7 +350,7 @@ func setupNetwork(t *testing.T, opts testOptions) *reactorTestSuite {
// NOTE: we don't assert that the channels get drained after stopping the
// reactor
rts.pexChannels = rts.network.MakeChannelsNoCleanup(
t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), chBuf,
t, pex.ChannelDescriptor(), new(proto.PexMessage), chBuf,
)
idx := 0
@ -416,7 +416,7 @@ func (r *reactorTestSuite) addNodes(t *testing.T, nodes int) {
r.network.Nodes[node.NodeID] = node
nodeID := node.NodeID
r.pexChannels[nodeID] = node.MakeChannelNoCleanup(
t, p2p.ChannelID(pex.PexChannel), new(proto.PexMessage), r.opts.BufferSize,
t, pex.ChannelDescriptor(), new(proto.PexMessage), r.opts.BufferSize,
)
r.peerChans[nodeID] = make(chan p2p.PeerUpdate, r.opts.BufferSize)
r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)


+ 3
- 9
p2p/router.go View File

@ -344,21 +344,13 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
}
}
// AddChannelDescriptors adds a set of ChannelDescriptors to the reactor. Note,
// this should be called before the router is started and any connections are made.
func (r *Router) AddChannelDescriptors(chDescs []*ChannelDescriptor) {
for _, chDesc := range chDescs {
r.chDescs = append(r.chDescs, *chDesc)
}
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel (used for unmarshaling), which can
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error) {
func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) {
if size == 0 {
size = queueBufferDefault
}
@ -366,9 +358,11 @@ func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int)
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
id := ChannelID(chDesc.ID)
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
queue := r.queueFactory(size)
outCh := make(chan Envelope, size)


+ 11
- 10
p2p/router_test.go View File

@ -49,7 +49,7 @@ func TestRouter_Network(t *testing.T) {
local := network.RandomNode()
peers := network.Peers(local.NodeID)
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
channel := channels[local.NodeID]
for _, peer := range peers {
@ -116,22 +116,23 @@ func TestRouter_Channel(t *testing.T) {
})
// Opening a channel should work.
channel, err := router.OpenChannel(chID, &p2ptest.Message{}, 0)
channel, err := router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
require.NoError(t, err)
// Opening the same channel again should fail.
_, err = router.OpenChannel(chID, &p2ptest.Message{}, 0)
_, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
require.Error(t, err)
// Opening a different channel should work.
_, err = router.OpenChannel(2, &p2ptest.Message{}, 0)
chDesc2 := p2p.ChannelDescriptor{ID: byte(2)}
_, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0)
require.NoError(t, err)
// Closing the channel, then opening it again should be fine.
channel.Close()
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
channel, err = router.OpenChannel(chID, &p2ptest.Message{}, 0)
channel, err = router.OpenChannel(chDesc, &p2ptest.Message{}, 0)
require.NoError(t, err)
// We should be able to send on the channel, even though there are no peers.
@ -158,9 +159,9 @@ func TestRouter_Channel_SendReceive(t *testing.T) {
ids := network.NodeIDs()
aID, bID, cID := ids[0], ids[1], ids[2]
channels := network.MakeChannels(t, chID, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
a, b, c := channels[aID], channels[bID], channels[cID]
otherChannels := network.MakeChannels(t, 9, &p2ptest.Message{}, 0)
otherChannels := network.MakeChannels(t, p2ptest.MakeChannelDesc(9), &p2ptest.Message{}, 0)
// Sending a message a->b should work, and not send anything
// further to a, b, or c.
@ -216,7 +217,7 @@ func TestRouter_Channel_Broadcast(t *testing.T) {
ids := network.NodeIDs()
aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3]
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID]
// Sending a broadcast from b should work.
@ -243,7 +244,7 @@ func TestRouter_Channel_Wrapper(t *testing.T) {
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, 1, &wrapperMessage{}, 0)
channels := network.MakeChannels(t, chDesc, &wrapperMessage{}, 0)
a, b := channels[aID], channels[bID]
// Since wrapperMessage implements p2p.Wrapper and handles Message, it
@ -301,7 +302,7 @@ func TestRouter_Channel_Error(t *testing.T) {
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
channels := network.MakeChannels(t, chDesc, &p2ptest.Message{}, 0)
a := channels[aID]
// Erroring b should cause it to be disconnected. It will reconnect shortly after.


Loading…
Cancel
Save