Browse Source

p2p: use recieve for channel iteration (#7425)

pull/7438/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
65c0aaee5e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 291 additions and 289 deletions
  1. +23
    -17
      internal/blocksync/reactor.go
  2. +49
    -64
      internal/consensus/reactor.go
  3. +12
    -16
      internal/evidence/reactor.go
  4. +12
    -16
      internal/mempool/reactor.go
  5. +12
    -20
      internal/p2p/channel.go
  6. +1
    -1
      internal/p2p/channel_test.go
  7. +1
    -1
      internal/p2p/p2ptest/network.go
  8. +41
    -30
      internal/p2p/p2ptest/require.go
  9. +47
    -31
      internal/p2p/pex/reactor.go
  10. +30
    -24
      internal/p2p/pex/reactor_test.go
  11. +4
    -4
      internal/p2p/pqueue.go
  12. +1
    -1
      internal/p2p/pqueue_test.go
  13. +3
    -3
      internal/p2p/router.go
  14. +36
    -38
      internal/p2p/router_test.go
  15. +19
    -23
      internal/statesync/reactor.go

+ 23
- 17
internal/blocksync/reactor.go View File

@ -166,6 +166,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
} }
go r.processBlockSyncCh(ctx) go r.processBlockSyncCh(ctx)
go r.processBlockSyncBridge(ctx)
go r.processPeerUpdates(ctx) go r.processPeerUpdates(ctx)
return nil return nil
@ -212,7 +213,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
// handleBlockSyncMessage handles envelopes sent from peers on the // handleBlockSyncMessage handles envelopes sent from peers on the
// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown // BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage. // for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From) logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -251,7 +252,7 @@ func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope p2p.Envel
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle // It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel. // any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e) err = fmt.Errorf("panic in processing message: %v", e)
@ -282,25 +283,30 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// When the reactor is stopped, we will catch the signal and close the p2p Channel // When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processBlockSyncCh(ctx context.Context) { func (r *Reactor) processBlockSyncCh(ctx context.Context) {
iter := r.blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
}
}
func (r *Reactor) processBlockSyncBridge(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
r.logger.Debug("stopped listening on block sync channel; closing...")
return return
case envelope := <-r.blockSyncCh.In:
if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
}
case envelope := <-r.blockSyncOutBridgeCh: case envelope := <-r.blockSyncOutBridgeCh:
if err := r.blockSyncCh.Send(ctx, envelope); err != nil { if err := r.blockSyncCh.Send(ctx, envelope); err != nil {
return return


+ 49
- 64
internal/consensus/reactor.go View File

@ -1070,7 +1070,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op // If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is // and return. This can happen when we process the envelope after the peer is
// removed. // removed.
func (r *Reactor) handleStateMessage(ctx context.Context, envelope p2p.Envelope, msgI Message) error {
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
ps, ok := r.GetPeerState(envelope.From) ps, ok := r.GetPeerState(envelope.From)
if !ok || ps == nil { if !ok || ps == nil {
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel") r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
@ -1156,7 +1156,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope p2p.Envelope,
// fail to find the peer state for the envelope sender, we perform a no-op and // fail to find the peer state for the envelope sender, we perform a no-op and
// return. This can happen when we process the envelope after the peer is // return. This can happen when we process the envelope after the peer is
// removed. // removed.
func (r *Reactor) handleDataMessage(ctx context.Context, envelope p2p.Envelope, msgI Message) error {
func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
logger := r.logger.With("peer", envelope.From, "ch_id", "DataChannel") logger := r.logger.With("peer", envelope.From, "ch_id", "DataChannel")
ps, ok := r.GetPeerState(envelope.From) ps, ok := r.GetPeerState(envelope.From)
@ -1205,7 +1205,7 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope p2p.Envelope,
// fail to find the peer state for the envelope sender, we perform a no-op and // fail to find the peer state for the envelope sender, we perform a no-op and
// return. This can happen when we process the envelope after the peer is // return. This can happen when we process the envelope after the peer is
// removed. // removed.
func (r *Reactor) handleVoteMessage(ctx context.Context, envelope p2p.Envelope, msgI Message) error {
func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
logger := r.logger.With("peer", envelope.From, "ch_id", "VoteChannel") logger := r.logger.With("peer", envelope.From, "ch_id", "VoteChannel")
ps, ok := r.GetPeerState(envelope.From) ps, ok := r.GetPeerState(envelope.From)
@ -1246,7 +1246,7 @@ func (r *Reactor) handleVoteMessage(ctx context.Context, envelope p2p.Envelope,
// VoteSetBitsChannel. If we fail to find the peer state for the envelope sender, // VoteSetBitsChannel. If we fail to find the peer state for the envelope sender,
// we perform a no-op and return. This can happen when we process the envelope // we perform a no-op and return. This can happen when we process the envelope
// after the peer is removed. // after the peer is removed.
func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope p2p.Envelope, msgI Message) error {
func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
logger := r.logger.With("peer", envelope.From, "ch_id", "VoteSetBitsChannel") logger := r.logger.With("peer", envelope.From, "ch_id", "VoteSetBitsChannel")
ps, ok := r.GetPeerState(envelope.From) ps, ok := r.GetPeerState(envelope.From)
@ -1304,7 +1304,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope p2p.Env
// the p2p channel. // the p2p channel.
// //
// NOTE: We block on consensus state for proposals, block parts, and votes. // NOTE: We block on consensus state for proposals, block parts, and votes.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e) err = fmt.Errorf("panic in processing message: %v", e)
@ -1359,20 +1359,16 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// the reactor is stopped, we will catch the signal and close the p2p Channel // the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processStateCh(ctx context.Context) { func (r *Reactor) processStateCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on StateChannel; closing...")
return
case envelope := <-r.stateCh.In:
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
if serr := r.stateCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := r.stateCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
if serr := r.stateCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }
@ -1384,20 +1380,16 @@ func (r *Reactor) processStateCh(ctx context.Context) {
// the reactor is stopped, we will catch the signal and close the p2p Channel // the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processDataCh(ctx context.Context) { func (r *Reactor) processDataCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on DataChannel; closing...")
return
case envelope := <-r.dataCh.In:
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
if serr := r.dataCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := r.dataCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
if serr := r.dataCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }
@ -1409,20 +1401,16 @@ func (r *Reactor) processDataCh(ctx context.Context) {
// the reactor is stopped, we will catch the signal and close the p2p Channel // the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully. // gracefully.
func (r *Reactor) processVoteCh(ctx context.Context) { func (r *Reactor) processVoteCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on VoteChannel; closing...")
return
case envelope := <-r.voteCh.In:
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
if serr := r.voteCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := r.voteCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
if serr := r.voteCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }
@ -1434,24 +1422,21 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
// When the reactor is stopped, we will catch the signal and close the p2p // When the reactor is stopped, we will catch the signal and close the p2p
// Channel gracefully. // Channel gracefully.
func (r *Reactor) processVoteSetBitsCh(ctx context.Context) { func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on VoteSetBitsChannel; closing...")
return
case envelope := <-r.voteSetBitsCh.In:
if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
iter := r.voteSetBitsCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }


+ 12
- 16
internal/evidence/reactor.go View File

@ -110,7 +110,7 @@ func (r *Reactor) OnStop() {
// It returns an error only if the Envelope.Message is unknown for this channel // It returns an error only if the Envelope.Message is unknown for this channel
// or if the given evidence is invalid. This should never be called outside of // or if the given evidence is invalid. This should never be called outside of
// handleMessage. // handleMessage.
func (r *Reactor) handleEvidenceMessage(envelope p2p.Envelope) error {
func (r *Reactor) handleEvidenceMessage(envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From) logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -146,7 +146,7 @@ func (r *Reactor) handleEvidenceMessage(envelope p2p.Envelope) error {
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle // It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel. // any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e) err = fmt.Errorf("panic in processing message: %v", e)
@ -174,20 +174,16 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
// processEvidenceCh implements a blocking event loop where we listen for p2p // processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh. // Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh(ctx context.Context) { func (r *Reactor) processEvidenceCh(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.logger.Debug("stopped listening on evidence channel; closing...")
return
case envelope := <-r.evidenceCh.In:
if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := r.evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }


+ 12
- 16
internal/mempool/reactor.go View File

@ -140,7 +140,7 @@ func (r *Reactor) OnStop() {
// For every tx in the message, we execute CheckTx. It returns an error if an // For every tx in the message, we execute CheckTx. It returns an error if an
// empty set of txs are sent in an envelope or if we receive an unexpected // empty set of txs are sent in an envelope or if we receive an unexpected
// message type. // message type.
func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From) logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -171,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle // It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel. // any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
r.observePanic(e) r.observePanic(e)
@ -200,21 +200,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processMempoolCh implements a blocking event loop where we listen for p2p // processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh. // Envelope messages from the mempoolCh.
func (r *Reactor) processMempoolCh(ctx context.Context) { func (r *Reactor) processMempoolCh(ctx context.Context) {
for {
select {
case envelope := <-r.mempoolCh.In:
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := r.mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
case <-ctx.Done():
r.logger.Debug("stopped listening on mempool channel; closing...")
return
} }
} }
} }


+ 12
- 20
internal/p2p/channel.go View File

@ -15,15 +15,7 @@ type Envelope struct {
To types.NodeID // receiver (empty if inbound) To types.NodeID // receiver (empty if inbound)
Broadcast bool // send to all connected peers (ignores To) Broadcast bool // send to all connected peers (ignores To)
Message proto.Message // message payload Message proto.Message // message payload
// channelID is for internal Router use, set on outbound messages to inform
// the sendPeer() goroutine which transport channel to use.
//
// FIXME: If we migrate the Transport API to a byte-oriented multi-stream
// API, this will no longer be necessary since each channel will be mapped
// onto a stream during channel/peer setup. See:
// https://github.com/tendermint/spec/pull/227
channelID ChannelID
ChannelID ChannelID
} }
// Wrapper is a Protobuf message that can contain a variety of inner messages // Wrapper is a Protobuf message that can contain a variety of inner messages
@ -62,7 +54,7 @@ func (pe PeerError) Unwrap() error { return pe.Err }
// Each message is wrapped in an Envelope to specify its sender and receiver. // Each message is wrapped in an Envelope to specify its sender and receiver.
type Channel struct { type Channel struct {
ID ChannelID ID ChannelID
In <-chan Envelope // inbound messages (peers to reactors)
inCh <-chan Envelope // inbound messages (peers to reactors)
outCh chan<- Envelope // outbound messages (reactors to peers) outCh chan<- Envelope // outbound messages (reactors to peers)
errCh chan<- PeerError // peer error reporting errCh chan<- PeerError // peer error reporting
@ -81,7 +73,7 @@ func NewChannel(
return &Channel{ return &Channel{
ID: id, ID: id,
messageType: messageType, messageType: messageType,
In: inCh,
inCh: inCh,
outCh: outCh, outCh: outCh,
errCh: errCh, errCh: errCh,
} }
@ -138,7 +130,7 @@ func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case envelope := <-ch.In:
case envelope := <-ch.inCh:
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -192,6 +184,14 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
} }
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
for _, ch := range chs {
wg.Add(1)
go func(ch *Channel) {
defer wg.Done()
iteratorWorker(ctx, ch, iter.pipe)
}(ch)
}
done := make(chan struct{}) done := make(chan struct{})
go func() { defer close(done); wg.Wait() }() go func() { defer close(done); wg.Wait() }()
@ -204,13 +204,5 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato
<-done <-done
}() }()
for _, ch := range chs {
wg.Add(1)
go func(ch *Channel) {
defer wg.Done()
iteratorWorker(ctx, ch, iter.pipe)
}(ch)
}
return iter return iter
} }

+ 1
- 1
internal/p2p/channel_test.go View File

@ -23,7 +23,7 @@ func testChannel(size int) (*channelInternal, *Channel) {
Error: make(chan PeerError, size), Error: make(chan PeerError, size),
} }
ch := &Channel{ ch := &Channel{
In: in.In,
inCh: in.In,
outCh: in.Out, outCh: in.Out,
errCh: in.Error, errCh: in.Error,
} }


+ 1
- 1
internal/p2p/p2ptest/network.go View File

@ -306,7 +306,7 @@ func (n *Node) MakeChannel(
require.NoError(t, err) require.NoError(t, err)
require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID)) require.Contains(t, n.Router.NodeInfo().Channels, byte(chDesc.ID))
t.Cleanup(func() { t.Cleanup(func() {
RequireEmpty(t, channel)
RequireEmpty(ctx, t, channel)
cancel() cancel()
}) })
return channel return channel


+ 41
- 30
internal/p2p/p2ptest/require.go View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
@ -14,53 +15,63 @@ import (
) )
// RequireEmpty requires that the given channel is empty. // RequireEmpty requires that the given channel is empty.
func RequireEmpty(t *testing.T, channels ...*p2p.Channel) {
for _, channel := range channels {
select {
case e := <-channel.In:
require.Fail(t, "unexpected message", "channel %v should be empty, got %v", channel.ID, e)
case <-time.After(10 * time.Millisecond):
}
func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
t.Helper()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
iter := p2p.MergedChannelIterator(ctx, channels...)
count := 0
for iter.Next(ctx) {
count++
require.Nil(t, iter.Envelope())
} }
require.Zero(t, count)
require.Error(t, ctx.Err())
} }
// RequireReceive requires that the given envelope is received on the channel. // RequireReceive requires that the given envelope is received on the channel.
func RequireReceive(t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
t.Helper() t.Helper()
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case e := <-channel.In:
require.Equal(t, expect, e)
case <-timer.C:
require.Fail(t, "timed out waiting for message", "%v on channel %v", expect, channel.ID)
iter := channel.Receive(ctx)
count := 0
for iter.Next(ctx) {
count++
envelope := iter.Envelope()
require.Equal(t, expect.From, envelope.From)
require.Equal(t, expect.Message, envelope.Message)
}
if !assert.True(t, count >= 1) {
require.NoError(t, ctx.Err(), "timed out waiting for message %v", expect)
} }
} }
// RequireReceiveUnordered requires that the given envelopes are all received on // RequireReceiveUnordered requires that the given envelopes are all received on
// the channel, ignoring order. // the channel, ignoring order.
func RequireReceiveUnordered(t *testing.T, channel *p2p.Channel, expect []p2p.Envelope) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
actual := []p2p.Envelope{}
for {
select {
case e := <-channel.In:
actual = append(actual, e)
if len(actual) == len(expect) {
require.ElementsMatch(t, expect, actual)
return
}
actual := []*p2p.Envelope{}
case <-timer.C:
require.ElementsMatch(t, expect, actual)
iter := channel.Receive(ctx)
for iter.Next(ctx) {
actual = append(actual, iter.Envelope())
if len(actual) == len(expect) {
require.ElementsMatch(t, expect, actual, "len=%d", len(actual))
return return
} }
} }
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
require.ElementsMatch(t, expect, actual)
}
} }
// RequireSend requires that the given envelope is sent on the channel. // RequireSend requires that the given envelope is sent on the channel.
@ -88,7 +99,7 @@ func RequireSendReceive(
receive proto.Message, receive proto.Message,
) { ) {
RequireSend(ctx, t, channel, p2p.Envelope{To: peerID, Message: send}) RequireSend(ctx, t, channel, p2p.Envelope{To: peerID, Message: send})
RequireReceive(t, channel, p2p.Envelope{From: peerID, Message: send})
RequireReceive(ctx, t, channel, p2p.Envelope{From: peerID, Message: send})
} }
// RequireNoUpdates requires that a PeerUpdates subscription is empty. // RequireNoUpdates requires that a PeerUpdates subscription is empty.


