You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

337 lines
9.6 KiB

pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 
github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
6 years ago
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 
github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
6 years ago
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 
github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
6 years ago
  1. // Package query provides a parser for a custom query format:
  2. //
  3. // abci.invoice.number=22 AND abci.invoice.owner=Ivan
  4. //
  5. // See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar.
  6. // More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics
  7. //
  8. // It supports numbers (integer and floating point), dates and times.
  9. package query
  10. import (
  11. "fmt"
  12. "reflect"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. // Query holds the query string and the query parser.
  18. type Query struct {
  19. str string
  20. parser *QueryParser
  21. }
  22. // Condition represents a single condition within a query and consists of tag
  23. // (e.g. "tx.gas"), operator (e.g. "=") and operand (e.g. "7").
  24. type Condition struct {
  25. Tag string
  26. Op Operator
  27. Operand interface{}
  28. }
  29. // New parses the given string and returns a query or error if the string is
  30. // invalid.
  31. func New(s string) (*Query, error) {
  32. p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)}
  33. p.Init()
  34. if err := p.Parse(); err != nil {
  35. return nil, err
  36. }
  37. return &Query{str: s, parser: p}, nil
  38. }
  39. // MustParse turns the given string into a query or panics; for tests or others
  40. // cases where you know the string is valid.
  41. func MustParse(s string) *Query {
  42. q, err := New(s)
  43. if err != nil {
  44. panic(fmt.Sprintf("failed to parse %s: %v", s, err))
  45. }
  46. return q
  47. }
  48. // String returns the original string.
  49. func (q *Query) String() string {
  50. return q.str
  51. }
  52. // Operator is an operator that defines some kind of relation between tag and
  53. // operand (equality, etc.).
  54. type Operator uint8
  55. const (
  56. // "<="
  57. OpLessEqual Operator = iota
  58. // ">="
  59. OpGreaterEqual
  60. // "<"
  61. OpLess
  62. // ">"
  63. OpGreater
  64. // "="
  65. OpEqual
  66. // "CONTAINS"; used to check if a string contains a certain sub string.
  67. OpContains
  68. )
  69. const (
  70. // DateLayout defines a layout for all dates (`DATE date`)
  71. DateLayout = "2006-01-02"
  72. // TimeLayout defines a layout for all times (`TIME time`)
  73. TimeLayout = time.RFC3339
  74. )
  75. // Conditions returns a list of conditions.
  76. func (q *Query) Conditions() []Condition {
  77. conditions := make([]Condition, 0)
  78. buffer, begin, end := q.parser.Buffer, 0, 0
  79. var tag string
  80. var op Operator
  81. // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
  82. for _, token := range q.parser.Tokens() {
  83. switch token.pegRule {
  84. case rulePegText:
  85. begin, end = int(token.begin), int(token.end)
  86. case ruletag:
  87. tag = buffer[begin:end]
  88. case rulele:
  89. op = OpLessEqual
  90. case rulege:
  91. op = OpGreaterEqual
  92. case rulel:
  93. op = OpLess
  94. case ruleg:
  95. op = OpGreater
  96. case ruleequal:
  97. op = OpEqual
  98. case rulecontains:
  99. op = OpContains
  100. case rulevalue:
  101. // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
  102. valueWithoutSingleQuotes := buffer[begin+1 : end-1]
  103. conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes})
  104. case rulenumber:
  105. number := buffer[begin:end]
  106. if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
  107. value, err := strconv.ParseFloat(number, 64)
  108. if err != nil {
  109. panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
  110. }
  111. conditions = append(conditions, Condition{tag, op, value})
  112. } else {
  113. value, err := strconv.ParseInt(number, 10, 64)
  114. if err != nil {
  115. panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
  116. }
  117. conditions = append(conditions, Condition{tag, op, value})
  118. }
  119. case ruletime:
  120. value, err := time.Parse(TimeLayout, buffer[begin:end])
  121. if err != nil {
  122. panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
  123. }
  124. conditions = append(conditions, Condition{tag, op, value})
  125. case ruledate:
  126. value, err := time.Parse("2006-01-02", buffer[begin:end])
  127. if err != nil {
  128. panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
  129. }
  130. conditions = append(conditions, Condition{tag, op, value})
  131. }
  132. }
  133. return conditions
  134. }
  135. // Matches returns true if the query matches the given set of tags, false otherwise.
  136. //
  137. // For example, query "name=John" matches tags = {"name": "John"}. More
  138. // examples could be found in parser_test.go and query_test.go.
  139. func (q *Query) Matches(tags map[string]string) bool {
  140. if len(tags) == 0 {
  141. return false
  142. }
  143. buffer, begin, end := q.parser.Buffer, 0, 0
  144. var tag string
  145. var op Operator
  146. // tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
  147. for _, token := range q.parser.Tokens() {
  148. switch token.pegRule {
  149. case rulePegText:
  150. begin, end = int(token.begin), int(token.end)
  151. case ruletag:
  152. tag = buffer[begin:end]
  153. case rulele:
  154. op = OpLessEqual
  155. case rulege:
  156. op = OpGreaterEqual
  157. case rulel:
  158. op = OpLess
  159. case ruleg:
  160. op = OpGreater
  161. case ruleequal:
  162. op = OpEqual
  163. case rulecontains:
  164. op = OpContains
  165. case rulevalue:
  166. // strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
  167. valueWithoutSingleQuotes := buffer[begin+1 : end-1]
  168. // see if the triplet (tag, operator, operand) matches any tag
  169. // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
  170. if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) {
  171. return false
  172. }
  173. case rulenumber:
  174. number := buffer[begin:end]
  175. if strings.ContainsAny(number, ".") { // if it looks like a floating-point number
  176. value, err := strconv.ParseFloat(number, 64)
  177. if err != nil {
  178. panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
  179. }
  180. if !match(tag, op, reflect.ValueOf(value), tags) {
  181. return false
  182. }
  183. } else {
  184. value, err := strconv.ParseInt(number, 10, 64)
  185. if err != nil {
  186. panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
  187. }
  188. if !match(tag, op, reflect.ValueOf(value), tags) {
  189. return false
  190. }
  191. }
  192. case ruletime:
  193. value, err := time.Parse(TimeLayout, buffer[begin:end])
  194. if err != nil {
  195. panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
  196. }
  197. if !match(tag, op, reflect.ValueOf(value), tags) {
  198. return false
  199. }
  200. case ruledate:
  201. value, err := time.Parse("2006-01-02", buffer[begin:end])
  202. if err != nil {
  203. panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
  204. }
  205. if !match(tag, op, reflect.ValueOf(value), tags) {
  206. return false
  207. }
  208. }
  209. }
  210. return true
  211. }
  212. // match returns true if the given triplet (tag, operator, operand) matches any tag.
  213. //
  214. // First, it looks up the tag in tags and if it finds one, tries to compare the
  215. // value from it to the operand using the operator.
  216. //
  217. // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
  218. func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool {
  219. // look up the tag from the query in tags
  220. value, ok := tags[tag]
  221. if !ok {
  222. return false
  223. }
  224. switch operand.Kind() {
  225. case reflect.Struct: // time
  226. operandAsTime := operand.Interface().(time.Time)
  227. // try our best to convert value from tags to time.Time
  228. var (
  229. v time.Time
  230. err error
  231. )
  232. if strings.ContainsAny(value, "T") {
  233. v, err = time.Parse(TimeLayout, value)
  234. } else {
  235. v, err = time.Parse(DateLayout, value)
  236. }
  237. if err != nil {
  238. panic(fmt.Sprintf("Failed to convert value %v from tag to time.Time: %v", value, err))
  239. }
  240. switch op {
  241. case OpLessEqual:
  242. return v.Before(operandAsTime) || v.Equal(operandAsTime)
  243. case OpGreaterEqual:
  244. return v.Equal(operandAsTime) || v.After(operandAsTime)
  245. case OpLess:
  246. return v.Before(operandAsTime)
  247. case OpGreater:
  248. return v.After(operandAsTime)
  249. case OpEqual:
  250. return v.Equal(operandAsTime)
  251. }
  252. case reflect.Float64:
  253. operandFloat64 := operand.Interface().(float64)
  254. var v float64
  255. // try our best to convert value from tags to float64
  256. v, err := strconv.ParseFloat(value, 64)
  257. if err != nil {
  258. panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
  259. }
  260. switch op {
  261. case OpLessEqual:
  262. return v <= operandFloat64
  263. case OpGreaterEqual:
  264. return v >= operandFloat64
  265. case OpLess:
  266. return v < operandFloat64
  267. case OpGreater:
  268. return v > operandFloat64
  269. case OpEqual:
  270. return v == operandFloat64
  271. }
  272. case reflect.Int64:
  273. operandInt := operand.Interface().(int64)
  274. var v int64
  275. // if value looks like float, we try to parse it as float
  276. if strings.ContainsAny(value, ".") {
  277. v1, err := strconv.ParseFloat(value, 64)
  278. if err != nil {
  279. panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
  280. }
  281. v = int64(v1)
  282. } else {
  283. var err error
  284. // try our best to convert value from tags to int64
  285. v, err = strconv.ParseInt(value, 10, 64)
  286. if err != nil {
  287. panic(fmt.Sprintf("Failed to convert value %v from tag to int64: %v", value, err))
  288. }
  289. }
  290. switch op {
  291. case OpLessEqual:
  292. return v <= operandInt
  293. case OpGreaterEqual:
  294. return v >= operandInt
  295. case OpLess:
  296. return v < operandInt
  297. case OpGreater:
  298. return v > operandInt
  299. case OpEqual:
  300. return v == operandInt
  301. }
  302. case reflect.String:
  303. switch op {
  304. case OpEqual:
  305. return value == operand.String()
  306. case OpContains:
  307. return strings.Contains(value, operand.String())
  308. }
  309. default:
  310. panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind()))
  311. }
  312. return false
  313. }