package p2ptest import ( "context" "errors" "testing" "time" "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/types" ) // RequireEmpty requires that the given channel is empty. func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) { t.Helper() ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() iter := p2p.MergedChannelIterator(ctx, channels...) count := 0 for iter.Next(ctx) { count++ require.Nil(t, iter.Envelope()) } require.Zero(t, count) require.Error(t, ctx.Err()) } // RequireReceive requires that the given envelope is received on the channel. func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) { t.Helper() ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() iter := channel.Receive(ctx) count := 0 for iter.Next(ctx) { count++ envelope := iter.Envelope() require.Equal(t, expect.From, envelope.From) require.Equal(t, expect.Message, envelope.Message) } if !assert.True(t, count >= 1) { require.NoError(t, ctx.Err(), "timed out waiting for message %v", expect) } } // RequireReceiveUnordered requires that the given envelopes are all received on // the channel, ignoring order. func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() actual := []*p2p.Envelope{} iter := channel.Receive(ctx) for iter.Next(ctx) { actual = append(actual, iter.Envelope()) if len(actual) == len(expect) { require.ElementsMatch(t, expect, actual, "len=%d", len(actual)) return } } if errors.Is(ctx.Err(), context.DeadlineExceeded) { require.ElementsMatch(t, expect, actual) } } // RequireSend requires that the given envelope is sent on the channel. func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) { tctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() err := channel.Send(tctx, envelope) switch { case errors.Is(err, context.DeadlineExceeded): require.Fail(t, "timed out sending message to %q", envelope.To) default: require.NoError(t, err, "unexpected error") } } // RequireSendReceive requires that a given Protobuf message is sent to the // given peer, and then that the given response is received back. func RequireSendReceive( ctx context.Context, t *testing.T, channel *p2p.Channel, peerID types.NodeID, send proto.Message, receive proto.Message, ) { RequireSend(ctx, t, channel, p2p.Envelope{To: peerID, Message: send}) RequireReceive(ctx, t, channel, p2p.Envelope{From: peerID, Message: send}) } // RequireNoUpdates requires that a PeerUpdates subscription is empty. func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUpdates) { t.Helper() select { case update := <-peerUpdates.Updates(): if ctx.Err() == nil { require.Fail(t, "unexpected peer updates", "got %v", update) } case <-ctx.Done(): default: } } // RequireError requires that the given peer error is submitted for a peer. func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) { tctx, tcancel := context.WithTimeout(ctx, time.Second) defer tcancel() err := channel.SendError(tctx, peerError) switch { case errors.Is(err, context.DeadlineExceeded): require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID) default: require.NoError(t, err, "unexpected error") } } // RequireUpdate requires that a PeerUpdates subscription yields the given update. func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUpdate) { timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks defer timer.Stop() select { case update := <-peerUpdates.Updates(): require.Equal(t, expect.NodeID, update.NodeID, "node id did not match") require.Equal(t, expect.Status, update.Status, "statuses did not match") case <-timer.C: require.Fail(t, "timed out waiting for peer update", "expected %v", expect) } } // RequireUpdates requires that a PeerUpdates subscription yields the given updates // in the given order. func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.PeerUpdate) { timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks defer timer.Stop() actual := []p2p.PeerUpdate{} for { select { case update := <-peerUpdates.Updates(): actual = append(actual, update) if len(actual) == len(expect) { for idx := range expect { require.Equal(t, expect[idx].NodeID, actual[idx].NodeID) require.Equal(t, expect[idx].Status, actual[idx].Status) } return } case <-timer.C: require.Equal(t, expect, actual, "did not receive expected peer updates") return } } }