You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

405 lines
10 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package kv
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/pkg/errors"
  10. abci "github.com/tendermint/abci/types"
  11. wire "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/state/txindex"
  13. "github.com/tendermint/tendermint/types"
  14. cmn "github.com/tendermint/tmlibs/common"
  15. db "github.com/tendermint/tmlibs/db"
  16. "github.com/tendermint/tmlibs/pubsub/query"
  17. )
  // tagKeySeparator delimits the components of a tag index key; isTagKey and
  // extractValueFromKey rely on it to recognise and split such keys.
  const tagKeySeparator = "/"
  21. var _ txindex.TxIndexer = (*TxIndex)(nil)
  22. // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
  23. type TxIndex struct {
  24. store db.DB
  25. tagsToIndex []string
  26. indexAllTags bool
  27. }
  28. // NewTxIndex creates new KV indexer.
  29. func NewTxIndex(store db.DB, options ...func(*TxIndex)) *TxIndex {
  30. txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false}
  31. for _, o := range options {
  32. o(txi)
  33. }
  34. return txi
  35. }
  36. // IndexTags is an option for setting which tags to index.
  37. func IndexTags(tags []string) func(*TxIndex) {
  38. return func(txi *TxIndex) {
  39. txi.tagsToIndex = tags
  40. }
  41. }
  42. // IndexAllTags is an option for indexing all tags.
  43. func IndexAllTags() func(*TxIndex) {
  44. return func(txi *TxIndex) {
  45. txi.indexAllTags = true
  46. }
  47. }
  48. // Get gets transaction from the TxIndex storage and returns it or nil if the
  49. // transaction is not found.
  50. func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
  51. if len(hash) == 0 {
  52. return nil, txindex.ErrorEmptyHash
  53. }
  54. rawBytes := txi.store.Get(hash)
  55. if rawBytes == nil {
  56. return nil, nil
  57. }
  58. r := bytes.NewReader(rawBytes)
  59. var n int
  60. var err error
  61. txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult)
  62. if err != nil {
  63. return nil, fmt.Errorf("Error reading TxResult: %v", err)
  64. }
  65. return txResult, nil
  66. }
  67. // AddBatch indexes a batch of transactions using the given list of tags.
  68. func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
  69. storeBatch := txi.store.NewBatch()
  70. for _, result := range b.Ops {
  71. hash := result.Tx.Hash()
  72. // index tx by tags
  73. for _, tag := range result.Result.Tags {
  74. if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) {
  75. storeBatch.Set(keyForTag(tag, result), hash)
  76. }
  77. }
  78. // index tx by hash
  79. rawBytes := wire.BinaryBytes(result)
  80. storeBatch.Set(hash, rawBytes)
  81. }
  82. storeBatch.Write()
  83. return nil
  84. }
  85. // Index indexes a single transaction using the given list of tags.
  86. func (txi *TxIndex) Index(result *types.TxResult) error {
  87. b := txi.store.NewBatch()
  88. hash := result.Tx.Hash()
  89. // index tx by tags
  90. for _, tag := range result.Result.Tags {
  91. if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) {
  92. b.Set(keyForTag(tag, result), hash)
  93. }
  94. }
  95. // index tx by hash
  96. rawBytes := wire.BinaryBytes(result)
  97. b.Set(hash, rawBytes)
  98. b.Write()
  99. return nil
  100. }
  101. // Search performs a search using the given query. It breaks the query into
  102. // conditions (like "tx.height > 5"). For each condition, it queries the DB
  103. // index. One special use cases here: (1) if "tx.hash" is found, it returns tx
  104. // result for it (2) for range queries it is better for the client to provide
  105. // both lower and upper bounds, so we are not performing a full scan. Results
  106. // from querying indexes are then intersected and returned to the caller.
  107. func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
  108. var hashes [][]byte
  109. var hashesInitialized bool
  110. // get a list of conditions (like "tx.height > 5")
  111. conditions := q.Conditions()
  112. // if there is a hash condition, return the result immediately
  113. hash, err, ok := lookForHash(conditions)
  114. if err != nil {
  115. return nil, errors.Wrap(err, "error during searching for a hash in the query")
  116. } else if ok {
  117. res, err := txi.Get(hash)
  118. if res == nil {
  119. return []*types.TxResult{}, nil
  120. } else {
  121. return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
  122. }
  123. }
  124. // conditions to skip because they're handled before "everything else"
  125. skipIndexes := make([]int, 0)
  126. // if there is a height condition ("tx.height=3"), extract it for faster lookups
  127. height, heightIndex := lookForHeight(conditions)
  128. if heightIndex >= 0 {
  129. skipIndexes = append(skipIndexes, heightIndex)
  130. }
  131. // extract ranges
  132. // if both upper and lower bounds exist, it's better to get them in order not
  133. // no iterate over kvs that are not within range.
  134. ranges, rangeIndexes := lookForRanges(conditions)
  135. if len(ranges) > 0 {
  136. skipIndexes = append(skipIndexes, rangeIndexes...)
  137. for _, r := range ranges {
  138. if !hashesInitialized {
  139. hashes = txi.matchRange(r, startKeyForRange(r, height))
  140. hashesInitialized = true
  141. } else {
  142. hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height)))
  143. }
  144. }
  145. }
  146. // for all other conditions
  147. for i, c := range conditions {
  148. if cmn.IntInSlice(i, skipIndexes) {
  149. continue
  150. }
  151. if !hashesInitialized {
  152. hashes = txi.match(c, startKey(c, height))
  153. hashesInitialized = true
  154. } else {
  155. hashes = intersect(hashes, txi.match(c, startKey(c, height)))
  156. }
  157. }
  158. results := make([]*types.TxResult, len(hashes))
  159. i := 0
  160. for _, h := range hashes {
  161. results[i], err = txi.Get(h)
  162. if err != nil {
  163. return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
  164. }
  165. i++
  166. }
  167. return results, nil
  168. }
  169. func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
  170. for _, c := range conditions {
  171. if c.Tag == types.TxHashKey {
  172. decoded, err := hex.DecodeString(c.Operand.(string))
  173. return decoded, err, true
  174. }
  175. }
  176. return
  177. }
  178. func lookForHeight(conditions []query.Condition) (height uint64, index int) {
  179. for i, c := range conditions {
  180. if c.Tag == types.TxHeightKey {
  181. return uint64(c.Operand.(int64)), i
  182. }
  183. }
  184. return 0, -1
  185. }
  type (
  	// queryRanges is a special map holding range conditions, keyed by tag.
  	// Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
  	queryRanges map[string]queryRange

  	// queryRange describes the bounds collected for a single tag. A nil
  	// bound means that side of the range is open.
  	queryRange struct {
  		key               string      // tag the range applies to
  		lowerBound        interface{} // int || time.Time
  		includeLowerBound bool        // set by a ">=" condition
  		upperBound        interface{} // int || time.Time
  		includeUpperBound bool        // set by a "<=" condition
  	}
  )
  196. func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
  197. ranges = make(queryRanges)
  198. for i, c := range conditions {
  199. if isRangeOperation(c.Op) {
  200. r, ok := ranges[c.Tag]
  201. if !ok {
  202. r = queryRange{key: c.Tag}
  203. }
  204. switch c.Op {
  205. case query.OpGreater:
  206. r.lowerBound = c.Operand
  207. case query.OpGreaterEqual:
  208. r.includeLowerBound = true
  209. r.lowerBound = c.Operand
  210. case query.OpLess:
  211. r.upperBound = c.Operand
  212. case query.OpLessEqual:
  213. r.includeUpperBound = true
  214. r.upperBound = c.Operand
  215. }
  216. ranges[c.Tag] = r
  217. indexes = append(indexes, i)
  218. }
  219. }
  220. return ranges, indexes
  221. }
  222. func isRangeOperation(op query.Operator) bool {
  223. switch op {
  224. case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
  225. return true
  226. default:
  227. return false
  228. }
  229. }
  230. func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) {
  231. if c.Op == query.OpEqual {
  232. it := txi.store.IteratorPrefix(startKey)
  233. defer it.Release()
  234. for it.Next() {
  235. hashes = append(hashes, it.Value())
  236. }
  237. } else if c.Op == query.OpContains {
  238. // XXX: doing full scan because startKey does not apply here
  239. // For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an"
  240. // we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan"
  241. it := txi.store.Iterator()
  242. defer it.Release()
  243. for it.Next() {
  244. if !isTagKey(it.Key()) {
  245. continue
  246. }
  247. if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
  248. hashes = append(hashes, it.Value())
  249. }
  250. }
  251. } else {
  252. panic("other operators should be handled already")
  253. }
  254. return
  255. }
  256. func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
  257. it := txi.store.IteratorPrefix(startKey)
  258. defer it.Release()
  259. LOOP:
  260. for it.Next() {
  261. if !isTagKey(it.Key()) {
  262. continue
  263. }
  264. if r.upperBound != nil {
  265. // no other way to stop iterator other than checking for upperBound
  266. switch (r.upperBound).(type) {
  267. case int64:
  268. v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  269. if err == nil && v == r.upperBound {
  270. if r.includeUpperBound {
  271. hashes = append(hashes, it.Value())
  272. }
  273. break LOOP
  274. }
  275. // XXX: passing time in a ABCI Tags is not yet implemented
  276. // case time.Time:
  277. // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
  278. // if v == r.upperBound {
  279. // break
  280. // }
  281. }
  282. }
  283. hashes = append(hashes, it.Value())
  284. }
  285. return
  286. }
  287. ///////////////////////////////////////////////////////////////////////////////
  288. // Keys
  289. func startKey(c query.Condition, height uint64) []byte {
  290. var key string
  291. if height > 0 {
  292. key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
  293. } else {
  294. key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
  295. }
  296. return []byte(key)
  297. }
  298. func startKeyForRange(r queryRange, height uint64) []byte {
  299. if r.lowerBound == nil {
  300. return []byte(fmt.Sprintf("%s", r.key))
  301. }
  302. var lowerBound interface{}
  303. if r.includeLowerBound {
  304. lowerBound = r.lowerBound
  305. } else {
  306. switch t := r.lowerBound.(type) {
  307. case int64:
  308. lowerBound = t + 1
  309. case time.Time:
  310. lowerBound = t.Unix() + 1
  311. default:
  312. panic("not implemented")
  313. }
  314. }
  315. var key string
  316. if height > 0 {
  317. key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height)
  318. } else {
  319. key = fmt.Sprintf("%s/%v", r.key, lowerBound)
  320. }
  321. return []byte(key)
  322. }
  323. func isTagKey(key []byte) bool {
  324. return strings.Count(string(key), tagKeySeparator) == 3
  325. }
  326. func extractValueFromKey(key []byte) string {
  327. parts := strings.SplitN(string(key), tagKeySeparator, 3)
  328. return parts[1]
  329. }
  330. func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte {
  331. switch tag.ValueType {
  332. case abci.KVPair_STRING:
  333. return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index))
  334. case abci.KVPair_INT:
  335. return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index))
  336. // case abci.KVPair_TIME:
  337. // return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index))
  338. default:
  339. panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType))
  340. }
  341. }
  ///////////////////////////////////////////////////////////////////////////////
  // Utils

  // intersect returns the byte slices present in both as and bs, in the order
  // they first appear in as. Each common element is returned once, even if it
  // is repeated in either input. The result is never nil.
  //
  // A set built from bs keeps the work linear in len(as)+len(bs) instead of
  // comparing every pair.
  func intersect(as, bs [][]byte) [][]byte {
  	smaller := len(as)
  	if len(bs) < smaller {
  		smaller = len(bs)
  	}
  	common := make([][]byte, 0, smaller)

  	pending := make(map[string]struct{}, len(bs))
  	for _, b := range bs {
  		pending[string(b)] = struct{}{}
  	}

  	for _, a := range as {
  		if _, ok := pending[string(a)]; !ok {
  			continue
  		}
  		common = append(common, a)
  		// Drop the entry so a repeated element in as is not emitted twice.
  		delete(pending, string(a))
  	}
  	return common
  }
  354. }