You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

229 lines
5.6 KiB

package http
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/tendermint/tendermint/internal/pubsub"
rpcclient "github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/coretypes"
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
)
// wsEvents is a wrapper around WSClient, which implements EventsClient.
type wsEvents struct {
*rpcclient.RunState
ws *jsonrpcclient.WSClient
mtx sync.RWMutex
subscriptions map[string]*wsSubscription
}
type wsSubscription struct {
res chan coretypes.ResultEvent
id string
query string
}
var _ rpcclient.EventsClient = (*wsEvents)(nil)
func newWsEvents(remote string) (*wsEvents, error) {
w := &wsEvents{
RunState: rpcclient.NewRunState("wsEvents", nil),
subscriptions: make(map[string]*wsSubscription),
}
var err error
w.ws, err = jsonrpcclient.NewWS(strings.TrimSuffix(remote, "/"), "/websocket")
if err != nil {
return nil, fmt.Errorf("can't create WS client: %w", err)
}
w.ws.OnReconnect(func() {
// resubscribe immediately
w.redoSubscriptionsAfter(0 * time.Second)
})
w.ws.Logger = w.Logger
return w, nil
}
// Start starts the websocket client and the event loop.
func (w *wsEvents) Start(ctx context.Context) error {
if err := w.ws.Start(ctx); err != nil {
return err
}
go w.eventListener(ctx)
return nil
}
// IsRunning reports whether the websocket client is running.
func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() }
// Stop shuts down the websocket client.
func (w *wsEvents) Stop() error { return w.ws.Stop() }
// Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. By default, it returns a channel with cap=1. Error is
// returned if it fails to subscribe.
//
// When reading from the channel, keep in mind there's a single events loop, so
// if you don't read events for this subscription fast enough, other
// subscriptions will slow down in effect.
//
// The channel is never closed to prevent clients from seeing an erroneous
// event.
//
// It returns an error if wsEvents is not running.
func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan coretypes.ResultEvent, err error) {
if !w.IsRunning() {
return nil, rpcclient.ErrClientNotRunning
}
if err := w.ws.Subscribe(ctx, query); err != nil {
return nil, err
}
outCap := 1
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan coretypes.ResultEvent, outCap)
w.mtx.Lock()
defer w.mtx.Unlock()
// subscriber param is ignored because Tendermint will override it with
// remote IP anyway.
w.subscriptions[query] = &wsSubscription{res: outc, query: query}
return outc, nil
}
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// subscriber from query.
//
// It returns an error if wsEvents is not running.
func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
if !w.IsRunning() {
return rpcclient.ErrClientNotRunning
}
if err := w.ws.Unsubscribe(ctx, query); err != nil {
return err
}
w.mtx.Lock()
info, ok := w.subscriptions[query]
if ok {
if info.id != "" {
delete(w.subscriptions, info.id)
}
delete(w.subscriptions, info.query)
}
w.mtx.Unlock()
return nil
}
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// given subscriber from all the queries.
//
// It returns an error if wsEvents is not running.
func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
if !w.IsRunning() {
return rpcclient.ErrClientNotRunning
}
if err := w.ws.UnsubscribeAll(ctx); err != nil {
return err
}
w.mtx.Lock()
w.subscriptions = make(map[string]*wsSubscription)
w.mtx.Unlock()
return nil
}
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) {
time.Sleep(d)
ctx := context.Background()
w.mtx.Lock()
defer w.mtx.Unlock()
for q, info := range w.subscriptions {
if q != "" && q == info.id {
continue
}
err := w.ws.Subscribe(ctx, q)
if err != nil {
w.Logger.Error("failed to resubscribe", "query", q, "err", err)
delete(w.subscriptions, q)
}
}
}
func isErrAlreadySubscribed(err error) bool {
return strings.Contains(err.Error(), pubsub.ErrAlreadySubscribed.Error())
}
func (w *wsEvents) eventListener(ctx context.Context) {
for {
select {
case resp, ok := <-w.ws.ResponsesCh:
if !ok {
return
}
if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error())
// Error can be ErrAlreadySubscribed or max client (subscriptions per
// client) reached or Tendermint exited.
// We can ignore ErrAlreadySubscribed, but need to retry in other
// cases.
if !isErrAlreadySubscribed(resp.Error) {
// Resubscribe after 1 second to give Tendermint time to restart (if
// crashed).
w.redoSubscriptionsAfter(1 * time.Second)
}
continue
}
result := new(coretypes.ResultEvent)
err := json.Unmarshal(resp.Result, result)
if err != nil {
w.Logger.Error("failed to unmarshal response", "err", err)
continue
}
w.mtx.RLock()
out, ok := w.subscriptions[result.Query]
if ok {
if _, idOk := w.subscriptions[result.SubscriptionID]; !idOk {
out.id = result.SubscriptionID
w.subscriptions[result.SubscriptionID] = out
}
}
w.mtx.RUnlock()
if ok {
select {
case out.res <- *result:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}