+ 47
- 31
internal/p2p/pex/reactor.go View File

@ -100,9 +100,6 @@ type Reactor struct {
// minReceiveRequestInterval). // minReceiveRequestInterval).
lastReceivedRequests map[types.NodeID]time.Time lastReceivedRequests map[types.NodeID]time.Time
// the time when another request will be sent
nextRequestTime time.Time
// keep track of how many new peers to existing peers we have received to // keep track of how many new peers to existing peers we have received to
// extrapolate the size of the network // extrapolate the size of the network
newPeers uint32 newPeers uint32
@ -155,8 +152,26 @@ func (r *Reactor) OnStop() {}
func (r *Reactor) processPexCh(ctx context.Context) { func (r *Reactor) processPexCh(ctx context.Context) {
timer := time.NewTimer(0) timer := time.NewTimer(0)
defer timer.Stop() defer timer.Stop()
var (
duration = r.calculateNextRequestTime()
err error
)
incoming := make(chan *p2p.Envelope)
go func() {
defer close(incoming)
iter := r.pexCh.Receive(ctx)
for iter.Next(ctx) {
select {
case <-ctx.Done():
return
case incoming <- iter.Envelope():
}
}
}()
for { for {
timer.Reset(time.Until(r.nextRequestTime))
timer.Reset(duration)
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -165,12 +180,15 @@ func (r *Reactor) processPexCh(ctx context.Context) {
// outbound requests for new peers // outbound requests for new peers
case <-timer.C: case <-timer.C:
r.sendRequestForPeers(ctx)
duration, err = r.sendRequestForPeers(ctx)
if err != nil {
return
}
// inbound requests for new peers or responses to requests sent by this // inbound requests for new peers or responses to requests sent by this
// reactor // reactor
case envelope := <-r.pexCh.In:
if err := r.handleMessage(ctx, r.pexCh.ID, envelope); err != nil {
case envelope := <-incoming:
duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope)
if err != nil {
r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
if serr := r.pexCh.SendError(ctx, p2p.PeerError{ if serr := r.pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From, NodeID: envelope.From,
@ -179,6 +197,7 @@ func (r *Reactor) processPexCh(ctx context.Context) {
return return
} }
} }
} }
} }
} }
@ -199,7 +218,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
} }
// handlePexMessage handles envelopes sent from peers on the PexChannel. // handlePexMessage handles envelopes sent from peers on the PexChannel.
func (r *Reactor) handlePexMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
logger := r.logger.With("peer", envelope.From) logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -207,7 +226,7 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope p2p.Envelope) e
// check if the peer hasn't sent a prior request too close to this one // check if the peer hasn't sent a prior request too close to this one
// in time // in time
if err := r.markPeerRequest(envelope.From); err != nil { if err := r.markPeerRequest(envelope.From); err != nil {
return err
return time.Minute, err
} }
// request peers from the peer manager and parse the NodeAddresses into // request peers from the peer manager and parse the NodeAddresses into
@ -223,18 +242,19 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope p2p.Envelope) e
To: envelope.From, To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses}, Message: &protop2p.PexResponse{Addresses: pexAddresses},
}); err != nil { }); err != nil {
return err
return 0, err
} }
return time.Second, nil
case *protop2p.PexResponse: case *protop2p.PexResponse:
// check if the response matches a request that was made to that peer // check if the response matches a request that was made to that peer
if err := r.markPeerResponse(envelope.From); err != nil { if err := r.markPeerResponse(envelope.From); err != nil {
return err
return time.Minute, err
} }
// check the size of the response // check the size of the response
if len(msg.Addresses) > int(maxAddresses) { if len(msg.Addresses) > int(maxAddresses) {
return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
return 10 * time.Minute, fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
maxAddresses, maxAddresses,
len(msg.Addresses), len(msg.Addresses),
) )
@ -256,17 +276,16 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope p2p.Envelope) e
r.totalPeers++ r.totalPeers++
} }
return 10 * time.Minute, nil
default: default:
return fmt.Errorf("received unknown message: %T", msg)
return time.Second, fmt.Errorf("received unknown message: %T", msg)
} }
return nil
} }
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle // It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel. // any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (duration time.Duration, err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e) err = fmt.Errorf("panic in processing message: %v", e)
@ -282,13 +301,12 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID { switch chID {
case p2p.ChannelID(PexChannel): case p2p.ChannelID(PexChannel):
err = r.handlePexMessage(ctx, envelope)
duration, err = r.handlePexMessage(ctx, envelope)
default: default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
} }
return err
return
} }
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
@ -314,15 +332,13 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// peer a request for more peer addresses. The function then moves the // peer a request for more peer addresses. The function then moves the
// peer into the requestsSent bucket and calculates when the next request // peer into the requestsSent bucket and calculates when the next request
// time should be // time should be
func (r *Reactor) sendRequestForPeers(ctx context.Context) {
func (r *Reactor) sendRequestForPeers(ctx context.Context) (time.Duration, error) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
if len(r.availablePeers) == 0 { if len(r.availablePeers) == 0 {
// no peers are available // no peers are available
r.logger.Debug("no available peers to send request to, waiting...") r.logger.Debug("no available peers to send request to, waiting...")
r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod)
return
return noAvailablePeersWaitPeriod, nil
} }
var peerID types.NodeID var peerID types.NodeID
@ -336,15 +352,16 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) {
To: peerID, To: peerID,
Message: &protop2p.PexRequest{}, Message: &protop2p.PexRequest{},
}); err != nil { }); err != nil {
return
return 0, err
} }
// remove the peer from the abvailable peers list and mark it in the requestsSent map // remove the peer from the abvailable peers list and mark it in the requestsSent map
delete(r.availablePeers, peerID) delete(r.availablePeers, peerID)
r.requestsSent[peerID] = struct{}{} r.requestsSent[peerID] = struct{}{}
r.calculateNextRequestTime()
r.logger.Debug("peer request sent", "next_request_time", r.nextRequestTime)
dur := r.calculateNextRequestTime()
r.logger.Debug("peer request sent", "next_request_time", dur)
return dur, nil
} }
// calculateNextRequestTime implements something of a proportional controller // calculateNextRequestTime implements something of a proportional controller
@ -357,14 +374,13 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) {
// new nodes will plummet to a very small number, meaning the interval expands // new nodes will plummet to a very small number, meaning the interval expands
// to its upper bound. // to its upper bound.
// CONTRACT: Must use a write lock as nextRequestTime is updated // CONTRACT: Must use a write lock as nextRequestTime is updated
func (r *Reactor) calculateNextRequestTime() {
func (r *Reactor) calculateNextRequestTime() time.Duration {
// check if the peer store is full. If so then there is no need // check if the peer store is full. If so then there is no need
// to send peer requests too often // to send peer requests too often
if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 { if ratio := r.peerManager.PeerRatio(); ratio >= 0.95 {
r.logger.Debug("peer manager near full ratio, sleeping...", r.logger.Debug("peer manager near full ratio, sleeping...",
"sleep_period", fullCapacityInterval, "ratio", ratio) "sleep_period", fullCapacityInterval, "ratio", ratio)
r.nextRequestTime = time.Now().Add(fullCapacityInterval)
return
return fullCapacityInterval
} }
// baseTime represents the shortest interval that we can send peer requests // baseTime represents the shortest interval that we can send peer requests
@ -390,7 +406,7 @@ func (r *Reactor) calculateNextRequestTime() {
} }
// NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry // NOTE: As ratio is always >= 1, discovery ratio is >= 1. Therefore we don't need to worry
// about the next request time being less than the minimum time // about the next request time being less than the minimum time
r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio))
return baseTime * time.Duration(r.discoveryRatio)
} }
func (r *Reactor) markPeerRequest(peer types.NodeID) error { func (r *Reactor) markPeerRequest(peer types.NodeID) error {


+ 30
- 24
internal/p2p/pex/reactor_test.go View File

@ -2,6 +2,7 @@ package pex_test
import ( import (
"context" "context"
"errors"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -41,12 +42,12 @@ func TestReactorBasic(t *testing.T) {
testNet.start(ctx, t) testNet.start(ctx, t)
// assert that the mock node receives a request from the real node // assert that the mock node receives a request from the real node
testNet.listenForRequest(t, secondNode, firstNode, shortWait)
testNet.listenForRequest(ctx, t, secondNode, firstNode, shortWait)
// assert that when a mock node sends a request it receives a response (and // assert that when a mock node sends a request it receives a response (and
// the correct one) // the correct one)
testNet.sendRequest(ctx, t, firstNode, secondNode) testNet.sendRequest(ctx, t, firstNode, secondNode)
testNet.listenForResponse(t, secondNode, firstNode, shortWait, []p2pproto.PexAddress(nil))
testNet.listenForResponse(ctx, t, secondNode, firstNode, shortWait, []p2pproto.PexAddress(nil))
} }
func TestReactorConnectFullNetwork(t *testing.T) { func TestReactorConnectFullNetwork(t *testing.T) {
@ -440,38 +441,42 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
} }
func (r *reactorTestSuite) listenFor( func (r *reactorTestSuite) listenFor(
ctx context.Context,
t *testing.T, t *testing.T,
node types.NodeID, node types.NodeID,
conditional func(msg p2p.Envelope) bool,
assertion func(t *testing.T, msg p2p.Envelope) bool,
conditional func(msg *p2p.Envelope) bool,
assertion func(t *testing.T, msg *p2p.Envelope) bool,
waitPeriod time.Duration, waitPeriod time.Duration,
) { ) {
timesUp := time.After(waitPeriod)
for {
select {
case envelope := <-r.pexChannels[node].In:
if conditional(envelope) && assertion(t, envelope) {
return
}
case <-timesUp:
require.Fail(t, "timed out waiting for message",
"node=%v, waitPeriod=%s", node, waitPeriod)
ctx, cancel := context.WithTimeout(ctx, waitPeriod)
defer cancel()
iter := r.pexChannels[node].Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if conditional(envelope) && assertion(t, envelope) {
return
} }
} }
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
require.Fail(t, "timed out waiting for message",
"node=%v, waitPeriod=%s", node, waitPeriod)
}
} }
func (r *reactorTestSuite) listenForRequest(t *testing.T, fromNode, toNode int, waitPeriod time.Duration) {
func (r *reactorTestSuite) listenForRequest(ctx context.Context, t *testing.T, fromNode, toNode int, waitPeriod time.Duration) {
r.logger.Info("Listening for request", "from", fromNode, "to", toNode) r.logger.Info("Listening for request", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode) to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
conditional := func(msg *p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexRequest) _, ok := msg.Message.(*p2pproto.PexRequest)
return ok && msg.From == from return ok && msg.From == from
} }
assertion := func(t *testing.T, msg p2p.Envelope) bool {
assertion := func(t *testing.T, msg *p2p.Envelope) bool {
require.Equal(t, &p2pproto.PexRequest{}, msg.Message) require.Equal(t, &p2pproto.PexRequest{}, msg.Message)
return true return true
} }
r.listenFor(t, to, conditional, assertion, waitPeriod)
r.listenFor(ctx, t, to, conditional, assertion, waitPeriod)
} }
func (r *reactorTestSuite) pingAndlistenForNAddresses( func (r *reactorTestSuite) pingAndlistenForNAddresses(
@ -484,11 +489,11 @@ func (r *reactorTestSuite) pingAndlistenForNAddresses(
t.Helper() t.Helper()
r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode) r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode) to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
conditional := func(msg *p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexResponse) _, ok := msg.Message.(*p2pproto.PexResponse)
return ok && msg.From == from return ok && msg.From == from
} }
assertion := func(t *testing.T, msg p2p.Envelope) bool {
assertion := func(t *testing.T, msg *p2p.Envelope) bool {
m, ok := msg.Message.(*p2pproto.PexResponse) m, ok := msg.Message.(*p2pproto.PexResponse)
if !ok { if !ok {
require.Fail(t, "expected pex response v2") require.Fail(t, "expected pex response v2")
@ -505,10 +510,11 @@ func (r *reactorTestSuite) pingAndlistenForNAddresses(
return false return false
} }
r.sendRequest(ctx, t, toNode, fromNode) r.sendRequest(ctx, t, toNode, fromNode)
r.listenFor(t, to, conditional, assertion, waitPeriod)
r.listenFor(ctx, t, to, conditional, assertion, waitPeriod)
} }
func (r *reactorTestSuite) listenForResponse( func (r *reactorTestSuite) listenForResponse(
ctx context.Context,
t *testing.T, t *testing.T,
fromNode, toNode int, fromNode, toNode int,
waitPeriod time.Duration, waitPeriod time.Duration,
@ -516,16 +522,16 @@ func (r *reactorTestSuite) listenForResponse(
) { ) {
r.logger.Info("Listening for response", "from", fromNode, "to", toNode) r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode) to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
conditional := func(msg *p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexResponse) _, ok := msg.Message.(*p2pproto.PexResponse)
r.logger.Info("message", msg, "ok", ok) r.logger.Info("message", msg, "ok", ok)
return ok && msg.From == from return ok && msg.From == from
} }
assertion := func(t *testing.T, msg p2p.Envelope) bool {
assertion := func(t *testing.T, msg *p2p.Envelope) bool {
require.Equal(t, &p2pproto.PexResponse{Addresses: addresses}, msg.Message) require.Equal(t, &p2pproto.PexResponse{Addresses: addresses}, msg.Message)
return true return true
} }
r.listenFor(t, to, conditional, assertion, waitPeriod)
r.listenFor(ctx, t, to, conditional, assertion, waitPeriod)
} }
func (r *reactorTestSuite) listenForPeerUpdate( func (r *reactorTestSuite) listenForPeerUpdate(


+ 4
- 4
internal/p2p/pqueue.go View File

@ -160,11 +160,11 @@ func (s *pqScheduler) process(ctx context.Context) {
for { for {
select { select {
case e := <-s.enqueueCh: case e := <-s.enqueueCh:
chIDStr := strconv.Itoa(int(e.channelID))
chIDStr := strconv.Itoa(int(e.ChannelID))
pqEnv := &pqEnvelope{ pqEnv := &pqEnvelope{
envelope: e, envelope: e,
size: uint(proto.Size(e.Message)), size: uint(proto.Size(e.Message)),
priority: s.chPriorities[e.channelID],
priority: s.chPriorities[e.ChannelID],
timestamp: time.Now().UTC(), timestamp: time.Now().UTC(),
} }
@ -203,7 +203,7 @@ func (s *pqScheduler) process(ctx context.Context) {
if tmpSize+pqEnv.size <= s.capacity { if tmpSize+pqEnv.size <= s.capacity {
canEnqueue = true canEnqueue = true
} else { } else {
pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.ChannelID))
s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1) s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
s.logger.Debug( s.logger.Debug(
"dropped envelope", "dropped envelope",
@ -277,7 +277,7 @@ func (s *pqScheduler) process(ctx context.Context) {
} }
func (s *pqScheduler) push(pqEnv *pqEnvelope) { func (s *pqScheduler) push(pqEnv *pqEnvelope) {
chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
chIDStr := strconv.Itoa(int(pqEnv.envelope.ChannelID))
// enqueue the incoming Envelope // enqueue the incoming Envelope
heap.Push(s.pq, pqEnv) heap.Push(s.pq, pqEnv)


+ 1
- 1
internal/p2p/pqueue_test.go View File

@ -20,7 +20,7 @@ func TestCloseWhileDequeueFull(t *testing.T) {
for i := 0; i < enqueueLength; i++ { for i := 0; i < enqueueLength; i++ {
pqueue.enqueue() <- Envelope{ pqueue.enqueue() <- Envelope{
channelID: 0x01,
ChannelID: 0x01,
Message: &testMessage{Value: "foo"}, // 5 bytes Message: &testMessage{Value: "foo"}, // 5 bytes
} }
} }


+ 3
- 3
internal/p2p/router.go View File

@ -328,7 +328,7 @@ func (r *Router) routeChannel(
// Mark the envelope with the channel ID to allow sendPeer() to pass // Mark the envelope with the channel ID to allow sendPeer() to pass
// it on to Transport.SendMessage(). // it on to Transport.SendMessage().
envelope.channelID = chID
envelope.ChannelID = chID
// wrap the message in a wrapper message, if requested // wrap the message in a wrapper message, if requested
if wrapper != nil { if wrapper != nil {
@ -859,7 +859,7 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
start := time.Now().UTC() start := time.Now().UTC()
select { select {
case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
case queue.enqueue() <- Envelope{From: peerID, Message: msg, ChannelID: chID}:
r.metrics.PeerReceiveBytesTotal.With( r.metrics.PeerReceiveBytesTotal.With(
"chID", fmt.Sprint(chID), "chID", fmt.Sprint(chID),
"peer_id", string(peerID), "peer_id", string(peerID),
@ -895,7 +895,7 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect
continue continue
} }
if err = conn.SendMessage(ctx, envelope.channelID, bz); err != nil {
if err = conn.SendMessage(ctx, envelope.ChannelID, bz); err != nil {
return err return err
} }


+ 36
- 38
internal/p2p/router_test.go View File

@ -28,17 +28,14 @@ import (
) )
func echoReactor(ctx context.Context, channel *p2p.Channel) { func echoReactor(ctx context.Context, channel *p2p.Channel) {
for {
select {
case envelope := <-channel.In:
value := envelope.Message.(*p2ptest.Message).Value
if err := channel.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &p2ptest.Message{Value: value},
}); err != nil {
return
}
case <-ctx.Done():
iter := channel.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
value := envelope.Message.(*p2ptest.Message).Value
if err := channel.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &p2ptest.Message{Value: value},
}); err != nil {
return return
} }
} }
@ -76,14 +73,15 @@ func TestRouter_Network(t *testing.T) {
Broadcast: true, Broadcast: true,
Message: &p2ptest.Message{Value: "bar"}, Message: &p2ptest.Message{Value: "bar"},
}) })
expect := []p2p.Envelope{}
expect := []*p2p.Envelope{}
for _, peer := range peers { for _, peer := range peers {
expect = append(expect, p2p.Envelope{
From: peer.NodeID,
Message: &p2ptest.Message{Value: "bar"},
expect = append(expect, &p2p.Envelope{
From: peer.NodeID,
ChannelID: 1,
Message: &p2ptest.Message{Value: "bar"},
}) })
} }
p2ptest.RequireReceiveUnordered(t, channel, expect)
p2ptest.RequireReceiveUnordered(ctx, t, channel, expect)
// We then submit an error for a peer, and watch it get disconnected and // We then submit an error for a peer, and watch it get disconnected and
// then reconnected as the router retries it. // then reconnected as the router retries it.
@ -162,7 +160,7 @@ func TestRouter_Channel_Basic(t *testing.T) {
To: selfID, To: selfID,
Message: &p2ptest.Message{Value: "self"}, Message: &p2ptest.Message{Value: "self"},
}) })
p2ptest.RequireEmpty(t, channel)
p2ptest.RequireEmpty(ctx, t, channel)
} }
// Channel tests are hairy to mock, so we use an in-memory network instead. // Channel tests are hairy to mock, so we use an in-memory network instead.
@ -186,45 +184,45 @@ func TestRouter_Channel_SendReceive(t *testing.T) {
// Sending a message a->b should work, and not send anything // Sending a message a->b should work, and not send anything
// further to a, b, or c. // further to a, b, or c.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Sending a nil message a->b should be dropped. // Sending a nil message a->b should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: nil}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: nil})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Sending a different message type should be dropped. // Sending a different message type should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Sending to an unknown peer should be dropped. // Sending to an unknown peer should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{ p2ptest.RequireSend(ctx, t, a, p2p.Envelope{
To: types.NodeID(strings.Repeat("a", 40)), To: types.NodeID(strings.Repeat("a", 40)),
Message: &p2ptest.Message{Value: "a"}, Message: &p2ptest.Message{Value: "a"},
}) })
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Sending without a recipient should be dropped. // Sending without a recipient should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Message: &p2ptest.Message{Value: "noto"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Message: &p2ptest.Message{Value: "noto"}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Sending to self should be dropped. // Sending to self should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "self"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "self"}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// Removing b and sending to it should be dropped. // Removing b and sending to it should be dropped.
network.Remove(ctx, t, bID) network.Remove(ctx, t, bID)
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "nob"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "nob"}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireEmpty(ctx, t, a, b, c)
// After all this, sending a message c->a should work. // After all this, sending a message c->a should work.
p2ptest.RequireSend(ctx, t, c, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "bar"}}) p2ptest.RequireSend(ctx, t, c, p2p.Envelope{To: aID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireReceive(t, a, p2p.Envelope{From: cID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireEmpty(t, a, b, c)
p2ptest.RequireReceive(ctx, t, a, p2p.Envelope{From: cID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireEmpty(ctx, t, a, b, c)
// None of these messages should have made it onto the other channels. // None of these messages should have made it onto the other channels.
for _, other := range otherChannels { for _, other := range otherChannels {
p2ptest.RequireEmpty(t, other)
p2ptest.RequireEmpty(ctx, t, other)
} }
} }
@ -246,17 +244,17 @@ func TestRouter_Channel_Broadcast(t *testing.T) {
// Sending a broadcast from b should work. // Sending a broadcast from b should work.
p2ptest.RequireSend(ctx, t, b, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "foo"}}) p2ptest.RequireSend(ctx, t, b, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(t, a, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(t, c, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(t, d, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireEmpty(t, a, b, c, d)
p2ptest.RequireReceive(ctx, t, a, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(ctx, t, c, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(ctx, t, d, p2p.Envelope{From: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireEmpty(ctx, t, a, b, c, d)
// Removing one node from the network shouldn't prevent broadcasts from working. // Removing one node from the network shouldn't prevent broadcasts from working.
network.Remove(ctx, t, dID) network.Remove(ctx, t, dID)
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "bar"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{Broadcast: true, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireReceive(t, c, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireEmpty(t, a, b, c, d)
p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireReceive(ctx, t, c, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "bar"}})
p2ptest.RequireEmpty(ctx, t, a, b, c, d)
} }
func TestRouter_Channel_Wrapper(t *testing.T) { func TestRouter_Channel_Wrapper(t *testing.T) {
@ -287,11 +285,11 @@ func TestRouter_Channel_Wrapper(t *testing.T) {
// should automatically wrap and unwrap sent messages -- we prepend the // should automatically wrap and unwrap sent messages -- we prepend the
// wrapper actions to the message value to signal this. // wrapper actions to the message value to signal this.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: "foo"}})
p2ptest.RequireReceive(t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "unwrap:wrap:foo"}})
p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{From: aID, Message: &p2ptest.Message{Value: "unwrap:wrap:foo"}})
// If we send a different message that can't be wrapped, it should be dropped. // If we send a different message that can't be wrapped, it should be dropped.
p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}}) p2ptest.RequireSend(ctx, t, a, p2p.Envelope{To: bID, Message: &gogotypes.BoolValue{Value: true}})
p2ptest.RequireEmpty(t, b)
p2ptest.RequireEmpty(ctx, t, b)
// If we send the wrapper message itself, it should also be passed through // If we send the wrapper message itself, it should also be passed through
// since WrapperMessage supports it, and should only be unwrapped at the receiver. // since WrapperMessage supports it, and should only be unwrapped at the receiver.
@ -299,7 +297,7 @@ func TestRouter_Channel_Wrapper(t *testing.T) {
To: bID, To: bID,
Message: &wrapperMessage{Message: p2ptest.Message{Value: "foo"}}, Message: &wrapperMessage{Message: p2ptest.Message{Value: "foo"}},
}) })
p2ptest.RequireReceive(t, b, p2p.Envelope{
p2ptest.RequireReceive(ctx, t, b, p2p.Envelope{
From: aID, From: aID,
Message: &p2ptest.Message{Value: "unwrap:foo"}, Message: &p2ptest.Message{Value: "unwrap:foo"},
}) })


+ 19
- 23
internal/statesync/reactor.go View File

@ -503,7 +503,7 @@ func (r *Reactor) backfill(
// handleSnapshotMessage handles envelopes sent from peers on the // handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown // SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage. // for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope) error {
logger := r.logger.With("peer", envelope.From) logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
@ -575,7 +575,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope p2p.Envelo
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel. // handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel. // It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage. // This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope) error {
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
case *ssproto.ChunkRequest: case *ssproto.ChunkRequest:
r.logger.Debug( r.logger.Debug(
@ -664,7 +664,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope p2p.Envelope)
return nil return nil
} }
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope) error {
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest: case *ssproto.LightBlockRequest:
r.logger.Info("received light block request", "height", msg.Height) r.logger.Info("received light block request", "height", msg.Height)
@ -718,7 +718,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope p2p.Enve
return nil return nil
} }
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope p2p.Envelope) error {
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope) error {
switch msg := envelope.Message.(type) { switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest: case *ssproto.ParamsRequest:
r.logger.Debug("received consensus params request", "height", msg.Height) r.logger.Debug("received consensus params request", "height", msg.Height)
@ -765,7 +765,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope p2p.Envelope
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle // It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel. // any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e) err = fmt.Errorf("panic in processing message: %v", e)
@ -800,24 +800,20 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// the respective channel. When the reactor is stopped, we will catch the signal // the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully. // and close the p2p Channel gracefully.
func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) { func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) {
for {
select {
case <-ctx.Done():
r.logger.Debug("channel closed", "channel", chName)
return
case envelope := <-ch.In:
if err := r.handleMessage(ctx, ch.ID, envelope); err != nil {
r.logger.Error("failed to process message",
"err", err,
"channel", chName,
"ch_id", ch.ID,
"envelope", envelope)
if serr := ch.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
}
iter := ch.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, ch.ID, envelope); err != nil {
r.logger.Error("failed to process message",
"err", err,
"channel", chName,
"ch_id", ch.ID,
"envelope", envelope)
if serr := ch.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
return
} }
} }
} }


Loading…
Cancel
Save