You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

271 lines
6.6 KiB

  1. package lite
  2. import (
  3. "fmt"
  4. "regexp"
  5. "strconv"
  6. amino "github.com/tendermint/go-amino"
  7. cryptoAmino "github.com/tendermint/tendermint/crypto/encoding/amino"
  8. dbm "github.com/tendermint/tendermint/libs/db"
  9. log "github.com/tendermint/tendermint/libs/log"
  10. lerr "github.com/tendermint/tendermint/lite/errors"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. var _ PersistentProvider = (*DBProvider)(nil)
  14. // DBProvider stores commits and validator sets in a DB.
  15. type DBProvider struct {
  16. logger log.Logger
  17. label string
  18. db dbm.DB
  19. cdc *amino.Codec
  20. limit int
  21. }
  22. func NewDBProvider(label string, db dbm.DB) *DBProvider {
  23. // NOTE: when debugging, this type of construction might be useful.
  24. //db = dbm.NewDebugDB("db provider "+cmn.RandStr(4), db)
  25. cdc := amino.NewCodec()
  26. cryptoAmino.RegisterAmino(cdc)
  27. dbp := &DBProvider{
  28. logger: log.NewNopLogger(),
  29. label: label,
  30. db: db,
  31. cdc: cdc,
  32. }
  33. return dbp
  34. }
  35. func (dbp *DBProvider) SetLogger(logger log.Logger) {
  36. dbp.logger = logger.With("label", dbp.label)
  37. }
  38. func (dbp *DBProvider) SetLimit(limit int) *DBProvider {
  39. dbp.limit = limit
  40. return dbp
  41. }
  42. // Implements PersistentProvider.
  43. func (dbp *DBProvider) SaveFullCommit(fc FullCommit) error {
  44. dbp.logger.Info("DBProvider.SaveFullCommit()...", "fc", fc)
  45. batch := dbp.db.NewBatch()
  46. // Save the fc.validators.
  47. // We might be overwriting what we already have, but
  48. // it makes the logic easier for now.
  49. vsKey := validatorSetKey(fc.ChainID(), fc.Height())
  50. vsBz, err := dbp.cdc.MarshalBinaryLengthPrefixed(fc.Validators)
  51. if err != nil {
  52. return err
  53. }
  54. batch.Set(vsKey, vsBz)
  55. // Save the fc.NextValidators.
  56. nvsKey := validatorSetKey(fc.ChainID(), fc.Height()+1)
  57. nvsBz, err := dbp.cdc.MarshalBinaryLengthPrefixed(fc.NextValidators)
  58. if err != nil {
  59. return err
  60. }
  61. batch.Set(nvsKey, nvsBz)
  62. // Save the fc.SignedHeader
  63. shKey := signedHeaderKey(fc.ChainID(), fc.Height())
  64. shBz, err := dbp.cdc.MarshalBinaryLengthPrefixed(fc.SignedHeader)
  65. if err != nil {
  66. return err
  67. }
  68. batch.Set(shKey, shBz)
  69. // And write sync.
  70. batch.WriteSync()
  71. // Garbage collect.
  72. // TODO: optimize later.
  73. if dbp.limit > 0 {
  74. dbp.deleteAfterN(fc.ChainID(), dbp.limit)
  75. }
  76. return nil
  77. }
  78. // Implements Provider.
  79. func (dbp *DBProvider) LatestFullCommit(chainID string, minHeight, maxHeight int64) (
  80. FullCommit, error) {
  81. dbp.logger.Info("DBProvider.LatestFullCommit()...",
  82. "chainID", chainID, "minHeight", minHeight, "maxHeight", maxHeight)
  83. if minHeight <= 0 {
  84. minHeight = 1
  85. }
  86. if maxHeight == 0 {
  87. maxHeight = 1<<63 - 1
  88. }
  89. itr := dbp.db.ReverseIterator(
  90. signedHeaderKey(chainID, minHeight),
  91. append(signedHeaderKey(chainID, maxHeight), byte(0x00)),
  92. )
  93. defer itr.Close()
  94. for itr.Valid() {
  95. key := itr.Key()
  96. _, _, ok := parseSignedHeaderKey(key)
  97. if !ok {
  98. // Skip over other keys.
  99. itr.Next()
  100. continue
  101. } else {
  102. // Found the latest full commit signed header.
  103. shBz := itr.Value()
  104. sh := types.SignedHeader{}
  105. err := dbp.cdc.UnmarshalBinaryLengthPrefixed(shBz, &sh)
  106. if err != nil {
  107. return FullCommit{}, err
  108. } else {
  109. lfc, err := dbp.fillFullCommit(sh)
  110. if err == nil {
  111. dbp.logger.Info("DBProvider.LatestFullCommit() found latest.", "height", lfc.Height())
  112. return lfc, nil
  113. } else {
  114. dbp.logger.Error("DBProvider.LatestFullCommit() got error", "lfc", lfc)
  115. dbp.logger.Error(fmt.Sprintf("%+v", err))
  116. return lfc, err
  117. }
  118. }
  119. }
  120. }
  121. return FullCommit{}, lerr.ErrCommitNotFound()
  122. }
  123. func (dbp *DBProvider) ValidatorSet(chainID string, height int64) (valset *types.ValidatorSet, err error) {
  124. return dbp.getValidatorSet(chainID, height)
  125. }
  126. func (dbp *DBProvider) getValidatorSet(chainID string, height int64) (valset *types.ValidatorSet, err error) {
  127. vsBz := dbp.db.Get(validatorSetKey(chainID, height))
  128. if vsBz == nil {
  129. err = lerr.ErrUnknownValidators(chainID, height)
  130. return
  131. }
  132. err = dbp.cdc.UnmarshalBinaryLengthPrefixed(vsBz, &valset)
  133. if err != nil {
  134. return
  135. }
  136. // To test deep equality. This makes it easier to test for e.g. valset
  137. // equivalence using assert.Equal (tests for deep equality) in our tests,
  138. // which also tests for unexported/private field equivalence.
  139. valset.TotalVotingPower()
  140. return
  141. }
  142. func (dbp *DBProvider) fillFullCommit(sh types.SignedHeader) (FullCommit, error) {
  143. var chainID = sh.ChainID
  144. var height = sh.Height
  145. var valset, nextValset *types.ValidatorSet
  146. // Load the validator set.
  147. valset, err := dbp.getValidatorSet(chainID, height)
  148. if err != nil {
  149. return FullCommit{}, err
  150. }
  151. // Load the next validator set.
  152. nextValset, err = dbp.getValidatorSet(chainID, height+1)
  153. if err != nil {
  154. return FullCommit{}, err
  155. }
  156. // Return filled FullCommit.
  157. return FullCommit{
  158. SignedHeader: sh,
  159. Validators: valset,
  160. NextValidators: nextValset,
  161. }, nil
  162. }
  163. func (dbp *DBProvider) deleteAfterN(chainID string, after int) error {
  164. dbp.logger.Info("DBProvider.deleteAfterN()...", "chainID", chainID, "after", after)
  165. itr := dbp.db.ReverseIterator(
  166. signedHeaderKey(chainID, 1),
  167. append(signedHeaderKey(chainID, 1<<63-1), byte(0x00)),
  168. )
  169. defer itr.Close()
  170. var lastHeight int64 = 1<<63 - 1
  171. var numSeen = 0
  172. var numDeleted = 0
  173. for itr.Valid() {
  174. key := itr.Key()
  175. _, height, ok := parseChainKeyPrefix(key)
  176. if !ok {
  177. return fmt.Errorf("unexpected key %v", key)
  178. } else {
  179. if height < lastHeight {
  180. lastHeight = height
  181. numSeen += 1
  182. }
  183. if numSeen > after {
  184. dbp.db.Delete(key)
  185. numDeleted += 1
  186. }
  187. }
  188. itr.Next()
  189. }
  190. dbp.logger.Info(fmt.Sprintf("DBProvider.deleteAfterN() deleted %v items", numDeleted))
  191. return nil
  192. }
  193. //----------------------------------------
  194. // key encoding
  195. func signedHeaderKey(chainID string, height int64) []byte {
  196. return []byte(fmt.Sprintf("%s/%010d/sh", chainID, height))
  197. }
  198. func validatorSetKey(chainID string, height int64) []byte {
  199. return []byte(fmt.Sprintf("%s/%010d/vs", chainID, height))
  200. }
  201. //----------------------------------------
  202. // key parsing
  203. var keyPattern = regexp.MustCompile(`^([^/]+)/([0-9]*)/(.*)$`)
  204. func parseKey(key []byte) (chainID string, height int64, part string, ok bool) {
  205. submatch := keyPattern.FindSubmatch(key)
  206. if submatch == nil {
  207. return "", 0, "", false
  208. }
  209. chainID = string(submatch[1])
  210. heightStr := string(submatch[2])
  211. heightInt, err := strconv.Atoi(heightStr)
  212. if err != nil {
  213. return "", 0, "", false
  214. }
  215. height = int64(heightInt)
  216. part = string(submatch[3])
  217. ok = true // good!
  218. return
  219. }
  220. func parseSignedHeaderKey(key []byte) (chainID string, height int64, ok bool) {
  221. chainID, height, part, ok := parseKey(key)
  222. if part != "sh" {
  223. return "", 0, false
  224. }
  225. return chainID, height, true
  226. }
  227. func parseChainKeyPrefix(key []byte) (chainID string, height int64, ok bool) {
  228. chainID, height, _, ok = parseKey(key)
  229. return chainID, height, true
  230. }