Browse Source

abci: PrepareProposal (#6544)

pull/7769/head
Marko 3 years ago
committed by Sergio Mena
parent
commit
ff498ff333
17 changed files with 999 additions and 214 deletions
  1. +1
    -0
      abci/client/client.go
  2. +8
    -0
      abci/client/grpc_client.go
  3. +11
    -0
      abci/client/local_client.go
  4. +23
    -0
      abci/client/mocks/client.go
  5. +13
    -0
      abci/client/socket_client.go
  6. +6
    -0
      abci/example/kvstore/kvstore.go
  7. +9
    -0
      abci/example/kvstore/persistent_kvstore.go
  8. +3
    -0
      abci/server/socket_server.go
  9. +12
    -1
      abci/types/application.go
  10. +12
    -0
      abci/types/messages.go
  11. +805
    -212
      abci/types/types.pb.go
  12. +5
    -0
      internal/consensus/mempool_test.go
  13. +11
    -0
      internal/proxy/app_conn.go
  14. +23
    -0
      internal/proxy/mocks/app_conn_consensus.go
  15. +30
    -1
      internal/state/execution.go
  16. +5
    -0
      test/e2e/app/app.go
  17. +22
    -0
      types/tx.go

+ 1
- 0
abci/client/client.go View File

@ -45,6 +45,7 @@ type Client interface {
Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
Commit(context.Context) (*types.ResponseCommit, error)
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
BeginBlock(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
EndBlock(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error)
ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)


+ 8
- 0
abci/client/grpc_client.go View File

@ -368,3 +368,11 @@ func (cli *grpcClient) ApplySnapshotChunk(
req := types.ToRequestApplySnapshotChunk(params)
return cli.client.ApplySnapshotChunk(ctx, req.GetApplySnapshotChunk(), grpc.WaitForReady(true))
}
func (cli *grpcClient) PrepareProposal(
ctx context.Context,
params types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
req := types.ToRequestPrepareProposal(params)
return cli.client.PrepareProposal(ctx, req.GetPrepareProposal(), grpc.WaitForReady(true))
}

+ 11
- 0
abci/client/local_client.go View File

@ -222,6 +222,17 @@ func (app *localClient) ApplySnapshotChunk(
return &res, nil
}
func (app *localClient) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.PrepareProposal(req)
return &res, nil
}
//-------------------------------------------------------
func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {


+ 23
- 0
abci/client/mocks/client.go View File

@ -404,6 +404,29 @@ func (_m *Client) OfferSnapshot(_a0 context.Context, _a1 types.RequestOfferSnaps
return r0, r1
}
// PrepareProposal provides a mock function with given fields: _a0, _a1
func (_m *Client) PrepareProposal(_a0 context.Context, _a1 types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
ret := _m.Called(_a0, _a1)
var r0 *types.ResponsePrepareProposal
if rf, ok := ret.Get(0).(func(context.Context, types.RequestPrepareProposal) *types.ResponsePrepareProposal); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponsePrepareProposal)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestPrepareProposal) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Query provides a mock function with given fields: _a0, _a1
func (_m *Client) Query(_a0 context.Context, _a1 types.RequestQuery) (*types.ResponseQuery, error) {
ret := _m.Called(_a0, _a1)


+ 13
- 0
abci/client/socket_client.go View File

@ -404,6 +404,17 @@ func (cli *socketClient) ApplySnapshotChunk(
return reqres.Response.GetApplySnapshotChunk(), nil
}
func (cli *socketClient) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
reqres, err := cli.queueRequestAndFlush(ctx, types.ToRequestPrepareProposal(req))
if err != nil {
return nil, err
}
return reqres.Response.GetPrepareProposal(), nil
}
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
@ -527,6 +538,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PrepareProposal:
_, ok = res.Value.(*types.Response_PrepareProposal)
}
return ok
}


+ 6
- 0
abci/example/kvstore/kvstore.go View File

@ -171,3 +171,9 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo
return resQuery
}
func (app *Application) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{
BlockData: req.BlockData}
}

+ 9
- 0
abci/example/kvstore/persistent_kvstore.go View File

@ -166,6 +166,15 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}
func (app *PersistentKVStoreApplication) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
if len(req.BlockData) >= 1 {
req.BlockData[1] = []byte("modified tx")
}
return types.ResponsePrepareProposal{BlockData: req.BlockData}
}
//---------------------------------------------
// update validators


+ 3
- 0
abci/server/socket_server.go View File

@ -240,6 +240,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)


+ 12
- 1
abci/types/application.go View File

@ -17,7 +17,8 @@ type Application interface {
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
PrepareProposal(RequestPrepareProposal) ResponsePrepareProposal
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
@ -90,6 +91,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
return ResponseApplySnapshotChunk{}
}
func (BaseApplication) PrepareProposal(req RequestPrepareProposal) ResponsePrepareProposal {
return ResponsePrepareProposal{}
}
//-------------------------------------------------------
// GRPCApplication is a GRPC wrapper for Application
@ -172,3 +177,9 @@ func (app *GRPCApplication) ApplySnapshotChunk(
res := app.app.ApplySnapshotChunk(*req)
return &res, nil
}
func (app *GRPCApplication) PrepareProposal(
ctx context.Context, req *RequestPrepareProposal) (*ResponsePrepareProposal, error) {
res := app.app.PrepareProposal(*req)
return &res, nil
}

+ 12
- 0
abci/types/messages.go View File

