diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 9f75fd149..9d4f61887 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -2,14 +2,21 @@ package kvstore import ( "bytes" + "encoding/base64" "encoding/binary" "encoding/json" "fmt" + "strconv" + "strings" + "sync" dbm "github.com/tendermint/tm-db" "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/crypto/encoding" + "github.com/tendermint/tendermint/libs/log" + cryptoproto "github.com/tendermint/tendermint/proto/tendermint/crypto" "github.com/tendermint/tendermint/version" ) @@ -65,17 +72,41 @@ var _ types.Application = (*Application)(nil) type Application struct { types.BaseApplication - + mu sync.Mutex state State RetainBlocks int64 // blocks to retain after commit (via ResponseCommit.RetainHeight) + logger log.Logger + + // validator set + ValUpdates []types.ValidatorUpdate + valAddrToPubKeyMap map[string]cryptoproto.PublicKey } func NewApplication() *Application { - state := loadState(dbm.NewMemDB()) - return &Application{state: state} + return &Application{ + logger: log.NewNopLogger(), + state: loadState(dbm.NewMemDB()), + valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), + } } -func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) { +func (app *Application) InitChain(req types.RequestInitChain) types.ResponseInitChain { + app.mu.Lock() + defer app.mu.Unlock() + + for _, v := range req.Validators { + r := app.updateValidator(v) + if r.IsErr() { + app.logger.Error("error updating validators", "r", r) + panic("problem updating validators") + } + } + return types.ResponseInitChain{} +} + +func (app *Application) Info(req types.RequestInfo) types.ResponseInfo { + app.mu.Lock() + defer app.mu.Unlock() return types.ResponseInfo{ Data: fmt.Sprintf("{\"size\":%v}", app.state.Size), Version: version.ABCIVersion, @@ -85,8 +116,20 @@ func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) } } -// tx is either "key=value" or just arbitrary bytes -func (app *Application) HandleTx(tx []byte) *types.ResponseDeliverTx { +// tx is either "val:pubkey!power" or "key=value" or just arbitrary bytes +func (app *Application) handleTx(tx []byte) *types.ResponseDeliverTx { + // if it starts with "val:", update the validator set + // format is "val:pubkey!power" + if isValidatorTx(tx) { + // update validators in the merkle tree + // and in app.ValUpdates + return app.execValidatorTx(tx) + } + + if isPrepareTx(tx) { + return app.execPrepareTx(tx) + } + var key, value string parts := bytes.Split(tx, []byte("=")) if len(parts) == 2 { @@ -116,19 +159,53 @@ func (app *Application) HandleTx(tx []byte) *types.ResponseDeliverTx { return &types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} } +func (app *Application) Close() error { + app.mu.Lock() + defer app.mu.Unlock() + + return app.state.db.Close() +} + func (app *Application) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { - txs := make([]*types.ResponseDeliverTx, len(req.Txs)) + app.mu.Lock() + defer app.mu.Unlock() + + // reset valset changes + app.ValUpdates = make([]types.ValidatorUpdate, 0) + + // Punish validators who committed equivocation. + for _, ev := range req.ByzantineValidators { + if ev.Type == types.EvidenceType_DUPLICATE_VOTE { + addr := string(ev.Validator.Address) + if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok { + app.updateValidator(types.ValidatorUpdate{ + PubKey: pubKey, + Power: ev.Validator.Power - 1, + }) + app.logger.Info("Decreased val power by 1 because of the equivocation", + "val", addr) + } else { + panic(fmt.Errorf("wanted to punish val %q but can't find it", addr)) + } + } + } + + respTxs := make([]*types.ResponseDeliverTx, len(req.Txs)) for i, tx := range req.Txs { - txs[i] = app.HandleTx(tx) + respTxs[i] = app.handleTx(tx) } - return types.ResponseFinalizeBlock{Txs: txs} + + return types.ResponseFinalizeBlock{Txs: respTxs, ValidatorUpdates: app.ValUpdates} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (*Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} } func (app *Application) Commit() types.ResponseCommit { + app.mu.Lock() + defer app.mu.Unlock() + // Using a memdb - just return the big endian size of the db appHash := make([]byte, 8) binary.PutVarint(appHash, app.state.Size) @@ -144,43 +221,225 @@ func (app *Application) Commit() types.ResponseCommit { } // Returns an associated value or nil if missing. -func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.ResponseQuery) { +func (app *Application) Query(reqQuery types.RequestQuery) types.ResponseQuery { + app.mu.Lock() + defer app.mu.Unlock() + + if reqQuery.Path == "/val" { + key := []byte("val:" + string(reqQuery.Data)) + value, err := app.state.db.Get(key) + if err != nil { + panic(err) + } + + return types.ResponseQuery{ + Key: reqQuery.Data, + Value: value, + } + } + if reqQuery.Prove { value, err := app.state.db.Get(prefixKey(reqQuery.Data)) if err != nil { panic(err) } + + resQuery := types.ResponseQuery{ + Index: -1, + Key: reqQuery.Data, + Value: value, + Height: app.state.Height, + } + if value == nil { resQuery.Log = "does not exist" } else { resQuery.Log = "exists" } - resQuery.Index = -1 // TODO make Proof return index - resQuery.Key = reqQuery.Data - resQuery.Value = value - resQuery.Height = app.state.Height - return + return resQuery } - resQuery.Key = reqQuery.Data value, err := app.state.db.Get(prefixKey(reqQuery.Data)) if err != nil { panic(err) } + + resQuery := types.ResponseQuery{ + Key: reqQuery.Data, + Value: value, + Height: app.state.Height, + } + if value == nil { resQuery.Log = "does not exist" } else { resQuery.Log = "exists" } - resQuery.Value = value - resQuery.Height = app.state.Height return resQuery } -func (app *Application) PrepareProposal( - req types.RequestPrepareProposal) types.ResponsePrepareProposal { - return types.ResponsePrepareProposal{ - BlockData: req.BlockData} +func (app *Application) PrepareProposal(req types.RequestPrepareProposal) types.ResponsePrepareProposal { + app.mu.Lock() + defer app.mu.Unlock() + + return types.ResponsePrepareProposal{BlockData: app.substPrepareTx(req.BlockData)} +} + +func (*Application) ProcessProposal(req types.RequestProcessProposal) types.ResponseProcessProposal { + for _, tx := range req.Txs { + if len(tx) == 0 { + return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT} + } + } + return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT} +} + +//--------------------------------------------- +// update validators + +func (app *Application) Validators() (validators []types.ValidatorUpdate) { + app.mu.Lock() + defer app.mu.Unlock() + + itr, err := app.state.db.Iterator(nil, nil) + if err != nil { + panic(err) + } + for ; itr.Valid(); itr.Next() { + if isValidatorTx(itr.Key()) { + validator := new(types.ValidatorUpdate) + err := types.ReadMessage(bytes.NewBuffer(itr.Value()), validator) + if err != nil { + panic(err) + } + validators = append(validators, *validator) + } + } + if err = itr.Error(); err != nil { + panic(err) + } + return +} + +func MakeValSetChangeTx(pubkey cryptoproto.PublicKey, power int64) []byte { + pk, err := encoding.PubKeyFromProto(pubkey) + if err != nil { + panic(err) + } + pubStr := base64.StdEncoding.EncodeToString(pk.Bytes()) + return []byte(fmt.Sprintf("val:%s!%d", pubStr, power)) +} + +func isValidatorTx(tx []byte) bool { + return strings.HasPrefix(string(tx), ValidatorSetChangePrefix) +} + +// format is "val:pubkey!power" +// pubkey is a base64-encoded 32-byte ed25519 key +func (app *Application) execValidatorTx(tx []byte) *types.ResponseDeliverTx { + tx = tx[len(ValidatorSetChangePrefix):] + + // get the pubkey and power + pubKeyAndPower := strings.Split(string(tx), "!") + if len(pubKeyAndPower) != 2 { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Expected 'pubkey!power'. Got %v", pubKeyAndPower)} + } + pubkeyS, powerS := pubKeyAndPower[0], pubKeyAndPower[1] + + // decode the pubkey + pubkey, err := base64.StdEncoding.DecodeString(pubkeyS) + if err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Pubkey (%s) is invalid base64", pubkeyS)} + } + + // decode the power + power, err := strconv.ParseInt(powerS, 10, 64) + if err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("Power (%s) is not an int", powerS)} + } + + // update + return app.updateValidator(types.UpdateValidator(pubkey, power, "")) +} + +// add, update, or remove a validator +func (app *Application) updateValidator(v types.ValidatorUpdate) *types.ResponseDeliverTx { + pubkey, err := encoding.PubKeyFromProto(v.PubKey) + if err != nil { + panic(fmt.Errorf("can't decode public key: %w", err)) + } + key := []byte("val:" + string(pubkey.Bytes())) + + if v.Power == 0 { + // remove validator + hasKey, err := app.state.db.Has(key) + if err != nil { + panic(err) + } + if !hasKey { + pubStr := base64.StdEncoding.EncodeToString(pubkey.Bytes()) + return &types.ResponseDeliverTx{ + Code: code.CodeTypeUnauthorized, + Log: fmt.Sprintf("Cannot remove non-existent validator %s", pubStr)} + } + if err = app.state.db.Delete(key); err != nil { + panic(err) + } + delete(app.valAddrToPubKeyMap, string(pubkey.Address())) + } else { + // add or update validator + value := bytes.NewBuffer(make([]byte, 0)) + if err := types.WriteMessage(&v, value); err != nil { + return &types.ResponseDeliverTx{ + Code: code.CodeTypeEncodingError, + Log: fmt.Sprintf("error encoding validator: %v", err)} + } + if err = app.state.db.Set(key, value.Bytes()); err != nil { + panic(err) + } + app.valAddrToPubKeyMap[string(pubkey.Address())] = v.PubKey + } + + // we only update the changes array if we successfully updated the tree + app.ValUpdates = append(app.ValUpdates, v) + + return &types.ResponseDeliverTx{Code: code.CodeTypeOK} +} + +// ----------------------------- +// prepare proposal machinery + +const PreparePrefix = "prepare" + +func isPrepareTx(tx []byte) bool { + return strings.HasPrefix(string(tx), PreparePrefix) +} + +// execPrepareTx is noop. tx data is considered as placeholder +// and is substitute at the PrepareProposal. +func (app *Application) execPrepareTx(tx []byte) *types.ResponseDeliverTx { + // noop + return &types.ResponseDeliverTx{} +} + +// substPrepareTx subst all the preparetx in the blockdata +// to null string(could be any arbitrary string). +func (app *Application) substPrepareTx(blockData [][]byte) [][]byte { + // TODO: this mechanism will change with the current spec of PrepareProposal + // We now have a special type for marking a tx as changed + for i, tx := range blockData { + if isPrepareTx(tx) { + blockData[i] = make([]byte, len(tx)) + } + } + + return blockData } diff --git a/abci/example/kvstore/kvstore_test.go b/abci/example/kvstore/kvstore_test.go index 21f54e0fe..23841d76a 100644 --- a/abci/example/kvstore/kvstore_test.go +++ b/abci/example/kvstore/kvstore_test.go @@ -118,10 +118,7 @@ func TestPersistentKVStoreInfo(t *testing.T) { // add a validator, remove a validator, update a validator func TestValUpdates(t *testing.T) { - dir := t.TempDir() - logger := log.NewTestingLogger(t) - - kvstore := NewPersistentKVStoreApplication(logger, dir) + kvstore := NewApplication() // init with some validators total := 10 @@ -210,6 +207,7 @@ func makeApplyBlock( // order doesn't matter func valsEqual(t *testing.T, vals1, vals2 []types.ValidatorUpdate) { + t.Helper() if len(vals1) != len(vals2) { t.Fatalf("vals dont match in len. got %d, expected %d", len(vals2), len(vals1)) } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 830f93235..2a6e8aa19 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -2,16 +2,10 @@ package kvstore import ( "bytes" - "encoding/base64" - "fmt" - "strconv" - "strings" dbm "github.com/tendermint/tm-db" - "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/libs/log" cryptoproto "github.com/tendermint/tendermint/proto/tendermint/crypto" ptypes "github.com/tendermint/tendermint/proto/tendermint/types" @@ -26,325 +20,42 @@ const ( var _ types.Application = (*PersistentKVStoreApplication)(nil) type PersistentKVStoreApplication struct { - app *Application - - // validator set - ValUpdates []types.ValidatorUpdate - - valAddrToPubKeyMap map[string]cryptoproto.PublicKey - - logger log.Logger + *Application } func NewPersistentKVStoreApplication(logger log.Logger, dbDir string) *PersistentKVStoreApplication { - name := "kvstore" - db, err := dbm.NewGoLevelDB(name, dbDir) + db, err := dbm.NewGoLevelDB("kvstore", dbDir) if err != nil { panic(err) } - state := loadState(db) - return &PersistentKVStoreApplication{ - app: &Application{state: state}, - valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), - logger: logger, - } -} - -func (app *PersistentKVStoreApplication) Close() error { - return app.app.state.db.Close() -} - -func (app *PersistentKVStoreApplication) Info(req types.RequestInfo) types.ResponseInfo { - res := app.app.Info(req) - res.LastBlockHeight = app.app.state.Height - res.LastBlockAppHash = app.app.state.AppHash - return res -} - -// tx is either "val:pubkey!power" or "key=value" or just arbitrary bytes -func (app *PersistentKVStoreApplication) HandleTx(tx []byte) *types.ResponseDeliverTx { - // if it starts with "val:", update the validator set - // format is "val:pubkey!power" - if isValidatorTx(tx) { - // update validators in the merkle tree - // and in app.ValUpdates - return app.execValidatorTx(tx) - } - - if isPrepareTx(tx) { - return app.execPrepareTx(tx) - } - - // otherwise, update the key-value store - return app.app.HandleTx(tx) -} - -func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { - return app.app.CheckTx(req) -} - -// Commit will panic if InitChain was not called -func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit { - return app.app.Commit() -} - -// When path=/val and data={validator address}, returns the validator update (types.ValidatorUpdate) varint encoded. -// For any other path, returns an associated value or nil if missing. -func (app *PersistentKVStoreApplication) Query(reqQuery types.RequestQuery) (resQuery types.ResponseQuery) { - switch reqQuery.Path { - case "/val": - key := []byte("val:" + string(reqQuery.Data)) - value, err := app.app.state.db.Get(key) - if err != nil { - panic(err) - } - - resQuery.Key = reqQuery.Data - resQuery.Value = value - return - default: - return app.app.Query(reqQuery) + Application: &Application{ + valAddrToPubKeyMap: make(map[string]cryptoproto.PublicKey), + state: loadState(db), + logger: logger, + }, } } -// Save the validators in the merkle tree -func (app *PersistentKVStoreApplication) InitChain(req types.RequestInitChain) types.ResponseInitChain { - for _, v := range req.Validators { - r := app.updateValidator(v) - if r.IsErr() { - app.logger.Error("error updating validators", "r", r) - } - } - return types.ResponseInitChain{} -} - -// Track the block hash and header information -// Execute transactions -// Update the validator set -func (app *PersistentKVStoreApplication) FinalizeBlock(req types.RequestFinalizeBlock) types.ResponseFinalizeBlock { - // reset valset changes - app.ValUpdates = make([]types.ValidatorUpdate, 0) - - // Punish validators who committed equivocation. - for _, ev := range req.ByzantineValidators { - if ev.Type == types.EvidenceType_DUPLICATE_VOTE { - addr := string(ev.Validator.Address) - if pubKey, ok := app.valAddrToPubKeyMap[addr]; ok { - app.updateValidator(types.ValidatorUpdate{ - PubKey: pubKey, - Power: ev.Validator.Power - 1, - }) - app.logger.Info("Decreased val power by 1 because of the equivocation", - "val", addr) - } else { - app.logger.Error("Wanted to punish val, but can't find it", - "val", addr) - } - } - } - - respTxs := make([]*types.ResponseDeliverTx, len(req.Txs)) - for i, tx := range req.Txs { - respTxs[i] = app.HandleTx(tx) - } - - return types.ResponseFinalizeBlock{Txs: respTxs, ValidatorUpdates: app.ValUpdates} -} - -func (app *PersistentKVStoreApplication) ListSnapshots( - req types.RequestListSnapshots) types.ResponseListSnapshots { - return types.ResponseListSnapshots{} -} - -func (app *PersistentKVStoreApplication) LoadSnapshotChunk( - req types.RequestLoadSnapshotChunk) types.ResponseLoadSnapshotChunk { - return types.ResponseLoadSnapshotChunk{} -} - -func (app *PersistentKVStoreApplication) OfferSnapshot( - req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { +func (app *PersistentKVStoreApplication) OfferSnapshot(req types.RequestOfferSnapshot) types.ResponseOfferSnapshot { return types.ResponseOfferSnapshot{Result: types.ResponseOfferSnapshot_ABORT} } -func (app *PersistentKVStoreApplication) ApplySnapshotChunk( - req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { +func (app *PersistentKVStoreApplication) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) types.ResponseApplySnapshotChunk { return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT} } -func (app *PersistentKVStoreApplication) ExtendVote( - req types.RequestExtendVote) types.ResponseExtendVote { - return types.ResponseExtendVote{ - VoteExtension: ConstructVoteExtension(req.Vote.ValidatorAddress), - } -} - -func (app *PersistentKVStoreApplication) VerifyVoteExtension( - req types.RequestVerifyVoteExtension) types.ResponseVerifyVoteExtension { - return types.RespondVerifyVoteExtension( - app.verifyExtension(req.Vote.ValidatorAddress, req.Vote.VoteExtension)) -} - -func (app *PersistentKVStoreApplication) PrepareProposal( - req types.RequestPrepareProposal) types.ResponsePrepareProposal { - return types.ResponsePrepareProposal{BlockData: app.substPrepareTx(req.BlockData)} -} - -func (app *PersistentKVStoreApplication) ProcessProposal( - req types.RequestProcessProposal) types.ResponseProcessProposal { - for _, tx := range req.Txs { - if len(tx) == 0 { - return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT} - } - } - return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT} -} - -//--------------------------------------------- -// update validators - -func (app *PersistentKVStoreApplication) Validators() (validators []types.ValidatorUpdate) { - itr, err := app.app.state.db.Iterator(nil, nil) - if err != nil { - panic(err) - } - for ; itr.Valid(); itr.Next() { - if isValidatorTx(itr.Key()) { - validator := new(types.ValidatorUpdate) - err := types.ReadMessage(bytes.NewBuffer(itr.Value()), validator) - if err != nil { - panic(err) - } - validators = append(validators, *validator) - } - } - if err = itr.Error(); err != nil { - panic(err) - } - return -} - -func MakeValSetChangeTx(pubkey cryptoproto.PublicKey, power int64) []byte { - pk, err := encoding.PubKeyFromProto(pubkey) - if err != nil { - panic(err) - } - pubStr := base64.StdEncoding.EncodeToString(pk.Bytes()) - return []byte(fmt.Sprintf("val:%s!%d", pubStr, power)) +func (app *PersistentKVStoreApplication) ExtendVote(req types.RequestExtendVote) types.ResponseExtendVote { + return types.ResponseExtendVote{VoteExtension: ConstructVoteExtension(req.Vote.ValidatorAddress)} } -func isValidatorTx(tx []byte) bool { - return strings.HasPrefix(string(tx), ValidatorSetChangePrefix) -} - -// format is "val:pubkey!power" -// pubkey is a base64-encoded 32-byte ed25519 key -func (app *PersistentKVStoreApplication) execValidatorTx(tx []byte) *types.ResponseDeliverTx { - tx = tx[len(ValidatorSetChangePrefix):] - - // get the pubkey and power - pubKeyAndPower := strings.Split(string(tx), "!") - if len(pubKeyAndPower) != 2 { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Expected 'pubkey!power'. Got %v", pubKeyAndPower)} - } - pubkeyS, powerS := pubKeyAndPower[0], pubKeyAndPower[1] - - // decode the pubkey - pubkey, err := base64.StdEncoding.DecodeString(pubkeyS) - if err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Pubkey (%s) is invalid base64", pubkeyS)} - } - - // decode the power - power, err := strconv.ParseInt(powerS, 10, 64) - if err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("Power (%s) is not an int", powerS)} - } - - // update - return app.updateValidator(types.UpdateValidator(pubkey, power, "")) -} - -// add, update, or remove a validator -func (app *PersistentKVStoreApplication) updateValidator(v types.ValidatorUpdate) *types.ResponseDeliverTx { - pubkey, err := encoding.PubKeyFromProto(v.PubKey) - if err != nil { - panic(fmt.Errorf("can't decode public key: %w", err)) - } - key := []byte("val:" + string(pubkey.Bytes())) - - if v.Power == 0 { - // remove validator - hasKey, err := app.app.state.db.Has(key) - if err != nil { - panic(err) - } - if !hasKey { - pubStr := base64.StdEncoding.EncodeToString(pubkey.Bytes()) - return &types.ResponseDeliverTx{ - Code: code.CodeTypeUnauthorized, - Log: fmt.Sprintf("Cannot remove non-existent validator %s", pubStr)} - } - if err = app.app.state.db.Delete(key); err != nil { - panic(err) - } - delete(app.valAddrToPubKeyMap, string(pubkey.Address())) - } else { - // add or update validator - value := bytes.NewBuffer(make([]byte, 0)) - if err := types.WriteMessage(&v, value); err != nil { - return &types.ResponseDeliverTx{ - Code: code.CodeTypeEncodingError, - Log: fmt.Sprintf("error encoding validator: %v", err)} - } - if err = app.app.state.db.Set(key, value.Bytes()); err != nil { - panic(err) - } - app.valAddrToPubKeyMap[string(pubkey.Address())] = v.PubKey - } - - // we only update the changes array if we successfully updated the tree - app.ValUpdates = append(app.ValUpdates, v) - - return &types.ResponseDeliverTx{Code: code.CodeTypeOK} +func (app *PersistentKVStoreApplication) VerifyVoteExtension(req types.RequestVerifyVoteExtension) types.ResponseVerifyVoteExtension { + return types.RespondVerifyVoteExtension(app.verifyExtension(req.Vote.ValidatorAddress, req.Vote.VoteExtension)) } // ----------------------------- -const PreparePrefix = "prepare" - -func isPrepareTx(tx []byte) bool { - return strings.HasPrefix(string(tx), PreparePrefix) -} - -// execPrepareTx is noop. tx data is considered as placeholder -// and is substitute at the PrepareProposal. -func (app *PersistentKVStoreApplication) execPrepareTx(tx []byte) *types.ResponseDeliverTx { - // noop - return &types.ResponseDeliverTx{} -} - -// substPrepareTx subst all the preparetx in the blockdata -// to null string(could be any arbitrary string). -func (app *PersistentKVStoreApplication) substPrepareTx(blockData [][]byte) [][]byte { - // TODO: this mechanism will change with the current spec of PrepareProposal - // We now have a special type for marking a tx as changed - for i, tx := range blockData { - if isPrepareTx(tx) { - blockData[i] = make([]byte, len(tx)) - } - } - - return blockData -} - func ConstructVoteExtension(valAddr []byte) *ptypes.VoteExtension { return &ptypes.VoteExtension{ AppDataToSign: valAddr, diff --git a/abci/types/application.go b/abci/types/application.go index 16ae03546..b3ebf707f 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -41,8 +41,7 @@ type Application interface { var _ Application = (*BaseApplication)(nil) -type BaseApplication struct { -} +type BaseApplication struct{} func NewBaseApplication() *BaseApplication { return &BaseApplication{} diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 7a5260a64..33e1dbf63 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -14,6 +14,7 @@ import ( dbm "github.com/tendermint/tm-db" abciclient "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/evidence" @@ -36,7 +37,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // kind of deadlock and hit the larger timeout. This timeout // can be extended a bunch if needed, but it's good to avoid // falling back to a much coarser timeout - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() config := configSetup(t) @@ -45,7 +46,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { prevoteHeight := int64(2) testName := "consensus_byzantine_test" tickerFunc := newMockTickerFunc(true) - appFunc := newKVStore valSet, privVals := factory.ValidatorSet(ctx, t, nValidators, 30) genDoc := factory.GenesisDoc(config, time.Now(), valSet.Validators, nil) @@ -66,7 +66,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { defer os.RemoveAll(thisConfig.RootDir) ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) + app := kvstore.NewApplication() vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) diff --git a/internal/consensus/common_test.go b/internal/consensus/common_test.go index cfd232b34..fd9780a15 100644 --- a/internal/consensus/common_test.go +++ b/internal/consensus/common_test.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "io" "os" "path/filepath" "sort" @@ -750,7 +749,6 @@ func makeConsensusState( nValidators int, testName string, tickerFunc func() TimeoutTicker, - appFunc func(t *testing.T, logger log.Logger) abci.Application, configOpts ...func(*config.Config), ) ([]*State, cleanupFunc) { t.Helper() @@ -779,11 +777,8 @@ func makeConsensusState( ensureDir(t, filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) - - if appCloser, ok := app.(io.Closer); ok { - closeFuncs = append(closeFuncs, appCloser.Close) - } + app := kvstore.NewApplication() + closeFuncs = append(closeFuncs, app.Close) vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) @@ -934,19 +929,11 @@ func (m *mockTicker) Chan() <-chan timeoutInfo { return m.c } -func newPersistentKVStore(t *testing.T, logger log.Logger) abci.Application { - t.Helper() - - dir := t.TempDir() - - return kvstore.NewPersistentKVStoreApplication(logger, dir) -} - -func newKVStore(_ *testing.T, _ log.Logger) abci.Application { +func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application { return kvstore.NewApplication() } -func newPersistentKVStoreWithPath(logger log.Logger, dbDir string) abci.Application { +func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application { return kvstore.NewPersistentKVStoreApplication(logger, dbDir) } diff --git a/internal/consensus/invalid_test.go b/internal/consensus/invalid_test.go index a3e865c68..142414d30 100644 --- a/internal/consensus/invalid_test.go +++ b/internal/consensus/invalid_test.go @@ -27,7 +27,7 @@ func TestReactorInvalidPrecommit(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, config, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) for i := 0; i < 4; i++ { diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index cd6313650..97fcd091f 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -326,7 +326,7 @@ func TestReactorBasic(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock @@ -379,7 +379,6 @@ func TestReactorWithEvidence(t *testing.T) { n := 4 testName := "consensus_reactor_test" tickerFunc := newMockTickerFunc(true) - appFunc := newKVStore valSet, privVals := factory.ValidatorSet(ctx, t, n, 30) genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, nil) @@ -397,7 +396,7 @@ func TestReactorWithEvidence(t *testing.T) { defer os.RemoveAll(thisConfig.RootDir) ensureDir(t, path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal - app := appFunc(t, logger) + app := kvstore.NewApplication() vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) @@ -491,7 +490,6 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { n, "consensus_reactor_test", newMockTickerFunc(true), - newKVStore, func(c *config.Config) { c.Consensus.CreateEmptyBlocks = false }, @@ -543,7 +541,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) { n := 4 states, cleanup := makeConsensusState(ctx, t, cfg, n, "consensus_reactor_test", - newMockTickerFunc(true), newKVStore) + newMockTickerFunc(true)) t.Cleanup(cleanup) rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock @@ -612,7 +610,6 @@ func TestReactorVotingPowerChange(t *testing.T) { n, "consensus_voting_power_changes_test", newMockTickerFunc(true), - newPersistentKVStore, ) t.Cleanup(cleanup) @@ -722,7 +719,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), - newPersistentKVStoreWithPath, + newEpehemeralKVStore, ) t.Cleanup(cleanup) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 82ca54f31..b9302d125 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -339,7 +339,7 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite { nPeers, "replay_test", newMockTickerFunc(true), - newPersistentKVStoreWithPath) + newPersistentKVStore) sim.Config = cfg var err error diff --git a/rpc/client/main_test.go b/rpc/client/main_test.go index d799d8b85..45a83afbb 100644 --- a/rpc/client/main_test.go +++ b/rpc/client/main_test.go @@ -2,7 +2,6 @@ package client_test import ( "context" - "os" "testing" "github.com/stretchr/testify/assert" @@ -23,10 +22,9 @@ func NodeSuite(ctx context.Context, t *testing.T, logger log.Logger) (service.Se conf, err := rpctest.CreateConfig(t, t.Name()) require.NoError(t, err) - // start a tendermint node in the background to test against - dir := t.TempDir() - app := kvstore.NewPersistentKVStoreApplication(logger, dir) + app := kvstore.NewApplication() + // start a tendermint node in the background to test against. node, closer, err := rpctest.StartTendermint(ctx, conf, app, rpctest.SuppressStdout) require.NoError(t, err) t.Cleanup(func() { @@ -34,7 +32,6 @@ func NodeSuite(ctx context.Context, t *testing.T, logger log.Logger) (service.Se assert.NoError(t, closer(ctx)) assert.NoError(t, app.Close()) node.Wait() - _ = os.RemoveAll(dir) }) return node, conf } diff --git a/test/e2e/app/app.go b/test/e2e/app/app.go index a7348ecd9..26a511718 100644 --- a/test/e2e/app/app.go +++ b/test/e2e/app/app.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "strconv" + "sync" "github.com/tendermint/tendermint/abci/example/code" abci "github.com/tendermint/tendermint/abci/types" @@ -21,6 +22,7 @@ import ( // to disk as JSON, taking state sync snapshots if requested. type Application struct { abci.BaseApplication + mu sync.Mutex logger log.Logger state *State snapshots *SnapshotStore @@ -98,8 +100,8 @@ func NewApplication(cfg *Config) (*Application, error) { // Info implements ABCI. func (app *Application) Info(req abci.RequestInfo) abci.ResponseInfo { - app.state.RLock() - defer app.state.RUnlock() + app.mu.Lock() + defer app.mu.Unlock() return abci.ResponseInfo{ Version: version.ABCIVersion, @@ -111,6 +113,9 @@ func (app *Application) Info(req abci.RequestInfo) abci.ResponseInfo { // Info implements ABCI. func (app *Application) InitChain(req abci.RequestInitChain) abci.ResponseInitChain { + app.mu.Lock() + defer app.mu.Unlock() + var err error app.state.initialHeight = uint64(req.InitialHeight) if len(req.AppStateBytes) > 0 { @@ -135,6 +140,9 @@ func (app *Application) InitChain(req abci.RequestInitChain) abci.ResponseInitCh // CheckTx implements ABCI. func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { + app.mu.Lock() + defer app.mu.Unlock() + _, _, err := parseTx(req.Tx) if err != nil { return abci.ResponseCheckTx{ @@ -149,6 +157,9 @@ func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { func (app *Application) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock { var txs = make([]*abci.ResponseDeliverTx, len(req.Txs)) + app.mu.Lock() + defer app.mu.Unlock() + for i, tx := range req.Txs { key, value, err := parseTx(tx) if err != nil { @@ -187,6 +198,9 @@ func (app *Application) FinalizeBlock(req abci.RequestFinalizeBlock) abci.Respon // Commit implements ABCI. func (app *Application) Commit() abci.ResponseCommit { + app.mu.Lock() + defer app.mu.Unlock() + height, hash, err := app.state.Commit() if err != nil { panic(err) @@ -214,6 +228,9 @@ func (app *Application) Commit() abci.ResponseCommit { // Query implements ABCI. func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery { + app.mu.Lock() + defer app.mu.Unlock() + return abci.ResponseQuery{ Height: int64(app.state.Height), Key: req.Data, @@ -223,6 +240,9 @@ func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery { // ListSnapshots implements ABCI. func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots { + app.mu.Lock() + defer app.mu.Unlock() + snapshots, err := app.snapshots.List() if err != nil { panic(err) @@ -232,6 +252,9 @@ func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.Respon // LoadSnapshotChunk implements ABCI. func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk { + app.mu.Lock() + defer app.mu.Unlock() + chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk) if err != nil { panic(err) @@ -241,6 +264,9 @@ func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abc // OfferSnapshot implements ABCI. func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot { + app.mu.Lock() + defer app.mu.Unlock() + if app.restoreSnapshot != nil { panic("A snapshot is already being restored") } @@ -251,6 +277,9 @@ func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.Respon // ApplySnapshotChunk implements ABCI. func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk { + app.mu.Lock() + defer app.mu.Unlock() + if app.restoreSnapshot == nil { panic("No restore in progress") } @@ -270,12 +299,14 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT} } -func (app *Application) PrepareProposal( - req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { +func (app *Application) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { return abci.ResponsePrepareProposal{BlockData: req.BlockData} } func (app *Application) Rollback() error { + app.mu.Lock() + defer app.mu.Unlock() + return app.state.Rollback() }