Browse Source

p2p: implement interface for p2p.Channel without channels (#7378)

pull/7333/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
4e355d80c4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 437 additions and 75 deletions
  1. +216
    -0
      internal/p2p/channel.go
  2. +221
    -0
      internal/p2p/channel_test.go
  3. +0
    -75
      internal/p2p/router.go

+ 216
- 0
internal/p2p/channel.go View File

@ -0,0 +1,216 @@
package p2p
import (
"context"
"fmt"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/types"
)
// Envelope contains a message with sender/receiver routing info.
type Envelope struct {
From types.NodeID // sender (empty if outbound)
To types.NodeID // receiver (empty if inbound)
Broadcast bool // send to all connected peers (ignores To)
Message proto.Message // message payload
// channelID is for internal Router use, set on outbound messages to inform
// the sendPeer() goroutine which transport channel to use.
//
// FIXME: If we migrate the Transport API to a byte-oriented multi-stream
// API, this will no longer be necessary since each channel will be mapped
// onto a stream during channel/peer setup. See:
// https://github.com/tendermint/spec/pull/227
channelID ChannelID
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Wrapper interface {
proto.Message
// Wrap will take a message and wrap it in this one if possible.
Wrap(proto.Message) error
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
// For example, some errors should be logged, some should cause disconnects,
// and some should ban the peer.
//
// FIXME: This should probably be replaced by a more general PeerBehavior
// concept that can mark good and bad behavior and contributes to peer scoring.
// It should possibly also allow reactors to request explicit actions, e.g.
// disconnection or banning, in addition to doing this based on aggregates.
type PeerError struct {
NodeID types.NodeID
Err error
}
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
func (pe PeerError) Unwrap() error { return pe.Err }
// Channel is a bidirectional channel to exchange Protobuf messages with peers.
// Each message is wrapped in an Envelope to specify its sender and receiver.
type Channel struct {
ID ChannelID
In <-chan Envelope // inbound messages (peers to reactors)
Out chan<- Envelope // outbound messages (reactors to peers)
Error chan<- PeerError // peer error reporting
messageType proto.Message // the channel's message type, used for unmarshaling
}
// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
func NewChannel(
id ChannelID,
messageType proto.Message,
inCh <-chan Envelope,
outCh chan<- Envelope,
errCh chan<- PeerError,
) *Channel {
return &Channel{
ID: id,
messageType: messageType,
In: inCh,
Out: outCh,
Error: errCh,
}
}
// Send blocks until the envelope has been sent, or until ctx ends.
// An error only occurs if the context ends before the send completes.
func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch.Out <- envelope:
return nil
}
}
// SendError blocks until the given error has been sent, or ctx ends.
// An error only occurs if the context ends before the send completes.
func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch.Error <- pe:
return nil
}
}
// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
iter := &ChannelIterator{
pipe: make(chan Envelope), // unbuffered
}
go func() {
defer close(iter.pipe)
iteratorWorker(ctx, ch, iter.pipe)
}()
return iter
}
// ChannelIterator provides a context-aware path for callers
// (reactors) to process messages from the P2P layer without relying
// on the implementation details of the P2P layer. Channel provides
// access to it's Outbound stream as an iterator, and the
// MergedChannelIterator makes it possible to combine multiple
// channels into a single iterator.
type ChannelIterator struct {
pipe chan Envelope
current *Envelope
}
func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
for {
select {
case <-ctx.Done():
return
case envelope := <-ch.In:
select {
case <-ctx.Done():
return
case pipe <- envelope:
}
}
}
}
// Next returns true when the Envelope value has advanced, and false
// when the context is canceled or iteration should stop. If an iterator has returned false,
// it will never return true again.
// in general, use Next, as in:
//
// for iter.Next(ctx) {
// envelope := iter.Envelope()
// // ... do things ...
// }
//
func (iter *ChannelIterator) Next(ctx context.Context) bool {
select {
case <-ctx.Done():
iter.current = nil
return false
case envelope, ok := <-iter.pipe:
if !ok {
iter.current = nil
return false
}
iter.current = &envelope
return true
}
}
// Envelope returns the current Envelope object held by the
// iterator. When the last call to Next returned true, Envelope will
// return a non-nil object. If Next returned false then Envelope is
// always nil.
func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
// MergedChannelIterator produces an iterator that merges the
// messages from the given channels in arbitrary order.
//
// This allows the caller to consume messages from multiple channels
// without needing to manage the concurrency separately.
func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
iter := &ChannelIterator{
pipe: make(chan Envelope), // unbuffered
}
wg := new(sync.WaitGroup)
done := make(chan struct{})
go func() { defer close(done); wg.Wait() }()
go func() {
defer close(iter.pipe)
// we could return early if the context is canceled,
// but this is safer because it means the pipe stays
// open until all of the ch worker threads end, which
// should happen very quickly.
<-done
}()
for _, ch := range chs {
wg.Add(1)
go func(ch *Channel) {
defer wg.Done()
iteratorWorker(ctx, ch, iter.pipe)
}(ch)
}
return iter
}

