You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

440 lines
10 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package kv
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/pkg/errors"
  11. wire "github.com/tendermint/go-wire"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. dbm "github.com/tendermint/tmlibs/db"
  14. "github.com/tendermint/tmlibs/pubsub/query"
  15. "github.com/tendermint/tendermint/state/txindex"
  16. "github.com/tendermint/tendermint/types"
  17. )
  18. const (
  19. tagKeySeparator = "/"
  20. )
  21. var _ txindex.TxIndexer = (*TxIndex)(nil)
  22. // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
  23. type TxIndex struct {
  24. store dbm.DB
  25. tagsToIndex []string
  26. indexAllTags bool
  27. }
  28. // NewTxIndex creates new KV indexer.
  29. func NewTxIndex(store dbm.DB, options ...func(*TxIndex)) *TxIndex {
  30. txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false}
  31. for _, o := range options {
  32. o(txi)
  33. }
  34. return txi
  35. }
  36. // IndexTags is an option for setting which tags to index.
  37. func IndexTags(tags []string) func(*TxIndex) {
  38. return func(txi *TxIndex) {
  39. txi.tagsToIndex = tags
  40. }
  41. }
  42. // IndexAllTags is an option for indexing all tags.
  43. func IndexAllTags() func(*TxIndex) {
  44. return func(txi *TxIndex) {
  45. txi.indexAllTags = true
  46. }
  47. }
  48. // Get gets transaction from the TxIndex storage and returns it or nil if the
  49. // transaction is not found.
  50. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
  51. if len(hash) == 0 {
  52. return nil, txindex.ErrorEmptyHash
  53. }
  54. rawBytes := txi.store.Get(hash)
  55. if rawBytes == nil {
  56. return nil, nil
  57. }
  58. txResult := new(types.TxResult)
  59. err := wire.UnmarshalBinary(rawBytes, &txResult)
  60. if err != nil {
  61. return nil, fmt.Errorf("Error reading TxResult: %v", err)
  62. }
  63. return txResult, nil
  64. }
  65. // AddBatch indexes a batch of transactions using the given list of tags.
  66. func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
  67. storeBatch := txi.store.NewBatch()
  68. for _, result := range b.Ops {
  69. hash := result.Tx.Hash()
  70. // index tx by tags
  71. for _, tag := range result.Result.Tags {
  72. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  73. storeBatch.Set(keyForTag(tag, result), hash)
  74. }
  75. }
  76. // index tx by hash
  77. rawBytes, err := wire.MarshalBinary(result)
  78. if err != nil {
  79. return err
  80. }
  81. storeBatch.Set(hash, rawBytes)
  82. }
  83. storeBatch.Write()
  84. return nil
  85. }
  86. // Index indexes a single transaction using the given list of tags.
  87. func (txi *TxIndex) Index(result *types.TxResult) error {
  88. b := txi.store.NewBatch()
  89. hash := result.Tx.Hash()
  90. // index tx by tags
  91. for _, tag := range result.Result.Tags {
  92. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  93. b.Set(keyForTag(tag, result), hash)
  94. }
  95. }
  96. // index tx by hash
  97. rawBytes, err := wire.MarshalBinary(result)
  98. if err != nil {
  99. return err
  100. }
  101. b.Set(hash, rawBytes)
  102. b.Write()
  103. return nil
  104. }
  105. // Search performs a search using the given query. It breaks the query into
  106. // conditions (like "tx.height > 5"). For each condition, it queries the DB
  107. // index. One special use cases here: (1) if "tx.hash" is found, it returns tx
  108. // result for it (2) for range queries it is better for the client to provide
  109. // both lower and upper bounds, so we are not performing a full scan. Results
  110. // from querying indexes are then intersected and returned to the caller.
  111. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
  112. var hashes [][]byte
  113. var hashesInitialized bool
  114. // get a list of conditions (like "tx.height > 5")
  115. conditions := q.Conditions()
  116. // if there is a hash condition, return the result immediately
  117. hash, err, ok := lookForHash(conditions)
  118. if err != nil {
  119. return nil, errors.Wrap(err, "error during searching for a hash in the query")
  120. } else if ok {
  121. res, err := txi.Get(hash)
  122. if res == nil {
  123. return []*types.TxResult{}, nil
  124. } else {
  125. return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
  126. }
  127. }
  128. // conditions to skip because they're handled before "everything else"
  129. skipIndexes := make([]int, 0)
  130. // if there is a height condition ("tx.height=3"), extract it for faster lookups
  131. height, heightIndex := lookForHeight(conditions)
  132. if heightIndex >= 0 {
  133. skipIndexes = append(skipIndexes, heightIndex)
  134. }
  135. // extract ranges
  136. // if both upper and lower bounds exist, it's better to get them in order not
  137. // no iterate over kvs that are not within range.
  138. ranges, rangeIndexes := lookForRanges(conditions)
  139. if len(ranges) > 0 {
  140. skipIndexes = append(skipIndexes, rangeIndexes...)
  141. for _, r := range ranges {
  142. if !hashesInitialized {
  143. hashes = txi.matchRange(r, []byte(r.key))
  144. hashesInitialized = true
  145. } else {
  146. hashes = intersect(hashes, txi.matchRange(r, []byte(r.key)))
  147. }
  148. }
  149. }
  150. // for all other conditions
  151. for i, c := range conditions {
  152. if cmn.IntInSlice(i, skipIndexes) {
  153. continue
  154. }
  155. if !hashesInitialized {
  156. hashes = txi.match(c, startKey(c, height))
  157. hashesInitialized = true
  158. } else {
  159. hashes = intersect(hashes, txi.match(c, startKey(c, height)))
  160. }
  161. }
  162. results := make([]*types.TxResult, len(hashes))
  163. i := 0
  164. for _, h := range hashes {
  165. results[i], err = txi.Get(h)
  166. if err != nil {
  167. return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
  168. }
  169. i++
  170. }
  171. // sort by height by default
  172. sort.Slice(results, func(i, j int) bool {
  173. return results[i].Height < results[j].Height
  174. })
  175. return results, nil
  176. }
  177. func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
  178. for _, c := range conditions {
  179. if c.Tag == types.TxHashKey {
  180. decoded, err := hex.DecodeString(c.Operand.(string))
  181. return decoded, err, true
  182. }
  183. }
  184. return
  185. }
  186. func lookForHeight(conditions []query.Condition) (height int64, index int) {
  187. for i, c := range conditions {
  188. if c.Tag == types.TxHeightKey {
  189. return c.Operand.(int64), i
  190. }
  191. }
  192. return 0, -1
  193. }
  194. // special map to hold range conditions
  195. // Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
  196. type queryRanges map[string]queryRange
  197. type queryRange struct {
  198. key string
  199. lowerBound interface{} // int || time.Time
  200. includeLowerBound bool
  201. upperBound interface{} // int || time.Time
  202. includeUpperBound bool
  203. }
  204. func (r queryRange) lowerBoundValue() interface{} {
  205. if r.lowerBound == nil {
  206. return nil
  207. }
  208. if r.includeLowerBound {
  209. return r.lowerBound
  210. } else {
  211. switch t := r.lowerBound.(type) {
  212. case int64:
  213. return t + 1
  214. case time.Time:
  215. return t.Unix() + 1
  216. default:
  217. panic("not implemented")
  218. }
  219. }
  220. }
  221. func (r queryRange) AnyBound() interface{} {
  222. if r.lowerBound != nil {
  223. return r.lowerBound
  224. } else {
  225. return r.upperBound
  226. }
  227. }
  228. func (r queryRange) upperBoundValue() interface{} {
  229. if r.upperBound == nil {
  230. return nil
  231. }
  232. if r.includeUpperBound {
  233. return r.upperBound
  234. } else {
  235. switch t := r.upperBound.(type) {
  236. case int64:
  237. return t - 1
  238. case time.Time:
  239. return t.Unix() - 1
  240. default:
  241. panic("not implemented")
  242. }
  243. }
  244. }
  245. func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
  246. ranges = make(queryRanges)
  247. for i, c := range conditions {
  248. if isRangeOperation(c.Op) {
  249. r, ok := ranges[c.Tag]
  250. if !ok {
  251. r = queryRange{key: c.Tag}
  252. }
  253. switch c.Op {
  254. case query.OpGreater:
  255. r.lowerBound = c.Operand
  256. case query.OpGreaterEqual:
  257. r.includeLowerBound = true
  258. r.lowerBound = c.Operand
  259. case query.OpLess:
  260. r.upperBound = c.Operand
  261. case query.OpLessEqual:
  262. r.includeUpperBound = true
  263. r.upperBound = c.Operand
  264. }
  265. ranges[c.Tag] = r
  266. indexes = append(indexes, i)
  267. }
  268. }
  269. return ranges, indexes
  270. }
  271. func isRangeOperation(op query.Operator) bool {
  272. switch op {
  273. case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
  274. return true
  275. default:
  276. return false
  277. }
  278. }
  279. func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) {
  280. if c.Op == query.OpEqual {
  281. it := dbm.IteratePrefix(txi.store, startKey)
  282. defer it.Close()
  283. for ; it.Valid(); it.Next() {
  284. hashes = append(hashes, it.Value())
  285. }
  286. } else if c.Op == query.OpContains {
  287. // XXX: doing full scan because startKey does not apply here
  288. // For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an"
  289. // we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan"
  290. it := txi.store.Iterator(nil, nil)
  291. defer it.Close()
  292. for ; it.Valid(); it.Next() {
  293. if !isTagKey(it.Key()) {
  294. continue
  295. }
  296. if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
  297. hashes = append(hashes, it.Value())
  298. }
  299. }
  300. } else {
  301. panic("other operators should be handled already")
  302. }
  303. return
  304. }
  305. func (txi *TxIndex) matchRange(r queryRange, prefix []byte) (hashes [][]byte) {
  306. // create a map to prevent duplicates
  307. hashesMap := make(map[string][]byte)
  308. lowerBound := r.lowerBoundValue()
  309. upperBound := r.upperBoundValue()
  310. it := dbm.IteratePrefix(txi.store, prefix)
  311. defer it.Close()
  312. LOOP:
  313. for ; it.Valid(); it.Next() {
  314. if !isTagKey(it.Key()) {
  315. continue
  316. }
  317. switch r.AnyBound().(type) {
  318. case int64:
  319. v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  320. if err != nil {
  321. continue LOOP
  322. }
  323. include := true
  324. if lowerBound != nil && v < lowerBound.(int64) {
  325. include = false
  326. }
  327. if upperBound != nil && v > upperBound.(int64) {
  328. include = false
  329. }
  330. if include {
  331. hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value()
  332. }
  333. // XXX: passing time in a ABCI Tags is not yet implemented
  334. // case time.Time:
  335. // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  336. // if v == r.upperBound {
  337. // break
  338. // }
  339. }
  340. }
  341. hashes = make([][]byte, len(hashesMap))
  342. i := 0
  343. for _, h := range hashesMap {
  344. hashes[i] = h
  345. i++
  346. }
  347. return
  348. }
  349. ///////////////////////////////////////////////////////////////////////////////
  350. // Keys
  351. func startKey(c query.Condition, height int64) []byte {
  352. var key string
  353. if height > 0 {
  354. key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
  355. } else {
  356. key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
  357. }
  358. return []byte(key)
  359. }
  360. func isTagKey(key []byte) bool {
  361. return strings.Count(string(key), tagKeySeparator) == 3
  362. }
  363. func extractValueFromKey(key []byte) string {
  364. parts := strings.SplitN(string(key), tagKeySeparator, 3)
  365. return parts[1]
  366. }
  367. func keyForTag(tag cmn.KVPair, result *types.TxResult) []byte {
  368. return []byte(fmt.Sprintf("%s/%s/%d/%d", tag.Key, tag.Value, result.Height, result.Index))
  369. }
  370. ///////////////////////////////////////////////////////////////////////////////
  371. // Utils
  372. func intersect(as, bs [][]byte) [][]byte {
  373. i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
  374. for _, a := range as {
  375. for _, b := range bs {
  376. if bytes.Equal(a, b) {
  377. i = append(i, a)
  378. }
  379. }
  380. }
  381. return i
  382. }