From 16216028a1c18895de43542ee0bd8f414a3a52af Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 8 Jul 2020 16:07:11 +0400 Subject: [PATCH] proxy: improve ABCI app connection handling (#5078) Closes #5074 Old code does not work when --consensus.create_empty_blocks=false (because it only calls tmos.Kill when ApplyBlock fails). New code is listening ABCI clients for Quit and kills TM process if there were any errors. --- CHANGELOG_PENDING.md | 1 + abci/client/mocks/client.go | 736 ++++++++++++++++++++++++++++++++++ consensus/state.go | 6 +- proxy/client.go | 14 +- proxy/mocks/client_creator.go | 36 ++ proxy/multi_app_conn.go | 154 +++++-- proxy/multi_app_conn_test.go | 85 ++++ 7 files changed, 979 insertions(+), 53 deletions(-) create mode 100644 abci/client/mocks/client.go create mode 100644 proxy/mocks/client_creator.go create mode 100644 proxy/multi_app_conn_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 500d4c0b3..9cdac293b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -124,3 +124,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [blockchain/v2] Correctly set block store base in status responses (@erikgrinaker) - [consensus] [\#4895](https://github.com/tendermint/tendermint/pull/4895) Cache the address of the validator to reduce querying a remote KMS (@joe-bowman) - [consensus] \#4970 Stricter on `LastCommitRound` check (@cuonglm) +- [proxy] \#5078 Fix a bug, where TM does not exit when ABCI app crashes (@melekes) diff --git a/abci/client/mocks/client.go b/abci/client/mocks/client.go new file mode 100644 index 000000000..6d7f99b0f --- /dev/null +++ b/abci/client/mocks/client.go @@ -0,0 +1,736 @@ +// Code generated by mockery v1.1.1. DO NOT EDIT. + +package mocks + +import ( + abcicli "github.com/tendermint/tendermint/abci/client" + log "github.com/tendermint/tendermint/libs/log" + + mock "github.com/stretchr/testify/mock" + + types "github.com/tendermint/tendermint/abci/types" +) + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// ApplySnapshotChunkAsync provides a mock function with given fields: _a0 +func (_m *Client) ApplySnapshotChunkAsync(_a0 types.RequestApplySnapshotChunk) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// ApplySnapshotChunkSync provides a mock function with given fields: _a0 +func (_m *Client) ApplySnapshotChunkSync(_a0 types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseApplySnapshotChunk + if rf, ok := ret.Get(0).(func(types.RequestApplySnapshotChunk) *types.ResponseApplySnapshotChunk); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseApplySnapshotChunk) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestApplySnapshotChunk) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// BeginBlockAsync provides a mock function with given fields: _a0 +func (_m *Client) BeginBlockAsync(_a0 types.RequestBeginBlock) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestBeginBlock) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// BeginBlockSync provides a mock function with given fields: _a0 +func (_m *Client) BeginBlockSync(_a0 types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseBeginBlock + if rf, ok := ret.Get(0).(func(types.RequestBeginBlock) *types.ResponseBeginBlock); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseBeginBlock) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestBeginBlock) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CheckTxAsync provides a mock function with given fields: _a0 +func (_m *Client) CheckTxAsync(_a0 types.RequestCheckTx) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestCheckTx) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// CheckTxSync provides a mock function with given fields: _a0 +func (_m *Client) CheckTxSync(_a0 types.RequestCheckTx) (*types.ResponseCheckTx, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseCheckTx + if rf, ok := ret.Get(0).(func(types.RequestCheckTx) *types.ResponseCheckTx); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseCheckTx) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestCheckTx) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CommitAsync provides a mock function with given fields: +func (_m *Client) CommitAsync() *abcicli.ReqRes { + ret := _m.Called() + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// CommitSync provides a mock function with given fields: +func (_m *Client) CommitSync() (*types.ResponseCommit, error) { + ret := _m.Called() + + var r0 *types.ResponseCommit + if rf, ok := ret.Get(0).(func() *types.ResponseCommit); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseCommit) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeliverTxAsync provides a mock function with given fields: _a0 +func (_m *Client) DeliverTxAsync(_a0 types.RequestDeliverTx) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestDeliverTx) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// DeliverTxSync provides a mock function with given fields: _a0 +func (_m *Client) DeliverTxSync(_a0 types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseDeliverTx + if rf, ok := ret.Get(0).(func(types.RequestDeliverTx) *types.ResponseDeliverTx); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseDeliverTx) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestDeliverTx) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EchoAsync provides a mock function with given fields: msg +func (_m *Client) EchoAsync(msg string) *abcicli.ReqRes { + ret := _m.Called(msg) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(string) *abcicli.ReqRes); ok { + r0 = rf(msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// EchoSync provides a mock function with given fields: msg +func (_m *Client) EchoSync(msg string) (*types.ResponseEcho, error) { + ret := _m.Called(msg) + + var r0 *types.ResponseEcho + if rf, ok := ret.Get(0).(func(string) *types.ResponseEcho); ok { + r0 = rf(msg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseEcho) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(msg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EndBlockAsync provides a mock function with given fields: _a0 +func (_m *Client) EndBlockAsync(_a0 types.RequestEndBlock) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestEndBlock) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// EndBlockSync provides a mock function with given fields: _a0 +func (_m *Client) EndBlockSync(_a0 types.RequestEndBlock) (*types.ResponseEndBlock, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseEndBlock + if rf, ok := ret.Get(0).(func(types.RequestEndBlock) *types.ResponseEndBlock); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseEndBlock) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestEndBlock) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Error provides a mock function with given fields: +func (_m *Client) Error() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// FlushAsync provides a mock function with given fields: +func (_m *Client) FlushAsync() *abcicli.ReqRes { + ret := _m.Called() + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func() *abcicli.ReqRes); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// FlushSync provides a mock function with given fields: +func (_m *Client) FlushSync() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// InfoAsync provides a mock function with given fields: _a0 +func (_m *Client) InfoAsync(_a0 types.RequestInfo) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestInfo) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// InfoSync provides a mock function with given fields: _a0 +func (_m *Client) InfoSync(_a0 types.RequestInfo) (*types.ResponseInfo, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseInfo + if rf, ok := ret.Get(0).(func(types.RequestInfo) *types.ResponseInfo); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestInfo) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// InitChainAsync provides a mock function with given fields: _a0 +func (_m *Client) InitChainAsync(_a0 types.RequestInitChain) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestInitChain) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// InitChainSync provides a mock function with given fields: _a0 +func (_m *Client) InitChainSync(_a0 types.RequestInitChain) (*types.ResponseInitChain, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseInitChain + if rf, ok := ret.Get(0).(func(types.RequestInitChain) *types.ResponseInitChain); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseInitChain) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestInitChain) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IsRunning provides a mock function with given fields: +func (_m *Client) IsRunning() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// ListSnapshotsAsync provides a mock function with given fields: _a0 +func (_m *Client) ListSnapshotsAsync(_a0 types.RequestListSnapshots) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestListSnapshots) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// ListSnapshotsSync provides a mock function with given fields: _a0 +func (_m *Client) ListSnapshotsSync(_a0 types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseListSnapshots + if rf, ok := ret.Get(0).(func(types.RequestListSnapshots) *types.ResponseListSnapshots); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseListSnapshots) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestListSnapshots) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// LoadSnapshotChunkAsync provides a mock function with given fields: _a0 +func (_m *Client) LoadSnapshotChunkAsync(_a0 types.RequestLoadSnapshotChunk) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// LoadSnapshotChunkSync provides a mock function with given fields: _a0 +func (_m *Client) LoadSnapshotChunkSync(_a0 types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseLoadSnapshotChunk + if rf, ok := ret.Get(0).(func(types.RequestLoadSnapshotChunk) *types.ResponseLoadSnapshotChunk); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseLoadSnapshotChunk) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestLoadSnapshotChunk) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// OfferSnapshotAsync provides a mock function with given fields: _a0 +func (_m *Client) OfferSnapshotAsync(_a0 types.RequestOfferSnapshot) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// OfferSnapshotSync provides a mock function with given fields: _a0 +func (_m *Client) OfferSnapshotSync(_a0 types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseOfferSnapshot + if rf, ok := ret.Get(0).(func(types.RequestOfferSnapshot) *types.ResponseOfferSnapshot); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseOfferSnapshot) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestOfferSnapshot) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// OnReset provides a mock function with given fields: +func (_m *Client) OnReset() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OnStart provides a mock function with given fields: +func (_m *Client) OnStart() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OnStop provides a mock function with given fields: +func (_m *Client) OnStop() { + _m.Called() +} + +// QueryAsync provides a mock function with given fields: _a0 +func (_m *Client) QueryAsync(_a0 types.RequestQuery) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestQuery) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// QuerySync provides a mock function with given fields: _a0 +func (_m *Client) QuerySync(_a0 types.RequestQuery) (*types.ResponseQuery, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseQuery + if rf, ok := ret.Get(0).(func(types.RequestQuery) *types.ResponseQuery); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseQuery) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestQuery) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Quit provides a mock function with given fields: +func (_m *Client) Quit() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// Reset provides a mock function with given fields: +func (_m *Client) Reset() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetLogger provides a mock function with given fields: _a0 +func (_m *Client) SetLogger(_a0 log.Logger) { + _m.Called(_a0) +} + +// SetOptionAsync provides a mock function with given fields: _a0 +func (_m *Client) SetOptionAsync(_a0 types.RequestSetOption) *abcicli.ReqRes { + ret := _m.Called(_a0) + + var r0 *abcicli.ReqRes + if rf, ok := ret.Get(0).(func(types.RequestSetOption) *abcicli.ReqRes); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*abcicli.ReqRes) + } + } + + return r0 +} + +// SetOptionSync provides a mock function with given fields: _a0 +func (_m *Client) SetOptionSync(_a0 types.RequestSetOption) (*types.ResponseSetOption, error) { + ret := _m.Called(_a0) + + var r0 *types.ResponseSetOption + if rf, ok := ret.Get(0).(func(types.RequestSetOption) *types.ResponseSetOption); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.ResponseSetOption) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(types.RequestSetOption) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SetResponseCallback provides a mock function with given fields: _a0 +func (_m *Client) SetResponseCallback(_a0 abcicli.Callback) { + _m.Called(_a0) +} + +// Start provides a mock function with given fields: +func (_m *Client) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Stop provides a mock function with given fields: +func (_m *Client) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// String provides a mock function with given fields: +func (_m *Client) String() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/consensus/state.go b/consensus/state.go index 0d2d28c9b..1445dd479 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1529,11 +1529,7 @@ func (cs *State) finalizeCommit(height int64) { types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}, block) if err != nil { - cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) - err := tmos.Kill() - if err != nil { - cs.Logger.Error("Failed to kill this process - please do so manually", "err", err) - } + cs.Logger.Error("Error on ApplyBlock", "err", err) return } diff --git a/proxy/client.go b/proxy/client.go index 5901cf592..f92b258e6 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -10,8 +10,9 @@ import ( "github.com/tendermint/tendermint/abci/types" ) -// NewABCIClient returns newly connected client +// ClientCreator creates new ABCI clients. type ClientCreator interface { + // NewABCIClient returns a new ABCI client. NewABCIClient() (abcicli.Client, error) } @@ -23,6 +24,8 @@ type localClientCreator struct { app types.Application } +// NewLocalClientCreator returns a ClientCreator for the given app, +// which will be running locally. func NewLocalClientCreator(app types.Application) ClientCreator { return &localClientCreator{ mtx: new(sync.Mutex), @@ -43,6 +46,9 @@ type remoteClientCreator struct { mustConnect bool } +// NewRemoteClientCreator returns a ClientCreator for the given address (e.g. +// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you +// want the client to connect before reporting success. func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator { return &remoteClientCreator{ addr: addr, @@ -59,9 +65,9 @@ func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) { return remoteApp, nil } -//----------------------------------------------------------------- -// default - +// DefaultClientCreator returns a default ClientCreator, which will create a +// local client if addr is one of: 'counter', 'counter_serial', 'kvstore', +// 'persistent_kvstore' or 'noop', otherwise - a remote client. func DefaultClientCreator(addr, transport, dbDir string) ClientCreator { switch addr { case "counter": diff --git a/proxy/mocks/client_creator.go b/proxy/mocks/client_creator.go new file mode 100644 index 000000000..499313d17 --- /dev/null +++ b/proxy/mocks/client_creator.go @@ -0,0 +1,36 @@ +// Code generated by mockery v1.1.1. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + abcicli "github.com/tendermint/tendermint/abci/client" +) + +// ClientCreator is an autogenerated mock type for the ClientCreator type +type ClientCreator struct { + mock.Mock +} + +// NewABCIClient provides a mock function with given fields: +func (_m *ClientCreator) NewABCIClient() (abcicli.Client, error) { + ret := _m.Called() + + var r0 abcicli.Client + if rf, ok := ret.Get(0).(func() abcicli.Client); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(abcicli.Client) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 753d9a55e..ce470fda6 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -3,43 +3,61 @@ package proxy import ( "fmt" + abcicli "github.com/tendermint/tendermint/abci/client" + tmlog "github.com/tendermint/tendermint/libs/log" + tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/libs/service" ) -//----------------------------- +const ( + connConsensus = "consensus" + connMempool = "mempool" + connQuery = "query" + connSnapshot = "snapshot" +) -// Tendermint's interface to the application consists of multiple connections +// AppConns is the Tendermint's interface to the application that consists of +// multiple connections. type AppConns interface { service.Service + // Mempool connection Mempool() AppConnMempool + // Consensus connection Consensus() AppConnConsensus + // Query connection Query() AppConnQuery + // Snapshot connection Snapshot() AppConnSnapshot } +// NewAppConns calls NewMultiAppConn. func NewAppConns(clientCreator ClientCreator) AppConns { return NewMultiAppConn(clientCreator) } -//----------------------------- -// multiAppConn implements AppConns - -// a multiAppConn is made of a few appConns (mempool, consensus, query) -// and manages their underlying abci clients +// multiAppConn implements AppConns. +// +// A multiAppConn is made of a few appConns and manages their underlying abci +// clients. // TODO: on app restart, clients must reboot together type multiAppConn struct { service.BaseService - mempoolConn AppConnMempool consensusConn AppConnConsensus + mempoolConn AppConnMempool queryConn AppConnQuery snapshotConn AppConnSnapshot + consensusConnClient abcicli.Client + mempoolConnClient abcicli.Client + queryConnClient abcicli.Client + snapshotConnClient abcicli.Client + clientCreator ClientCreator } -// Make all necessary abci connections to the application +// NewMultiAppConn makes all necessary abci connections to the application. func NewMultiAppConn(clientCreator ClientCreator) AppConns { multiAppConn := &multiAppConn{ clientCreator: clientCreator, @@ -48,70 +66,118 @@ func NewMultiAppConn(clientCreator ClientCreator) AppConns { return multiAppConn } -// Returns the mempool connection func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn } -// Returns the consensus Connection func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } -// Returns the query Connection func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } -// Returns the snapshot Connection func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn } func (app *multiAppConn) OnStart() error { - // query connection - querycli, err := app.clientCreator.NewABCIClient() + c, err := app.abciClientFor(connQuery) if err != nil { - return fmt.Errorf("error creating ABCI client (query connection): %w", err) - } - querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query")) - if err := querycli.Start(); err != nil { - return fmt.Errorf("error starting ABCI client (query connection): %w", err) + return err } - app.queryConn = NewAppConnQuery(querycli) + app.queryConnClient = c + app.queryConn = NewAppConnQuery(c) - // snapshot connection - snapshotcli, err := app.clientCreator.NewABCIClient() + c, err = app.abciClientFor(connSnapshot) if err != nil { - return fmt.Errorf("error creating ABCI client (snapshot connection): %w", err) + app.stopAllClients() + return err } - snapshotcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "snapshot")) - if err := snapshotcli.Start(); err != nil { - return fmt.Errorf("error starting ABCI client (snapshot connection): %w", err) + app.snapshotConnClient = c + app.snapshotConn = NewAppConnSnapshot(c) + + c, err = app.abciClientFor(connMempool) + if err != nil { + app.stopAllClients() + return err } - app.snapshotConn = NewAppConnSnapshot(snapshotcli) + app.mempoolConnClient = c + app.mempoolConn = NewAppConnMempool(c) - // mempool connection - memcli, err := app.clientCreator.NewABCIClient() + c, err = app.abciClientFor(connConsensus) if err != nil { - return fmt.Errorf("error creating ABCI client (mempool connection): %w", err) + app.stopAllClients() + return err + } + app.consensusConnClient = c + app.consensusConn = NewAppConnConsensus(c) + + // Kill Tendermint if the ABCI application crashes. + go app.killTMOnClientError() + + return nil +} + +func (app *multiAppConn) OnStop() { + app.stopAllClients() +} + +func (app *multiAppConn) killTMOnClientError() { + killFn := func(conn string, err error, logger tmlog.Logger) { + logger.Error( + fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn), + "err", err) + killErr := tmos.Kill() + if killErr != nil { + logger.Error("Failed to kill this process - please do so manually", "err", killErr) + } } - memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool")) - if err := memcli.Start(); err != nil { - return fmt.Errorf("error starting ABCI client (mempool connection): %w", err) + + select { + case <-app.consensusConnClient.Quit(): + if err := app.consensusConnClient.Error(); err != nil { + killFn(connConsensus, err, app.Logger) + } + case <-app.mempoolConnClient.Quit(): + if err := app.mempoolConnClient.Error(); err != nil { + killFn(connMempool, err, app.Logger) + } + case <-app.queryConnClient.Quit(): + if err := app.queryConnClient.Error(); err != nil { + killFn(connQuery, err, app.Logger) + } + case <-app.snapshotConnClient.Quit(): + if err := app.snapshotConnClient.Error(); err != nil { + killFn(connSnapshot, err, app.Logger) + } } - app.mempoolConn = NewAppConnMempool(memcli) +} - // consensus connection - concli, err := app.clientCreator.NewABCIClient() - if err != nil { - return fmt.Errorf("error creating ABCI client (consensus connection): %w", err) +func (app *multiAppConn) stopAllClients() { + if app.consensusConnClient != nil { + app.consensusConnClient.Stop() + } + if app.mempoolConnClient != nil { + app.mempoolConnClient.Stop() } - concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus")) - if err := concli.Start(); err != nil { - return fmt.Errorf("error starting ABCI client (consensus connection): %w", err) + if app.queryConnClient != nil { + app.queryConnClient.Stop() } - app.consensusConn = NewAppConnConsensus(concli) + if app.snapshotConnClient != nil { + app.snapshotConnClient.Stop() + } +} - return nil +func (app *multiAppConn) abciClientFor(conn string) (abcicli.Client, error) { + c, err := app.clientCreator.NewABCIClient() + if err != nil { + return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err) + } + c.SetLogger(app.Logger.With("module", "abci-client", "connection", conn)) + if err := c.Start(); err != nil { + return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err) + } + return c, nil } diff --git a/proxy/multi_app_conn_test.go b/proxy/multi_app_conn_test.go new file mode 100644 index 000000000..1a07e93aa --- /dev/null +++ b/proxy/multi_app_conn_test.go @@ -0,0 +1,85 @@ +package proxy + +import ( + "errors" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + abcimocks "github.com/tendermint/tendermint/abci/client/mocks" + "github.com/tendermint/tendermint/proxy/mocks" +) + +func TestAppConns_Start_Stop(t *testing.T) { + quitCh := make(<-chan struct{}) + + clientCreatorMock := &mocks.ClientCreator{} + + clientMock := &abcimocks.Client{} + clientMock.On("SetLogger", mock.Anything).Return().Times(4) + clientMock.On("Start").Return(nil).Times(4) + clientMock.On("Stop").Return(nil).Times(4) + clientMock.On("Quit").Return(quitCh).Times(4) + + clientCreatorMock.On("NewABCIClient").Return(clientMock, nil).Times(4) + + appConns := NewAppConns(clientCreatorMock) + + err := appConns.Start() + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + appConns.Stop() + + clientMock.AssertExpectations(t) +} + +// Upon failure, we call tmos.Kill +func TestAppConns_Failure(t *testing.T) { + ok := make(chan struct{}) + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM) + go func() { + for range c { + close(ok) + } + }() + + quitCh := make(chan struct{}) + var recvQuitCh <-chan struct{} // nolint:gosimple + recvQuitCh = quitCh + + clientCreatorMock := &mocks.ClientCreator{} + + clientMock := &abcimocks.Client{} + clientMock.On("SetLogger", mock.Anything).Return() + clientMock.On("Start").Return(nil) + clientMock.On("Stop").Return(nil) + + clientMock.On("Quit").Return(recvQuitCh) + clientMock.On("Error").Return(errors.New("EOF")).Once() + + clientCreatorMock.On("NewABCIClient").Return(clientMock, nil) + + appConns := NewAppConns(clientCreatorMock) + + err := appConns.Start() + require.NoError(t, err) + defer appConns.Stop() + + // simulate failure + close(quitCh) + + select { + case <-ok: + t.Log("SIGTERM successfully received") + case <-time.After(5 * time.Second): + t.Fatal("expected process to receive SIGTERM signal") + } +}