Browse Source

bring back EventSubscriber

pull/3227/head
Anton Kaliaev 6 years ago
parent
commit
4257407aea
No known key found for this signature in database GPG Key ID: 7B6881D965918214
9 changed files with 97 additions and 46 deletions
  1. +4
    -0
      libs/pubsub/subscription.go
  2. +1
    -5
      node/node.go
  3. +0
    -2
      rpc/client/helpers.go
  4. +51
    -14
      rpc/client/httpclient.go
  5. +1
    -6
      rpc/client/interface.go
  6. +0
    -8
      rpc/client/types.go
  7. +12
    -3
      rpc/core/events.go
  8. +16
    -8
      rpc/lib/server/handlers.go
  9. +12
    -0
      rpc/lib/types/types.go

+ 4
- 0
libs/pubsub/subscription.go View File

@ -59,6 +59,10 @@ type Message struct {
tags map[string]string tags map[string]string
} }
func NewMessage(data interface{}, tags map[string]string) Message {
return Message{data, tags}
}
// Data returns an original data published. // Data returns an original data published.
func (msg Message) Data() interface{} { func (msg Message) Data() interface{} {
return msg.data return msg.data


+ 1
- 5
node/node.go View File

@ -676,11 +676,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs { for i, listenAddr := range listenAddrs {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server") rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.DisconnectCallback(func(remoteAddr string) {
// Unsubscribe a client upon disconnect since it won't be able to do it
// itself.
n.eventBus.UnsubscribeAll(context.TODO(), remoteAddr)
}))
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket")) wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)


+ 0
- 2
rpc/client/helpers.go View File

@ -59,14 +59,12 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
const subscriber = "helpers" const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
evts := make(chan interface{}, 1)
// register for the next event of this type // register for the next event of this type
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp)) sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
// make sure to unregister after the test is over // make sure to unregister after the test is over
defer c.UnsubscribeAll(ctx, subscriber) defer c.UnsubscribeAll(ctx, subscriber)


+ 51
- 14
rpc/client/httpclient.go View File

@ -249,6 +249,28 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/ /** websocket event stuff here... **/
type subscription struct {
out chan tmpubsub.Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
func (s *subscription) Out() <-chan tmpubsub.Message {
return s.out
}
func (s *subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
func (s *subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
type WSEvents struct { type WSEvents struct {
cmn.BaseService cmn.BaseService
cdc *amino.Codec cdc *amino.Codec
@ -256,8 +278,9 @@ type WSEvents struct {
endpoint string endpoint string
ws *rpcclient.WSClient ws *rpcclient.WSClient
mtx sync.RWMutex
subscriptions map[string]chan<- EventMessage
mtx sync.RWMutex
// query -> subscription
subscriptions map[string]*subscription
} }
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -265,7 +288,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc, cdc: cdc,
endpoint: endpoint, endpoint: endpoint,
remote: remote, remote: remote,
subscriptions: make(map[string]chan<- EventMessage),
subscriptions: make(map[string]*subscription),
} }
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
@ -295,21 +318,29 @@ func (w *WSEvents) OnStop() {
} }
} }
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
q := query.String() q := query.String()
err := w.ws.Subscribe(ctx, q) err := w.ws.Subscribe(ctx, q)
if err != nil { if err != nil {
return err
return nil, err
}
outCap := 1
if len(outCapacity) > 0 && outCapacity[0] >= 0 {
outCap = outCapacity[0]
} }
w.mtx.Lock() w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with // subscriber param is ignored because Tendermint will override it with
// remote IP anyway. // remote IP anyway.
w.subscriptions[q] = out
w.subscriptions[q] = &subscription{
out: make(chan tmpubsub.Message, outCap),
cancelled: make(chan struct{}),
}
w.mtx.Unlock() w.mtx.Unlock()
return nil
return w.subscriptions[q], nil
} }
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
@ -321,9 +352,12 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmp
} }
w.mtx.Lock() w.mtx.Lock()
ch, ok := w.subscriptions[q]
sub, ok := w.subscriptions[q]
if ok { if ok {
close(ch)
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
delete(w.subscriptions, q) delete(w.subscriptions, q)
} }
w.mtx.Unlock() w.mtx.Unlock()
@ -338,10 +372,13 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
} }
w.mtx.Lock() w.mtx.Lock()
for _, ch := range w.subscriptions {
close(ch)
for _, sub := range w.subscriptions {
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
} }
w.subscriptions = make(map[string]chan<- EventMessage)
w.subscriptions = make(map[string]*subscription)
w.mtx.Unlock() w.mtx.Unlock()
return nil return nil
@ -381,8 +418,8 @@ func (w *WSEvents) eventListener() {
// NOTE: writing also happens inside mutex so we can't close a channel in // NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll. // Unsubscribe/UnsubscribeAll.
w.mtx.RLock() w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- EventMessage{result.Data, result.Tags}
if sub, ok := w.subscriptions[result.Query]; ok {
sub.out <- tmpubsub.NewMessage(result.Data, result.Tags)
} }
w.mtx.RUnlock() w.mtx.RUnlock()
case <-w.Quit(): case <-w.Quit():


