You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

396 lines
9.9 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package kv
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/pkg/errors"
  10. wire "github.com/tendermint/go-wire"
  11. cmn "github.com/tendermint/tmlibs/common"
  12. dbm "github.com/tendermint/tmlibs/db"
  13. "github.com/tendermint/tmlibs/pubsub/query"
  14. "github.com/tendermint/tendermint/state/txindex"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. tagKeySeparator = "/"
  19. )
  20. var _ txindex.TxIndexer = (*TxIndex)(nil)
  21. // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
  22. type TxIndex struct {
  23. store dbm.DB
  24. tagsToIndex []string
  25. indexAllTags bool
  26. }
  27. // NewTxIndex creates new KV indexer.
  28. func NewTxIndex(store dbm.DB, options ...func(*TxIndex)) *TxIndex {
  29. txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false}
  30. for _, o := range options {
  31. o(txi)
  32. }
  33. return txi
  34. }
  35. // IndexTags is an option for setting which tags to index.
  36. func IndexTags(tags []string) func(*TxIndex) {
  37. return func(txi *TxIndex) {
  38. txi.tagsToIndex = tags
  39. }
  40. }
  41. // IndexAllTags is an option for indexing all tags.
  42. func IndexAllTags() func(*TxIndex) {
  43. return func(txi *TxIndex) {
  44. txi.indexAllTags = true
  45. }
  46. }
  47. // Get gets transaction from the TxIndex storage and returns it or nil if the
  48. // transaction is not found.
  49. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
  50. if len(hash) == 0 {
  51. return nil, txindex.ErrorEmptyHash
  52. }
  53. rawBytes := txi.store.Get(hash)
  54. if rawBytes == nil {
  55. return nil, nil
  56. }
  57. r := bytes.NewReader(rawBytes)
  58. var n int
  59. var err error
  60. txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult)
  61. if err != nil {
  62. return nil, fmt.Errorf("Error reading TxResult: %v", err)
  63. }
  64. return txResult, nil
  65. }
  66. // AddBatch indexes a batch of transactions using the given list of tags.
  67. func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
  68. storeBatch := txi.store.NewBatch()
  69. for _, result := range b.Ops {
  70. hash := result.Tx.Hash()
  71. // index tx by tags
  72. for _, tag := range result.Result.Tags {
  73. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  74. storeBatch.Set(keyForTag(tag, result), hash)
  75. }
  76. }
  77. // index tx by hash
  78. rawBytes := wire.BinaryBytes(result)
  79. storeBatch.Set(hash, rawBytes)
  80. }
  81. storeBatch.Write()
  82. return nil
  83. }
  84. // Index indexes a single transaction using the given list of tags.
  85. func (txi *TxIndex) Index(result *types.TxResult) error {
  86. b := txi.store.NewBatch()
  87. hash := result.Tx.Hash()
  88. // index tx by tags
  89. for _, tag := range result.Result.Tags {
  90. if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
  91. b.Set(keyForTag(tag, result), hash)
  92. }
  93. }
  94. // index tx by hash
  95. rawBytes := wire.BinaryBytes(result)
  96. b.Set(hash, rawBytes)
  97. b.Write()
  98. return nil
  99. }
  100. // Search performs a search using the given query. It breaks the query into
  101. // conditions (like "tx.height > 5"). For each condition, it queries the DB
  102. // index. One special use cases here: (1) if "tx.hash" is found, it returns tx
  103. // result for it (2) for range queries it is better for the client to provide
  104. // both lower and upper bounds, so we are not performing a full scan. Results
  105. // from querying indexes are then intersected and returned to the caller.
  106. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
  107. var hashes [][]byte
  108. var hashesInitialized bool
  109. // get a list of conditions (like "tx.height > 5")
  110. conditions := q.Conditions()
  111. // if there is a hash condition, return the result immediately
  112. hash, err, ok := lookForHash(conditions)
  113. if err != nil {
  114. return nil, errors.Wrap(err, "error during searching for a hash in the query")
  115. } else if ok {
  116. res, err := txi.Get(hash)
  117. if res == nil {
  118. return []*types.TxResult{}, nil
  119. } else {
  120. return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
  121. }
  122. }
  123. // conditions to skip because they're handled before "everything else"
  124. skipIndexes := make([]int, 0)
  125. // if there is a height condition ("tx.height=3"), extract it for faster lookups
  126. height, heightIndex := lookForHeight(conditions)
  127. if heightIndex >= 0 {
  128. skipIndexes = append(skipIndexes, heightIndex)
  129. }
  130. // extract ranges
  131. // if both upper and lower bounds exist, it's better to get them in order not
  132. // no iterate over kvs that are not within range.
  133. ranges, rangeIndexes := lookForRanges(conditions)
  134. if len(ranges) > 0 {
  135. skipIndexes = append(skipIndexes, rangeIndexes...)
  136. for _, r := range ranges {
  137. if !hashesInitialized {
  138. hashes = txi.matchRange(r, startKeyForRange(r, height))
  139. hashesInitialized = true
  140. } else {
  141. hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height)))
  142. }
  143. }
  144. }
  145. // for all other conditions
  146. for i, c := range conditions {
  147. if cmn.IntInSlice(i, skipIndexes) {
  148. continue
  149. }
  150. if !hashesInitialized {
  151. hashes = txi.match(c, startKey(c, height))
  152. hashesInitialized = true
  153. } else {
  154. hashes = intersect(hashes, txi.match(c, startKey(c, height)))
  155. }
  156. }
  157. results := make([]*types.TxResult, len(hashes))
  158. i := 0
  159. for _, h := range hashes {
  160. results[i], err = txi.Get(h)
  161. if err != nil {
  162. return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
  163. }
  164. i++
  165. }
  166. return results, nil
  167. }
  168. func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
  169. for _, c := range conditions {
  170. if c.Tag == types.TxHashKey {
  171. decoded, err := hex.DecodeString(c.Operand.(string))
  172. return decoded, err, true
  173. }
  174. }
  175. return
  176. }
  177. func lookForHeight(conditions []query.Condition) (height int64, index int) {
  178. for i, c := range conditions {
  179. if c.Tag == types.TxHeightKey {
  180. return c.Operand.(int64), i
  181. }
  182. }
  183. return 0, -1
  184. }
  185. // special map to hold range conditions
  186. // Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
  187. type queryRanges map[string]queryRange
  188. type queryRange struct {
  189. key string
  190. lowerBound interface{} // int || time.Time
  191. includeLowerBound bool
  192. upperBound interface{} // int || time.Time
  193. includeUpperBound bool
  194. }
  195. func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
  196. ranges = make(queryRanges)
  197. for i, c := range conditions {
  198. if isRangeOperation(c.Op) {
  199. r, ok := ranges[c.Tag]
  200. if !ok {
  201. r = queryRange{key: c.Tag}
  202. }
  203. switch c.Op {
  204. case query.OpGreater:
  205. r.lowerBound = c.Operand
  206. case query.OpGreaterEqual:
  207. r.includeLowerBound = true
  208. r.lowerBound = c.Operand
  209. case query.OpLess:
  210. r.upperBound = c.Operand
  211. case query.OpLessEqual:
  212. r.includeUpperBound = true
  213. r.upperBound = c.Operand
  214. }
  215. ranges[c.Tag] = r
  216. indexes = append(indexes, i)
  217. }
  218. }
  219. return ranges, indexes
  220. }
  221. func isRangeOperation(op query.Operator) bool {
  222. switch op {
  223. case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
  224. return true
  225. default:
  226. return false
  227. }
  228. }
  229. func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) {
  230. if c.Op == query.OpEqual {
  231. it := dbm.IteratePrefix(txi.store, startKey)
  232. defer it.Close()
  233. for ; it.Valid(); it.Next() {
  234. hashes = append(hashes, it.Value())
  235. }
  236. } else if c.Op == query.OpContains {
  237. // XXX: doing full scan because startKey does not apply here
  238. // For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an"
  239. // we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan"
  240. it := txi.store.Iterator(nil, nil)
  241. defer it.Close()
  242. for ; it.Valid(); it.Next() {
  243. if !isTagKey(it.Key()) {
  244. continue
  245. }
  246. if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
  247. hashes = append(hashes, it.Value())
  248. }
  249. }
  250. } else {
  251. panic("other operators should be handled already")
  252. }
  253. return
  254. }
  255. func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
  256. it := dbm.IteratePrefix(txi.store, startKey)
  257. defer it.Close()
  258. LOOP:
  259. for ; it.Valid(); it.Next() {
  260. if !isTagKey(it.Key()) {
  261. continue
  262. }
  263. if r.upperBound != nil {
  264. // no other way to stop iterator other than checking for upperBound
  265. switch (r.upperBound).(type) {
  266. case int64:
  267. v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  268. if err == nil && v == r.upperBound {
  269. if r.includeUpperBound {
  270. hashes = append(hashes, it.Value())
  271. }
  272. break LOOP
  273. }
  274. // XXX: passing time in a ABCI Tags is not yet implemented
  275. // case time.Time:
  276. // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  277. // if v == r.upperBound {
  278. // break
  279. // }
  280. }
  281. }
  282. hashes = append(hashes, it.Value())
  283. }
  284. return
  285. }
  286. ///////////////////////////////////////////////////////////////////////////////
  287. // Keys
  288. func startKey(c query.Condition, height int64) []byte {
  289. var key string
  290. if height > 0 {
  291. key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
  292. } else {
  293. key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
  294. }
  295. return []byte(key)
  296. }
  297. func startKeyForRange(r queryRange, height int64) []byte {
  298. if r.lowerBound == nil {
  299. return []byte(r.key)
  300. }
  301. var lowerBound interface{}
  302. if r.includeLowerBound {
  303. lowerBound = r.lowerBound
  304. } else {
  305. switch t := r.lowerBound.(type) {
  306. case int64:
  307. lowerBound = t + 1
  308. case time.Time:
  309. lowerBound = t.Unix() + 1
  310. default:
  311. panic("not implemented")
  312. }
  313. }
  314. var key string
  315. if height > 0 {
  316. key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height)
  317. } else {
  318. key = fmt.Sprintf("%s/%v", r.key, lowerBound)
  319. }
  320. return []byte(key)
  321. }
  322. func isTagKey(key []byte) bool {
  323. return strings.Count(string(key), tagKeySeparator) == 3
  324. }
  325. func extractValueFromKey(key []byte) string {
  326. parts := strings.SplitN(string(key), tagKeySeparator, 3)
  327. return parts[1]
  328. }
  329. func keyForTag(tag cmn.KVPair, result *types.TxResult) []byte {
  330. return []byte(fmt.Sprintf("%s/%s/%d/%d", tag.Key, tag.Value, result.Height, result.Index))
  331. }
  332. ///////////////////////////////////////////////////////////////////////////////
  333. // Utils
  334. func intersect(as, bs [][]byte) [][]byte {
  335. i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
  336. for _, a := range as {
  337. for _, b := range bs {
  338. if bytes.Equal(a, b) {
  339. i = append(i, a)
  340. }
  341. }
  342. }
  343. return i
  344. }