Browse Source

abci/client: remove lingering async client code (#7876)

While I'd hoped to be able to make the socket client less weird, I
think that this is a nice middle ground in terms of improving
readability and removing the vestigal components without breaking
anything or radically changing the underlying assumptions. 

In the future we'd want to have requests be identified by a request
ID, and then we could drop the request tracking logic in the client
entirely, and this is protocol breaking. The alternatives aren't
substantively different than the current implementation.
pull/7880/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
abfcd08903
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 106 additions and 392 deletions
  1. +12
    -42
      abci/client/client.go
  2. +4
    -13
      abci/client/doc.go
  3. +2
    -62
      abci/client/grpc_client.go
  4. +16
    -61
      abci/client/local_client.go
  5. +0
    -26
      abci/client/mocks/client.go
  6. +72
    -163
      abci/client/socket_client.go
  7. +0
    -25
      internal/proxy/mocks/app_conn_mempool.go

+ 12
- 42
abci/client/client.go View File

@ -19,8 +19,8 @@ const (
// Client defines an interface for an ABCI client.
//
// All `Async` methods return a `ReqRes` object and an error.
// All `Sync` methods return the appropriate protobuf ResponseXxx struct and an error.
// All methods return the appropriate protobuf ResponseXxx struct and
// an error.
//
// NOTE these are client errors, eg. ABCI socket connectivity issues.
// Application-related errors are reflected in response via ABCI error codes
@ -52,65 +52,35 @@ type Client interface {
// NewClient returns a new ABCI client of the specified transport type.
// It returns an error if the transport is not "socket" or "grpc"
func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (client Client, err error) {
func NewClient(logger log.Logger, addr, transport string, mustConnect bool) (Client, error) {
switch transport {
case "socket":
client = NewSocketClient(logger, addr, mustConnect)
return NewSocketClient(logger, addr, mustConnect), nil
case "grpc":
client = NewGRPCClient(logger, addr, mustConnect)
return NewGRPCClient(logger, addr, mustConnect), nil
default:
err = fmt.Errorf("unknown abci transport %s", transport)
return nil, fmt.Errorf("unknown abci transport %s", transport)
}
return
}
type ReqRes struct {
type requestAndResponse struct {
*types.Request
*types.Response // Not set atomically, so be sure to use WaitGroup.
*types.Response
mtx sync.Mutex
signal chan struct{}
cb func(*types.Response) // A single callback that may be set.
}
func NewReqRes(req *types.Request) *ReqRes {
return &ReqRes{
func makeReqRes(req *types.Request) *requestAndResponse {
return &requestAndResponse{
Request: req,
Response: nil,
signal: make(chan struct{}),
cb: nil,
}
}
// Sets sets the callback. If reqRes is already done, it will call the cb
// immediately. Note, reqRes.cb should not change if reqRes.done and only one
// callback is supported.
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()
select {
case <-r.signal:
r.mtx.Unlock()
cb(r.Response)
default:
r.cb = cb
r.mtx.Unlock()
}
}
// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (r *ReqRes) InvokeCallback() {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.cb != nil {
r.cb(r.Response)
}
}
// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
// markDone marks the ReqRes object as done.
func (r *requestAndResponse) markDone() {
r.mtx.Lock()
defer r.mtx.Unlock()


+ 4
- 13
abci/client/doc.go View File

@ -7,23 +7,14 @@
//
// ## Socket client
//
// async: the client maintains an internal buffer of a fixed size. when the
// buffer becomes full, all Async calls will return an error immediately.
//
// sync: the client blocks on 1) enqueuing the Sync request 2) enqueuing the
// Flush requests 3) waiting for the Flush response
// The client blocks for enqueuing the request, for enqueuing the
// Flush to send the request, and for the Flush response to return.
//
// ## Local client
//
// async: global mutex is locked during each call (meaning it's not really async!)
// sync: global mutex is locked during each call
// The global mutex is locked during each call
//
// ## gRPC client
//
// async: gRPC is synchronous, but an internal buffer of a fixed size is used
// to store responses and later call callbacks (separate goroutine per
// response).
//
// sync: waits for all Async calls to complete (essentially what Flush does in
// the socket client) and calls Sync method.
// The client waits for all calls to complete.
package abciclient

+ 2
- 62
abci/client/grpc_client.go View File

@ -24,9 +24,8 @@ type grpcClient struct {
mustConnect bool
client types.ABCIApplicationClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
client types.ABCIApplicationClient
conn *grpc.ClientConn
mtx sync.Mutex
addr string
@ -38,25 +37,11 @@ var _ Client = (*grpcClient)(nil)
// NewGRPCClient creates a gRPC client, which will connect to addr upon the
// start. Note Client#Start returns an error if connection is unsuccessful and
// mustConnect is true.
//
// GRPC calls are synchronous, but some callbacks expect to be called
// asynchronously (eg. the mempool expects to be able to lock to remove bad txs
// from cache). To accommodate, we finish each call in its own go-routine,
// which is expensive, but easy - if you want something better, use the socket
// protocol! maybe one day, if people really want it, we use grpc streams, but
// hopefully not :D
func NewGRPCClient(logger log.Logger, addr string, mustConnect bool) Client {
cli := &grpcClient{
logger: logger,
addr: addr,
mustConnect: mustConnect,
// Buffering the channel is needed to make calls appear asynchronous,
// which is required when the caller makes multiple async calls before
// processing callbacks (e.g. due to holding locks). 64 means that a
// caller can make up to 64 async calls before a callback must be
// processed (otherwise it deadlocks). It also means that we can make 64
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(logger, "grpcClient", cli)
return cli
@ -67,35 +52,6 @@ func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
}
func (cli *grpcClient) OnStart(ctx context.Context) error {
// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
reqres.SetDone()
// Notify reqRes listener if set
reqres.InvokeCallback()
}
for {
select {
case reqres := <-cli.chReqRes:
if reqres != nil {
callCb(reqres)
} else {
cli.logger.Error("Received nil reqres")
}
case <-ctx.Done():
return
}
}
}()
RETRY_LOOP:
for {
conn, err := grpc.Dial(cli.addr,
@ -138,22 +94,6 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}
func (cli *grpcClient) StopForError(err error) {
if !cli.IsRunning() {
return
}
cli.mtx.Lock()
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
cli.logger.Error("Stopping abci.grpcClient for error", "err", err)
cli.Stop()
}
func (cli *grpcClient) Error() error {


+ 16
- 61
abci/client/local_client.go View File

@ -25,7 +25,7 @@ var _ Client = (*localClient)(nil)
// NewLocalClient creates a local client, which will be directly calling the
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
// The client methods ignore their context argument.
func NewLocalClient(logger log.Logger, app types.Application) Client {
cli := &localClient{
Application: app,
@ -36,19 +36,13 @@ func NewLocalClient(logger log.Logger, app types.Application) Client {
func (*localClient) OnStart(context.Context) error { return nil }
func (*localClient) OnStop() {}
// TODO: change types.Application to include Error()?
func (app *localClient) Error() error {
return nil
}
func (*localClient) Error() error { return nil }
//-------------------------------------------------------
func (app *localClient) Flush(ctx context.Context) error {
return nil
}
func (*localClient) Flush(context.Context) error { return nil }
func (app *localClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
func (app *localClient) Echo(_ context.Context, msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}
@ -60,10 +54,7 @@ func (app *localClient) Info(ctx context.Context, req types.RequestInfo) (*types
return &res, nil
}
func (app *localClient) CheckTx(
ctx context.Context,
req types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
func (app *localClient) CheckTx(_ context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -71,10 +62,7 @@ func (app *localClient) CheckTx(
return &res, nil
}
func (app *localClient) Query(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
func (app *localClient) Query(_ context.Context, req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -90,11 +78,7 @@ func (app *localClient) Commit(ctx context.Context) (*types.ResponseCommit, erro
return &res, nil
}
func (app *localClient) InitChain(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
func (app *localClient) InitChain(_ context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -102,11 +86,7 @@ func (app *localClient) InitChain(
return &res, nil
}
func (app *localClient) ListSnapshots(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
func (app *localClient) ListSnapshots(_ context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -114,11 +94,7 @@ func (app *localClient) ListSnapshots(
return &res, nil
}
func (app *localClient) OfferSnapshot(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
func (app *localClient) OfferSnapshot(_ context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -126,10 +102,7 @@ func (app *localClient) OfferSnapshot(
return &res, nil
}
func (app *localClient) LoadSnapshotChunk(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
func (app *localClient) LoadSnapshotChunk(_ context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -137,10 +110,7 @@ func (app *localClient) LoadSnapshotChunk(
return &res, nil
}
func (app *localClient) ApplySnapshotChunk(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
func (app *localClient) ApplySnapshotChunk(_ context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -148,10 +118,7 @@ func (app *localClient) ApplySnapshotChunk(
return &res, nil
}
func (app *localClient) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
func (app *localClient) PrepareProposal(_ context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -159,10 +126,7 @@ func (app *localClient) PrepareProposal(
return &res, nil
}
func (app *localClient) ProcessProposal(
ctx context.Context,
req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
func (app *localClient) ProcessProposal(_ context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -170,10 +134,7 @@ func (app *localClient) ProcessProposal(
return &res, nil
}
func (app *localClient) ExtendVote(
ctx context.Context,
req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
func (app *localClient) ExtendVote(_ context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -181,10 +142,7 @@ func (app *localClient) ExtendVote(
return &res, nil
}
func (app *localClient) VerifyVoteExtension(
ctx context.Context,
req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
func (app *localClient) VerifyVoteExtension(_ context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -192,10 +150,7 @@ func (app *localClient) VerifyVoteExtension(
return &res, nil
}
func (app *localClient) FinalizeBlock(
ctx context.Context,
req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
func (app *localClient) FinalizeBlock(_ context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
app.mtx.Lock()
defer app.mtx.Unlock()


+ 0
- 26
abci/client/mocks/client.go View File

@ -5,10 +5,7 @@ package mocks
import (
context "context"
abciclient "github.com/tendermint/tendermint/abci/client"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
)
@ -63,29 +60,6 @@ func (_m *Client) CheckTx(_a0 context.Context, _a1 types.RequestCheckTx) (*types
return r0, r1
}
// CheckTxAsync provides a mock function with given fields: _a0, _a1
func (_m *Client) CheckTxAsync(_a0 context.Context, _a1 types.RequestCheckTx) (*abciclient.ReqRes, error) {
ret := _m.Called(_a0, _a1)
var r0 *abciclient.ReqRes
if rf, ok := ret.Get(0).(func(context.Context, types.RequestCheckTx) *abciclient.ReqRes); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abciclient.ReqRes)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestCheckTx) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Commit provides a mock function with given fields: _a0
func (_m *Client) Commit(_a0 context.Context) (*types.ResponseCommit, error) {
ret := _m.Called(_a0)


+ 72
- 163
abci/client/socket_client.go View File

@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net"
"reflect"
"sync"
"time"
@ -34,12 +33,11 @@ type socketClient struct {
mustConnect bool
conn net.Conn
reqQueue chan *ReqRes
reqQueue chan *requestAndResponse
mtx sync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
reqSent *list.List // list of requests sent, waiting for response
}
var _ Client = (*socketClient)(nil)
@ -50,11 +48,10 @@ var _ Client = (*socketClient)(nil)
func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client {
cli := &socketClient{
logger: logger,
reqQueue: make(chan *ReqRes, reqQueueSize),
reqQueue: make(chan *requestAndResponse, reqQueueSize),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
cli.BaseService = *service.NewBaseService(logger, "socketClient", cli)
return cli
@ -126,6 +123,7 @@ func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer
cli.stopForError(fmt.Errorf("write to buffer: %w", err))
return
}
if err := bw.Flush(); err != nil {
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
return
@ -140,23 +138,20 @@ func (cli *socketClient) recvResponseRoutine(ctx context.Context, conn io.Reader
if ctx.Err() != nil {
return
}
var res = &types.Response{}
err := types.ReadMessage(r, res)
if err != nil {
res := &types.Response{}
if err := types.ReadMessage(r, res); err != nil {
cli.stopForError(fmt.Errorf("read message: %w", err))
return
}
// cli.logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
switch r := res.Value.(type) {
case *types.Response_Exception: // app responded with error
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.stopForError(errors.New(r.Exception.Error))
return
default:
err := cli.didRecvResponse(res)
if err != nil {
if err := cli.didRecvResponse(res); err != nil {
cli.stopForError(err)
return
}
@ -164,7 +159,7 @@ func (cli *socketClient) recvResponseRoutine(ctx context.Context, conn io.Reader
}
}
func (cli *socketClient) willSendReq(reqres *ReqRes) {
func (cli *socketClient) willSendReq(reqres *requestAndResponse) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.reqSent.PushBack(reqres)
@ -177,258 +172,172 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
// Get the first ReqRes.
next := cli.reqSent.Front()
if next == nil {
return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
return fmt.Errorf("unexpected %T when nothing expected", res.Value)
}
reqres := next.Value.(*ReqRes)
reqres := next.Value.(*requestAndResponse)
if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("unexpected %v when response to %v expected",
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
return fmt.Errorf("unexpected %T when response to %T expected", res.Value, reqres.Request.Value)
}
reqres.Response = res
reqres.SetDone() // release waiters
reqres.markDone() // release waiters
cli.reqSent.Remove(next) // pop first item from linked list
// Notify client listener if set (global callback).
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
// Notify reqRes listener if set (request specific callback).
//
// NOTE: It is possible this callback isn't set on the reqres object. At this
// point, in which case it will be called after, when it is set.
reqres.InvokeCallback()
return nil
}
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
_, err := cli.doRequest(ctx, types.ToRequestFlush())
if err != nil {
return queueErr(err)
}
if err := cli.Error(); err != nil {
return err
}
select {
case <-reqRes.signal:
return cli.Error()
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (cli *socketClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestEcho(msg))
res, err := cli.doRequest(ctx, types.ToRequestEcho(msg))
if err != nil {
return nil, err
}
return reqres.Response.GetEcho(), nil
return res.GetEcho(), nil
}
func (cli *socketClient) Info(
ctx context.Context,
req types.RequestInfo,
) (*types.ResponseInfo, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestInfo(req))
func (cli *socketClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
res, err := cli.doRequest(ctx, types.ToRequestInfo(req))
if err != nil {
return nil, err
}
return reqres.Response.GetInfo(), nil
return res.GetInfo(), nil
}
func (cli *socketClient) CheckTx(
ctx context.Context,
req types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestCheckTx(req))
func (cli *socketClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return reqres.Response.GetCheckTx(), nil
return res.GetCheckTx(), nil
}
func (cli *socketClient) Query(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestQuery(req))
func (cli *socketClient) Query(ctx context.Context, req types.RequestQuery) (*types.ResponseQuery, error) {
res, err := cli.doRequest(ctx, types.ToRequestQuery(req))
if err != nil {
return nil, err
}
return reqres.Response.GetQuery(), nil
return res.GetQuery(), nil
}
func (cli *socketClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestCommit())
res, err := cli.doRequest(ctx, types.ToRequestCommit())
if err != nil {
return nil, err
}
return reqres.Response.GetCommit(), nil
return res.GetCommit(), nil
}
func (cli *socketClient) InitChain(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestInitChain(req))
func (cli *socketClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
res, err := cli.doRequest(ctx, types.ToRequestInitChain(req))
if err != nil {
return nil, err
}
return reqres.Response.GetInitChain(), nil
return res.GetInitChain(), nil
}
func (cli *socketClient) ListSnapshots(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestListSnapshots(req))
func (cli *socketClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
res, err := cli.doRequest(ctx, types.ToRequestListSnapshots(req))
if err != nil {
return nil, err
}
return reqres.Response.GetListSnapshots(), nil
return res.GetListSnapshots(), nil
}
func (cli *socketClient) OfferSnapshot(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestOfferSnapshot(req))
func (cli *socketClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
res, err := cli.doRequest(ctx, types.ToRequestOfferSnapshot(req))
if err != nil {
return nil, err
}
return reqres.Response.GetOfferSnapshot(), nil
return res.GetOfferSnapshot(), nil
}
func (cli *socketClient) LoadSnapshotChunk(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestLoadSnapshotChunk(req))
func (cli *socketClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
res, err := cli.doRequest(ctx, types.ToRequestLoadSnapshotChunk(req))
if err != nil {
return nil, err
}
return reqres.Response.GetLoadSnapshotChunk(), nil
return res.GetLoadSnapshotChunk(), nil
}
func (cli *socketClient) ApplySnapshotChunk(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestApplySnapshotChunk(req))
func (cli *socketClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
res, err := cli.doRequest(ctx, types.ToRequestApplySnapshotChunk(req))
if err != nil {
return nil, err
}
return reqres.Response.GetApplySnapshotChunk(), nil
return res.GetApplySnapshotChunk(), nil
}
func (cli *socketClient) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestPrepareProposal(req))
func (cli *socketClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
res, err := cli.doRequest(ctx, types.ToRequestPrepareProposal(req))
if err != nil {
return nil, err
}
return reqres.Response.GetPrepareProposal(), nil
return res.GetPrepareProposal(), nil
}
func (cli *socketClient) ProcessProposal(
ctx context.Context,
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestProcessProposal(req))
func (cli *socketClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
res, err := cli.doRequest(ctx, types.ToRequestProcessProposal(req))
if err != nil {
return nil, err
}
return reqres.Response.GetProcessProposal(), nil
return res.GetProcessProposal(), nil
}
func (cli *socketClient) ExtendVote(
ctx context.Context,
req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestExtendVote(req))
func (cli *socketClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
res, err := cli.doRequest(ctx, types.ToRequestExtendVote(req))
if err != nil {
return nil, err
}
return reqres.Response.GetExtendVote(), nil
return res.GetExtendVote(), nil
}
func (cli *socketClient) VerifyVoteExtension(
ctx context.Context,
req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestVerifyVoteExtension(req))
func (cli *socketClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
res, err := cli.doRequest(ctx, types.ToRequestVerifyVoteExtension(req))
if err != nil {
return nil, err
}
return reqres.Response.GetVerifyVoteExtension(), nil
return res.GetVerifyVoteExtension(), nil
}
func (cli *socketClient) FinalizeBlock(
ctx context.Context,
req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestFinalizeBlock(req))
func (cli *socketClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
res, err := cli.doRequest(ctx, types.ToRequestFinalizeBlock(req))
if err != nil {
return nil, err
}
return reqres.Response.GetFinalizeBlock(), nil
return res.GetFinalizeBlock(), nil
}
//----------------------------------------
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*types.Response, error) {
reqres := makeReqRes(req)
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
return nil, fmt.Errorf("can't queue req: %w", ctx.Err())
}
return reqres, nil
}
func (cli *socketClient) queueRequestAndFlush(
ctx context.Context,
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}
select {
case <-reqres.signal:
if err := cli.Error(); err != nil {
return nil, err
}
if err := cli.Flush(ctx); err != nil {
return nil, err
return reqres.Response, nil
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, cli.Error()
}
func queueErr(e error) error {
return fmt.Errorf("can't queue req: %w", e)
}
// drainQueue marks as complete and discards all remaining pending requests
@ -439,8 +348,8 @@ func (cli *socketClient) drainQueue(ctx context.Context) {
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
reqres := req.Value.(*ReqRes)
reqres.SetDone()
reqres := req.Value.(*requestAndResponse)
reqres.markDone()
}
// Mark all queued messages as resolved.
@ -453,7 +362,7 @@ func (cli *socketClient) drainQueue(ctx context.Context) {
case <-ctx.Done():
return
case reqres := <-cli.reqQueue:
reqres.SetDone()
reqres.markDone()
default:
return
}


+ 0
- 25
internal/proxy/mocks/app_conn_mempool.go View File

@ -5,8 +5,6 @@ package mocks
import (
context "context"
abciclient "github.com/tendermint/tendermint/abci/client"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
@ -40,29 +38,6 @@ func (_m *AppConnMempool) CheckTx(_a0 context.Context, _a1 types.RequestCheckTx)
return r0, r1
}
// CheckTxAsync provides a mock function with given fields: _a0, _a1
func (_m *AppConnMempool) CheckTxAsync(_a0 context.Context, _a1 types.RequestCheckTx) (*abciclient.ReqRes, error) {
ret := _m.Called(_a0, _a1)
var r0 *abciclient.ReqRes
if rf, ok := ret.Get(0).(func(context.Context, types.RequestCheckTx) *abciclient.ReqRes); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*abciclient.ReqRes)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestCheckTx) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Error provides a mock function with given fields:
func (_m *AppConnMempool) Error() error {
ret := _m.Called()


Loading…
Cancel
Save