@ -1,2 +1,5 @@ | |||
*.swp | |||
vendor | |||
.glide | |||
pubsub/query/fuzz_test/output |
@ -0,0 +1,43 @@ | |||
package common | |||
import ( | |||
"strings" | |||
"time" | |||
"github.com/pkg/errors" | |||
) | |||
// TimeLayout helps to parse a date string of the format YYYY-MM-DD | |||
// Intended to be used with the following function: | |||
// time.Parse(TimeLayout, date) | |||
var TimeLayout = "2006-01-02" //this represents YYYY-MM-DD | |||
// ParseDateRange parses a date range string of the format start:end | |||
// where the start and end date are of the format YYYY-MM-DD. | |||
// The parsed dates are time.Time and will return the zero time for | |||
// unbounded dates, ex: | |||
// unbounded start: :2000-12-31 | |||
// unbounded end: 2000-12-31: | |||
func ParseDateRange(dateRange string) (startDate, endDate time.Time, err error) { | |||
dates := strings.Split(dateRange, ":") | |||
if len(dates) != 2 { | |||
err = errors.New("bad date range, must be in format date:date") | |||
return | |||
} | |||
parseDate := func(date string) (out time.Time, err error) { | |||
if len(date) == 0 { | |||
return | |||
} | |||
out, err = time.Parse(TimeLayout, date) | |||
return | |||
} | |||
startDate, err = parseDate(dates[0]) | |||
if err != nil { | |||
return | |||
} | |||
endDate, err = parseDate(dates[1]) | |||
if err != nil { | |||
return | |||
} | |||
return | |||
} |
@ -0,0 +1,46 @@ | |||
package common | |||
import ( | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
) | |||
var ( | |||
date = time.Date(2015, time.Month(12), 31, 0, 0, 0, 0, time.UTC) | |||
date2 = time.Date(2016, time.Month(12), 31, 0, 0, 0, 0, time.UTC) | |||
zero time.Time | |||
) | |||
func TestParseDateRange(t *testing.T) { | |||
assert := assert.New(t) | |||
var testDates = []struct { | |||
dateStr string | |||
start time.Time | |||
end time.Time | |||
errNil bool | |||
}{ | |||
{"2015-12-31:2016-12-31", date, date2, true}, | |||
{"2015-12-31:", date, zero, true}, | |||
{":2016-12-31", zero, date2, true}, | |||
{"2016-12-31", zero, zero, false}, | |||
{"2016-31-12:", zero, zero, false}, | |||
{":2016-31-12", zero, zero, false}, | |||
} | |||
for _, test := range testDates { | |||
start, end, err := ParseDateRange(test.dateStr) | |||
if test.errNil { | |||
assert.Nil(err) | |||
testPtr := func(want, have time.Time) { | |||
assert.True(have.Equal(want)) | |||
} | |||
testPtr(test.start, start) | |||
testPtr(test.end, end) | |||
} else { | |||
assert.NotNil(err) | |||
} | |||
} | |||
} |
@ -0,0 +1,153 @@ | |||
package common | |||
import ( | |||
"encoding/json" | |||
"io" | |||
"net/http" | |||
"gopkg.in/go-playground/validator.v9" | |||
"github.com/pkg/errors" | |||
) | |||
type ErrorResponse struct { | |||
Success bool `json:"success,omitempty"` | |||
// Err is the error message if Success is false | |||
Err string `json:"error,omitempty"` | |||
// Code is set if Success is false | |||
Code int `json:"code,omitempty"` | |||
} | |||
// ErrorWithCode makes an ErrorResponse with the | |||
// provided err's Error() content, and status code. | |||
// It panics if err is nil. | |||
func ErrorWithCode(err error, code int) *ErrorResponse { | |||
return &ErrorResponse{ | |||
Err: err.Error(), | |||
Code: code, | |||
} | |||
} | |||
// Ensure that ErrorResponse implements error | |||
var _ error = (*ErrorResponse)(nil) | |||
func (er *ErrorResponse) Error() string { | |||
return er.Err | |||
} | |||
// Ensure that ErrorResponse implements httpCoder | |||
var _ httpCoder = (*ErrorResponse)(nil) | |||
func (er *ErrorResponse) HTTPCode() int { | |||
return er.Code | |||
} | |||
var errNilBody = errors.Errorf("expecting a non-nil body") | |||
// FparseJSON unmarshals into save, the body of the provided reader. | |||
// Since it uses json.Unmarshal, save must be of a pointer type | |||
// or compatible with json.Unmarshal. | |||
func FparseJSON(r io.Reader, save interface{}) error { | |||
if r == nil { | |||
return errors.Wrap(errNilBody, "Reader") | |||
} | |||
dec := json.NewDecoder(r) | |||
if err := dec.Decode(save); err != nil { | |||
return errors.Wrap(err, "Decode/Unmarshal") | |||
} | |||
return nil | |||
} | |||
// ParseRequestJSON unmarshals into save, the body of the | |||
// request. It closes the body of the request after parsing. | |||
// Since it uses json.Unmarshal, save must be of a pointer type | |||
// or compatible with json.Unmarshal. | |||
func ParseRequestJSON(r *http.Request, save interface{}) error { | |||
if r == nil || r.Body == nil { | |||
return errNilBody | |||
} | |||
defer r.Body.Close() | |||
return FparseJSON(r.Body, save) | |||
} | |||
// ParseRequestAndValidateJSON unmarshals into save, the body of the | |||
// request and invokes a validator on the saved content. To ensure | |||
// validation, make sure to set tags "validate" on your struct as | |||
// per https://godoc.org/gopkg.in/go-playground/validator.v9. | |||
// It closes the body of the request after parsing. | |||
// Since it uses json.Unmarshal, save must be of a pointer type | |||
// or compatible with json.Unmarshal. | |||
func ParseRequestAndValidateJSON(r *http.Request, save interface{}) error { | |||
if r == nil || r.Body == nil { | |||
return errNilBody | |||
} | |||
defer r.Body.Close() | |||
return FparseAndValidateJSON(r.Body, save) | |||
} | |||
// FparseAndValidateJSON like FparseJSON unmarshals into save, | |||
// the body of the provided reader. However, it invokes the validator | |||
// to check the set validators on your struct fields as per | |||
// per https://godoc.org/gopkg.in/go-playground/validator.v9. | |||
// Since it uses json.Unmarshal, save must be of a pointer type | |||
// or compatible with json.Unmarshal. | |||
func FparseAndValidateJSON(r io.Reader, save interface{}) error { | |||
if err := FparseJSON(r, save); err != nil { | |||
return err | |||
} | |||
return validate(save) | |||
} | |||
var theValidator = validator.New() | |||
func validate(obj interface{}) error { | |||
return errors.Wrap(theValidator.Struct(obj), "Validate") | |||
} | |||
// WriteSuccess JSON marshals the content provided, to an HTTP | |||
// response, setting the provided status code and setting header | |||
// "Content-Type" to "application/json". | |||
func WriteSuccess(w http.ResponseWriter, data interface{}) { | |||
WriteCode(w, data, 200) | |||
} | |||
// WriteCode JSON marshals content, to an HTTP response, | |||
// setting the provided status code, and setting header | |||
// "Content-Type" to "application/json". If JSON marshalling fails | |||
// with an error, WriteCode instead writes out the error invoking | |||
// WriteError. | |||
func WriteCode(w http.ResponseWriter, out interface{}, code int) { | |||
blob, err := json.MarshalIndent(out, "", " ") | |||
if err != nil { | |||
WriteError(w, err) | |||
} else { | |||
w.Header().Set("Content-Type", "application/json") | |||
w.WriteHeader(code) | |||
w.Write(blob) | |||
} | |||
} | |||
type httpCoder interface { | |||
HTTPCode() int | |||
} | |||
// WriteError is a convenience function to write out an | |||
// error to an http.ResponseWriter, to send out an error | |||
// that's structured as JSON i.e the form | |||
// {"error": sss, "code": ddd} | |||
// If err implements the interface HTTPCode() int, | |||
// it will use that status code otherwise, it will | |||
// set code to be http.StatusBadRequest | |||
func WriteError(w http.ResponseWriter, err error) { | |||
code := http.StatusBadRequest | |||
if httpC, ok := err.(httpCoder); ok { | |||
code = httpC.HTTPCode() | |||
} | |||
WriteCode(w, ErrorWithCode(err, code), code) | |||
} |
@ -0,0 +1,250 @@ | |||
package common_test | |||
import ( | |||
"bytes" | |||
"encoding/json" | |||
"errors" | |||
"io" | |||
"io/ioutil" | |||
"net/http" | |||
"net/http/httptest" | |||
"reflect" | |||
"strings" | |||
"sync" | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tmlibs/common" | |||
) | |||
func TestWriteSuccess(t *testing.T) { | |||
w := httptest.NewRecorder() | |||
common.WriteSuccess(w, "foo") | |||
assert.Equal(t, w.Code, 200, "should get a 200") | |||
} | |||
var blankErrResponse = new(common.ErrorResponse) | |||
func TestWriteError(t *testing.T) { | |||
tests := [...]struct { | |||
msg string | |||
code int | |||
}{ | |||
0: { | |||
msg: "this is a message", | |||
code: 419, | |||
}, | |||
} | |||
for i, tt := range tests { | |||
w := httptest.NewRecorder() | |||
msg := tt.msg | |||
// First check without a defined code, should send back a 400 | |||
common.WriteError(w, errors.New(msg)) | |||
assert.Equal(t, w.Code, http.StatusBadRequest, "#%d: should get a 400", i) | |||
blob, err := ioutil.ReadAll(w.Body) | |||
if err != nil { | |||
assert.Fail(t, "expecting a successful ioutil.ReadAll", "#%d", i) | |||
continue | |||
} | |||
recv := new(common.ErrorResponse) | |||
if err := json.Unmarshal(blob, recv); err != nil { | |||
assert.Fail(t, "expecting a successful json.Unmarshal", "#%d", i) | |||
continue | |||
} | |||
assert.Equal(t, reflect.DeepEqual(recv, blankErrResponse), false, "expecting a non-blank error response") | |||
// Now test with an error that's .HTTPCode() int conforming | |||
// Reset w | |||
w = httptest.NewRecorder() | |||
common.WriteError(w, common.ErrorWithCode(errors.New("foo"), tt.code)) | |||
assert.Equal(t, w.Code, tt.code, "case #%d", i) | |||
} | |||
} | |||
type marshalFailer struct{} | |||
var errFooFailed = errors.New("foo failed here") | |||
func (mf *marshalFailer) MarshalJSON() ([]byte, error) { | |||
return nil, errFooFailed | |||
} | |||
func TestWriteCode(t *testing.T) { | |||
codes := [...]int{ | |||
0: http.StatusOK, | |||
1: http.StatusBadRequest, | |||
2: http.StatusUnauthorized, | |||
3: http.StatusInternalServerError, | |||
} | |||
for i, code := range codes { | |||
w := httptest.NewRecorder() | |||
common.WriteCode(w, "foo", code) | |||
assert.Equal(t, w.Code, code, "#%d", i) | |||
// Then for the failed JSON marshaling | |||
w = httptest.NewRecorder() | |||
common.WriteCode(w, &marshalFailer{}, code) | |||
wantCode := http.StatusBadRequest | |||
assert.Equal(t, w.Code, wantCode, "#%d", i) | |||
assert.True(t, strings.Contains(string(w.Body.Bytes()), errFooFailed.Error()), | |||
"#%d: expected %q in the error message", i, errFooFailed) | |||
} | |||
} | |||
type saver struct { | |||
Foo int `json:"foo" validate:"min=10"` | |||
Bar string `json:"bar"` | |||
} | |||
type rcloser struct { | |||
closeOnce sync.Once | |||
body *bytes.Buffer | |||
closeChan chan bool | |||
} | |||
var errAlreadyClosed = errors.New("already closed") | |||
func (rc *rcloser) Close() error { | |||
var err = errAlreadyClosed | |||
rc.closeOnce.Do(func() { | |||
err = nil | |||
rc.closeChan <- true | |||
close(rc.closeChan) | |||
}) | |||
return err | |||
} | |||
func (rc *rcloser) Read(b []byte) (int, error) { | |||
return rc.body.Read(b) | |||
} | |||
var _ io.ReadCloser = (*rcloser)(nil) | |||
func makeReq(strBody string) (*http.Request, <-chan bool) { | |||
closeChan := make(chan bool, 1) | |||
buf := new(bytes.Buffer) | |||
buf.Write([]byte(strBody)) | |||
req := &http.Request{ | |||
Header: make(http.Header), | |||
Body: &rcloser{body: buf, closeChan: closeChan}, | |||
} | |||
return req, closeChan | |||
} | |||
func TestParseRequestJSON(t *testing.T) { | |||
tests := [...]struct { | |||
body string | |||
wantErr bool | |||
useNil bool | |||
}{ | |||
0: {wantErr: true, body: ``}, | |||
1: {body: `{}`}, | |||
2: {body: `{"foo": 2}`}, // Not that the validate tags don't matter here since we are just parsing | |||
3: {body: `{"foo": "abcd"}`, wantErr: true}, | |||
4: {useNil: true, wantErr: true}, | |||
} | |||
for i, tt := range tests { | |||
req, closeChan := makeReq(tt.body) | |||
if tt.useNil { | |||
req.Body = nil | |||
} | |||
sav := new(saver) | |||
err := common.ParseRequestJSON(req, sav) | |||
if tt.wantErr { | |||
assert.NotEqual(t, err, nil, "#%d: want non-nil error", i) | |||
continue | |||
} | |||
assert.Equal(t, err, nil, "#%d: want nil error", i) | |||
wasClosed := <-closeChan | |||
assert.Equal(t, wasClosed, true, "#%d: should have invoked close", i) | |||
} | |||
} | |||
func TestFparseJSON(t *testing.T) { | |||
r1 := strings.NewReader(`{"foo": 1}`) | |||
sav := new(saver) | |||
require.Equal(t, common.FparseJSON(r1, sav), nil, "expecting successful parsing") | |||
r2 := strings.NewReader(`{"bar": "blockchain"}`) | |||
require.Equal(t, common.FparseJSON(r2, sav), nil, "expecting successful parsing") | |||
require.Equal(t, reflect.DeepEqual(sav, &saver{Foo: 1, Bar: "blockchain"}), true, "should have parsed both") | |||
// Now with a nil body | |||
require.NotEqual(t, nil, common.FparseJSON(nil, sav), "expecting a nil error report") | |||
} | |||
func TestFparseAndValidateJSON(t *testing.T) { | |||
r1 := strings.NewReader(`{"foo": 1}`) | |||
sav := new(saver) | |||
require.NotEqual(t, common.FparseAndValidateJSON(r1, sav), nil, "expecting validation to fail") | |||
r1 = strings.NewReader(`{"foo": 100}`) | |||
require.Equal(t, common.FparseJSON(r1, sav), nil, "expecting successful parsing") | |||
r2 := strings.NewReader(`{"bar": "blockchain"}`) | |||
require.Equal(t, common.FparseAndValidateJSON(r2, sav), nil, "expecting successful parsing") | |||
require.Equal(t, reflect.DeepEqual(sav, &saver{Foo: 100, Bar: "blockchain"}), true, "should have parsed both") | |||
// Now with a nil body | |||
require.NotEqual(t, nil, common.FparseJSON(nil, sav), "expecting a nil error report") | |||
} | |||
var blankSaver = new(saver) | |||
func TestParseAndValidateRequestJSON(t *testing.T) { | |||
tests := [...]struct { | |||
body string | |||
wantErr bool | |||
useNil bool | |||
}{ | |||
0: {wantErr: true, body: ``}, | |||
1: {body: `{}`, wantErr: true}, // Here it should fail since Foo doesn't meet the minimum value | |||
2: {body: `{"foo": 2}`, wantErr: true}, // Here validation should fail | |||
3: {body: `{"foo": "abcd"}`, wantErr: true}, | |||
4: {useNil: true, wantErr: true}, | |||
5: {body: `{"foo": 100}`}, // Must succeed | |||
} | |||
for i, tt := range tests { | |||
req, closeChan := makeReq(tt.body) | |||
if tt.useNil { | |||
req.Body = nil | |||
} | |||
sav := new(saver) | |||
err := common.ParseRequestAndValidateJSON(req, sav) | |||
if tt.wantErr { | |||
assert.NotEqual(t, err, nil, "#%d: want non-nil error", i) | |||
continue | |||
} | |||
assert.Equal(t, err, nil, "#%d: want nil error", i) | |||
assert.False(t, reflect.DeepEqual(blankSaver, sav), "#%d: expecting a set saver", i) | |||
wasClosed := <-closeChan | |||
assert.Equal(t, wasClosed, true, "#%d: should have invoked close", i) | |||
} | |||
} | |||
func TestErrorWithCode(t *testing.T) { | |||
tests := [...]struct { | |||
code int | |||
err error | |||
}{ | |||
0: {code: 500, err: errors.New("funky")}, | |||
1: {code: 406, err: errors.New("purist")}, | |||
} | |||
for i, tt := range tests { | |||
errRes := common.ErrorWithCode(tt.err, tt.code) | |||
assert.Equal(t, errRes.Error(), tt.err.Error(), "#%d: expecting the error values to be equal", i) | |||
assert.Equal(t, errRes.Code, tt.code, "expecting the same status code", i) | |||
assert.Equal(t, errRes.HTTPCode(), tt.code, "expecting the same status code", i) | |||
} | |||
} |
@ -0,0 +1,38 @@ | |||
package common | |||
import ( | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
) | |||
func TestProtocolAndAddress(t *testing.T) { | |||
cases := []struct { | |||
fullAddr string | |||
proto string | |||
addr string | |||
}{ | |||
{ | |||
"tcp://mydomain:80", | |||
"tcp", | |||
"mydomain:80", | |||
}, | |||
{ | |||
"mydomain:80", | |||
"tcp", | |||
"mydomain:80", | |||
}, | |||
{ | |||
"unix://mydomain:80", | |||
"unix", | |||
"mydomain:80", | |||
}, | |||
} | |||
for _, c := range cases { | |||
proto, addr := ProtocolAndAddress(c.fullAddr) | |||
assert.Equal(t, proto, c.proto) | |||
assert.Equal(t, addr, c.addr) | |||
} | |||
} |
@ -0,0 +1,29 @@ | |||
package common | |||
import ( | |||
"bytes" | |||
"fmt" | |||
"io/ioutil" | |||
"os" | |||
"testing" | |||
"time" | |||
) | |||
func TestWriteFileAtomic(t *testing.T) { | |||
data := []byte("Becatron") | |||
fname := fmt.Sprintf("/tmp/write-file-atomic-test-%v.txt", time.Now().UnixNano()) | |||
err := WriteFileAtomic(fname, data, 0664) | |||
if err != nil { | |||
t.Fatal(err) | |||
} | |||
rData, err := ioutil.ReadFile(fname) | |||
if err != nil { | |||
t.Fatal(err) | |||
} | |||
if !bytes.Equal(data, rData) { | |||
t.Fatalf("data mismatch: %v != %v", data, rData) | |||
} | |||
if err := os.Remove(fname); err != nil { | |||
t.Fatal(err) | |||
} | |||
} |
@ -0,0 +1,28 @@ | |||
package db | |||
import ( | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
) | |||
func TestMemDbIterator(t *testing.T) { | |||
db := NewMemDB() | |||
keys := make([][]byte, 100) | |||
for i := 0; i < 100; i++ { | |||
keys[i] = []byte{byte(i)} | |||
} | |||
value := []byte{5} | |||
for _, k := range keys { | |||
db.Set(k, value) | |||
} | |||
iter := db.Iterator() | |||
i := 0 | |||
for iter.Next() { | |||
assert.Equal(t, db.Get(iter.Key()), iter.Value(), "values dont match for key") | |||
i += 1 | |||
} | |||
assert.Equal(t, i, len(db.db), "iterator didnt cover whole db") | |||
} |
@ -1,18 +0,0 @@ | |||
package log_test | |||
import ( | |||
"testing" | |||
"github.com/tendermint/tmlibs/log" | |||
) | |||
func TestNopLogger(t *testing.T) { | |||
t.Parallel() | |||
logger := log.NewNopLogger() | |||
if err := logger.Info("Hello", "abc", 123); err != nil { | |||
t.Error(err) | |||
} | |||
if err := logger.With("def", "ghi").Debug(""); err != nil { | |||
t.Error(err) | |||
} | |||
} |
@ -0,0 +1,27 @@ | |||
package pubsub_test | |||
import ( | |||
"context" | |||
"testing" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tmlibs/log" | |||
"github.com/tendermint/tmlibs/pubsub" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
func TestExample(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
ch := make(chan interface{}, 1) | |||
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch) | |||
require.NoError(t, err) | |||
err = s.PublishWithTags(ctx, "Tombstone", map[string]interface{}{"abci.account.name": "John"}) | |||
require.NoError(t, err) | |||
assertReceive(t, "Tombstone", ch) | |||
} |
@ -0,0 +1,253 @@ | |||
// Package pubsub implements a pub-sub model with a single publisher (Server) | |||
// and multiple subscribers (clients). | |||
// | |||
// Though you can have multiple publishers by sharing a pointer to a server or | |||
// by giving the same channel to each publisher and publishing messages from | |||
// that channel (fan-in). | |||
// | |||
// Clients subscribe for messages, which could be of any type, using a query. | |||
// When some message is published, we match it with all queries. If there is a | |||
// match, this message will be pushed to all clients, subscribed to that query. | |||
// See query subpackage for our implementation. | |||
package pubsub | |||
import ( | |||
"context" | |||
cmn "github.com/tendermint/tmlibs/common" | |||
) | |||
type operation int | |||
const ( | |||
sub operation = iota | |||
pub | |||
unsub | |||
shutdown | |||
) | |||
type cmd struct { | |||
op operation | |||
query Query | |||
ch chan<- interface{} | |||
clientID string | |||
msg interface{} | |||
tags map[string]interface{} | |||
} | |||
// Query defines an interface for a query to be used for subscribing. | |||
type Query interface { | |||
Matches(tags map[string]interface{}) bool | |||
} | |||
// Server allows clients to subscribe/unsubscribe for messages, publishing | |||
// messages with or without tags, and manages internal state. | |||
type Server struct { | |||
cmn.BaseService | |||
cmds chan cmd | |||
cmdsCap int | |||
} | |||
// Option sets a parameter for the server. | |||
type Option func(*Server) | |||
// NewServer returns a new server. See the commentary on the Option functions | |||
// for a detailed description of how to configure buffering. If no options are | |||
// provided, the resulting server's queue is unbuffered. | |||
func NewServer(options ...Option) *Server { | |||
s := &Server{} | |||
s.BaseService = *cmn.NewBaseService(nil, "PubSub", s) | |||
for _, option := range options { | |||
option(s) | |||
} | |||
// if BufferCapacity option was not set, the channel is unbuffered | |||
s.cmds = make(chan cmd, s.cmdsCap) | |||
return s | |||
} | |||
// BufferCapacity allows you to specify capacity for the internal server's | |||
// queue. Since the server, given Y subscribers, could only process X messages, | |||
// this option could be used to survive spikes (e.g. high amount of | |||
// transactions during peak hours). | |||
func BufferCapacity(cap int) Option { | |||
return func(s *Server) { | |||
if cap > 0 { | |||
s.cmdsCap = cap | |||
} | |||
} | |||
} | |||
// BufferCapacity returns capacity of the internal server's queue. | |||
func (s Server) BufferCapacity() int { | |||
return s.cmdsCap | |||
} | |||
// Subscribe creates a subscription for the given client. It accepts a channel | |||
// on which messages matching the given query can be received. If the | |||
// subscription already exists, the old channel will be closed. An error will | |||
// be returned to the caller if the context is canceled. | |||
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { | |||
select { | |||
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: | |||
return nil | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
} | |||
} | |||
// Unsubscribe removes the subscription on the given query. An error will be | |||
// returned to the caller if the context is canceled. | |||
func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) error { | |||
select { | |||
case s.cmds <- cmd{op: unsub, clientID: clientID, query: query}: | |||
return nil | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
} | |||
} | |||
// UnsubscribeAll removes all client subscriptions. An error will be returned | |||
// to the caller if the context is canceled. | |||
func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { | |||
select { | |||
case s.cmds <- cmd{op: unsub, clientID: clientID}: | |||
return nil | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
} | |||
} | |||
// Publish publishes the given message. An error will be returned to the caller | |||
// if the context is canceled. | |||
func (s *Server) Publish(ctx context.Context, msg interface{}) error { | |||
return s.PublishWithTags(ctx, msg, make(map[string]interface{})) | |||
} | |||
// PublishWithTags publishes the given message with the set of tags. The set is | |||
// matched with clients queries. If there is a match, the message is sent to | |||
// the client. | |||
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]interface{}) error { | |||
select { | |||
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}: | |||
return nil | |||
case <-ctx.Done(): | |||
return ctx.Err() | |||
} | |||
} | |||
// OnStop implements Service.OnStop by shutting down the server. | |||
func (s *Server) OnStop() { | |||
s.cmds <- cmd{op: shutdown} | |||
} | |||
// NOTE: not goroutine safe | |||
type state struct { | |||
// query -> client -> ch | |||
queries map[Query]map[string]chan<- interface{} | |||
// client -> query -> struct{} | |||
clients map[string]map[Query]struct{} | |||
} | |||
// OnStart implements Service.OnStart by starting the server. | |||
func (s *Server) OnStart() error { | |||
go s.loop(state{ | |||
queries: make(map[Query]map[string]chan<- interface{}), | |||
clients: make(map[string]map[Query]struct{}), | |||
}) | |||
return nil | |||
} | |||
func (s *Server) loop(state state) { | |||
loop: | |||
for cmd := range s.cmds { | |||
switch cmd.op { | |||
case unsub: | |||
if cmd.query != nil { | |||
state.remove(cmd.clientID, cmd.query) | |||
} else { | |||
state.removeAll(cmd.clientID) | |||
} | |||
case shutdown: | |||
for clientID := range state.clients { | |||
state.removeAll(clientID) | |||
} | |||
break loop | |||
case sub: | |||
state.add(cmd.clientID, cmd.query, cmd.ch) | |||
case pub: | |||
state.send(cmd.msg, cmd.tags) | |||
} | |||
} | |||
} | |||
func (state *state) add(clientID string, q Query, ch chan<- interface{}) { | |||
// add query if needed | |||
if clientToChannelMap, ok := state.queries[q]; !ok { | |||
state.queries[q] = make(map[string]chan<- interface{}) | |||
} else { | |||
// check if already subscribed | |||
if oldCh, ok := clientToChannelMap[clientID]; ok { | |||
close(oldCh) | |||
} | |||
} | |||
// create subscription | |||
state.queries[q][clientID] = ch | |||
// add client if needed | |||
if _, ok := state.clients[clientID]; !ok { | |||
state.clients[clientID] = make(map[Query]struct{}) | |||
} | |||
state.clients[clientID][q] = struct{}{} | |||
} | |||
func (state *state) remove(clientID string, q Query) { | |||
clientToChannelMap, ok := state.queries[q] | |||
if !ok { | |||
return | |||
} | |||
ch, ok := clientToChannelMap[clientID] | |||
if ok { | |||
close(ch) | |||
delete(state.clients[clientID], q) | |||
// if it not subscribed to anything else, remove the client | |||
if len(state.clients[clientID]) == 0 { | |||
delete(state.clients, clientID) | |||
} | |||
delete(state.queries[q], clientID) | |||
} | |||
} | |||
func (state *state) removeAll(clientID string) { | |||
queryMap, ok := state.clients[clientID] | |||
if !ok { | |||
return | |||
} | |||
for q := range queryMap { | |||
ch := state.queries[q][clientID] | |||
close(ch) | |||
delete(state.queries[q], clientID) | |||
} | |||
delete(state.clients, clientID) | |||
} | |||
func (state *state) send(msg interface{}, tags map[string]interface{}) { | |||
for q, clientToChannelMap := range state.queries { | |||
if q.Matches(tags) { | |||
for _, ch := range clientToChannelMap { | |||
ch <- msg | |||
} | |||
} | |||
} | |||
} |
@ -0,0 +1,234 @@ | |||
package pubsub_test | |||
import ( | |||
"context" | |||
"fmt" | |||
"runtime/debug" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tmlibs/log" | |||
"github.com/tendermint/tmlibs/pubsub" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
const ( | |||
clientID = "test-client" | |||
) | |||
func TestSubscribe(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
ch := make(chan interface{}, 1) | |||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch) | |||
require.NoError(t, err) | |||
err = s.Publish(ctx, "Ka-Zar") | |||
require.NoError(t, err) | |||
assertReceive(t, "Ka-Zar", ch) | |||
err = s.Publish(ctx, "Quicksilver") | |||
require.NoError(t, err) | |||
assertReceive(t, "Quicksilver", ch) | |||
} | |||
func TestDifferentClients(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
ch1 := make(chan interface{}, 1) | |||
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1) | |||
require.NoError(t, err) | |||
err = s.PublishWithTags(ctx, "Iceman", map[string]interface{}{"tm.events.type": "NewBlock"}) | |||
require.NoError(t, err) | |||
assertReceive(t, "Iceman", ch1) | |||
ch2 := make(chan interface{}, 1) | |||
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2) | |||
require.NoError(t, err) | |||
err = s.PublishWithTags(ctx, "Ultimo", map[string]interface{}{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}) | |||
require.NoError(t, err) | |||
assertReceive(t, "Ultimo", ch1) | |||
assertReceive(t, "Ultimo", ch2) | |||
ch3 := make(chan interface{}, 1) | |||
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3) | |||
require.NoError(t, err) | |||
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]interface{}{"tm.events.type": "NewRoundStep"}) | |||
require.NoError(t, err) | |||
assert.Zero(t, len(ch3)) | |||
} | |||
func TestClientSubscribesTwice(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
q := query.MustParse("tm.events.type='NewBlock'") | |||
ch1 := make(chan interface{}, 1) | |||
err := s.Subscribe(ctx, clientID, q, ch1) | |||
require.NoError(t, err) | |||
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]interface{}{"tm.events.type": "NewBlock"}) | |||
require.NoError(t, err) | |||
assertReceive(t, "Goblin Queen", ch1) | |||
ch2 := make(chan interface{}, 1) | |||
err = s.Subscribe(ctx, clientID, q, ch2) | |||
require.NoError(t, err) | |||
_, ok := <-ch1 | |||
assert.False(t, ok) | |||
err = s.PublishWithTags(ctx, "Spider-Man", map[string]interface{}{"tm.events.type": "NewBlock"}) | |||
require.NoError(t, err) | |||
assertReceive(t, "Spider-Man", ch2) | |||
} | |||
func TestUnsubscribe(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
ch := make(chan interface{}) | |||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch) | |||
require.NoError(t, err) | |||
err = s.Unsubscribe(ctx, clientID, query.Empty{}) | |||
require.NoError(t, err) | |||
err = s.Publish(ctx, "Nick Fury") | |||
require.NoError(t, err) | |||
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") | |||
_, ok := <-ch | |||
assert.False(t, ok) | |||
} | |||
func TestUnsubscribeAll(t *testing.T) { | |||
s := pubsub.NewServer() | |||
s.SetLogger(log.TestingLogger()) | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) | |||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch1) | |||
require.NoError(t, err) | |||
err = s.Subscribe(ctx, clientID, query.Empty{}, ch2) | |||
require.NoError(t, err) | |||
err = s.UnsubscribeAll(ctx, clientID) | |||
require.NoError(t, err) | |||
err = s.Publish(ctx, "Nick Fury") | |||
require.NoError(t, err) | |||
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") | |||
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") | |||
_, ok := <-ch1 | |||
assert.False(t, ok) | |||
_, ok = <-ch2 | |||
assert.False(t, ok) | |||
} | |||
func TestBufferCapacity(t *testing.T) { | |||
s := pubsub.NewServer(pubsub.BufferCapacity(2)) | |||
s.SetLogger(log.TestingLogger()) | |||
assert.Equal(t, 2, s.BufferCapacity()) | |||
ctx := context.Background() | |||
err := s.Publish(ctx, "Nighthawk") | |||
require.NoError(t, err) | |||
err = s.Publish(ctx, "Sage") | |||
require.NoError(t, err) | |||
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) | |||
defer cancel() | |||
err = s.Publish(ctx, "Ironclad") | |||
if assert.Error(t, err) { | |||
assert.Equal(t, context.DeadlineExceeded, err) | |||
} | |||
} | |||
func Benchmark10Clients(b *testing.B) { benchmarkNClients(10, b) } | |||
func Benchmark100Clients(b *testing.B) { benchmarkNClients(100, b) } | |||
func Benchmark1000Clients(b *testing.B) { benchmarkNClients(1000, b) } | |||
func Benchmark10ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(10, b) } | |||
func Benchmark100ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(100, b) } | |||
func Benchmark1000ClientsOneQuery(b *testing.B) { benchmarkNClientsOneQuery(1000, b) } | |||
func benchmarkNClients(n int, b *testing.B) { | |||
s := pubsub.NewServer() | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
for i := 0; i < n; i++ { | |||
ch := make(chan interface{}) | |||
go func() { | |||
for range ch { | |||
} | |||
}() | |||
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch) | |||
} | |||
b.ReportAllocs() | |||
b.ResetTimer() | |||
for i := 0; i < b.N; i++ { | |||
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": i}) | |||
} | |||
} | |||
func benchmarkNClientsOneQuery(n int, b *testing.B) { | |||
s := pubsub.NewServer() | |||
s.Start() | |||
defer s.Stop() | |||
ctx := context.Background() | |||
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") | |||
for i := 0; i < n; i++ { | |||
ch := make(chan interface{}) | |||
go func() { | |||
for range ch { | |||
} | |||
}() | |||
s.Subscribe(ctx, clientID, q, ch) | |||
} | |||
b.ReportAllocs() | |||
b.ResetTimer() | |||
for i := 0; i < b.N; i++ { | |||
s.PublishWithTags(ctx, "Gamora", map[string]interface{}{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": 1}) | |||
} | |||
} | |||
/////////////////////////////////////////////////////////////////////////////// | |||
/// HELPERS | |||
/////////////////////////////////////////////////////////////////////////////// | |||
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { | |||
select { | |||
case actual := <-ch: | |||
if actual != nil { | |||
assert.Equal(t, expected, actual, msgAndArgs...) | |||
} | |||
case <-time.After(1 * time.Second): | |||
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) | |||
debug.PrintStack() | |||
} | |||
} |
@ -0,0 +1,11 @@ | |||
gen_query_parser: | |||
@go get github.com/pointlander/peg | |||
peg -inline -switch query.peg | |||
fuzzy_test: | |||
@go get github.com/dvyukov/go-fuzz/go-fuzz | |||
@go get github.com/dvyukov/go-fuzz/go-fuzz-build | |||
go-fuzz-build github.com/tendermint/tmlibs/pubsub/query/fuzz_test | |||
go-fuzz -bin=./fuzz_test-fuzz.zip -workdir=./fuzz_test/output | |||
.PHONY: gen_query_parser fuzzy_test |
@ -0,0 +1,14 @@ | |||
package query | |||
// Empty query matches any set of tags. | |||
type Empty struct { | |||
} | |||
// Matches always returns true. | |||
func (Empty) Matches(tags map[string]interface{}) bool { | |||
return true | |||
} | |||
func (Empty) String() string { | |||
return "empty" | |||
} |
@ -0,0 +1,16 @@ | |||
package query_test | |||
import ( | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
func TestEmptyQueryMatchesAnything(t *testing.T) { | |||
q := query.Empty{} | |||
assert.True(t, q.Matches(map[string]interface{}{})) | |||
assert.True(t, q.Matches(map[string]interface{}{"Asher": "Roth"})) | |||
assert.True(t, q.Matches(map[string]interface{}{"Route": 66})) | |||
assert.True(t, q.Matches(map[string]interface{}{"Route": 66, "Billy": "Blue"})) | |||
} |
@ -0,0 +1,30 @@ | |||
package fuzz_test | |||
import ( | |||
"fmt" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
func Fuzz(data []byte) int { | |||
sdata := string(data) | |||
q0, err := query.New(sdata) | |||
if err != nil { | |||
return 0 | |||
} | |||
sdata1 := q0.String() | |||
q1, err := query.New(sdata1) | |||
if err != nil { | |||
panic(err) | |||
} | |||
sdata2 := q1.String() | |||
if sdata1 != sdata2 { | |||
fmt.Printf("q0: %q\n", sdata1) | |||
fmt.Printf("q1: %q\n", sdata2) | |||
panic("query changed") | |||
} | |||
return 1 | |||
} |
@ -0,0 +1,91 @@ | |||
package query_test | |||
import ( | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
// TODO: fuzzy testing? | |||
func TestParser(t *testing.T) { | |||
cases := []struct { | |||
query string | |||
valid bool | |||
}{ | |||
{"tm.events.type='NewBlock'", true}, | |||
{"tm.events.type = 'NewBlock'", true}, | |||
{"tm.events.name = ''", true}, | |||
{"tm.events.type='TIME'", true}, | |||
{"tm.events.type='DATE'", true}, | |||
{"tm.events.type='='", true}, | |||
{"tm.events.type='TIME", false}, | |||
{"tm.events.type=TIME'", false}, | |||
{"tm.events.type==", false}, | |||
{"tm.events.type=NewBlock", false}, | |||
{">==", false}, | |||
{"tm.events.type 'NewBlock' =", false}, | |||
{"tm.events.type>'NewBlock'", false}, | |||
{"", false}, | |||
{"=", false}, | |||
{"='NewBlock'", false}, | |||
{"tm.events.type=", false}, | |||
{"tm.events.typeNewBlock", false}, | |||
{"tm.events.type'NewBlock'", false}, | |||
{"'NewBlock'", false}, | |||
{"NewBlock", false}, | |||
{"", false}, | |||
{"tm.events.type='NewBlock' AND abci.account.name='Igor'", true}, | |||
{"tm.events.type='NewBlock' AND", false}, | |||
{"tm.events.type='NewBlock' AN", false}, | |||
{"tm.events.type='NewBlock' AN tm.events.type='NewBlockHeader'", false}, | |||
{"AND tm.events.type='NewBlock' ", false}, | |||
{"abci.account.name CONTAINS 'Igor'", true}, | |||
{"tx.date > DATE 2013-05-03", true}, | |||
{"tx.date < DATE 2013-05-03", true}, | |||
{"tx.date <= DATE 2013-05-03", true}, | |||
{"tx.date >= DATE 2013-05-03", true}, | |||
{"tx.date >= DAT 2013-05-03", false}, | |||
{"tx.date <= DATE2013-05-03", false}, | |||
{"tx.date <= DATE -05-03", false}, | |||
{"tx.date >= DATE 20130503", false}, | |||
{"tx.date >= DATE 2013+01-03", false}, | |||
// incorrect year, month, day | |||
{"tx.date >= DATE 0013-01-03", false}, | |||
{"tx.date >= DATE 2013-31-03", false}, | |||
{"tx.date >= DATE 2013-01-83", false}, | |||
{"tx.date > TIME 2013-05-03T14:45:00+07:00", true}, | |||
{"tx.date < TIME 2013-05-03T14:45:00-02:00", true}, | |||
{"tx.date <= TIME 2013-05-03T14:45:00Z", true}, | |||
{"tx.date >= TIME 2013-05-03T14:45:00Z", true}, | |||
{"tx.date >= TIME2013-05-03T14:45:00Z", false}, | |||
{"tx.date = IME 2013-05-03T14:45:00Z", false}, | |||
{"tx.date = TIME 2013-05-:45:00Z", false}, | |||
{"tx.date >= TIME 2013-05-03T14:45:00", false}, | |||
{"tx.date >= TIME 0013-00-00T14:45:00Z", false}, | |||
{"tx.date >= TIME 2013+05=03T14:45:00Z", false}, | |||
{"account.balance=100", true}, | |||
{"account.balance >= 200", true}, | |||
{"account.balance >= -300", false}, | |||
{"account.balance >>= 400", false}, | |||
{"account.balance=33.22.1", false}, | |||
{"hash='136E18F7E4C348B780CF873A0BF43922E5BAFA63'", true}, | |||
{"hash=136E18F7E4C348B780CF873A0BF43922E5BAFA63", false}, | |||
} | |||
for _, c := range cases { | |||
_, err := query.New(c.query) | |||
if c.valid { | |||
assert.NoError(t, err, "Query was '%s'", c.query) | |||
} else { | |||
assert.Error(t, err, "Query was '%s'", c.query) | |||
} | |||
} | |||
} |
@ -0,0 +1,261 @@ | |||
// Package query provides a parser for a custom query format: | |||
// | |||
// abci.invoice.number=22 AND abci.invoice.owner=Ivan | |||
// | |||
// See query.peg for the grammar, which is a https://en.wikipedia.org/wiki/Parsing_expression_grammar. | |||
// More: https://github.com/PhilippeSigaud/Pegged/wiki/PEG-Basics | |||
// | |||
// It has a support for numbers (integer and floating point), dates and times. | |||
package query | |||
import ( | |||
"fmt" | |||
"reflect" | |||
"strconv" | |||
"strings" | |||
"time" | |||
) | |||
// Query holds the query string and the query parser. | |||
type Query struct { | |||
str string | |||
parser *QueryParser | |||
} | |||
// New parses the given string and returns a query or error if the string is | |||
// invalid. | |||
func New(s string) (*Query, error) { | |||
p := &QueryParser{Buffer: fmt.Sprintf(`"%s"`, s)} | |||
p.Init() | |||
if err := p.Parse(); err != nil { | |||
return nil, err | |||
} | |||
return &Query{str: s, parser: p}, nil | |||
} | |||
// MustParse turns the given string into a query or panics; for tests or others | |||
// cases where you know the string is valid. | |||
func MustParse(s string) *Query { | |||
q, err := New(s) | |||
if err != nil { | |||
panic(fmt.Sprintf("failed to parse %s: %v", s, err)) | |||
} | |||
return q | |||
} | |||
// String returns the original string. | |||
func (q *Query) String() string { | |||
return q.str | |||
} | |||
type operator uint8 | |||
const ( | |||
opLessEqual operator = iota | |||
opGreaterEqual | |||
opLess | |||
opGreater | |||
opEqual | |||
opContains | |||
) | |||
// Matches returns true if the query matches the given set of tags, false otherwise. | |||
// | |||
// For example, query "name=John" matches tags = {"name": "John"}. More | |||
// examples could be found in parser_test.go and query_test.go. | |||
func (q *Query) Matches(tags map[string]interface{}) bool { | |||
if len(tags) == 0 { | |||
return false | |||
} | |||
buffer, begin, end := q.parser.Buffer, 0, 0 | |||
var tag string | |||
var op operator | |||
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7") | |||
for _, token := range q.parser.Tokens() { | |||
switch token.pegRule { | |||
case rulePegText: | |||
begin, end = int(token.begin), int(token.end) | |||
case ruletag: | |||
tag = buffer[begin:end] | |||
case rulele: | |||
op = opLessEqual | |||
case rulege: | |||
op = opGreaterEqual | |||
case rulel: | |||
op = opLess | |||
case ruleg: | |||
op = opGreater | |||
case ruleequal: | |||
op = opEqual | |||
case rulecontains: | |||
op = opContains | |||
case rulevalue: | |||
// strip single quotes from value (i.e. "'NewBlock'" -> "NewBlock") | |||
valueWithoutSingleQuotes := buffer[begin+1 : end-1] | |||
// see if the triplet (tag, operator, operand) matches any tag | |||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } | |||
if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) { | |||
return false | |||
} | |||
case rulenumber: | |||
number := buffer[begin:end] | |||
if strings.Contains(number, ".") { // if it looks like a floating-point number | |||
value, err := strconv.ParseFloat(number, 64) | |||
if err != nil { | |||
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) | |||
} | |||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||
return false | |||
} | |||
} else { | |||
value, err := strconv.ParseInt(number, 10, 64) | |||
if err != nil { | |||
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) | |||
} | |||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||
return false | |||
} | |||
} | |||
case ruletime: | |||
value, err := time.Parse(time.RFC3339, buffer[begin:end]) | |||
if err != nil { | |||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||
} | |||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||
return false | |||
} | |||
case ruledate: | |||
value, err := time.Parse("2006-01-02", buffer[begin:end]) | |||
if err != nil { | |||
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) | |||
} | |||
if !match(tag, op, reflect.ValueOf(value), tags) { | |||
return false | |||
} | |||
} | |||
} | |||
return true | |||
} | |||
// match returns true if the given triplet (tag, operator, operand) matches any tag. | |||
// | |||
// First, it looks up the tag in tags and if it finds one, tries to compare the | |||
// value from it to the operand using the operator. | |||
// | |||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } | |||
func match(tag string, op operator, operand reflect.Value, tags map[string]interface{}) bool { | |||
// look up the tag from the query in tags | |||
value, ok := tags[tag] | |||
if !ok { | |||
return false | |||
} | |||
switch operand.Kind() { | |||
case reflect.Struct: // time | |||
operandAsTime := operand.Interface().(time.Time) | |||
v, ok := value.(time.Time) | |||
if !ok { // if value from tags is not time.Time | |||
return false | |||
} | |||
switch op { | |||
case opLessEqual: | |||
return v.Before(operandAsTime) || v.Equal(operandAsTime) | |||
case opGreaterEqual: | |||
return v.Equal(operandAsTime) || v.After(operandAsTime) | |||
case opLess: | |||
return v.Before(operandAsTime) | |||
case opGreater: | |||
return v.After(operandAsTime) | |||
case opEqual: | |||
return v.Equal(operandAsTime) | |||
} | |||
case reflect.Float64: | |||
operandFloat64 := operand.Interface().(float64) | |||
var v float64 | |||
// try our best to convert value from tags to float64 | |||
switch vt := value.(type) { | |||
case float64: | |||
v = vt | |||
case float32: | |||
v = float64(vt) | |||
case int: | |||
v = float64(vt) | |||
case int8: | |||
v = float64(vt) | |||
case int16: | |||
v = float64(vt) | |||
case int32: | |||
v = float64(vt) | |||
case int64: | |||
v = float64(vt) | |||
default: // fail for all other types | |||
panic(fmt.Sprintf("Incomparable types: %T (%v) vs float64 (%v)", value, value, operandFloat64)) | |||
} | |||
switch op { | |||
case opLessEqual: | |||
return v <= operandFloat64 | |||
case opGreaterEqual: | |||
return v >= operandFloat64 | |||
case opLess: | |||
return v < operandFloat64 | |||
case opGreater: | |||
return v > operandFloat64 | |||
case opEqual: | |||
return v == operandFloat64 | |||
} | |||
case reflect.Int64: | |||
operandInt := operand.Interface().(int64) | |||
var v int64 | |||
// try our best to convert value from tags to int64 | |||
switch vt := value.(type) { | |||
case int64: | |||
v = vt | |||
case int8: | |||
v = int64(vt) | |||
case int16: | |||
v = int64(vt) | |||
case int32: | |||
v = int64(vt) | |||
case int: | |||
v = int64(vt) | |||
case float64: | |||
v = int64(vt) | |||
case float32: | |||
v = int64(vt) | |||
default: // fail for all other types | |||
panic(fmt.Sprintf("Incomparable types: %T (%v) vs int64 (%v)", value, value, operandInt)) | |||
} | |||
switch op { | |||
case opLessEqual: | |||
return v <= operandInt | |||
case opGreaterEqual: | |||
return v >= operandInt | |||
case opLess: | |||
return v < operandInt | |||
case opGreater: | |||
return v > operandInt | |||
case opEqual: | |||
return v == operandInt | |||
} | |||
case reflect.String: | |||
v, ok := value.(string) | |||
if !ok { // if value from tags is not string | |||
return false | |||
} | |||
switch op { | |||
case opEqual: | |||
return v == operand.String() | |||
case opContains: | |||
return strings.Contains(v, operand.String()) | |||
} | |||
default: | |||
panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind())) | |||
} | |||
return false | |||
} |
@ -0,0 +1,33 @@ | |||
package query | |||
type QueryParser Peg { | |||
} | |||
e <- '\"' condition ( ' '+ and ' '+ condition )* '\"' !. | |||
condition <- tag ' '* (le ' '* (number / time / date) | |||
/ ge ' '* (number / time / date) | |||
/ l ' '* (number / time / date) | |||
/ g ' '* (number / time / date) | |||
/ equal ' '* (number / time / date / value) | |||
/ contains ' '* value | |||
) | |||
tag <- < (![ \t\n\r\\()"'=><] .)+ > | |||
value <- < '\'' (!["'] .)* '\''> | |||
number <- < ('0' | |||
/ [1-9] digit* ('.' digit*)?) > | |||
digit <- [0-9] | |||
time <- "TIME " < year '-' month '-' day 'T' digit digit ':' digit digit ':' digit digit (('-' / '+') digit digit ':' digit digit / 'Z') > | |||
date <- "DATE " < year '-' month '-' day > | |||
year <- ('1' / '2') digit digit digit | |||
month <- ('0' / '1') digit | |||
day <- ('0' / '1' / '2' / '3') digit | |||
and <- "AND" | |||
equal <- "=" | |||
contains <- "CONTAINS" | |||
le <- "<=" | |||
ge <- ">=" | |||
l <- "<" | |||
g <- ">" |
@ -0,0 +1,64 @@ | |||
package query_test | |||
import ( | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
"github.com/tendermint/tmlibs/pubsub/query" | |||
) | |||
func TestMatches(t *testing.T) { | |||
const shortForm = "2006-Jan-02" | |||
txDate, err := time.Parse(shortForm, "2017-Jan-01") | |||
require.NoError(t, err) | |||
txTime, err := time.Parse(time.RFC3339, "2018-05-03T14:45:00Z") | |||
require.NoError(t, err) | |||
testCases := []struct { | |||
s string | |||
tags map[string]interface{} | |||
err bool | |||
matches bool | |||
}{ | |||
{"tm.events.type='NewBlock'", map[string]interface{}{"tm.events.type": "NewBlock"}, false, true}, | |||
{"tx.gas > 7", map[string]interface{}{"tx.gas": 8}, false, true}, | |||
{"tx.gas > 7 AND tx.gas < 9", map[string]interface{}{"tx.gas": 8}, false, true}, | |||
{"body.weight >= 3.5", map[string]interface{}{"body.weight": 3.5}, false, true}, | |||
{"account.balance < 1000.0", map[string]interface{}{"account.balance": 900}, false, true}, | |||
{"apples.kg <= 4", map[string]interface{}{"apples.kg": 4.0}, false, true}, | |||
{"body.weight >= 4.5", map[string]interface{}{"body.weight": float32(4.5)}, false, true}, | |||
{"oranges.kg < 4 AND watermellons.kg > 10", map[string]interface{}{"oranges.kg": 3, "watermellons.kg": 12}, false, true}, | |||
{"peaches.kg < 4", map[string]interface{}{"peaches.kg": 5}, false, false}, | |||
{"tx.date > DATE 2017-01-01", map[string]interface{}{"tx.date": time.Now()}, false, true}, | |||
{"tx.date = DATE 2017-01-01", map[string]interface{}{"tx.date": txDate}, false, true}, | |||
{"tx.date = DATE 2018-01-01", map[string]interface{}{"tx.date": txDate}, false, false}, | |||
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": time.Now()}, false, true}, | |||
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string]interface{}{"tx.time": txTime}, false, false}, | |||
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Igor,Ivan"}, false, true}, | |||
{"abci.owner.name CONTAINS 'Igor'", map[string]interface{}{"abci.owner.name": "Pavel,Ivan"}, false, false}, | |||
} | |||
for _, tc := range testCases { | |||
query, err := query.New(tc.s) | |||
if !tc.err { | |||
require.Nil(t, err) | |||
} | |||
if tc.matches { | |||
assert.True(t, query.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags) | |||
} else { | |||
assert.False(t, query.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags) | |||
} | |||
} | |||
} | |||
func TestMustParse(t *testing.T) { | |||
assert.Panics(t, func() { query.MustParse("=") }) | |||
assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) | |||
} |
@ -1,3 +1,3 @@ | |||
package version | |||
const Version = "0.2.2" | |||
const Version = "0.3.0" |