Browse Source

remove overflow options

pull/1842/head
Anton Kaliaev 8 years ago
parent
commit
13207a5927
No known key found for this signature in database GPG Key ID: 7B6881D965918214
2 changed files with 8 additions and 136 deletions
  1. +8
    -83
      pubsub/pubsub.go
  2. +0
    -53
      pubsub/pubsub_test.go

+ 8
- 83
pubsub/pubsub.go View File

@ -10,26 +10,13 @@
// match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation.
//
// Overflow strategies (incoming publish requests):
//
// 1) drop - drops publish requests when there are too many of them
// 2) wait - blocks until the server is ready to accept more publish requests (default)
//
// Subscribe/Unsubscribe calls are always blocking.
//
// Overflow strategies (outgoing messages):
//
// 1) skip - do not send a message if the client is busy or slow (default)
// 2) wait - wait until the client is ready to accept new messages
//
package pubsub
import (
"errors"
"fmt"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
)
type operation int
@ -41,13 +28,6 @@ const (
shutdown
)
type overflowStrategy int
const (
drop overflowStrategy = iota
wait
)
var (
ErrorOverflow = errors.New("Server overflowed")
)
@ -72,20 +52,16 @@ type Server struct {
cmn.BaseService
cmds chan cmd
overflowStrategy overflowStrategy
slowClientStrategy overflowStrategy
}
// Option sets a parameter for the server.
type Option func(*Server)
// NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering and overflow
// behavior. If no options are provided, the resulting server's queue is
// unbuffered and it blocks when overflowed.
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := &Server{overflowStrategy: wait, slowClientStrategy: drop}
s := &Server{}
s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
for _, option := range options {
@ -111,38 +87,6 @@ func BufferCapacity(cap int) Option {
}
}
// OverflowStrategyDrop will tell the server to drop messages when it can't
// process more messages.
func OverflowStrategyDrop() Option {
return func(s *Server) {
s.overflowStrategy = drop
}
}
// OverflowStrategyWait will tell the server to block and wait for some time
// for server to process other messages. Default strategy.
func OverflowStrategyWait() func(*Server) {
return func(s *Server) {
s.overflowStrategy = wait
}
}
// WaitSlowClients will tell the server to block and wait until subscriber
// reads a messages even if it is fast enough to process them.
func WaitSlowClients() func(*Server) {
return func(s *Server) {
s.slowClientStrategy = wait
}
}
// SkipSlowClients will tell the server to skip subscriber if it is busy
// processing previous message(s). Default strategy.
func SkipSlowClients() func(*Server) {
return func(s *Server) {
s.slowClientStrategy = drop
}
}
// Subscribe returns a channel on which messages matching the given query can
// be received. If the subscription already exists old channel will be closed
// and new one returned.
@ -170,17 +114,7 @@ func (s *Server) Publish(msg interface{}) error {
// will be sent to a client.
func (s *Server) PublishWithTags(msg interface{}, tags map[string]interface{}) error {
pubCmd := cmd{op: pub, msg: msg, tags: tags}
switch s.overflowStrategy {
case drop:
select {
case s.cmds <- pubCmd:
default:
s.Logger.Error("Server overflowed, dropping message...", "msg", msg, "tags", fmt.Sprintf("%v", tags))
return ErrorOverflow
}
case wait:
s.cmds <- pubCmd
}
s.cmds <- pubCmd
return nil
}
@ -224,7 +158,7 @@ loop:
case sub:
state.add(cmd.clientID, cmd.query, cmd.ch)
case pub:
state.send(cmd.msg, cmd.tags, s.slowClientStrategy, s.Logger)
state.send(cmd.msg, cmd.tags)
}
}
}
@ -289,20 +223,11 @@ func (state *state) removeAll(clientID string) {
delete(state.clients, clientID)
}
func (state *state) send(msg interface{}, tags map[string]interface{}, slowClientStrategy overflowStrategy, logger log.Logger) {
func (state *state) send(msg interface{}, tags map[string]interface{}) {
for q, clientToChannelMap := range state.queries {
if q.Matches(tags) {
for clientID, ch := range clientToChannelMap {
switch slowClientStrategy {
case drop:
select {
case ch <- msg:
default:
logger.Error("Wanted to send a message, but the client is busy", "msg", msg, "tags", fmt.Sprintf("%v", tags), "clientID", clientID)
}
case wait:
ch <- msg
}
for _, ch := range clientToChannelMap {
ch <- msg
}
}
}


+ 0
- 53
pubsub/pubsub_test.go View File

@ -126,30 +126,6 @@ func TestUnsubscribeAll(t *testing.T) {
assert.False(t, ok)
}
func TestOverflowStrategyDrop(t *testing.T) {
s := pubsub.NewServer(pubsub.OverflowStrategyDrop())
s.SetLogger(log.TestingLogger())
err := s.Publish("Veda")
if assert.Error(t, err) {
assert.Equal(t, pubsub.ErrorOverflow, err)
}
}
func TestOverflowStrategyWait(t *testing.T) {
s := pubsub.NewServer(pubsub.OverflowStrategyWait())
s.SetLogger(log.TestingLogger())
go func() {
time.Sleep(1 * time.Second)
s.Start()
defer s.Stop()
}()
err := s.Publish("Veda")
assert.NoError(t, err)
}
func TestBufferCapacity(t *testing.T) {
s := pubsub.NewServer(pubsub.BufferCapacity(2))
s.SetLogger(log.TestingLogger())
@ -160,35 +136,6 @@ func TestBufferCapacity(t *testing.T) {
require.NoError(t, err)
}
func TestWaitSlowClients(t *testing.T) {
s := pubsub.NewServer(pubsub.WaitSlowClients())
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ch := make(chan interface{})
s.Subscribe(clientID, query.Empty{}, ch)
err := s.Publish("Wonderwoman")
require.NoError(t, err)
time.Sleep(1 * time.Second)
assertReceive(t, "Wonderwoman", ch)
}
func TestSkipSlowClients(t *testing.T) {
s := pubsub.NewServer(pubsub.SkipSlowClients())
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ch := make(chan interface{})
s.Subscribe(clientID, query.Empty{}, ch)
err := s.Publish("Cyclops")
require.NoError(t, err)
assert.Zero(t, len(ch))
}
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) }
func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) }
func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) }


Loading…
Cancel
Save