Browse Source

abci/client: remove client-level callback (#7845)

* abci/client: remove client-level callback

* ditch multi conn con

* fix lint

* fix teset
pull/7863/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
c2cce2a696
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 51 additions and 239 deletions
  1. +0
    -15
      abci/client/client.go
  2. +1
    -4
      abci/client/creators.go
  3. +4
    -20
      abci/client/grpc_client.go
  4. +2
    -14
      abci/client/local_client.go
  5. +0
    -19
      abci/client/mocks/client.go
  6. +0
    -10
      abci/client/socket_client.go
  7. +2
    -3
      internal/consensus/byzantine_test.go
  8. +2
    -3
      internal/consensus/common_test.go
  9. +0
    -1
      internal/consensus/mocks/cons_sync_reactor.go
  10. +2
    -3
      internal/consensus/reactor_test.go
  11. +0
    -1
      internal/evidence/mocks/block_store.go
  12. +5
    -3
      internal/mempool/mempool.go
  13. +0
    -10
      internal/proxy/app_conn.go
  14. +0
    -7
      internal/proxy/mocks/app_conn_consensus.go
  15. +0
    -5
      internal/proxy/mocks/app_conn_mempool.go
  16. +26
    -111
      internal/proxy/multi_app_conn.go
  17. +7
    -6
      internal/proxy/multi_app_conn_test.go
  18. +0
    -1
      internal/state/indexer/mocks/event_sink.go
  19. +0
    -1
      internal/state/mocks/evidence_pool.go
  20. +0
    -1
      internal/state/mocks/store.go
  21. +0
    -1
      internal/statesync/mocks/state_provider.go

+ 0
- 15
abci/client/client.go View File

@ -28,7 +28,6 @@ const (
type Client interface {
service.Service
SetResponseCallback(Callback)
Error() error
// Asynchronous requests
@ -70,8 +69,6 @@ func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (cli
return
}
type Callback func(*types.Request, *types.Response)
type ReqRes struct {
*types.Request
*types.Response // Not set atomically, so be sure to use WaitGroup.
@ -117,18 +114,6 @@ func (r *ReqRes) InvokeCallback() {
}
}
// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
// will invoke the callback twice and create a potential race condition.
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
}
// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()


+ 1
- 4
abci/client/creators.go View File

@ -2,7 +2,6 @@ package abciclient
import (
"fmt"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
@ -14,10 +13,8 @@ type Creator func(log.Logger) (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
mtx := new(sync.Mutex)
return func(logger log.Logger) (Client, error) {
return NewLocalClient(logger, mtx, app), nil
return NewLocalClient(logger, app), nil
}
}


+ 4
- 20
abci/client/grpc_client.go View File

@ -28,10 +28,9 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
mtx sync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
mtx sync.Mutex
addr string
err error
}
var _ Client = (*grpcClient)(nil)
@ -78,15 +77,8 @@ func (cli *grpcClient) OnStart(ctx context.Context) error {
reqres.SetDone()
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
reqres.InvokeCallback()
}
for {
@ -170,14 +162,6 @@ func (cli *grpcClient) Error() error {
return cli.err
}
// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
cli.resCb = resCb
cli.mtx.Unlock()
}
//----------------------------------------
// NOTE: call is synchronous, use ctx to break early if needed


+ 2
- 14
abci/client/local_client.go View File

@ -16,9 +16,8 @@ import (
type localClient struct {
service.BaseService
mtx *sync.Mutex
mtx sync.Mutex
types.Application
Callback
}
var _ Client = (*localClient)(nil)
@ -27,12 +26,8 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(logger log.Logger, mtx *sync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(sync.Mutex)
}
func NewLocalClient(logger log.Logger, app types.Application) Client {
cli := &localClient{
mtx: mtx,
Application: app,
}
cli.BaseService = *service.NewBaseService(logger, "localClient", cli)
@ -42,12 +37,6 @@ func NewLocalClient(logger log.Logger, mtx *sync.Mutex, app types.Application) C
func (*localClient) OnStart(context.Context) error { return nil }
func (*localClient) OnStop() {}
func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
}
// TODO: change types.Application to include Error()?
func (app *localClient) Error() error {
return nil
@ -233,7 +222,6 @@ func (app *localClient) FinalizeBlock(
//-------------------------------------------------------
func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
return newLocalReqRes(req, res)
}


+ 0
- 19
abci/client/mocks/client.go View File

@ -427,11 +427,6 @@ func (_m *Client) Query(_a0 context.Context, _a1 types.RequestQuery) (*types.Res
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *Client) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
// Start provides a mock function with given fields: _a0
func (_m *Client) Start(_a0 context.Context) error {
ret := _m.Called(_a0)
@ -446,20 +441,6 @@ func (_m *Client) Start(_a0 context.Context) error {
return r0
}
// String provides a mock function with given fields:
func (_m *Client) String() string {
ret := _m.Called()
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// VerifyVoteExtension provides a mock function with given fields: _a0, _a1
func (_m *Client) VerifyVoteExtension(_a0 context.Context, _a1 types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
ret := _m.Called(_a0, _a1)


+ 0
- 10
abci/client/socket_client.go View File

@ -107,16 +107,6 @@ func (cli *socketClient) Error() error {
return cli.err
}
// SetResponseCallback sets a callback, which will be executed for each
// non-error & non-empty response from the server.
//
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
}
//----------------------------------------
func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer) {


+ 2
- 3
internal/consensus/byzantine_test.go View File

@ -74,9 +74,8 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus
mtx := new(sync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
// Make Mempool
mempool := mempool.NewTxMempool(


+ 2
- 3
internal/consensus/common_test.go View File

@ -454,9 +454,8 @@ func newStateWithConfigAndBlockStore(
t.Helper()
// one for mempool, one for consensus
mtx := new(sync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
// Make Mempool


+ 0
- 1
internal/consensus/mocks/cons_sync_reactor.go View File

@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
)


+ 2
- 3
internal/consensus/reactor_test.go View File

@ -406,9 +406,8 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus
mtx := new(sync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, mtx, app)
proxyAppConnMem := abciclient.NewLocalClient(logger, app)
proxyAppConnCon := abciclient.NewLocalClient(logger, app)
mempool := mempool.NewTxMempool(
log.TestingLogger().With("module", "mempool"),


+ 0
- 1
internal/evidence/mocks/block_store.go View File

@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/types"
)


+ 5
- 3
internal/mempool/mempool.go View File

@ -121,8 +121,6 @@ func NewTxMempool(
txmp.cache = NewLRUTxCache(cfg.CacheSize)
}
proxyAppConn.SetResponseCallback(txmp.defaultTxCallback)
for _, opt := range options {
opt(txmp)
}
@ -280,6 +278,8 @@ func (txmp *TxMempool) CheckTx(
timestamp: time.Now().UTC(),
height: txmp.height,
}
txmp.defaultTxCallback(reqRes.Request, res)
txmp.initTxCallback(wtx, res, txInfo)
if cb != nil {
@ -717,14 +717,16 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) {
// Only execute CheckTx if the transaction is not marked as removed which
// could happen if the transaction was evicted.
if !txmp.txStore.IsTxRemoved(wtx.hash) {
_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
res, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
// no need in retrying since the tx will be rechecked after the next block
txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err)
continue
}
txmp.defaultTxCallback(res.Request, res.Response)
}
}


+ 0
- 10
internal/proxy/app_conn.go View File

@ -16,7 +16,6 @@ import (
// Enforce which abci msgs can be sent on a connection at the type level
type AppConnConsensus interface {
SetResponseCallback(abciclient.Callback)
Error() error
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
@ -30,7 +29,6 @@ type AppConnConsensus interface {
}
type AppConnMempool interface {
SetResponseCallback(abciclient.Callback)
Error() error
CheckTxAsync(context.Context, types.RequestCheckTx) (*abciclient.ReqRes, error)
@ -74,10 +72,6 @@ func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnCon
}
}
func (app *appConnConsensus) SetResponseCallback(cb abciclient.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
@ -150,10 +144,6 @@ func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempo
}
}
func (app *appConnMempool) SetResponseCallback(cb abciclient.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}


+ 0
- 7
internal/proxy/mocks/app_conn_consensus.go View File

@ -5,8 +5,6 @@ package mocks
import (
context "context"
abciclient "github.com/tendermint/tendermint/abci/client"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
@ -169,11 +167,6 @@ func (_m *AppConnConsensus) ProcessProposal(_a0 context.Context, _a1 types.Reque
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}
// VerifyVoteExtension provides a mock function with given fields: _a0, _a1
func (_m *AppConnConsensus) VerifyVoteExtension(_a0 context.Context, _a1 types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
ret := _m.Called(_a0, _a1)


+ 0
- 5
internal/proxy/mocks/app_conn_mempool.go View File

@ -113,8 +113,3 @@ func (_m *AppConnMempool) FlushAsync(_a0 context.Context) (*abciclient.ReqRes, e
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *AppConnMempool) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)
}

+ 26
- 111
internal/proxy/multi_app_conn.go View File

@ -2,7 +2,6 @@ package proxy
import (
"context"
"fmt"
"os"
"syscall"
@ -11,13 +10,6 @@ import (
"github.com/tendermint/tendermint/libs/service"
)
const (
connConsensus = "consensus"
connMempool = "mempool"
connQuery = "query"
connSnapshot = "snapshot"
)
// AppConns is the Tendermint's interface to the application that consists of
// multiple connections.
type AppConns interface {
@ -53,10 +45,7 @@ type multiAppConn struct {
queryConn AppConnQuery
snapshotConn AppConnSnapshot
consensusConnClient stoppableClient
mempoolConnClient stoppableClient
queryConnClient stoppableClient
snapshotConnClient stoppableClient
client stoppableClient
clientCreator abciclient.Creator
}
@ -89,123 +78,49 @@ func (app *multiAppConn) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
app.stopAllClients()
app.client.Stop()
}
}()
app.queryConnClient, err = app.abciClientFor(ctx, connQuery)
var client abciclient.Client
client, err = app.clientCreator(app.logger)
if err != nil {
return err
}
app.queryConn = NewAppConnQuery(app.queryConnClient, app.metrics)
app.snapshotConnClient, err = app.abciClientFor(ctx, connSnapshot)
if err != nil {
return err
}
app.snapshotConn = NewAppConnSnapshot(app.snapshotConnClient, app.metrics)
app.queryConn = NewAppConnQuery(client, app.metrics)
app.snapshotConn = NewAppConnSnapshot(client, app.metrics)
app.mempoolConn = NewAppConnMempool(client, app.metrics)
app.consensusConn = NewAppConnConsensus(client, app.metrics)
app.mempoolConnClient, err = app.abciClientFor(ctx, connMempool)
if err != nil {
return err
}
app.mempoolConn = NewAppConnMempool(app.mempoolConnClient, app.metrics)
app.consensusConnClient, err = app.abciClientFor(ctx, connConsensus)
if err != nil {
return err
}
app.consensusConn = NewAppConnConsensus(app.consensusConnClient, app.metrics)
app.client = client.(stoppableClient)
// Kill Tendermint if the ABCI application crashes.
app.startWatchersForClientErrorToKillTendermint(ctx)
return nil
}
func (app *multiAppConn) OnStop() { app.stopAllClients() }
func (app *multiAppConn) startWatchersForClientErrorToKillTendermint(ctx context.Context) {
// this function starts a number of threads (per abci client)
// that will SIGTERM's our own PID if any of the ABCI clients
// exit/return early. If the context is canceled then these
// functions will not kill tendermint.
killFn := func(conn string, err error, logger log.Logger) {
logger.Error(
fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn),
"err", err)
if killErr := kill(); killErr != nil {
logger.Error("Failed to kill this process - please do so manually", "err", killErr)
go func() {
if !client.IsRunning() {
return
}
app.client.Wait()
if ctx.Err() != nil {
return
}
}
for _, client := range []struct {
connClient stoppableClient
name string
}{
{
connClient: app.consensusConnClient,
name: connConsensus,
},
{
connClient: app.mempoolConnClient,
name: connMempool,
},
{
connClient: app.queryConnClient,
name: connQuery,
},
{
connClient: app.snapshotConnClient,
name: connSnapshot,
},
} {
go func(name string, client stoppableClient) {
client.Wait()
if ctx.Err() != nil {
return
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
app.logger.Error("Failed to kill this process - please do so manually",
"err", killErr)
}
if err := client.Error(); err != nil {
killFn(name, err, app.logger)
}
}(client.name, client.connClient)
}
}
func (app *multiAppConn) stopAllClients() {
for _, client := range []stoppableClient{
app.consensusConnClient,
app.mempoolConnClient,
app.queryConnClient,
app.snapshotConnClient,
} {
if client != nil {
client.Stop()
}
}
}
func (app *multiAppConn) abciClientFor(ctx context.Context, conn string) (stoppableClient, error) {
c, err := app.clientCreator(app.logger.With(
"module", "abci-client",
"connection", conn))
if err != nil {
return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err)
}
if err := c.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err)
}
client, ok := c.(stoppableClient)
if !ok {
return nil, fmt.Errorf("%T is not a stoppable client", c)
}
}()
return client, nil
return client.Start(ctx)
}
func (app *multiAppConn) OnStop() { app.client.Stop() }
func kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {


+ 7
- 6
internal/proxy/multi_app_conn_test.go View File

@ -30,9 +30,10 @@ func TestAppConns_Start_Stop(t *testing.T) {
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("Start", mock.Anything).Return(nil).Times(4)
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("Error").Return(nil)
clientMock.On("Wait").Return(nil).Times(4)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil).Times(1)
cl := &noopStoppableClientImpl{Client: clientMock}
creatorCallCount := 0
@ -46,14 +47,14 @@ func TestAppConns_Start_Stop(t *testing.T) {
err := appConns.Start(ctx)
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
cancel()
appConns.Wait()
clientMock.AssertExpectations(t)
assert.Equal(t, 4, cl.count)
assert.Equal(t, 4, creatorCallCount)
assert.Equal(t, 1, cl.count)
assert.Equal(t, 1, creatorCallCount)
}
// Upon failure, we call tmos.Kill
@ -74,7 +75,7 @@ func TestAppConns_Failure(t *testing.T) {
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return()
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil)
clientMock.On("Error").Return(errors.New("EOF"))
cl := &noopStoppableClientImpl{Client: clientMock}


+ 0
- 1
internal/state/indexer/mocks/event_sink.go View File

@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
indexer "github.com/tendermint/tendermint/internal/state/indexer"
query "github.com/tendermint/tendermint/internal/pubsub/query"


+ 0
- 1
internal/state/mocks/evidence_pool.go View File

@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"
)


+ 0
- 1
internal/state/mocks/store.go View File

@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
tendermintstate "github.com/tendermint/tendermint/proto/tendermint/state"


+ 0
- 1
internal/statesync/mocks/state_provider.go View File

@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"


Loading…
Cancel
Save