diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 9b333b83c..4eb9d85e5 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -27,4 +27,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES -- [\#7057](https://github.com/tendermint/tendermint/pull/7057) Import Postgres driver support for the psql indexer (@creachadair). \ No newline at end of file +- [\#7057](https://github.com/tendermint/tendermint/pull/7057) Import Postgres driver support for the psql indexer (@creachadair). +- [\#7106](https://github.com/tendermint/tendermint/pull/7106) Revert mutex change to ABCI Clients (@tychoish). diff --git a/abci/client/client.go b/abci/client/client.go index 0bfe7d497..c5c1ab219 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -81,7 +81,7 @@ type ReqRes struct { *sync.WaitGroup *types.Response // Not set atomically, so be sure to use WaitGroup. - mtx tmsync.RWMutex + mtx tmsync.Mutex done bool // Gets set to true once *after* WaitGroup.Done(). cb func(*types.Response) // A single callback that may be set. } @@ -131,16 +131,16 @@ func (r *ReqRes) InvokeCallback() { // // ref: https://github.com/tendermint/tendermint/issues/5439 func (r *ReqRes) GetCallback() func(*types.Response) { - r.mtx.RLock() - defer r.mtx.RUnlock() + r.mtx.Lock() + defer r.mtx.Unlock() return r.cb } // SetDone marks the ReqRes object as done. func (r *ReqRes) SetDone() { r.mtx.Lock() - defer r.mtx.Unlock() r.done = true + r.mtx.Unlock() } func waitGroup1() (wg *sync.WaitGroup) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 71f460d2b..b375c6cc9 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -27,7 +27,7 @@ type grpcClient struct { conn *grpc.ClientConn chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool - mtx tmsync.RWMutex + mtx tmsync.Mutex addr string err error resCb func(*types.Request, *types.Response) // listens to all callbacks @@ -146,8 +146,8 @@ func (cli *grpcClient) StopForError(err error) { } func (cli *grpcClient) Error() error { - cli.mtx.RLock() - defer cli.mtx.RUnlock() + cli.mtx.Lock() + defer cli.mtx.Unlock() return cli.err } @@ -155,8 +155,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 09e644641..01fc82825 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -15,7 +15,7 @@ var _ Client = (*localClient)(nil) type localClient struct { service.BaseService - mtx *tmsync.RWMutex + mtx *tmsync.Mutex types.Application Callback } @@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil) // methods of the given app. // // Both Async and Sync methods ignore the given context.Context parameter. -func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client { +func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client { if mtx == nil { - mtx = &tmsync.RWMutex{} + mtx = new(tmsync.Mutex) } - cli := &localClient{ mtx: mtx, Application: app, } - cli.BaseService = *service.NewBaseService(nil, "localClient", cli) return cli } func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() - defer app.mtx.Unlock() app.Callback = cb + app.mtx.Unlock() } // TODO: change types.Application to include Error()? @@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes { } func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Info(req) return app.callback( @@ -111,8 +109,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { } func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Query(req) return app.callback( @@ -220,8 +218,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) { } func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Info(req) return &res, nil @@ -252,8 +250,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh } func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { - app.mtx.RLock() - defer app.mtx.RUnlock() + app.mtx.Lock() + defer app.mtx.Unlock() res := app.Application.Query(req) return &res, nil diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index fd32315a6..a369f878c 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -34,7 +34,7 @@ type socketClient struct { reqQueue chan *ReqRes flushTimer *timer.ThrottleTimer - mtx tmsync.RWMutex + mtx tmsync.Mutex err error reqSent *list.List // list of requests sent, waiting for response resCb func(*types.Request, *types.Response) // called on all requests, if set. @@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() { // Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { - cli.mtx.RLock() - defer cli.mtx.RUnlock() + cli.mtx.Lock() + defer cli.mtx.Unlock() return cli.err } @@ -110,8 +110,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 7df700416..f5e372ae2 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -59,7 +59,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/common_test.go b/consensus/common_test.go index 62bc89633..60bb9eed5 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -389,7 +389,7 @@ func newStateWithConfigAndBlockStore( blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 39a730d13..f23ec727d 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -153,7 +153,7 @@ func TestReactorWithEvidence(t *testing.T) { blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx := new(tmsync.RWMutex) + mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) proxyAppConnCon := abcicli.NewLocalClient(mtx, app) diff --git a/proxy/client.go b/proxy/client.go index 0504b09f3..e78e827ab 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -21,7 +21,7 @@ type ClientCreator interface { // local proxy uses a mutex on an in-proc app type localClientCreator struct { - mtx *tmsync.RWMutex + mtx *tmsync.Mutex app types.Application } @@ -29,7 +29,7 @@ type localClientCreator struct { // which will be running locally. func NewLocalClientCreator(app types.Application) ClientCreator { return &localClientCreator{ - mtx: new(tmsync.RWMutex), + mtx: new(tmsync.Mutex), app: app, } }