Browse Source

Merge pull request #3450 from tendermint/master

Merge master back to develop
pull/3452/head
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
22bcfca87a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 255 additions and 77 deletions
  1. +92
    -1
      CHANGELOG.md
  2. +1
    -30
      CHANGELOG_PENDING.md
  3. +41
    -0
      UPGRADING.md
  4. +3
    -4
      config/config.go
  5. +3
    -0
      config/toml.go
  6. +3
    -0
      docs/tendermint-core/configuration.md
  7. +4
    -2
      lite/proxy/proxy.go
  8. +14
    -3
      node/node.go
  9. +1
    -2
      rpc/core/events.go
  10. +1
    -1
      rpc/core/mempool.go
  11. +6
    -3
      rpc/core/pipe.go
  12. +5
    -4
      rpc/lib/rpc_test.go
  13. +18
    -0
      rpc/lib/server/handlers.go
  14. +22
    -15
      rpc/lib/server/http_server.go
  15. +8
    -4
      rpc/lib/server/http_server_test.go
  16. +3
    -2
      rpc/lib/test/main.go
  17. +25
    -2
      rpc/lib/types/types.go
  18. +3
    -2
      tools/tm-monitor/rpc.go
  19. +2
    -2
      version/version.go

+ 92
- 1
CHANGELOG.md View File

