From 4257407aea97f2ea91ceeade4e9fe9c579afe015 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 1 Feb 2019 15:28:25 +0400 Subject: [PATCH] bring back EventSubscriber --- libs/pubsub/subscription.go | 4 +++ node/node.go | 6 +--- rpc/client/helpers.go | 2 -- rpc/client/httpclient.go | 65 +++++++++++++++++++++++++++++-------- rpc/client/interface.go | 7 +--- rpc/client/types.go | 8 ----- rpc/core/events.go | 15 +++++++-- rpc/lib/server/handlers.go | 24 +++++++++----- rpc/lib/types/types.go | 12 +++++++ 9 files changed, 97 insertions(+), 46 deletions(-) diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index e86693245..95cabe345 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -59,6 +59,10 @@ type Message struct { tags map[string]string } +func NewMessage(data interface{}, tags map[string]string) Message { + return Message{data, tags} +} + // Data returns an original data published. func (msg Message) Data() interface{} { return msg.data diff --git a/node/node.go b/node/node.go index 53cfc2780..1b7319811 100644 --- a/node/node.go +++ b/node/node.go @@ -676,11 +676,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.DisconnectCallback(func(remoteAddr string) { - // Unsubscribe a client upon disconnect since it won't be able to do it - // itself. - n.eventBus.UnsubscribeAll(context.TODO(), remoteAddr) - })) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus)) wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 6b205393e..ec63fb3be 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -59,14 +59,12 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - evts := make(chan interface{}, 1) // register for the next event of this type sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp)) if err != nil { return nil, errors.Wrap(err, "failed to subscribe") } - // make sure to unregister after the test is over defer c.UnsubscribeAll(ctx, subscriber) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 51c1d2e21..a1dee9913 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -249,6 +249,28 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) { /** websocket event stuff here... **/ +type subscription struct { + out chan tmpubsub.Message + cancelled chan struct{} + + mtx sync.RWMutex + err error +} + +func (s *subscription) Out() <-chan tmpubsub.Message { + return s.out +} + +func (s *subscription) Cancelled() <-chan struct{} { + return s.cancelled +} + +func (s *subscription) Err() error { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.err +} + type WSEvents struct { cmn.BaseService cdc *amino.Codec @@ -256,8 +278,9 @@ type WSEvents struct { endpoint string ws *rpcclient.WSClient - mtx sync.RWMutex - subscriptions map[string]chan<- EventMessage + mtx sync.RWMutex + // query -> subscription + subscriptions map[string]*subscription } func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { @@ -265,7 +288,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { cdc: cdc, endpoint: endpoint, remote: remote, - subscriptions: make(map[string]chan<- EventMessage), + subscriptions: make(map[string]*subscription), } wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) @@ -295,21 +318,29 @@ func (w *WSEvents) OnStop() { } } -func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { +func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { q := query.String() err := w.ws.Subscribe(ctx, q) if err != nil { - return err + return nil, err + } + + outCap := 1 + if len(outCapacity) > 0 && outCapacity[0] >= 0 { + outCap = outCapacity[0] } w.mtx.Lock() // subscriber param is ignored because Tendermint will override it with // remote IP anyway. - w.subscriptions[q] = out + w.subscriptions[q] = &subscription{ + out: make(chan tmpubsub.Message, outCap), + cancelled: make(chan struct{}), + } w.mtx.Unlock() - return nil + return w.subscriptions[q], nil } func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { @@ -321,9 +352,12 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmp } w.mtx.Lock() - ch, ok := w.subscriptions[q] + sub, ok := w.subscriptions[q] if ok { - close(ch) + close(sub.cancelled) + sub.mtx.Lock() + sub.err = errors.New("unsubscribed") + sub.mtx.Unlock() delete(w.subscriptions, q) } w.mtx.Unlock() @@ -338,10 +372,13 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error } w.mtx.Lock() - for _, ch := range w.subscriptions { - close(ch) + for _, sub := range w.subscriptions { + close(sub.cancelled) + sub.mtx.Lock() + sub.err = errors.New("unsubscribed") + sub.mtx.Unlock() } - w.subscriptions = make(map[string]chan<- EventMessage) + w.subscriptions = make(map[string]*subscription) w.mtx.Unlock() return nil @@ -381,8 +418,8 @@ func (w *WSEvents) eventListener() { // NOTE: writing also happens inside mutex so we can't close a channel in // Unsubscribe/UnsubscribeAll. w.mtx.RLock() - if ch, ok := w.subscriptions[result.Query]; ok { - ch <- EventMessage{result.Data, result.Tags} + if sub, ok := w.subscriptions[result.Query]; ok { + sub.out <- tmpubsub.NewMessage(result.Data, result.Tags) } w.mtx.RUnlock() case <-w.Quit(): diff --git a/rpc/client/interface.go b/rpc/client/interface.go index eeb10d2b9..7477225e9 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -21,10 +21,7 @@ implementation. */ import ( - "context" - cmn "github.com/tendermint/tendermint/libs/common" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -94,9 +91,7 @@ type NetworkClient interface { // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error - Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error - UnsubscribeAll(ctx context.Context, subscriber string) error + types.EventBusSubscriber } // MempoolClient shows us data about current mempool state. diff --git a/rpc/client/types.go b/rpc/client/types.go index b2c73479b..6a23fa450 100644 --- a/rpc/client/types.go +++ b/rpc/client/types.go @@ -1,7 +1,5 @@ package client -import "github.com/tendermint/tendermint/types" - // ABCIQueryOptions can be used to provide options for ABCIQuery call other // than the DefaultABCIQueryOptions. type ABCIQueryOptions struct { @@ -11,9 +9,3 @@ type ABCIQueryOptions struct { // DefaultABCIQueryOptions are latest height (0) and prove false. var DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false} - -// EventMessage combines event data and tags. -type EventMessage struct { - Data types.TMEventData - Tags map[string]string -} diff --git a/rpc/core/events.go b/rpc/core/events.go index 8b4966c05..3b9493a20 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -9,6 +9,7 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" + tmtypes "github.com/tendermint/tendermint/types" ) // Subscribe for events via WebSocket. @@ -100,7 +101,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - sub, err := eventBus.Subscribe(ctx, addr, q) + sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q) if err != nil { return nil, err } @@ -167,7 +168,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub if err != nil { return nil, errors.Wrap(err, "failed to parse query") } - err = eventBus.Unsubscribe(context.Background(), addr, q) + err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) if err != nil { return nil, err } @@ -201,9 +202,17 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { addr := wsCtx.GetRemoteAddr() logger.Info("Unsubscribe from all", "remote", addr) - err := eventBus.UnsubscribeAll(context.Background(), addr) + err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) if err != nil { return nil, err } return &ctypes.ResultUnsubscribe{}, nil } + +func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber { + es := wsCtx.GetEventSubscriber() + if es == nil { + es = eventBus + } + return es +} diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index b968f43c3..80eb4308a 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -2,6 +2,7 @@ package rpcserver import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -433,8 +434,8 @@ type wsConnection struct { // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration - // see DisconnectCallback option. - disconnectCallback func(remoteAddr string) + // object that is used to subscribe / unsubscribe from events + eventSub types.EventSubscriber } // NewWSConnection wraps websocket.Conn. @@ -467,11 +468,12 @@ func NewWSConnection( return wsc } -// DisconnectCallback can be used optionally to set a callback, which will be -// called upon disconnect - not Goroutine-safe. -func DisconnectCallback(cb func(remoteAddr string)) func(*wsConnection) { +// EventSubscriber sets object that is used to subscribe / unsubscribe from +// events - not Goroutine-safe. If none given, default node's eventBus will be +// used. +func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) { return func(wsc *wsConnection) { - wsc.disconnectCallback = cb + wsc.eventSub = eventSub } } @@ -524,8 +526,9 @@ func (wsc *wsConnection) OnStart() error { func (wsc *wsConnection) OnStop() { // Both read and write loops close the websocket connection when they exit their loops. // The writeChan is never closed, to allow WriteRPCResponse() to fail. - if wsc.disconnectCallback != nil { - wsc.disconnectCallback(wsc.remoteAddr) + + if wsc.eventSub != nil { + wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) } } @@ -535,6 +538,11 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } +// GetEventSubscriber implements WSRPCConnection by returning event subscriber. +func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { + return wsc.eventSub +} + // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index ceb7be83a..1e7f4e120 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -1,6 +1,7 @@ package rpctypes import ( + "context" "encoding/json" "fmt" "reflect" @@ -9,6 +10,9 @@ import ( "github.com/pkg/errors" amino "github.com/tendermint/go-amino" + + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmtypes "github.com/tendermint/tendermint/types" ) // a wrapper to emulate a sum type: jsonrpcid = string | int @@ -237,6 +241,7 @@ type WSRPCConnection interface { GetRemoteAddr() string WriteRPCResponse(resp RPCResponse) TryWriteRPCResponse(resp RPCResponse) bool + GetEventSubscriber() EventSubscriber Codec() *amino.Codec } @@ -246,6 +251,13 @@ type WSRPCContext struct { WSRPCConnection } +// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber +type EventSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error) + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error +} + //---------------------------------------- // SOCKETS //