diff --git a/CHANGELOG.md b/CHANGELOG.md index 76a34d5c3..5e698a2a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,14 @@ BUG FIXES: - Graceful handling/recovery for apps that have non-determinism or fail to halt - Graceful handling/recovery for violations of safety, or liveness +## 0.14.0 (TBD) + +BREAKING CHANGES: +- rpc/client: changed Subscribe/Unsubscribe/UnsubscribeAll funcs signatures to be identical to event bus. + +IMPROVEMENTS: +- rpc/client: can act as event bus subscriber (See https://github.com/tendermint/tendermint/issues/945). + ## 0.13.0 (December 6, 2017) BREAKING CHANGES: diff --git a/glide.lock b/glide.lock index 82846067b..d18ccf6e2 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 09fc7f59ca6b718fe236368bb55f4801455295cfe455ea5865d544ee4dcfdc08 -updated: 2017-12-06T03:31:34.476581624-05:00 +hash: f420f1f858100218dad50997d939eaaf129ff654a0648a47ddc60d626ab0b8e9 +updated: 2017-12-10T05:37:46.41123196Z imports: - name: github.com/btcsuite/btcd version: 2e60448ffcc6bf78332d1fe590260095f554dd78 @@ -129,7 +129,7 @@ imports: subpackages: - iavl - name: github.com/tendermint/tmlibs - version: bfcc0217f120d3bee6730ba0789d2eb72fc2e889 + version: e4ef2835f0081c2ece83b9c1f777cf071f956e81 subpackages: - autofile - cli diff --git a/glide.yaml b/glide.yaml index 3f20a4680..e614d0a1e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -34,7 +34,7 @@ import: subpackages: - iavl - package: github.com/tendermint/tmlibs - version: ~0.5.0 + version: e4ef2835f0081c2ece83b9c1f777cf071f956e81 subpackages: - autofile - cli diff --git a/node/node.go b/node/node.go index 2f200abc5..352c13ccc 100644 --- a/node/node.go +++ b/node/node.go @@ -2,7 +2,6 @@ package node import ( "bytes" - "context" "encoding/json" "errors" "fmt" @@ -441,13 +440,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - onDisconnect := rpcserver.OnDisconnect(func(remoteAddr string) { - err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) - if err != nil { - rpcLogger.Error("Error unsubsribing from all on disconnect", "err", err) - } - }) - wm := rpcserver.NewWebsocketManager(rpccore.Routes, onDisconnect) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpcserver.EventSubscriber(n.eventBus)) wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index e41c2d657..e7a84b6b4 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -2,7 +2,6 @@ package client import ( "context" - "fmt" "time" "github.com/pkg/errors" @@ -57,19 +56,20 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error { // // This handles subscribing and unsubscribing under the hood func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { + const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() evts := make(chan interface{}, 1) // register for the next event of this type - query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp) - err := c.Subscribe(ctx, query, evts) + query := types.QueryForEvent(evtTyp) + err := c.Subscribe(ctx, subscriber, query, evts) if err != nil { return types.TMEventData{}, errors.Wrap(err, "failed to subscribe") } // make sure to unregister after the test is over - defer c.Unsubscribe(ctx, query) + defer c.UnsubscribeAll(ctx, subscriber) select { case evt := <-evts: diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 1f49ea4d7..2ecfa7958 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -3,7 +3,6 @@ package client import ( "context" "encoding/json" - "fmt" "sync" "github.com/pkg/errors" @@ -13,6 +12,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) /* @@ -204,20 +204,14 @@ type WSEvents struct { endpoint string ws *rpcclient.WSClient - subscriptions map[string]chan<- interface{} mtx sync.RWMutex - - // used for signaling the goroutine that feeds ws -> EventSwitch - quit chan bool - done chan bool + subscriptions map[string]chan<- interface{} } func newWSEvents(remote, endpoint string) *WSEvents { wsEvents := &WSEvents{ endpoint: endpoint, remote: remote, - quit: make(chan bool, 1), - done: make(chan bool, 1), subscriptions: make(map[string]chan<- interface{}), } @@ -225,87 +219,86 @@ func newWSEvents(remote, endpoint string) *WSEvents { return wsEvents } -// Start is the only way I could think the extend OnStart from -// events.eventSwitch. If only it wasn't private... -// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start -func (w *WSEvents) Start() error { - ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { +func (w *WSEvents) OnStart() error { + w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { w.redoSubscriptions() })) - err := ws.Start() - if err == nil { - w.ws = ws - go w.eventListener() + err := w.ws.Start() + if err != nil { + return err } - return err -} -// Stop wraps the BaseService/eventSwitch actions as Start does -func (w *WSEvents) Stop() error { - // send a message to quit to stop the eventListener - w.quit <- true - <-w.done - w.ws.Stop() - w.ws = nil + go w.eventListener() return nil } -func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { - if ch := w.getSubscription(query); ch != nil { - return errors.New("already subscribed") +// Stop wraps the BaseService/eventSwitch actions as Start does +func (w *WSEvents) OnStop() { + err := w.ws.Stop() + if err != nil { + w.Logger.Error("failed to stop WSClient", "err", err) } +} - err := w.ws.Subscribe(ctx, query) +func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + q := query.String() + + err := w.ws.Subscribe(ctx, q) if err != nil { - return errors.Wrap(err, "failed to subscribe") + return err } w.mtx.Lock() - w.subscriptions[query] = out + // subscriber param is ignored because Tendermint will override it with + // remote IP anyway. + w.subscriptions[q] = out w.mtx.Unlock() return nil } -func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error { - err := w.ws.Unsubscribe(ctx, query) +func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + q := query.String() + + err := w.ws.Unsubscribe(ctx, q) if err != nil { return err } w.mtx.Lock() - defer w.mtx.Unlock() - ch, ok := w.subscriptions[query] + ch, ok := w.subscriptions[q] if ok { close(ch) - delete(w.subscriptions, query) + delete(w.subscriptions, q) } + w.mtx.Unlock() return nil } -func (w *WSEvents) UnsubscribeAll(ctx context.Context) error { +func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { err := w.ws.UnsubscribeAll(ctx) if err != nil { return err } w.mtx.Lock() - defer w.mtx.Unlock() for _, ch := range w.subscriptions { close(ch) } w.subscriptions = make(map[string]chan<- interface{}) + w.mtx.Unlock() + return nil } // After being reconnected, it is necessary to redo subscription to server // otherwise no data will be automatically received. func (w *WSEvents) redoSubscriptions() { - for query := range w.subscriptions { + for q := range w.subscriptions { // NOTE: no timeout for resubscribing // FIXME: better logging/handling of errors?? - w.ws.Subscribe(context.Background(), query) + w.ws.Subscribe(context.Background(), q) } } @@ -316,34 +309,29 @@ func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) eventListener() { for { select { - case resp := <-w.ws.ResponsesCh: - // res is json.RawMessage + case resp, ok := <-w.ws.ResponsesCh: + if !ok { + return + } if resp.Error != nil { - // FIXME: better logging/handling of errors?? - fmt.Printf("ws err: %+v\n", resp.Error.Error()) + w.Logger.Error("WS error", "err", resp.Error.Error()) continue } result := new(ctypes.ResultEvent) err := json.Unmarshal(resp.Result, result) if err != nil { - // ignore silently (eg. subscribe, unsubscribe and maybe other events) - // TODO: ? + w.Logger.Error("failed to unmarshal response", "err", err) continue } - if ch := w.getSubscription(result.Query); ch != nil { + // NOTE: writing also happens inside mutex so we can't close a channel in + // Unsubscribe/UnsubscribeAll. + w.mtx.RLock() + if ch, ok := w.subscriptions[result.Query]; ok { ch <- result.Data } - case <-w.quit: - // send a message so we can wait for the routine to exit - // before cleaning up the w.ws stuff - w.done <- true + w.mtx.RUnlock() + case <-w.Quit: return } } } - -func (w *WSEvents) getSubscription(query string) chan<- interface{} { - w.mtx.RLock() - defer w.mtx.RUnlock() - return w.subscriptions[query] -} diff --git a/rpc/client/interface.go b/rpc/client/interface.go index c38f188ee..063d50e19 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -20,8 +20,6 @@ implementation. package client import ( - "context" - data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -89,7 +87,5 @@ type NetworkClient interface { // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - Subscribe(ctx context.Context, query string, out chan<- interface{}) error - Unsubscribe(ctx context.Context, query string) error - UnsubscribeAll(ctx context.Context) error + types.EventBusSubscriber } diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 40c249123..18c6759de 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -3,19 +3,12 @@ package client import ( "context" - "github.com/pkg/errors" - data "github.com/tendermint/go-wire/data" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" - tmquery "github.com/tendermint/tmlibs/pubsub/query" -) - -const ( - // event bus subscriber - subscriber = "rpc-localclient" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) /* @@ -33,10 +26,7 @@ For real clients, you probably want to use client.HTTP. For more powerful control during testing, you probably want the "client/mock" package. */ type Local struct { - node *nm.Node - *types.EventBus - subscriptions map[string]*tmquery.Query } // NewLocal configures a client that calls the Node directly. @@ -48,9 +38,7 @@ type Local struct { func NewLocal(node *nm.Node) *Local { node.ConfigureRPC() return &Local{ - node: node, - EventBus: node.EventBus(), - subscriptions: make(map[string]*tmquery.Query), + EventBus: node.EventBus(), } } @@ -68,7 +56,7 @@ func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { return core.ABCIInfo() } -func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) { +func (c *Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) } @@ -128,34 +116,14 @@ func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { return core.TxSearch(query, prove) } -func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { - q, err := tmquery.New(query) - if err != nil { - return errors.Wrap(err, "failed to subscribe") - } - if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil { - return errors.Wrap(err, "failed to subscribe") - } - c.subscriptions[query] = q - return nil +func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + return c.EventBus.Subscribe(ctx, subscriber, query, out) } -func (c *Local) Unsubscribe(ctx context.Context, query string) error { - q, ok := c.subscriptions[query] - if !ok { - return errors.New("subscription not found") - } - if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil { - return errors.Wrap(err, "failed to unsubscribe") - } - delete(c.subscriptions, query) - return nil +func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + return c.EventBus.Unsubscribe(ctx, subscriber, query) } -func (c *Local) UnsubscribeAll(ctx context.Context) error { - if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil { - return errors.Wrap(err, "failed to unsubscribe") - } - c.subscriptions = make(map[string]*tmquery.Query) - return nil +func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { + return c.EventBus.UnsubscribeAll(ctx, subscriber) } diff --git a/rpc/core/events.go b/rpc/core/events.go index 81f1c919a..538134b0f 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -44,20 +44,15 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri q, err := tmquery.New(query) if err != nil { - return nil, errors.Wrap(err, "failed to parse a query") - } - - err = wsCtx.AddSubscription(query, q) - if err != nil { - return nil, errors.Wrap(err, "failed to add subscription") + return nil, errors.Wrap(err, "failed to parse query") } ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() ch := make(chan interface{}) - err = eventBus.Subscribe(ctx, addr, q, ch) + err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch) if err != nil { - return nil, errors.Wrap(err, "failed to subscribe") + return nil, err } go func() { @@ -100,18 +95,31 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { addr := wsCtx.GetRemoteAddr() logger.Info("Unsubscribe from query", "remote", addr, "query", query) - q, ok := wsCtx.DeleteSubscription(query) - if !ok { - return nil, errors.New("subscription not found") + q, err := tmquery.New(query) + if err != nil { + return nil, errors.Wrap(err, "failed to parse query") + } + err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) + if err != nil { + return nil, err } - eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query)) return &ctypes.ResultUnsubscribe{}, nil } func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { addr := wsCtx.GetRemoteAddr() logger.Info("Unsubscribe from all", "remote", addr) - eventBus.UnsubscribeAll(context.Background(), addr) - wsCtx.DeleteAllSubscriptions() + err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) + if err != nil { + return nil, err + } return &ctypes.ResultUnsubscribe{}, nil } + +func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber { + es := wsCtx.GetEventSubscriber() + if es == nil { + es = eventBus + } + return es +} diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index c81821690..1e14ea9a0 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -2,6 +2,7 @@ package rpcserver import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -366,8 +367,6 @@ type wsConnection struct { funcMap map[string]*RPCFunc - subscriptions map[string]interface{} - // write channel capacity writeChanCapacity int @@ -380,8 +379,8 @@ type wsConnection struct { // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration - // called before stopping the connection. - onDisconnect func(remoteAddr string) + // object that is used to subscribe / unsubscribe from events + eventSub types.EventSubscriber } // NewWSConnection wraps websocket.Conn. @@ -395,7 +394,6 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, funcMap: funcMap, - subscriptions: make(map[string]interface{}), writeWait: defaultWSWriteWait, writeChanCapacity: defaultWSWriteChanCapacity, readWait: defaultWSReadWait, @@ -408,6 +406,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti return wsc } +// EventSubscriber sets object that is used to subscribe / unsubscribe from +// events - not Goroutine-safe. If none given, default node's eventBus will be +// used. +func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.eventSub = eventSub + } +} + // WriteWait sets the amount of time to wait before a websocket write times out. // It should only be used in the constructor - not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*wsConnection) { @@ -440,14 +447,6 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { } } -// OnDisconnect called before stopping the connection. -// It should only be used in the constructor - not Goroutine-safe. -func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.onDisconnect = cb - } -} - // OnStart implements cmn.Service by starting the read and write routines. It // blocks until the connection closes. func (wsc *wsConnection) OnStart() error { @@ -461,12 +460,12 @@ func (wsc *wsConnection) OnStart() error { return nil } -// OnStop implements cmn.Service by calling OnDisconnect callback. +// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions. func (wsc *wsConnection) OnStop() { // Both read and write loops close the websocket connection when they exit their loops. // The writeChan is never closed, to allow WriteRPCResponse() to fail. - if wsc.onDisconnect != nil { - wsc.onDisconnect(wsc.remoteAddr) + if wsc.eventSub != nil { + wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) } } @@ -476,6 +475,11 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } +// GetEventSubscriber implements WSRPCConnection by returning event subscriber. +func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { + return wsc.eventSub +} + // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { @@ -499,28 +503,6 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { } } -func (wsc *wsConnection) AddSubscription(query string, data interface{}) error { - if _, ok := wsc.subscriptions[query]; ok { - return errors.New("Already subscribed") - } - - wsc.subscriptions[query] = data - return nil -} - -func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) { - data, ok := wsc.subscriptions[query] - if ok { - delete(wsc.subscriptions, query) - return data, true - } - return nil, false -} - -func (wsc *wsConnection) DeleteAllSubscriptions() { - wsc.subscriptions = make(map[string]interface{}) -} - // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { defer func() { diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index bac7c2409..37d451457 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -1,11 +1,13 @@ package rpctypes import ( + "context" "encoding/json" "fmt" "strings" "github.com/pkg/errors" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) //---------------------------------------- @@ -135,10 +137,14 @@ type WSRPCConnection interface { GetRemoteAddr() string WriteRPCResponse(resp RPCResponse) TryWriteRPCResponse(resp RPCResponse) bool + GetEventSubscriber() EventSubscriber +} - AddSubscription(string, interface{}) error - DeleteSubscription(string) (interface{}, bool) - DeleteAllSubscriptions() +// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber +type EventSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error } // websocket-only RPCFuncs take this as the first parameter. diff --git a/types/event_bus.go b/types/event_bus.go index 6cee1d82b..6b6069b90 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -12,6 +12,12 @@ import ( const defaultCapacity = 1000 +type EventBusSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error +} + // EventBus is a common bus for all events going through the system. All calls // are proxied to underlying pubsub server. All events must be published using // EventBus to ensure correct data types. diff --git a/types/events.go b/types/events.go index 08ebf46da..5c41c6df6 100644 --- a/types/events.go +++ b/types/events.go @@ -146,32 +146,32 @@ const ( ) var ( - EventQueryBond = queryForEvent(EventBond) - EventQueryUnbond = queryForEvent(EventUnbond) - EventQueryRebond = queryForEvent(EventRebond) - EventQueryDupeout = queryForEvent(EventDupeout) - EventQueryFork = queryForEvent(EventFork) - EventQueryNewBlock = queryForEvent(EventNewBlock) - EventQueryNewBlockHeader = queryForEvent(EventNewBlockHeader) - EventQueryNewRound = queryForEvent(EventNewRound) - EventQueryNewRoundStep = queryForEvent(EventNewRoundStep) - EventQueryTimeoutPropose = queryForEvent(EventTimeoutPropose) - EventQueryCompleteProposal = queryForEvent(EventCompleteProposal) - EventQueryPolka = queryForEvent(EventPolka) - EventQueryUnlock = queryForEvent(EventUnlock) - EventQueryLock = queryForEvent(EventLock) - EventQueryRelock = queryForEvent(EventRelock) - EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) - EventQueryVote = queryForEvent(EventVote) - EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) - EventQueryTx = queryForEvent(EventTx) + EventQueryBond = QueryForEvent(EventBond) + EventQueryUnbond = QueryForEvent(EventUnbond) + EventQueryRebond = QueryForEvent(EventRebond) + EventQueryDupeout = QueryForEvent(EventDupeout) + EventQueryFork = QueryForEvent(EventFork) + EventQueryNewBlock = QueryForEvent(EventNewBlock) + EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader) + EventQueryNewRound = QueryForEvent(EventNewRound) + EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep) + EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose) + EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal) + EventQueryPolka = QueryForEvent(EventPolka) + EventQueryUnlock = QueryForEvent(EventUnlock) + EventQueryLock = QueryForEvent(EventLock) + EventQueryRelock = QueryForEvent(EventRelock) + EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait) + EventQueryVote = QueryForEvent(EventVote) + EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat) + EventQueryTx = QueryForEvent(EventTx) ) func EventQueryTxFor(tx Tx) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) } -func queryForEvent(eventType string) tmpubsub.Query { +func QueryForEvent(eventType string) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) }