Browse Source

p2p: flatten channel descriptor (#7132)

pull/7136/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
cbe6ad6cd5
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 124 additions and 155 deletions
  1. +8
    -10
      internal/blocksync/reactor.go
  2. +1
    -1
      internal/blocksync/reactor_test.go
  3. +32
    -40
      internal/consensus/reactor.go
  4. +2
    -2
      internal/consensus/reactor_test.go
  5. +7
    -9
      internal/evidence/reactor.go
  6. +1
    -1
      internal/evidence/reactor_test.go
  7. +8
    -10
      internal/mempool/v0/reactor.go
  8. +1
    -1
      internal/mempool/v0/reactor_test.go
  9. +8
    -10
      internal/mempool/v1/reactor.go
  10. +1
    -1
      internal/mempool/v1/reactor_test.go
  11. +1
    -1
      internal/p2p/p2p_test.go
  12. +6
    -6
      internal/p2p/p2ptest/network.go
  13. +4
    -3
      internal/p2p/pex/reactor.go
  14. +3
    -3
      internal/p2p/pqueue.go
  15. +1
    -1
      internal/p2p/pqueue_test.go
  16. +4
    -4
      internal/p2p/router.go
  17. +1
    -1
      internal/p2p/router_test.go
  18. +0
    -9
      internal/p2p/shim.go
  19. +29
    -37
      internal/statesync/reactor.go
  20. +6
    -5
      node/node.go

+ 8
- 10
internal/blocksync/reactor.go View File

@@ -27,16 +27,14 @@ var (
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockSyncChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: BlockSyncChannel,
MessageType: new(bcproto.Message),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: MaxMsgSize,
},
ChannelShims = []*p2p.ChannelDescriptor{
{
ID: BlockSyncChannel,
MessageType: new(bcproto.Message),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: MaxMsgSize,
},
}
)


+ 1
- 1
internal/blocksync/reactor_test.go View File

@@ -65,7 +65,7 @@ func setup(
blockSync: true,
}
chDesc := p2p.ChannelDescriptor{ID: BlockSyncChannel}
chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))
i := 0


+ 32
- 40
internal/consensus/reactor.go View File

@@ -29,49 +29,41 @@ var (
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
StateChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: StateChannel,
MessageType: new(tmcons.Message),
Priority: 8,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
},
ChannelShims = []*p2p.ChannelDescriptor{
{
ID: StateChannel,
MessageType: new(tmcons.Message),
Priority: 8,
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
},
DataChannel: {
Descriptor: &p2p.ChannelDescriptor{
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: DataChannel,
MessageType: new(tmcons.Message),
Priority: 12,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
},
{
// TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send
// until next height or round.
ID: DataChannel,
MessageType: new(tmcons.Message),
Priority: 12,
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
},
VoteChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: VoteChannel,
MessageType: new(tmcons.Message),
Priority: 10,
SendQueueCapacity: 64,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
},
{
ID: VoteChannel,
MessageType: new(tmcons.Message),
Priority: 10,
SendQueueCapacity: 64,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
},
VoteSetBitsChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: VoteSetBitsChannel,
MessageType: new(tmcons.Message),
Priority: 5,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
},
{
ID: VoteSetBitsChannel,
MessageType: new(tmcons.Message),
Priority: 5,
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
},
}
)


+ 2
- 2
internal/consensus/reactor_test.go View File

@@ -50,8 +50,8 @@ type reactorTestSuite struct {
voteSetBitsChannels map[types.NodeID]*p2p.Channel
}
func chDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
return p2p.ChannelDescriptor{
func chDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
}
}


+ 7
- 9
internal/evidence/reactor.go View File

@@ -25,15 +25,13 @@ var (
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
EvidenceChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: EvidenceChannel,
MessageType: new(tmproto.EvidenceList),
Priority: 6,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
},
ChannelShims = []*p2p.ChannelDescriptor{
{
ID: EvidenceChannel,
MessageType: new(tmproto.EvidenceList),
Priority: 6,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
},
}
)


+ 1
- 1
internal/evidence/reactor_test.go View File

@@ -62,7 +62,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores),
}
chDesc := p2p.ChannelDescriptor{ID: evidence.EvidenceChannel}
chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t,
chDesc,
new(tmproto.EvidenceList),


+ 8
- 10
internal/mempool/v0/reactor.go View File

