Browse Source

proxy: improve ABCI app connection handling (#5078)

Closes #5074 

Old code does not work when --consensus.create_empty_blocks=false
(because it only calls tmos.Kill when ApplyBlock fails). New code is
listening ABCI clients for Quit and kills TM process if there were any
errors.
pull/5097/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
16216028a1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 979 additions and 53 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +736
    -0
      abci/client/mocks/client.go
  3. +1
    -5
      consensus/state.go
  4. +10
    -4
      proxy/client.go
  5. +36
    -0
      proxy/mocks/client_creator.go
  6. +110
    -44
      proxy/multi_app_conn.go
  7. +85
    -0
      proxy/multi_app_conn_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -124,3 +124,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [blockchain/v2] Correctly set block store base in status responses (@erikgrinaker) - [blockchain/v2] Correctly set block store base in status responses (@erikgrinaker)
- [consensus] [\#4895](https://github.com/tendermint/tendermint/pull/4895) Cache the address of the validator to reduce querying a remote KMS (@joe-bowman) - [consensus] [\#4895](https://github.com/tendermint/tendermint/pull/4895) Cache the address of the validator to reduce querying a remote KMS (@joe-bowman)
- [consensus] \#4970 Stricter on `LastCommitRound` check (@cuonglm) - [consensus] \#4970 Stricter on `LastCommitRound` check (@cuonglm)
- [proxy] \#5078 Fix a bug, where TM does not exit when ABCI app crashes (@melekes)

+ 736
- 0
abci/client/mocks/client.go View File

@ -0,0 +1,736 @@
// Code generated by mockery v1.1.1. DO NOT EDIT.
package mocks
import (
abcicli "github.com/tendermint/tendermint/abci/client"
log "github.com/tendermint/tendermint/libs/log"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
)
// Client is an autogenerated mock type for the Client type
type Client struct {
mock.Mock
}
// ApplySnapshotChunkAsync provides a mock function with given fields: _a0
func (_m *Client) ApplySnapshotChunkAsync(_a0 types.RequestApplySnapshotChunk) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// ApplySnapshotChunkSync provides a mock function with given fields: _a0
func (_m *Client) ApplySnapshotChunkSync(_a0 types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseApplySnapshotChunk
if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk) *types.ResponseApplySnapshotChunk); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseApplySnapshotChunk)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestApplySnapshotChunk) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// BeginBlockAsync provides a mock function with given fields: _a0
func (_m *Client) BeginBlockAsync(_a0 types.RequestBeginBlock) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestBeginBlock) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// BeginBlockSync provides a mock function with given fields: _a0
func (_m *Client) BeginBlockSync(_a0 types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseBeginBlock
if rf, ok := ret.Get(0).(func(types.RequestBeginBlock) *types.ResponseBeginBlock); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseBeginBlock)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestBeginBlock) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// CheckTxAsync provides a mock function with given fields: _a0
func (_m *Client) CheckTxAsync(_a0 types.RequestCheckTx) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestCheckTx) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// CheckTxSync provides a mock function with given fields: _a0
func (_m *Client) CheckTxSync(_a0 types.RequestCheckTx) (*types.ResponseCheckTx, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseCheckTx
if rf, ok := ret.Get(0).(func(types.RequestCheckTx) *types.ResponseCheckTx); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseCheckTx)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestCheckTx) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// CommitAsync provides a mock function with given fields:
func (_m *Client) CommitAsync() *abcicli.ReqRes {
ret := _m.Called()
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// CommitSync provides a mock function with given fields:
func (_m *Client) CommitSync() (*types.ResponseCommit, error) {
ret := _m.Called()
var r0 *types.ResponseCommit
if rf, ok := ret.Get(0).(func() *types.ResponseCommit); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseCommit)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DeliverTxAsync provides a mock function with given fields: _a0
func (_m *Client) DeliverTxAsync(_a0 types.RequestDeliverTx) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestDeliverTx) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// DeliverTxSync provides a mock function with given fields: _a0
func (_m *Client) DeliverTxSync(_a0 types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseDeliverTx
if rf, ok := ret.Get(0).(func(types.RequestDeliverTx) *types.ResponseDeliverTx); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseDeliverTx)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestDeliverTx) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// EchoAsync provides a mock function with given fields: msg
func (_m *Client) EchoAsync(msg string) *abcicli.ReqRes {
ret := _m.Called(msg)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(string) *abcicli.ReqRes); ok {
r0 = rf(msg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// EchoSync provides a mock function with given fields: msg
func (_m *Client) EchoSync(msg string) (*types.ResponseEcho, error) {
ret := _m.Called(msg)
var r0 *types.ResponseEcho
if rf, ok := ret.Get(0).(func(string) *types.ResponseEcho); ok {
r0 = rf(msg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseEcho)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// EndBlockAsync provides a mock function with given fields: _a0
func (_m *Client) EndBlockAsync(_a0 types.RequestEndBlock) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestEndBlock) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// EndBlockSync provides a mock function with given fields: _a0
func (_m *Client) EndBlockSync(_a0 types.RequestEndBlock) (*types.ResponseEndBlock, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseEndBlock
if rf, ok := ret.Get(0).(func(types.RequestEndBlock) *types.ResponseEndBlock); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseEndBlock)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestEndBlock) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Error provides a mock function with given fields:
func (_m *Client) Error() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// FlushAsync provides a mock function with given fields:
func (_m *Client) FlushAsync() *abcicli.ReqRes {
ret := _m.Called()
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// FlushSync provides a mock function with given fields:
func (_m *Client) FlushSync() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// InfoAsync provides a mock function with given fields: _a0
func (_m *Client) InfoAsync(_a0 types.RequestInfo) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestInfo) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// InfoSync provides a mock function with given fields: _a0
func (_m *Client) InfoSync(_a0 types.RequestInfo) (*types.ResponseInfo, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseInfo
if rf, ok := ret.Get(0).(func(types.RequestInfo) *types.ResponseInfo); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseInfo)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestInfo) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// InitChainAsync provides a mock function with given fields: _a0
func (_m *Client) InitChainAsync(_a0 types.RequestInitChain) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestInitChain) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// InitChainSync provides a mock function with given fields: _a0
func (_m *Client) InitChainSync(_a0 types.RequestInitChain) (*types.ResponseInitChain, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseInitChain
if rf, ok := ret.Get(0).(func(types.RequestInitChain) *types.ResponseInitChain); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseInitChain)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestInitChain) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IsRunning provides a mock function with given fields:
func (_m *Client) IsRunning() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ListSnapshotsAsync provides a mock function with given fields: _a0
func (_m *Client) ListSnapshotsAsync(_a0 types.RequestListSnapshots) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestListSnapshots) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// ListSnapshotsSync provides a mock function with given fields: _a0
func (_m *Client) ListSnapshotsSync(_a0 types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseListSnapshots
if rf, ok := ret.Get(0).(func(types.RequestListSnapshots) *types.ResponseListSnapshots); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseListSnapshots)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestListSnapshots) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// LoadSnapshotChunkAsync provides a mock function with given fields: _a0
func (_m *Client) LoadSnapshotChunkAsync(_a0 types.RequestLoadSnapshotChunk) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// LoadSnapshotChunkSync provides a mock function with given fields: _a0
func (_m *Client) LoadSnapshotChunkSync(_a0 types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseLoadSnapshotChunk
if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk) *types.ResponseLoadSnapshotChunk); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseLoadSnapshotChunk)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestLoadSnapshotChunk) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// OfferSnapshotAsync provides a mock function with given fields: _a0
func (_m *Client) OfferSnapshotAsync(_a0 types.RequestOfferSnapshot) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// OfferSnapshotSync provides a mock function with given fields: _a0
func (_m *Client) OfferSnapshotSync(_a0 types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseOfferSnapshot
if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot) *types.ResponseOfferSnapshot); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseOfferSnapshot)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestOfferSnapshot) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// OnReset provides a mock function with given fields:
func (_m *Client) OnReset() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// OnStart provides a mock function with given fields:
func (_m *Client) OnStart() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// OnStop provides a mock function with given fields:
func (_m *Client) OnStop() {
_m.Called()
}
// QueryAsync provides a mock function with given fields: _a0
func (_m *Client) QueryAsync(_a0 types.RequestQuery) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestQuery) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// QuerySync provides a mock function with given fields: _a0
func (_m *Client) QuerySync(_a0 types.RequestQuery) (*types.ResponseQuery, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseQuery
if rf, ok := ret.Get(0).(func(types.RequestQuery) *types.ResponseQuery); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseQuery)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestQuery) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Quit provides a mock function with given fields:
func (_m *Client) Quit() <-chan struct{} {
ret := _m.Called()
var r0 <-chan struct{}
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan struct{})
}
}
return r0
}
// Reset provides a mock function with given fields:
func (_m *Client) Reset() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// SetLogger provides a mock function with given fields: _a0
func (_m *Client) SetLogger(_a0 log.Logger) {
_m.Called(_a0)
}
// SetOptionAsync provides a mock function with given fields: _a0
func (_m *Client) SetOptionAsync(_a0 types.RequestSetOption) *abcicli.ReqRes {
ret := _m.Called(_a0)
var r0 *abcicli.ReqRes
if rf, ok := ret.Get(0).(func(types.RequestSetOption) *abcicli.ReqRes); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abcicli.ReqRes)
}
}
return r0
}
// SetOptionSync provides a mock function with given fields: _a0
func (_m *Client) SetOptionSync(_a0 types.RequestSetOption) (*types.ResponseSetOption, error) {
ret := _m.Called(_a0)
var r0 *types.ResponseSetOption
if rf, ok := ret.Get(0).(func(types.RequestSetOption) *types.ResponseSetOption); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponseSetOption)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(types.RequestSetOption) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *Client) SetResponseCallback(_a0 abcicli.Callback) {
_m.Called(_a0)
}
// Start provides a mock function with given fields:
func (_m *Client) Start() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Stop provides a mock function with given fields:
func (_m *Client) Stop() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// String provides a mock function with given fields:
func (_m *Client) String() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}