@ -1,5 +1,96 @@
# Changelog # Changelog
## v0.31.0
*March 16th, 2019*
Special thanks to external contributors on this release:
@danil-lashin, @guagualvcha, @siburu, @silasdavis, @srmo, @Stumble, @svenstaro
This release is primarily about the new pubsub implementation, dubbed `pubsub 2.0`, and related changes,
like configurable limits on the number of active RPC subscriptions at a time (`max_subscription_clients`).
Pubsub 2.0 is an improved version of the older pubsub that is non-blocking and has a nicer API.
Note the improved pubsub API also resulted in some improvements to the HTTPClient interface and the API for WebSocket subscriptions.
This release also adds a configurable limit to the mempool size (`max_txs_bytes`, default 1GB)
and a configurable timeout for the `/broadcast_tx_commit` endpoint.
See the [v0.31.0
Milestone](https://github.com/tendermint/tendermint/milestone/19?closed=1) for
more details.
Friendly reminder, we have a [bug bounty
program](https://hackerone.com/tendermint).
### BREAKING CHANGES:
* CLI/RPC/Config
- [config] [\#2920](https://github.com/tendermint/tendermint/issues/2920) Remove `consensus.blocktime_iota` parameter
- [rpc] [\#3227](https://github.com/tendermint/tendermint/issues/3227) New PubSub design does not block on clients when publishing
messages. Slow clients may miss messages and receive an error, terminating
the subscription.
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique clientIDs with open subscriptions. Configurable via `rpc.max_subscription_clients`
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique queries a given client can subscribe to at once. Configurable via `rpc.max_subscriptions_per_client`.
- [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) Default ReadTimeout and WriteTimeout changed to 10s. WriteTimeout can increased by setting `rpc.timeout_broadcast_tx_commit` in the config.
- [rpc/client] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Update `EventsClient` interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md). This includes `Subscribe`, `Unsubscribe`, and `UnsubscribeAll` methods.
* Apps
- [abci] [\#3403](https://github.com/tendermint/tendermint/issues/3403) Remove `time_iota_ms` from BlockParams. This is a
ConsensusParam but need not be exposed to the app for now.
- [abci] [\#2920](https://github.com/tendermint/tendermint/issues/2920) Rename `consensus_params.block_size` to `consensus_params.block` in ABCI ConsensusParams
* Go API
- [libs/common] TrapSignal accepts logger as a first parameter and does not block anymore
* previously it was dumping "captured ..." msg to os.Stdout
* TrapSignal should not be responsible for blocking thread of execution
- [libs/db] [\#3397](https://github.com/tendermint/tendermint/pull/3397) Add possibility to `Close()` `Batch` to prevent memory leak when using ClevelDB. (@Stumble)
- [types] [\#3354](https://github.com/tendermint/tendermint/issues/3354) Remove RoundState from EventDataRoundState
- [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) `StartHTTPServer` / `StartHTTPAndTLSServer` now require a Config (use `rpcserver.DefaultConfig`)
* Blockchain Protocol
* P2P Protocol
### FEATURES:
- [config] [\#3269](https://github.com/tendermint/tendermint/issues/2826) New configuration values for controlling RPC subscriptions:
- `rpc.max_subscription_clients` sets the maximum number of unique clients
with open subscriptions
- `rpc.max_subscriptions_per_client`sets the maximum number of unique
subscriptions from a given client
- `rpc.timeout_broadcast_tx_commit` sets the time to wait for a tx to be committed during `/broadcast_tx_commit`
- [types] [\#2920](https://github.com/tendermint/tendermint/issues/2920) Add `time_iota_ms` to block's consensus parameters (not exposed to the application)
- [lite] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Add `/unsubscribe_all` endpoint to unsubscribe from all events
- [mempool] [\#3079](https://github.com/tendermint/tendermint/issues/3079) Bound mempool memory usage via the `mempool.max_txs_bytes` configuration value. Set to 1GB by default. The mempool's current `txs_total_bytes` is exposed via `total_bytes` field in
`/num_unconfirmed_txs` and `/unconfirmed_txs` RPC endpoints.
### IMPROVEMENTS:
- [all] [\#3385](https://github.com/tendermint/tendermint/issues/3385), [\#3386](https://github.com/tendermint/tendermint/issues/3386) Various linting improvements
- [crypto] [\#3371](https://github.com/tendermint/tendermint/issues/3371) Copy in secp256k1 package from go-ethereum instead of importing
go-ethereum (@silasdavis)
- [deps] [\#3382](https://github.com/tendermint/tendermint/issues/3382) Don't pin repos without releases
- [deps] [\#3357](https://github.com/tendermint/tendermint/issues/3357), [\#3389](https://github.com/tendermint/tendermint/issues/3389), [\#3392](https://github.com/tendermint/tendermint/issues/3392) Update gogo/protobuf, golang/protobuf, levigo, golang.org/x/crypto
- [libs/common] [\#3238](https://github.com/tendermint/tendermint/issues/3238) exit with zero (0) code upon receiving SIGTERM/SIGINT
- [libs/db] [\#3378](https://github.com/tendermint/tendermint/issues/3378) CLevelDB#Stats now returns the following properties:
- leveldb.num-files-at-level{n}
- leveldb.stats
- leveldb.sstables
- leveldb.blockpool
- leveldb.cachedblock
- leveldb.openedtables
- leveldb.alivesnaps
- leveldb.aliveiters
- [privval] [\#3351](https://github.com/tendermint/tendermint/pull/3351) First part of larger refactoring that clarifies and separates concerns in the privval package.
### BUG FIXES:
- [blockchain] [\#3358](https://github.com/tendermint/tendermint/pull/3358) Fix timer leak in `BlockPool` (@guagualvcha)
- [cmd] [\#3408](https://github.com/tendermint/tendermint/issues/3408) Fix `testnet` command's panic when creating non-validator configs (using `--n` flag) (@srmo)
- [libs/db/remotedb/grpcdb] [\#3402](https://github.com/tendermint/tendermint/issues/3402) Close Iterator/ReverseIterator after use
- [libs/pubsub] [\#951](https://github.com/tendermint/tendermint/issues/951), [\#1880](https://github.com/tendermint/tendermint/issues/1880) Use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [lite] [\#3364](https://github.com/tendermint/tendermint/issues/3364) Fix `/validators` and `/abci_query` proxy endpoints
(@guagualvcha)
- [p2p/conn] [\#3347](https://github.com/tendermint/tendermint/issues/3347) Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
- [p2p] [\#3369](https://github.com/tendermint/tendermint/issues/3369) Do not panic when filter times out
- [p2p] [\#3359](https://github.com/tendermint/tendermint/pull/3359) Fix reconnecting report duplicate ID error due to race condition between adding peer to peerSet and starting it (@guagualvcha)
## v0.30.2 ## v0.30.2
*March 10th, 2019* *March 10th, 2019*
@ -14,7 +105,7 @@ fix here.
### BREAKING CHANGES: ### BREAKING CHANGES:
* Go API * Go API
- [libs/db] [\#3842](https://github.com/cosmos/cosmos-sdk/issues/3842) Add Close() method to Batch interface (@Stumble)
- [libs/db] [\#3842](https://github.com/cosmos/cosmos-sdk/issues/3842) Add Close() method to Batch interface (@Stumble)
### BUG FIXES: ### BUG FIXES:
- [libs/db] [\#3842](https://github.com/cosmos/cosmos-sdk/issues/3842) Fix CLevelDB memory leak (@Stumble) - [libs/db] [\#3842](https://github.com/cosmos/cosmos-sdk/issues/3842) Fix CLevelDB memory leak (@Stumble)


+ 1
- 30
CHANGELOG_PENDING.md View File

@ -1,50 +1,21 @@
## v0.31.0
## v0.32.0
** **
Special thanks to external contributors on this release:
@srmo
### BREAKING CHANGES: ### BREAKING CHANGES:
* CLI/RPC/Config * CLI/RPC/Config
- [rpc/client] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
* Apps * Apps
* Go API * Go API
- [libs/common] TrapSignal accepts logger as a first parameter and does not block anymore
* previously it was dumping "captured ..." msg to os.Stdout
* TrapSignal should not be responsible for blocking thread of execution
* Blockchain Protocol * Blockchain Protocol
* P2P Protocol * P2P Protocol
### FEATURES: ### FEATURES:
- [mempool] \#3079 bound mempool memory usage (`mempool.max_txs_bytes` is set to 1GB by default; see config.toml)
mempool's current `txs_total_bytes` is exposed via `total_bytes` field in
`/num_unconfirmed_txs` and `/unconfirmed_txs` RPC endpoints.
- [config] \#2920 Remove `consensus.blocktime_iota` parameter
- [genesis] \#2920 Add `time_iota_ms` to block's consensus parameters (not exposed to the application)
- [genesis] \#2920 Rename `consensus_params.block_size` to `consensus_params.block`
- [lite] add `/unsubscribe_all` endpoint, which allows you to unsubscribe from all events
### IMPROVEMENTS: ### IMPROVEMENTS:
- [libs/common] \#3238 exit with zero (0) code upon receiving SIGTERM/SIGINT
- [libs/db] \#3378 CLevelDB#Stats now returns the following properties:
- leveldb.num-files-at-level{n}
- leveldb.stats
- leveldb.sstables
- leveldb.blockpool
- leveldb.cachedblock
- leveldb.openedtables
- leveldb.alivesnaps
- leveldb.aliveiters
### BUG FIXES: ### BUG FIXES:
- [p2p/conn] \#3347 Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
- [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [p2p] \#3369 do not panic when filter times out
- [cmd] \#3408 Fix `testnet` command's panic when creating non-validator configs (using `--n` flag) (@srmo)
- [libs/db/remotedb/grpcdb] \#3402 Close Iterator/ReverseIterator after use

+ 41
- 0
UPGRADING.md View File

@ -3,6 +3,47 @@
This guide provides steps to be followed when you upgrade your applications to This guide provides steps to be followed when you upgrade your applications to
a newer version of Tendermint Core. a newer version of Tendermint Core.
## v0.31.0
This release contains a breaking change to the behaviour of the pubsub system.
It also contains some minor breaking changes in the Go API and ABCI.
There are no changes to the block or p2p protocols, so v0.31.0 should work fine
with blockchains created from the v0.30 series.
### RPC
The pubsub no longer blocks on publishing. This may cause some WebSocket (WS) clients to stop working as expected.
If your WS client is not consuming events fast enough, Tendermint can terminate the subscription.
In this case, the WS client will receive an error with description:
```json
{
"jsonrpc": "2.0",
"id": "{ID}#event",
"error": {
"code": -32000,
"msg": "Server error",
"data": "subscription was cancelled (reason: client is not pulling messages fast enough)" // or "subscription was cancelled (reason: Tendermint exited)"
}
}
Additionally, there are now limits on the number of subscribers and
subscriptions that can be active at once. See the new
`rpc.max_subscription_clients` and `rpc.max_subscriptions_per_client` values to
configure this.
```
### Applications
Simple rename of `ConsensusParams.BlockSize` to `ConsensusParams.Block`.
The `ConsensusParams.Block.TimeIotaMS` field was also removed. It's configured
in the ConsensusParsm in genesis.
### Go API
See the [CHANGELOG](CHANGELOG.md). These are relatively straight forward.
## v0.30.0 ## v0.30.0
This release contains a breaking change to both the block and p2p protocols, This release contains a breaking change to both the block and p2p protocols,


+ 3
- 4
config/config.go View File

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
) )
const ( const (
@ -336,6 +335,9 @@ type RPCConfig struct {
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"` MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
// How long to wait for a tx to be committed during /broadcast_tx_commit // How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 10s will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"` TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
} }
@ -385,9 +387,6 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.TimeoutBroadcastTxCommit < 0 { if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative") return errors.New("timeout_broadcast_tx_commit can't be negative")
} }
if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout {
return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout)
}
return nil return nil
} }


+ 3
- 0
config/toml.go View File

@ -176,6 +176,9 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit. # How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}" timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
##### peer to peer configuration options ##### ##### peer to peer configuration options #####


+ 3
- 0
docs/tendermint-core/configuration.md View File

@ -122,6 +122,9 @@ max_subscription_clients = 100
max_subscriptions_per_client = 5 max_subscriptions_per_client = 5
# How long to wait for a tx to be committed during /broadcast_tx_commit. # How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "10s" timeout_broadcast_tx_commit = "10s"
##### peer to peer configuration options ##### ##### peer to peer configuration options #####


+ 4
- 2
lite/proxy/proxy.go View File

@ -45,11 +45,13 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
core.SetLogger(logger) core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
l, err := rpcserver.Listen(listenAddr, rpcserver.Config{MaxOpenConnections: maxOpenConnections})
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = maxOpenConnections
l, err := rpcserver.Listen(listenAddr, config)
if err != nil { if err != nil {
return err return err
} }
return rpcserver.StartHTTPServer(l, mux, logger)
return rpcserver.StartHTTPServer(l, mux, logger, config)
} }
// RPCRoutes just routes everything to the given client, as if it were // RPCRoutes just routes everything to the given client, as if it were


+ 14
- 3
node/node.go View File

@ -689,9 +689,18 @@ func (n *Node) startRPC() ([]net.Listener, error) {
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen( listener, err := rpcserver.Listen(
listenAddr, listenAddr,
rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections},
config,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -711,6 +720,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
listener, listener,
rootHandler, rootHandler,
rpcLogger, rpcLogger,
config,
) )
listeners[i] = listener listeners[i] = listener
} }
@ -718,8 +728,9 @@ func (n *Node) startRPC() ([]net.Listener, error) {
// we expose a simplified api over grpc for convenience to app devs // we expose a simplified api over grpc for convenience to app devs
grpcListenAddr := n.config.RPC.GRPCListenAddress grpcListenAddr := n.config.RPC.GRPCListenAddress
if grpcListenAddr != "" { if grpcListenAddr != "" {
listener, err := rpcserver.Listen(
grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections})
config := rpcserver.DefaultConfig()
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
listener, err := rpcserver.Listen(grpcListenAddr, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 1
- 2
rpc/core/events.go View File

@ -105,8 +105,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel() defer cancel()
sub, err := eventBus.Subscribe(subCtx, addr, q) sub, err := eventBus.Subscribe(subCtx, addr, q)
if err != nil { if err != nil {


+ 1
- 1
rpc/core/mempool.go View File

@ -197,7 +197,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
} }
// Subscribe to tx being committed in block. // Subscribe to tx being committed in block.
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel() defer cancel()
q := types.EventQueryTxFor(tx) q := types.EventQueryTxFor(tx)
deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q) deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q)


+ 6
- 3
rpc/core/pipe.go View File

@ -1,6 +1,8 @@
package core package core
import ( import (
"time"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
@ -9,7 +11,6 @@ import (
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -19,9 +20,11 @@ const (
// see README // see README
defaultPerPage = 30 defaultPerPage = 30
maxPerPage = 100 maxPerPage = 100
)
var subscribeTimeout = rpcserver.WriteTimeout / 2
// SubscribeTimeout is the maximum time we wait to subscribe for an event.
// must be less than the server's write timeout (see rpcserver.DefaultConfig)
SubscribeTimeout = 5 * time.Second
)
//---------------------------------------------- //----------------------------------------------
// These interfaces are used by RPC and must be thread safe // These interfaces are used by RPC and must be thread safe


+ 5
- 4
rpc/lib/rpc_test.go View File

@ -121,11 +121,12 @@ func setup() {
wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
wm.SetLogger(tcpLogger) wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener1, err := server.Listen(tcpAddr, server.Config{})
config := server.DefaultConfig()
listener1, err := server.Listen(tcpAddr, config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
go server.StartHTTPServer(listener1, mux, tcpLogger)
go server.StartHTTPServer(listener1, mux, tcpLogger, config)
unixLogger := logger.With("socket", "unix") unixLogger := logger.With("socket", "unix")
mux2 := http.NewServeMux() mux2 := http.NewServeMux()
@ -133,11 +134,11 @@ func setup() {
wm = server.NewWebsocketManager(Routes, RoutesCdc) wm = server.NewWebsocketManager(Routes, RoutesCdc)
wm.SetLogger(unixLogger) wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener2, err := server.Listen(unixAddr, server.Config{})
listener2, err := server.Listen(unixAddr, config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
go server.StartHTTPServer(listener2, mux2, unixLogger)
go server.StartHTTPServer(listener2, mux2, unixLogger, config)
// wait for servers to start // wait for servers to start
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)


+ 18
- 0
rpc/lib/server/handlers.go View File

@ -2,6 +2,7 @@ package rpcserver
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -439,6 +440,9 @@ type wsConnection struct {
// callback which is called upon disconnect // callback which is called upon disconnect
onDisconnect func(remoteAddr string) onDisconnect func(remoteAddr string)
ctx context.Context
cancel context.CancelFunc
} }
// NewWSConnection wraps websocket.Conn. // NewWSConnection wraps websocket.Conn.
@ -532,6 +536,10 @@ func (wsc *wsConnection) OnStop() {
if wsc.onDisconnect != nil { if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr) wsc.onDisconnect(wsc.remoteAddr)
} }
if wsc.ctx != nil {
wsc.cancel()
}
} }
// GetRemoteAddr returns the remote address of the underlying connection. // GetRemoteAddr returns the remote address of the underlying connection.
@ -569,6 +577,16 @@ func (wsc *wsConnection) Codec() *amino.Codec {
return wsc.cdc return wsc.cdc
} }
// Context returns the connection's context.
// The context is canceled when the client's connection closes.
func (wsc *wsConnection) Context() context.Context {
if wsc.ctx != nil {
return wsc.ctx
}
wsc.ctx, wsc.cancel = context.WithCancel(context.Background())
return wsc.ctx
}
// Read from the socket and subscribe to or unsubscribe from events // Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() { func (wsc *wsConnection) readRoutine() {
defer func() { defer func() {


+ 22
- 15
rpc/lib/server/http_server.go View File

@ -18,9 +18,23 @@ import (
types "github.com/tendermint/tendermint/rpc/lib/types" types "github.com/tendermint/tendermint/rpc/lib/types"
) )
// Config is an RPC server configuration.
// Config is a RPC server configuration.
type Config struct { type Config struct {
// see netutil.LimitListener
MaxOpenConnections int MaxOpenConnections int
// mirrors http.Server#ReadTimeout
ReadTimeout time.Duration
// mirrors http.Server#WriteTimeout
WriteTimeout time.Duration
}
// DefaultConfig returns a default configuration.
func DefaultConfig() *Config {
return &Config{
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
} }
const ( const (
@ -30,25 +44,17 @@ const (
// same as the net/http default // same as the net/http default
maxHeaderBytes = 1 << 20 maxHeaderBytes = 1 << 20
// Timeouts for reading/writing to the http connection.
// Public so handlers can read them -
// /broadcast_tx_commit has it's own timeout, which should
// be less than the WriteTimeout here.
// TODO: use a config instead.
ReadTimeout = 3 * time.Second
WriteTimeout = 20 * time.Second
) )
// StartHTTPServer takes a listener and starts an HTTP server with the given handler. // StartHTTPServer takes a listener and starts an HTTP server with the given handler.
// It wraps handler with RecoverAndLogHandler. // It wraps handler with RecoverAndLogHandler.
// NOTE: This function blocks - you may want to call it in a go-routine. // NOTE: This function blocks - you may want to call it in a go-routine.
func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error {
func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error {
logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr())) logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr()))
s := &http.Server{ s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MaxHeaderBytes: maxHeaderBytes, MaxHeaderBytes: maxHeaderBytes,
} }
err := s.Serve(listener) err := s.Serve(listener)
@ -64,13 +70,14 @@ func StartHTTPAndTLSServer(
handler http.Handler, handler http.Handler,
certFile, keyFile string, certFile, keyFile string,
logger log.Logger, logger log.Logger,
config *Config,
) error { ) error {
logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)", logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)",
listener.Addr(), certFile, keyFile)) listener.Addr(), certFile, keyFile))
s := &http.Server{ s := &http.Server{
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger),
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MaxHeaderBytes: maxHeaderBytes, MaxHeaderBytes: maxHeaderBytes,
} }
err := s.ServeTLS(listener, certFile, keyFile) err := s.ServeTLS(listener, certFile, keyFile)
@ -180,7 +187,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Listen starts a new net.Listener on the given address. // Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails. // It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config Config) (listener net.Listener, err error) {
func Listen(addr string, config *Config) (listener net.Listener, err error) {
parts := strings.SplitN(addr, "://", 2) parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 { if len(parts) != 2 {
return nil, errors.Errorf( return nil, errors.Errorf(


+ 8
- 4
rpc/lib/server/http_server_test.go View File

@ -30,10 +30,12 @@ func TestMaxOpenConnections(t *testing.T) {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body") fmt.Fprint(w, "some body")
}) })
l, err := Listen("tcp://127.0.0.1:0", Config{MaxOpenConnections: max})
config := DefaultConfig()
config.MaxOpenConnections = max
l, err := Listen("tcp://127.0.0.1:0", config)
require.NoError(t, err) require.NoError(t, err)
defer l.Close() defer l.Close()
go StartHTTPServer(l, mux, log.TestingLogger())
go StartHTTPServer(l, mux, log.TestingLogger(), config)
// Make N GET calls to the server. // Make N GET calls to the server.
attempts := max * 2 attempts := max * 2
@ -64,15 +66,17 @@ func TestMaxOpenConnections(t *testing.T) {
} }
func TestStartHTTPAndTLSServer(t *testing.T) { func TestStartHTTPAndTLSServer(t *testing.T) {
config := DefaultConfig()
config.MaxOpenConnections = 1
// set up fixtures // set up fixtures
listenerAddr := "tcp://0.0.0.0:0" listenerAddr := "tcp://0.0.0.0:0"
listener, err := Listen(listenerAddr, Config{MaxOpenConnections: 1})
listener, err := Listen(listenerAddr, config)
require.NoError(t, err) require.NoError(t, err)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {}) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {})
// test failure // test failure
err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger())
err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger(), config)
require.IsType(t, (*os.PathError)(nil), err) require.IsType(t, (*os.PathError)(nil), err)
// TODO: test that starting the server can actually work // TODO: test that starting the server can actually work


+ 3
- 2
rpc/lib/test/main.go View File

@ -36,9 +36,10 @@ func main() {
cmn.TrapSignal(logger, func() {}) cmn.TrapSignal(logger, func() {})
rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger) rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger)
listener, err := rpcserver.Listen("0.0.0.0:8008", rpcserver.Config{})
config := rpcserver.DefaultConfig()
listener, err := rpcserver.Listen("0.0.0.0:8008", config)
if err != nil { if err != nil {
cmn.Exit(err.Error()) cmn.Exit(err.Error())
} }
rpcserver.StartHTTPServer(listener, mux, logger)
rpcserver.StartHTTPServer(listener, mux, logger, config)
} }

+ 25
- 2
rpc/lib/types/types.go View File

@ -1,6 +1,7 @@
package rpctypes package rpctypes
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
@ -243,6 +244,8 @@ type WSRPCConnection interface {
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
// Codec returns an Amino codec used. // Codec returns an Amino codec used.
Codec() *amino.Codec Codec() *amino.Codec
// Context returns the connection's context.
Context() context.Context
} }
// Context is the first parameter for all functions. It carries a json-rpc // Context is the first parameter for all functions. It carries a json-rpc
@ -260,8 +263,12 @@ type Context struct {
HTTPReq *http.Request HTTPReq *http.Request
} }
// RemoteAddr returns either HTTPReq#RemoteAddr or result of the
// WSConn#GetRemoteAddr().
// RemoteAddr returns the remote address (usually a string "IP:port").
// If neither HTTPReq nor WSConn is set, an empty string is returned.
// HTTP:
// http.Request#RemoteAddr
// WS:
// result of GetRemoteAddr
func (ctx *Context) RemoteAddr() string { func (ctx *Context) RemoteAddr() string {
if ctx.HTTPReq != nil { if ctx.HTTPReq != nil {
return ctx.HTTPReq.RemoteAddr return ctx.HTTPReq.RemoteAddr
@ -271,6 +278,22 @@ func (ctx *Context) RemoteAddr() string {
return "" return ""
} }
// Context returns the request's context.
// The returned context is always non-nil; it defaults to the background context.
// HTTP:
// The context is canceled when the client's connection closes, the request
// is canceled (with HTTP/2), or when the ServeHTTP method returns.
// WS:
// The context is canceled when the client's connections closes.
func (ctx *Context) Context() context.Context {
if ctx.HTTPReq != nil {
return ctx.HTTPReq.Context()
} else if ctx.WSConn != nil {
return ctx.WSConn.Context()
}
return context.Background()
}
//---------------------------------------- //----------------------------------------
// SOCKETS // SOCKETS


+ 3
- 2
tools/tm-monitor/rpc.go View File

@ -17,11 +17,12 @@ func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.List
wm := rpc.NewWebsocketManager(routes, nil) wm := rpc.NewWebsocketManager(routes, nil)
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpc.RegisterRPCFuncs(mux, routes, cdc, logger) rpc.RegisterRPCFuncs(mux, routes, cdc, logger)
listener, err := rpc.Listen(listenAddr, rpc.Config{})
config := rpc.DefaultConfig()
listener, err := rpc.Listen(listenAddr, config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
go rpc.StartHTTPServer(listener, mux, logger)
go rpc.StartHTTPServer(listener, mux, logger, config)
return listener return listener
} }


+ 2
- 2
version/version.go View File

@ -20,10 +20,10 @@ const (
// Must be a string because scripts like dist.sh read this file. // Must be a string because scripts like dist.sh read this file.
// XXX: Don't change the name of this variable or you will break // XXX: Don't change the name of this variable or you will break
// automation :) // automation :)
TMCoreSemVer = "0.30.2"
TMCoreSemVer = "0.31.0"
// ABCISemVer is the semantic version of the ABCI library // ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.15.0"
ABCISemVer = "0.16.0"
ABCIVersion = ABCISemVer ABCIVersion = ABCISemVer
) )


Loading…
Cancel
Save