@ -110,6 +110,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
}
}
func ToRequestPrepareProposal(req RequestPrepareProposal) *Request {
return &Request{
Value: &Request_PrepareProposal{&req},
}
}
//----------------------------------------
func ToResponseException(errStr string) *Response {
@ -200,3 +206,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
Value: &Response_ApplySnapshotChunk{&res},
}
}
func ToResponsePrepareProposal(res ResponsePrepareProposal) *Response {
return &Response{
Value: &Response_PrepareProposal{&res},
}
}

+ 805
- 212
abci/types/types.pb.go
File diff suppressed because it is too large
View File


+ 5
- 0
internal/consensus/mempool_test.go View File

@ -301,3 +301,8 @@ func (app *CounterApplication) Commit() abci.ResponseCommit {
binary.BigEndian.PutUint64(hash, uint64(app.txCount))
return abci.ResponseCommit{Data: hash}
}
func (app *CounterApplication) PrepareProposal(
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
return abci.ResponsePrepareProposal{BlockData: req.BlockData} //nolint:gosimple
}

+ 11
- 0
internal/proxy/app_conn.go View File

@ -20,6 +20,7 @@ type AppConnConsensus interface {
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
BeginBlock(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
DeliverTx(context.Context, types.RequestDeliverTx) (*types.ResponseDeliverTx, error)
EndBlock(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error)
@ -62,6 +63,8 @@ type appConnConsensus struct {
appConn abciclient.Client
}
var _ AppConnConsensus = (*appConnConsensus)(nil)
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
return &appConnConsensus{
metrics: metrics,
@ -85,6 +88,14 @@ func (app *appConnConsensus) InitChain(
return app.appConn.InitChain(ctx, req)
}
func (app *appConnConsensus) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.appConn.PrepareProposal(ctx, req)
}
func (app *appConnConsensus) BeginBlock(
ctx context.Context,
req types.RequestBeginBlock,


+ 23
- 0
internal/proxy/mocks/app_conn_consensus.go View File

@ -146,6 +146,29 @@ func (_m *AppConnConsensus) InitChain(_a0 context.Context, _a1 types.RequestInit
return r0, r1
}
// PrepareProposal provides a mock function with given fields: _a0, _a1
func (_m *AppConnConsensus) PrepareProposal(_a0 context.Context, _a1 types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
ret := _m.Called(_a0, _a1)
var r0 *types.ResponsePrepareProposal
if rf, ok := ret.Get(0).(func(context.Context, types.RequestPrepareProposal) *types.ResponsePrepareProposal); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ResponsePrepareProposal)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, types.RequestPrepareProposal) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SetResponseCallback provides a mock function with given fields: _a0
func (_m *AppConnConsensus) SetResponseCallback(_a0 abciclient.Callback) {
_m.Called(_a0)


+ 30
- 1
internal/state/execution.go View File

@ -99,6 +99,8 @@ func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher)
// and txs from the mempool. The max bytes must be big enough to fit the commit.
// Up to 1/10th of the block space is allcoated for maximum sized evidence.
// The rest is given to txs, up to the max gas.
//
// Contract: application will not return more bytes than are sent over the wire.
func (blockExec *BlockExecutor) CreateProposalBlock(
height int64,
state State, commit *types.Commit,
@ -115,7 +117,34 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas)
return state.MakeBlock(height, txs, commit, evidence, proposerAddr)
preparedProposal, err := blockExec.proxyApp.PrepareProposal(
context.Background(),
abci.RequestPrepareProposal{BlockData: txs.ToSliceOfBytes(), BlockDataSize: maxDataBytes},
)
if err != nil {
// The App MUST ensure that only valid (and hence 'processable') transactions
// enter the mempool. Hence, at this point, we can't have any non-processable
// transaction causing an error.
//
// Also, the App can simply skip any transaction that could cause any kind of trouble.
// Either way, we can not recover in a meaningful way, unless we skip proposing
// this block, repair what caused the error and try again. Hence, we panic on
// purpose for now.
panic(err)
}
newTxs := preparedProposal.GetBlockData()
var txSize int
for _, tx := range newTxs {
txSize += len(tx)
if maxDataBytes < int64(txSize) {
panic("block data exceeds max amount of allowed bytes")
}
}
modifiedTxs := types.ToTxs(preparedProposal.GetBlockData())
return state.MakeBlock(height, modifiedTxs, commit, evidence, proposerAddr)
}
// ValidateBlock validates the given block against the given state.


+ 5
- 0
test/e2e/app/app.go View File

@ -267,6 +267,11 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a
return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}
}
func (app *Application) PrepareProposal(
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
return abci.ResponsePrepareProposal{BlockData: req.BlockData} //nolint:gosimple
}
func (app *Application) Rollback() error {
return app.state.Rollback()
}


+ 22
- 0
types/tx.go View File

@ -79,6 +79,28 @@ func (txs Txs) Proof(i int) TxProof {
}
}
// ToSliceOfBytes converts a Txs to slice of byte slices.
//
// NOTE: This method should become obsolete once Txs is switched to [][]byte.
// ref: #2603
// TODO This function is to disappear when TxRecord is introduced
func (txs Txs) ToSliceOfBytes() [][]byte {
txBzs := make([][]byte, len(txs))
for i := 0; i < len(txs); i++ {
txBzs[i] = txs[i]
}
return txBzs
}
// ToTxs converts a raw slice of byte slices into a Txs type.
func ToTxs(txs [][]byte) Txs {
txBzs := make(Txs, len(txs))
for i := 0; i < len(txs); i++ {
txBzs[i] = txs[i]
}
return txBzs
}
// TxProof represents a Merkle proof of the presence of a transaction in the Merkle tree.
type TxProof struct {
RootHash tmbytes.HexBytes `json:"root_hash"`


Loading…
Cancel
Save