+ 221
- 0
internal/p2p/channel_test.go View File

@ -0,0 +1,221 @@
package p2p
import (
"context"
"errors"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/require"
)
type channelInternal struct {
In chan Envelope
Out chan Envelope
Error chan PeerError
}
func testChannel(size int) (*channelInternal, *Channel) {
in := &channelInternal{
In: make(chan Envelope, size),
Out: make(chan Envelope, size),
Error: make(chan PeerError, size),
}
ch := &Channel{
In: in.In,
Out: in.Out,
Error: in.Error,
}
return in, ch
}
func TestChannel(t *testing.T) {
t.Cleanup(leaktest.Check(t))
bctx, bcancel := context.WithCancel(context.Background())
defer bcancel()
testCases := []struct {
Name string
Case func(context.Context, *testing.T)
}{
{
Name: "Send",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
require.NoError(t, ch.Send(ctx, Envelope{From: "kip", To: "merlin"}))
res, ok := <-ins.Out
require.True(t, ok)
require.EqualValues(t, "kip", res.From)
require.EqualValues(t, "merlin", res.To)
},
},
{
Name: "SendError",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
require.NoError(t, ch.SendError(ctx, PeerError{NodeID: "kip", Err: errors.New("merlin")}))
res, ok := <-ins.Error
require.True(t, ok)
require.EqualValues(t, "kip", res.NodeID)
require.EqualValues(t, "merlin", res.Err.Error())
},
},
{
Name: "SendWithCanceledContext",
Case: func(ctx context.Context, t *testing.T) {
_, ch := testChannel(0)
cctx, ccancel := context.WithCancel(ctx)
ccancel()
require.Error(t, ch.Send(cctx, Envelope{From: "kip", To: "merlin"}))
},
},
{
Name: "SendErrorWithCanceledContext",
Case: func(ctx context.Context, t *testing.T) {
_, ch := testChannel(0)
cctx, ccancel := context.WithCancel(ctx)
ccancel()
require.Error(t, ch.SendError(cctx, PeerError{NodeID: "kip", Err: errors.New("merlin")}))
},
},
{
Name: "ReceiveEmptyIteratorBlocks",
Case: func(ctx context.Context, t *testing.T) {
_, ch := testChannel(1)
iter := ch.Receive(ctx)
require.NotNil(t, iter)
out := make(chan bool)
go func() {
defer close(out)
select {
case <-ctx.Done():
case out <- iter.Next(ctx):
}
}()
select {
case <-time.After(10 * time.Millisecond):
case <-out:
require.Fail(t, "iterator should not advance")
}
require.Nil(t, iter.Envelope())
},
},
{
Name: "ReceiveWithData",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
ins.In <- Envelope{From: "kip", To: "merlin"}
iter := ch.Receive(ctx)
require.NotNil(t, iter)
require.True(t, iter.Next(ctx))
res := iter.Envelope()
require.EqualValues(t, "kip", res.From)
require.EqualValues(t, "merlin", res.To)
},
},
{
Name: "ReceiveWithCanceledContext",
Case: func(ctx context.Context, t *testing.T) {
_, ch := testChannel(0)
cctx, ccancel := context.WithCancel(ctx)
ccancel()
iter := ch.Receive(cctx)
require.NotNil(t, iter)
require.False(t, iter.Next(cctx))
require.Nil(t, iter.Envelope())
},
},
{
Name: "IteratorWithCanceledContext",
Case: func(ctx context.Context, t *testing.T) {
_, ch := testChannel(0)
iter := ch.Receive(ctx)
require.NotNil(t, iter)
cctx, ccancel := context.WithCancel(ctx)
ccancel()
require.False(t, iter.Next(cctx))
require.Nil(t, iter.Envelope())
},
},
{
Name: "IteratorCanceledAfterFirstUseBecomesNil",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
ins.In <- Envelope{From: "kip", To: "merlin"}
iter := ch.Receive(ctx)
require.NotNil(t, iter)
require.True(t, iter.Next(ctx))
res := iter.Envelope()
require.EqualValues(t, "kip", res.From)
require.EqualValues(t, "merlin", res.To)
cctx, ccancel := context.WithCancel(ctx)
ccancel()
require.False(t, iter.Next(cctx))
require.Nil(t, iter.Envelope())
},
},
{
Name: "IteratorMultipleNextCalls",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
ins.In <- Envelope{From: "kip", To: "merlin"}
iter := ch.Receive(ctx)
require.NotNil(t, iter)
require.True(t, iter.Next(ctx))
res := iter.Envelope()
require.EqualValues(t, "kip", res.From)
require.EqualValues(t, "merlin", res.To)
res1 := iter.Envelope()
require.Equal(t, res, res1)
},
},
{
Name: "IteratorProducesNilObjectBeforeNext",
Case: func(ctx context.Context, t *testing.T) {
ins, ch := testChannel(1)
iter := ch.Receive(ctx)
require.NotNil(t, iter)
require.Nil(t, iter.Envelope())
ins.In <- Envelope{From: "kip", To: "merlin"}
require.NotNil(t, iter)
require.True(t, iter.Next(ctx))
res := iter.Envelope()
require.NotNil(t, res)
require.EqualValues(t, "kip", res.From)
require.EqualValues(t, "merlin", res.To)
},
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ctx, cancel := context.WithCancel(bctx)
defer cancel()
tc.Case(ctx, t)
})
}
}

