Browse Source

use context to provide timeouts!

pull/1842/head
Anton Kaliaev 7 years ago
parent
commit
e664f9c688
No known key found for this signature in database GPG Key ID: 7B6881D965918214
3 changed files with 85 additions and 61 deletions
  1. +5
    -2
      pubsub/example_test.go
  2. +31
    -22
      pubsub/pubsub.go
  3. +49
    -37
      pubsub/pubsub_test.go

+ 5
- 2
pubsub/example_test.go View File

@ -1,6 +1,7 @@
package pubsub_test package pubsub_test
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -16,9 +17,11 @@ func TestExample(t *testing.T) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1) ch := make(chan interface{}, 1)
err := s.Subscribe("example-client", query.MustParse("abci.account.name=John"), ch)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name=John"), ch)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"})
require.NoError(t, err) require.NoError(t, err)
s.PublishWithTags("Tombstone", map[string]interface{}{"abci.account.name": "John"})
assertReceive(t, "Tombstone", ch) assertReceive(t, "Tombstone", ch)
} }

+ 31
- 22
pubsub/pubsub.go View File

@ -14,8 +14,7 @@
package pubsub package pubsub
import ( import (
"errors"
"time"
"context"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
) )
@ -29,12 +28,6 @@ const (
shutdown shutdown
) )
const subscribeTimeout = 10 * time.Millisecond
var (
ErrorOverflow = errors.New("server overflowed")
)
type cmd struct { type cmd struct {
op operation op operation
query Query query Query
@ -97,37 +90,53 @@ func (s Server) BufferCapacity() int {
// Subscribe returns a channel on which messages matching the given query can // Subscribe returns a channel on which messages matching the given query can
// be received. If the subscription already exists old channel will be closed // be received. If the subscription already exists old channel will be closed
// and new one returned. Error will be returned to the caller if the server is
// overflowed.
func (s *Server) Subscribe(clientID string, query Query, out chan<- interface{}) error {
// and new one returned. Error will be returned to the caller if the context is
// cancelled.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
select { select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
return nil return nil
case <-time.After(subscribeTimeout):
return ErrorOverflow
case <-ctx.Done():
return ctx.Err()
} }
} }
// Unsubscribe unsubscribes the given client from the query. Blocking.
func (s *Server) Unsubscribe(clientID string, query Query) {
s.cmds <- cmd{op: unsub, clientID: clientID, query: query}
// Unsubscribe unsubscribes the given client from the query. Error will be
// returned to the caller if the context is cancelled.
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
return nil
case <-ctx.Done():
return ctx.Err()
}
} }
// Unsubscribe unsubscribes the given channel. Blocking. // Unsubscribe unsubscribes the given channel. Blocking.
func (s *Server) UnsubscribeAll(clientID string) {
s.cmds <- cmd{op: unsub, clientID: clientID}
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
return nil
case <-ctx.Done():
return ctx.Err()
}
} }
// Publish publishes the given message. Blocking. // Publish publishes the given message. Blocking.
func (s *Server) Publish(msg interface{}) {
s.PublishWithTags(msg, make(map[string]interface{}))
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, make(map[string]interface{}))
} }
// PublishWithTags publishes the given message with a set of tags. This set of // PublishWithTags publishes the given message with a set of tags. This set of
// tags will be matched with client queries. If there is a match, the message // tags will be matched with client queries. If there is a match, the message
// will be sent to a client. Blocking. // will be sent to a client. Blocking.
func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) {
s.cmds <- cmd{op: pub, msg: msg, tags: tags}
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error {
select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
return nil
case <-ctx.Done():
return ctx.Err()
}
} }
// OnStop implements Service.OnStop by shutting down the server. // OnStop implements Service.OnStop by shutting down the server.


+ 49
- 37
pubsub/pubsub_test.go View File

