@ -1,33 +0,0 @@ | |||
package abciclient | |||
import ( | |||
"fmt" | |||
"github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/libs/log" | |||
) | |||
// Creator creates new ABCI clients. | |||
type Creator func(log.Logger) (Client, error) | |||
// NewLocalCreator returns a Creator for the given app, | |||
// which will be running locally. | |||
func NewLocalCreator(app types.Application) Creator { | |||
return func(logger log.Logger) (Client, error) { | |||
return NewLocalClient(logger, app), nil | |||
} | |||
} | |||
// NewRemoteCreator returns a Creator for the given address (e.g. | |||
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you | |||
// want the client to connect before reporting success. | |||
func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator { | |||
return func(log.Logger) (Client, error) { | |||
remoteApp, err := NewClient(logger, addr, transport, mustConnect) | |||
if err != nil { | |||
return nil, fmt.Errorf("failed to connect to proxy: %w", err) | |||
} | |||
return remoteApp, nil | |||
} | |||
} |
@ -1,85 +0,0 @@ | |||
package abciclient_test | |||
import ( | |||
"context" | |||
"fmt" | |||
"testing" | |||
"time" | |||
"math/rand" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
abciclient "github.com/tendermint/tendermint/abci/client" | |||
"github.com/tendermint/tendermint/abci/server" | |||
"github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
) | |||
func TestProperSyncCalls(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
app := slowApp{} | |||
logger := log.NewNopLogger() | |||
_, c := setupClientServer(ctx, t, logger, app) | |||
resp := make(chan error, 1) | |||
go func() { | |||
rsp, err := c.FinalizeBlock(ctx, types.RequestFinalizeBlock{}) | |||
assert.NoError(t, err) | |||
assert.NoError(t, c.Flush(ctx)) | |||
assert.NotNil(t, rsp) | |||
select { | |||
case <-ctx.Done(): | |||
case resp <- c.Error(): | |||
} | |||
}() | |||
select { | |||
case <-time.After(time.Second): | |||
require.Fail(t, "No response arrived") | |||
case err, ok := <-resp: | |||
require.True(t, ok, "Must not close channel") | |||
assert.NoError(t, err, "This should return success") | |||
} | |||
} | |||
func setupClientServer( | |||
ctx context.Context, | |||
t *testing.T, | |||
logger log.Logger, | |||
app types.Application, | |||
) (service.Service, abciclient.Client) { | |||
t.Helper() | |||
// some port between 20k and 30k | |||
port := 20000 + rand.Int31()%10000 | |||
addr := fmt.Sprintf("localhost:%d", port) | |||
s, err := server.NewServer(logger, addr, "socket", app) | |||
require.NoError(t, err) | |||
require.NoError(t, s.Start(ctx)) | |||
t.Cleanup(s.Wait) | |||
c := abciclient.NewSocketClient(logger, addr, true) | |||
require.NoError(t, c.Start(ctx)) | |||
t.Cleanup(c.Wait) | |||
require.True(t, s.IsRunning()) | |||
require.True(t, c.IsRunning()) | |||
return s, c | |||
} | |||
type slowApp struct { | |||
types.BaseApplication | |||
} | |||
func (slowApp) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { | |||
time.Sleep(200 * time.Millisecond) | |||
return types.ResponseFinalizeBlock{} | |||
} |
@ -1,10 +1,10 @@ | |||
echo hello | |||
info | |||
commit | |||
deliver_tx "abc" | |||
finalize_block "abc" | |||
info | |||
commit | |||
query "abc" | |||
deliver_tx "def=xyz" | |||
finalize_block "def=xyz" "ghi=123" | |||
commit | |||
query "def" |
@ -1,7 +1,7 @@ | |||
check_tx 0x00 | |||
check_tx 0xff | |||
deliver_tx 0x00 | |||
finalize_block 0x00 | |||
check_tx 0x00 | |||
deliver_tx 0x01 | |||
deliver_tx 0x04 | |||
finalize_block 0x01 | |||
finalize_block 0x04 | |||
info |
@ -0,0 +1,95 @@ | |||
--- | |||
order: 3 | |||
--- | |||
# PBTS | |||
This document provides an overview of the Proposer-Based Timestamp (PBTS) | |||
algorithm added to Tendermint in the v0.36 release. It outlines the core | |||
functionality as well as the parameters and constraints of the this algorithm. | |||
## Algorithm Overview | |||
The PBTS algorithm defines a way for a Tendermint blockchain to create block | |||
timestamps that are within a reasonable bound of the clocks of the validators on | |||
the network. This replaces the original BFTTime algorithm for timestamp | |||
assignment that relied on the timestamps included in precommit messages. | |||
## Algorithm Parameters | |||
The functionality of the PBTS algorithm is governed by two parameters within | |||
Tendermint. These two parameters are [consensus | |||
parameters](https://github.com/tendermint/tendermint/blob/master/spec/abci/apps.md#L291), | |||
meaning they are configured by the ABCI application and are expected to be the | |||
same across all nodes on the network. | |||
### `Precision` | |||
The `Precision` parameter configures the acceptable upper-bound of clock drift | |||
among all of the nodes on a Tendermint network. Any two nodes on a Tendermint | |||
network are expected to have clocks that differ by at most `Precision` | |||
milliseconds any given instant. | |||
### `MessageDelay` | |||
The `MessageDelay` parameter configures the acceptable upper-bound for | |||
transmitting a `Proposal` message from the proposer to _all_ of the validators | |||
on the network. | |||
Networks should choose as small a value for `MessageDelay` as is practical, | |||
provided it is large enough that messages can reach all participants with high | |||
probability given the number of participants and latency of their connections. | |||
## Algorithm Concepts | |||
### Block timestamps | |||
Each block produced by the Tendermint consensus engine contains a timestamp. | |||
The timestamp produced in each block is a meaningful representation of time that is | |||
useful for the protocols and applications built on top of Tendermint. | |||
The following protocols and application features require a reliable source of time: | |||
* Tendermint Light Clients [rely on correspondence between their known time](https://github.com/tendermint/tendermint/blob/master/spec/light-client/verification/README.md#definitions-1) and the block time for block verification. | |||
* Tendermint Evidence validity is determined [either in terms of heights or in terms of time](https://github.com/tendermint/tendermint/blob/master/spec/consensus/evidence.md#verification). | |||
* Unbonding of staked assets in the Cosmos Hub [occurs after a period of 21 | |||
days](https://github.com/cosmos/governance/blob/master/params-change/Staking.md#unbondingtime). | |||
* IBC packets can use either a [timestamp or a height to timeout packet | |||
delivery](https://docs.cosmos.network/v0.44/ibc/overview.html#acknowledgements) | |||
### Proposer Selects a Block Timestamp | |||
When the proposer node creates a new block proposal, the node reads the time | |||
from its local clock and uses this reading as the timestamp for the proposed | |||
block. | |||
### Timeliness | |||
When each validator on a Tendermint network receives a proposed block, it | |||
performs a series of checks to ensure that the block can be considered valid as | |||
a candidate to be the next block in the chain. | |||
The PBTS algorithm performs a validity check on the timestamp of proposed | |||
blocks. When a validator receives a proposal it ensures that the timestamp in | |||
the proposal is within a bound of the validator's local clock. Specifically, the | |||
algorithm checks that the timestamp is no more than `Precision` greater than the | |||
node's local clock and no less than `Precision` + `MessageDelay` behind than the | |||
node's local clock. This creates range of acceptable timestamps around the | |||
node's local time. If the timestamp is within this range, the PBTS algorithm | |||
considers the block **timely**. If a block is not **timely**, the node will | |||
issue a `nil` `prevote` for this block, signaling to the rest of the network | |||
that the node does not consider the block to be valid. | |||
### Clock Synchronization | |||
The PBTS algorithm requires the clocks of the validators on a Tendermint network | |||
are within `Precision` of each other. In practice, this means that validators | |||
should periodically synchronize to a reliable NTP server. Validators that drift | |||
too far away from the rest of the network will no longer propose blocks with | |||
valid timestamps. Additionally they will not view the timestamps of blocks | |||
proposed by their peers to be valid either. | |||
## See Also | |||
* [The PBTS specification](https://github.com/tendermint/tendermint/blob/master/spec/consensus/proposer-based-timestamp/README.md) | |||
contains all of the details of the algorithm. |
@ -1,3 +1,4 @@ | |||
master master | |||
v0.33.x v0.33 | |||
v0.34.x v0.34 | |||
v0.35.x v0.35 |
@ -1,249 +0,0 @@ | |||
package proxy | |||
import ( | |||
"context" | |||
"time" | |||
"github.com/go-kit/kit/metrics" | |||
abciclient "github.com/tendermint/tendermint/abci/client" | |||
"github.com/tendermint/tendermint/abci/types" | |||
) | |||
//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot | |||
//---------------------------------------------------------------------------------------- | |||
// Enforce which abci msgs can be sent on a connection at the type level | |||
type AppConnConsensus interface { | |||
Error() error | |||
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error) | |||
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) | |||
ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error) | |||
ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error) | |||
VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) | |||
FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) | |||
Commit(context.Context) (*types.ResponseCommit, error) | |||
} | |||
type AppConnMempool interface { | |||
Error() error | |||
CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error) | |||
Flush(context.Context) error | |||
} | |||
type AppConnQuery interface { | |||
Error() error | |||
Echo(context.Context, string) (*types.ResponseEcho, error) | |||
Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error) | |||
Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error) | |||
} | |||
type AppConnSnapshot interface { | |||
Error() error | |||
ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error) | |||
OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) | |||
LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) | |||
ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) | |||
} | |||
//----------------------------------------------------------------------------------------- | |||
// Implements AppConnConsensus (subset of abciclient.Client) | |||
type appConnConsensus struct { | |||
metrics *Metrics | |||
appConn abciclient.Client | |||
} | |||
var _ AppConnConsensus = (*appConnConsensus)(nil) | |||
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus { | |||
return &appConnConsensus{ | |||
metrics: metrics, | |||
appConn: appConn, | |||
} | |||
} | |||
func (app *appConnConsensus) Error() error { | |||
return app.appConn.Error() | |||
} | |||
func (app *appConnConsensus) InitChain( | |||
ctx context.Context, | |||
req types.RequestInitChain, | |||
) (*types.ResponseInitChain, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() | |||
return app.appConn.InitChain(ctx, req) | |||
} | |||
func (app *appConnConsensus) PrepareProposal( | |||
ctx context.Context, | |||
req types.RequestPrepareProposal, | |||
) (*types.ResponsePrepareProposal, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() | |||
return app.appConn.PrepareProposal(ctx, req) | |||
} | |||
func (app *appConnConsensus) ProcessProposal( | |||
ctx context.Context, | |||
req types.RequestProcessProposal, | |||
) (*types.ResponseProcessProposal, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() | |||
return app.appConn.ProcessProposal(ctx, req) | |||
} | |||
func (app *appConnConsensus) ExtendVote( | |||
ctx context.Context, | |||
req types.RequestExtendVote, | |||
) (*types.ResponseExtendVote, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() | |||
return app.appConn.ExtendVote(ctx, req) | |||
} | |||
func (app *appConnConsensus) VerifyVoteExtension( | |||
ctx context.Context, | |||
req types.RequestVerifyVoteExtension, | |||
) (*types.ResponseVerifyVoteExtension, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() | |||
return app.appConn.VerifyVoteExtension(ctx, req) | |||
} | |||
func (app *appConnConsensus) FinalizeBlock( | |||
ctx context.Context, | |||
req types.RequestFinalizeBlock, | |||
) (*types.ResponseFinalizeBlock, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() | |||
return app.appConn.FinalizeBlock(ctx, req) | |||
} | |||
func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() | |||
return app.appConn.Commit(ctx) | |||
} | |||
//------------------------------------------------ | |||
// Implements AppConnMempool (subset of abciclient.Client) | |||
type appConnMempool struct { | |||
metrics *Metrics | |||
appConn abciclient.Client | |||
} | |||
func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool { | |||
return &appConnMempool{ | |||
metrics: metrics, | |||
appConn: appConn, | |||
} | |||
} | |||
func (app *appConnMempool) Error() error { | |||
return app.appConn.Error() | |||
} | |||
func (app *appConnMempool) Flush(ctx context.Context) error { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() | |||
return app.appConn.Flush(ctx) | |||
} | |||
func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() | |||
return app.appConn.CheckTx(ctx, req) | |||
} | |||
//------------------------------------------------ | |||
// Implements AppConnQuery (subset of abciclient.Client) | |||
type appConnQuery struct { | |||
metrics *Metrics | |||
appConn abciclient.Client | |||
} | |||
func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery { | |||
return &appConnQuery{ | |||
metrics: metrics, | |||
appConn: appConn, | |||
} | |||
} | |||
func (app *appConnQuery) Error() error { | |||
return app.appConn.Error() | |||
} | |||
func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() | |||
return app.appConn.Echo(ctx, msg) | |||
} | |||
func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() | |||
return app.appConn.Info(ctx, req) | |||
} | |||
func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() | |||
return app.appConn.Query(ctx, reqQuery) | |||
} | |||
//------------------------------------------------ | |||
// Implements AppConnSnapshot (subset of abciclient.Client) | |||
type appConnSnapshot struct { | |||
metrics *Metrics | |||
appConn abciclient.Client | |||
} | |||
func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot { | |||
return &appConnSnapshot{ | |||
metrics: metrics, | |||
appConn: appConn, | |||
} | |||
} | |||
func (app *appConnSnapshot) Error() error { | |||
return app.appConn.Error() | |||
} | |||
func (app *appConnSnapshot) ListSnapshots( | |||
ctx context.Context, | |||
req types.RequestListSnapshots, | |||
) (*types.ResponseListSnapshots, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() | |||
return app.appConn.ListSnapshots(ctx, req) | |||
} | |||
func (app *appConnSnapshot) OfferSnapshot( | |||
ctx context.Context, | |||
req types.RequestOfferSnapshot, | |||
) (*types.ResponseOfferSnapshot, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() | |||
return app.appConn.OfferSnapshot(ctx, req) | |||
} | |||
func (app *appConnSnapshot) LoadSnapshotChunk( | |||
ctx context.Context, | |||
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() | |||
return app.appConn.LoadSnapshotChunk(ctx, req) | |||
} | |||
func (app *appConnSnapshot) ApplySnapshotChunk( | |||
ctx context.Context, | |||
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() | |||
return app.appConn.ApplySnapshotChunk(ctx, req) | |||
} | |||
// addTimeSample returns a function that, when called, adds an observation to m. | |||
// The observation added to m is the number of seconds ellapsed since addTimeSample | |||
// was initially called. addTimeSample is meant to be called in a defer to calculate | |||
// the amount of time a function takes to complete. | |||
func addTimeSample(m metrics.Histogram) func() { | |||
start := time.Now() | |||
return func() { m.Observe(time.Since(start).Seconds()) } | |||
} |
@ -1,42 +1,213 @@ | |||
package proxy | |||
import ( | |||
"context" | |||
"io" | |||
"os" | |||
"syscall" | |||
"time" | |||
"github.com/go-kit/kit/metrics" | |||
abciclient "github.com/tendermint/tendermint/abci/client" | |||
"github.com/tendermint/tendermint/abci/example/kvstore" | |||
"github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
e2e "github.com/tendermint/tendermint/test/e2e/app" | |||
) | |||
// DefaultClientCreator returns a default ClientCreator, which will create a | |||
// local client if addr is one of: 'kvstore', | |||
// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client. | |||
// ClientFactory returns a client object, which will create a local | |||
// client if addr is one of: 'kvstore', 'persistent_kvstore', 'e2e', | |||
// or 'noop', otherwise - a remote client. | |||
// | |||
// The Closer is a noop except for persistent_kvstore applications, | |||
// which will clean up the store. | |||
func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) { | |||
func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) { | |||
switch addr { | |||
case "kvstore": | |||
return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{} | |||
return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil | |||
case "persistent_kvstore": | |||
app := kvstore.NewPersistentKVStoreApplication(logger, dbDir) | |||
return abciclient.NewLocalCreator(app), app | |||
return abciclient.NewLocalClient(logger, app), app, nil | |||
case "e2e": | |||
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir)) | |||
if err != nil { | |||
panic(err) | |||
return nil, noopCloser{}, err | |||
} | |||
return abciclient.NewLocalCreator(app), noopCloser{} | |||
return abciclient.NewLocalClient(logger, app), noopCloser{}, nil | |||
case "noop": | |||
return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{} | |||
return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil | |||
default: | |||
mustConnect := false // loop retrying | |||
return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{} | |||
const mustConnect = false // loop retrying | |||
client, err := abciclient.NewClient(logger, addr, transport, mustConnect) | |||
if err != nil { | |||
return nil, noopCloser{}, err | |||
} | |||
return client, noopCloser{}, nil | |||
} | |||
} | |||
type noopCloser struct{} | |||
func (noopCloser) Close() error { return nil } | |||
// proxyClient provides the application connection. | |||
type proxyClient struct { | |||
service.BaseService | |||
logger log.Logger | |||
client abciclient.Client | |||
metrics *Metrics | |||
} | |||
// New creates a proxy application interface. | |||
func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client { | |||
conn := &proxyClient{ | |||
logger: logger, | |||
metrics: metrics, | |||
client: client, | |||
} | |||
conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn) | |||
return conn | |||
} | |||
func (app *proxyClient) OnStop() { tryCallStop(app.client) } | |||
func (app *proxyClient) Error() error { return app.client.Error() } | |||
func tryCallStop(client abciclient.Client) { | |||
if c, ok := client.(interface{ Stop() }); ok { | |||
c.Stop() | |||
} | |||
} | |||
func (app *proxyClient) OnStart(ctx context.Context) error { | |||
var err error | |||
defer func() { | |||
if err != nil { | |||
tryCallStop(app.client) | |||
} | |||
}() | |||
// Kill Tendermint if the ABCI application crashes. | |||
go func() { | |||
if !app.client.IsRunning() { | |||
return | |||
} | |||
app.client.Wait() | |||
if ctx.Err() != nil { | |||
return | |||
} | |||
if err := app.client.Error(); err != nil { | |||
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", | |||
"err", err) | |||
if killErr := kill(); killErr != nil { | |||
app.logger.Error("Failed to kill this process - please do so manually", | |||
"err", killErr) | |||
} | |||
} | |||
}() | |||
return app.client.Start(ctx) | |||
} | |||
func kill() error { | |||
p, err := os.FindProcess(os.Getpid()) | |||
if err != nil { | |||
return err | |||
} | |||
return p.Signal(syscall.SIGABRT) | |||
} | |||
func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() | |||
return app.client.InitChain(ctx, req) | |||
} | |||
func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() | |||
return app.client.PrepareProposal(ctx, req) | |||
} | |||
func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() | |||
return app.client.ProcessProposal(ctx, req) | |||
} | |||
func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() | |||
return app.client.ExtendVote(ctx, req) | |||
} | |||
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() | |||
return app.client.VerifyVoteExtension(ctx, req) | |||
} | |||
func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() | |||
return app.client.FinalizeBlock(ctx, req) | |||
} | |||
func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() | |||
return app.client.Commit(ctx) | |||
} | |||
func (app *proxyClient) Flush(ctx context.Context) error { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() | |||
return app.client.Flush(ctx) | |||
} | |||
func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() | |||
return app.client.CheckTx(ctx, req) | |||
} | |||
func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() | |||
return app.client.Echo(ctx, msg) | |||
} | |||
func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() | |||
return app.client.Info(ctx, req) | |||
} | |||
func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() | |||
return app.client.Query(ctx, reqQuery) | |||
} | |||
func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() | |||
return app.client.ListSnapshots(ctx, req) | |||
} | |||
func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() | |||
return app.client.OfferSnapshot(ctx, req) | |||
} | |||
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() | |||
return app.client.LoadSnapshotChunk(ctx, req) | |||
} | |||
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { | |||
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() | |||
return app.client.ApplySnapshotChunk(ctx, req) | |||
} | |||
// addTimeSample returns a function that, when called, adds an observation to m. | |||
// The observation added to m is the number of seconds ellapsed since addTimeSample | |||
// was initially called. addTimeSample is meant to be called in a defer to calculate | |||
// the amount of time a function takes to complete. | |||
func addTimeSample(m metrics.Histogram) func() { | |||
start := time.Now() | |||
return func() { m.Observe(time.Since(start).Seconds()) } | |||
} |
@ -1,131 +0,0 @@ | |||
package proxy | |||
import ( | |||
"context" | |||
"os" | |||
"syscall" | |||
abciclient "github.com/tendermint/tendermint/abci/client" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
) | |||
// AppConns is the Tendermint's interface to the application that consists of | |||
// multiple connections. | |||
type AppConns interface { | |||
service.Service | |||
// Mempool connection | |||
Mempool() AppConnMempool | |||
// Consensus connection | |||
Consensus() AppConnConsensus | |||
// Query connection | |||
Query() AppConnQuery | |||
// Snapshot connection | |||
Snapshot() AppConnSnapshot | |||
} | |||
// NewAppConns calls NewMultiAppConn. | |||
func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { | |||
return NewMultiAppConn(clientCreator, logger, metrics) | |||
} | |||
// multiAppConn implements AppConns. | |||
// | |||
// A multiAppConn is made of a few appConns and manages their underlying abci | |||
// clients. | |||
// TODO: on app restart, clients must reboot together | |||
type multiAppConn struct { | |||
service.BaseService | |||
logger log.Logger | |||
metrics *Metrics | |||
consensusConn AppConnConsensus | |||
mempoolConn AppConnMempool | |||
queryConn AppConnQuery | |||
snapshotConn AppConnSnapshot | |||
client stoppableClient | |||
clientCreator abciclient.Creator | |||
} | |||
// TODO: this is a totally internal and quasi permanent shim for | |||
// clients. eventually we can have a single client and have some kind | |||
// of reasonable lifecycle witout needing an explicit stop method. | |||
type stoppableClient interface { | |||
abciclient.Client | |||
Stop() | |||
} | |||
// NewMultiAppConn makes all necessary abci connections to the application. | |||
func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { | |||
multiAppConn := &multiAppConn{ | |||
logger: logger, | |||
metrics: metrics, | |||
clientCreator: clientCreator, | |||
} | |||
multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn) | |||
return multiAppConn | |||
} | |||
func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn } | |||
func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } | |||
func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } | |||
func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn } | |||
func (app *multiAppConn) OnStart(ctx context.Context) error { | |||
var err error | |||
defer func() { | |||
if err != nil { | |||
app.client.Stop() | |||
} | |||
}() | |||
var client abciclient.Client | |||
client, err = app.clientCreator(app.logger) | |||
if err != nil { | |||
return err | |||
} | |||
app.queryConn = NewAppConnQuery(client, app.metrics) | |||
app.snapshotConn = NewAppConnSnapshot(client, app.metrics) | |||
app.mempoolConn = NewAppConnMempool(client, app.metrics) | |||
app.consensusConn = NewAppConnConsensus(client, app.metrics) | |||
app.client = client.(stoppableClient) | |||
// Kill Tendermint if the ABCI application crashes. | |||
go func() { | |||
if !client.IsRunning() { | |||
return | |||
} | |||
app.client.Wait() | |||
if ctx.Err() != nil { | |||
return | |||
} | |||
if err := app.client.Error(); err != nil { | |||
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", | |||
"err", err) | |||
if killErr := kill(); killErr != nil { | |||
app.logger.Error("Failed to kill this process - please do so manually", | |||
"err", killErr) | |||
} | |||
} | |||
}() | |||
return client.Start(ctx) | |||
} | |||
func (app *multiAppConn) OnStop() { app.client.Stop() } | |||
func kill() error { | |||
p, err := os.FindProcess(os.Getpid()) | |||
if err != nil { | |||
return err | |||
} | |||
return p.Signal(syscall.SIGTERM) | |||
} |
@ -1,99 +0,0 @@ | |||
package proxy | |||
import ( | |||
"context" | |||
"errors" | |||
"os" | |||
"os/signal" | |||
"syscall" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/mock" | |||
"github.com/stretchr/testify/require" | |||
abciclient "github.com/tendermint/tendermint/abci/client" | |||
abcimocks "github.com/tendermint/tendermint/abci/client/mocks" | |||
"github.com/tendermint/tendermint/libs/log" | |||
) | |||
type noopStoppableClientImpl struct { | |||
abciclient.Client | |||
count int | |||
} | |||
func (c *noopStoppableClientImpl) Stop() { c.count++ } | |||
func TestAppConns_Start_Stop(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
clientMock := &abcimocks.Client{} | |||
clientMock.On("Start", mock.Anything).Return(nil) | |||
clientMock.On("Error").Return(nil) | |||
clientMock.On("IsRunning").Return(true) | |||
clientMock.On("Wait").Return(nil).Times(1) | |||
cl := &noopStoppableClientImpl{Client: clientMock} | |||
creatorCallCount := 0 | |||
creator := func(logger log.Logger) (abciclient.Client, error) { | |||
creatorCallCount++ | |||
return cl, nil | |||
} | |||
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) | |||
err := appConns.Start(ctx) | |||
require.NoError(t, err) | |||
time.Sleep(200 * time.Millisecond) | |||
cancel() | |||
appConns.Wait() | |||
clientMock.AssertExpectations(t) | |||
assert.Equal(t, 1, cl.count) | |||
assert.Equal(t, 1, creatorCallCount) | |||
} | |||
// Upon failure, we call tmos.Kill | |||
func TestAppConns_Failure(t *testing.T) { | |||
ok := make(chan struct{}) | |||
c := make(chan os.Signal, 1) | |||
signal.Notify(c, syscall.SIGTERM) | |||
go func() { | |||
for range c { | |||
close(ok) | |||
return | |||
} | |||
}() | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
clientMock := &abcimocks.Client{} | |||
clientMock.On("SetLogger", mock.Anything).Return() | |||
clientMock.On("Start", mock.Anything).Return(nil) | |||
clientMock.On("IsRunning").Return(true) | |||
clientMock.On("Wait").Return(nil) | |||
clientMock.On("Error").Return(errors.New("EOF")) | |||
cl := &noopStoppableClientImpl{Client: clientMock} | |||
creator := func(log.Logger) (abciclient.Client, error) { | |||
return cl, nil | |||
} | |||
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) | |||
err := appConns.Start(ctx) | |||
require.NoError(t, err) | |||
t.Cleanup(func() { cancel(); appConns.Wait() }) | |||
select { | |||
case <-ok: | |||
t.Log("SIGTERM successfully received") | |||
case <-time.After(5 * time.Second): | |||
t.Fatal("expected process to receive SIGTERM signal") | |||
} | |||
} |