You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

469 lines
11 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package kv
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/pkg/errors"
  11. cmn "github.com/tendermint/tendermint/libs/common"
  12. dbm "github.com/tendermint/tendermint/libs/db"
  13. "github.com/tendermint/tendermint/libs/pubsub/query"
  14. "github.com/tendermint/tendermint/state/txindex"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. tagKeySeparator = "/"
  19. )
  20. var _ txindex.TxIndexer = (*TxIndex)(nil)
  21. // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
  22. type TxIndex struct {
  23. store dbm.DB
  24. tagsToIndex []string
  25. indexAllTags bool
  26. }
  27. // NewTxIndex creates new KV indexer.
  28. func NewTxIndex(store dbm.DB, options ...func(*TxIndex)) *TxIndex {
  29. txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false}
  30. for _, o := range options {
  31. o(txi)
  32. }
  33. return txi
  34. }
  35. // IndexTags is an option for setting which tags to index.
  36. func IndexTags(tags []string) func(*TxIndex) {
  37. return func(txi *TxIndex) {
  38. txi.tagsToIndex = tags
  39. }
  40. }
  41. // IndexAllTags is an option for indexing all tags.
  42. func IndexAllTags() func(*TxIndex) {
  43. return func(txi *TxIndex) {
  44. txi.indexAllTags = true
  45. }
  46. }
  47. // Get gets transaction from the TxIndex storage and returns it or nil if the
  48. // transaction is not found.
  49. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
  50. if len(hash) == 0 {
  51. return nil, txindex.ErrorEmptyHash
  52. }
  53. rawBytes := txi.store.Get(hash)
  54. if rawBytes == nil {
  55. return nil, nil
  56. }
  57. txResult := new(types.TxResult)
  58. err := cdc.UnmarshalBinaryBare(rawBytes, &txResult)
  59. if err != nil {
  60. return nil, fmt.Errorf("Error reading TxResult: %v", err)
  61. }
  62. return txResult, nil
  63. }
  64. // AddBatch indexes a batch of transactions using the given list of tags.
  65. func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
  66. storeBatch := txi.store.NewBatch()
  67. defer storeBatch.Close()
  68. for _, result := range b.Ops {
  69. hash := result.Tx.Hash()
  70. // index tx by tags
  71. for _, tag := range result.Result.Tags {
  72. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  73. storeBatch.Set(keyForTag(tag, result), hash)
  74. }
  75. }
  76. // index tx by height
  77. if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) {
  78. storeBatch.Set(keyForHeight(result), hash)
  79. }
  80. // index tx by hash
  81. rawBytes, err := cdc.MarshalBinaryBare(result)
  82. if err != nil {
  83. return err
  84. }
  85. storeBatch.Set(hash, rawBytes)
  86. }
  87. storeBatch.Write()
  88. return nil
  89. }
  90. // Index indexes a single transaction using the given list of tags.
  91. func (txi *TxIndex) Index(result *types.TxResult) error {
  92. b := txi.store.NewBatch()
  93. defer b.Close()
  94. hash := result.Tx.Hash()
  95. // index tx by tags
  96. for _, tag := range result.Result.Tags {
  97. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  98. b.Set(keyForTag(tag, result), hash)
  99. }
  100. }
  101. // index tx by height
  102. if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) {
  103. b.Set(keyForHeight(result), hash)
  104. }
  105. // index tx by hash
  106. rawBytes, err := cdc.MarshalBinaryBare(result)
  107. if err != nil {
  108. return err
  109. }
  110. b.Set(hash, rawBytes)
  111. b.Write()
  112. return nil
  113. }
  114. // Search performs a search using the given query. It breaks the query into
  115. // conditions (like "tx.height > 5"). For each condition, it queries the DB
  116. // index. One special use cases here: (1) if "tx.hash" is found, it returns tx
  117. // result for it (2) for range queries it is better for the client to provide
  118. // both lower and upper bounds, so we are not performing a full scan. Results
  119. // from querying indexes are then intersected and returned to the caller.
  120. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
  121. var hashes [][]byte
  122. var hashesInitialized bool
  123. // get a list of conditions (like "tx.height > 5")
  124. conditions := q.Conditions()
  125. // if there is a hash condition, return the result immediately
  126. hash, err, ok := lookForHash(conditions)
  127. if err != nil {
  128. return nil, errors.Wrap(err, "error during searching for a hash in the query")
  129. } else if ok {
  130. res, err := txi.Get(hash)
  131. if res == nil {
  132. return []*types.TxResult{}, nil
  133. }
  134. return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
  135. }
  136. // conditions to skip because they're handled before "everything else"
  137. skipIndexes := make([]int, 0)
  138. // extract ranges
  139. // if both upper and lower bounds exist, it's better to get them in order not
  140. // no iterate over kvs that are not within range.
  141. ranges, rangeIndexes := lookForRanges(conditions)
  142. if len(ranges) > 0 {
  143. skipIndexes = append(skipIndexes, rangeIndexes...)
  144. for _, r := range ranges {
  145. if !hashesInitialized {
  146. hashes = txi.matchRange(r, startKey(r.key))
  147. hashesInitialized = true
  148. } else {
  149. hashes = intersect(hashes, txi.matchRange(r, startKey(r.key)))
  150. }
  151. }
  152. }
  153. // if there is a height condition ("tx.height=3"), extract it
  154. height := lookForHeight(conditions)
  155. // for all other conditions
  156. for i, c := range conditions {
  157. if cmn.IntInSlice(i, skipIndexes) {
  158. continue
  159. }
  160. if !hashesInitialized {
  161. hashes = txi.match(c, startKeyForCondition(c, height))
  162. hashesInitialized = true
  163. } else {
  164. hashes = intersect(hashes, txi.match(c, startKeyForCondition(c, height)))
  165. }
  166. }
  167. results := make([]*types.TxResult, len(hashes))
  168. i := 0
  169. for _, h := range hashes {
  170. results[i], err = txi.Get(h)
  171. if err != nil {
  172. return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
  173. }
  174. i++
  175. }
  176. // sort by height & index by default
  177. sort.Slice(results, func(i, j int) bool {
  178. if results[i].Height == results[j].Height {
  179. return results[i].Index < results[j].Index
  180. }
  181. return results[i].Height < results[j].Height
  182. })
  183. return results, nil
  184. }
  185. func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
  186. for _, c := range conditions {
  187. if c.Tag == types.TxHashKey {
  188. decoded, err := hex.DecodeString(c.Operand.(string))
  189. return decoded, err, true
  190. }
  191. }
  192. return
  193. }
  194. // lookForHeight returns a height if there is an "height=X" condition.
  195. func lookForHeight(conditions []query.Condition) (height int64) {
  196. for _, c := range conditions {
  197. if c.Tag == types.TxHeightKey && c.Op == query.OpEqual {
  198. return c.Operand.(int64)
  199. }
  200. }
  201. return 0
  202. }
  203. // special map to hold range conditions
  204. // Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
  205. type queryRanges map[string]queryRange
  206. type queryRange struct {
  207. key string
  208. lowerBound interface{} // int || time.Time
  209. includeLowerBound bool
  210. upperBound interface{} // int || time.Time
  211. includeUpperBound bool
  212. }
  213. func (r queryRange) lowerBoundValue() interface{} {
  214. if r.lowerBound == nil {
  215. return nil
  216. }
  217. if r.includeLowerBound {
  218. return r.lowerBound
  219. } else {
  220. switch t := r.lowerBound.(type) {
  221. case int64:
  222. return t + 1
  223. case time.Time:
  224. return t.Unix() + 1
  225. default:
  226. panic("not implemented")
  227. }
  228. }
  229. }
  230. func (r queryRange) AnyBound() interface{} {
  231. if r.lowerBound != nil {
  232. return r.lowerBound
  233. } else {
  234. return r.upperBound
  235. }
  236. }
  237. func (r queryRange) upperBoundValue() interface{} {
  238. if r.upperBound == nil {
  239. return nil
  240. }
  241. if r.includeUpperBound {
  242. return r.upperBound
  243. } else {
  244. switch t := r.upperBound.(type) {
  245. case int64:
  246. return t - 1
  247. case time.Time:
  248. return t.Unix() - 1
  249. default:
  250. panic("not implemented")
  251. }
  252. }
  253. }
  254. func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
  255. ranges = make(queryRanges)
  256. for i, c := range conditions {
  257. if isRangeOperation(c.Op) {
  258. r, ok := ranges[c.Tag]
  259. if !ok {
  260. r = queryRange{key: c.Tag}
  261. }
  262. switch c.Op {
  263. case query.OpGreater:
  264. r.lowerBound = c.Operand
  265. case query.OpGreaterEqual:
  266. r.includeLowerBound = true
  267. r.lowerBound = c.Operand
  268. case query.OpLess:
  269. r.upperBound = c.Operand
  270. case query.OpLessEqual:
  271. r.includeUpperBound = true
  272. r.upperBound = c.Operand
  273. }
  274. ranges[c.Tag] = r
  275. indexes = append(indexes, i)
  276. }
  277. }
  278. return ranges, indexes
  279. }
  280. func isRangeOperation(op query.Operator) bool {
  281. switch op {
  282. case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
  283. return true
  284. default:
  285. return false
  286. }
  287. }
  288. func (txi *TxIndex) match(c query.Condition, startKeyBz []byte) (hashes [][]byte) {
  289. if c.Op == query.OpEqual {
  290. it := dbm.IteratePrefix(txi.store, startKeyBz)
  291. defer it.Close()
  292. for ; it.Valid(); it.Next() {
  293. hashes = append(hashes, it.Value())
  294. }
  295. } else if c.Op == query.OpContains {
  296. // XXX: startKey does not apply here.
  297. // For example, if startKey = "account.owner/an/" and search query = "accoutn.owner CONTAINS an"
  298. // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
  299. it := dbm.IteratePrefix(txi.store, startKey(c.Tag))
  300. defer it.Close()
  301. for ; it.Valid(); it.Next() {
  302. if !isTagKey(it.Key()) {
  303. continue
  304. }
  305. if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
  306. hashes = append(hashes, it.Value())
  307. }
  308. }
  309. } else {
  310. panic("other operators should be handled already")
  311. }
  312. return
  313. }
  314. func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
  315. // create a map to prevent duplicates
  316. hashesMap := make(map[string][]byte)
  317. lowerBound := r.lowerBoundValue()
  318. upperBound := r.upperBoundValue()
  319. it := dbm.IteratePrefix(txi.store, startKey)
  320. defer it.Close()
  321. LOOP:
  322. for ; it.Valid(); it.Next() {
  323. if !isTagKey(it.Key()) {
  324. continue
  325. }
  326. switch r.AnyBound().(type) {
  327. case int64:
  328. v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  329. if err != nil {
  330. continue LOOP
  331. }
  332. include := true
  333. if lowerBound != nil && v < lowerBound.(int64) {
  334. include = false
  335. }
  336. if upperBound != nil && v > upperBound.(int64) {
  337. include = false
  338. }
  339. if include {
  340. hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value()
  341. }
  342. // XXX: passing time in a ABCI Tags is not yet implemented
  343. // case time.Time:
  344. // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  345. // if v == r.upperBound {
  346. // break
  347. // }
  348. }
  349. }
  350. hashes = make([][]byte, len(hashesMap))
  351. i := 0
  352. for _, h := range hashesMap {
  353. hashes[i] = h
  354. i++
  355. }
  356. return
  357. }
  358. ///////////////////////////////////////////////////////////////////////////////
  359. // Keys
  360. func isTagKey(key []byte) bool {
  361. return strings.Count(string(key), tagKeySeparator) == 3
  362. }
  363. func extractValueFromKey(key []byte) string {
  364. parts := strings.SplitN(string(key), tagKeySeparator, 3)
  365. return parts[1]
  366. }
  367. func keyForTag(tag cmn.KVPair, result *types.TxResult) []byte {
  368. return []byte(fmt.Sprintf("%s/%s/%d/%d",
  369. tag.Key,
  370. tag.Value,
  371. result.Height,
  372. result.Index,
  373. ))
  374. }
  375. func keyForHeight(result *types.TxResult) []byte {
  376. return []byte(fmt.Sprintf("%s/%d/%d/%d",
  377. types.TxHeightKey,
  378. result.Height,
  379. result.Height,
  380. result.Index,
  381. ))
  382. }
  383. func startKeyForCondition(c query.Condition, height int64) []byte {
  384. if height > 0 {
  385. return startKey(c.Tag, c.Operand, height)
  386. }
  387. return startKey(c.Tag, c.Operand)
  388. }
  389. func startKey(fields ...interface{}) []byte {
  390. var b bytes.Buffer
  391. for _, f := range fields {
  392. b.Write([]byte(fmt.Sprintf("%v", f) + tagKeySeparator))
  393. }
  394. return b.Bytes()
  395. }
  396. ///////////////////////////////////////////////////////////////////////////////
  397. // Utils
  398. func intersect(as, bs [][]byte) [][]byte {
  399. i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
  400. for _, a := range as {
  401. for _, b := range bs {
  402. if bytes.Equal(a, b) {
  403. i = append(i, a)
  404. }
  405. }
  406. }
  407. return i
  408. }