diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 074c48c94..941963366 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,5 +1,4 @@ # CODEOWNERS: https://help.github.com/articles/about-codeowners/ -# Everything goes through Bucky. For now. -* @ebuchman - +# Everything goes through Bucky and Anton. For now. +* @ebuchman @melekes diff --git a/CHANGELOG.md b/CHANGELOG.md index 49b18e720..7b5fd8c03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ BUG FIXES: BREAKING CHANGES: - consensus/wal: removed separator +- rpc/client: changed Subscribe/Unsubscribe/UnsubscribeAll funcs signatures to be identical to event bus. + +IMPROVEMENTS: +- rpc/client: can act as event bus subscriber (See https://github.com/tendermint/tendermint/issues/945). ## 0.13.0 (December 6, 2017) diff --git a/DOCKER/Dockerfile b/DOCKER/Dockerfile index 67d346b00..c0d09d951 100644 --- a/DOCKER/Dockerfile +++ b/DOCKER/Dockerfile @@ -1,8 +1,8 @@ FROM alpine:3.6 # This is the release of tendermint to pull in. -ENV TM_VERSION 0.12.0 -ENV TM_SHA256SUM be17469e92f04fc2a3663f891da28edbaa6c37c4d2f746736571887f4790555a +ENV TM_VERSION 0.13.0 +ENV TM_SHA256SUM 36d773d4c2890addc61cc87a72c1e9c21c89516921b0defb0edfebde719b4b85 # Tendermint will be looking for genesis file in /tendermint (unless you change # `genesis_file` in config.toml). You can put your config.toml and private diff --git a/DOCKER/README.md b/DOCKER/README.md index fd19c1014..fceab5feb 100644 --- a/DOCKER/README.md +++ b/DOCKER/README.md @@ -1,6 +1,8 @@ # Supported tags and respective `Dockerfile` links -- `0.12.0`, `latest` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/70d8afa6e952e24c573ece345560a5971bf2cc0e/DOCKER/Dockerfile) +- `0.13.0`, `latest` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/a28b3fff49dce2fb31f90abb2fc693834e0029c2/DOCKER/Dockerfile) +- `0.12.1` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/457c688346b565e90735431619ca3ca597ef9007/DOCKER/Dockerfile) +- `0.12.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/70d8afa6e952e24c573ece345560a5971bf2cc0e/DOCKER/Dockerfile) - `0.11.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/9177cc1f64ca88a4a0243c5d1773d10fba67e201/DOCKER/Dockerfile) - `0.10.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/e5342f4054ab784b2cd6150e14f01053d7c8deb2/DOCKER/Dockerfile) - `0.9.1`, `0.9`, [(Dockerfile)](https://github.com/tendermint/tendermint/blob/809e0e8c5933604ba8b2d096803ada7c5ec4dfd3/DOCKER/Dockerfile) diff --git a/Makefile b/Makefile index fb15dfc4a..b94cfd04c 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ GOTOOLS = \ github.com/mitchellh/gox \ github.com/tcnksm/ghr \ - github.com/alecthomas/gometalinter + gopkg.in/alecthomas/gometalinter.v2 PACKAGES=$(shell go list ./... | grep -v '/vendor/') BUILD_TAGS?=tendermint @@ -12,13 +12,13 @@ BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`g all: get_vendor_deps install test install: - CGO_ENABLED=0 go install $(BUILD_FLAGS) ./cmd/tendermint + go install $(BUILD_FLAGS) ./cmd/tendermint build: - CGO_ENABLED=0 go build $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint/ + go build $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint/ build_race: - CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint + go build -race $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint # dist builds binaries for all platforms and packages them for distribution dist: @@ -43,6 +43,12 @@ test_release: test100: @for i in {1..100}; do make test; done +vagrant_test: + vagrant up + vagrant ssh -c 'make install' + vagrant ssh -c 'make test_race' + vagrant ssh -c 'make test_integrations' + draw_deps: # requires brew install graphviz or apt-get install graphviz go get github.com/RobotsAndPencils/goviz @@ -68,7 +74,7 @@ get_vendor_deps: @hash glide 2>/dev/null || go get github.com/Masterminds/glide @rm -rf vendor/ @echo "--> Running glide install" - @glide install + $(GOPATH)/bin/glide install update_tools: @echo "--> Updating tools" @@ -77,21 +83,22 @@ update_tools: tools: @echo "--> Installing tools" @go get $(GOTOOLS) - @gometalinter --install + $(GOPATH)/bin/gometalinter.v2 --install ### Formatting, linting, and vetting metalinter: - @gometalinter --vendor --deadline=600s --enable-all --disable=lll ./... + $(GOPATH)/bin/gometalinter.v2 --vendor --deadline=600s --enable-all --disable=lll ./... metalinter_test: - @gometalinter --vendor --deadline=600s --disable-all \ + $(GOPATH)/bin/gometalinter.v2 --vendor --deadline=600s --disable-all \ --enable=deadcode \ + --enable=gosimple \ --enable=misspell \ --enable=safesql \ ./... - # --enable=gas \ + #--enable=gas \ #--enable=maligned \ #--enable=dupl \ #--enable=errcheck \ @@ -99,7 +106,6 @@ metalinter_test: #--enable=gocyclo \ #--enable=goimports \ #--enable=golint \ <== comments on anything exported - #--enable=gosimple \ #--enable=gotype \ #--enable=ineffassign \ #--enable=interfacer \ diff --git a/Vagrantfile b/Vagrantfile index ea8042360..80d44f9c7 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -2,7 +2,7 @@ # vi: set ft=ruby : Vagrant.configure("2") do |config| - config.vm.box = "ubuntu/trusty64" + config.vm.box = "ubuntu/xenial64" config.vm.provider "virtualbox" do |v| v.memory = 4096 @@ -10,30 +10,40 @@ Vagrant.configure("2") do |config| end config.vm.provision "shell", inline: <<-SHELL - apt-get update - apt-get install -y --no-install-recommends wget curl jq shellcheck bsdmainutils psmisc - - wget -qO- https://get.docker.com/ | sh - usermod -a -G docker vagrant - apt-get autoremove -y - - apt-get install -y --no-install-recommends git - curl -O https://storage.googleapis.com/golang/go1.9.linux-amd64.tar.gz - tar -xvf go1.9.linux-amd64.tar.gz - rm -rf /usr/local/go - mv go /usr/local - rm -f go1.9.linux-amd64.tar.gz - mkdir -p /home/vagrant/go/bin - echo 'export PATH=$PATH:/usr/local/go/bin:/home/vagrant/go/bin' >> /home/vagrant/.bash_profile - echo 'export GOPATH=/home/vagrant/go' >> /home/vagrant/.bash_profile + # add docker repo + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - + add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu xenial stable" - echo 'export LC_ALL=en_US.UTF-8' >> /home/vagrant/.bash_profile + # and golang 1.9 support + # official repo doesn't have race detection runtime... + # add-apt-repository ppa:gophers/archive + add-apt-repository ppa:longsleep/golang-backports - mkdir -p /home/vagrant/go/src/github.com/tendermint - ln -s /vagrant /home/vagrant/go/src/github.com/tendermint/tendermint - - chown -R vagrant:vagrant /home/vagrant/go - - su - vagrant -c 'cd /home/vagrant/go/src/github.com/tendermint/tendermint && make get_vendor_deps' + # install base requirements + apt-get update + apt-get install -y --no-install-recommends wget curl jq \ + make shellcheck bsdmainutils psmisc + apt-get install -y docker-ce golang-1.9-go + + # needed for docker + usermod -a -G docker ubuntu + + # use "EOF" not EOF to avoid variable substitution of $PATH + cat << "EOF" >> /home/ubuntu/.bash_profile +export PATH=$PATH:/usr/lib/go-1.9/bin:/home/ubuntu/go/bin +export GOPATH=/home/ubuntu/go +export LC_ALL=en_US.UTF-8 +cd go/src/github.com/tendermint/tendermint +EOF + + mkdir -p /home/ubuntu/go/bin + mkdir -p /home/ubuntu/go/src/github.com/tendermint + ln -s /vagrant /home/ubuntu/go/src/github.com/tendermint/tendermint + + chown -R ubuntu:ubuntu /home/ubuntu/go + chown ubuntu:ubuntu /home/ubuntu/.bash_profile + + # get all deps and tools, ready to install/test + su - ubuntu -c 'cd /home/ubuntu/go/src/github.com/tendermint/tendermint && make get_vendor_deps && make tools' SHELL end diff --git a/circle.yml b/circle.yml index 50ffbd01b..e606097cf 100644 --- a/circle.yml +++ b/circle.yml @@ -24,6 +24,7 @@ dependencies: test: override: + - cd "$PROJECT_PATH" && make tools && make get_vendor_deps && make metalinter_test - cd "$PROJECT_PATH" && set -o pipefail && make test_integrations 2>&1 | tee test_integrations.log: timeout: 1800 post: diff --git a/cmd/tendermint/commands/root_test.go b/cmd/tendermint/commands/root_test.go index b4e30d980..8217ee166 100644 --- a/cmd/tendermint/commands/root_test.go +++ b/cmd/tendermint/commands/root_test.go @@ -32,6 +32,9 @@ func isolate(cmds ...*cobra.Command) cli.Executable { if err := os.Unsetenv("TM_HOME"); err != nil { panic(err) } + if err := os.RemoveAll(defaultRoot); err != nil { + panic(err) + } viper.Reset() config = cfg.DefaultConfig() diff --git a/config/config.go b/config/config.go index ea3fa13e4..783758896 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "os" "path/filepath" "time" ) @@ -108,7 +109,7 @@ func DefaultBaseConfig() BaseConfig { return BaseConfig{ Genesis: "genesis.json", PrivValidator: "priv_validator.json", - Moniker: "anonymous", + Moniker: defaultMoniker, ProxyApp: "tcp://127.0.0.1:46658", ABCI: "socket", LogLevel: DefaultPackageLogLevels(), @@ -456,3 +457,18 @@ func rootify(path, root string) string { } return filepath.Join(root, path) } + +//----------------------------------------------------------------------------- +// Moniker + +var defaultMoniker = getDefaultMoniker() + +// getDefaultMoniker returns a default moniker, which is the host name. If runtime +// fails to get the host name, "anonymous" will be returned. +func getDefaultMoniker() string { + moniker, err := os.Hostname() + if err != nil { + moniker = "anonymous" + } + return moniker +} diff --git a/config/toml.go b/config/toml.go index ec70ab75d..735f45c12 100644 --- a/config/toml.go +++ b/config/toml.go @@ -23,9 +23,7 @@ func EnsureRoot(rootDir string) { // Write default config file if missing. if !cmn.FileExists(configFilePath) { - // Ask user for moniker - // moniker := cfg.Prompt("Type hostname: ", "anonymous") - cmn.MustWriteFile(configFilePath, []byte(defaultConfig("anonymous")), 0644) + cmn.MustWriteFile(configFilePath, []byte(defaultConfig(defaultMoniker)), 0644) } } @@ -81,8 +79,7 @@ func ResetTestRoot(testName string) *Config { // Write default config file if missing. if !cmn.FileExists(configFilePath) { - // Ask user for moniker - cmn.MustWriteFile(configFilePath, []byte(testConfig("anonymous")), 0644) + cmn.MustWriteFile(configFilePath, []byte(testConfig(defaultMoniker)), 0644) } if !cmn.FileExists(genesisFilePath) { cmn.MustWriteFile(genesisFilePath, []byte(testGenesis), 0644) diff --git a/config/toml_test.go b/config/toml_test.go index bf3bf58f7..f927a14ca 100644 --- a/config/toml_test.go +++ b/config/toml_test.go @@ -32,7 +32,7 @@ func TestEnsureRoot(t *testing.T) { // make sure config is set properly data, err := ioutil.ReadFile(filepath.Join(tmpDir, "config.toml")) require.Nil(err) - assert.Equal([]byte(defaultConfig("anonymous")), data) + assert.Equal([]byte(defaultConfig(defaultMoniker)), data) ensureFiles(t, tmpDir, "data") } @@ -49,7 +49,7 @@ func TestEnsureTestRoot(t *testing.T) { // make sure config is set properly data, err := ioutil.ReadFile(filepath.Join(rootDir, "config.toml")) require.Nil(err) - assert.Equal([]byte(testConfig("anonymous")), data) + assert.Equal([]byte(testConfig(defaultMoniker)), data) // TODO: make sure the cfg returned and testconfig are the same! diff --git a/consensus/ticker.go b/consensus/ticker.go index 4762beccc..f66856f91 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -68,7 +68,7 @@ func (t *timeoutTicker) Chan() <-chan timeoutInfo { } // ScheduleTimeout schedules a new timeout by sending on the internal tickChan. -// The timeoutRoutine is alwaya available to read from tickChan, so this won't block. +// The timeoutRoutine is always available to read from tickChan, so this won't block. // The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) { t.tickChan <- ti diff --git a/docs/architecture/adr-007-trust-metric-usage.md b/docs/architecture/adr-007-trust-metric-usage.md new file mode 100644 index 000000000..4d833a69f --- /dev/null +++ b/docs/architecture/adr-007-trust-metric-usage.md @@ -0,0 +1,103 @@ +# ADR 007: Trust Metric Usage Guide + +## Context + +Tendermint is required to monitor peer quality in order to inform its peer dialing and peer exchange strategies. + +When a node first connects to the network, it is important that it can quickly find good peers. +Thus, while a node has fewer connections, it should prioritize connecting to higher quality peers. +As the node becomes well connected to the rest of the network, it can dial lesser known or lesser +quality peers and help assess their quality. Similarly, when queried for peers, a node should make +sure they dont return low quality peers. + +Peer quality can be tracked using a trust metric that flags certain behaviours as good or bad. When enough +bad behaviour accumulates, we can mark the peer as bad and disconnect. +For example, when the PEXReactor makes a request for peers network addresses from an already known peer, and the returned network addresses are unreachable, this undesirable behavior should be tracked. Returning a few bad network addresses probably shouldn’t cause a peer to be dropped, while excessive amounts of this behavior does qualify the peer for removal. The originally proposed approach and design document for the trust metric can be found in the [ADR 006](adr-006-trust-metric.md) document. + +The trust metric implementation allows a developer to obtain a peer's trust metric from a trust metric store, and track good and bad events relevant to a peer's behavior, and at any time, the peer's metric can be queried for a current trust value. The current trust value is calculated with a formula that utilizes current behavior, previous behavior, and change between the two. Current behavior is calculated as the percentage of good behavior within a time interval. The time interval is short; probably set between 30 seconds and 5 minutes. On the other hand, the historic data can estimate a peer's behavior over days worth of tracking. At the end of a time interval, the current behavior becomes part of the historic data, and a new time interval begins with the good and bad counters reset to zero. + +These are some important things to keep in mind regarding how the trust metrics handle time intervals and scoring: +- Each new time interval begins with a perfect score +- Bad events quickly bring the score down and good events cause the score to slowly rise +- When the time interval is over, the percentage of good events becomes historic data. + +Some useful information about the inner workings of the trust metric: +- When a trust metric is first instantiated, a timer (ticker) periodically fires in order to handle transitions between trust metric time intervals +- If a peer is disconnected from a node, the timer should be paused, since the node is no longer connected to that peer +- The ability to pause the metric is supported with the store **PeerDisconnected** method and the metric **Pause** method +- After a pause, if a good or bad event method is called on a metric, it automatically becomes unpaused and begins a new time interval. + +## Decision + +The trust metric capability is now available, yet, it still leaves the question of how should it be applied throughout Tendermint in order to properly track the quality of peers? + +### Proposed Process + +Peers are managed using an address book and a trust metric: + +- The address book keeps a record of peers and provides selection methods +- The trust metric tracks the quality of the peers + +#### Presence in Address Book + +Outbound peers are added to the address book before they are dialed, +and inbound peers are added once the peer connection is set up. +Peers are also added to the address book when they are received in response to +a pexRequestMessage. + +While a node has less than `needAddressThreshold`, it will periodically request more, +via pexRequestMessage, from randomly selected peers and from newly dialed outbound peers. + +When a new address is added to an address book that has more than `0.5*needAddressThreshold` addresses, +then with some low probability, a randomly chosen low quality peer is removed. + +#### Outbound Peers + +Peers attempt to maintain a minimum number of outbound connections by +repeatedly querying the address book for peers to connect to. +While a node has few to no outbound connections, the address book is biased to return +higher quality peers. As the node increases the number of outbound connections, +the address book is biased to return less-vetted or lower-quality peers. + +#### Inbound Peers + +Peers also maintain a maximum number of total connections, MaxNumPeers. +If a peer has MaxNumPeers, new incoming connections will be accepted with low probability. +When such a new connection is accepted, the peer disconnects from a probabilistically chosen low ranking peer +so it does not exceed MaxNumPeers. + +#### Peer Exchange + +When a peer receives a pexRequestMessage, it returns a random sample of high quality peers from the address book. Peers with no score or low score should not be inclided in a response to pexRequestMessage. + +#### Peer Quality + +Peer quality is tracked in the connection and across the reactors by storing the TrustMetric in the peer's +thread safe Data store. + +Peer behaviour is then defined as one of the following: +- Fatal - something outright malicious that causes us to disconnect the peer and ban it from the address book for some amount of time +- Bad - Any kind of timeout, messages that don't unmarshal, fail other validity checks, or messages we didn't ask for or aren't expecting (usually worth one bad event) +- Neutral - Unknown channels/message types/version upgrades (no good or bad events recorded) +- Correct - Normal correct behavior (worth one good event) +- Good - some random majority of peers per reactor sending us useful messages (worth more than one good event). + +Note that Fatal behaviour causes us to remove the peer, and neutral behaviour does not affect the score. + +## Status + +Proposed. + +## Consequences + +### Positive + +- Bringing the address book and trust metric store together will cause the network to be built in a way that encourages greater security and reliability. + +### Negative + +- TBD + +### Neutral + +- Keep in mind that, good events need to be recorded just as bad events do using this implementation. diff --git a/docs/specification/configuration.rst b/docs/specification/configuration.rst index 94801136f..74b41d09d 100644 --- a/docs/specification/configuration.rst +++ b/docs/specification/configuration.rst @@ -21,7 +21,8 @@ The main config parameters are defined - ``genesis_file``: The location of the genesis file. *Default*: ``"$TMHOME/genesis.json"`` - ``log_level``: *Default*: ``"state:info,*:error"`` -- ``moniker``: Name of this node. *Default*: ``"anonymous"`` +- ``moniker``: Name of this node. *Default*: the host name or ``"anonymous"`` + if runtime fails to get the host name - ``priv_validator_file``: Validator private key file. *Default*: ``"$TMHOME/priv_validator.json"`` - ``prof_laddr``: Profile listen address. *Default*: ``""`` diff --git a/glide.lock b/glide.lock index 82846067b..d18ccf6e2 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 09fc7f59ca6b718fe236368bb55f4801455295cfe455ea5865d544ee4dcfdc08 -updated: 2017-12-06T03:31:34.476581624-05:00 +hash: f420f1f858100218dad50997d939eaaf129ff654a0648a47ddc60d626ab0b8e9 +updated: 2017-12-10T05:37:46.41123196Z imports: - name: github.com/btcsuite/btcd version: 2e60448ffcc6bf78332d1fe590260095f554dd78 @@ -129,7 +129,7 @@ imports: subpackages: - iavl - name: github.com/tendermint/tmlibs - version: bfcc0217f120d3bee6730ba0789d2eb72fc2e889 + version: e4ef2835f0081c2ece83b9c1f777cf071f956e81 subpackages: - autofile - cli diff --git a/glide.yaml b/glide.yaml index 3f20a4680..e614d0a1e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -34,7 +34,7 @@ import: subpackages: - iavl - package: github.com/tendermint/tmlibs - version: ~0.5.0 + version: e4ef2835f0081c2ece83b9c1f777cf071f956e81 subpackages: - autofile - cli diff --git a/node/node.go b/node/node.go index eb5509717..352c13ccc 100644 --- a/node/node.go +++ b/node/node.go @@ -2,7 +2,6 @@ package node import ( "bytes" - "context" "encoding/json" "errors" "fmt" @@ -373,12 +372,7 @@ func (n *Node) OnStart() error { } // start tx indexer - err = n.indexerService.Start() - if err != nil { - return err - } - - return nil + return n.indexerService.Start() } // OnStop stops the Node. It implements cmn.Service. @@ -446,13 +440,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - onDisconnect := rpcserver.OnDisconnect(func(remoteAddr string) { - err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) - if err != nil { - rpcLogger.Error("Error unsubsribing from all on disconnect", "err", err) - } - }) - wm := rpcserver.NewWebsocketManager(rpccore.Routes, onDisconnect) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpcserver.EventSubscriber(n.eventBus)) wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) diff --git a/p2p/netaddress.go b/p2p/netaddress.go index d424f8c32..9cb7dd2c3 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -5,8 +5,8 @@ package p2p import ( - "errors" "flag" + "fmt" "net" "strconv" "time" @@ -45,7 +45,6 @@ func NewNetAddress(addr net.Addr) *NetAddress { // address in the form of "IP:Port". Also resolves the host if host // is not an IP. func NewNetAddressString(addr string) (*NetAddress, error) { - host, portStr, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -73,16 +72,18 @@ func NewNetAddressString(addr string) (*NetAddress, error) { // NewNetAddressStrings returns an array of NetAddress'es build using // the provided strings. -func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) { - netAddrs := make([]*NetAddress, len(addrs)) - for i, addr := range addrs { +func NewNetAddressStrings(addrs []string) ([]*NetAddress, []error) { + netAddrs := make([]*NetAddress, 0) + errs := make([]error, 0) + for _, addr := range addrs { netAddr, err := NewNetAddressString(addr) if err != nil { - return nil, errors.New(cmn.Fmt("Error in address %s: %v", addr, err)) + errs = append(errs, fmt.Errorf("Error in address %s: %v", addr, err)) + } else { + netAddrs = append(netAddrs, netAddr) } - netAddrs[i] = netAddr } - return netAddrs, nil + return netAddrs, errs } // NewNetAddressIPPort returns a new NetAddress using the provided IP diff --git a/p2p/netaddress_test.go b/p2p/netaddress_test.go index 7e899a314..db6147500 100644 --- a/p2p/netaddress_test.go +++ b/p2p/netaddress_test.go @@ -51,11 +51,9 @@ func TestNewNetAddressString(t *testing.T) { } func TestNewNetAddressStrings(t *testing.T) { - assert, require := assert.New(t), require.New(t) - addrs, err := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"}) - require.Nil(err) - - assert.Equal(2, len(addrs)) + addrs, errs := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"}) + assert.Len(t, errs, 0) + assert.Equal(t, 2, len(addrs)) } func TestNewNetAddressIPPort(t *testing.T) { diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 6e49f6d06..960c8c641 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -20,7 +20,7 @@ const ( minNumOutboundPeers = 10 maxPexMessageSize = 1048576 // 1MB - // maximum messages one peer can send to us during `msgCountByPeerFlushInterval` + // maximum pex messages one peer can send to us during `msgCountByPeerFlushInterval` defaultMaxMsgCountByPeer = 1000 msgCountByPeerFlushInterval = 1 * time.Hour ) @@ -247,6 +247,7 @@ func (r *PEXReactor) ensurePeers() { // bias to prefer more vetted peers when we have fewer connections. // not perfect, but somewhate ensures that we prioritize connecting to more-vetted + // NOTE: range here is [10, 90]. Too high ? newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 toDial := make(map[string]*NetAddress) diff --git a/p2p/switch.go b/p2p/switch.go index f41b82959..4fdaec6ec 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -298,9 +298,9 @@ func (sw *Switch) startInitPeer(peer *peer) { // DialSeeds dials a list of seeds asynchronously in random order. func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { - netAddrs, err := NewNetAddressStrings(seeds) - if err != nil { - return err + netAddrs, errs := NewNetAddressStrings(seeds) + for _, err := range errs { + sw.Logger.Error("Error in seed's address", "err", err) } if addrBook != nil { diff --git a/p2p/trust/store.go b/p2p/trust/store.go index e86aecd2c..fd84ac963 100644 --- a/p2p/trust/store.go +++ b/p2p/trust/store.go @@ -138,7 +138,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { return false } - peers := make(map[string]MetricHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON) err := json.Unmarshal(bytes, &peers) if err != nil { cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) @@ -160,7 +160,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { func (tms *TrustMetricStore) saveToDB() { tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) - peers := make(map[string]MetricHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON) for key, tm := range tms.peerMetrics { // Add an entry for the peer identified by key diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index e41c2d657..e7a84b6b4 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -2,7 +2,6 @@ package client import ( "context" - "fmt" "time" "github.com/pkg/errors" @@ -57,19 +56,20 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error { // // This handles subscribing and unsubscribing under the hood func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { + const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() evts := make(chan interface{}, 1) // register for the next event of this type - query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp) - err := c.Subscribe(ctx, query, evts) + query := types.QueryForEvent(evtTyp) + err := c.Subscribe(ctx, subscriber, query, evts) if err != nil { return types.TMEventData{}, errors.Wrap(err, "failed to subscribe") } // make sure to unregister after the test is over - defer c.Unsubscribe(ctx, query) + defer c.UnsubscribeAll(ctx, subscriber) select { case evt := <-evts: diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 1f49ea4d7..2ecfa7958 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -3,7 +3,6 @@ package client import ( "context" "encoding/json" - "fmt" "sync" "github.com/pkg/errors" @@ -13,6 +12,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) /* @@ -204,20 +204,14 @@ type WSEvents struct { endpoint string ws *rpcclient.WSClient - subscriptions map[string]chan<- interface{} mtx sync.RWMutex - - // used for signaling the goroutine that feeds ws -> EventSwitch - quit chan bool - done chan bool + subscriptions map[string]chan<- interface{} } func newWSEvents(remote, endpoint string) *WSEvents { wsEvents := &WSEvents{ endpoint: endpoint, remote: remote, - quit: make(chan bool, 1), - done: make(chan bool, 1), subscriptions: make(map[string]chan<- interface{}), } @@ -225,87 +219,86 @@ func newWSEvents(remote, endpoint string) *WSEvents { return wsEvents } -// Start is the only way I could think the extend OnStart from -// events.eventSwitch. If only it wasn't private... -// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start -func (w *WSEvents) Start() error { - ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { +func (w *WSEvents) OnStart() error { + w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { w.redoSubscriptions() })) - err := ws.Start() - if err == nil { - w.ws = ws - go w.eventListener() + err := w.ws.Start() + if err != nil { + return err } - return err -} -// Stop wraps the BaseService/eventSwitch actions as Start does -func (w *WSEvents) Stop() error { - // send a message to quit to stop the eventListener - w.quit <- true - <-w.done - w.ws.Stop() - w.ws = nil + go w.eventListener() return nil } -func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { - if ch := w.getSubscription(query); ch != nil { - return errors.New("already subscribed") +// Stop wraps the BaseService/eventSwitch actions as Start does +func (w *WSEvents) OnStop() { + err := w.ws.Stop() + if err != nil { + w.Logger.Error("failed to stop WSClient", "err", err) } +} - err := w.ws.Subscribe(ctx, query) +func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + q := query.String() + + err := w.ws.Subscribe(ctx, q) if err != nil { - return errors.Wrap(err, "failed to subscribe") + return err } w.mtx.Lock() - w.subscriptions[query] = out + // subscriber param is ignored because Tendermint will override it with + // remote IP anyway. + w.subscriptions[q] = out w.mtx.Unlock() return nil } -func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error { - err := w.ws.Unsubscribe(ctx, query) +func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + q := query.String() + + err := w.ws.Unsubscribe(ctx, q) if err != nil { return err } w.mtx.Lock() - defer w.mtx.Unlock() - ch, ok := w.subscriptions[query] + ch, ok := w.subscriptions[q] if ok { close(ch) - delete(w.subscriptions, query) + delete(w.subscriptions, q) } + w.mtx.Unlock() return nil } -func (w *WSEvents) UnsubscribeAll(ctx context.Context) error { +func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { err := w.ws.UnsubscribeAll(ctx) if err != nil { return err } w.mtx.Lock() - defer w.mtx.Unlock() for _, ch := range w.subscriptions { close(ch) } w.subscriptions = make(map[string]chan<- interface{}) + w.mtx.Unlock() + return nil } // After being reconnected, it is necessary to redo subscription to server // otherwise no data will be automatically received. func (w *WSEvents) redoSubscriptions() { - for query := range w.subscriptions { + for q := range w.subscriptions { // NOTE: no timeout for resubscribing // FIXME: better logging/handling of errors?? - w.ws.Subscribe(context.Background(), query) + w.ws.Subscribe(context.Background(), q) } } @@ -316,34 +309,29 @@ func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) eventListener() { for { select { - case resp := <-w.ws.ResponsesCh: - // res is json.RawMessage + case resp, ok := <-w.ws.ResponsesCh: + if !ok { + return + } if resp.Error != nil { - // FIXME: better logging/handling of errors?? - fmt.Printf("ws err: %+v\n", resp.Error.Error()) + w.Logger.Error("WS error", "err", resp.Error.Error()) continue } result := new(ctypes.ResultEvent) err := json.Unmarshal(resp.Result, result) if err != nil { - // ignore silently (eg. subscribe, unsubscribe and maybe other events) - // TODO: ? + w.Logger.Error("failed to unmarshal response", "err", err) continue } - if ch := w.getSubscription(result.Query); ch != nil { + // NOTE: writing also happens inside mutex so we can't close a channel in + // Unsubscribe/UnsubscribeAll. + w.mtx.RLock() + if ch, ok := w.subscriptions[result.Query]; ok { ch <- result.Data } - case <-w.quit: - // send a message so we can wait for the routine to exit - // before cleaning up the w.ws stuff - w.done <- true + w.mtx.RUnlock() + case <-w.Quit: return } } } - -func (w *WSEvents) getSubscription(query string) chan<- interface{} { - w.mtx.RLock() - defer w.mtx.RUnlock() - return w.subscriptions[query] -} diff --git a/rpc/client/interface.go b/rpc/client/interface.go index c38f188ee..063d50e19 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -20,8 +20,6 @@ implementation. package client import ( - "context" - data "github.com/tendermint/go-wire/data" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -89,7 +87,5 @@ type NetworkClient interface { // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - Subscribe(ctx context.Context, query string, out chan<- interface{}) error - Unsubscribe(ctx context.Context, query string) error - UnsubscribeAll(ctx context.Context) error + types.EventBusSubscriber } diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 40c249123..18c6759de 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -3,19 +3,12 @@ package client import ( "context" - "github.com/pkg/errors" - data "github.com/tendermint/go-wire/data" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" - tmquery "github.com/tendermint/tmlibs/pubsub/query" -) - -const ( - // event bus subscriber - subscriber = "rpc-localclient" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) /* @@ -33,10 +26,7 @@ For real clients, you probably want to use client.HTTP. For more powerful control during testing, you probably want the "client/mock" package. */ type Local struct { - node *nm.Node - *types.EventBus - subscriptions map[string]*tmquery.Query } // NewLocal configures a client that calls the Node directly. @@ -48,9 +38,7 @@ type Local struct { func NewLocal(node *nm.Node) *Local { node.ConfigureRPC() return &Local{ - node: node, - EventBus: node.EventBus(), - subscriptions: make(map[string]*tmquery.Query), + EventBus: node.EventBus(), } } @@ -68,7 +56,7 @@ func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { return core.ABCIInfo() } -func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) { +func (c *Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) } @@ -128,34 +116,14 @@ func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { return core.TxSearch(query, prove) } -func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { - q, err := tmquery.New(query) - if err != nil { - return errors.Wrap(err, "failed to subscribe") - } - if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil { - return errors.Wrap(err, "failed to subscribe") - } - c.subscriptions[query] = q - return nil +func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { + return c.EventBus.Subscribe(ctx, subscriber, query, out) } -func (c *Local) Unsubscribe(ctx context.Context, query string) error { - q, ok := c.subscriptions[query] - if !ok { - return errors.New("subscription not found") - } - if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil { - return errors.Wrap(err, "failed to unsubscribe") - } - delete(c.subscriptions, query) - return nil +func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { + return c.EventBus.Unsubscribe(ctx, subscriber, query) } -func (c *Local) UnsubscribeAll(ctx context.Context) error { - if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil { - return errors.Wrap(err, "failed to unsubscribe") - } - c.subscriptions = make(map[string]*tmquery.Query) - return nil +func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { + return c.EventBus.UnsubscribeAll(ctx, subscriber) } diff --git a/rpc/core/events.go b/rpc/core/events.go index 81f1c919a..538134b0f 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -44,20 +44,15 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri q, err := tmquery.New(query) if err != nil { - return nil, errors.Wrap(err, "failed to parse a query") - } - - err = wsCtx.AddSubscription(query, q) - if err != nil { - return nil, errors.Wrap(err, "failed to add subscription") + return nil, errors.Wrap(err, "failed to parse query") } ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() ch := make(chan interface{}) - err = eventBus.Subscribe(ctx, addr, q, ch) + err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch) if err != nil { - return nil, errors.Wrap(err, "failed to subscribe") + return nil, err } go func() { @@ -100,18 +95,31 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { addr := wsCtx.GetRemoteAddr() logger.Info("Unsubscribe from query", "remote", addr, "query", query) - q, ok := wsCtx.DeleteSubscription(query) - if !ok { - return nil, errors.New("subscription not found") + q, err := tmquery.New(query) + if err != nil { + return nil, errors.Wrap(err, "failed to parse query") + } + err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) + if err != nil { + return nil, err } - eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query)) return &ctypes.ResultUnsubscribe{}, nil } func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { addr := wsCtx.GetRemoteAddr() logger.Info("Unsubscribe from all", "remote", addr) - eventBus.UnsubscribeAll(context.Background(), addr) - wsCtx.DeleteAllSubscriptions() + err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) + if err != nil { + return nil, err + } return &ctypes.ResultUnsubscribe{}, nil } + +func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber { + es := wsCtx.GetEventSubscriber() + if es == nil { + es = eventBus + } + return es +} diff --git a/rpc/core/status.go b/rpc/core/status.go index 0cb7acc1f..653c37f50 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -24,35 +24,35 @@ import ( // // ```json // { -// "error": "", -// "result": { -// "latest_block_time": 1.49631773695e+18, -// "latest_block_height": 22924, -// "latest_app_hash": "9D16177BC71E445476174622EA559715C293740C", -// "latest_block_hash": "75B36EEF96C277A592D8B14867098C58F68BB180", -// "pub_key": { -// "data": "68DFDA7E50F82946E7E8546BED37944A422CD1B831E70DF66BA3B8430593944D", -// "type": "ed25519" -// }, -// "node_info": { -// "other": [ -// "wire_version=0.6.2", -// "p2p_version=0.5.0", -// "consensus_version=v1/0.2.2", -// "rpc_version=0.7.0/3", -// "tx_index=on", -// "rpc_addr=tcp://0.0.0.0:46657" -// ], -// "version": "0.10.0-rc1-aa22bd84", -// "listen_addr": "10.0.2.15:46656", -// "remote_addr": "", -// "network": "test-chain-6UTNIN", -// "moniker": "anonymous", -// "pub_key": "659B9E54DD6EF9FEF28FAD40629AF0E4BD3C2563BB037132B054A176E00F1D94" -// } -// }, -// "id": "", -// "jsonrpc": "2.0" +// "result": { +// "syncing": false, +// "latest_block_time": "2017-12-07T18:19:47.617Z", +// "latest_block_height": 6, +// "latest_app_hash": "", +// "latest_block_hash": "A63D0C3307DEDCCFCC82ED411AE9108B70B29E02", +// "pub_key": { +// "data": "8C9A68070CBE33F9C445862BA1E9D96A75CEB68C0CF6ADD3652D07DCAC5D0380", +// "type": "ed25519" +// }, +// "node_info": { +// "other": [ +// "wire_version=0.7.2", +// "p2p_version=0.5.0", +// "consensus_version=v1/0.2.2", +// "rpc_version=0.7.0/3", +// "tx_index=on", +// "rpc_addr=tcp://0.0.0.0:46657" +// ], +// "version": "0.13.0-14ccc8b", +// "listen_addr": "10.0.2.15:46656", +// "remote_addr": "", +// "network": "test-chain-qhVCa2", +// "moniker": "vagrant-ubuntu-trusty-64", +// "pub_key": "844981FE99ABB19F7816F2D5E94E8A74276AB1153760A7799E925C75401856C6" +// } +// }, +// "id": "", +// "jsonrpc": "2.0" // } // ``` func Status() (*ctypes.ResultStatus, error) { diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index e4ed442e4..79e3f63f4 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -169,13 +169,14 @@ func (c *WSClient) OnStop() {} // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit // channel is closed. func (c *WSClient) Stop() error { - err := c.BaseService.Stop() - if err == nil { - // only close user-facing channels when we can't write to them - c.wg.Wait() - close(c.ResponsesCh) + if err := c.BaseService.Stop(); err != nil { + return err } - return err + // only close user-facing channels when we can't write to them + c.wg.Wait() + close(c.ResponsesCh) + + return nil } // IsReconnecting returns true if the client is reconnecting right now. diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index c81821690..1e14ea9a0 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -2,6 +2,7 @@ package rpcserver import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -366,8 +367,6 @@ type wsConnection struct { funcMap map[string]*RPCFunc - subscriptions map[string]interface{} - // write channel capacity writeChanCapacity int @@ -380,8 +379,8 @@ type wsConnection struct { // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration - // called before stopping the connection. - onDisconnect func(remoteAddr string) + // object that is used to subscribe / unsubscribe from events + eventSub types.EventSubscriber } // NewWSConnection wraps websocket.Conn. @@ -395,7 +394,6 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, funcMap: funcMap, - subscriptions: make(map[string]interface{}), writeWait: defaultWSWriteWait, writeChanCapacity: defaultWSWriteChanCapacity, readWait: defaultWSReadWait, @@ -408,6 +406,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti return wsc } +// EventSubscriber sets object that is used to subscribe / unsubscribe from +// events - not Goroutine-safe. If none given, default node's eventBus will be +// used. +func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.eventSub = eventSub + } +} + // WriteWait sets the amount of time to wait before a websocket write times out. // It should only be used in the constructor - not Goroutine-safe. func WriteWait(writeWait time.Duration) func(*wsConnection) { @@ -440,14 +447,6 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { } } -// OnDisconnect called before stopping the connection. -// It should only be used in the constructor - not Goroutine-safe. -func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.onDisconnect = cb - } -} - // OnStart implements cmn.Service by starting the read and write routines. It // blocks until the connection closes. func (wsc *wsConnection) OnStart() error { @@ -461,12 +460,12 @@ func (wsc *wsConnection) OnStart() error { return nil } -// OnStop implements cmn.Service by calling OnDisconnect callback. +// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions. func (wsc *wsConnection) OnStop() { // Both read and write loops close the websocket connection when they exit their loops. // The writeChan is never closed, to allow WriteRPCResponse() to fail. - if wsc.onDisconnect != nil { - wsc.onDisconnect(wsc.remoteAddr) + if wsc.eventSub != nil { + wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) } } @@ -476,6 +475,11 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } +// GetEventSubscriber implements WSRPCConnection by returning event subscriber. +func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { + return wsc.eventSub +} + // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { @@ -499,28 +503,6 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { } } -func (wsc *wsConnection) AddSubscription(query string, data interface{}) error { - if _, ok := wsc.subscriptions[query]; ok { - return errors.New("Already subscribed") - } - - wsc.subscriptions[query] = data - return nil -} - -func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) { - data, ok := wsc.subscriptions[query] - if ok { - delete(wsc.subscriptions, query) - return data, true - } - return nil, false -} - -func (wsc *wsConnection) DeleteAllSubscriptions() { - wsc.subscriptions = make(map[string]interface{}) -} - // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { defer func() { diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index bac7c2409..37d451457 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -1,11 +1,13 @@ package rpctypes import ( + "context" "encoding/json" "fmt" "strings" "github.com/pkg/errors" + tmpubsub "github.com/tendermint/tmlibs/pubsub" ) //---------------------------------------- @@ -135,10 +137,14 @@ type WSRPCConnection interface { GetRemoteAddr() string WriteRPCResponse(resp RPCResponse) TryWriteRPCResponse(resp RPCResponse) bool + GetEventSubscriber() EventSubscriber +} - AddSubscription(string, interface{}) error - DeleteSubscription(string) (interface{}, bool) - DeleteAllSubscriptions() +// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber +type EventSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error } // websocket-only RPCFuncs take this as the first parameter. diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index d40fe80fe..b70f3699f 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -342,7 +342,7 @@ func startKey(c query.Condition, height int64) []byte { func startKeyForRange(r queryRange, height int64) []byte { if r.lowerBound == nil { - return []byte(fmt.Sprintf("%s", r.key)) + return []byte(r.key) } var lowerBound interface{} diff --git a/types/event_bus.go b/types/event_bus.go index 6cee1d82b..6b6069b90 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -12,6 +12,12 @@ import ( const defaultCapacity = 1000 +type EventBusSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error +} + // EventBus is a common bus for all events going through the system. All calls // are proxied to underlying pubsub server. All events must be published using // EventBus to ensure correct data types. diff --git a/types/events.go b/types/events.go index 08ebf46da..5c41c6df6 100644 --- a/types/events.go +++ b/types/events.go @@ -146,32 +146,32 @@ const ( ) var ( - EventQueryBond = queryForEvent(EventBond) - EventQueryUnbond = queryForEvent(EventUnbond) - EventQueryRebond = queryForEvent(EventRebond) - EventQueryDupeout = queryForEvent(EventDupeout) - EventQueryFork = queryForEvent(EventFork) - EventQueryNewBlock = queryForEvent(EventNewBlock) - EventQueryNewBlockHeader = queryForEvent(EventNewBlockHeader) - EventQueryNewRound = queryForEvent(EventNewRound) - EventQueryNewRoundStep = queryForEvent(EventNewRoundStep) - EventQueryTimeoutPropose = queryForEvent(EventTimeoutPropose) - EventQueryCompleteProposal = queryForEvent(EventCompleteProposal) - EventQueryPolka = queryForEvent(EventPolka) - EventQueryUnlock = queryForEvent(EventUnlock) - EventQueryLock = queryForEvent(EventLock) - EventQueryRelock = queryForEvent(EventRelock) - EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) - EventQueryVote = queryForEvent(EventVote) - EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) - EventQueryTx = queryForEvent(EventTx) + EventQueryBond = QueryForEvent(EventBond) + EventQueryUnbond = QueryForEvent(EventUnbond) + EventQueryRebond = QueryForEvent(EventRebond) + EventQueryDupeout = QueryForEvent(EventDupeout) + EventQueryFork = QueryForEvent(EventFork) + EventQueryNewBlock = QueryForEvent(EventNewBlock) + EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader) + EventQueryNewRound = QueryForEvent(EventNewRound) + EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep) + EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose) + EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal) + EventQueryPolka = QueryForEvent(EventPolka) + EventQueryUnlock = QueryForEvent(EventUnlock) + EventQueryLock = QueryForEvent(EventLock) + EventQueryRelock = QueryForEvent(EventRelock) + EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait) + EventQueryVote = QueryForEvent(EventVote) + EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat) + EventQueryTx = QueryForEvent(EventTx) ) func EventQueryTxFor(tx Tx) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) } -func queryForEvent(eventType string) tmpubsub.Query { +func QueryForEvent(eventType string) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) }