|
@ -171,14 +171,10 @@ func (r *Reactor) processPexCh(ctx context.Context) { |
|
|
defer close(incoming) |
|
|
defer close(incoming) |
|
|
iter := r.pexCh.Receive(ctx) |
|
|
iter := r.pexCh.Receive(ctx) |
|
|
for iter.Next(ctx) { |
|
|
for iter.Next(ctx) { |
|
|
env := iter.Envelope() |
|
|
|
|
|
if env == nil { |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
select { |
|
|
select { |
|
|
case <-ctx.Done(): |
|
|
case <-ctx.Done(): |
|
|
return |
|
|
return |
|
|
case incoming <- env: |
|
|
|
|
|
|
|
|
case incoming <- iter.Envelope(): |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
@ -198,7 +194,10 @@ func (r *Reactor) processPexCh(ctx context.Context) { |
|
|
} |
|
|
} |
|
|
// inbound requests for new peers or responses to requests sent by this
|
|
|
// inbound requests for new peers or responses to requests sent by this
|
|
|
// reactor
|
|
|
// reactor
|
|
|
case envelope := <-incoming: |
|
|
|
|
|
|
|
|
case envelope, ok := <-incoming: |
|
|
|
|
|
if !ok { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope) |
|
|
duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) |
|
|
r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) |
|
|