+ 0
- 75
internal/p2p/router.go View File

@ -21,81 +21,6 @@ import (
const queueBufferDefault = 32
// Envelope contains a message with sender/receiver routing info.
type Envelope struct {
From types.NodeID // sender (empty if outbound)
To types.NodeID // receiver (empty if inbound)
Broadcast bool // send to all connected peers (ignores To)
Message proto.Message // message payload
// channelID is for internal Router use, set on outbound messages to inform
// the sendPeer() goroutine which transport channel to use.
//
// FIXME: If we migrate the Transport API to a byte-oriented multi-stream
// API, this will no longer be necessary since each channel will be mapped
// onto a stream during channel/peer setup. See:
// https://github.com/tendermint/spec/pull/227
channelID ChannelID
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
// For example, some errors should be logged, some should cause disconnects,
// and some should ban the peer.
//
// FIXME: This should probably be replaced by a more general PeerBehavior
// concept that can mark good and bad behavior and contributes to peer scoring.
// It should possibly also allow reactors to request explicit actions, e.g.
// disconnection or banning, in addition to doing this based on aggregates.
type PeerError struct {
NodeID types.NodeID
Err error
}
// Channel is a bidirectional channel to exchange Protobuf messages with peers,
// wrapped in Envelope to specify routing info (i.e. sender/receiver).
type Channel struct {
ID ChannelID
In <-chan Envelope // inbound messages (peers to reactors)
Out chan<- Envelope // outbound messages (reactors to peers)
Error chan<- PeerError // peer error reporting
messageType proto.Message // the channel's message type, used for unmarshaling
}
// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
func NewChannel(
id ChannelID,
messageType proto.Message,
inCh <-chan Envelope,
outCh chan<- Envelope,
errCh chan<- PeerError,
) *Channel {
return &Channel{
ID: id,
messageType: messageType,
In: inCh,
Out: outCh,
Error: errCh,
}
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Wrapper interface {
proto.Message
// Wrap will take a message and wrap it in this one if possible.
Wrap(proto.Message) error
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
// RouterOptions specifies options for a Router.
type RouterOptions struct {
// ResolveTimeout is the timeout for resolving NodeAddress URLs.


Loading…
Cancel
Save