+ 1
- 6
rpc/client/interface.go View File

@ -21,10 +21,7 @@ implementation.
*/ */
import ( import (
"context"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -94,9 +91,7 @@ type NetworkClient interface {
// EventsClient is reactive, you can subscribe to any message, given the proper // EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go // string. see tendermint/types/events.go
type EventsClient interface { type EventsClient interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
types.EventBusSubscriber
} }
// MempoolClient shows us data about current mempool state. // MempoolClient shows us data about current mempool state.


+ 0
- 8
rpc/client/types.go View File

@ -1,7 +1,5 @@
package client package client
import "github.com/tendermint/tendermint/types"
// ABCIQueryOptions can be used to provide options for ABCIQuery call other // ABCIQueryOptions can be used to provide options for ABCIQuery call other
// than the DefaultABCIQueryOptions. // than the DefaultABCIQueryOptions.
type ABCIQueryOptions struct { type ABCIQueryOptions struct {
@ -11,9 +9,3 @@ type ABCIQueryOptions struct {
// DefaultABCIQueryOptions are latest height (0) and prove false. // DefaultABCIQueryOptions are latest height (0) and prove false.
var DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false} var DefaultABCIQueryOptions = ABCIQueryOptions{Height: 0, Prove: false}
// EventMessage combines event data and tags.
type EventMessage struct {
Data types.TMEventData
Tags map[string]string
}

+ 12
- 3
rpc/core/events.go View File

@ -9,6 +9,7 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
tmtypes "github.com/tendermint/tendermint/types"
) )
// Subscribe for events via WebSocket. // Subscribe for events via WebSocket.
@ -100,7 +101,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
sub, err := eventBus.Subscribe(ctx, addr, q)
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -167,7 +168,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
err = eventBus.Unsubscribe(context.Background(), addr, q)
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -201,9 +202,17 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr) logger.Info("Unsubscribe from all", "remote", addr)
err := eventBus.UnsubscribeAll(context.Background(), addr)
err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

+ 16
- 8
rpc/lib/server/handlers.go View File

@ -2,6 +2,7 @@ package rpcserver
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -433,8 +434,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero. // Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration pingPeriod time.Duration
// see DisconnectCallback option.
disconnectCallback func(remoteAddr string)
// object that is used to subscribe / unsubscribe from events
eventSub types.EventSubscriber
} }
// NewWSConnection wraps websocket.Conn. // NewWSConnection wraps websocket.Conn.
@ -467,11 +468,12 @@ func NewWSConnection(
return wsc return wsc
} }
// DisconnectCallback can be used optionally to set a callback, which will be
// called upon disconnect - not Goroutine-safe.
func DisconnectCallback(cb func(remoteAddr string)) func(*wsConnection) {
// EventSubscriber sets object that is used to subscribe / unsubscribe from
// events - not Goroutine-safe. If none given, default node's eventBus will be
// used.
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) { return func(wsc *wsConnection) {
wsc.disconnectCallback = cb
wsc.eventSub = eventSub
} }
} }
@ -524,8 +526,9 @@ func (wsc *wsConnection) OnStart() error {
func (wsc *wsConnection) OnStop() { func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops. // Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.disconnectCallback != nil {
wsc.disconnectCallback(wsc.remoteAddr)
if wsc.eventSub != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
} }
} }
@ -535,6 +538,11 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr return wsc.remoteAddr
} }
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe. // It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {


+ 12
- 0
rpc/lib/types/types.go View File

@ -1,6 +1,7 @@
package rpctypes package rpctypes
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
@ -9,6 +10,9 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"
) )
// a wrapper to emulate a sum type: jsonrpcid = string | int // a wrapper to emulate a sum type: jsonrpcid = string | int
@ -237,6 +241,7 @@ type WSRPCConnection interface {
GetRemoteAddr() string GetRemoteAddr() string
WriteRPCResponse(resp RPCResponse) WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
Codec() *amino.Codec Codec() *amino.Codec
} }
@ -246,6 +251,13 @@ type WSRPCContext struct {
WSRPCConnection WSRPCConnection
} }
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
//---------------------------------------- //----------------------------------------
// SOCKETS // SOCKETS
// //


Loading…
Cancel
Save