diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index aec60bcc0..2b1a569c7 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -10,26 +10,13 @@ // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. // -// Overflow strategies (incoming publish requests): -// -// 1) drop - drops publish requests when there are too many of them -// 2) wait - blocks until the server is ready to accept more publish requests (default) -// // Subscribe/Unsubscribe calls are always blocking. -// -// Overflow strategies (outgoing messages): -// -// 1) skip - do not send a message if the client is busy or slow (default) -// 2) wait - wait until the client is ready to accept new messages -// package pubsub import ( "errors" - "fmt" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) type operation int @@ -41,13 +28,6 @@ const ( shutdown ) -type overflowStrategy int - -const ( - drop overflowStrategy = iota - wait -) - var ( ErrorOverflow = errors.New("Server overflowed") ) @@ -72,20 +52,16 @@ type Server struct { cmn.BaseService cmds chan cmd - - overflowStrategy overflowStrategy - slowClientStrategy overflowStrategy } // Option sets a parameter for the server. type Option func(*Server) // NewServer returns a new server. See the commentary on the Option functions -// for a detailed description of how to configure buffering and overflow -// behavior. If no options are provided, the resulting server's queue is -// unbuffered and it blocks when overflowed. +// for a detailed description of how to configure buffering. If no options are +// provided, the resulting server's queue is unbuffered. func NewServer(options ...Option) *Server { - s := &Server{overflowStrategy: wait, slowClientStrategy: drop} + s := &Server{} s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) for _, option := range options { @@ -111,38 +87,6 @@ func BufferCapacity(cap int) Option { } } -// OverflowStrategyDrop will tell the server to drop messages when it can't -// process more messages. -func OverflowStrategyDrop() Option { - return func(s *Server) { - s.overflowStrategy = drop - } -} - -// OverflowStrategyWait will tell the server to block and wait for some time -// for server to process other messages. Default strategy. -func OverflowStrategyWait() func(*Server) { - return func(s *Server) { - s.overflowStrategy = wait - } -} - -// WaitSlowClients will tell the server to block and wait until subscriber -// reads a messages even if it is fast enough to process them. -func WaitSlowClients() func(*Server) { - return func(s *Server) { - s.slowClientStrategy = wait - } -} - -// SkipSlowClients will tell the server to skip subscriber if it is busy -// processing previous message(s). Default strategy. -func SkipSlowClients() func(*Server) { - return func(s *Server) { - s.slowClientStrategy = drop - } -} - // Subscribe returns a channel on which messages matching the given query can // be received. If the subscription already exists old channel will be closed // and new one returned. @@ -170,17 +114,7 @@ func (s *Server) Publish(msg interface{}) error { // will be sent to a client. func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error { pubCmd := cmd{op: pub, msg: msg, tags: tags} - switch s.overflowStrategy { - case drop: - select { - case s.cmds <- pubCmd: - default: - s.Logger.Error("Server overflowed, dropping message...", "msg", msg, "tags", fmt.Sprintf("%v", tags)) - return ErrorOverflow - } - case wait: - s.cmds <- pubCmd - } + s.cmds <- pubCmd return nil } @@ -224,7 +158,7 @@ loop: case sub: state.add(cmd.clientID, cmd.query, cmd.ch) case pub: - state.send(cmd.msg, cmd.tags, s.slowClientStrategy, s.Logger) + state.send(cmd.msg, cmd.tags) } } } @@ -289,20 +223,11 @@ func (state *state) removeAll(clientID string) { delete(state.clients, clientID) } -func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) { +func (state *state) send(msg interface{}, tags map[string]interface{}) { for q, clientToChannelMap := range state.queries { if q.Matches(tags) { - for clientID, ch := range clientToChannelMap { - switch slowClientStrategy { - case drop: - select { - case ch <- msg: - default: - logger.Error("Wanted to send a message, but the client is busy", "msg", msg, "tags", fmt.Sprintf("%v", tags), "clientID", clientID) - } - case wait: - ch <- msg - } + for _, ch := range clientToChannelMap { + ch <- msg } } } diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 3112ab5d3..7cc4e5998 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -126,30 +126,6 @@ func TestUnsubscribeAll(t *testing.T) { assert.False(t, ok) } -func TestOverflowStrategyDrop(t *testing.T) { - s := pubsub.NewServer(pubsub.OverflowStrategyDrop()) - s.SetLogger(log.TestingLogger()) - - err := s.Publish("Veda") - if assert.Error(t, err) { - assert.Equal(t, pubsub.ErrorOverflow, err) - } -} - -func TestOverflowStrategyWait(t *testing.T) { - s := pubsub.NewServer(pubsub.OverflowStrategyWait()) - s.SetLogger(log.TestingLogger()) - - go func() { - time.Sleep(1 * time.Second) - s.Start() - defer s.Stop() - }() - - err := s.Publish("Veda") - assert.NoError(t, err) -} - func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) @@ -160,35 +136,6 @@ func TestBufferCapacity(t *testing.T) { require.NoError(t, err) } -func TestWaitSlowClients(t *testing.T) { - s := pubsub.NewServer(pubsub.WaitSlowClients()) - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Wonderwoman") - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - assertReceive(t, "Wonderwoman", ch) -} - -func TestSkipSlowClients(t *testing.T) { - s := pubsub.NewServer(pubsub.SkipSlowClients()) - s.SetLogger(log.TestingLogger()) - s.Start() - defer s.Stop() - - ch := make(chan interface{}) - s.Subscribe(clientID, query.Empty{}, ch) - err := s.Publish("Cyclops") - require.NoError(t, err) - assert.Zero(t, len(ch)) -} - func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }