Browse Source

limit number of /subscribe clients and queries per client (#3269)

* limit number of /subscribe clients and queries per client

Add the following config variables (under [rpc] section):
  * max_subscription_clients
  * max_subscriptions_per_client
  * timeout_broadcast_tx_commit

Fixes #2826

new HTTPClient interface for subscriptions

finalize HTTPClient events interface

remove EventSubscriber

fix data race

```
WARNING: DATA RACE
Read at 0x00c000a36060 by goroutine 129:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0

Previous write at 0x00c000a36060 by goroutine 132:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0
  github.com/tendermint/tendermint/rpc/client.WaitForOneEvent()
      /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Goroutine 129 (running) created at:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7
  github.com/tendermint/tendermint/rpc/client.WaitForOneEvent()
      /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Goroutine 132 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:878 +0x659
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
==================
```

lite client works (tested manually)

godoc comments

httpclient: do not close the out channel

use TimeoutBroadcastTxCommit

no timeout for unsubscribe

but 1s Local (5s HTTP) timeout for resubscribe

format code

change Subscribe#out cap to 1

and replace config vars with RPCConfig

TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout

rpc: Context as first parameter to all functions

reformat code

fixes after my own review

fixes after Ethan's review

add test stubs

fix config.toml

* fixes after manual testing

- rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes
Tendermint resources (pubsub)
- rpc: better error in Subscribe and BroadcastTxCommit
- HTTPClient: do not resubscribe if err = ErrAlreadySubscribed

* fixes after Ismail's review

* Update rpc/grpc/grpc_test.go

Co-Authored-By: melekes <anton.kalyaev@gmail.com>
pull/3411/head
Anton Kaliaev 5 years ago
committed by GitHub
parent
commit
d741c7b478
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 658 additions and 351 deletions
  1. +2
    -2
      CHANGELOG_PENDING.md
  2. +30
    -0
      config/config.go
  3. +13
    -0
      config/toml.go
  4. +13
    -0
      docs/tendermint-core/configuration.md
  5. +14
    -0
      libs/pubsub/pubsub.go
  6. +4
    -0
      libs/pubsub/pubsub_test.go
  7. +10
    -6
      lite/proxy/proxy.go
  8. +53
    -0
      lite/proxy/wrapper.go
  9. +11
    -2
      node/node.go
  10. +6
    -0
      rpc/client/event_test.go
  11. +3
    -5
      rpc/client/helpers.go
  12. +77
    -80
      rpc/client/httpclient.go
  13. +14
    -1
      rpc/client/interface.go
  14. +139
    -49
      rpc/client/localclient.go
  15. +15
    -14
      rpc/client/mock/client.go
  16. +3
    -2
      rpc/core/abci.go
  17. +5
    -4
      rpc/core/blocks.go
  18. +5
    -4
      rpc/core/consensus.go
  19. +9
    -4
      rpc/core/dev.go
  20. +34
    -27
      rpc/core/events.go
  21. +2
    -1
      rpc/core/health.go
  22. +31
    -13
      rpc/core/mempool.go
  23. +5
    -4
      rpc/core/net.go
  24. +8
    -0
      rpc/core/pipe.go
  25. +2
    -1
      rpc/core/status.go
  26. +3
    -2
      rpc/core/tx.go
  27. +5
    -2
      rpc/grpc/api.go
  28. +2
    -1
      rpc/grpc/client_server.go
  29. +3
    -4
      rpc/grpc/grpc_test.go
  30. +5
    -5
      rpc/lib/rpc_test.go
  31. +45
    -52
      rpc/lib/server/handlers.go
  32. +2
    -2
      rpc/lib/server/handlers_test.go
  33. +42
    -48
      rpc/lib/server/parse_test.go
  34. +3
    -1
      rpc/lib/test/main.go
  35. +29
    -15
      rpc/lib/types/types.go
  36. +11
    -0
      types/event_bus.go

+ 2
- 2
CHANGELOG_PENDING.md View File

@ -7,7 +7,7 @@ Special thanks to external contributors on this release:
### BREAKING CHANGES:
* CLI/RPC/Config
- [httpclient] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [rpc/client] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
* Apps
@ -27,6 +27,7 @@ Special thanks to external contributors on this release:
- [config] \#2920 Remove `consensus.blocktime_iota` parameter
- [genesis] \#2920 Add `time_iota_ms` to block's consensus parameters (not exposed to the application)
- [genesis] \#2920 Rename `consensus_params.block_size` to `consensus_params.block`
- [lite] add `/unsubscribe_all` endpoint, which allows you to unsubscribe from all events
### IMPROVEMENTS:
- [libs/common] \#3238 exit with zero (0) code upon receiving SIGTERM/SIGINT
@ -41,7 +42,6 @@ Special thanks to external contributors on this release:
- leveldb.aliveiters
### BUG FIXES:
- [p2p/conn] \#3347 Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
- [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [p2p] \#3369 do not panic when filter times out

+ 30
- 0
config/config.go View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/pkg/errors"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
)
const (
@ -323,6 +324,19 @@ type RPCConfig struct {
// Should be < {ulimit -Sn} - {MaxNumInboundPeers} - {MaxNumOutboundPeers} - {N of wal, db and other open files}
// 1024 - 40 - 10 - 50 = 924 = ~900
MaxOpenConnections int `mapstructure:"max_open_connections"`
// Maximum number of unique clientIDs that can /subscribe
// If you're using /broadcast_tx_commit, set to the estimated maximum number
// of broadcast_tx_commit calls per block.
MaxSubscriptionClients int `mapstructure:"max_subscription_clients"`
// Maximum number of unique queries a given client can /subscribe to
// If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set
// to the estimated maximum number of broadcast_tx_commit calls per block.
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
// How long to wait for a tx to be committed during /broadcast_tx_commit
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
}
// DefaultRPCConfig returns a default configuration for the RPC server
@ -337,6 +351,10 @@ func DefaultRPCConfig() *RPCConfig {
Unsafe: false,
MaxOpenConnections: 900,
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
TimeoutBroadcastTxCommit: 10 * time.Second,
}
}
@ -358,6 +376,18 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxOpenConnections < 0 {
return errors.New("max_open_connections can't be negative")
}
if cfg.MaxSubscriptionClients < 0 {
return errors.New("max_subscription_clients can't be negative")
}
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max_subscriptions_per_client can't be negative")
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}
if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout {
return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout)
}
return nil
}


+ 13
- 0
config/toml.go View File

@ -165,6 +165,19 @@ unsafe = {{ .RPC.Unsafe }}
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = {{ .RPC.MaxOpenConnections }}
# Maximum number of unique clientIDs that can /subscribe
# If you're using /broadcast_tx_commit, set to the estimated maximum number
# of broadcast_tx_commit calls per block.
max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
# Maximum number of unique queries a given client can /subscribe to
# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
##### peer to peer configuration options #####
[p2p]


+ 13
- 0
docs/tendermint-core/configuration.md View File

@ -111,6 +111,19 @@ unsafe = false
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = 900
# Maximum number of unique clientIDs that can /subscribe
# If you're using /broadcast_tx_commit, set to the estimated maximum number
# of broadcast_tx_commit calls per block.
max_subscription_clients = 100
# Maximum number of unique queries a given client can /subscribe to
# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to
# the estimated # maximum number of broadcast_tx_commit calls per block.
max_subscriptions_per_client = 5
# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "10s"
##### peer to peer configuration options #####
[p2p]


+ 14
- 0
libs/pubsub/pubsub.go View File

@ -241,6 +241,20 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
}
}
// NumClients returns the number of clients.
func (s *Server) NumClients() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions)
}
// NumClientSubscriptions returns the number of subscriptions the client has.
func (s *Server) NumClientSubscriptions(clientID string) int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions[clientID])
}
// Publish publishes the given message. An error will be returned to the caller
// if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error {


+ 4
- 0
libs/pubsub/pubsub_test.go View File

@ -29,6 +29,10 @@ func TestSubscribe(t *testing.T) {
ctx := context.Background()
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
assert.Equal(t, 1, s.NumClients())
assert.Equal(t, 1, s.NumClientSubscriptions(clientID))
err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err)
assertReceive(t, "Ka-Zar", subscription.Out())


+ 10
- 6
lite/proxy/proxy.go View File

@ -1,6 +1,7 @@
package proxy
import (
"context"
"net/http"
amino "github.com/tendermint/go-amino"
@ -34,7 +35,12 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, r, cdc, logger)
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c))
unsubscribeFromAllEvents := func(remoteAddr string) {
if err := c.UnsubscribeAll(context.Background(), remoteAddr); err != nil {
logger.Error("Failed to unsubscribe from events", "err", err)
}
}
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents))
wm.SetLogger(logger)
core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
@ -51,13 +57,11 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
//
// if we want security, the client must implement it as a secure client
func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc {
return map[string]*rpcserver.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events.
// We can just use the core tendermint impl, which uses the
// EventSwitch we registered in NewWebsocketManager above
"subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"),
"subscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).SubscribeWS, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeWS, "query"),
"unsubscribe_all": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeAllWS, ""),
// info API
"status": rpcserver.NewRPCFunc(c.Status, ""),


+ 53
- 0
lite/proxy/wrapper.go View File

@ -1,12 +1,16 @@
package proxy
import (
"context"
"fmt"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/lite"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
var _ rpcclient.Client = Wrapper{}
@ -149,6 +153,55 @@ func (w Wrapper) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
w.prt.RegisterOpDecoder(typ, dec)
}
// SubscribeWS subscribes for events using the given query and remote address as
// a subscriber, but does not verify responses (UNSAFE)!
func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
out, err := w.Client.Subscribe(context.Background(), ctx.RemoteAddr(), query)
if err != nil {
return nil, err
}
go func() {
for {
select {
case resultEvent := <-out:
// XXX(melekes) We should have a switch here that performs a validation
// depending on the event's type.
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
resultEvent,
))
case <-w.Client.Quit():
return
}
}
}()
return &ctypes.ResultSubscribe{}, nil
}
// UnsubscribeWS calls original client's Unsubscribe using remote address as a
// subscriber.
func (w Wrapper) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
err := w.Client.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}
// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
// as a subscriber.
func (w Wrapper) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
err := w.Client.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}
// // WrappedSwitch creates a websocket connection that auto-verifies any info
// // coming through before passing it along.
// //


+ 11
- 2
node/node.go View File

@ -26,6 +26,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
@ -658,6 +659,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
rpccore.SetConfig(*n.config.RPC)
}
func (n *Node) startRPC() ([]net.Listener, error) {
@ -675,8 +677,15 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)


+ 6
- 0
rpc/client/event_test.go View File

@ -129,3 +129,9 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) {
})
}
}
// Test HTTPClient resubscribes upon disconnect && subscription error.
// Test Local client resubscribes upon subscription error.
func TestClientsResubscribe(t *testing.T) {
// TODO(melekes)
}

+ 3
- 5
rpc/client/helpers.go View File

@ -61,7 +61,7 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
defer cancel()
// register for the next event of this type
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String())
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
@ -69,10 +69,8 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
defer c.UnsubscribeAll(ctx, subscriber)
select {
case msg := <-sub.Out():
return msg.Data().(types.TMEventData), nil
case <-sub.Cancelled():
return nil, errors.New("subscription was cancelled")
case event := <-eventCh:
return event.Data.(types.TMEventData), nil
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
}


+ 77
- 80
rpc/client/httpclient.go View File

@ -2,11 +2,14 @@ package client
import (
"context"
"strings"
"sync"
"time"
"github.com/pkg/errors"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -15,13 +18,18 @@ import (
)
/*
HTTP is a Client implementation that communicates
with a tendermint node over json rpc and websockets.
This is the main implementation you probably want to use in
production code. There are other implementations when calling
the tendermint node in-process (local), or when you want to mock
out the server for test code (mock).
HTTP is a Client implementation that communicates with a tendermint node over
json rpc and websockets.
This is the main implementation you probably want to use in production code.
There are other implementations when calling the tendermint node in-process
(Local), or when you want to mock out the server for test code (mock).
You can subscribe for any event published by Tendermint using Subscribe method.
Note delivery is best-effort. If you don't read events fast enough or network
is slow, Tendermint might cancel the subscription. The client will attempt to
resubscribe (you don't need to do anything). It will keep trying every second
indefinitely until successful.
*/
type HTTP struct {
remote string
@ -249,28 +257,6 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/
type subscription struct {
out chan tmpubsub.Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
func (s *subscription) Out() <-chan tmpubsub.Message {
return s.out
}
func (s *subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
func (s *subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
type WSEvents struct {
cmn.BaseService
cdc *amino.Codec
@ -279,8 +265,8 @@ type WSEvents struct {
ws *rpcclient.WSClient
mtx sync.RWMutex
// query -> subscription
subscriptions map[string]*subscription
// query -> chan
subscriptions map[string]chan ctypes.ResultEvent
}
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -288,16 +274,18 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc,
endpoint: endpoint,
remote: remote,
subscriptions: make(map[string]*subscription),
subscriptions: make(map[string]chan ctypes.ResultEvent),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
return wsEvents
}
// OnStart implements cmn.Service by starting WSClient and event loop.
func (w *WSEvents) OnStart() error {
w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
// resubscribe immediately
w.redoSubscriptionsAfter(0 * time.Second)
}))
w.ws.SetCodec(w.cdc)
@ -310,75 +298,63 @@ func (w *WSEvents) OnStart() error {
return nil
}
// Stop wraps the BaseService/eventSwitch actions as Start does
// OnStop implements cmn.Service by stopping WSClient.
func (w *WSEvents) OnStop() {
err := w.ws.Stop()
if err != nil {
w.Logger.Error("failed to stop WSClient", "err", err)
}
_ = w.ws.Stop()
}
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
q := query.String()
// Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
// Channel is never closed to prevent clients from seeing an erroneus event.
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
err := w.ws.Subscribe(ctx, q)
if err != nil {
if err := w.ws.Subscribe(ctx, query); err != nil {
return nil, err
}
outCap := 1
if len(outCapacity) > 0 && outCapacity[0] >= 0 {
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan ctypes.ResultEvent, outCap)
w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with
// remote IP anyway.
w.subscriptions[q] = &subscription{
out: make(chan tmpubsub.Message, outCap),
cancelled: make(chan struct{}),
}
w.subscriptions[query] = outc
w.mtx.Unlock()
return w.subscriptions[q], nil
return outc, nil
}
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
q := query.String()
err := w.ws.Unsubscribe(ctx, q)
if err != nil {
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// subscriber from query.
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
if err := w.ws.Unsubscribe(ctx, query); err != nil {
return err
}
w.mtx.Lock()
sub, ok := w.subscriptions[q]
_, ok := w.subscriptions[query]
if ok {
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
delete(w.subscriptions, q)
delete(w.subscriptions, query)
}
w.mtx.Unlock()
return nil
}
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// given subscriber from all the queries.
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
err := w.ws.UnsubscribeAll(ctx)
if err != nil {
if err := w.ws.UnsubscribeAll(ctx); err != nil {
return err
}
w.mtx.Lock()
for _, sub := range w.subscriptions {
close(sub.cancelled)
sub.mtx.Lock()
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
}
w.subscriptions = make(map[string]*subscription)
w.subscriptions = make(map[string]chan ctypes.ResultEvent)
w.mtx.Unlock()
return nil
@ -386,18 +362,21 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() {
func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) {
time.Sleep(d)
for q := range w.subscriptions {
// NOTE: no timeout for resubscribing
// FIXME: better logging/handling of errors??
w.ws.Subscribe(context.Background(), q)
err := w.ws.Subscribe(context.Background(), q)
if err != nil {
w.Logger.Error("Failed to resubscribe", "err", err)
}
}
}
// eventListener is an infinite loop pulling all websocket events
// and pushing them to the EventSwitch.
//
// the goroutine only stops by closing quit
func isErrAlreadySubscribed(err error) bool {
return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
}
func (w *WSEvents) eventListener() {
for {
select {
@ -405,21 +384,39 @@ func (w *WSEvents) eventListener() {
if !ok {
return
}
if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error())
// Error can be ErrAlreadySubscribed or max client (subscriptions per
// client) reached or Tendermint exited.
// We can ignore ErrAlreadySubscribed, but need to retry in other
// cases.
if !isErrAlreadySubscribed(resp.Error) {
// Resubscribe after 1 second to give Tendermint time to restart (if
// crashed).
w.redoSubscriptionsAfter(1 * time.Second)
}
continue
}
result := new(ctypes.ResultEvent)
err := w.cdc.UnmarshalJSON(resp.Result, result)
if err != nil {
w.Logger.Error("failed to unmarshal response", "err", err)
continue
}
// NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll.
w.mtx.RLock()
if sub, ok := w.subscriptions[result.Query]; ok {
sub.out <- tmpubsub.NewMessage(result.Data, result.Tags)
if out, ok := w.subscriptions[result.Query]; ok {
if cap(out) == 0 {
out <- *result
} else {
select {
case out <- *result:
default:
w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
}
}
}
w.mtx.RUnlock()
case <-w.Quit():


+ 14
- 1
rpc/client/interface.go View File

@ -21,6 +21,8 @@ implementation.
*/
import (
"context"
cmn "github.com/tendermint/tendermint/libs/common"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
@ -91,7 +93,18 @@ type NetworkClient interface {
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
type EventsClient interface {
types.EventBusSubscriber
// Subscribe subscribes given subscriber to query. Returns a channel with
// cap=1 onto which events are published. An error is returned if it fails to
// subscribe. outCapacity can be used optionally to set capacity for the
// channel. Channel is never closed to prevent accidental reads.
//
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll.
Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
// Unsubscribe unsubscribes given subscriber from query.
Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries.
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// MempoolClient shows us data about current mempool state.


+ 139
- 49
rpc/client/localclient.go View File

@ -2,12 +2,18 @@ package client
import (
"context"
"time"
"github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
)
@ -24,9 +30,17 @@ are compiled in process.
For real clients, you probably want to use client.HTTP. For more
powerful control during testing, you probably want the "client/mock" package.
You can subscribe for any event published by Tendermint using Subscribe method.
Note delivery is best-effort. If you don't read events fast enough, Tendermint
might cancel the subscription. The client will attempt to resubscribe (you
don't need to do anything). It will keep trying indefinitely with exponential
backoff (10ms -> 20ms -> 40ms) until successful.
*/
type Local struct {
*types.EventBus
Logger log.Logger
ctx *rpctypes.Context
}
// NewLocal configures a client that calls the Node directly.
@ -39,113 +53,189 @@ func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC()
return &Local{
EventBus: node.EventBus(),
Logger: log.NewNopLogger(),
ctx: &rpctypes.Context{},
}
}
var (
_ Client = (*Local)(nil)
_ NetworkClient = Local{}
_ NetworkClient = (*Local)(nil)
_ EventsClient = (*Local)(nil)
)
func (Local) Status() (*ctypes.ResultStatus, error) {
return core.Status()
// SetLogger allows to set a logger on the client.
func (c *Local) SetLogger(l log.Logger) {
c.Logger = l
}
func (c *Local) Status() (*ctypes.ResultStatus, error) {
return core.Status(c.ctx)
}
func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo()
func (c *Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo(c.ctx)
}
func (c *Local) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
}
func (Local) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(path, data, opts.Height, opts.Prove)
func (c *Local) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove)
}
func (Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(tx)
func (c *Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(c.ctx, tx)
}
func (Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(tx)
func (c *Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(c.ctx, tx)
}
func (Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(tx)
func (c *Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(c.ctx, tx)
}
func (Local) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
return core.UnconfirmedTxs(limit)
func (c *Local) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
return core.UnconfirmedTxs(c.ctx, limit)
}
func (Local) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
return core.NumUnconfirmedTxs()
func (c *Local) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
return core.NumUnconfirmedTxs(c.ctx)
}
func (Local) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo()
func (c *Local) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo(c.ctx)
}
func (Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState()
func (c *Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState(c.ctx)
}
func (Local) ConsensusState() (*ctypes.ResultConsensusState, error) {
return core.ConsensusState()
func (c *Local) ConsensusState() (*ctypes.ResultConsensusState, error) {
return core.ConsensusState(c.ctx)
}
func (Local) Health() (*ctypes.ResultHealth, error) {
return core.Health()
func (c *Local) Health() (*ctypes.ResultHealth, error) {
return core.Health(c.ctx)
}
func (Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(seeds)
func (c *Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(c.ctx, seeds)
}
func (Local) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) {
return core.UnsafeDialPeers(peers, persistent)
func (c *Local) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) {
return core.UnsafeDialPeers(c.ctx, peers, persistent)
}
func (Local) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(minHeight, maxHeight)
func (c *Local) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(c.ctx, minHeight, maxHeight)
}
func (Local) Genesis() (*ctypes.ResultGenesis, error) {
return core.Genesis()
func (c *Local) Genesis() (*ctypes.ResultGenesis, error) {
return core.Genesis(c.ctx)
}
func (Local) Block(height *int64) (*ctypes.ResultBlock, error) {
return core.Block(height)
func (c *Local) Block(height *int64) (*ctypes.ResultBlock, error) {
return core.Block(c.ctx, height)
}
func (Local) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) {
return core.BlockResults(height)
func (c *Local) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) {
return core.BlockResults(c.ctx, height)
}
func (Local) Commit(height *int64) (*ctypes.ResultCommit, error) {
return core.Commit(height)
func (c *Local) Commit(height *int64) (*ctypes.ResultCommit, error) {
return core.Commit(c.ctx, height)
}
func (Local) Validators(height *int64) (*ctypes.ResultValidators, error) {
return core.Validators(height)
func (c *Local) Validators(height *int64) (*ctypes.ResultValidators, error) {
return core.Validators(c.ctx, height)
}
func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(hash, prove)
func (c *Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(c.ctx, hash, prove)
}
func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
return core.TxSearch(query, prove, page, perPage)
func (c *Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
return core.TxSearch(c.ctx, query, prove, page, perPage)
}
func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
sub, err := c.EventBus.Subscribe(ctx, subscriber, q)
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
outCap := 1
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan ctypes.ResultEvent, outCap)
go c.eventsRoutine(sub, subscriber, q, outc)
return outc, nil
}
func (c *Local) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
for {
select {
case msg := <-sub.Out():
result := ctypes.ResultEvent{Query: q.String(), Data: msg.Data(), Tags: msg.Tags()}
if cap(outc) == 0 {
outc <- result
} else {
select {
case outc <- result:
default:
c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
}
}
case <-sub.Cancelled():
if sub.Err() == tmpubsub.ErrUnsubscribed {
return
}
c.Logger.Error("subscription was cancelled, resubscribing...", "err", sub.Err(), "query", q.String())
sub = c.resubscribe(subscriber, q)
if sub == nil { // client was stopped
return
}
case <-c.Quit():
return
}
}
}
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...)
// Try to resubscribe with exponential backoff.
func (c *Local) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription {
attempts := 0
for {
if !c.IsRunning() {
return nil
}
sub, err := c.EventBus.Subscribe(context.Background(), subscriber, q)
if err == nil {
return sub
}
attempts++
time.Sleep((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms
}
}
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
return c.EventBus.Unsubscribe(ctx, subscriber, query)
func (c *Local) Unsubscribe(ctx context.Context, subscriber, query string) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to parse query")
}
return c.EventBus.Unsubscribe(ctx, subscriber, q)
}
func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error {


+ 15
- 14
rpc/client/mock/client.go View File

@ -21,6 +21,7 @@ import (
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
)
@ -76,11 +77,11 @@ func (c Call) GetResponse(args interface{}) (interface{}, error) {
}
func (c Client) Status() (*ctypes.ResultStatus, error) {
return core.Status()
return core.Status(&rpctypes.Context{})
}
func (c Client) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo()
return core.ABCIInfo(&rpctypes.Context{})
}
func (c Client) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) {
@ -88,49 +89,49 @@ func (c Client) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQue
}
func (c Client) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts client.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(path, data, opts.Height, opts.Prove)
return core.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove)
}
func (c Client) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(tx)
return core.BroadcastTxCommit(&rpctypes.Context{}, tx)
}
func (c Client) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(tx)
return core.BroadcastTxAsync(&rpctypes.Context{}, tx)
}
func (c Client) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(tx)
return core.BroadcastTxSync(&rpctypes.Context{}, tx)
}
func (c Client) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo()
return core.NetInfo(&rpctypes.Context{})
}
func (c Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(seeds)
return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds)
}
func (c Client) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) {
return core.UnsafeDialPeers(peers, persistent)
return core.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent)
}
func (c Client) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(minHeight, maxHeight)
return core.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight)
}
func (c Client) Genesis() (*ctypes.ResultGenesis, error) {
return core.Genesis()
return core.Genesis(&rpctypes.Context{})
}
func (c Client) Block(height *int64) (*ctypes.ResultBlock, error) {
return core.Block(height)
return core.Block(&rpctypes.Context{}, height)
}
func (c Client) Commit(height *int64) (*ctypes.ResultCommit, error) {
return core.Commit(height)
return core.Commit(&rpctypes.Context{}, height)
}
func (c Client) Validators(height *int64) (*ctypes.ResultValidators, error) {
return core.Validators(height)
return core.Validators(&rpctypes.Context{}, height)
}

