package state

import (
	"fmt"

	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	tmmath "github.com/tendermint/tendermint/libs/math"
	tmos "github.com/tendermint/tendermint/libs/os"
	tmproto "github.com/tendermint/tendermint/proto/types"
	"github.com/tendermint/tendermint/types"
)

const (
	// persist validators every valSetCheckpointInterval blocks to avoid
	// LoadValidators taking too much time.
	// https://github.com/tendermint/tendermint/pull/3438
	// 100000 results in ~ 100ms to get 100 validators (see BenchmarkLoadValidators)
	valSetCheckpointInterval = 100000
)

//------------------------------------------------------------------------

func calcValidatorsKey(height int64) []byte {
	return []byte(fmt.Sprintf("validatorsKey:%v", height))
}

func calcConsensusParamsKey(height int64) []byte {
	return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
}

func calcABCIResponsesKey(height int64) []byte {
	return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
}

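// Illustrative sketch, not part of this package: the keys built above embed
// the height as a plain decimal string, so their byte order differs from the
// numeric order of the heights. The standalone program below shows the effect
// by sorting a few keys. paddedKey and sortedKeys are hypothetical helpers
// introduced only for the comparison; this package does not zero-pad heights.

```go
package main

import (
	"fmt"
	"sort"
)

// validatorsKey mirrors the "validatorsKey:%v" format used by calcValidatorsKey.
func validatorsKey(height int64) string {
	return fmt.Sprintf("validatorsKey:%v", height)
}

// paddedKey is a hypothetical alternative that zero-pads the height so that
// byte order matches numeric order.
func paddedKey(height int64) string {
	return fmt.Sprintf("validatorsKey:%020d", height)
}

// sortedKeys builds one key per height and returns the keys in byte order,
// which is the order a byte-ordered iterator would visit them in.
func sortedKeys(heights []int64, mk func(int64) string) []string {
	keys := make([]string, 0, len(heights))
	for _, h := range heights {
		keys = append(keys, mk(h))
	}
	sort.Strings(keys)
	return keys
}

func main() {
	heights := []int64{2, 9, 10, 100}
	fmt.Println(sortedKeys(heights, validatorsKey)) // heights 10 and 100 sort before 2 and 9
	fmt.Println(sortedKeys(heights, paddedKey))     // numeric order is preserved
}
```

// With the unpadded format, a range scan from height 2 to height 10 would not
// visit a contiguous run of keys, which is why PruneStates below takes an
// explicit from height instead of scanning.
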
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
// or creates a new one from the given genesisFilePath and persists the result
// to the database.
func LoadStateFromDBOrGenesisFile(stateDB dbm.DB, genesisFilePath string) (State, error) {
	state := LoadState(stateDB)
	if state.IsEmpty() {
		var err error
		state, err = MakeGenesisStateFromFile(genesisFilePath)
		if err != nil {
			return state, err
		}
		SaveState(stateDB, state)
	}

	return state, nil
}

// LoadStateFromDBOrGenesisDoc loads the most recent state from the database,
// or creates a new one from the given genesisDoc and persists the result
// to the database.
func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (State, error) {
	state := LoadState(stateDB)
	if state.IsEmpty() {
		var err error
		state, err = MakeGenesisState(genesisDoc)
		if err != nil {
			return state, err
		}
		SaveState(stateDB, state)
	}

	return state, nil
}

// LoadState loads the State from the database.
func LoadState(db dbm.DB) State {
	return loadState(db, stateKey)
}

func loadState(db dbm.DB, key []byte) (state State) {
	buf, err := db.Get(key)
	if err != nil {
		panic(err)
	}
	if len(buf) == 0 {
		return state
	}

	err = cdc.UnmarshalBinaryBare(buf, &state)
	if err != nil {
		// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
		tmos.Exit(fmt.Sprintf("LoadState: Data has been corrupted or its spec has changed: %v\n", err))
	}
	// TODO: ensure that buf is completely read.

	return state
}

// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
// This flushes the writes (e.g. calls SetSync).
func SaveState(db dbm.DB, state State) {
	saveState(db, state, stateKey)
}

func saveState(db dbm.DB, state State, key []byte) {
	nextHeight := state.LastBlockHeight + 1
	// If first block, save validators for block 1.
	if nextHeight == 1 {
		// This extra logic due to Tendermint validator set changes being delayed 1 block.
		// It may get overwritten due to InitChain validator updates.
		lastHeightVoteChanged := int64(1)
		saveValidatorsInfo(db, nextHeight, lastHeightVoteChanged, state.Validators)
	}
	// Save next validators.
	saveValidatorsInfo(db, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators)
	// Save next consensus params.
	saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams)
	err := db.SetSync(key, state.Bytes())
	if err != nil {
		panic(err)
	}
}

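// Illustrative sketch, not part of this package: the height arithmetic in
// saveState is easy to misread, so the standalone program below lists which
// per-height keys one call writes for a given LastBlockHeight. keysWritten is
// a hypothetical helper; the key formats are copied from calcValidatorsKey
// and calcConsensusParamsKey, and the state key itself is left out.

```go
package main

import "fmt"

// keysWritten lists the per-height keys written by one saveState call for a
// state whose last committed block is lastBlockHeight.
func keysWritten(lastBlockHeight int64) []string {
	nextHeight := lastBlockHeight + 1
	var keys []string
	if nextHeight == 1 {
		// First block: the validators for block 1 are saved explicitly.
		keys = append(keys, fmt.Sprintf("validatorsKey:%v", nextHeight))
	}
	// Validator set changes are delayed by one block, so the next validators
	// are stored under nextHeight+1.
	keys = append(keys, fmt.Sprintf("validatorsKey:%v", nextHeight+1))
	// Consensus params are stored under nextHeight.
	keys = append(keys, fmt.Sprintf("consensusParamsKey:%v", nextHeight))
	return keys
}

func main() {
	fmt.Println(keysWritten(0))
	fmt.Println(keysWritten(10))
}
```

// For LastBlockHeight 0 the sketch reports validators for heights 1 and 2 and
// consensus params for height 1; for LastBlockHeight 10 it reports validators
// for height 12 and consensus params for height 11.
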
// BootstrapState saves a new state, used e.g. by state sync when starting from non-zero height.
func BootstrapState(db dbm.DB, state State) error {
	height := state.LastBlockHeight
	saveValidatorsInfo(db, height, height, state.LastValidators)
	saveValidatorsInfo(db, height+1, height+1, state.Validators)
	saveValidatorsInfo(db, height+2, height+2, state.NextValidators)
	saveConsensusParamsInfo(db, height+1, height+1, state.ConsensusParams)
	return db.SetSync(stateKey, state.Bytes())
}

//------------------------------------------------------------------------

// ABCIResponses retains the responses
// of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit.
type ABCIResponses struct {
	DeliverTxs []*abci.ResponseDeliverTx `json:"deliver_txs"`
	EndBlock   *abci.ResponseEndBlock    `json:"end_block"`
	BeginBlock *abci.ResponseBeginBlock  `json:"begin_block"`
}

// PruneStates deletes states between the given heights (including from, excluding to). It is not
// guaranteed to delete all states, since the last checkpointed state and states being pointed to by
// e.g. `LastHeightChanged` must remain. The state at to must also exist.
//
// The from parameter is necessary since we can't do a key scan in a performant way due to the key
// encoding not preserving ordering: https://github.com/tendermint/tendermint/issues/4567
// This will cause some old states to be left behind when doing incremental partial prunes,
// specifically older checkpoints and LastHeightChanged targets.
func PruneStates(db dbm.DB, from int64, to int64) error {
	if from <= 0 || to <= 0 {
		return fmt.Errorf("from height %v and to height %v must be greater than 0", from, to)
	}
	if from >= to {
		return fmt.Errorf("from height %v must be lower than to height %v", from, to)
	}
	valInfo := loadValidatorsInfo(db, to)
	if valInfo == nil {
		return fmt.Errorf("validators at height %v not found", to)
	}
	paramsInfo := loadConsensusParamsInfo(db, to)
	if paramsInfo == nil {
		return fmt.Errorf("consensus params at height %v not found", to)
	}

	keepVals := make(map[int64]bool)
	if valInfo.ValidatorSet == nil {
		keepVals[valInfo.LastHeightChanged] = true
		keepVals[lastStoredHeightFor(to, valInfo.LastHeightChanged)] = true // keep last checkpoint too
	}
	keepParams := make(map[int64]bool)
	if paramsInfo.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
		keepParams[paramsInfo.LastHeightChanged] = true
	}

	batch := db.NewBatch()
	// batch is reassigned after each intermediate flush, so close whichever
	// batch is current on return rather than stacking one defer per batch.
	defer func() { batch.Close() }()
	pruned := uint64(0)
	var err error

	// We have to delete in reverse order, to avoid deleting previous heights that have validator
	// sets and consensus params that we may need to retrieve.
	for h := to - 1; h >= from; h-- {
		// For heights we keep, we must make sure they have the full validator set or consensus
		// params, otherwise they will panic if they're retrieved directly (instead of
		// indirectly via a LastHeightChanged pointer).
		if keepVals[h] {
			v := loadValidatorsInfo(db, h)
			if v.ValidatorSet == nil {
				v.ValidatorSet, err = LoadValidators(db, h)
				if err != nil {
					return err
				}
				v.LastHeightChanged = h
				batch.Set(calcValidatorsKey(h), v.Bytes())
			}
		} else {
			batch.Delete(calcValidatorsKey(h))
		}

		if keepParams[h] {
			p := loadConsensusParamsInfo(db, h)
			if p.ConsensusParams.Equal(&tmproto.ConsensusParams{}) {
				p.ConsensusParams, err = LoadConsensusParams(db, h)
				if err != nil {
					return err
				}
				p.LastHeightChanged = h
				batch.Set(calcConsensusParamsKey(h), p.Bytes())
			}
		} else {
			batch.Delete(calcConsensusParamsKey(h))
		}

		batch.Delete(calcABCIResponsesKey(h))
		pruned++

		// avoid batches growing too large by flushing to database regularly
		if pruned%1000 == 0 && pruned > 0 {
			err := batch.Write()
			if err != nil {
				return err
			}
			batch.Close()
			batch = db.NewBatch()
		}
	}

	err = batch.WriteSync()
	if err != nil {
		return err
	}

	return nil
}

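// Illustrative sketch, not part of this package: the range semantics of
// PruneStates (from inclusive, to exclusive, walking downwards, skipping
// heights that must be kept) can be tried out on an in-memory map. pruneRange
// and the map-based store are hypothetical stand-ins for the database; the
// rewriting of kept entries and the batching are left out.

```go
package main

import "fmt"

// pruneRange deletes heights in [from, to) from store, walking downwards as
// PruneStates does, and skips every height marked in keep. It returns the
// number of entries that were deleted.
func pruneRange(store map[int64]string, from, to int64, keep map[int64]bool) (int, error) {
	if from <= 0 || to <= 0 {
		return 0, fmt.Errorf("from height %v and to height %v must be greater than 0", from, to)
	}
	if from >= to {
		return 0, fmt.Errorf("from height %v must be lower than to height %v", from, to)
	}
	deleted := 0
	for h := to - 1; h >= from; h-- {
		if keep[h] {
			continue // still referenced, e.g. by a LastHeightChanged pointer
		}
		if _, ok := store[h]; ok {
			delete(store, h)
			deleted++
		}
	}
	return deleted, nil
}

func main() {
	store := map[int64]string{}
	for h := int64(1); h <= 10; h++ {
		store[h] = fmt.Sprintf("state@%d", h)
	}
	// Prune heights 1 through 7, keeping height 3.
	n, err := pruneRange(store, 1, 8, map[int64]bool{3: true})
	fmt.Println(n, err, len(store))
}
```

// In this run heights 1, 2, 4, 5, 6 and 7 are removed, while height 3 and
// heights 8 through 10 remain in the map.
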
// NewABCIResponses returns a new ABCIResponses
func NewABCIResponses(block *types.Block) *ABCIResponses {
	resDeliverTxs := make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
	if len(block.Data.Txs) == 0 {
		// This makes Amino encoding/decoding consistent.
		resDeliverTxs = nil
	}
	return &ABCIResponses{
		DeliverTxs: resDeliverTxs,
	}
}

// Bytes serializes the ABCIResponses using go-amino.
func (arz *ABCIResponses) Bytes() []byte {
	return cdc.MustMarshalBinaryBare(arz)
}

// ResultsHash returns the hash of the DeliverTx results.
func (arz *ABCIResponses) ResultsHash() []byte {
	results := types.NewResults(arz.DeliverTxs)
	return results.Hash()
}

// LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from a crash that happens after app.Commit but before
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
	buf, err := db.Get(calcABCIResponsesKey(height))
	if err != nil {
		return nil, err
	}
	if len(buf) == 0 {
		return nil, ErrNoABCIResponsesForHeight{height}
	}

	abciResponses := new(ABCIResponses)
	err = cdc.UnmarshalBinaryBare(buf, abciResponses)
	if err != nil {
		// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
		tmos.Exit(fmt.Sprintf("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", err))
	}
	// TODO: ensure that buf is completely read.

	return abciResponses, nil
}

// SaveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce
// Merkle proofs.
//
// Exposed for testing.
func SaveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
	// Panic on a failed write, as saveState does, instead of dropping the error.
	if err := db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()); err != nil {
		panic(err)
	}
}

//-----------------------------------------------------------------------------

// ValidatorsInfo represents the latest validator set, or the last height it changed
type ValidatorsInfo struct {
	ValidatorSet      *types.ValidatorSet
	LastHeightChanged int64
}

// Bytes serializes the ValidatorsInfo using go-amino.
func (valInfo *ValidatorsInfo) Bytes() []byte {
	return cdc.MustMarshalBinaryBare(valInfo)
}

// LoadValidators loads the ValidatorSet for a given height.
// Returns ErrNoValSetForHeight if the validator set can't be found for this height.
func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) {
	valInfo := loadValidatorsInfo(db, height)
	if valInfo == nil {
		return nil, ErrNoValSetForHeight{height}
	}
	if valInfo.ValidatorSet == nil {
		lastStoredHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged)
		valInfo2 := loadValidatorsInfo(db, lastStoredHeight)
		if valInfo2 == nil || valInfo2.ValidatorSet == nil {
			panic(
				fmt.Sprintf("Couldn't find validators at height %d (height %d was originally requested)",
					lastStoredHeight,
					height,
				),
			)
		}
		valInfo2.ValidatorSet.IncrementProposerPriority(tmmath.SafeConvertInt32(height - lastStoredHeight)) // mutate
		valInfo = valInfo2
	}

	return valInfo.ValidatorSet, nil
}

func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
	checkpointHeight := height - height%valSetCheckpointInterval
	return tmmath.MaxInt64(checkpointHeight, lastHeightChanged)
}

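// Illustrative sketch, not part of this package: the interplay between the
// sparse storage rule (a full set only at the change height or at a
// checkpoint) and the lookup through lastStoredHeightFor can be modelled with
// a map. The entry type, save, load and the interval of 10 are assumptions
// made for readability; the real interval is valSetCheckpointInterval, and
// the proposer priority adjustment done by LoadValidators is omitted.

```go
package main

import "fmt"

// checkpointInterval plays the role of valSetCheckpointInterval.
const checkpointInterval = 10

// entry models ValidatorsInfo: set is empty when only a pointer is stored.
type entry struct {
	set               string
	lastHeightChanged int64
}

// save stores the full set only at the change height or at a checkpoint.
func save(store map[int64]entry, height, lastHeightChanged int64, set string) {
	e := entry{lastHeightChanged: lastHeightChanged}
	if height == lastHeightChanged || height%checkpointInterval == 0 {
		e.set = set
	}
	store[height] = e
}

// lastStoredHeightFor returns the later of the most recent checkpoint at or
// below height and the height at which the set last changed.
func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
	checkpoint := height - height%checkpointInterval
	if checkpoint > lastHeightChanged {
		return checkpoint
	}
	return lastHeightChanged
}

// load resolves the set for height, following the pointer when needed.
func load(store map[int64]entry, height int64) (string, error) {
	e, ok := store[height]
	if !ok {
		return "", fmt.Errorf("no entry at height %d", height)
	}
	if e.set != "" {
		return e.set, nil
	}
	src, ok := store[lastStoredHeightFor(height, e.lastHeightChanged)]
	if !ok || src.set == "" {
		return "", fmt.Errorf("no full set found for height %d", height)
	}
	return src.set, nil
}

func main() {
	store := map[int64]entry{}
	for h := int64(3); h <= 25; h++ {
		save(store, h, 3, "setA") // the set last changed at height 3
	}
	set, err := load(store, 17)
	fmt.Println(lastStoredHeightFor(17, 3), set, err)
}
```

// Here height 17 stores only a pointer, and the lookup lands on the
// checkpoint at height 10 rather than on the change height 3.
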
// CONTRACT: Returned ValidatorsInfo can be mutated.
func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
	buf, err := db.Get(calcValidatorsKey(height))
	if err != nil {
		panic(err)
	}
	if len(buf) == 0 {
		return nil
	}

	v := new(ValidatorsInfo)
	err = cdc.UnmarshalBinaryBare(buf, v)
	if err != nil {
		// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
		tmos.Exit(fmt.Sprintf("LoadValidators: Data has been corrupted or its spec has changed: %v\n", err))
	}
	// TODO: ensure that buf is completely read.

	return v
}

// saveValidatorsInfo persists the validator set.
//
// `height` is the effective height for which the validator set is responsible
// for signing. It should be called from s.Save(), right before the state itself
// is persisted.
func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) {
	if lastHeightChanged > height {
		panic("LastHeightChanged cannot be greater than ValidatorsInfo height")
	}
	valInfo := &ValidatorsInfo{
		LastHeightChanged: lastHeightChanged,
	}
	// Only persist validator set if it was updated or checkpoint height (see
	// valSetCheckpointInterval) is reached.
	if height == lastHeightChanged || height%valSetCheckpointInterval == 0 {
		valInfo.ValidatorSet = valSet
	}
	// Panic on a failed write, as saveState does, instead of dropping the error.
	if err := db.Set(calcValidatorsKey(height), valInfo.Bytes()); err != nil {
		panic(err)
	}
}

//-----------------------------------------------------------------------------

// ConsensusParamsInfo represents the latest consensus params, or the last height it changed
type ConsensusParamsInfo struct {
	ConsensusParams   tmproto.ConsensusParams
	LastHeightChanged int64
}

// Bytes serializes the ConsensusParamsInfo using go-amino.
func (params ConsensusParamsInfo) Bytes() []byte {
	return cdc.MustMarshalBinaryBare(params)
}

// LoadConsensusParams loads the ConsensusParams for a given height.
func LoadConsensusParams(db dbm.DB, height int64) (tmproto.ConsensusParams, error) {
	empty := tmproto.ConsensusParams{}

	paramsInfo := loadConsensusParamsInfo(db, height)
	if paramsInfo == nil {
		return empty, ErrNoConsensusParamsForHeight{height}
	}

	if paramsInfo.ConsensusParams.Equal(&empty) {
		paramsInfo2 := loadConsensusParamsInfo(db, paramsInfo.LastHeightChanged)
		if paramsInfo2 == nil {
			panic(
				fmt.Sprintf(
					"Couldn't find consensus params at height %d as last changed from height %d",
					paramsInfo.LastHeightChanged,
					height,
				),
			)
		}

		paramsInfo = paramsInfo2
	}

	return paramsInfo.ConsensusParams, nil
}

func loadConsensusParamsInfo(db dbm.DB, height int64) *ConsensusParamsInfo {
	buf, err := db.Get(calcConsensusParamsKey(height))
	if err != nil {
		panic(err)
	}
	if len(buf) == 0 {
		return nil
	}

	paramsInfo := new(ConsensusParamsInfo)
	err = cdc.UnmarshalBinaryBare(buf, paramsInfo)
	if err != nil {
		// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
		tmos.Exit(fmt.Sprintf("LoadConsensusParams: Data has been corrupted or its spec has changed: %v\n", err))
	}
	// TODO: ensure that buf is completely read.

	return paramsInfo
}

// saveConsensusParamsInfo persists the consensus params for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the consensus params did not change after processing the latest block,
// only the last height for which they changed is persisted.
func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params tmproto.ConsensusParams) {
	paramsInfo := &ConsensusParamsInfo{
		LastHeightChanged: changeHeight,
	}
	if changeHeight == nextHeight {
		paramsInfo.ConsensusParams = params
	}
	// Panic on a failed write, as saveState does, instead of dropping the error.
	if err := db.Set(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()); err != nil {
		panic(err)
	}
}