You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

362 lines
10 KiB

  1. package app
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "errors"
  6. "fmt"
  7. "path/filepath"
  8. "sort"
  9. "strconv"
  10. "sync"
  11. "github.com/tendermint/tendermint/abci/example/code"
  12. abci "github.com/tendermint/tendermint/abci/types"
  13. "github.com/tendermint/tendermint/libs/log"
  14. "github.com/tendermint/tendermint/proto/tendermint/types"
  15. "github.com/tendermint/tendermint/version"
  16. )
  17. // Application is an ABCI application for use by end-to-end tests. It is a
  18. // simple key/value store for strings, storing data in memory and persisting
  19. // to disk as JSON, taking state sync snapshots if requested.
  20. type Application struct {
  21. abci.BaseApplication
  22. mu sync.Mutex
  23. logger log.Logger
  24. state *State
  25. snapshots *SnapshotStore
  26. cfg *Config
  27. restoreSnapshot *abci.Snapshot
  28. restoreChunks [][]byte
  29. }
  30. // Config allows for the setting of high level parameters for running the e2e Application
  31. // KeyType and ValidatorUpdates must be the same for all nodes running the same application.
  32. type Config struct {
  33. // The directory with which state.json will be persisted in. Usually $HOME/.tendermint/data
  34. Dir string `toml:"dir"`
  35. // SnapshotInterval specifies the height interval at which the application
  36. // will take state sync snapshots. Defaults to 0 (disabled).
  37. SnapshotInterval uint64 `toml:"snapshot_interval"`
  38. // RetainBlocks specifies the number of recent blocks to retain. Defaults to
  39. // 0, which retains all blocks. Must be greater that PersistInterval,
  40. // SnapshotInterval and EvidenceAgeHeight.
  41. RetainBlocks uint64 `toml:"retain_blocks"`
  42. // KeyType sets the curve that will be used by validators.
  43. // Options are ed25519 & secp256k1
  44. KeyType string `toml:"key_type"`
  45. // PersistInterval specifies the height interval at which the application
  46. // will persist state to disk. Defaults to 1 (every height), setting this to
  47. // 0 disables state persistence.
  48. PersistInterval uint64 `toml:"persist_interval"`
  49. // ValidatorUpdates is a map of heights to validator names and their power,
  50. // and will be returned by the ABCI application. For example, the following
  51. // changes the power of validator01 and validator02 at height 1000:
  52. //
  53. // [validator_update.1000]
  54. // validator01 = 20
  55. // validator02 = 10
  56. //
  57. // Specifying height 0 returns the validator update during InitChain. The
  58. // application returns the validator updates as-is, i.e. removing a
  59. // validator must be done by returning it with power 0, and any validators
  60. // not specified are not changed.
  61. //
  62. // height <-> pubkey <-> voting power
  63. ValidatorUpdates map[string]map[string]uint8 `toml:"validator_update"`
  64. }
  65. func DefaultConfig(dir string) *Config {
  66. return &Config{
  67. PersistInterval: 1,
  68. SnapshotInterval: 100,
  69. Dir: dir,
  70. }
  71. }
  72. // NewApplication creates the application.
  73. func NewApplication(cfg *Config) (*Application, error) {
  74. state, err := NewState(cfg.Dir, cfg.PersistInterval)
  75. if err != nil {
  76. return nil, err
  77. }
  78. snapshots, err := NewSnapshotStore(filepath.Join(cfg.Dir, "snapshots"))
  79. if err != nil {
  80. return nil, err
  81. }
  82. return &Application{
  83. logger: log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo),
  84. state: state,
  85. snapshots: snapshots,
  86. cfg: cfg,
  87. }, nil
  88. }
  89. // Info implements ABCI.
  90. func (app *Application) Info(req abci.RequestInfo) abci.ResponseInfo {
  91. app.mu.Lock()
  92. defer app.mu.Unlock()
  93. return abci.ResponseInfo{
  94. Version: version.ABCIVersion,
  95. AppVersion: 1,
  96. LastBlockHeight: int64(app.state.Height),
  97. LastBlockAppHash: app.state.Hash,
  98. }
  99. }
  100. // Info implements ABCI.
  101. func (app *Application) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
  102. app.mu.Lock()
  103. defer app.mu.Unlock()
  104. var err error
  105. app.state.initialHeight = uint64(req.InitialHeight)
  106. if len(req.AppStateBytes) > 0 {
  107. err = app.state.Import(0, req.AppStateBytes)
  108. if err != nil {
  109. panic(err)
  110. }
  111. }
  112. resp := abci.ResponseInitChain{
  113. AppHash: app.state.Hash,
  114. ConsensusParams: &types.ConsensusParams{
  115. Version: &types.VersionParams{
  116. AppVersion: 1,
  117. },
  118. },
  119. }
  120. if resp.Validators, err = app.validatorUpdates(0); err != nil {
  121. panic(err)
  122. }
  123. return resp
  124. }
  125. // CheckTx implements ABCI.
  126. func (app *Application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
  127. app.mu.Lock()
  128. defer app.mu.Unlock()
  129. _, _, err := parseTx(req.Tx)
  130. if err != nil {
  131. return abci.ResponseCheckTx{
  132. Code: code.CodeTypeEncodingError,
  133. Log: err.Error(),
  134. }
  135. }
  136. return abci.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
  137. }
  138. // FinalizeBlock implements ABCI.
  139. func (app *Application) FinalizeBlock(req abci.RequestFinalizeBlock) abci.ResponseFinalizeBlock {
  140. var txs = make([]*abci.ResponseDeliverTx, len(req.Txs))
  141. app.mu.Lock()
  142. defer app.mu.Unlock()
  143. for i, tx := range req.Txs {
  144. key, value, err := parseTx(tx)
  145. if err != nil {
  146. panic(err) // shouldn't happen since we verified it in CheckTx
  147. }
  148. app.state.Set(key, value)
  149. txs[i] = &abci.ResponseDeliverTx{Code: code.CodeTypeOK}
  150. }
  151. valUpdates, err := app.validatorUpdates(uint64(req.Height))
  152. if err != nil {
  153. panic(err)
  154. }
  155. return abci.ResponseFinalizeBlock{
  156. Txs: txs,
  157. ValidatorUpdates: valUpdates,
  158. Events: []abci.Event{
  159. {
  160. Type: "val_updates",
  161. Attributes: []abci.EventAttribute{
  162. {
  163. Key: "size",
  164. Value: strconv.Itoa(valUpdates.Len()),
  165. },
  166. {
  167. Key: "height",
  168. Value: strconv.Itoa(int(req.Height)),
  169. },
  170. },
  171. },
  172. },
  173. }
  174. }
  175. // Commit implements ABCI.
  176. func (app *Application) Commit() abci.ResponseCommit {
  177. app.mu.Lock()
  178. defer app.mu.Unlock()
  179. height, hash, err := app.state.Commit()
  180. if err != nil {
  181. panic(err)
  182. }
  183. if app.cfg.SnapshotInterval > 0 && height%app.cfg.SnapshotInterval == 0 {
  184. snapshot, err := app.snapshots.Create(app.state)
  185. if err != nil {
  186. panic(err)
  187. }
  188. app.logger.Info("Created state sync snapshot", "height", snapshot.Height)
  189. err = app.snapshots.Prune(maxSnapshotCount)
  190. if err != nil {
  191. app.logger.Error("Failed to prune snapshots", "err", err)
  192. }
  193. }
  194. retainHeight := int64(0)
  195. if app.cfg.RetainBlocks > 0 {
  196. retainHeight = int64(height - app.cfg.RetainBlocks + 1)
  197. }
  198. return abci.ResponseCommit{
  199. Data: hash,
  200. RetainHeight: retainHeight,
  201. }
  202. }
  203. // Query implements ABCI.
  204. func (app *Application) Query(req abci.RequestQuery) abci.ResponseQuery {
  205. app.mu.Lock()
  206. defer app.mu.Unlock()
  207. return abci.ResponseQuery{
  208. Height: int64(app.state.Height),
  209. Key: req.Data,
  210. Value: []byte(app.state.Get(string(req.Data))),
  211. }
  212. }
  213. // ListSnapshots implements ABCI.
  214. func (app *Application) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
  215. app.mu.Lock()
  216. defer app.mu.Unlock()
  217. snapshots, err := app.snapshots.List()
  218. if err != nil {
  219. panic(err)
  220. }
  221. return abci.ResponseListSnapshots{Snapshots: snapshots}
  222. }
  223. // LoadSnapshotChunk implements ABCI.
  224. func (app *Application) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
  225. app.mu.Lock()
  226. defer app.mu.Unlock()
  227. chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk)
  228. if err != nil {
  229. panic(err)
  230. }
  231. return abci.ResponseLoadSnapshotChunk{Chunk: chunk}
  232. }
  233. // OfferSnapshot implements ABCI.
  234. func (app *Application) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
  235. app.mu.Lock()
  236. defer app.mu.Unlock()
  237. if app.restoreSnapshot != nil {
  238. panic("A snapshot is already being restored")
  239. }
  240. app.restoreSnapshot = req.Snapshot
  241. app.restoreChunks = [][]byte{}
  242. return abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}
  243. }
  244. // ApplySnapshotChunk implements ABCI.
  245. func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
  246. app.mu.Lock()
  247. defer app.mu.Unlock()
  248. if app.restoreSnapshot == nil {
  249. panic("No restore in progress")
  250. }
  251. app.restoreChunks = append(app.restoreChunks, req.Chunk)
  252. if len(app.restoreChunks) == int(app.restoreSnapshot.Chunks) {
  253. bz := []byte{}
  254. for _, chunk := range app.restoreChunks {
  255. bz = append(bz, chunk...)
  256. }
  257. err := app.state.Import(app.restoreSnapshot.Height, bz)
  258. if err != nil {
  259. panic(err)
  260. }
  261. app.restoreSnapshot = nil
  262. app.restoreChunks = nil
  263. }
  264. return abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}
  265. }
  266. func (app *Application) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
  267. return abci.ResponsePrepareProposal{BlockData: req.BlockData}
  268. }
  269. // ProcessProposal implements part of the Application interface.
  270. // It accepts any proposal that does not contain a malformed transaction.
  271. func (app *Application) ProcessProposal(req abci.RequestProcessProposal) abci.ResponseProcessProposal {
  272. for _, tx := range req.Txs {
  273. _, _, err := parseTx(tx)
  274. if err != nil {
  275. return abci.ResponseProcessProposal{Accept: false}
  276. }
  277. }
  278. return abci.ResponseProcessProposal{Accept: true}
  279. }
  280. func (app *Application) Rollback() error {
  281. app.mu.Lock()
  282. defer app.mu.Unlock()
  283. return app.state.Rollback()
  284. }
  285. // validatorUpdates generates a validator set update.
  286. func (app *Application) validatorUpdates(height uint64) (abci.ValidatorUpdates, error) {
  287. updates := app.cfg.ValidatorUpdates[fmt.Sprintf("%v", height)]
  288. if len(updates) == 0 {
  289. return nil, nil
  290. }
  291. valUpdates := abci.ValidatorUpdates{}
  292. for keyString, power := range updates {
  293. keyBytes, err := base64.StdEncoding.DecodeString(keyString)
  294. if err != nil {
  295. return nil, fmt.Errorf("invalid base64 pubkey value %q: %w", keyString, err)
  296. }
  297. valUpdates = append(valUpdates, abci.UpdateValidator(keyBytes, int64(power), app.cfg.KeyType))
  298. }
  299. // the validator updates could be returned in arbitrary order,
  300. // and that seems potentially bad. This orders the validator
  301. // set.
  302. sort.Slice(valUpdates, func(i, j int) bool {
  303. return valUpdates[i].PubKey.Compare(valUpdates[j].PubKey) < 0
  304. })
  305. return valUpdates, nil
  306. }
  307. // parseTx parses a tx in 'key=value' format into a key and value.
  308. func parseTx(tx []byte) (string, string, error) {
  309. parts := bytes.Split(tx, []byte("="))
  310. if len(parts) != 2 {
  311. return "", "", fmt.Errorf("invalid tx format: %q", string(tx))
  312. }
  313. if len(parts[0]) == 0 {
  314. return "", "", errors.New("key cannot be empty")
  315. }
  316. return string(parts[0]), string(parts[1]), nil
  317. }