+ 3
- 2
rpc/core/abci.go View File

@ -5,6 +5,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/proxy"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
// Query the application for some information.
@ -52,7 +53,7 @@ import (
// | data | []byte | false | true | Data |
// | height | int64 | 0 | false | Height (0 means latest) |
// | prove | bool | false | false | Includes proof if true |
func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) {
func ABCIQuery(ctx *rpctypes.Context, path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) {
resQuery, err := proxyAppQuery.QuerySync(abci.RequestQuery{
Path: path,
Data: data,
@ -96,7 +97,7 @@ func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctype
// "jsonrpc": "2.0"
// }
// ```
func ABCIInfo() (*ctypes.ResultABCIInfo, error) {
func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := proxyAppQuery.InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err


+ 5
- 4
rpc/core/blocks.go View File

@ -5,6 +5,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -68,7 +69,7 @@ import (
// ```
//
// <aside class="notice">Returns at most 20 items.</aside>
func BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
// maximum 20 block metas
const limit int64 = 20
@ -226,7 +227,7 @@ func filterMinMax(height, min, max, limit int64) (int64, int64, error) {
// "jsonrpc": "2.0"
// }
// ```
func Block(heightPtr *int64) (*ctypes.ResultBlock, error) {
func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
storeHeight := blockStore.Height()
height, err := getHeight(storeHeight, heightPtr)
if err != nil {
@ -313,7 +314,7 @@ func Block(heightPtr *int64) (*ctypes.ResultBlock, error) {
// "jsonrpc": "2.0"
// }
// ```
func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) {
func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
storeHeight := blockStore.Height()
height, err := getHeight(storeHeight, heightPtr)
if err != nil {
@ -372,7 +373,7 @@ func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) {
// ]
// }
// ```
func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) {
func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
storeHeight := blockStore.Height()
height, err := getHeight(storeHeight, heightPtr)
if err != nil {


+ 5
- 4
rpc/core/consensus.go View File

@ -3,6 +3,7 @@ package core
import (
cm "github.com/tendermint/tendermint/consensus"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -47,7 +48,7 @@ import (
// "jsonrpc": "2.0"
// }
// ```
func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
func Validators(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultValidators, error) {
// The latest validator that we know is the
// NextValidator of the last block.
height := consensusState.GetState().LastBlockHeight + 1
@ -200,7 +201,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
// }
// }
// ```
func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := p2pPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
@ -277,7 +278,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
// }
//}
//```
func ConsensusState() (*ctypes.ResultConsensusState, error) {
func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// Get self round state.
bz, err := consensusState.GetRoundStateSimpleJSON()
return &ctypes.ResultConsensusState{RoundState: bz}, err
@ -320,7 +321,7 @@ func ConsensusState() (*ctypes.ResultConsensusState, error) {
// }
// }
// ```
func ConsensusParams(heightPtr *int64) (*ctypes.ResultConsensusParams, error) {
func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) {
height := consensusState.GetState().LastBlockHeight + 1
height, err := getHeight(height, heightPtr)
if err != nil {


+ 9
- 4
rpc/core/dev.go View File

@ -5,16 +5,19 @@ import (
"runtime/pprof"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) {
// UnsafeFlushMempool removes all transactions from the mempool.
func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}
var profFile *os.File
func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error) {
// UnsafeStartCPUProfiler starts a pprof profiler using the given filename.
func UnsafeStartCPUProfiler(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) {
var err error
profFile, err = os.Create(filename)
if err != nil {
@ -27,7 +30,8 @@ func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error
return &ctypes.ResultUnsafeProfile{}, nil
}
func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) {
// UnsafeStopCPUProfiler stops the running pprof profiler.
func UnsafeStopCPUProfiler(ctx *rpctypes.Context) (*ctypes.ResultUnsafeProfile, error) {
pprof.StopCPUProfile()
if err := profFile.Close(); err != nil {
return nil, err
@ -35,7 +39,8 @@ func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) {
return &ctypes.ResultUnsafeProfile{}, nil
}
func UnsafeWriteHeapProfile(filename string) (*ctypes.ResultUnsafeProfile, error) {
// UnsafeWriteHeapProfile dumps a heap profile to the given filename.
func UnsafeWriteHeapProfile(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) {
memProfFile, err := os.Create(filename)
if err != nil {
return nil, err


+ 34
- 27
rpc/core/events.go View File

@ -6,10 +6,10 @@ import (
"github.com/pkg/errors"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
tmtypes "github.com/tendermint/tendermint/types"
)
// Subscribe for events via WebSocket.
@ -90,8 +90,15 @@ import (
// | query | string | "" | true | Query |
//
// <aside class="notice">WebSocket only</aside>
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr()
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()
if eventBus.NumClients() >= config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(addr) >= config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
}
logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query)
@ -99,9 +106,9 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
return nil, errors.Wrap(err, "failed to parse query")
}
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q)
sub, err := eventBus.Subscribe(subCtx, addr, q)
if err != nil {
return nil, err
}
@ -111,18 +118,26 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
select {
case msg := <-sub.Out():
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
wsCtx.TryWriteRPCResponse(
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
wsCtx.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)),
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
resultEvent,
))
case <-sub.Cancelled():
wsCtx.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", wsCtx.Request.ID)),
fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()),
))
if sub.Err() != tmpubsub.ErrUnsubscribed {
var reason string
if sub.Err() == nil {
reason = "Tendermint exited"
} else {
reason = sub.Err().Error()
}
ctx.WSConn.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
fmt.Errorf("subscription was cancelled (reason: %s)", reason),
))
}
return
}
}
@ -161,14 +176,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
// | query | string | "" | true | Query |
//
// <aside class="notice">WebSocket only</aside>
func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q)
err = eventBus.Unsubscribe(context.Background(), addr, q)
if err != nil {
return nil, err
}
@ -199,20 +214,12 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
// ```
//
// <aside class="notice">WebSocket only</aside>
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr)
err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr)
err := eventBus.UnsubscribeAll(context.Background(), addr)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

+ 2
- 1
rpc/core/health.go View File

@ -2,6 +2,7 @@ package core
import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
// Get node health. Returns empty result (200 OK) on success, no response - in
@ -31,6 +32,6 @@ import (
// "jsonrpc": "2.0"
// }
// ```
func Health() (*ctypes.ResultHealth, error) {
func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) {
return &ctypes.ResultHealth{}, nil
}

+ 31
- 13
rpc/core/mempool.go View File

@ -9,7 +9,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
)
@ -59,7 +59,7 @@ import (
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := mempool.CheckTx(tx, nil)
if err != nil {
return nil, err
@ -108,7 +108,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
resCh <- res
@ -128,6 +128,11 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// Returns with the responses from CheckTx and DeliverTx.
//
// IMPORTANT: use only for testing and development. In production, use
// BroadcastTxSync or BroadcastTxAsync. You can subscribe for the transaction
// result using JSONRPC via a websocket. See
// https://tendermint.com/docs/app-dev/subscribing-to-events-via-websocket.html
//
// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout
// waiting for tx to commit.
//
@ -182,18 +187,26 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()
if eventBus.NumClients() >= config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(subscriber) >= config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
}
// Subscribe to tx being committed in block.
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
q := types.EventQueryTxFor(tx)
deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q)
deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err
}
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
defer eventBus.Unsubscribe(context.Background(), subscriber, q)
// Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.Response, 1)
@ -215,8 +228,6 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
}
// Wait for the tx to be included in a block or timeout.
// TODO: configurable?
var deliverTxTimeout = rpcserver.WriteTimeout / 2
select {
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := msg.Data().(types.EventDataTx)
@ -227,14 +238,20 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?")
var reason string
if deliverTxSub.Err() == nil {
reason = "Tendermint exited"
} else {
reason = deliverTxSub.Err().Error()
}
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes,
DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(),
}, err
case <-time.After(deliverTxTimeout):
case <-time.After(config.TimeoutBroadcastTxCommit):
err = errors.New("Timed out waiting for tx to be included in a block")
logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{
@ -281,7 +298,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+--------------------------------------|
// | limit | int | 30 | false | Maximum number of entries (max: 100) |
func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
// ```
func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit = validatePerPage(limit)
@ -323,7 +341,7 @@ func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
// }
// }
// ```
func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
return &ctypes.ResultUnconfirmedTxs{
Count: mempool.Size(),
Total: mempool.Size(),


+ 5
- 4
rpc/core/net.go View File

@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
// Get network info.
@ -153,7 +154,7 @@ import (
// ...
// }
// ```
func NetInfo() (*ctypes.ResultNetInfo, error) {
func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
out, in, _ := p2pPeers.NumPeers()
peers := make([]ctypes.Peer, 0, out+in)
for _, peer := range p2pPeers.Peers().List() {
@ -179,7 +180,7 @@ func NetInfo() (*ctypes.ResultNetInfo, error) {
}, nil
}
func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
if len(seeds) == 0 {
return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided")
}
@ -192,7 +193,7 @@ func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil
}
func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) {
func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*ctypes.ResultDialPeers, error) {
if len(peers) == 0 {
return &ctypes.ResultDialPeers{}, errors.New("No peers provided")
}
@ -247,6 +248,6 @@ func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers,
// "jsonrpc": "2.0"
// }
// ```
func Genesis() (*ctypes.ResultGenesis, error) {
func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
return &ctypes.ResultGenesis{Genesis: genDoc}, nil
}

+ 8
- 0
rpc/core/pipe.go View File

@ -1,6 +1,7 @@
package core
import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
dbm "github.com/tendermint/tendermint/libs/db"
@ -71,6 +72,8 @@ var (
mempool *mempl.Mempool
logger log.Logger
config cfg.RPCConfig
)
func SetStateDB(db dbm.DB) {
@ -133,6 +136,11 @@ func SetEventBus(b *types.EventBus) {
eventBus = b
}
// SetConfig sets an RPCConfig.
func SetConfig(c cfg.RPCConfig) {
config = c
}
func validatePage(page, perPage, totalCount int) int {
if perPage < 1 {
return 1


+ 2
- 1
rpc/core/status.go View File

@ -7,6 +7,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -70,7 +71,7 @@ import (
// }
// }
// ```
func Status() (*ctypes.ResultStatus, error) {
func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
var latestHeight int64
if consensusReactor.FastSync() {
latestHeight = blockStore.Height()


+ 3
- 2
rpc/core/tx.go View File

@ -7,6 +7,7 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
)
@ -77,7 +78,7 @@ import (
// - `index`: `int` - index of the transaction
// - `height`: `int` - height of the block where this transaction was in
// - `hash`: `[]byte` - hash of the transaction
func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
// if index is disabled, return error
if _, ok := txIndexer.(*null.TxIndex); ok {
@ -183,7 +184,7 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
// - `index`: `int` - index of the transaction
// - `height`: `int` - height of the block where this transaction was in
// - `hash`: `[]byte` - hash of the transaction
func TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
// if index is disabled, return error
if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("Transaction indexing is disabled")


+ 5
- 2
rpc/grpc/api.go View File

@ -5,6 +5,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
core "github.com/tendermint/tendermint/rpc/core"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
type broadcastAPI struct {
@ -16,12 +17,14 @@ func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*Response
}
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
res, err := core.BroadcastTxCommit(req.Tx)
// NOTE: there's no way to get client's remote address
// see https://stackoverflow.com/questions/33684570/session-and-remote-ip-address-in-grpc-go
res, err := core.BroadcastTxCommit(&rpctypes.Context{}, req.Tx)
if err != nil {
return nil, err
}
return &ResponseBroadcastTx{
return &ResponseBroadcastTx{
CheckTx: &abci.ResponseCheckTx{
Code: res.CheckTx.Code,
Data: res.CheckTx.Data,


+ 2
- 1
rpc/grpc/client_server.go View File

@ -14,7 +14,8 @@ type Config struct {
MaxOpenConnections int
}
// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given net.Listener.
// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given
// net.Listener.
// NOTE: This function blocks - you may want to call it in a go-routine.
func StartGRPCServer(ln net.Listener) error {
grpcServer := grpc.NewServer()


+ 3
- 4
rpc/grpc/grpc_test.go View File

@ -25,9 +25,8 @@ func TestMain(m *testing.M) {
}
func TestBroadcastTx(t *testing.T) {
require := require.New(t)
res, err := rpctest.GetGRPCClient().BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")})
require.Nil(err, "%+v", err)
require.EqualValues(0, res.CheckTx.Code)
require.EqualValues(0, res.DeliverTx.Code)
require.NoError(t, err)
require.EqualValues(t, 0, res.CheckTx.Code)
require.EqualValues(t, 0, res.DeliverTx.Code)
}

+ 5
- 5
rpc/lib/rpc_test.go View File

@ -63,23 +63,23 @@ var Routes = map[string]*server.RPCFunc{
// Amino codec required to encode/decode everything above.
var RoutesCdc = amino.NewCodec()
func EchoResult(v string) (*ResultEcho, error) {
func EchoResult(ctx *types.Context, v string) (*ResultEcho, error) {
return &ResultEcho{v}, nil
}
func EchoWSResult(wsCtx types.WSRPCContext, v string) (*ResultEcho, error) {
func EchoWSResult(ctx *types.Context, v string) (*ResultEcho, error) {
return &ResultEcho{v}, nil
}
func EchoIntResult(v int) (*ResultEchoInt, error) {
func EchoIntResult(ctx *types.Context, v int) (*ResultEchoInt, error) {
return &ResultEchoInt{v}, nil
}
func EchoBytesResult(v []byte) (*ResultEchoBytes, error) {
func EchoBytesResult(ctx *types.Context, v []byte) (*ResultEchoBytes, error) {
return &ResultEchoBytes{v}, nil
}
func EchoDataBytesResult(v cmn.HexBytes) (*ResultEchoDataBytes, error) {
func EchoDataBytesResult(ctx *types.Context, v cmn.HexBytes) (*ResultEchoDataBytes, error) {
return &ResultEchoDataBytes{v}, nil
}


+ 45
- 52
rpc/lib/server/handlers.go View File

@ -2,7 +2,6 @@ package rpcserver
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -129,20 +128,26 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path)))
return
}
rpcFunc := funcMap[request.Method]
if rpcFunc == nil || rpcFunc.ws {
WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID))
return
}
var args []reflect.Value
ctx := &types.Context{JSONReq: &request, HTTPReq: r}
args := []reflect.Value{reflect.ValueOf(ctx)}
if len(request.Params) > 0 {
args, err = jsonParamsToArgsRPC(rpcFunc, cdc, request.Params)
fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
return
}
args = append(args, fnArgs...)
}
returns := rpcFunc.f.Call(args)
logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
@ -205,13 +210,14 @@ func arrayParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params []json.RawMess
return values, nil
}
// `raw` is unparsed json (from json.RawMessage) encoding either a map or an array.
// `argsOffset` should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
// raw is unparsed json (from json.RawMessage) encoding either a map or an
// array.
//
// Example:
// rpcFunc.args = [rpctypes.WSRPCContext string]
// rpcFunc.args = [rpctypes.Context string]
// rpcFunc.argNames = ["arg"]
func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset int) ([]reflect.Value, error) {
func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) {
const argsOffset = 1
// TODO: Make more efficient, perhaps by checking the first character for '{' or '['?
// First, try to get the map.
@ -232,20 +238,6 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset
return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err)
}
// Convert a []interface{} OR a map[string]interface{} to properly typed values
func jsonParamsToArgsRPC(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage) ([]reflect.Value, error) {
return jsonParamsToArgs(rpcFunc, cdc, params, 0)
}
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
values, err := jsonParamsToArgs(rpcFunc, cdc, params, 1)
if err != nil {
return nil, err
}
return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil
}
// rpc.json
//-----------------------------------------------------------------------------
// rpc.http
@ -258,15 +250,23 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func
WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID("")))
}
}
// All other endpoints
return func(w http.ResponseWriter, r *http.Request) {
logger.Debug("HTTP HANDLER", "req", r)
args, err := httpParamsToArgs(rpcFunc, cdc, r)
ctx := &types.Context{HTTPReq: r}
args := []reflect.Value{reflect.ValueOf(ctx)}
fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r)
if err != nil {
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments")))
return
}
args = append(args, fnArgs...)
returns := rpcFunc.f.Call(args)
logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
@ -280,10 +280,13 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func
// Covert an http query to a list of properly typed values.
// To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) {
values := make([]reflect.Value, len(rpcFunc.args))
// skip types.Context
const argsOffset = 1
values := make([]reflect.Value, len(rpcFunc.argNames))
for i, name := range rpcFunc.argNames {
argType := rpcFunc.args[i]
argType := rpcFunc.args[i+argsOffset]
values[i] = reflect.Zero(argType) // set default for that type
@ -434,8 +437,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration
// object that is used to subscribe / unsubscribe from events
eventSub types.EventSubscriber
// callback which is called upon disconnect
onDisconnect func(remoteAddr string)
}
// NewWSConnection wraps websocket.Conn.
@ -468,12 +471,11 @@ func NewWSConnection(
return wsc
}
// EventSubscriber sets object that is used to subscribe / unsubscribe from
// events - not Goroutine-safe. If none given, default node's eventBus will be
// used.
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
// OnDisconnect sets a callback which is used upon disconnect - not
// Goroutine-safe. Nop by default.
func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.eventSub = eventSub
wsc.onDisconnect = onDisconnect
}
}
@ -527,8 +529,8 @@ func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.eventSub != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
}
}
@ -538,11 +540,6 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
@ -628,27 +625,23 @@ func (wsc *wsConnection) readRoutine() {
}
// Now, fetch the RPCFunc and execute it.
rpcFunc := wsc.funcMap[request.Method]
if rpcFunc == nil {
wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID))
continue
}
var args []reflect.Value
if rpcFunc.ws {
wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
if len(request.Params) > 0 {
args, err = jsonParamsToArgsWS(rpcFunc, wsc.cdc, request.Params, wsCtx)
}
} else {
if len(request.Params) > 0 {
args, err = jsonParamsToArgsRPC(rpcFunc, wsc.cdc, request.Params)
ctx := &types.Context{JSONReq: &request, WSConn: wsc}
args := []reflect.Value{reflect.ValueOf(ctx)}
if len(request.Params) > 0 {
fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params)
if err != nil {
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
continue
}
args = append(args, fnArgs...)
}
if err != nil {
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments")))
continue
}
returns := rpcFunc.f.Call(args)
// TODO: Need to encode args/returns to string if we want to log them


+ 2
- 2
rpc/lib/server/handlers_test.go View File

@ -28,7 +28,7 @@ import (
func testMux() *http.ServeMux {
funcMap := map[string]*rs.RPCFunc{
"c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"),
"c": rs.NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"),
}
cdc := amino.NewCodec()
mux := http.NewServeMux()
@ -195,7 +195,7 @@ func TestWebsocketManagerHandler(t *testing.T) {
func newWSServer() *httptest.Server {
funcMap := map[string]*rs.RPCFunc{
"c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"),
"c": rs.NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"),
}
wm := rs.NewWebsocketManager(funcMap, amino.NewCodec())
wm.SetLogger(log.TestingLogger())


+ 42
- 48
rpc/lib/server/parse_test.go View File

@ -10,24 +10,23 @@ import (
"github.com/stretchr/testify/assert"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
types "github.com/tendermint/tendermint/rpc/lib/types"
)
func TestParseJSONMap(t *testing.T) {
assert := assert.New(t)
input := []byte(`{"value":"1234","height":22}`)
// naive is float,string
var p1 map[string]interface{}
err := json.Unmarshal(input, &p1)
if assert.Nil(err) {
if assert.Nil(t, err) {
h, ok := p1["height"].(float64)
if assert.True(ok, "%#v", p1["height"]) {
assert.EqualValues(22, h)
if assert.True(t, ok, "%#v", p1["height"]) {
assert.EqualValues(t, 22, h)
}
v, ok := p1["value"].(string)
if assert.True(ok, "%#v", p1["value"]) {
assert.EqualValues("1234", v)
if assert.True(t, ok, "%#v", p1["value"]) {
assert.EqualValues(t, "1234", v)
}
}
@ -38,14 +37,14 @@ func TestParseJSONMap(t *testing.T) {
"height": &tmp,
}
err = json.Unmarshal(input, &p2)
if assert.Nil(err) {
if assert.Nil(t, err) {
h, ok := p2["height"].(float64)
if assert.True(ok, "%#v", p2["height"]) {
assert.EqualValues(22, h)
if assert.True(t, ok, "%#v", p2["height"]) {
assert.EqualValues(t, 22, h)
}
v, ok := p2["value"].(string)
if assert.True(ok, "%#v", p2["value"]) {
assert.EqualValues("1234", v)
if assert.True(t, ok, "%#v", p2["value"]) {
assert.EqualValues(t, "1234", v)
}
}
@ -60,14 +59,14 @@ func TestParseJSONMap(t *testing.T) {
Value: &cmn.HexBytes{},
}
err = json.Unmarshal(input, &p3)
if assert.Nil(err) {
if assert.Nil(t, err) {
h, ok := p3.Height.(*int)
if assert.True(ok, "%#v", p3.Height) {
assert.Equal(22, *h)
if assert.True(t, ok, "%#v", p3.Height) {
assert.Equal(t, 22, *h)
}
v, ok := p3.Value.(*cmn.HexBytes)
if assert.True(ok, "%#v", p3.Value) {
assert.EqualValues([]byte{0x12, 0x34}, *v)
if assert.True(t, ok, "%#v", p3.Value) {
assert.EqualValues(t, []byte{0x12, 0x34}, *v)
}
}
@ -77,46 +76,44 @@ func TestParseJSONMap(t *testing.T) {
Height int `json:"height"`
}{}
err = json.Unmarshal(input, &p4)
if assert.Nil(err) {
assert.EqualValues(22, p4.Height)
assert.EqualValues([]byte{0x12, 0x34}, p4.Value)
if assert.Nil(t, err) {
assert.EqualValues(t, 22, p4.Height)
assert.EqualValues(t, []byte{0x12, 0x34}, p4.Value)
}
// so, let's use this trick...
// dynamic keys on map, and we can deserialize to the desired types
var p5 map[string]*json.RawMessage
err = json.Unmarshal(input, &p5)
if assert.Nil(err) {
if assert.Nil(t, err) {
var h int
err = json.Unmarshal(*p5["height"], &h)
if assert.Nil(err) {
assert.Equal(22, h)
if assert.Nil(t, err) {
assert.Equal(t, 22, h)
}
var v cmn.HexBytes
err = json.Unmarshal(*p5["value"], &v)
if assert.Nil(err) {
assert.Equal(cmn.HexBytes{0x12, 0x34}, v)
if assert.Nil(t, err) {
assert.Equal(t, cmn.HexBytes{0x12, 0x34}, v)
}
}
}
func TestParseJSONArray(t *testing.T) {
assert := assert.New(t)
input := []byte(`["1234",22]`)
// naive is float,string
var p1 []interface{}
err := json.Unmarshal(input, &p1)
if assert.Nil(err) {
if assert.Nil(t, err) {
v, ok := p1[0].(string)
if assert.True(ok, "%#v", p1[0]) {
assert.EqualValues("1234", v)
if assert.True(t, ok, "%#v", p1[0]) {
assert.EqualValues(t, "1234", v)
}
h, ok := p1[1].(float64)
if assert.True(ok, "%#v", p1[1]) {
assert.EqualValues(22, h)
if assert.True(t, ok, "%#v", p1[1]) {
assert.EqualValues(t, 22, h)
}
}
@ -124,22 +121,20 @@ func TestParseJSONArray(t *testing.T) {
tmp := 0
p2 := []interface{}{&cmn.HexBytes{}, &tmp}
err = json.Unmarshal(input, &p2)
if assert.Nil(err) {
if assert.Nil(t, err) {
v, ok := p2[0].(*cmn.HexBytes)
if assert.True(ok, "%#v", p2[0]) {
assert.EqualValues([]byte{0x12, 0x34}, *v)
if assert.True(t, ok, "%#v", p2[0]) {
assert.EqualValues(t, []byte{0x12, 0x34}, *v)
}
h, ok := p2[1].(*int)
if assert.True(ok, "%#v", p2[1]) {
assert.EqualValues(22, *h)
if assert.True(t, ok, "%#v", p2[1]) {
assert.EqualValues(t, 22, *h)
}
}
}
func TestParseJSONRPC(t *testing.T) {
assert := assert.New(t)
demo := func(height int, name string) {}
demo := func(ctx *types.Context, height int, name string) {}
call := NewRPCFunc(demo, "height,name")
cdc := amino.NewCodec()
@ -162,14 +157,14 @@ func TestParseJSONRPC(t *testing.T) {
for idx, tc := range cases {
i := strconv.Itoa(idx)
data := []byte(tc.raw)
vals, err := jsonParamsToArgs(call, cdc, data, 0)
vals, err := jsonParamsToArgs(call, cdc, data)
if tc.fail {
assert.NotNil(err, i)
assert.NotNil(t, err, i)
} else {
assert.Nil(err, "%s: %+v", i, err)
if assert.Equal(2, len(vals), i) {
assert.Equal(tc.height, vals[0].Int(), i)
assert.Equal(tc.name, vals[1].String(), i)
assert.Nil(t, err, "%s: %+v", i, err)
if assert.Equal(t, 2, len(vals), i) {
assert.Equal(t, tc.height, vals[0].Int(), i)
assert.Equal(t, tc.name, vals[1].String(), i)
}
}
@ -177,8 +172,7 @@ func TestParseJSONRPC(t *testing.T) {
}
func TestParseURI(t *testing.T) {
demo := func(height int, name string) {}
demo := func(ctx *types.Context, height int, name string) {}
call := NewRPCFunc(demo, "height,name")
cdc := amino.NewCodec()


+ 3
- 1
rpc/lib/test/main.go View File

@ -6,16 +6,18 @@ import (
"os"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
var routes = map[string]*rpcserver.RPCFunc{
"hello_world": rpcserver.NewRPCFunc(HelloWorld, "name,num"),
}
func HelloWorld(name string, num int) (Result, error) {
func HelloWorld(ctx *rpctypes.Context, name string, num int) (Result, error) {
return Result{fmt.Sprintf("hi %s %d", name, num)}, nil
}


+ 29
- 15
rpc/lib/types/types.go View File

@ -1,18 +1,15 @@
package rpctypes
import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"strings"
"github.com/pkg/errors"
amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"
)
// a wrapper to emulate a sum type: jsonrpcid = string | int
@ -236,30 +233,47 @@ func RPCServerError(id jsonrpcid, err error) RPCResponse {
//----------------------------------------
// *wsConnection implements this interface.
// WSRPCConnection represents a websocket connection.
type WSRPCConnection interface {
// GetRemoteAddr returns a remote address of the connection.
GetRemoteAddr() string
// WriteRPCResponse writes the resp onto connection (BLOCKING).
WriteRPCResponse(resp RPCResponse)
// TryWriteRPCResponse tries to write the resp onto connection (NON-BLOCKING).
TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
// Codec returns an Amino codec used.
Codec() *amino.Codec
}
// websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct {
Request RPCRequest
WSRPCConnection
// Context is the first parameter for all functions. It carries a json-rpc
// request, http request and websocket connection.
//
// - JSONReq is non-nil when JSONRPC is called over websocket or HTTP.
// - WSConn is non-nil when we're connected via a websocket.
// - HTTPReq is non-nil when URI or JSONRPC is called over HTTP.
type Context struct {
// json-rpc request
JSONReq *RPCRequest
// websocket connection
WSConn WSRPCConnection
// http request
HTTPReq *http.Request
}
// EventSubscriber mirrors tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
// RemoteAddr returns either HTTPReq#RemoteAddr or result of the
// WSConn#GetRemoteAddr().
func (ctx *Context) RemoteAddr() string {
if ctx.HTTPReq != nil {
return ctx.HTTPReq.RemoteAddr
} else if ctx.WSConn != nil {
return ctx.WSConn.GetRemoteAddr()
}
return ""
}
//----------------------------------------
// SOCKETS
//
// Determine if its a unix or tcp socket.
// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port


+ 11
- 0
types/event_bus.go View File

@ -15,6 +15,9 @@ type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
NumClients() int
NumClientSubscriptions(clientID string) int
}
type Subscription interface {
@ -58,6 +61,14 @@ func (b *EventBus) OnStop() {
b.pubsub.Stop()
}
func (b *EventBus) NumClients() int {
return b.pubsub.NumClients()
}
func (b *EventBus) NumClientSubscriptions(clientID string) int {
return b.pubsub.NumClientSubscriptions(clientID)
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
}


Loading…
Cancel
Save