Browse Source

p2p: refactor channel description (#7130)

This is another small sliver of #7075, with the intention of removing
the legacy shim layer related to channel registration.
pull/7132/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
f4a56f4034
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 17 additions and 36 deletions
  1. +1
    -2
      internal/blocksync/reactor.go
  2. +4
    -8
      internal/consensus/reactor.go
  3. +1
    -2
      internal/evidence/reactor.go
  4. +1
    -2
      internal/mempool/v0/reactor.go
  5. +1
    -2
      internal/mempool/v1/reactor.go
  6. +2
    -4
      internal/p2p/conn/connection.go
  7. +0
    -1
      internal/p2p/p2p_test.go
  8. +0
    -1
      internal/p2p/p2ptest/network.go
  9. +0
    -1
      internal/p2p/pex/reactor.go
  10. +1
    -1
      internal/p2p/pqueue_test.go
  11. +1
    -3
      internal/p2p/shim.go
  12. +4
    -8
      internal/statesync/reactor.go
  13. +1
    -1
      node/node.go

+ 1
- 2
internal/blocksync/reactor.go View File

@ -29,14 +29,13 @@ var (
// ref: https://github.com/tendermint/tendermint/issues/5670 // ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockSyncChannel: { BlockSyncChannel: {
MsgType: new(bcproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(BlockSyncChannel), ID: byte(BlockSyncChannel),
MessageType: new(bcproto.Message),
Priority: 5, Priority: 5,
SendQueueCapacity: 1000, SendQueueCapacity: 1000,
RecvBufferCapacity: 1024, RecvBufferCapacity: 1024,
RecvMessageCapacity: MaxMsgSize, RecvMessageCapacity: MaxMsgSize,
MaxSendBytes: 100,
}, },
}, },
} }


+ 4
- 8
internal/consensus/reactor.go View File

@ -31,50 +31,46 @@ var (
// ref: https://github.com/tendermint/tendermint/issues/5670 // ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
StateChannel: { StateChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(StateChannel), ID: byte(StateChannel),
MessageType: new(tmcons.Message),
Priority: 8, Priority: 8,
SendQueueCapacity: 64, SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 12000,
}, },
}, },
DataChannel: { DataChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
// TODO: Consider a split between gossiping current block and catchup // TODO: Consider a split between gossiping current block and catchup
// stuff. Once we gossip the whole block there is nothing left to send // stuff. Once we gossip the whole block there is nothing left to send
// until next height or round. // until next height or round.
ID: byte(DataChannel), ID: byte(DataChannel),
MessageType: new(tmcons.Message),
Priority: 12, Priority: 12,
SendQueueCapacity: 64, SendQueueCapacity: 64,
RecvBufferCapacity: 512, RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 40000,
}, },
}, },
VoteChannel: { VoteChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(VoteChannel), ID: byte(VoteChannel),
MessageType: new(tmcons.Message),
Priority: 10, Priority: 10,
SendQueueCapacity: 64, SendQueueCapacity: 64,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 150,
}, },
}, },
VoteSetBitsChannel: { VoteSetBitsChannel: {
MsgType: new(tmcons.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(VoteSetBitsChannel), ID: byte(VoteSetBitsChannel),
MessageType: new(tmcons.Message),
Priority: 5, Priority: 5,
SendQueueCapacity: 8, SendQueueCapacity: 8,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 50,
}, },
}, },
} }


+ 1
- 2
internal/evidence/reactor.go View File

@ -27,13 +27,12 @@ var (
// ref: https://github.com/tendermint/tendermint/issues/5670 // ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
EvidenceChannel: { EvidenceChannel: {
MsgType: new(tmproto.EvidenceList),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(EvidenceChannel), ID: byte(EvidenceChannel),
MessageType: new(tmproto.EvidenceList),
Priority: 6, Priority: 6,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32, RecvBufferCapacity: 32,
MaxSendBytes: 400,
}, },
}, },
} }


+ 1
- 2
internal/mempool/v0/reactor.go View File

@ -100,13 +100,12 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
mempool.MempoolChannel: { mempool.MempoolChannel: {
MsgType: new(protomem.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(mempool.MempoolChannel), ID: byte(mempool.MempoolChannel),
MessageType: new(protomem.Message),
Priority: 5, Priority: 5,
RecvMessageCapacity: batchMsg.Size(), RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 5000,
}, },
}, },
} }


+ 1
- 2
internal/mempool/v1/reactor.go View File

@ -107,13 +107,12 @@ func GetChannelShims(cfg *config.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
mempool.MempoolChannel: { mempool.MempoolChannel: {
MsgType: new(protomem.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(mempool.MempoolChannel), ID: byte(mempool.MempoolChannel),
MessageType: new(protomem.Message),
Priority: 5, Priority: 5,
RecvMessageCapacity: batchMsg.Size(), RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 5000,
}, },
}, },
} }


+ 2
- 4
internal/p2p/conn/connection.go View File

