diff --git a/internal/blockchain/v0/reactor.go b/internal/blockchain/v0/reactor.go index 859b0fe97..444dfff56 100644 --- a/internal/blockchain/v0/reactor.go +++ b/internal/blockchain/v0/reactor.go @@ -33,10 +33,9 @@ var ( ID: byte(BlockchainChannel), Priority: 5, SendQueueCapacity: 1000, - RecvBufferCapacity: 50 * 4096, + RecvBufferCapacity: 1024, RecvMessageCapacity: bc.MaxMsgSize, - - MaxSendBytes: 100, + MaxSendBytes: 100, }, }, } diff --git a/internal/blockchain/v2/reactor.go b/internal/blockchain/v2/reactor.go index 861b3033d..7e15d9675 100644 --- a/internal/blockchain/v2/reactor.go +++ b/internal/blockchain/v2/reactor.go @@ -585,7 +585,7 @@ func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { ID: BlockchainChannel, Priority: 5, SendQueueCapacity: 2000, - RecvBufferCapacity: 50 * 4096, + RecvBufferCapacity: 1024, RecvMessageCapacity: bc.MaxMsgSize, }, } diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index eb2868f7e..612b83efd 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -33,11 +33,11 @@ var ( MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(StateChannel), - Priority: 6, - SendQueueCapacity: 100, + Priority: 8, + SendQueueCapacity: 64, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 12000, + RecvBufferCapacity: 128, + MaxSendBytes: 12000, }, }, DataChannel: { @@ -47,36 +47,33 @@ var ( // stuff. Once we gossip the whole block there is nothing left to send // until next height or round. ID: byte(DataChannel), - Priority: 10, - SendQueueCapacity: 100, - RecvBufferCapacity: 50 * 4096, + Priority: 12, + SendQueueCapacity: 64, + RecvBufferCapacity: 512, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 40000, + MaxSendBytes: 40000, }, }, VoteChannel: { MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(VoteChannel), - Priority: 7, - SendQueueCapacity: 100, - RecvBufferCapacity: 100 * 100, + Priority: 10, + SendQueueCapacity: 64, + RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 150, + MaxSendBytes: 150, }, }, VoteSetBitsChannel: { MsgType: new(tmcons.Message), Descriptor: &p2p.ChannelDescriptor{ ID: byte(VoteSetBitsChannel), - Priority: 1, - SendQueueCapacity: 2, - RecvBufferCapacity: 1024, + Priority: 5, + SendQueueCapacity: 8, + RecvBufferCapacity: 128, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 50, + MaxSendBytes: 50, }, }, } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index eca078f44..57202d5e8 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -31,8 +31,8 @@ var ( ID: byte(EvidenceChannel), Priority: 6, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 32, + MaxSendBytes: 400, }, }, } diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index 1b39b0b6b..5bfff0efe 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -104,8 +104,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe ID: byte(mempool.MempoolChannel), Priority: 5, RecvMessageCapacity: batchMsg.Size(), - - MaxSendBytes: 5000, + RecvBufferCapacity: 128, + MaxSendBytes: 5000, }, }, } diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 5bb2bbcbb..6fdfe2d06 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -103,8 +103,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe ID: byte(mempool.MempoolChannel), Priority: 5, RecvMessageCapacity: batchMsg.Size(), - - MaxSendBytes: 5000, + RecvBufferCapacity: 128, + MaxSendBytes: 5000, }, }, } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 7ff6e35ef..4a8738fd8 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -51,8 +51,8 @@ func ChannelDescriptor() conn.ChannelDescriptor { Priority: 1, SendQueueCapacity: 10, RecvMessageCapacity: maxMsgSize, - - MaxSendBytes: 200, + RecvBufferCapacity: 32, + MaxSendBytes: 200, } } @@ -417,6 +417,7 @@ func (r *ReactorV2) sendRequestForPeers() { // no peers are available r.Logger.Debug("no available peers to send request to, waiting...") r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod) + return } var peerID types.NodeID diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 9666794bc..ff4a34ccd 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -19,7 +19,7 @@ import ( "github.com/tendermint/tendermint/types" ) -const queueBufferDefault = 4096 +const queueBufferDefault = 32 // ChannelID is an arbitrary channel ID. type ChannelID uint16 @@ -365,10 +365,6 @@ func (r *Router) createQueueFactory() (func(int) queue, error) { // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. func (r *Router) OpenChannel(chDesc ChannelDescriptor, messageType proto.Message, size int) (*Channel, error) { - if size == 0 { - size = queueBufferDefault - } - r.channelMtx.Lock() defer r.channelMtx.Unlock() diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 513000c1d..1e35c1c3a 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -41,8 +41,8 @@ var ( Priority: 6, SendQueueCapacity: 10, RecvMessageCapacity: snapshotMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 128, + MaxSendBytes: 400, }, }, ChunkChannel: { @@ -52,8 +52,8 @@ var ( Priority: 3, SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 128, + MaxSendBytes: 400, }, }, LightBlockChannel: { @@ -63,8 +63,8 @@ var ( Priority: 2, SendQueueCapacity: 10, RecvMessageCapacity: lightBlockMsgSize, - - MaxSendBytes: 400, + RecvBufferCapacity: 128, + MaxSendBytes: 400, }, }, } diff --git a/node/setup.go b/node/setup.go index 763713095..ac48353e3 100644 --- a/node/setup.go +++ b/node/setup.go @@ -698,7 +698,7 @@ func createPEXReactorV2( router *p2p.Router, ) (*pex.ReactorV2, error) { - channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 4096) + channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128) if err != nil { return nil, err }