@ -1,6 +1,7 @@
package pubsub_test package pubsub_test
import ( import (
"context"
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"testing" "testing"
@ -24,13 +25,16 @@ func TestSubscribe(t *testing.T) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}, 1) ch := make(chan interface{}, 1)
err := s.Subscribe(clientID, query.Empty{}, ch)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err) require.NoError(t, err)
s.Publish("Ka-Zar")
assertReceive(t, "Ka-Zar", ch) assertReceive(t, "Ka-Zar", ch)
s.Publish("Quicksilver")
err = s.Publish(ctx, "Quicksilver")
require.NoError(t, err)
assertReceive(t, "Quicksilver", ch) assertReceive(t, "Quicksilver", ch)
} }
@ -39,23 +43,28 @@ func TestDifferentClients(t *testing.T) {
s.SetLogger(log.TestingLogger()) s.SetLogger(log.TestingLogger())
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe("client-1", query.MustParse("tm.events.type=NewBlock"), ch1)
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type=NewBlock"), ch1)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
require.NoError(t, err) require.NoError(t, err)
s.PublishWithTags("Iceman", map[string]interface{}{"tm.events.type": "NewBlock"})
assertReceive(t, "Iceman", ch1) assertReceive(t, "Iceman", ch1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
err = s.Subscribe("client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2)
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type=NewBlock AND abci.account.name=Igor"), ch2)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
require.NoError(t, err) require.NoError(t, err)
s.PublishWithTags("Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", ch1)
assertReceive(t, "Ultimo", ch2) assertReceive(t, "Ultimo", ch2)
ch3 := make(chan interface{}, 1) ch3 := make(chan interface{}, 1)
err = s.Subscribe("client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3)
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type=NewRoundStep AND abci.account.name=Igor AND abci.invoice.number = 10"), ch3)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
require.NoError(t, err) require.NoError(t, err)
s.PublishWithTags("Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"})
assert.Zero(t, len(ch3)) assert.Zero(t, len(ch3))
} }
@ -65,22 +74,25 @@ func TestClientResubscribes(t *testing.T) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
q := query.MustParse("tm.events.type=NewBlock") q := query.MustParse("tm.events.type=NewBlock")
ch1 := make(chan interface{}, 1) ch1 := make(chan interface{}, 1)
err := s.Subscribe(clientID, q, ch1)
err := s.Subscribe(ctx, clientID, q, ch1)
require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
require.NoError(t, err) require.NoError(t, err)
s.PublishWithTags("Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"})
assertReceive(t, "Goblin Queen", ch1) assertReceive(t, "Goblin Queen", ch1)
ch2 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1)
err = s.Subscribe(clientID, q, ch2)
err = s.Subscribe(ctx, clientID, q, ch2)
require.NoError(t, err) require.NoError(t, err)
_, ok := <-ch1 _, ok := <-ch1
assert.False(t, ok) assert.False(t, ok)
s.PublishWithTags("Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Spider-Man", ch2) assertReceive(t, "Spider-Man", ch2)
} }
@ -90,12 +102,15 @@ func TestUnsubscribe(t *testing.T) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
ch := make(chan interface{}) ch := make(chan interface{})
err := s.Subscribe(clientID, query.Empty{}, ch)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.Empty{})
require.NoError(t, err) require.NoError(t, err)
s.Unsubscribe(clientID, query.Empty{})
s.Publish("Nick Fury")
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
_, ok := <-ch _, ok := <-ch
@ -108,15 +123,18 @@ func TestUnsubscribeAll(t *testing.T) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlock"), ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2)
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type=NewBlockHeader"), ch2)
require.NoError(t, err) require.NoError(t, err)
s.UnsubscribeAll(clientID)
err = s.UnsubscribeAll(ctx, clientID)
require.NoError(t, err)
s.Publish("Nick Fury")
err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err)
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
@ -130,19 +148,11 @@ func TestBufferCapacity(t *testing.T) {
s := pubsub.NewServer(pubsub.BufferCapacity(2)) s := pubsub.NewServer(pubsub.BufferCapacity(2))
s.SetLogger(log.TestingLogger()) s.SetLogger(log.TestingLogger())
s.Publish("Nighthawk")
s.Publish("Sage")
}
func TestSubscribeReturnsErrorIfServerOverflowed(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
ch := make(chan interface{}, 1)
err := s.Subscribe(clientID, query.MustParse("tm.events.type=NewBlock"), ch)
if assert.Error(t, err) {
assert.Equal(t, pubsub.ErrorOverflow, err)
}
ctx := context.Background()
err := s.Publish(ctx, "Nighthawk")
require.NoError(t, err)
err = s.Publish(ctx, "Sage")
require.NoError(t, err)
} }
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
@ -158,19 +168,20 @@ func benchmarkNClients(n int, b *testing.B) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
ch := make(chan interface{}) ch := make(chan interface{})
go func() { go func() {
for range ch { for range ch {
} }
}() }()
s.Subscribe(clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch)
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = Ivan AND abci.Invoices.Number = %d", i)), ch)
} }
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i})
} }
} }
@ -179,6 +190,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
s.Start() s.Start()
defer s.Stop() defer s.Stop()
ctx := context.Background()
q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1") q := query.MustParse("abci.Account.Owner = Ivan AND abci.Invoices.Number = 1")
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
ch := make(chan interface{}) ch := make(chan interface{})
@ -186,13 +198,13 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
for range ch { for range ch {
} }
}() }()
s.Subscribe(clientID, q, ch)
s.Subscribe(ctx, clientID, q, ch)
} }
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags("Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1})
} }
} }


Loading…
Cancel
Save