@ -612,6 +612,8 @@ type ChannelDescriptor struct {
ID byte ID byte
Priority int Priority int
MessageType proto.Message
// TODO: Remove once p2p refactor is complete. // TODO: Remove once p2p refactor is complete.
SendQueueCapacity int SendQueueCapacity int
RecvMessageCapacity int RecvMessageCapacity int
@ -619,10 +621,6 @@ type ChannelDescriptor struct {
// RecvBufferCapacity defines the max buffer size of inbound messages for a // RecvBufferCapacity defines the max buffer size of inbound messages for a
// given p2p Channel queue. // given p2p Channel queue.
RecvBufferCapacity int RecvBufferCapacity int
// MaxSendBytes defines the maximum number of bytes that can be sent at any
// given moment from a Channel to a peer.
MaxSendBytes uint
} }
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {


+ 0
- 1
internal/p2p/p2p_test.go View File

@ -19,7 +19,6 @@ var (
Priority: 5, Priority: 5,
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: 10, RecvMessageCapacity: 10,
MaxSendBytes: 1000,
} }
selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd}) selfKey crypto.PrivKey = ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})


+ 0
- 1
internal/p2p/p2ptest/network.go View File

@ -339,6 +339,5 @@ func MakeChannelDesc(chID p2p.ChannelID) p2p.ChannelDescriptor {
Priority: 5, Priority: 5,
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: 10, RecvMessageCapacity: 10,
MaxSendBytes: 1000,
} }
} }

+ 0
- 1
internal/p2p/pex/reactor.go View File

@ -69,7 +69,6 @@ func ChannelDescriptor() conn.ChannelDescriptor {
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize, RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32, RecvBufferCapacity: 32,
MaxSendBytes: 200,
} }
} }


+ 1
- 1
internal/p2p/pqueue_test.go View File

@ -13,7 +13,7 @@ type testMessage = gogotypes.StringValue
func TestCloseWhileDequeueFull(t *testing.T) { func TestCloseWhileDequeueFull(t *testing.T) {
enqueueLength := 5 enqueueLength := 5
chDescs := []ChannelDescriptor{ chDescs := []ChannelDescriptor{
{ID: 0x01, Priority: 1, MaxSendBytes: 4},
{ID: 0x01, Priority: 1},
} }
pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, uint(enqueueLength), 1, 120) pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, uint(enqueueLength), 1, 120)


+ 1
- 3
internal/p2p/shim.go View File

@ -3,7 +3,6 @@ package p2p
import ( import (
"sort" "sort"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
) )
@ -12,7 +11,6 @@ import (
// A ChannelDescriptorShim is not contained in ReactorShim, but is rather // A ChannelDescriptorShim is not contained in ReactorShim, but is rather
// used to construct a ReactorShim. // used to construct a ReactorShim.
type ChannelDescriptorShim struct { type ChannelDescriptorShim struct {
MsgType proto.Message
Descriptor *ChannelDescriptor Descriptor *ChannelDescriptor
} }
@ -61,7 +59,7 @@ func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
Descriptor: cds.Descriptor, Descriptor: cds.Descriptor,
Channel: NewChannel( Channel: NewChannel(
ChannelID(cds.Descriptor.ID), ChannelID(cds.Descriptor.ID),
cds.MsgType,
cds.Descriptor.MessageType,
inCh, inCh,
outCh, outCh,
errCh, errCh,


+ 4
- 8
internal/statesync/reactor.go View File

@ -38,47 +38,43 @@ var (
// ref: https://github.com/tendermint/tendermint/issues/5670 // ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
SnapshotChannel: { SnapshotChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(SnapshotChannel), ID: byte(SnapshotChannel),
MessageType: new(ssproto.Message),
Priority: 6, Priority: 6,
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize, RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 400,
}, },
}, },
ChunkChannel: { ChunkChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(ChunkChannel), ID: byte(ChunkChannel),
Priority: 3, Priority: 3,
MessageType: new(ssproto.Message),
SendQueueCapacity: 4, SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize, RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 400,
}, },
}, },
LightBlockChannel: { LightBlockChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(LightBlockChannel), ID: byte(LightBlockChannel),
MessageType: new(ssproto.Message),
Priority: 5, Priority: 5,
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize, RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 400,
}, },
}, },
ParamsChannel: { ParamsChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{ Descriptor: &p2p.ChannelDescriptor{
ID: byte(ParamsChannel), ID: byte(ParamsChannel),
MessageType: new(ssproto.Message),
Priority: 2, Priority: 2,
SendQueueCapacity: 10, SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize, RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128, RecvBufferCapacity: 128,
MaxSendBytes: 400,
}, },
}, },
} }


+ 1
- 1
node/node.go View File

@ -1119,7 +1119,7 @@ func makeChannelsFromShims(
channels := map[p2p.ChannelID]*p2p.Channel{} channels := map[p2p.ChannelID]*p2p.Channel{}
for chID, chShim := range chShims { for chID, chShim := range chShims {
ch, err := router.OpenChannel(*chShim.Descriptor, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity)
ch, err := router.OpenChannel(*chShim.Descriptor, chShim.Descriptor.MessageType, chShim.Descriptor.RecvBufferCapacity)
if err != nil { if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) panic(fmt.Sprintf("failed to open channel %v: %v", chID, err))
} }


Loading…
Cancel
Save