You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

493 lines
12 KiB

  1. package kv
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "github.com/google/orderedcode"
  10. dbm "github.com/tendermint/tm-db"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. "github.com/tendermint/tendermint/libs/pubsub/query"
  13. "github.com/tendermint/tendermint/state/indexer"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
  17. // BlockerIndexer implements a block indexer, indexing BeginBlock and EndBlock
  18. // events with an underlying KV store. Block events are indexed by their height,
  19. // such that matching search criteria returns the respective block height(s).
  20. type BlockerIndexer struct {
  21. store dbm.DB
  22. }
  23. func New(store dbm.DB) *BlockerIndexer {
  24. return &BlockerIndexer{
  25. store: store,
  26. }
  27. }
  28. // Has returns true if the given height has been indexed. An error is returned
  29. // upon database query failure.
  30. func (idx *BlockerIndexer) Has(height int64) (bool, error) {
  31. key, err := heightKey(height)
  32. if err != nil {
  33. return false, fmt.Errorf("failed to create block height index key: %w", err)
  34. }
  35. return idx.store.Has(key)
  36. }
  37. // Index indexes BeginBlock and EndBlock events for a given block by its height.
  38. // The following is indexed:
  39. //
  40. // primary key: encode(block.height | height) => encode(height)
  41. // BeginBlock events: encode(eventType.eventAttr|eventValue|height|begin_block) => encode(height)
  42. // EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height)
  43. func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
  44. batch := idx.store.NewBatch()
  45. defer batch.Close()
  46. height := bh.Header.Height
  47. // 1. index by height
  48. key, err := heightKey(height)
  49. if err != nil {
  50. return fmt.Errorf("failed to create block height index key: %w", err)
  51. }
  52. if err := batch.Set(key, int64ToBytes(height)); err != nil {
  53. return err
  54. }
  55. // 2. index BeginBlock events
  56. if err := idx.indexEvents(batch, bh.ResultBeginBlock.Events, "begin_block", height); err != nil {
  57. return fmt.Errorf("failed to index BeginBlock events: %w", err)
  58. }
  59. // 3. index EndBlock events
  60. if err := idx.indexEvents(batch, bh.ResultEndBlock.Events, "end_block", height); err != nil {
  61. return fmt.Errorf("failed to index EndBlock events: %w", err)
  62. }
  63. return batch.WriteSync()
  64. }
  65. // Search performs a query for block heights that match a given BeginBlock
  66. // and Endblock event search criteria. The given query can match against zero,
  67. // one or more block heights. In the case of height queries, i.e. block.height=H,
  68. // if the height is indexed, that height alone will be returned. An error and
  69. // nil slice is returned. Otherwise, a non-nil slice and nil error is returned.
  70. func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
  71. results := make([]int64, 0)
  72. select {
  73. case <-ctx.Done():
  74. return results, nil
  75. default:
  76. }
  77. conditions, err := q.Conditions()
  78. if err != nil {
  79. return nil, fmt.Errorf("failed to parse query conditions: %w", err)
  80. }
  81. // If there is an exact height query, return the result immediately
  82. // (if it exists).
  83. height, ok := lookForHeight(conditions)
  84. if ok {
  85. ok, err := idx.Has(height)
  86. if err != nil {
  87. return nil, err
  88. }
  89. if ok {
  90. return []int64{height}, nil
  91. }
  92. return results, nil
  93. }
  94. var heightsInitialized bool
  95. filteredHeights := make(map[string][]byte)
  96. // conditions to skip because they're handled before "everything else"
  97. skipIndexes := make([]int, 0)
  98. // Extract ranges. If both upper and lower bounds exist, it's better to get
  99. // them in order as to not iterate over kvs that are not within range.
  100. ranges, rangeIndexes := indexer.LookForRanges(conditions)
  101. if len(ranges) > 0 {
  102. skipIndexes = append(skipIndexes, rangeIndexes...)
  103. for _, qr := range ranges {
  104. prefix, err := orderedcode.Append(nil, qr.Key)
  105. if err != nil {
  106. return nil, fmt.Errorf("failed to create prefix key: %w", err)
  107. }
  108. if !heightsInitialized {
  109. filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, true)
  110. if err != nil {
  111. return nil, err
  112. }
  113. heightsInitialized = true
  114. // Ignore any remaining conditions if the first condition resulted in no
  115. // matches (assuming implicit AND operand).
  116. if len(filteredHeights) == 0 {
  117. break
  118. }
  119. } else {
  120. filteredHeights, err = idx.matchRange(ctx, qr, prefix, filteredHeights, false)
  121. if err != nil {
  122. return nil, err
  123. }
  124. }
  125. }
  126. }
  127. // for all other conditions
  128. for i, c := range conditions {
  129. if intInSlice(i, skipIndexes) {
  130. continue
  131. }
  132. startKey, err := orderedcode.Append(nil, c.CompositeKey, fmt.Sprintf("%v", c.Operand))
  133. if err != nil {
  134. return nil, err
  135. }
  136. if !heightsInitialized {
  137. filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, true)
  138. if err != nil {
  139. return nil, err
  140. }
  141. heightsInitialized = true
  142. // Ignore any remaining conditions if the first condition resulted in no
  143. // matches (assuming implicit AND operand).
  144. if len(filteredHeights) == 0 {
  145. break
  146. }
  147. } else {
  148. filteredHeights, err = idx.match(ctx, c, startKey, filteredHeights, false)
  149. if err != nil {
  150. return nil, err
  151. }
  152. }
  153. }
  154. // fetch matching heights
  155. results = make([]int64, 0, len(filteredHeights))
  156. heights:
  157. for _, hBz := range filteredHeights {
  158. h := int64FromBytes(hBz)
  159. ok, err := idx.Has(h)
  160. if err != nil {
  161. return nil, err
  162. }
  163. if ok {
  164. results = append(results, h)
  165. }
  166. select {
  167. case <-ctx.Done():
  168. break heights
  169. default:
  170. }
  171. }
  172. sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })
  173. return results, nil
  174. }
  175. // matchRange returns all matching block heights that match a given QueryRange
  176. // and start key. An already filtered result (filteredHeights) is provided such
  177. // that any non-intersecting matches are removed.
  178. //
  179. // NOTE: The provided filteredHeights may be empty if no previous condition has
  180. // matched.
  181. func (idx *BlockerIndexer) matchRange(
  182. ctx context.Context,
  183. qr indexer.QueryRange,
  184. startKey []byte,
  185. filteredHeights map[string][]byte,
  186. firstRun bool,
  187. ) (map[string][]byte, error) {
  188. // A previous match was attempted but resulted in no matches, so we return
  189. // no matches (assuming AND operand).
  190. if !firstRun && len(filteredHeights) == 0 {
  191. return filteredHeights, nil
  192. }
  193. tmpHeights := make(map[string][]byte)
  194. lowerBound := qr.LowerBoundValue()
  195. upperBound := qr.UpperBoundValue()
  196. it, err := dbm.IteratePrefix(idx.store, startKey)
  197. if err != nil {
  198. return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
  199. }
  200. defer it.Close()
  201. iter:
  202. for ; it.Valid(); it.Next() {
  203. var (
  204. eventValue string
  205. err error
  206. )
  207. if qr.Key == types.BlockHeightKey {
  208. eventValue, err = parseValueFromPrimaryKey(it.Key())
  209. } else {
  210. eventValue, err = parseValueFromEventKey(it.Key())
  211. }
  212. if err != nil {
  213. continue
  214. }
  215. if _, ok := qr.AnyBound().(int64); ok {
  216. v, err := strconv.ParseInt(eventValue, 10, 64)
  217. if err != nil {
  218. continue iter
  219. }
  220. include := true
  221. if lowerBound != nil && v < lowerBound.(int64) {
  222. include = false
  223. }
  224. if upperBound != nil && v > upperBound.(int64) {
  225. include = false
  226. }
  227. if include {
  228. tmpHeights[string(it.Value())] = it.Value()
  229. }
  230. }
  231. select {
  232. case <-ctx.Done():
  233. break iter
  234. default:
  235. }
  236. }
  237. if err := it.Error(); err != nil {
  238. return nil, err
  239. }
  240. if len(tmpHeights) == 0 || firstRun {
  241. // Either:
  242. //
  243. // 1. Regardless if a previous match was attempted, which may have had
  244. // results, but no match was found for the current condition, then we
  245. // return no matches (assuming AND operand).
  246. //
  247. // 2. A previous match was not attempted, so we return all results.
  248. return tmpHeights, nil
  249. }
  250. // Remove/reduce matches in filteredHashes that were not found in this
  251. // match (tmpHashes).
  252. for k := range filteredHeights {
  253. if tmpHeights[k] == nil {
  254. delete(filteredHeights, k)
  255. select {
  256. case <-ctx.Done():
  257. break
  258. default:
  259. }
  260. }
  261. }
  262. return filteredHeights, nil
  263. }
  264. // match returns all matching heights that meet a given query condition and start
  265. // key. An already filtered result (filteredHeights) is provided such that any
  266. // non-intersecting matches are removed.
  267. //
  268. // NOTE: The provided filteredHeights may be empty if no previous condition has
  269. // matched.
  270. func (idx *BlockerIndexer) match(
  271. ctx context.Context,
  272. c query.Condition,
  273. startKeyBz []byte,
  274. filteredHeights map[string][]byte,
  275. firstRun bool,
  276. ) (map[string][]byte, error) {
  277. // A previous match was attempted but resulted in no matches, so we return
  278. // no matches (assuming AND operand).
  279. if !firstRun && len(filteredHeights) == 0 {
  280. return filteredHeights, nil
  281. }
  282. tmpHeights := make(map[string][]byte)
  283. switch {
  284. case c.Op == query.OpEqual:
  285. it, err := dbm.IteratePrefix(idx.store, startKeyBz)
  286. if err != nil {
  287. return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
  288. }
  289. defer it.Close()
  290. for ; it.Valid(); it.Next() {
  291. tmpHeights[string(it.Value())] = it.Value()
  292. if err := ctx.Err(); err != nil {
  293. break
  294. }
  295. }
  296. if err := it.Error(); err != nil {
  297. return nil, err
  298. }
  299. case c.Op == query.OpExists:
  300. prefix, err := orderedcode.Append(nil, c.CompositeKey)
  301. if err != nil {
  302. return nil, err
  303. }
  304. it, err := dbm.IteratePrefix(idx.store, prefix)
  305. if err != nil {
  306. return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
  307. }
  308. defer it.Close()
  309. iterExists:
  310. for ; it.Valid(); it.Next() {
  311. tmpHeights[string(it.Value())] = it.Value()
  312. select {
  313. case <-ctx.Done():
  314. break iterExists
  315. default:
  316. }
  317. }
  318. if err := it.Error(); err != nil {
  319. return nil, err
  320. }
  321. case c.Op == query.OpContains:
  322. prefix, err := orderedcode.Append(nil, c.CompositeKey)
  323. if err != nil {
  324. return nil, err
  325. }
  326. it, err := dbm.IteratePrefix(idx.store, prefix)
  327. if err != nil {
  328. return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
  329. }
  330. defer it.Close()
  331. iterContains:
  332. for ; it.Valid(); it.Next() {
  333. eventValue, err := parseValueFromEventKey(it.Key())
  334. if err != nil {
  335. continue
  336. }
  337. if strings.Contains(eventValue, c.Operand.(string)) {
  338. tmpHeights[string(it.Value())] = it.Value()
  339. }
  340. select {
  341. case <-ctx.Done():
  342. break iterContains
  343. default:
  344. }
  345. }
  346. if err := it.Error(); err != nil {
  347. return nil, err
  348. }
  349. default:
  350. return nil, errors.New("other operators should be handled already")
  351. }
  352. if len(tmpHeights) == 0 || firstRun {
  353. // Either:
  354. //
  355. // 1. Regardless if a previous match was attempted, which may have had
  356. // results, but no match was found for the current condition, then we
  357. // return no matches (assuming AND operand).
  358. //
  359. // 2. A previous match was not attempted, so we return all results.
  360. return tmpHeights, nil
  361. }
  362. // Remove/reduce matches in filteredHeights that were not found in this
  363. // match (tmpHeights).
  364. for k := range filteredHeights {
  365. if tmpHeights[k] == nil {
  366. delete(filteredHeights, k)
  367. select {
  368. case <-ctx.Done():
  369. break
  370. default:
  371. }
  372. }
  373. }
  374. return filteredHeights, nil
  375. }
  376. func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error {
  377. heightBz := int64ToBytes(height)
  378. for _, event := range events {
  379. // only index events with a non-empty type
  380. if len(event.Type) == 0 {
  381. continue
  382. }
  383. for _, attr := range event.Attributes {
  384. if len(attr.Key) == 0 {
  385. continue
  386. }
  387. // index iff the event specified index:true and it's not a reserved event
  388. compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key)
  389. if compositeKey == types.BlockHeightKey {
  390. return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
  391. }
  392. if attr.GetIndex() {
  393. key, err := eventKey(compositeKey, typ, attr.Value, height)
  394. if err != nil {
  395. return fmt.Errorf("failed to create block index key: %w", err)
  396. }
  397. if err := batch.Set(key, heightBz); err != nil {
  398. return err
  399. }
  400. }
  401. }
  402. }
  403. return nil
  404. }