@@ -90,7 +90,7 @@ func NewReactor(
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim {
func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
largestTx := make([]byte, cfg.MaxTxBytes)
batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{
@ -98,15 +98,13 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
},
}
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
mempool.MempoolChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: mempool.MempoolChannel,
MessageType: new(protomem.Message),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
},
return []*p2p.ChannelDescriptor{
{
ID: mempool.MempoolChannel,
MessageType: new(protomem.Message),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
},
}
}


+ 1
- 1
internal/mempool/v0/reactor_test.go View File

@@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint)
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
for nodeID := range rts.network.Nodes {


+ 8
- 10
internal/mempool/v1/reactor.go View File

@@ -97,7 +97,7 @@ func defaultObservePanic(r interface{}) {}
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim {
func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
largestTx := make([]byte, cfg.MaxTxBytes)
batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{
@ -105,15 +105,13 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
},
}
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
mempool.MempoolChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: mempool.MempoolChannel,
MessageType: new(protomem.Message),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
},
return []*p2p.ChannelDescriptor{
{
ID: mempool.MempoolChannel,
MessageType: new(protomem.Message),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
},
}
}


+ 1
- 1
internal/mempool/v1/reactor_test.go View File

@@ -52,7 +52,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
chDesc := p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
chDesc := &p2p.ChannelDescriptor{ID: mempool.MempoolChannel}
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf))
for nodeID := range rts.network.Nodes {


+ 1
- 1
internal/p2p/p2p_test.go View File

@@ -14,7 +14,7 @@ import (
var (
ctx = context.Background()
chID = p2p.ChannelID(1)
chDesc = p2p.ChannelDescriptor{
chDesc = &p2p.ChannelDescriptor{
ID: chID,
Priority: 5,
SendQueueCapacity: 10,


+ 6
- 6
internal/p2p/p2ptest/network.go View File

@@ -137,7 +137,7 @@ func (n *Network) NodeIDs() []types.NodeID {
// doing error checks and cleanups.
func (n *Network) MakeChannels(
t *testing.T,
chDesc p2p.ChannelDescriptor,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[types.NodeID]*p2p.Channel {
@@ -153,7 +153,7 @@ func (n *Network) MakeChannels(
// all the channels.
func (n *Network) MakeChannelsNoCleanup(
t *testing.T,
chDesc p2p.ChannelDescriptor,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) map[types.NodeID]*p2p.Channel {
@@ -281,7 +281,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
// MakeChannel opens a channel, with automatic error handling and cleanup. On
// test cleanup, it also checks that the channel is empty, to make sure
// all expected messages have been asserted.
func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor,
func (n *Node) MakeChannel(t *testing.T, chDesc *p2p.ChannelDescriptor,
messageType proto.Message, size int) *p2p.Channel {
channel, err := n.Router.OpenChannel(chDesc, messageType, size)
require.NoError(t, err)
@@ -297,7 +297,7 @@ func (n *Node) MakeChannel(t *testing.T, chDesc p2p.ChannelDescriptor,
// caller must ensure proper cleanup of the channel.
func (n *Node) MakeChannelNoCleanup(
t *testing.T,
chDesc p2p.ChannelDescriptor,
chDesc *p2p.ChannelDescriptor,
messageType proto.Message,
size int,
) *p2p.Channel {
@@ -333,8 +333,8 @@ func (n *Node) MakePeerUpdatesNoRequireEmpty(t *testing.T) *p2p.PeerUpdates {
return sub
}
func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
return p2p.ChannelDescriptor{
func MakeChannelDesc(chID p2p.ChannelID) *p2p.ChannelDescriptor {
return &p2p.ChannelDescriptor{
ID: chID,
Priority: 5,
SendQueueCapacity: 10,


+ 4
- 3
internal/p2p/pex/reactor.go View File

@@ -62,13 +62,14 @@ const (
// within each reactor (as they are now) or, considering that the reactor doesn't
// really need to care about the channel descriptors, if they should be housed
// in the node module.
func ChannelDescriptor() conn.ChannelDescriptor {
return conn.ChannelDescriptor{
func ChannelDescriptor() *conn.ChannelDescriptor {
return &conn.ChannelDescriptor{
ID: PexChannel,
MessageType: new(protop2p.PexMessage),
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
RecvBufferCapacity: 128,
}
}


+ 3
- 3
internal/p2p/pqueue.go View File

@@ -71,7 +71,7 @@ type pqScheduler struct {
size uint
sizes map[uint]uint // cumulative priority sizes
pq *priorityQueue
chDescs []ChannelDescriptor
chDescs []*ChannelDescriptor
capacity uint
chPriorities map[ChannelID]uint
@@ -84,12 +84,12 @@ type pqScheduler struct {
func newPQScheduler(
logger log.Logger,
m *Metrics,
chDescs []ChannelDescriptor,
chDescs []*ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {
// copy each ChannelDescriptor and sort them by ascending channel priority
chDescsCopy := make([]ChannelDescriptor, len(chDescs))
chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })


+ 1
- 1
internal/p2p/pqueue_test.go View File

@@ -12,7 +12,7 @@ type testMessage = gogotypes.StringValue
func TestCloseWhileDequeueFull(t *testing.T) {
enqueueLength := 5
chDescs := []ChannelDescriptor{
chDescs := []*ChannelDescriptor{
{ID: 0x01, Priority: 1},
}
pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, uint(enqueueLength), 1, 120)


+ 4
- 4
internal/p2p/router.go View File

@@ -249,7 +249,7 @@ type Router struct {
nodeInfo types.NodeInfo
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []ChannelDescriptor
chDescs []*ChannelDescriptor
transports []Transport
connTracker connectionTracker
protocolTransports map[Protocol]Transport
@@ -295,7 +295,7 @@ func NewRouter(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]ChannelDescriptor, 0),
chDescs: make([]*ChannelDescriptor, 0),
transports: transports,
protocolTransports: map[Protocol]Transport{},
peerManager: peerManager,
@@ -354,7 +354,7 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) {
func (r *Router) OpenChannel(chDesc *ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) {
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
@@ -381,7 +381,7 @@ func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message
r.nodeInfo.AddChannel(uint16(chDesc.ID))
for _, t := range r.transports {
t.AddChannelDescriptors([]*ChannelDescriptor{&chDesc})
t.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
}
go func() {


+ 1
- 1
internal/p2p/router_test.go View File

@@ -128,7 +128,7 @@ func TestRouter_Channel_Basic(t *testing.T) {
require.Error(t, err)
// Opening a different channel should work.
chDesc2 := p2p.ChannelDescriptor{ID: 2}
chDesc2 := &p2p.ChannelDescriptor{ID: 2}
_, err = router.OpenChannel(chDesc2, &p2ptest.Message{}, 0)
require.NoError(t, err)
require.Contains(t, router.NodeInfo().Channels, byte(chDesc2.ID))


+ 0
- 9
internal/p2p/shim.go View File

@@ -1,9 +0,0 @@
package p2p
// ChannelDescriptorShim defines a shim wrapper around a legacy p2p channel
// and the proto.Message the new p2p Channel is responsible for handling.
// A ChannelDescriptorShim is not contained in ReactorShim, but is rather
// used to construct a ReactorShim.
type ChannelDescriptorShim struct {
Descriptor *ChannelDescriptor
}

+ 29
- 37
internal/statesync/reactor.go View File

@@ -36,46 +36,38 @@ var (
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
SnapshotChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: SnapshotChannel,
MessageType: new(ssproto.Message),
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128,
},
ChannelShims = []*p2p.ChannelDescriptor{
{
ID: SnapshotChannel,
MessageType: new(ssproto.Message),
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128,
},
ChunkChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: ChunkChannel,
Priority: 3,
MessageType: new(ssproto.Message),
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
},
{
ID: ChunkChannel,
Priority: 3,
MessageType: new(ssproto.Message),
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
},
LightBlockChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: LightBlockChannel,
MessageType: new(ssproto.Message),
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
},
{
ID: LightBlockChannel,
MessageType: new(ssproto.Message),
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
},
ParamsChannel: {
Descriptor: &p2p.ChannelDescriptor{
ID: ParamsChannel,
MessageType: new(ssproto.Message),
Priority: 2,
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
},
{
ID: ParamsChannel,
MessageType: new(ssproto.Message),
Priority: 2,
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
},
}
)


+ 6
- 5
node/node.go View File

@@ -1092,17 +1092,18 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
// FIXME: Temporary helper function, shims should be removed.
func makeChannelsFromShims(
router *p2p.Router,
chShims map[p2p.ChannelID]*p2p.ChannelDescriptorShim,
chDescs []*p2p.ChannelDescriptor,
) map[p2p.ChannelID]*p2p.Channel {
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID, chShim := range chShims {
ch, err := router.OpenChannel(*chShim.Descriptor, chShim.Descriptor.MessageType, chShim.Descriptor.RecvBufferCapacity)
for idx := range chDescs {
chDesc := chDescs[idx]
ch, err := router.OpenChannel(chDesc, chDesc.MessageType, chDesc.RecvBufferCapacity)
if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chID, err))
panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err))
}
channels[chID] = ch
channels[chDesc.ID] = ch
}
return channels


Loading…
Cancel
Save