From 61a81279bd6ebfded58979783629b2916195fca3 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 23 Feb 2022 17:39:47 -0500 Subject: [PATCH] abci: make tendermint example+test clients manage a mutex (#7978) This is the first step in removing the mutex from ABCI applications: making our test applications hold mutexes, which this does, hopefully with zero impact. If this lands well, then we can explore deleting the other mutexes (in the ABCI server and the clients.) While this change is not user impacting at all, removing the other mutexes *will* be. In persuit of this, I've changed the KV app somewhat, to put almost all of the logic in the base application and make the persistent application mostly be a wrapper on top of that with a different storage layer. --- abci/example/kvstore/kvstore.go | 305 ++++++++++++++++++-- abci/example/kvstore/kvstore_test.go | 6 +- abci/example/kvstore/persistent_kvstore.go | 315 +-------------------- abci/types/application.go | 3 +- internal/consensus/byzantine_test.go | 6 +- internal/consensus/common_test.go | 21 +- internal/consensus/invalid_test.go | 2 +- internal/consensus/reactor_test.go | 11 +- internal/consensus/replay_test.go | 2 +- rpc/client/main_test.go | 7 +- test/e2e/app/app.go | 39 ++- 11 files changed, 348 insertions(+), 369 deletions(-) diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 9f75fd149..9d4f61887 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -2,14 +2,21 @@ package kvstore import ( "bytes" + "encoding/base64" "encoding/binary" "encoding/json" "fmt" + "strconv" + "strings" + "sync" dbm "github.com/tendermint/tm-db" "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/crypto/encoding" + "github.com/tendermint/tendermint/libs/log" + cryptoproto "github.com/tendermint/tendermint/proto/tendermint/crypto" "github.com/tendermint/tendermint/version" ) @@ -65,17 +72,41 @@ var _ types.Application = (*Application)(nil) type Application struct { types.BaseApplication - + mu sync.Mutex state State RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight) + logger log.Logger + + // validator set + ValUpdates []types.ValidatorUpdate + valAddrToPubKeyMap map[string]cryptoproto.PublicKey } func NewApplication() *Application { - state := loadState(dbm.NewMemDB()) - return &Application{state: state} + return &Application{ + logger: log.NewNopLogger(), + state: loadState(dbm.NewMemDB()), + valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), + } } -func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) { +func (app *Application) InitChain(req types.RequestInitChain) types.ResponseInitChain { + app.mu.Lock() + defer app.mu.Unlock() + + for _, v := range req.Validators { + r := app.updateValidator(v) + if r.IsErr() { + app.logger.Error("error updating validators", "r", r) + panic("problem updating validators") + } + } + return types.ResponseInitChain{} +} + +func (app *Application) Info(req types.RequestInfo) types.ResponseInfo { + app.mu.Lock() + defer app.mu.Unlock() return types.ResponseInfo{ Data: fmt.Sprintf("{\"size\":%v}", app.state.Size), Version: version.ABCIVersion, @@ -85,8 +116,20 @@ func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) } } -// tx is either "key=value" or just arbitrary bytes -func (app *Application) HandleTx(tx []byte) *types.ResponseDeliverTx { +// tx is either "val:pubkey!power" or "key=value" or just arbitrary bytes +func (app *Application) handleTx(tx []byte) *types.ResponseDeliverTx { + // if it starts with "val:", update the validator set + // format is "val:pubkey!power" + if isValidatorTx(tx) { + // update validators in the merkle tree + // and in app.ValUpdates + return app.execValidatorTx(tx) + } + + if isPrepareTx(tx) { + return app.execPrepareTx(tx) + } + var key, value string parts := bytes.Split(tx, []byte("=")) if len(parts) == 2 { @@ -116,19 +159,53 @@ func (app *Application) HandleTx(tx []byte) *types.ResponseDeliverTx { return &types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} } +func (app *Application) Close() error { + app.mu.Lock() + defer app.mu.Unlock() + + return app.state.db.Close() +} + func (app *Application) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { - txs := make([]*types.ResponseDeliverTx, len(req.Txs)) + app.mu.Lock() + defer app.mu.Unlock() + + // reset valset changes + app.ValUpdates = make([]types.ValidatorUpdate, 0) + + // Punish validators who committed equivocation. + for _, ev := range req.ByzantineValidators { + if ev.Type == types.EvidenceType_DUPLICATE_VOTE { + addr := string(ev.Validator.Address) + if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok { + app.updateValidator(types.ValidatorUpdate{ + PubKey: pubKey, + Power: ev.Validator.Power - 1, + }) + app.logger.Info("Decreased val power by 1 because of the equivocation", + "val", addr) + } else { + panic(fmt.Errorf("wanted to punish val %q but can't find it", addr)) + } + } + } + + respTxs := make([]*types.ResponseDeliverTx, len(req.Txs)) for i, tx := range req.Txs { - txs[i] = app.HandleTx(tx) + respTxs[i] = app.handleTx(tx) } - return types.ResponseFinalizeBlock{Txs: txs} + + return types.ResponseFinalizeBlock{Txs: respTxs, ValidatorUpdates: app.ValUpdates} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (*Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} } func (app *Application) Commit() types.ResponseCommit { + app.mu.Lock() + defer app.mu.Unlock() + // Using a memdb - just return the big endian size of the db appHash := make([]byte, 8) binary.PutVarint(appHash, app.state.Size) @@ -144,43 +221,225 @@ func (app *Application) Commit() types.ResponseCommit { } // Returns an associated value or nil if missing. -func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.ResponseQuery) { +func (app *Application) Query(reqQuery types.RequestQuery) types.ResponseQuery { + app.mu.Lock() + defer app.mu.Unlock() + + if reqQuery.Path == "/val" { + key := []byte("val:" + string(reqQuery.Data)) + value, err := app.state.db.Get(key) + if err != nil { + panic(err) + } + + return types.ResponseQuery{ + Key: reqQuery.Data, + Value: value, + } + } + if reqQuery.Prove { value, err := app.state.db.Get(prefixKey(reqQuery.Data)) if err != nil { panic(err) } + + resQuery := types.ResponseQuery{ + Index: -1, + Key: reqQuery.Data, + Value: value, + Height: app.state.Height, + } + if value == nil { resQuery.Log = "does not exist" } else { resQuery.Log = "exists" } - resQuery.Index = -1 // TODO make Proof return index - resQuery.Key = reqQuery.Data - resQuery.Value = value - resQuery.Height = app.state.Height - return + return resQuery } - resQuery.Key = reqQuery.Data value, err := app.state.db.Get(prefixKey(reqQuery.Data)) if err != nil { panic(err) } + + resQuery := types.ResponseQuery{ + Key: reqQuery.Data, + Value: value, + Height: app.state.Height, + } + if value == nil { resQuery.Log = "does not exist" } else { resQuery.Log = "exists" } - resQuery.Value = value - resQuery.Height = app.state.Height return resQuery } -func (app *Application) PrepareProposal( - req types.RequestPrepareProposal) types.ResponsePrepareProposal { - return types.ResponsePrepareProposal{ - BlockData: req.BlockData} +func (app *Application) PrepareProposal(req types.RequestPrepareProposal) types.ResponsePrepareProposal { + app.mu.Lock() + defer app.mu.Unlock() + + return types.ResponsePrepareProposal{BlockData: app.substPrepareTx(req.BlockData)} +} + +func (*Application) ProcessProposal(req types.RequestProcessProposal) types.ResponseProcessProposal { + for _, tx := range req.Txs { + if len(tx) == 0 { + return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT} + } + } + return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT} +} + +//--------------------------------------------- +// update validators + +func (app *Application) Validators() (validators []types.ValidatorUpdate) { + app.mu.Lock() + defer app.mu.Unlock() + + itr, err := app.state.db.Iterator(nil, nil) + if err != nil { + panic(err) + } + for ; itr.Valid(); itr.Next() { + if isValidatorTx(itr.Key()) { + validator := new(types.ValidatorUpdate) + err := types.ReadMessage(bytes.NewBuffer(itr.Value()), validator) + if err != nil { + panic(err) + } + validators = append(validators, *validator) + } + } + if err = itr.Error(); err != nil { + panic(err) + } + return +} + +func MakeValSetChangeTx(pubkey cryptoproto.PublicKey, power int64) []byte { + pk, err := encoding.PubKeyFromProto(pubkey) + if err != nil { + panic(err) + } + pubStr := base64.StdEncoding.EncodeToString(pk.Bytes()) + return []byte(fmt.Sprintf("val:%s!%d", pubStr, power)) +} + +func isValidatorTx(tx []byte) bool { + return strings.HasPrefix(string(tx), ValidatorSetChangePrefix) +} + +// format is "val:pubkey!power" +// pubkey is a base64-encoded 32-byte ed25519 key +func (app *Application) execValidatorTx(tx []byte) *types.ResponseDeliverTx { + tx = tx[len(ValidatorSetChangePrefix):] + + // get the pubkey and power + pubKeyAndPower := strings.Split(string(tx), "!") + if len(pubKeyAndPower) != 2 { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Expected 'pubkey!power'. Got %v", pubKeyAndPower)} + } + pubkeyS, powerS := pubKeyAndPower[0], pubKeyAndPower[1] + + // decode the pubkey + pubkey, err := base64.StdEncoding.DecodeString(pubkeyS) + if err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Pubkey (%s) is invalid base64", pubkeyS)} + } + + // decode the power + power, err := strconv.ParseInt(powerS, 10, 64) + if err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Power (%s) is not an int", powerS)} + } + + // update + return app.updateValidator(types.UpdateValidator(pubkey, power, "")) +} + +// add, update, or remove a validator +func (app *Application) updateValidator(v types.ValidatorUpdate) *types.ResponseDeliverTx { + pubkey, err := encoding.PubKeyFromProto(v.PubKey) + if err != nil { + panic(fmt.Errorf("can't decode public key: %w", err)) + } + key := []byte("val:" + string(pubkey.Bytes())) + + if v.Power == 0 { + // remove validator + hasKey, err := app.state.db.Has(key) + if err != nil { + panic(err) + } + if !hasKey { + pubStr := base64.StdEncoding.EncodeToString(pubkey.Bytes()) + return &types.ResponseDeliverTx{ + Code: code.CodeTypeUnauthorized, + Log: fmt.Sprintf("Cannot remove non-existent validator %s", pubStr)} + } + if err = app.state.db.Delete(key); err != nil { + panic(err) + } + delete(app.valAddrToPubKeyMap, string(pubkey.Address())) + } else { + // add or update validator + value := bytes.NewBuffer(make([]byte, 0)) + if err := types.WriteMessage(&v, value); err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("error encoding validator: %v", err)} + } + if err = app.state.db.Set(key, value.Bytes()); err != nil { + panic(err) + } + app.valAddrToPubKeyMap[string(pubkey.Address())] = v.PubKey + } + + // we only update the changes array if we successfully updated the tree + app.ValUpdates = append(app.ValUpdates, v) + + return &types.ResponseDeliverTx{Code: code.CodeTypeOK} +} + +// ----------------------------- +// prepare proposal machinery + +const PreparePrefix = "prepare" + +func isPrepareTx(tx []byte) bool { + return strings.HasPrefix(string(tx), PreparePrefix) +} + +// execPrepareTx is noop. tx data is considered as placeholder +// and is substitute at the PrepareProposal. +func (app *Application) execPrepareTx(tx []byte) *types.ResponseDeliverTx { + // noop + return &types.ResponseDeliverTx{} +} + +// substPrepareTx subst all the preparetx in the blockdata +// to null string(could be any arbitrary string). +func (app *Application) substPrepareTx(blockData [][]byte) [][]byte { + // TODO: this mechanism will change with the current spec of PrepareProposal + // We now have a special type for marking a tx as changed + for i, tx := range blockData { + if isPrepareTx(tx) { + blockData[i] = make([]byte, len(tx)) + } + } + + return blockData } diff --git a/abci/example/kvstore/kvstore_test.go b/abci/example/kvstore/kvstore_test.go index 21f54e0fe..23841d76a 100644 --- a/abci/example/kvstore/kvstore_test.go +++ b/abci/example/kvstore/kvstore_test.go @@ -118,10 +118,7 @@ func TestPersistentKVStoreInfo(t *testing.T) { // add a validator, remove a validator, update a validator func TestValUpdates(t *testing.T) { - dir := t.TempDir() - logger := log.NewTestingLogger(t) - - kvstore := NewPersistentKVStoreApplication(logger, dir) + kvstore := NewApplication() // init with some validators total := 10 @@ -210,6 +207,7 @@ func makeApplyBlock( // order doesn't matter func valsEqual(t *testing.T, vals1, vals2 []types.ValidatorUpdate) { + t.Helper() if len(vals1) != len(vals2) { t.Fatalf("vals dont match in len. got %d, expected %d", len(vals2), len(vals1)) } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 830f93235..2a6e8aa19 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -2,16 +2,10 @@ package kvstore import ( "bytes" - "encoding/base64" - "fmt" - "strconv" - "strings" dbm "github.com/tendermint/tm-db" - "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/libs/log" cryptoproto "github.com/tendermint/tendermint/proto/tendermint/crypto" ptypes "github.com/tendermint/tendermint/proto/tendermint/types" @@ -26,325 +20,42 @@ const ( var _ types.Application = (*PersistentKVStoreApplication)(nil) type PersistentKVStoreApplication struct { - app *Application - - // validator set - ValUpdates []types.ValidatorUpdate - - valAddrToPubKeyMap map[string]cryptoproto.PublicKey - - logger log.Logger + *Application } func NewPersistentKVStoreApplication(logger log.Logger, dbDir string) *PersistentKVStoreApplication { - name := "kvstore" - db, err := dbm.NewGoLevelDB(name, dbDir) + db, err := dbm.NewGoLevelDB("kvstore", dbDir) if err != nil { panic(err) } - state := loadState(db) - return &PersistentKVStoreApplication{ - app: &Application{state: state}, - valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), - logger: logger, - } -} - -func (app *PersistentKVStoreApplication) Close() error { - return app.app.state.db.Close() -} - -func (app *PersistentKVStoreApplication) Info(req types.RequestInfo) types.ResponseInfo { - res := app.app.Info(req) - res.LastBlockHeight = app.app.state.Height - res.LastBlockAppHash = app.app.state.AppHash - return res -} - -// tx is either "val:pubkey!power" or "key=value" or just arbitrary bytes -func (app *PersistentKVStoreApplication) HandleTx(tx []byte) *types.ResponseDeliverTx { - // if it starts with "val:", update the validator set - // format is "val:pubkey!power" - if isValidatorTx(tx) { - // update validators in the merkle tree - // and in app.ValUpdates - return app.execValidatorTx(tx) - } - - if isPrepareTx(tx) { - return app.execPrepareTx(tx) - } - - // otherwise, update the key-value store - return app.app.HandleTx(tx) -} - -func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { - return app.app.CheckTx(req) -} - -// Commit will panic if InitChain was not called -func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit { - return app.app.Commit() -} - -// When path=/val and data={validator address}, returns the validator update (types.ValidatorUpdate) varint encoded. -// For any other path, returns an associated value or nil if missing. -func (app *PersistentKVStoreApplication) Query(reqQuery types.RequestQuery) (resQuery types.ResponseQuery) { - switch reqQuery.Path { - case "/val": - key := []byte("val:" + string(reqQuery.Data)) - value, err := app.app.state.db.Get(key) - if err != nil { - panic(err) - } - - resQuery.Key = reqQuery.Data - resQuery.Value = value - return - default: - return app.app.Query(reqQuery) + Application: &Application{ + valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), + state: loadState(db), + logger: logger, + }, } } -// Save the validators in the merkle tree -func (app *PersistentKVStoreApplication) InitChain(req types.RequestInitChain) types.ResponseInitChain { - for _, v := range req.Validators { - r := app.updateValidator(v) - if r.IsErr() { - app.logger.Error("error updating validators", "r", r) - } - } - return types.ResponseInitChain{} -} - -// Track the block hash and header information -// Execute transactions -// Update the validator set -func (app *PersistentKVStoreApplication) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { - // reset valset changes - app.ValUpdates = make([]types.ValidatorUpdate, 0) - - // Punish validators who committed equivocation. - for _, ev := range req.ByzantineValidators { - if ev.Type == types.EvidenceType_DUPLICATE_VOTE { - addr := string(ev.Validator.Address) - if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok { - app.updateValidator(types.ValidatorUpdate{ - PubKey: pubKey, - Power: ev.Validator.Power - 1, - }) - app.logger.Info("Decreased val power by 1 because of the equivocation", - "val", addr) - } else { - app.logger.Error("Wanted to punish val, but can't find it", - "val", addr) - } - } - } - - respTxs := make([]*types.ResponseDeliverTx, len(req.Txs)) - for i, tx := range req.Txs { - respTxs[i] = app.HandleTx(tx) - } - - return types.ResponseFinalizeBlock{Txs: respTxs, ValidatorUpdates: app.ValUpdates} -} - -func (app *PersistentKVStoreApplication) ListSnapshots( - req types.RequestListSnapshots) types.ResponseListSnapshots { - return types.ResponseListSnapshots{} -} - -func (app *PersistentKVStoreApplication) LoadSnapshotChunk( - req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk { - return types.ResponseLoadSnapshotChunk{} -} - -func (app *PersistentKVStoreApplication) OfferSnapshot( - req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { +func (app *PersistentKVStoreApplication) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} } -func (app *PersistentKVStoreApplication) ApplySnapshotChunk( - req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { +func (app *PersistentKVStoreApplication) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} } -func (app *PersistentKVStoreApplication) ExtendVote( - req types.RequestExtendVote) types.ResponseExtendVote { - return types.ResponseExtendVote{ - VoteExtension: ConstructVoteExtension(req.Vote.ValidatorAddress), - } -} - -func (app *PersistentKVStoreApplication) VerifyVoteExtension( - req types.RequestVerifyVoteExtension) types.ResponseVerifyVoteExtension { - return types.RespondVerifyVoteExtension( - app.verifyExtension(req.Vote.ValidatorAddress, req.Vote.VoteExtension)) -} - -func (app *PersistentKVStoreApplication) PrepareProposal( - req types.RequestPrepareProposal) types.ResponsePrepareProposal { - return types.ResponsePrepareProposal{BlockData: app.substPrepareTx(req.BlockData)} -} - -func (app *PersistentKVStoreApplication) ProcessProposal( - req types.RequestProcessProposal) types.ResponseProcessProposal { - for _, tx := range req.Txs { - if len(tx) == 0 { - return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT} - } - } - return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT} -} - -//--------------------------------------------- -// update validators - -func (app *PersistentKVStoreApplication) Validators() (validators []types.ValidatorUpdate) { - itr, err := app.app.state.db.Iterator(nil, nil) - if err != nil { - panic(err) - } - for ; itr.Valid(); itr.Next() { - if isValidatorTx(itr.Key()) { - validator := new(types.ValidatorUpdate) - err := types.ReadMessage(bytes.NewBuffer(itr.Value()), validator) - if err != nil { - panic(err) - } - validators = append(validators, *validator) - } - } - if err = itr.Error(); err != nil { - panic(err) - } - return -} - -func MakeValSetChangeTx(pubkey cryptoproto.PublicKey, power int64) []byte { - pk, err := encoding.PubKeyFromProto(pubkey) - if err != nil { - panic(err) - } - pubStr := base64.StdEncoding.EncodeToString(pk.Bytes()) - return []byte(fmt.Sprintf("val:%s!%d", pubStr, power)) +func (app *PersistentKVStoreApplication) ExtendVote(req types.RequestExtendVote) types.ResponseExtendVote { + return types.ResponseExtendVote{VoteExtension: ConstructVoteExtension(req.Vote.ValidatorAddress)} } -func isValidatorTx(tx []byte) bool { - return strings.HasPrefix(string(tx), ValidatorSetChangePrefix) -} - -// format is "val:pubkey!power" -// pubkey is a base64-encoded 32-byte ed25519 key -func (app *PersistentKVStoreApplication) execValidatorTx(tx []byte) *types.ResponseDeliverTx { - tx = tx[len(ValidatorSetChangePrefix):] - - // get the pubkey and power - pubKeyAndPower := strings.Split(string(tx), "!") - if len(pubKeyAndPower) != 2 { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Expected 'pubkey!power'. Got %v", pubKeyAndPower)} - } - pubkeyS, powerS := pubKeyAndPower[0], pubKeyAndPower[1] - - // decode the pubkey - pubkey, err := base64.StdEncoding.DecodeString(pubkeyS) - if err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Pubkey (%s) is invalid base64", pubkeyS)} - } - - // decode the power - power, err := strconv.ParseInt(powerS, 10, 64) - if err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Power (%s) is not an int", powerS)} - } - - // update - return app.updateValidator(types.UpdateValidator(pubkey, power, "")) -} - -// add, update, or remove a validator -func (app *PersistentKVStoreApplication) updateValidator(v types.ValidatorUpdate) *types.ResponseDeliverTx { - pubkey, err := encoding.PubKeyFromProto(v.PubKey) - if err != nil { - panic(fmt.Errorf("can't decode public key: %w", err)) - } - key := []byte("val:" + string(pubkey.Bytes())) - - if v.Power == 0 { - // remove validator - hasKey, err := app.app.state.db.Has(key) - if err != nil { - panic(err) - } - if !hasKey { - pubStr := base64.StdEncoding.EncodeToString(pubkey.Bytes()) - return &types.ResponseDeliverTx{ - Code: code.CodeTypeUnauthorized, - Log: fmt.Sprintf("Cannot remove non-existent validator %s", pubStr)} - } - if err = app.app.state.db.Delete(key); err != nil { - panic(err) - } - delete(app.valAddrToPubKeyMap, string(pubkey.Address())) - } else { - // add or update validator - value := bytes.NewBuffer(make([]byte, 0)) - if err := types.WriteMessage(&v, value); err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("error encoding validator: %v", err)} - } - if err = app.app.state.db.Set(key, value.Bytes()); err != nil { - panic(err) - } - app.valAddrToPubKeyMap[string(pubkey.Address())] = v.PubKey - } - - // we only update the changes array if we successfully updated the tree - app.ValUpdates = append(app.ValUpdates, v) - - return &types.ResponseDeliverTx{Code: code.CodeTypeOK} +func (app *PersistentKVStoreApplication) VerifyVoteExtension(req types.RequestVerifyVoteExtension) types.ResponseVerifyVoteExtension { + return types.RespondVerifyVoteExtension(app.verifyExtension(req.Vote.ValidatorAddress, req.Vote.VoteExtension)) } // ----------------------------- -const PreparePrefix = "prepare" - -func isPrepareTx(tx []byte) bool { - return strings.HasPrefix(string(tx), PreparePrefix) -} - -// execPrepareTx is noop. tx data is considered as placeholder -// and is substitute at the PrepareProposal. -func (app *PersistentKVStoreApplication) execPrepareTx(tx []byte) *types.ResponseDeliverTx { - // noop - return &types.ResponseDeliverTx{} -} - -// substPrepareTx subst all the preparetx in the blockdata -// to null string(could be any arbitrary string). -func (app *PersistentKVStoreApplication) substPrepareTx(blockData [][]byte) [][]byte { - // TODO: this mechanism will change with the current spec of PrepareProposal - // We now have a special type for marking a tx as changed - for i, tx := range blockData { - if isPrepareTx(tx) { - blockData[i] = make([]byte, len(tx)) - } - } - - return blockData -} - func ConstructVoteExtension(valAddr []byte) *ptypes.VoteExtension { return &ptypes.VoteExtension{ AppDataToSign: valAddr, diff --git a/abci/types/application.go b/abci/types/application.go index 16ae03546..b3ebf707f 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -41,8 +41,7 @@ type Application interface { var _ Application = (*BaseApplication)(nil) -type BaseApplication struct { -} +type BaseApplication struct{} func NewBaseApplication() *BaseApplication { return &BaseApplication{} diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 7a5260a64..33e1dbf63 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -14,6 +14,7 @@ import ( dbm "github.com/tendermint/tm-db" abciclient "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" @@ -36,7 +37,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // kind of deadlock and hit the larger timeout. This timeout // can be extended a bunch if needed, but it's good to avoid // falling back to a much coarser timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() config := configSetup(t) @@ -45,7 +46,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { prevoteHeight := int64(2) testName := "consensus_byzantine_test" tickerFunc := newMockTickerFunc(true) - appFunc := newKVStore valSet, privVals := factory.ValidatorSet(ctx, t, nValidators, 30) genDoc := factory.GenesisDoc(config, time.Now(), valSet.Validators, nil) @@ -66,7 +66,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { defer os.RemoveAll(thisConfig.RootDir) ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) + app := kvstore.NewApplication() vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index cfd232b34..fd9780a15 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "io" "os" "path/filepath" "sort" @@ -750,7 +749,6 @@ func makeConsensusState( nValidators int, testName string, tickerFunc func() TimeoutTicker, - appFunc func(t *testing.T, logger log.Logger) abci.Application, configOpts ...func(*config.Config), ) ([]*State, cleanupFunc) { t.Helper() @@ -779,11 +777,8 @@ func makeConsensusState( ensureDir(t, filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) - - if appCloser, ok := app.(io.Closer); ok { - closeFuncs = append(closeFuncs, appCloser.Close) - } + app := kvstore.NewApplication() + closeFuncs = append(closeFuncs, app.Close) vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) @@ -934,19 +929,11 @@ func (m *mockTicker) Chan() <-chan timeoutInfo { return m.c } -func newPersistentKVStore(t *testing.T, logger log.Logger) abci.Application { - t.Helper() - - dir := t.TempDir() - - return kvstore.NewPersistentKVStoreApplication(logger, dir) -} - -func newKVStore(_ *testing.T, _ log.Logger) abci.Application { +func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application { return kvstore.NewApplication() } -func newPersistentKVStoreWithPath(logger log.Logger, dbDir string) abci.Application { +func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application { return kvstore.NewPersistentKVStoreApplication(logger, dbDir) } diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index a3e865c68..142414d30 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -27,7 +27,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, config, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) for i := 0; i < 4; i++ { diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index cd6313650..97fcd091f 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -326,7 +326,7 @@ func TestReactorBasic(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock @@ -379,7 +379,6 @@ func TestReactorWithEvidence(t *testing.T) { n := 4 testName := "consensus_reactor_test" tickerFunc := newMockTickerFunc(true) - appFunc := newKVStore valSet, privVals := factory.ValidatorSet(ctx, t, n, 30) genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, nil) @@ -397,7 +396,7 @@ func TestReactorWithEvidence(t *testing.T) { defer os.RemoveAll(thisConfig.RootDir) ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) + app := kvstore.NewApplication() vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) @@ -491,7 +490,6 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { n, "consensus_reactor_test", newMockTickerFunc(true), - newKVStore, func(c *config.Config) { c.Consensus.CreateEmptyBlocks = false }, @@ -543,7 +541,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock @@ -612,7 +610,6 @@ func TestReactorVotingPowerChange(t *testing.T) { n, "consensus_voting_power_changes_test", newMockTickerFunc(true), - newPersistentKVStore, ) t.Cleanup(cleanup) @@ -722,7 +719,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), - newPersistentKVStoreWithPath, + newEpehemeralKVStore, ) t.Cleanup(cleanup) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 82ca54f31..b9302d125 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -339,7 +339,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { nPeers, "replay_test", newMockTickerFunc(true), - newPersistentKVStoreWithPath) + newPersistentKVStore) sim.Config = cfg var err error diff --git a/rpc/client/main_test.go b/rpc/client/main_test.go index d799d8b85..45a83afbb 100644 --- a/rpc/client/main_test.go +++ b/rpc/client/main_test.go @@ -2,7 +2,6 @@ package client_test import ( "context" - "os" "testing" "github.com/stretchr/testify/assert" @@ -23,10 +22,9 @@ func NodeSuite(ctx context.Context, t *testing.T, logger log.Logger) (service.Se conf, err := rpctest.CreateConfig(t, t.Name()) require.NoError(t, err) - // start a tendermint node in the background to test against - dir := t.TempDir() - app := kvstore.NewPersistentKVStoreApplication(logger, dir) + app := kvstore.NewApplication() + // start a tendermint node in the background to test against. node, closer, err := rpctest.StartTendermint(ctx, conf, app, rpctest.SuppressStdout) require.NoError(t, err) t.Cleanup(func() { @@ -34,7 +32,6 @@ func NodeSuite(ctx context.Context, t *testing.T, logger log.Logger) (service.Se assert.NoError(t, closer(ctx)) assert.NoError(t, app.Close()) node.Wait() - _ = os.RemoveAll(dir) }) return node, conf } diff --git a/test/e2e/app/app.go b/test/e2e/app/app.go index a7348ecd9..26a511718 100644 --- a/test/e2e/app/app.go +++ b/test/e2e/app/app.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "github.com/tendermint/tendermint/abci/example/code" abci "github.com/tendermint/tendermint/abci/types" @@ -21,6 +22,7 @@ import ( // to disk as JSON, taking state sync snapshots if requested. type Application struct { abci.BaseApplication + mu sync.Mutex logger log.Logger state *State snapshots *SnapshotStore @@ -98,8 +100,8 @@ func NewApplication(cfg *Config) (*Application, error) { // Info implements ABCI. func (app *Application) Info(req abci.RequestInfo) abci.ResponseInfo { - app.state.RLock() - defer app.state.RUnlock() + app.mu.Lock() + defer app.mu.Unlock() return abci.ResponseInfo{ Version: version.ABCIVersion, @@ -111,6 +113,9 @@ func (app *Application) Info(req abci.RequestInfo) abci.ResponseInfo { // Info implements ABCI. func (app *Application) InitChain(req abci.RequestInitChain) abci.ResponseInitChain { + app.mu.Lock() + defer app.mu.Unlock() + var err error app.state.initialHeight = uint64(req.InitialHeight) if len(req.AppStateBytes) > 0 { @@ -135,6 +140,9 @@ func (app *Application) InitChain(req abci.RequestInitChain) abci.ResponseInitCh // CheckTx implements ABCI. func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { + app.mu.Lock() + defer app.mu.Unlock() + _, _, err := parseTx(req.Tx) if err != nil { return abci.ResponseCheckTx{ @@ -149,6 +157,9 @@ func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { func (app *Application) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock { var txs = make([]*abci.ResponseDeliverTx, len(req.Txs)) + app.mu.Lock() + defer app.mu.Unlock() + for i, tx := range req.Txs { key, value, err := parseTx(tx) if err != nil { @@ -187,6 +198,9 @@ func (app *Application) FinalizeBlock(req abci.RequestFinalizeBlock) abci.Respon // Commit implements ABCI. func (app *Application) Commit() abci.ResponseCommit { + app.mu.Lock() + defer app.mu.Unlock() + height, hash, err := app.state.Commit() if err != nil { panic(err) @@ -214,6 +228,9 @@ func (app *Application) Commit() abci.ResponseCommit { // Query implements ABCI. func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery { + app.mu.Lock() + defer app.mu.Unlock() + return abci.ResponseQuery{ Height: int64(app.state.Height), Key: req.Data, @@ -223,6 +240,9 @@ func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery { // ListSnapshots implements ABCI. func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots { + app.mu.Lock() + defer app.mu.Unlock() + snapshots, err := app.snapshots.List() if err != nil { panic(err) @@ -232,6 +252,9 @@ func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.Respon // LoadSnapshotChunk implements ABCI. func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk { + app.mu.Lock() + defer app.mu.Unlock() + chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk) if err != nil { panic(err) @@ -241,6 +264,9 @@ func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abc // OfferSnapshot implements ABCI. func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot { + app.mu.Lock() + defer app.mu.Unlock() + if app.restoreSnapshot != nil { panic("A snapshot is already being restored") } @@ -251,6 +277,9 @@ func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.Respon // ApplySnapshotChunk implements ABCI. func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk { + app.mu.Lock() + defer app.mu.Unlock() + if app.restoreSnapshot == nil { panic("No restore in progress") } @@ -270,12 +299,14 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT} } -func (app *Application) PrepareProposal( - req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { +func (app *Application) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { return abci.ResponsePrepareProposal{BlockData: req.BlockData} } func (app *Application) Rollback() error { + app.mu.Lock() + defer app.mu.Unlock() + return app.state.Rollback() }