+ 1
- 5
consensus/state.go View File

@ -1529,11 +1529,7 @@ func (cs *State) finalizeCommit(height int64) {
types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}, types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()},
block) block)
if err != nil { if err != nil {
cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err)
err := tmos.Kill()
if err != nil {
cs.Logger.Error("Failed to kill this process - please do so manually", "err", err)
}
cs.Logger.Error("Error on ApplyBlock", "err", err)
return return
} }


+ 10
- 4
proxy/client.go View File

@ -10,8 +10,9 @@ import (
"github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/abci/types"
) )
// NewABCIClient returns newly connected client
// ClientCreator creates new ABCI clients.
type ClientCreator interface { type ClientCreator interface {
// NewABCIClient returns a new ABCI client.
NewABCIClient() (abcicli.Client, error) NewABCIClient() (abcicli.Client, error)
} }
@ -23,6 +24,8 @@ type localClientCreator struct {
app types.Application app types.Application
} }
// NewLocalClientCreator returns a ClientCreator for the given app,
// which will be running locally.
func NewLocalClientCreator(app types.Application) ClientCreator { func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{ return &localClientCreator{
mtx: new(sync.Mutex), mtx: new(sync.Mutex),
@ -43,6 +46,9 @@ type remoteClientCreator struct {
mustConnect bool mustConnect bool
} }
// NewRemoteClientCreator returns a ClientCreator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator {
return &remoteClientCreator{ return &remoteClientCreator{
addr: addr, addr: addr,
@ -59,9 +65,9 @@ func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) {
return remoteApp, nil return remoteApp, nil
} }
//-----------------------------------------------------------------
// default
// DefaultClientCreator returns a default ClientCreator, which will create a
// local client if addr is one of: 'counter', 'counter_serial', 'kvstore',
// 'persistent_kvstore' or 'noop', otherwise - a remote client.
func DefaultClientCreator(addr, transport, dbDir string) ClientCreator { func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
switch addr { switch addr {
case "counter": case "counter":


+ 36
- 0
proxy/mocks/client_creator.go View File

@ -0,0 +1,36 @@
// Code generated by mockery v1.1.1. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
abcicli "github.com/tendermint/tendermint/abci/client"
)
// ClientCreator is an autogenerated mock type for the ClientCreator type
type ClientCreator struct {
mock.Mock
}
// NewABCIClient provides a mock function with given fields:
func (_m *ClientCreator) NewABCIClient() (abcicli.Client, error) {
ret := _m.Called()
var r0 abcicli.Client
if rf, ok := ret.Get(0).(func() abcicli.Client); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(abcicli.Client)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}

+ 110
- 44
proxy/multi_app_conn.go View File

@ -3,43 +3,61 @@ package proxy
import ( import (
"fmt" "fmt"
abcicli "github.com/tendermint/tendermint/abci/client"
tmlog "github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/service"
) )
//-----------------------------
const (
connConsensus = "consensus"
connMempool = "mempool"
connQuery = "query"
connSnapshot = "snapshot"
)
// Tendermint's interface to the application consists of multiple connections
// AppConns is the Tendermint's interface to the application that consists of
// multiple connections.
type AppConns interface { type AppConns interface {
service.Service service.Service
// Mempool connection
Mempool() AppConnMempool Mempool() AppConnMempool
// Consensus connection
Consensus() AppConnConsensus Consensus() AppConnConsensus
// Query connection
Query() AppConnQuery Query() AppConnQuery
// Snapshot connection
Snapshot() AppConnSnapshot Snapshot() AppConnSnapshot
} }
// NewAppConns calls NewMultiAppConn.
func NewAppConns(clientCreator ClientCreator) AppConns { func NewAppConns(clientCreator ClientCreator) AppConns {
return NewMultiAppConn(clientCreator) return NewMultiAppConn(clientCreator)
} }
//-----------------------------
// multiAppConn implements AppConns
// a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying abci clients
// multiAppConn implements AppConns.
//
// A multiAppConn is made of a few appConns and manages their underlying abci
// clients.
// TODO: on app restart, clients must reboot together // TODO: on app restart, clients must reboot together
type multiAppConn struct { type multiAppConn struct {
service.BaseService service.BaseService
mempoolConn AppConnMempool
consensusConn AppConnConsensus consensusConn AppConnConsensus
mempoolConn AppConnMempool
queryConn AppConnQuery queryConn AppConnQuery
snapshotConn AppConnSnapshot snapshotConn AppConnSnapshot
consensusConnClient abcicli.Client
mempoolConnClient abcicli.Client
queryConnClient abcicli.Client
snapshotConnClient abcicli.Client
clientCreator ClientCreator clientCreator ClientCreator
} }
// Make all necessary abci connections to the application
// NewMultiAppConn makes all necessary abci connections to the application.
func NewMultiAppConn(clientCreator ClientCreator) AppConns { func NewMultiAppConn(clientCreator ClientCreator) AppConns {
multiAppConn := &multiAppConn{ multiAppConn := &multiAppConn{
clientCreator: clientCreator, clientCreator: clientCreator,
@ -48,70 +66,118 @@ func NewMultiAppConn(clientCreator ClientCreator) AppConns {
return multiAppConn return multiAppConn
} }
// Returns the mempool connection
func (app *multiAppConn) Mempool() AppConnMempool { func (app *multiAppConn) Mempool() AppConnMempool {
return app.mempoolConn return app.mempoolConn
} }
// Returns the consensus Connection
func (app *multiAppConn) Consensus() AppConnConsensus { func (app *multiAppConn) Consensus() AppConnConsensus {
return app.consensusConn return app.consensusConn
} }
// Returns the query Connection
func (app *multiAppConn) Query() AppConnQuery { func (app *multiAppConn) Query() AppConnQuery {
return app.queryConn return app.queryConn
} }
// Returns the snapshot Connection
func (app *multiAppConn) Snapshot() AppConnSnapshot { func (app *multiAppConn) Snapshot() AppConnSnapshot {
return app.snapshotConn return app.snapshotConn
} }
func (app *multiAppConn) OnStart() error { func (app *multiAppConn) OnStart() error {
// query connection
querycli, err := app.clientCreator.NewABCIClient()
c, err := app.abciClientFor(connQuery)
if err != nil { if err != nil {
return fmt.Errorf("error creating ABCI client (query connection): %w", err)
}
querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query"))
if err := querycli.Start(); err != nil {
return fmt.Errorf("error starting ABCI client (query connection): %w", err)
return err
} }
app.queryConn = NewAppConnQuery(querycli)
app.queryConnClient = c
app.queryConn = NewAppConnQuery(c)
// snapshot connection
snapshotcli, err := app.clientCreator.NewABCIClient()
c, err = app.abciClientFor(connSnapshot)
if err != nil { if err != nil {
return fmt.Errorf("error creating ABCI client (snapshot connection): %w", err)
app.stopAllClients()
return err
} }
snapshotcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "snapshot"))
if err := snapshotcli.Start(); err != nil {
return fmt.Errorf("error starting ABCI client (snapshot connection): %w", err)
app.snapshotConnClient = c
app.snapshotConn = NewAppConnSnapshot(c)
c, err = app.abciClientFor(connMempool)
if err != nil {
app.stopAllClients()
return err
} }
app.snapshotConn = NewAppConnSnapshot(snapshotcli)
app.mempoolConnClient = c
app.mempoolConn = NewAppConnMempool(c)
// mempool connection
memcli, err := app.clientCreator.NewABCIClient()
c, err = app.abciClientFor(connConsensus)
if err != nil { if err != nil {
return fmt.Errorf("error creating ABCI client (mempool connection): %w", err)
app.stopAllClients()
return err
}
app.consensusConnClient = c
app.consensusConn = NewAppConnConsensus(c)
// Kill Tendermint if the ABCI application crashes.
go app.killTMOnClientError()
return nil
}
func (app *multiAppConn) OnStop() {
app.stopAllClients()
}
func (app *multiAppConn) killTMOnClientError() {
killFn := func(conn string, err error, logger tmlog.Logger) {
logger.Error(
fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn),
"err", err)
killErr := tmos.Kill()
if killErr != nil {
logger.Error("Failed to kill this process - please do so manually", "err", killErr)
}
} }
memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool"))
if err := memcli.Start(); err != nil {
return fmt.Errorf("error starting ABCI client (mempool connection): %w", err)
select {
case <-app.consensusConnClient.Quit():
if err := app.consensusConnClient.Error(); err != nil {
killFn(connConsensus, err, app.Logger)
}
case <-app.mempoolConnClient.Quit():
if err := app.mempoolConnClient.Error(); err != nil {
killFn(connMempool, err, app.Logger)
}
case <-app.queryConnClient.Quit():
if err := app.queryConnClient.Error(); err != nil {
killFn(connQuery, err, app.Logger)
}
case <-app.snapshotConnClient.Quit():
if err := app.snapshotConnClient.Error(); err != nil {
killFn(connSnapshot, err, app.Logger)
}
} }
app.mempoolConn = NewAppConnMempool(memcli)
}
// consensus connection
concli, err := app.clientCreator.NewABCIClient()
if err != nil {
return fmt.Errorf("error creating ABCI client (consensus connection): %w", err)
func (app *multiAppConn) stopAllClients() {
if app.consensusConnClient != nil {
app.consensusConnClient.Stop()
}
if app.mempoolConnClient != nil {
app.mempoolConnClient.Stop()
} }
concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus"))
if err := concli.Start(); err != nil {
return fmt.Errorf("error starting ABCI client (consensus connection): %w", err)
if app.queryConnClient != nil {
app.queryConnClient.Stop()
} }
app.consensusConn = NewAppConnConsensus(concli)
if app.snapshotConnClient != nil {
app.snapshotConnClient.Stop()
}
}
return nil
func (app *multiAppConn) abciClientFor(conn string) (abcicli.Client, error) {
c, err := app.clientCreator.NewABCIClient()
if err != nil {
return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err)
}
c.SetLogger(app.Logger.With("module", "abci-client", "connection", conn))
if err := c.Start(); err != nil {
return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err)
}
return c, nil
} }

+ 85
- 0
proxy/multi_app_conn_test.go View File

@ -0,0 +1,85 @@
package proxy
import (
"errors"
"os"
"os/signal"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/proxy/mocks"
)
func TestAppConns_Start_Stop(t *testing.T) {
quitCh := make(<-chan struct{})
clientCreatorMock := &mocks.ClientCreator{}
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return().Times(4)
clientMock.On("Start").Return(nil).Times(4)
clientMock.On("Stop").Return(nil).Times(4)
clientMock.On("Quit").Return(quitCh).Times(4)
clientCreatorMock.On("NewABCIClient").Return(clientMock, nil).Times(4)
appConns := NewAppConns(clientCreatorMock)
err := appConns.Start()
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
appConns.Stop()
clientMock.AssertExpectations(t)
}
// Upon failure, we call tmos.Kill
func TestAppConns_Failure(t *testing.T) {
ok := make(chan struct{})
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
go func() {
for range c {
close(ok)
}
}()
quitCh := make(chan struct{})
var recvQuitCh <-chan struct{} // nolint:gosimple
recvQuitCh = quitCh
clientCreatorMock := &mocks.ClientCreator{}
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return()
clientMock.On("Start").Return(nil)
clientMock.On("Stop").Return(nil)
clientMock.On("Quit").Return(recvQuitCh)
clientMock.On("Error").Return(errors.New("EOF")).Once()
clientCreatorMock.On("NewABCIClient").Return(clientMock, nil)
appConns := NewAppConns(clientCreatorMock)
err := appConns.Start()
require.NoError(t, err)
defer appConns.Stop()
// simulate failure
close(quitCh)
select {
case <-ok:
t.Log("SIGTERM successfully received")
case <-time.After(5 * time.Second):
t.Fatal("expected process to receive SIGTERM signal")
}
}

Loading…
Cancel
Save