|
@ -3,7 +3,6 @@ package pex |
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
"context" |
|
|
"fmt" |
|
|
"fmt" |
|
|
"runtime/debug" |
|
|
|
|
|
"sync" |
|
|
"sync" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
@ -192,7 +191,7 @@ func (r *Reactor) processPexCh(ctx context.Context) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// A request from another peer, or a response to one of our requests.
|
|
|
// A request from another peer, or a response to one of our requests.
|
|
|
dur, err := r.handleMessage(ctx, r.pexCh.ID, envelope) |
|
|
|
|
|
|
|
|
dur, err := r.handlePexMessage(ctx, envelope) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
r.logger.Error("failed to process message", |
|
|
r.logger.Error("failed to process message", |
|
|
"ch_id", r.pexCh.ID, "envelope", envelope, "err", err) |
|
|
"ch_id", r.pexCh.ID, "envelope", envelope, "err", err) |
|
@ -287,28 +286,6 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// handleMessage handles an Envelope sent from a peer on the specified Channel.
|
|
|
|
|
|
// This method will convert a panic in message handling as an error.
|
|
|
|
|
|
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (_ time.Duration, err error) { |
|
|
|
|
|
defer func() { |
|
|
|
|
|
if e := recover(); e != nil { |
|
|
|
|
|
err = fmt.Errorf("panic in processing message: %v", e) |
|
|
|
|
|
r.logger.Error( |
|
|
|
|
|
"recovering from processing message panic", |
|
|
|
|
|
"err", err, |
|
|
|
|
|
"stack", string(debug.Stack()), |
|
|
|
|
|
) |
|
|
|
|
|
} |
|
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|
r.logger.Debug("received PEX message", "peer", envelope.From) |
|
|
|
|
|
|
|
|
|
|
|
if chID == p2p.ChannelID(PexChannel) { |
|
|
|
|
|
return r.handlePexMessage(ctx, envelope) |
|
|
|
|
|
} |
|
|
|
|
|
return 0, fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
|
|
|
// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
|
|
|
// send a request for addresses.
|
|
|
// send a request for addresses.
|
|
|
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { |
|
|
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { |
|
|