
Merge branch 'develop' into sdk2

pull/1842/head
Ethan Buchman authored 7 years ago, committed by GitHub
commit a991e2fe9c
34 changed files with 1408 additions and 475 deletions
  1. .editorconfig  +19 -0
  2. CHANGELOG.md  +37 -0
  3. LICENSE  +193 -0
  4. Makefile  +2 -2
  5. README.md  +49 -0
  6. cli/setup.go  +7 -20
  7. cli/setup_test.go  +5 -11
  8. clist/clist.go  +152 -94
  9. clist/clist_test.go  +1 -1
  10. common/bit_array.go  +3 -4
  11. common/int.go  +10 -0
  12. common/int_test.go  +14 -0
  13. common/random.go  +100 -21
  14. common/random_test.go  +120 -0
  15. common/repeat_timer.go  +184 -42
  16. common/repeat_timer_test.go  +92 -0
  17. common/service.go  +32 -14
  18. common/service_test.go  +38 -8
  19. common/string.go  +10 -0
  20. common/string_test.go  +14 -0
  21. common/throttle_timer_test.go  +78 -0
  22. events/events_test.go  +14 -14
  23. glide.lock  +2 -2
  24. glide.yaml  +0 -1
  25. logger/log.go  +0 -78
  26. process/process.go  +0 -76
  27. process/util.go  +0 -22
  28. pubsub/pubsub.go  +56 -13
  29. pubsub/pubsub_test.go  +25 -7
  30. pubsub/query/parser_test.go  +2 -2
  31. pubsub/query/query.go  +114 -32
  32. pubsub/query/query_test.go  +24 -3
  33. test.sh  +10 -7
  34. version/version.go  +1 -1

.editorconfig  +19 -0

@ -0,0 +1,19 @@
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
[Makefile]
indent_style = tab
[*.sh]
indent_style = tab
[*.proto]
indent_style = space
indent_size = 2

CHANGELOG.md  +37 -0

@ -1,5 +1,42 @@
# Changelog
## 0.6.0 (December 29, 2017)
BREAKING:
- [cli] remove --root
- [pubsub] add String() method to Query interface
IMPROVEMENTS:
- [common] use a thread-safe and well-seeded non-crypto rng
BUG FIXES:
- [clist] fix misuse of wait group
- [common] introduce Ticker interface and logicalTicker for better testing of timers
## 0.5.0 (December 5, 2017)
BREAKING:
- [common] replace Service#Start, Service#Stop first return value (bool) with an
error (ErrAlreadyStarted, ErrAlreadyStopped)
- [common] replace Service#Reset first return value (bool) with an error
- [process] removed
FEATURES:
- [common] IntInSlice and StringInSlice functions
- [pubsub/query] introduce `Condition` struct, expose `Operator`, and add `query.Conditions()`
## 0.4.1 (November 27, 2017)
FEATURES:
- [common] `Keys()` method on `CMap`
IMPROVEMENTS:
- [log] complex types now encoded as "%+v" by default if `String()` method is undefined (previously resulted in error)
- [log] logger logs its own errors
BUG FIXES:
- [common] fixed `Kill()` to build on Windows (Windows does not have `syscall.Kill`)
## 0.4.0 (October 26, 2017)
BREAKING:


LICENSE  +193 -0

@ -0,0 +1,193 @@
Tendermint Libraries
Copyright (C) 2017 Tendermint
Apache License
Version 2.0, January 2004
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Makefile  +2 -2

@ -10,7 +10,6 @@ all: check get_vendor_deps protoc build test install metalinter
check: check_tools
########################################
### Build
@ -62,7 +61,6 @@ get_vendor_deps:
@echo "--> Running glide install"
@glide install
########################################
### Testing
@ -95,7 +93,9 @@ metalinter:
--enable=varcheck \
--enable=vetshadow \
./...
#--enable=gas \
#--enable=aligncheck \
#--enable=dupl \
#--enable=errcheck \
#--enable=gocyclo \


README.md  +49 -0

@ -0,0 +1,49 @@
# TMLIBS
This repo is a home for various small packages.
## autofile
Autofile is file access with automatic log rotation. A group of files is maintained and rotation happens
when the leading file gets too big. Provides a reader for reading from the file group.
## cli
CLI wraps the `cobra` and `viper` packages and handles some common elements of building a CLI like flags and env vars for the home directory and the logger.
## clist
Clist provides a linked list that is safe for concurrent access by many readers.
## common
Common provides a hodgepodge of useful functions.
## db
DB provides a database interface and a number of implementations, including ones using an in-memory map, the filesystem directory structure,
an implementation of LevelDB in Go, and the official LevelDB in C.
## events
Events is a synchronous PubSub package.
## flowrate
Flowrate is a fork of https://github.com/mxk/go-flowrate that added a `SetREMA` method.
## log
Log is a log package structured around key-value pairs that allows logging level to be set differently for different keys.
## merkle
Merkle provides a simple static merkle tree and corresponding proofs.
## process
Process is a simple utility for spawning OS processes.
## pubsub
PubSub is an asynchronous PubSub package.

cli/setup.go  +7 -20

@ -14,7 +14,6 @@ import (
)
const (
RootFlag = "root"
HomeFlag = "home"
TraceFlag = "trace"
OutputFlag = "output"
@ -28,14 +27,9 @@ type Executable interface {
}
// PrepareBaseCmd is meant for tendermint and other servers
func PrepareBaseCmd(cmd *cobra.Command, envPrefix, defautRoot string) Executor {
func PrepareBaseCmd(cmd *cobra.Command, envPrefix, defaultHome string) Executor {
cobra.OnInitialize(func() { initEnv(envPrefix) })
cmd.PersistentFlags().StringP(RootFlag, "r", defautRoot, "DEPRECATED. Use --home")
// -h is already reserved for --help as part of the cobra framework
// do you want to try something else??
// also, default must be empty, so we can detect this unset and fall back
// to --root / TM_ROOT / TMROOT
cmd.PersistentFlags().String(HomeFlag, "", "root directory for config and data")
cmd.PersistentFlags().StringP(HomeFlag, "", defaultHome, "directory for config and data")
cmd.PersistentFlags().Bool(TraceFlag, false, "print out full stack trace on errors")
cmd.PersistentPreRunE = concatCobraCmdFuncs(bindFlagsLoadViper, cmd.PersistentPreRunE)
return Executor{cmd, os.Exit}
@ -45,11 +39,11 @@ func PrepareBaseCmd(cmd *cobra.Command, envPrefix, defautRoot string) Executor {
//
// This adds --encoding (hex, btc, base64) and --output (text, json) to
// the command. These only really make sense in interactive commands.
func PrepareMainCmd(cmd *cobra.Command, envPrefix, defautRoot string) Executor {
func PrepareMainCmd(cmd *cobra.Command, envPrefix, defaultHome string) Executor {
cmd.PersistentFlags().StringP(EncodingFlag, "e", "hex", "Binary encoding (hex|b64|btc)")
cmd.PersistentFlags().StringP(OutputFlag, "o", "text", "Output format (text|json)")
cmd.PersistentPreRunE = concatCobraCmdFuncs(setEncoding, validateOutput, cmd.PersistentPreRunE)
return PrepareBaseCmd(cmd, envPrefix, defautRoot)
return PrepareBaseCmd(cmd, envPrefix, defaultHome)
}
// initEnv sets to use ENV variables if set.
@ -136,17 +130,10 @@ func bindFlagsLoadViper(cmd *cobra.Command, args []string) error {
return err
}
// rootDir is command line flag, env variable, or default $HOME/.tlc
// NOTE: we support both --root and --home for now, but eventually only --home
// Also ensure we set the correct rootDir under HomeFlag so we dont need to
// repeat this logic elsewhere.
rootDir := viper.GetString(HomeFlag)
if rootDir == "" {
rootDir = viper.GetString(RootFlag)
viper.Set(HomeFlag, rootDir)
}
homeDir := viper.GetString(HomeFlag)
viper.Set(HomeFlag, homeDir)
viper.SetConfigName("config") // name of config file (without extension)
viper.AddConfigPath(rootDir) // search root directory
viper.AddConfigPath(homeDir) // search root directory
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {


cli/setup_test.go  +5 -11

@ -57,12 +57,9 @@ func TestSetupEnv(t *testing.T) {
func TestSetupConfig(t *testing.T) {
// we pre-create two config files we can refer to in the rest of
// the test cases.
cval1, cval2 := "fubble", "wubble"
cval1 := "fubble"
conf1, err := WriteDemoConfig(map[string]string{"boo": cval1})
require.Nil(t, err)
// make sure it handles dashed-words in the config, and ignores random info
conf2, err := WriteDemoConfig(map[string]string{"boo": cval2, "foo": "bar", "two-words": "WORD"})
require.Nil(t, err)
cases := []struct {
args []string
@ -74,16 +71,13 @@ func TestSetupConfig(t *testing.T) {
// setting on the command line
{[]string{"--boo", "haha"}, nil, "haha", ""},
{[]string{"--two-words", "rocks"}, nil, "", "rocks"},
{[]string{"--root", conf1}, nil, cval1, ""},
{[]string{"--home", conf1}, nil, cval1, ""},
// test both variants of the prefix
{nil, map[string]string{"RD_BOO": "bang"}, "bang", ""},
{nil, map[string]string{"RD_TWO_WORDS": "fly"}, "", "fly"},
{nil, map[string]string{"RDTWO_WORDS": "fly"}, "", "fly"},
{nil, map[string]string{"RD_ROOT": conf1}, cval1, ""},
{nil, map[string]string{"RDROOT": conf2}, cval2, "WORD"},
{nil, map[string]string{"RD_HOME": conf1}, cval1, ""},
{nil, map[string]string{"RDHOME": conf1}, cval1, ""},
// and when both are set??? HOME wins every time!
{[]string{"--root", conf1}, map[string]string{"RDHOME": conf2}, cval2, "WORD"},
}
for idx, tc := range cases {
@ -156,10 +150,10 @@ func TestSetupUnmarshal(t *testing.T) {
{nil, nil, c("", 0)},
// setting on the command line
{[]string{"--name", "haha"}, nil, c("haha", 0)},
{[]string{"--root", conf1}, nil, c(cval1, 0)},
{[]string{"--home", conf1}, nil, c(cval1, 0)},
// test both variants of the prefix
{nil, map[string]string{"MR_AGE": "56"}, c("", 56)},
{nil, map[string]string{"MR_ROOT": conf1}, c(cval1, 0)},
{nil, map[string]string{"MR_HOME": conf1}, c(cval1, 0)},
{[]string{"--age", "17"}, map[string]string{"MRHOME": conf2}, c(cval2, 17)},
}


clist/clist.go  +152 -94

@ -1,46 +1,68 @@
package clist
/*
The purpose of CList is to provide a goroutine-safe linked-list.
This list can be traversed concurrently by any number of goroutines.
However, removed CElements cannot be added back.
NOTE: Not all methods of container/list are (yet) implemented.
NOTE: Removed elements need to DetachPrev or DetachNext consistently
to ensure garbage collection of removed elements.
*/
import (
"sync"
"sync/atomic"
"unsafe"
)
// CElement is an element of a linked-list
// Traversal from a CElement are goroutine-safe.
/*
CElement is an element of a linked-list
Traversal from a CElement is goroutine-safe.
We can't avoid using WaitGroups or for-loops given the documentation
spec without re-implementing the primitives that already exist in
golang/sync. Notice that WaitGroup allows many go-routines to be
simultaneously released, which is what we want. Mutex doesn't do
this. RWMutex does this, but it's clumsy to use in the way that a
WaitGroup would be used -- and we'd end up having two RWMutex's for
prev/next each, which is doubly confusing.
sync.Cond would be sort-of useful, but we don't need a write-lock in
the for-loop. Use sync.Cond when you need serial access to the
"condition". In our case our condition is if `next != nil || removed`,
and there's no reason to serialize that condition for goroutines
waiting on NextWait() (since it's just a read operation).
*/
type CElement struct {
prev unsafe.Pointer
mtx sync.RWMutex
prev *CElement
prevWg *sync.WaitGroup
next unsafe.Pointer
next *CElement
nextWg *sync.WaitGroup
removed uint32
Value interface{}
removed bool
Value interface{} // immutable
}
// Blocking implementation of Next().
// May return nil iff CElement was tail and got removed.
func (e *CElement) NextWait() *CElement {
for {
e.nextWg.Wait()
next := e.Next()
if next == nil {
if e.Removed() {
return nil
} else {
continue
}
} else {
e.mtx.RLock()
next := e.next
nextWg := e.nextWg
removed := e.removed
e.mtx.RUnlock()
if next != nil || removed {
return next
}
nextWg.Wait()
// e.next doesn't necessarily exist here.
// That's why we need to continue a for-loop.
}
}
@ -48,82 +70,113 @@ func (e *CElement) NextWait() *CElement {
// May return nil iff CElement was head and got removed.
func (e *CElement) PrevWait() *CElement {
for {
e.prevWg.Wait()
prev := e.Prev()
if prev == nil {
if e.Removed() {
return nil
} else {
continue
}
} else {
e.mtx.RLock()
prev := e.prev
prevWg := e.prevWg
removed := e.removed
e.mtx.RUnlock()
if prev != nil || removed {
return prev
}
prevWg.Wait()
}
}
// Nonblocking, may return nil if at the end.
func (e *CElement) Next() *CElement {
return (*CElement)(atomic.LoadPointer(&e.next))
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.next
}
// Nonblocking, may return nil if at the end.
func (e *CElement) Prev() *CElement {
return (*CElement)(atomic.LoadPointer(&e.prev))
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.prev
}
func (e *CElement) Removed() bool {
return atomic.LoadUint32(&(e.removed)) > 0
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.removed
}
func (e *CElement) DetachNext() {
if !e.Removed() {
panic("DetachNext() must be called after Remove(e)")
}
atomic.StorePointer(&e.next, nil)
e.mtx.Lock()
defer e.mtx.Unlock()
e.next = nil
}
func (e *CElement) DetachPrev() {
if !e.Removed() {
panic("DetachPrev() must be called after Remove(e)")
}
atomic.StorePointer(&e.prev, nil)
e.mtx.Lock()
defer e.mtx.Unlock()
e.prev = nil
}
func (e *CElement) setNextAtomic(next *CElement) {
for {
oldNext := atomic.LoadPointer(&e.next)
if !atomic.CompareAndSwapPointer(&(e.next), oldNext, unsafe.Pointer(next)) {
continue
}
if next == nil && oldNext != nil { // We for-loop in NextWait() so race is ok
e.nextWg.Add(1)
}
if next != nil && oldNext == nil {
e.nextWg.Done()
}
return
// NOTE: This function needs to be safe for
// concurrent goroutines waiting on nextWg.
func (e *CElement) SetNext(newNext *CElement) {
e.mtx.Lock()
defer e.mtx.Unlock()
oldNext := e.next
e.next = newNext
if oldNext != nil && newNext == nil {
// See https://golang.org/pkg/sync/:
//
// If a WaitGroup is reused to wait for several independent sets of
// events, new Add calls must happen after all previous Wait calls have
// returned.
e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
}
if oldNext == nil && newNext != nil {
e.nextWg.Done()
}
}
func (e *CElement) setPrevAtomic(prev *CElement) {
for {
oldPrev := atomic.LoadPointer(&e.prev)
if !atomic.CompareAndSwapPointer(&(e.prev), oldPrev, unsafe.Pointer(prev)) {
continue
}
if prev == nil && oldPrev != nil { // We for-loop in PrevWait() so race is ok
e.prevWg.Add(1)
}
if prev != nil && oldPrev == nil {
e.prevWg.Done()
}
return
// NOTE: This function needs to be safe for
// concurrent goroutines waiting on prevWg
func (e *CElement) SetPrev(newPrev *CElement) {
e.mtx.Lock()
defer e.mtx.Unlock()
oldPrev := e.prev
e.prev = newPrev
if oldPrev != nil && newPrev == nil {
e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
}
if oldPrev == nil && newPrev != nil {
e.prevWg.Done()
}
}
func (e *CElement) setRemovedAtomic() {
atomic.StoreUint32(&(e.removed), 1)
func (e *CElement) SetRemoved() {
e.mtx.Lock()
defer e.mtx.Unlock()
e.removed = true
// This wakes up anyone waiting in either direction.
if e.prev == nil {
e.prevWg.Done()
}
if e.next == nil {
e.nextWg.Done()
}
}
//--------------------------------------------------------------------------------
@ -132,7 +185,7 @@ func (e *CElement) setRemovedAtomic() {
// The zero value for CList is an empty list ready to use.
// Operations are goroutine-safe.
type CList struct {
mtx sync.Mutex
mtx sync.RWMutex
wg *sync.WaitGroup
head *CElement // first element
tail *CElement // last element
@ -142,6 +195,7 @@ type CList struct {
func (l *CList) Init() *CList {
l.mtx.Lock()
defer l.mtx.Unlock()
l.wg = waitGroup1()
l.head = nil
l.tail = nil
@ -152,48 +206,55 @@ func (l *CList) Init() *CList {
func New() *CList { return new(CList).Init() }
func (l *CList) Len() int {
l.mtx.Lock()
defer l.mtx.Unlock()
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.len
}
func (l *CList) Front() *CElement {
l.mtx.Lock()
defer l.mtx.Unlock()
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.head
}
func (l *CList) FrontWait() *CElement {
// Loop until the head is non-nil else wait and try again
for {
l.mtx.Lock()
l.mtx.RLock()
head := l.head
wg := l.wg
l.mtx.Unlock()
if head == nil {
wg.Wait()
} else {
l.mtx.RUnlock()
if head != nil {
return head
}
wg.Wait()
// NOTE: If you think l.head exists here, think harder.
}
}
func (l *CList) Back() *CElement {
l.mtx.Lock()
defer l.mtx.Unlock()
l.mtx.RLock()
defer l.mtx.RUnlock()
return l.tail
}
func (l *CList) BackWait() *CElement {
for {
l.mtx.Lock()
l.mtx.RLock()
tail := l.tail
wg := l.wg
l.mtx.Unlock()
if tail == nil {
wg.Wait()
} else {
l.mtx.RUnlock()
if tail != nil {
return tail
}
wg.Wait()
// l.tail doesn't necessarily exist here.
// That's why we need to continue a for-loop.
}
}
@ -203,11 +264,12 @@ func (l *CList) PushBack(v interface{}) *CElement {
// Construct a new element
e := &CElement{
prev: nil,
prevWg: waitGroup1(),
next: nil,
nextWg: waitGroup1(),
Value: v,
prev: nil,
prevWg: waitGroup1(),
next: nil,
nextWg: waitGroup1(),
removed: false,
Value: v,
}
// Release waiters on FrontWait/BackWait maybe
@ -221,9 +283,9 @@ func (l *CList) PushBack(v interface{}) *CElement {
l.head = e
l.tail = e
} else {
l.tail.setNextAtomic(e)
e.setPrevAtomic(l.tail)
l.tail = e
e.SetPrev(l.tail) // We must init e first.
l.tail.SetNext(e) // This will make e accessible.
l.tail = e // Update the list.
}
return e
@ -250,30 +312,26 @@ func (l *CList) Remove(e *CElement) interface{} {
// If we're removing the only item, make CList FrontWait/BackWait wait.
if l.len == 1 {
l.wg.Add(1)
l.wg = waitGroup1() // WaitGroups are difficult to re-use.
}
// Update l.len
l.len -= 1
// Connect next/prev and set head/tail
if prev == nil {
l.head = next
} else {
prev.setNextAtomic(next)
prev.SetNext(next)
}
if next == nil {
l.tail = prev
} else {
next.setPrevAtomic(prev)
next.SetPrev(prev)
}
// Set .Done() on e, otherwise waiters will wait forever.
e.setRemovedAtomic()
if prev == nil {
e.prevWg.Done()
}
if next == nil {
e.nextWg.Done()
}
e.SetRemoved()
return e.Value
}
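
As a quick usage sketch (not part of this diff), the public CList surface exercised above can be driven like this; the github.com/tendermint/tmlibs/clist import path is assumed.

package main

import (
	"fmt"

	"github.com/tendermint/tmlibs/clist"
)

func main() {
	l := clist.New()
	for i := 0; i < 3; i++ {
		l.PushBack(i)
	}
	// Next() is non-blocking and returns nil at the tail; NextWait() would
	// block until a next element exists or the element is removed.
	for e := l.Front(); e != nil; e = e.Next() {
		fmt.Println(e.Value)
	}
	// Removing an element wakes any goroutines blocked in FrontWait/NextWait.
	l.Remove(l.Front())
	fmt.Println("len:", l.Len())
}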


clist/clist_test.go  +1 -1

@ -149,7 +149,7 @@ func _TestGCRandom(t *testing.T) {
func TestScanRightDeleteRandom(t *testing.T) {
const numElements = 10000
const numTimes = 100000
const numTimes = 1000
const numScanners = 10
l := New()


common/bit_array.go  +3 -4

@ -3,7 +3,6 @@ package common
import (
"encoding/binary"
"fmt"
"math/rand"
"strings"
"sync"
)
@ -212,12 +211,12 @@ func (bA *BitArray) PickRandom() (int, bool) {
if length == 0 {
return 0, false
}
randElemStart := rand.Intn(length)
randElemStart := RandIntn(length)
for i := 0; i < length; i++ {
elemIdx := ((i + randElemStart) % length)
if elemIdx < length-1 {
if bA.Elems[elemIdx] > 0 {
randBitStart := rand.Intn(64)
randBitStart := RandIntn(64)
for j := 0; j < 64; j++ {
bitIdx := ((j + randBitStart) % 64)
if (bA.Elems[elemIdx] & (uint64(1) << uint(bitIdx))) > 0 {
@ -232,7 +231,7 @@ func (bA *BitArray) PickRandom() (int, bool) {
if elemBits == 0 {
elemBits = 64
}
randBitStart := rand.Intn(elemBits)
randBitStart := RandIntn(elemBits)
for j := 0; j < elemBits; j++ {
bitIdx := ((j + randBitStart) % elemBits)
if (bA.Elems[elemIdx] & (uint64(1) << uint(bitIdx))) > 0 {


common/int.go  +10 -0

@ -53,3 +53,13 @@ func PutInt64BE(dest []byte, i int64) {
func GetInt64BE(src []byte) int64 {
return int64(binary.BigEndian.Uint64(src))
}
// IntInSlice returns true if a is found in the list.
func IntInSlice(a int, list []int) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}

common/int_test.go  +14 -0

@ -0,0 +1,14 @@
package common
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIntInSlice(t *testing.T) {
assert.True(t, IntInSlice(1, []int{1, 2, 3}))
assert.False(t, IntInSlice(4, []int{1, 2, 3}))
assert.True(t, IntInSlice(0, []int{0}))
assert.False(t, IntInSlice(0, []int{}))
}

common/random.go  +100 -21

@ -2,7 +2,8 @@ package common
import (
crand "crypto/rand"
"math/rand"
mrand "math/rand"
"sync"
"time"
)
@ -10,22 +11,36 @@ const (
strChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" // 62 characters
)
func init() {
// pseudo random number generator.
// seeded with OS randomness (crand)
var prng struct {
sync.Mutex
*mrand.Rand
}
func reset() {
b := cRandBytes(8)
var seed uint64
for i := 0; i < 8; i++ {
seed |= uint64(b[i])
seed <<= 8
}
rand.Seed(int64(seed))
prng.Lock()
prng.Rand = mrand.New(mrand.NewSource(int64(seed)))
prng.Unlock()
}
func init() {
reset()
}
// Constructs an alphanumeric string of given length.
// It is not safe for cryptographic usage.
func RandStr(length int) string {
chars := []byte{}
MAIN_LOOP:
for {
val := rand.Int63()
val := RandInt63()
for i := 0; i < 10; i++ {
v := int(val & 0x3f) // rightmost 6 bits
if v >= 62 { // only 62 characters in strChars
@ -44,87 +59,151 @@ MAIN_LOOP:
return string(chars)
}
// It is not safe for cryptographic usage.
func RandUint16() uint16 {
return uint16(rand.Uint32() & (1<<16 - 1))
return uint16(RandUint32() & (1<<16 - 1))
}
// It is not safe for cryptographic usage.
func RandUint32() uint32 {
return rand.Uint32()
prng.Lock()
u32 := prng.Uint32()
prng.Unlock()
return u32
}
// It is not safe for cryptographic usage.
func RandUint64() uint64 {
return uint64(rand.Uint32())<<32 + uint64(rand.Uint32())
return uint64(RandUint32())<<32 + uint64(RandUint32())
}
// It is not safe for cryptographic usage.
func RandUint() uint {
return uint(rand.Int())
prng.Lock()
i := prng.Int()
prng.Unlock()
return uint(i)
}
// It is not safe for cryptographic usage.
func RandInt16() int16 {
return int16(rand.Uint32() & (1<<16 - 1))
return int16(RandUint32() & (1<<16 - 1))
}
// It is not safe for cryptographic usage.
func RandInt32() int32 {
return int32(rand.Uint32())
return int32(RandUint32())
}
// It is not safe for cryptographic usage.
func RandInt64() int64 {
return int64(rand.Uint32())<<32 + int64(rand.Uint32())
return int64(RandUint64())
}
// It is not safe for cryptographic usage.
func RandInt() int {
return rand.Int()
prng.Lock()
i := prng.Int()
prng.Unlock()
return i
}
// It is not safe for cryptographic usage.
func RandInt31() int32 {
prng.Lock()
i31 := prng.Int31()
prng.Unlock()
return i31
}
// It is not safe for cryptographic usage.
func RandInt63() int64 {
prng.Lock()
i63 := prng.Int63()
prng.Unlock()
return i63
}
// Distributed pseudo-exponentially to test for various cases
// It is not safe for cryptographic usage.
func RandUint16Exp() uint16 {
bits := rand.Uint32() % 16
bits := RandUint32() % 16
if bits == 0 {
return 0
}
n := uint16(1 << (bits - 1))
n += uint16(rand.Int31()) & ((1 << (bits - 1)) - 1)
n += uint16(RandInt31()) & ((1 << (bits - 1)) - 1)
return n
}
// Distributed pseudo-exponentially to test for various cases
// It is not safe for cryptographic usage.
func RandUint32Exp() uint32 {
bits := rand.Uint32() % 32
bits := RandUint32() % 32
if bits == 0 {
return 0
}
n := uint32(1 << (bits - 1))
n += uint32(rand.Int31()) & ((1 << (bits - 1)) - 1)
n += uint32(RandInt31()) & ((1 << (bits - 1)) - 1)
return n
}
// Distributed pseudo-exponentially to test for various cases
// It is not safe for cryptographic usage.
func RandUint64Exp() uint64 {
bits := rand.Uint32() % 64
bits := RandUint32() % 64
if bits == 0 {
return 0
}
n := uint64(1 << (bits - 1))
n += uint64(rand.Int63()) & ((1 << (bits - 1)) - 1)
n += uint64(RandInt63()) & ((1 << (bits - 1)) - 1)
return n
}
// It is not safe for cryptographic usage.
func RandFloat32() float32 {
return rand.Float32()
prng.Lock()
f32 := prng.Float32()
prng.Unlock()
return f32
}
// It is not safe for cryptographic usage.
func RandTime() time.Time {
return time.Unix(int64(RandUint64Exp()), 0)
}
// RandBytes returns n random bytes from the OS's source of entropy ie. via crypto/rand.
// It is not safe for cryptographic usage.
func RandBytes(n int) []byte {
// cRandBytes isn't guaranteed to be fast so instead
// use random bytes generated from the internal PRNG
bs := make([]byte, n)
for i := 0; i < n; i++ {
bs[i] = byte(rand.Intn(256))
for i := 0; i < len(bs); i++ {
bs[i] = byte(RandInt() & 0xFF)
}
return bs
}
// RandIntn returns, as an int, a non-negative pseudo-random number in [0, n).
// It panics if n <= 0.
// It is not safe for cryptographic usage.
func RandIntn(n int) int {
prng.Lock()
i := prng.Intn(n)
prng.Unlock()
return i
}
// RandPerm returns a pseudo-random permutation of n integers in [0, n).
// It is not safe for cryptographic usage.
func RandPerm(n int) []int {
prng.Lock()
perm := prng.Perm(n)
prng.Unlock()
return perm
}
// NOTE: This relies on the os's random number generator.
// For real security, we should salt that with some seed.
// See github.com/tendermint/go-crypto for a more secure reader.
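
For reference, a short sketch (not part of this commit) of the new seeded, mutex-guarded helpers; the cmn alias and github.com/tendermint/tmlibs/common import path are assumptions.

package main

import (
	"fmt"

	cmn "github.com/tendermint/tmlibs/common"
)

func main() {
	// All of these draw from the package-level PRNG, which is seeded from
	// crypto/rand in init() and protected by a mutex, so concurrent callers
	// are safe. None of them are suitable for cryptographic use.
	fmt.Println(cmn.RandStr(12))   // 12-character alphanumeric string
	fmt.Println(cmn.RandIntn(100)) // int in [0, 100)
	fmt.Println(cmn.RandPerm(5))   // pseudo-random permutation of 0..4
	fmt.Println(cmn.RandBytes(8))  // 8 pseudo-random (non-crypto) bytes
}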


common/random_test.go  +120 -0

@ -0,0 +1,120 @@
package common
import (
"bytes"
"encoding/json"
"fmt"
"io"
mrand "math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRandStr(t *testing.T) {
l := 243
s := RandStr(l)
assert.Equal(t, l, len(s))
}
func TestRandBytes(t *testing.T) {
l := 243
b := RandBytes(l)
assert.Equal(t, l, len(b))
}
func TestRandIntn(t *testing.T) {
n := 243
for i := 0; i < 100; i++ {
x := RandIntn(n)
assert.True(t, x < n)
}
}
// It is essential that these tests run and never repeat their outputs
// lest we've been pwned and the behavior of our randomness is controlled.
// See Issues:
// * https://github.com/tendermint/tmlibs/issues/99
// * https://github.com/tendermint/tendermint/issues/973
func TestUniqueRng(t *testing.T) {
buf := new(bytes.Buffer)
outputs := make(map[string][]int)
for i := 0; i < 100; i++ {
testThemAll(buf)
output := buf.String()
buf.Reset()
runs, seen := outputs[output]
if seen {
t.Errorf("Run #%d's output was already seen in previous runs: %v", i, runs)
}
outputs[output] = append(outputs[output], i)
}
}
func testThemAll(out io.Writer) {
// Reset the internal PRNG
reset()
// Set math/rand's Seed so that any direct invocations
// of math/rand will reveal themselves.
mrand.Seed(1)
perm := RandPerm(10)
blob, _ := json.Marshal(perm)
fmt.Fprintf(out, "perm: %s\n", blob)
fmt.Fprintf(out, "randInt: %d\n", RandInt())
fmt.Fprintf(out, "randUint: %d\n", RandUint())
fmt.Fprintf(out, "randIntn: %d\n", RandIntn(97))
fmt.Fprintf(out, "randInt31: %d\n", RandInt31())
fmt.Fprintf(out, "randInt32: %d\n", RandInt32())
fmt.Fprintf(out, "randInt63: %d\n", RandInt63())
fmt.Fprintf(out, "randInt64: %d\n", RandInt64())
fmt.Fprintf(out, "randUint32: %d\n", RandUint32())
fmt.Fprintf(out, "randUint64: %d\n", RandUint64())
fmt.Fprintf(out, "randUint16Exp: %d\n", RandUint16Exp())
fmt.Fprintf(out, "randUint32Exp: %d\n", RandUint32Exp())
fmt.Fprintf(out, "randUint64Exp: %d\n", RandUint64Exp())
}
func TestRngConcurrencySafety(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = RandUint64()
<-time.After(time.Millisecond * time.Duration(RandIntn(100)))
_ = RandPerm(3)
}()
}
wg.Wait()
}
func BenchmarkRandBytes10B(b *testing.B) {
benchmarkRandBytes(b, 10)
}
func BenchmarkRandBytes100B(b *testing.B) {
benchmarkRandBytes(b, 100)
}
func BenchmarkRandBytes1KiB(b *testing.B) {
benchmarkRandBytes(b, 1024)
}
func BenchmarkRandBytes10KiB(b *testing.B) {
benchmarkRandBytes(b, 10*1024)
}
func BenchmarkRandBytes100KiB(b *testing.B) {
benchmarkRandBytes(b, 100*1024)
}
func BenchmarkRandBytes1MiB(b *testing.B) {
benchmarkRandBytes(b, 1024*1024)
}
func benchmarkRandBytes(b *testing.B, n int) {
for i := 0; i < b.N; i++ {
_ = RandBytes(n)
}
b.ReportAllocs()
}

common/repeat_timer.go  +184 -42

@ -5,82 +5,224 @@ import (
"time"
)
// Used by RepeatTimer the first time,
// and every time it's Reset() after Stop().
type TickerMaker func(dur time.Duration) Ticker
// Ticker is a basic ticker interface.
type Ticker interface {
// Never changes, never closes.
Chan() <-chan time.Time
// Stopping a stopped Ticker will panic.
Stop()
}
//----------------------------------------
// defaultTickerMaker
func defaultTickerMaker(dur time.Duration) Ticker {
ticker := time.NewTicker(dur)
return (*defaultTicker)(ticker)
}
type defaultTicker time.Ticker
// Implements Ticker
func (t *defaultTicker) Chan() <-chan time.Time {
return t.C
}
// Implements Ticker
func (t *defaultTicker) Stop() {
((*time.Ticker)(t)).Stop()
}
//----------------------------------------
// LogicalTickerMaker
// Construct a TickerMaker that always uses `source`.
// It's useful for simulating a deterministic clock.
func NewLogicalTickerMaker(source chan time.Time) TickerMaker {
return func(dur time.Duration) Ticker {
return newLogicalTicker(source, dur)
}
}
type logicalTicker struct {
source <-chan time.Time
ch chan time.Time
quit chan struct{}
}
func newLogicalTicker(source <-chan time.Time, interval time.Duration) Ticker {
lt := &logicalTicker{
source: source,
ch: make(chan time.Time),
quit: make(chan struct{}),
}
go lt.fireRoutine(interval)
return lt
}
// We need a goroutine to read times from t.source
// and fire on t.Chan() when `interval` has passed.
func (t *logicalTicker) fireRoutine(interval time.Duration) {
source := t.source
// Init `lasttime`
lasttime := time.Time{}
select {
case lasttime = <-source:
case <-t.quit:
return
}
// Init `lasttime` end
timeleft := interval
for {
select {
case newtime := <-source:
elapsed := newtime.Sub(lasttime)
timeleft -= elapsed
if timeleft <= 0 {
// Block for determinism until the ticker is stopped.
select {
case t.ch <- newtime:
case <-t.quit:
return
}
// Reset timeleft.
// Don't try to "catch up" by sending more.
// "Ticker adjusts the intervals or drops ticks to make up for
// slow receivers" - https://golang.org/pkg/time/#Ticker
timeleft = interval
}
case <-t.quit:
return // done
}
}
}
// Implements Ticker
func (t *logicalTicker) Chan() <-chan time.Time {
return t.ch // immutable
}
// Implements Ticker
func (t *logicalTicker) Stop() {
close(t.quit) // it *should* panic when stopped twice.
}
//---------------------------------------------------------------------
/*
RepeatTimer repeatedly sends a struct{}{} to .Ch after each "dur" period.
It's good for keeping connections alive.
A RepeatTimer must be Stop()'d or it will keep a goroutine alive.
RepeatTimer repeatedly sends a struct{}{} to `.Chan()` after each `dur`
period. (It's good for keeping connections alive.)
A RepeatTimer must be stopped, or it will keep a goroutine alive.
*/
type RepeatTimer struct {
Ch chan time.Time
name string
ch chan time.Time
tm TickerMaker
mtx sync.Mutex
name string
ticker *time.Ticker
quit chan struct{}
wg *sync.WaitGroup
dur time.Duration
ticker Ticker
quit chan struct{}
}
// NewRepeatTimer returns a RepeatTimer with a defaultTicker.
func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer {
return NewRepeatTimerWithTickerMaker(name, dur, defaultTickerMaker)
}
// NewRepeatTimerWithTickerMaker returns a RepeatTimer with the given ticker maker.
func NewRepeatTimerWithTickerMaker(name string, dur time.Duration, tm TickerMaker) *RepeatTimer {
var t = &RepeatTimer{
Ch: make(chan time.Time),
ticker: time.NewTicker(dur),
quit: make(chan struct{}),
wg: new(sync.WaitGroup),
name: name,
ch: make(chan time.Time),
tm: tm,
dur: dur,
ticker: nil,
quit: nil,
}
t.wg.Add(1)
go t.fireRoutine(t.ticker)
t.reset()
return t
}
func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) {
func (t *RepeatTimer) fireRoutine(ch <-chan time.Time, quit <-chan struct{}) {
for {
select {
case t_ := <-ticker.C:
t.Ch <- t_
case <-t.quit:
// needed so we know when we can reset t.quit
t.wg.Done()
case t_ := <-ch:
t.ch <- t_
case <-quit: // NOTE: `t.quit` races.
return
}
}
}
func (t *RepeatTimer) Chan() <-chan time.Time {
return t.ch
}
func (t *RepeatTimer) Stop() {
t.mtx.Lock()
defer t.mtx.Unlock()
t.stop()
}
// Wait the duration again before firing.
func (t *RepeatTimer) Reset() {
t.Stop()
t.mtx.Lock() // Lock
t.mtx.Lock()
defer t.mtx.Unlock()
t.ticker = time.NewTicker(t.dur)
t.reset()
}
//----------------------------------------
// Misc.
// CONTRACT: (non-constructor) caller should hold t.mtx.
func (t *RepeatTimer) reset() {
if t.ticker != nil {
t.stop()
}
t.ticker = t.tm(t.dur)
t.quit = make(chan struct{})
t.wg.Add(1)
go t.fireRoutine(t.ticker)
go t.fireRoutine(t.ticker.Chan(), t.quit)
}
// For ease of .Stop()'ing services before .Start()'ing them,
// we ignore .Stop()'s on nil RepeatTimers.
func (t *RepeatTimer) Stop() bool {
if t == nil {
return false
// CONTRACT: caller should hold t.mtx.
func (t *RepeatTimer) stop() {
if t.ticker == nil {
/*
Similar to the case of closing channels twice:
https://groups.google.com/forum/#!topic/golang-nuts/rhxMiNmRAPk
Stopping a RepeatTimer twice implies that you do
not know whether you are done or not.
If you're calling stop on a stopped RepeatTimer,
you probably have race conditions.
*/
panic("Tried to stop a stopped RepeatTimer")
}
t.mtx.Lock() // Lock
defer t.mtx.Unlock()
t.ticker.Stop()
t.ticker = nil
/*
XXX
From https://golang.org/pkg/time/#Ticker:
"Stop the ticker to release associated resources"
"After Stop, no more ticks will be sent"
So we shouldn't have to do the below.
exists := t.ticker != nil
if exists {
t.ticker.Stop() // does not close the channel
select {
case <-t.Ch:
case <-t.ch:
// read off channel if there's anything there
default:
}
close(t.quit)
t.wg.Wait() // must wait for quit to close else we race Reset
t.ticker = nil
}
return exists
*/
close(t.quit)
}
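
A minimal usage sketch (not part of this commit) of the reworked RepeatTimer with the default ticker; the cmn alias and import path are assumptions.

package main

import (
	"fmt"
	"time"

	cmn "github.com/tendermint/tmlibs/common"
)

func main() {
	// Fires on rt.Chan() roughly every 100ms until stopped.
	rt := cmn.NewRepeatTimer("keepalive", 100*time.Millisecond)
	for i := 0; i < 3; i++ {
		t := <-rt.Chan()
		fmt.Println("tick:", t)
	}
	rt.Stop() // calling Stop a second time panics, as the test below shows
}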

common/repeat_timer_test.go  +92 -0

@ -0,0 +1,92 @@
package common
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestDefaultTicker(t *testing.T) {
ticker := defaultTickerMaker(time.Millisecond * 10)
<-ticker.Chan()
ticker.Stop()
}
func TestRepeat(t *testing.T) {
ch := make(chan time.Time, 100)
lt := time.Time{} // zero time is year 1
// tick fires `cnt` times for each second.
tick := func(cnt int) {
for i := 0; i < cnt; i++ {
lt = lt.Add(time.Second)
ch <- lt
}
}
// tock consumes Ticker.Chan() events `cnt` times.
tock := func(t *testing.T, rt *RepeatTimer, cnt int) {
for i := 0; i < cnt; i++ {
timeout := time.After(time.Second * 10)
select {
case <-rt.Chan():
case <-timeout:
panic("expected RepeatTimer to fire")
}
}
done := true
select {
case <-rt.Chan():
done = false
default:
}
assert.True(t, done)
}
tm := NewLogicalTickerMaker(ch)
dur := time.Duration(10 * time.Millisecond) // less than a second
rt := NewRepeatTimerWithTickerMaker("bar", dur, tm)
// Start at 0.
tock(t, rt, 0)
tick(1) // init time
tock(t, rt, 0)
tick(1) // wait 1 periods
tock(t, rt, 1)
tick(2) // wait 2 periods
tock(t, rt, 2)
tick(3) // wait 3 periods
tock(t, rt, 3)
tick(4) // wait 4 periods
tock(t, rt, 4)
// Multiple resets leads to no firing.
for i := 0; i < 20; i++ {
time.Sleep(time.Millisecond)
rt.Reset()
}
// After this, it works as new.
tock(t, rt, 0)
tick(1) // init time
tock(t, rt, 0)
tick(1) // wait 1 periods
tock(t, rt, 1)
tick(2) // wait 2 periods
tock(t, rt, 2)
tick(3) // wait 3 periods
tock(t, rt, 3)
tick(4) // wait 4 periods
tock(t, rt, 4)
// After a stop, nothing more is sent.
rt.Stop()
tock(t, rt, 0)
// Another stop panics.
assert.Panics(t, func() { rt.Stop() })
}

common/service.go  +32 -14

@ -1,23 +1,41 @@
package common
import (
"errors"
"fmt"
"sync/atomic"
"github.com/tendermint/tmlibs/log"
)
var (
ErrAlreadyStarted = errors.New("already started")
ErrAlreadyStopped = errors.New("already stopped")
)
// Service defines a service that can be started, stopped, and reset.
type Service interface {
Start() (bool, error)
// Start the service.
// If it's already started or stopped, will return an error.
// If OnStart() returns an error, it's returned by Start()
Start() error
OnStart() error
Stop() bool
// Stop the service.
// If it's already stopped, will return an error.
// OnStop must never error.
Stop() error
OnStop()
Reset() (bool, error)
// Reset the service.
// Panics by default - must be overwritten to enable reset.
Reset() error
OnReset() error
// Return true if the service is running
IsRunning() bool
// String representation of the service
String() string
SetLogger(log.Logger)
@ -94,11 +112,11 @@ func (bs *BaseService) SetLogger(l log.Logger) {
}
// Implements Service
func (bs *BaseService) Start() (bool, error) {
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
return false, nil
return ErrAlreadyStopped
} else {
bs.Logger.Info(Fmt("Starting %v", bs.name), "impl", bs.impl)
}
@ -106,12 +124,12 @@ func (bs *BaseService) Start() (bool, error) {
if err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return false, err
return err
}
return true, err
return nil
} else {
bs.Logger.Debug(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
return false, nil
return ErrAlreadyStarted
}
}
@ -121,15 +139,15 @@ func (bs *BaseService) Start() (bool, error) {
func (bs *BaseService) OnStart() error { return nil }
// Implements Service
func (bs *BaseService) Stop() bool {
func (bs *BaseService) Stop() error {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
bs.Logger.Info(Fmt("Stopping %v", bs.name), "impl", bs.impl)
bs.impl.OnStop()
close(bs.Quit)
return true
return nil
} else {
bs.Logger.Debug(Fmt("Stopping %v (ignoring: already stopped)", bs.name), "impl", bs.impl)
return false
return ErrAlreadyStopped
}
}
@ -139,17 +157,17 @@ func (bs *BaseService) Stop() bool {
func (bs *BaseService) OnStop() {}
// Implements Service
func (bs *BaseService) Reset() (bool, error) {
func (bs *BaseService) Reset() error {
if !atomic.CompareAndSwapUint32(&bs.stopped, 1, 0) {
bs.Logger.Debug(Fmt("Can't reset %v. Not stopped", bs.name), "impl", bs.impl)
return false, nil
return fmt.Errorf("can't reset running %s", bs.name)
}
// whether or not we've started, we can reset
atomic.CompareAndSwapUint32(&bs.started, 1, 0)
bs.Quit = make(chan struct{})
return true, bs.impl.OnReset()
return bs.impl.OnReset()
}
// Implements Service
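
To illustrate the new error-returning lifecycle, a minimal sketch (not part of this commit) of a service built on BaseService; the cmn alias and import path are assumptions.

package main

import (
	"fmt"

	cmn "github.com/tendermint/tmlibs/common"
)

// worker embeds BaseService and overrides the lifecycle hooks.
type worker struct {
	cmn.BaseService
}

func (w *worker) OnStart() error {
	fmt.Println("worker started")
	return nil // returning an error here would make Start() fail
}

func (w *worker) OnStop() {
	fmt.Println("worker stopped")
}

func main() {
	w := &worker{}
	w.BaseService = *cmn.NewBaseService(nil, "worker", w)

	if err := w.Start(); err != nil {
		panic(err)
	}
	// A second Start now returns ErrAlreadyStarted instead of (false, nil).
	if err := w.Start(); err == cmn.ErrAlreadyStarted {
		fmt.Println("already started")
	}
	if err := w.Stop(); err != nil {
		panic(err)
	}
}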


common/service_test.go  +38 -8

@ -2,23 +2,53 @@ package common
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestBaseServiceWait(t *testing.T) {
type testService struct {
BaseService
}
type TestService struct {
BaseService
}
ts := &TestService{}
func (testService) OnReset() error {
return nil
}
func TestBaseServiceWait(t *testing.T) {
ts := &testService{}
ts.BaseService = *NewBaseService(nil, "TestService", ts)
ts.Start()
waitFinished := make(chan struct{})
go func() {
ts.Stop()
ts.Wait()
waitFinished <- struct{}{}
}()
for i := 0; i < 10; i++ {
ts.Wait()
go ts.Stop()
select {
case <-waitFinished:
// all good
case <-time.After(100 * time.Millisecond):
t.Fatal("expected Wait() to finish within 100 ms.")
}
}
func TestBaseServiceReset(t *testing.T) {
ts := &testService{}
ts.BaseService = *NewBaseService(nil, "TestService", ts)
ts.Start()
err := ts.Reset()
require.Error(t, err, "expected cant reset service error")
ts.Stop()
err = ts.Reset()
require.NoError(t, err)
err = ts.Start()
require.NoError(t, err)
}

common/string.go  +10 -0

@ -43,3 +43,13 @@ func StripHex(s string) string {
}
return s
}
// StringInSlice returns true if a is found in the list.
func StringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}

common/string_test.go  +14 -0

@ -0,0 +1,14 @@
package common
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestStringInSlice(t *testing.T) {
assert.True(t, StringInSlice("a", []string{"a", "b", "c"}))
assert.False(t, StringInSlice("d", []string{"a", "b", "c"}))
assert.True(t, StringInSlice("", []string{""}))
assert.False(t, StringInSlice("", []string{}))
}

common/throttle_timer_test.go  +78 -0

@ -0,0 +1,78 @@
package common
import (
"sync"
"testing"
"time"
// make govet noshadow happy...
asrt "github.com/stretchr/testify/assert"
)
type thCounter struct {
input chan struct{}
mtx sync.Mutex
count int
}
func (c *thCounter) Increment() {
c.mtx.Lock()
c.count++
c.mtx.Unlock()
}
func (c *thCounter) Count() int {
c.mtx.Lock()
val := c.count
c.mtx.Unlock()
return val
}
// Read should run in a go-routine and
// updates count by one every time a packet comes in
func (c *thCounter) Read() {
for range c.input {
c.Increment()
}
}
func TestThrottle(test *testing.T) {
assert := asrt.New(test)
ms := 50
delay := time.Duration(ms) * time.Millisecond
longwait := time.Duration(2) * delay
t := NewThrottleTimer("foo", delay)
// start at 0
c := &thCounter{input: t.Ch}
assert.Equal(0, c.Count())
go c.Read()
// waiting does nothing
time.Sleep(longwait)
assert.Equal(0, c.Count())
// send one event adds one
t.Set()
time.Sleep(longwait)
assert.Equal(1, c.Count())
// send a burst adds one
for i := 0; i < 5; i++ {
t.Set()
}
time.Sleep(longwait)
assert.Equal(2, c.Count())
// send 12, over 2 delay sections, adds 3
short := time.Duration(ms/5) * time.Millisecond
for i := 0; i < 13; i++ {
t.Set()
time.Sleep(short)
}
time.Sleep(longwait)
assert.Equal(5, c.Count())
close(t.Ch)
}

events/events_test.go  +14 -14

@ -13,8 +13,8 @@ import (
// listener to an event, and sends a string "data".
func TestAddListenerForEventFireOnce(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
messages := make(chan EventData)
@ -33,8 +33,8 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
// listener to an event, and sends a thousand integers.
func TestAddListenerForEventFireMany(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
doneSum := make(chan uint64)
@ -62,8 +62,8 @@ func TestAddListenerForEventFireMany(t *testing.T) {
// of the three events.
func TestAddListenerForDifferentEvents(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
doneSum := make(chan uint64)
@ -107,8 +107,8 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
// for each of the three events.
func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
doneSum1 := make(chan uint64)
@ -167,8 +167,8 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
// the listener and fires a thousand integers for the second event.
func TestAddAndRemoveListener(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
doneSum1 := make(chan uint64)
@ -212,8 +212,8 @@ func TestAddAndRemoveListener(t *testing.T) {
// TestRemoveListener does basic tests on adding and removing
func TestRemoveListener(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
count := 10
@ -265,8 +265,8 @@ func TestRemoveListener(t *testing.T) {
// `go test -race`, to examine for possible race conditions.
func TestRemoveListenersAsync(t *testing.T) {
evsw := NewEventSwitch()
started, err := evsw.Start()
if !started || err != nil {
err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
doneSum1 := make(chan uint64)


glide.lock  +2 -2

@ -107,7 +107,7 @@ imports:
version: eb3733d160e74a9c7e442f435eb3bea458e1d19f
testImports:
- name: github.com/davecgh/go-spew
version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9
version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9
subpackages:
- spew
- name: github.com/pmezard/go-difflib
@ -115,7 +115,7 @@ testImports:
subpackages:
- difflib
- name: github.com/stretchr/testify
version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0
version: 2aa2c176b9dab406a6970f6a55f513e8a8c8b18f
subpackages:
- assert
- require

glide.yaml  +0 -1

@ -25,7 +25,6 @@ import:
- ripemd160
testImport:
- package: github.com/stretchr/testify
version: ^1.1.4
subpackages:
- assert
- require

logger/log.go  +0 -78

@ -1,78 +0,0 @@
// DEPRECATED! Use newer log package.
package logger
import (
"os"
"github.com/tendermint/log15"
. "github.com/tendermint/tmlibs/common"
)
var mainHandler log15.Handler
var bypassHandler log15.Handler
func init() {
resetWithLogLevel("debug")
}
func SetLogLevel(logLevel string) {
resetWithLogLevel(logLevel)
}
func resetWithLogLevel(logLevel string) {
// main handler
//handlers := []log15.Handler{}
mainHandler = log15.LvlFilterHandler(
getLevel(logLevel),
log15.StreamHandler(os.Stdout, log15.TerminalFormat()),
)
//handlers = append(handlers, mainHandler)
// bypass handler for not filtering on global logLevel.
bypassHandler = log15.StreamHandler(os.Stdout, log15.TerminalFormat())
//handlers = append(handlers, bypassHandler)
// By setting handlers on the root, we handle events from all loggers.
log15.Root().SetHandler(mainHandler)
}
// See go-wire/log for an example of usage.
func MainHandler() log15.Handler {
return mainHandler
}
func New(ctx ...interface{}) log15.Logger {
return NewMain(ctx...)
}
func BypassHandler() log15.Handler {
return bypassHandler
}
func NewMain(ctx ...interface{}) log15.Logger {
return log15.Root().New(ctx...)
}
func NewBypass(ctx ...interface{}) log15.Logger {
bypass := log15.New(ctx...)
bypass.SetHandler(bypassHandler)
return bypass
}
func getLevel(lvlString string) log15.Lvl {
lvl, err := log15.LvlFromString(lvlString)
if err != nil {
Exit(Fmt("Invalid log level %v: %v", lvlString, err))
}
return lvl
}
//----------------------------------------
// Exported from log15
var LvlFilterHandler = log15.LvlFilterHandler
var LvlDebug = log15.LvlDebug
var LvlInfo = log15.LvlInfo
var LvlNotice = log15.LvlNotice
var LvlWarn = log15.LvlWarn
var LvlError = log15.LvlError

process/process.go  +0 -76

@ -1,76 +0,0 @@
package process
import (
"fmt"
"io"
"os"
"os/exec"
"time"
)
type Process struct {
Label string
ExecPath string
Args []string
Pid int
StartTime time.Time
EndTime time.Time
Cmd *exec.Cmd `json:"-"`
ExitState *os.ProcessState `json:"-"`
InputFile io.Reader `json:"-"`
OutputFile io.WriteCloser `json:"-"`
WaitCh chan struct{} `json:"-"`
}
// execPath: command name
// args: args to command. (should not include name)
func StartProcess(label string, dir string, execPath string, args []string, inFile io.Reader, outFile io.WriteCloser) (*Process, error) {
cmd := exec.Command(execPath, args...)
cmd.Dir = dir
cmd.Stdout = outFile
cmd.Stderr = outFile
cmd.Stdin = inFile
if err := cmd.Start(); err != nil {
return nil, err
}
proc := &Process{
Label: label,
ExecPath: execPath,
Args: args,
Pid: cmd.Process.Pid,
StartTime: time.Now(),
Cmd: cmd,
ExitState: nil,
InputFile: inFile,
OutputFile: outFile,
WaitCh: make(chan struct{}),
}
go func() {
err := proc.Cmd.Wait()
if err != nil {
// fmt.Printf("Process exit: %v\n", err)
if exitError, ok := err.(*exec.ExitError); ok {
proc.ExitState = exitError.ProcessState
}
}
proc.ExitState = proc.Cmd.ProcessState
proc.EndTime = time.Now() // TODO make this goroutine-safe
err = proc.OutputFile.Close()
if err != nil {
fmt.Printf("Error closing output file for %v: %v\n", proc.Label, err)
}
close(proc.WaitCh)
}()
return proc, nil
}
func (proc *Process) StopProcess(kill bool) error {
defer proc.OutputFile.Close()
if kill {
// fmt.Printf("Killing process %v\n", proc.Cmd.Process)
return proc.Cmd.Process.Kill()
} else {
// fmt.Printf("Stopping process %v\n", proc.Cmd.Process)
return proc.Cmd.Process.Signal(os.Interrupt)
}
}

+ 0
- 22
process/util.go View File

@@ -1,22 +0,0 @@
package process
import (
. "github.com/tendermint/tmlibs/common"
)
// Runs a command and gets the result.
func Run(dir string, command string, args []string) (string, bool, error) {
outFile := NewBufferCloser(nil)
proc, err := StartProcess("", dir, command, args, nil, outFile)
if err != nil {
return "", false, err
}
<-proc.WaitCh
if proc.ExitState.Success() {
return outFile.String(), true, nil
} else {
return outFile.String(), false, nil
}
}
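With the process package deleted, anything that still needs process.Run's behaviour can fall back to the standard library. A minimal sketch using os/exec; the run helper below is hypothetical and not part of this repository:

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

// run executes a command in dir and returns its combined output plus a
// success flag, roughly mirroring what process.Run used to provide.
func run(dir, command string, args ...string) (string, bool, error) {
	cmd := exec.Command(command, args...)
	cmd.Dir = dir

	var out bytes.Buffer
	cmd.Stdout = &out
	cmd.Stderr = &out

	err := cmd.Run()
	if _, ok := err.(*exec.ExitError); ok {
		// Command started but exited with a non-zero status.
		return out.String(), false, nil
	}
	if err != nil {
		return "", false, err
	}
	return out.String(), true, nil
}

func main() {
	output, ok, err := run(".", "echo", "hello")
	fmt.Println(output, ok, err)
}
```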

+ 56
- 13
pubsub/pubsub.go View File

@@ -13,6 +13,8 @@ package pubsub
import (
"context"
"errors"
"sync"
cmn "github.com/tendermint/tmlibs/common"
)
@@ -38,6 +40,7 @@ type cmd struct {
// Query defines an interface for a query to be used for subscribing.
type Query interface {
Matches(tags map[string]interface{}) bool
String() string
}
// Server allows clients to subscribe/unsubscribe for messages, publishing
@@ -47,6 +50,9 @@ type Server struct {
cmds chan cmd
cmdsCap int
mtx sync.RWMutex
subscriptions map[string]map[string]struct{} // subscriber -> query -> struct{}
}
// Option sets a parameter for the server.
@@ -56,7 +62,9 @@ type Option func(*Server)
// for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered.
func NewServer(options ...Option) *Server {
s := &Server{}
s := &Server{
subscriptions: make(map[string]map[string]struct{}),
}
s.BaseService = *cmn.NewBaseService(nil, "PubSub", s)
for _, option := range options {
@@ -82,17 +90,33 @@ func BufferCapacity(cap int) Option {
}
// BufferCapacity returns capacity of the internal server's queue.
func (s Server) BufferCapacity() int {
func (s *Server) BufferCapacity() int {
return s.cmdsCap
}
// Subscribe creates a subscription for the given client. It accepts a channel
// on which messages matching the given query can be received. If the
// subscription already exists, the old channel will be closed. An error will
// be returned to the caller if the context is canceled.
// on which messages matching the given query can be received. An error will be
// returned to the caller if the context is canceled or if a subscription
// already exists for the given clientID and query.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[clientID]
if ok {
_, ok = clientSubscriptions[query.String()]
}
s.mtx.RUnlock()
if ok {
return errors.New("already subscribed")
}
select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
s.mtx.Lock()
if _, ok = s.subscriptions[clientID]; !ok {
s.subscriptions[clientID] = make(map[string]struct{})
}
s.subscriptions[clientID][query.String()] = struct{}{}
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()
@@ -100,10 +124,24 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
}
// Unsubscribe removes the subscription on the given query. An error will be
// returned to the caller if the context is canceled.
// returned to the caller if the context is canceled or if the subscription
// does not exist.
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error {
s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[clientID]
if ok {
_, ok = clientSubscriptions[query.String()]
}
s.mtx.RUnlock()
if !ok {
return errors.New("subscription not found")
}
select {
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}:
s.mtx.Lock()
delete(clientSubscriptions, query.String())
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()
@@ -111,10 +149,20 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query)
}
// UnsubscribeAll removes all client subscriptions. An error will be returned
// to the caller if the context is canceled.
// to the caller if the context is canceled or if the client has no subscriptions.
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
s.mtx.RLock()
_, ok := s.subscriptions[clientID]
s.mtx.RUnlock()
if !ok {
return errors.New("subscription not found")
}
select {
case s.cmds <- cmd{op: unsub, clientID: clientID}:
s.mtx.Lock()
delete(s.subscriptions, clientID)
s.mtx.Unlock()
return nil
case <-ctx.Done():
return ctx.Err()
@@ -186,13 +234,8 @@ loop:
func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
// add query if needed
if clientToChannelMap, ok := state.queries[q]; !ok {
if _, ok := state.queries[q]; !ok {
state.queries[q] = make(map[string]chan<- interface{})
} else {
// check if already subscribed
if oldCh, ok := clientToChannelMap[clientID]; ok {
close(oldCh)
}
}
// create subscription
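The net effect of the changes above: Subscribe now refuses a duplicate clientID/query pair, and Unsubscribe/UnsubscribeAll report missing subscriptions, instead of silently closing the old channel. A minimal sketch of the new behaviour, assuming the import paths shown in this diff; the client ID and query strings are made up for illustration:

```go
package main

import (
	"context"
	"fmt"

	"github.com/tendermint/tmlibs/pubsub"
	"github.com/tendermint/tmlibs/pubsub/query"
)

func main() {
	s := pubsub.NewServer()
	s.Start()
	defer s.Stop()

	ctx := context.Background()
	q := query.MustParse("tm.events.type='NewBlock'")

	ch := make(chan interface{}, 1)
	if err := s.Subscribe(ctx, "client", q, ch); err != nil {
		fmt.Println("subscribe:", err)
	}

	// A second subscription for the same client/query pair is now rejected
	// instead of closing the previously returned channel.
	if err := s.Subscribe(ctx, "client", q, make(chan interface{}, 1)); err != nil {
		fmt.Println("duplicate subscribe:", err) // "already subscribed"
	}

	// Unsubscribing a query that was never subscribed is also an error.
	if err := s.Unsubscribe(ctx, "client", query.MustParse("tx.gas > 7")); err != nil {
		fmt.Println("unsubscribe:", err) // "subscription not found"
	}
}
```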


+ 25
- 7
pubsub/pubsub_test.go View File

@@ -86,14 +86,11 @@ func TestClientSubscribesTwice(t *testing.T) {
ch2 := make(chan interface{}, 1)
err = s.Subscribe(ctx, clientID, q, ch2)
require.NoError(t, err)
_, ok := <-ch1
assert.False(t, ok)
require.Error(t, err)
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"})
require.NoError(t, err)
assertReceive(t, "Spider-Man", ch2)
assertReceive(t, "Spider-Man", ch1)
}
func TestUnsubscribe(t *testing.T) {
@@ -117,6 +114,27 @@ func TestUnsubscribe(t *testing.T) {
assert.False(t, ok)
}
func TestResubscribe(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
s.Start()
defer s.Stop()
ctx := context.Background()
ch := make(chan interface{})
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.Empty{})
require.NoError(t, err)
ch = make(chan interface{})
err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err)
err = s.Publish(ctx, "Cable")
require.NoError(t, err)
assertReceive(t, "Cable", ch)
}
func TestUnsubscribeAll(t *testing.T) {
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
@@ -125,9 +143,9 @@ func TestUnsubscribeAll(t *testing.T) {
ctx := context.Background()
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
require.NoError(t, err)
err = s.Subscribe(ctx, clientID, query.Empty{}, ch2)
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2)
require.NoError(t, err)
err = s.UnsubscribeAll(ctx, clientID)


+ 2
- 2
pubsub/query/parser_test.go View File

@@ -83,9 +83,9 @@ func TestParser(t *testing.T) {
for _, c := range cases {
_, err := query.New(c.query)
if c.valid {
assert.NoError(t, err, "Query was '%s'", c.query)
assert.NoErrorf(t, err, "Query was '%s'", c.query)
} else {
assert.Error(t, err, "Query was '%s'", c.query)
assert.Errorf(t, err, "Query was '%s'", c.query)
}
}
}

+ 114
- 32
pubsub/query/query.go View File

@@ -22,6 +22,14 @@ type Query struct {
parser *QueryParser
}
// Condition represents a single condition within a query and consists of a tag
// (e.g. "tx.gas"), an operator (e.g. "=") and an operand (e.g. "7").
type Condition struct {
Tag string
Op Operator
Operand interface{}
}
// New parses the given string and returns a query or error if the string is
// invalid.
func New(s string) (*Query, error) {
@@ -48,17 +56,91 @@ func (q *Query) String() string {
return q.str
}
type operator uint8
// Operator is an operator that defines the relation between a tag and an
// operand (equality, ordering, containment, etc.).
type Operator uint8
const (
opLessEqual operator = iota
opGreaterEqual
opLess
opGreater
opEqual
opContains
// "<="
OpLessEqual Operator = iota
// ">="
OpGreaterEqual
// "<"
OpLess
// ">"
OpGreater
// "="
OpEqual
// "CONTAINS"; used to check if a string contains a certain substring.
OpContains
)
// Conditions returns the list of conditions contained in the query.
func (q *Query) Conditions() []Condition {
conditions := make([]Condition, 0)
buffer, begin, end := q.parser.Buffer, 0, 0
var tag string
var op Operator
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
for _, token := range q.parser.Tokens() {
switch token.pegRule {
case rulePegText:
begin, end = int(token.begin), int(token.end)
case ruletag:
tag = buffer[begin:end]
case rulele:
op = OpLessEqual
case rulege:
op = OpGreaterEqual
case rulel:
op = OpLess
case ruleg:
op = OpGreater
case ruleequal:
op = OpEqual
case rulecontains:
op = OpContains
case rulevalue:
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
valueWithoutSingleQuotes := buffer[begin+1 : end-1]
conditions = append(conditions, Condition{tag, op, valueWithoutSingleQuotes})
case rulenumber:
number := buffer[begin:end]
if strings.Contains(number, ".") { // if it looks like a floating-point number
value, err := strconv.ParseFloat(number, 64)
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
}
conditions = append(conditions, Condition{tag, op, value})
} else {
value, err := strconv.ParseInt(number, 10, 64)
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
}
conditions = append(conditions, Condition{tag, op, value})
}
case ruletime:
value, err := time.Parse(time.RFC3339, buffer[begin:end])
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
}
conditions = append(conditions, Condition{tag, op, value})
case ruledate:
value, err := time.Parse("2006-01-02", buffer[begin:end])
if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
}
conditions = append(conditions, Condition{tag, op, value})
}
}
return conditions
}
// Matches returns true if the query matches the given set of tags, false otherwise.
//
// For example, query "name=John" matches tags = {"name": "John"}. More
@@ -71,7 +153,7 @@ func (q *Query) Matches(tags map[string]interface{}) bool {
buffer, begin, end := q.parser.Buffer, 0, 0
var tag string
var op operator
var op Operator
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
for _, token := range q.parser.Tokens() {
@@ -82,17 +164,17 @@ func (q *Query) Matches(tags map[string]interface{}) bool {
case ruletag:
tag = buffer[begin:end]
case rulele:
op = opLessEqual
op = OpLessEqual
case rulege:
op = opGreaterEqual
op = OpGreaterEqual
case rulel:
op = opLess
op = OpLess
case ruleg:
op = opGreater
op = OpGreater
case ruleequal:
op = opEqual
op = OpEqual
case rulecontains:
op = opContains
op = OpContains
case rulevalue:
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock")
valueWithoutSingleQuotes := buffer[begin+1 : end-1]
@@ -149,7 +231,7 @@ func (q *Query) Matches(tags map[string]interface{}) bool {
// value from it to the operand using the operator.
//
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool {
func match(tag string, op Operator, operand reflect.Value, tags map[string]interface{}) bool {
// look up the tag from the query in tags
value, ok := tags[tag]
if !ok {
@@ -163,15 +245,15 @@ func match(tag string, op operator, operand reflect.Value, tags map[string]inter
return false
}
switch op {
case opLessEqual:
case OpLessEqual:
return v.Before(operandAsTime) || v.Equal(operandAsTime)
case opGreaterEqual:
case OpGreaterEqual:
return v.Equal(operandAsTime) || v.After(operandAsTime)
case opLess:
case OpLess:
return v.Before(operandAsTime)
case opGreater:
case OpGreater:
return v.After(operandAsTime)
case opEqual:
case OpEqual:
return v.Equal(operandAsTime)
}
case reflect.Float64:
@@ -197,15 +279,15 @@ func match(tag string, op operator, operand reflect.Value, tags map[string]inter
panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64))
}
switch op {
case opLessEqual:
case OpLessEqual:
return v <= operandFloat64
case opGreaterEqual:
case OpGreaterEqual:
return v >= operandFloat64
case opLess:
case OpLess:
return v < operandFloat64
case opGreater:
case OpGreater:
return v > operandFloat64
case opEqual:
case OpEqual:
return v == operandFloat64
}
case reflect.Int64:
@@ -231,15 +313,15 @@ func match(tag string, op operator, operand reflect.Value, tags map[string]inter
panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt))
}
switch op {
case opLessEqual:
case OpLessEqual:
return v <= operandInt
case opGreaterEqual:
case OpGreaterEqual:
return v >= operandInt
case opLess:
case OpLess:
return v < operandInt
case opGreater:
case OpGreater:
return v > operandInt
case opEqual:
case OpEqual:
return v == operandInt
}
case reflect.String:
@@ -248,9 +330,9 @@ func match(tag string, op operator, operand reflect.Value, tags map[string]inter
return false
}
switch op {
case opEqual:
case OpEqual:
return v == operand.String()
case opContains:
case OpContains:
return strings.Contains(v, operand.String())
}
default:
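Since operator is now the exported Operator type and queries expose their parsed conditions, callers can introspect a query as well as match against it. A minimal sketch; the query string and tags are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tmlibs/pubsub/query"
)

func main() {
	q := query.MustParse("tx.gas > 7 AND tm.events.type='NewBlock'")

	// Conditions exposes the parsed tag/operator/operand triples.
	for _, c := range q.Conditions() {
		switch c.Op {
		case query.OpGreater:
			fmt.Printf("%s must be greater than %v\n", c.Tag, c.Operand)
		case query.OpEqual:
			fmt.Printf("%s must equal %v\n", c.Tag, c.Operand)
		}
	}

	// Matches still evaluates the query against a set of tags.
	tags := map[string]interface{}{"tx.gas": int64(9), "tm.events.type": "NewBlock"}
	fmt.Println(q.Matches(tags)) // true
}
```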


+ 24
- 3
pubsub/query/query_test.go View File

@@ -45,15 +45,15 @@ func TestMatches(t *testing.T) {
}
for _, tc := range testCases {
query, err := query.New(tc.s)
q, err := query.New(tc.s)
if !tc.err {
require.Nil(t, err)
}
if tc.matches {
assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
} else {
assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
}
}
}
@@ -62,3 +62,24 @@ func TestMustParse(t *testing.T) {
assert.Panics(t, func() { query.MustParse("=") })
assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") })
}
func TestConditions(t *testing.T) {
txTime, err := time.Parse(time.RFC3339, "2013-05-03T14:45:00Z")
require.NoError(t, err)
testCases := []struct {
s string
conditions []query.Condition
}{
{s: "tm.events.type='NewBlock'", conditions: []query.Condition{query.Condition{Tag: "tm.events.type", Op: query.OpEqual, Operand: "NewBlock"}}},
{s: "tx.gas > 7 AND tx.gas < 9", conditions: []query.Condition{query.Condition{Tag: "tx.gas", Op: query.OpGreater, Operand: int64(7)}, query.Condition{Tag: "tx.gas", Op: query.OpLess, Operand: int64(9)}}},
{s: "tx.time >= TIME 2013-05-03T14:45:00Z", conditions: []query.Condition{query.Condition{Tag: "tx.time", Op: query.OpGreaterEqual, Operand: txTime}}},
}
for _, tc := range testCases {
q, err := query.New(tc.s)
require.Nil(t, err)
assert.Equal(t, tc.conditions, q.Conditions())
}
}

+ 10
- 7
test.sh View File

@@ -1,12 +1,15 @@
#!/usr/bin/env bash
set -e
echo "" > coverage.txt
# run the linter
# make metalinter_test
# run the unit tests with coverage
echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor); do
go test -race -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
fi
go test -race -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
fi
done

+ 1
- 1
version/version.go View File

@@ -1,3 +1,3 @@
package version
const Version = "0.4.0"
const Version = "0.6.0"
