Browse Source

Merge pull request #960 from tendermint/release-v0.14.0

Release v0.14.0
pull/1025/head v0.14.0
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
88f5f21dbb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 1387 additions and 647 deletions
  1. +2
    -3
      .github/CODEOWNERS
  2. +18
    -0
      CHANGELOG.md
  3. +2
    -2
      DOCKER/Dockerfile
  4. +3
    -1
      DOCKER/README.md
  5. +20
    -27
      Makefile
  6. +34
    -24
      Vagrantfile
  7. +13
    -0
      appveyor.yml
  8. +2
    -0
      circle.yml
  9. +60
    -0
      cmd/tendermint/commands/lite.go
  10. +3
    -0
      cmd/tendermint/commands/root_test.go
  11. +1
    -0
      cmd/tendermint/main.go
  12. +17
    -1
      config/config.go
  13. +2
    -5
      config/toml.go
  14. +2
    -2
      config/toml_test.go
  15. +9
    -7
      consensus/replay_test.go
  16. +0
    -36
      consensus/test_data/README.md
  17. +0
    -148
      consensus/test_data/build.sh
  18. BIN
      consensus/test_data/many_blocks.cswal
  19. +1
    -1
      consensus/ticker.go
  20. +2
    -30
      consensus/wal.go
  21. +181
    -0
      consensus/wal_generator.go
  22. +7
    -2
      consensus/wal_test.go
  23. +103
    -0
      docs/architecture/adr-007-trust-metric-usage.md
  24. +2
    -1
      docs/specification/configuration.rst
  25. +3
    -3
      glide.lock
  26. +1
    -1
      glide.yaml
  27. +40
    -0
      lite/proxy/block.go
  28. +30
    -0
      lite/proxy/certifier.go
  29. +22
    -0
      lite/proxy/errors.go
  30. +17
    -0
      lite/proxy/errors_test.go
  31. +69
    -0
      lite/proxy/proxy.go
  32. +120
    -0
      lite/proxy/query.go
  33. +140
    -0
      lite/proxy/query_test.go
  34. +185
    -0
      lite/proxy/wrapper.go
  35. +2
    -14
      node/node.go
  36. +9
    -8
      p2p/netaddress.go
  37. +3
    -5
      p2p/netaddress_test.go
  38. +2
    -1
      p2p/pex_reactor.go
  39. +79
    -34
      p2p/switch.go
  40. +2
    -2
      p2p/switch_test.go
  41. +2
    -2
      p2p/trust/store.go
  42. +4
    -4
      rpc/client/helpers.go
  43. +46
    -58
      rpc/client/httpclient.go
  44. +1
    -5
      rpc/client/interface.go
  45. +9
    -41
      rpc/client/localclient.go
  46. +22
    -14
      rpc/core/events.go
  47. +29
    -29
      rpc/core/status.go
  48. +7
    -6
      rpc/lib/client/ws_client.go
  49. +20
    -38
      rpc/lib/server/handlers.go
  50. +9
    -3
      rpc/lib/types/types.go
  51. +1
    -1
      rpc/test/helpers.go
  52. +0
    -65
      scripts/cutWALUntil/main.go
  53. +1
    -1
      state/txindex/kv/kv.go
  54. +6
    -0
      types/event_bus.go
  55. +20
    -20
      types/events.go
  56. +2
    -2
      version/version.go

+ 2
- 3
.github/CODEOWNERS View File

@ -1,5 +1,4 @@
# CODEOWNERS: https://help.github.com/articles/about-codeowners/
# Everything goes through Bucky. For now.
* @ebuchman
# Everything goes through Bucky and Anton. For now.
* @ebuchman @melekes

+ 18
- 0
CHANGELOG.md View File

@ -27,6 +27,24 @@ BUG FIXES:
- Graceful handling/recovery for apps that have non-determinism or fail to halt
- Graceful handling/recovery for violations of safety, or liveness
## 0.14.0 (December 11, 2017)
BREAKING CHANGES:
- consensus/wal: removed separator
- rpc/client: changed Subscribe/Unsubscribe/UnsubscribeAll funcs signatures to be identical to event bus.
FEATURES:
- new `tendermint lite` command (and `lite/proxy` pkg) for running a light-client RPC proxy.
NOTE it is currently insecure and its APIs are not yet covered by semver
IMPROVEMENTS:
- rpc/client: can act as event bus subscriber (See https://github.com/tendermint/tendermint/issues/945).
- p2p: use exponential backoff from seconds to hours when attempting to reconnect to persistent peer
- config: moniker defaults to the machine's hostname instead of "anonymous"
BUG FIXES:
- p2p: no longer exit if one of the seed addresses is incorrect
## 0.13.0 (December 6, 2017)
BREAKING CHANGES:


+ 2
- 2
DOCKER/Dockerfile View File

@ -1,8 +1,8 @@
FROM alpine:3.6
# This is the release of tendermint to pull in.
ENV TM_VERSION 0.12.0
ENV TM_SHA256SUM be17469e92f04fc2a3663f891da28edbaa6c37c4d2f746736571887f4790555a
ENV TM_VERSION 0.13.0
ENV TM_SHA256SUM 36d773d4c2890addc61cc87a72c1e9c21c89516921b0defb0edfebde719b4b85
# Tendermint will be looking for genesis file in /tendermint (unless you change
# `genesis_file` in config.toml). You can put your config.toml and private


+ 3
- 1
DOCKER/README.md View File

@ -1,6 +1,8 @@
# Supported tags and respective `Dockerfile` links
- `0.12.0`, `latest` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/70d8afa6e952e24c573ece345560a5971bf2cc0e/DOCKER/Dockerfile)
- `0.13.0`, `latest` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/a28b3fff49dce2fb31f90abb2fc693834e0029c2/DOCKER/Dockerfile)
- `0.12.1` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/457c688346b565e90735431619ca3ca597ef9007/DOCKER/Dockerfile)
- `0.12.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/70d8afa6e952e24c573ece345560a5971bf2cc0e/DOCKER/Dockerfile)
- `0.11.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/9177cc1f64ca88a4a0243c5d1773d10fba67e201/DOCKER/Dockerfile)
- `0.10.0` [(Dockerfile)](https://github.com/tendermint/tendermint/blob/e5342f4054ab784b2cd6150e14f01053d7c8deb2/DOCKER/Dockerfile)
- `0.9.1`, `0.9`, [(Dockerfile)](https://github.com/tendermint/tendermint/blob/809e0e8c5933604ba8b2d096803ada7c5ec4dfd3/DOCKER/Dockerfile)


+ 20
- 27
Makefile View File

@ -1,7 +1,7 @@
GOTOOLS = \
github.com/mitchellh/gox \
github.com/tcnksm/ghr \
github.com/alecthomas/gometalinter
gopkg.in/alecthomas/gometalinter.v2
PACKAGES=$(shell go list ./... | grep -v '/vendor/')
BUILD_TAGS?=tendermint
@ -12,13 +12,13 @@ BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`g
all: get_vendor_deps install test
install:
CGO_ENABLED=0 go install $(BUILD_FLAGS) ./cmd/tendermint
go install $(BUILD_FLAGS) ./cmd/tendermint
build:
CGO_ENABLED=0 go build $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint/
go build $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint/
build_race:
CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint
go build -race $(BUILD_FLAGS) -o build/tendermint ./cmd/tendermint
# dist builds binaries for all platforms and packages them for distribution
dist:
@ -43,32 +43,25 @@ test_release:
test100:
@for i in {1..100}; do make test; done
vagrant_test:
vagrant up
vagrant ssh -c 'make install'
vagrant ssh -c 'make test_race'
vagrant ssh -c 'make test_integrations'
draw_deps:
# requires brew install graphviz or apt-get install graphviz
go get github.com/RobotsAndPencils/goviz
@goviz -i github.com/tendermint/tendermint/cmd/tendermint -d 3 | dot -Tpng -o dependency-graph.png
list_deps:
@go list -f '{{join .Deps "\n"}}' ./... | \
grep -v /vendor/ | sort | uniq | \
xargs go list -f '{{if not .Standard}}{{.ImportPath}}{{end}}'
get_deps:
@echo "--> Running go get"
@go get -v -d $(PACKAGES)
@go list -f '{{join .TestImports "\n"}}' ./... | \
grep -v /vendor/ | sort | uniq | \
xargs go get -v -d
update_deps:
@echo "--> Updating dependencies"
@go get -d -u ./...
get_vendor_deps:
@hash glide 2>/dev/null || go get github.com/Masterminds/glide
@rm -rf vendor/
@echo "--> Running glide install"
@glide install
@$(GOPATH)/bin/glide install
update_vendor_deps:
@$(GOPATH)/bin/glide update
update_tools:
@echo "--> Updating tools"
@ -77,21 +70,22 @@ update_tools:
tools:
@echo "--> Installing tools"
@go get $(GOTOOLS)
@gometalinter --install
$(GOPATH)/bin/gometalinter.v2 --install
### Formatting, linting, and vetting
metalinter:
@gometalinter --vendor --deadline=600s --enable-all --disable=lll ./...
$(GOPATH)/bin/gometalinter.v2 --vendor --deadline=600s --enable-all --disable=lll ./...
metalinter_test:
@gometalinter --vendor --deadline=600s --disable-all \
$(GOPATH)/bin/gometalinter.v2 --vendor --deadline=600s --disable-all \
--enable=deadcode \
--enable=gosimple \
--enable=misspell \
--enable=safesql \
./...
# --enable=gas \
#--enable=gas \
#--enable=maligned \
#--enable=dupl \
#--enable=errcheck \
@ -99,7 +93,6 @@ metalinter_test:
#--enable=gocyclo \
#--enable=goimports \
#--enable=golint \ <== comments on anything exported
#--enable=gosimple \
#--enable=gotype \
#--enable=ineffassign \
#--enable=interfacer \
@ -113,4 +106,4 @@ metalinter_test:
#--enable=vet \
#--enable=vetshadow \
.PHONY: install build build_race dist test test_race test_integrations test100 draw_deps list_deps get_deps get_vendor_deps update_deps update_tools tools test_release
.PHONY: install build build_race dist test test_race test_integrations test100 draw_deps get_vendor_deps update_vendor_deps update_tools tools test_release

+ 34
- 24
Vagrantfile View File

@ -2,7 +2,7 @@
# vi: set ft=ruby :
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/trusty64"
config.vm.box = "ubuntu/xenial64"
config.vm.provider "virtualbox" do |v|
v.memory = 4096
@ -10,30 +10,40 @@ Vagrant.configure("2") do |config|
end
config.vm.provision "shell", inline: <<-SHELL
apt-get update
apt-get install -y --no-install-recommends wget curl jq shellcheck bsdmainutils psmisc
wget -qO- https://get.docker.com/ | sh
usermod -a -G docker vagrant
apt-get autoremove -y
apt-get install -y --no-install-recommends git
curl -O https://storage.googleapis.com/golang/go1.9.linux-amd64.tar.gz
tar -xvf go1.9.linux-amd64.tar.gz
rm -rf /usr/local/go
mv go /usr/local
rm -f go1.9.linux-amd64.tar.gz
mkdir -p /home/vagrant/go/bin
echo 'export PATH=$PATH:/usr/local/go/bin:/home/vagrant/go/bin' >> /home/vagrant/.bash_profile
echo 'export GOPATH=/home/vagrant/go' >> /home/vagrant/.bash_profile
# add docker repo
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu xenial stable"
echo 'export LC_ALL=en_US.UTF-8' >> /home/vagrant/.bash_profile
# and golang 1.9 support
# official repo doesn't have race detection runtime...
# add-apt-repository ppa:gophers/archive
add-apt-repository ppa:longsleep/golang-backports
mkdir -p /home/vagrant/go/src/github.com/tendermint
ln -s /vagrant /home/vagrant/go/src/github.com/tendermint/tendermint
chown -R vagrant:vagrant /home/vagrant/go
su - vagrant -c 'cd /home/vagrant/go/src/github.com/tendermint/tendermint && make get_vendor_deps'
# install base requirements
apt-get update
apt-get install -y --no-install-recommends wget curl jq \
make shellcheck bsdmainutils psmisc
apt-get install -y docker-ce golang-1.9-go
# needed for docker
usermod -a -G docker ubuntu
# use "EOF" not EOF to avoid variable substitution of $PATH
cat << "EOF" >> /home/ubuntu/.bash_profile
export PATH=$PATH:/usr/lib/go-1.9/bin:/home/ubuntu/go/bin
export GOPATH=/home/ubuntu/go
export LC_ALL=en_US.UTF-8
cd go/src/github.com/tendermint/tendermint
EOF
mkdir -p /home/ubuntu/go/bin
mkdir -p /home/ubuntu/go/src/github.com/tendermint
ln -s /vagrant /home/ubuntu/go/src/github.com/tendermint/tendermint
chown -R ubuntu:ubuntu /home/ubuntu/go
chown ubuntu:ubuntu /home/ubuntu/.bash_profile
# get all deps and tools, ready to install/test
su - ubuntu -c 'cd /home/ubuntu/go/src/github.com/tendermint/tendermint && make get_vendor_deps && make tools'
SHELL
end

+ 13
- 0
appveyor.yml View File

@ -0,0 +1,13 @@
version: 1.0.{build}
configuration: Release
platform:
- x64
- x86
clone_folder: c:\go\path\src\github.com\tendermint\tendermint
before_build:
- cmd: set GOPATH=%GOROOT%\path
- cmd: set PATH=%GOPATH%\bin;%PATH%
- cmd: make get_vendor_deps
build_script:
- cmd: make test
test: off

+ 2
- 0
circle.yml View File

@ -7,6 +7,7 @@ machine:
GOPATH: "$HOME/.go_project"
PROJECT_PARENT_PATH: "$GOPATH/src/github.com/$CIRCLE_PROJECT_USERNAME"
PROJECT_PATH: "$PROJECT_PARENT_PATH/$CIRCLE_PROJECT_REPONAME"
PATH: "$HOME/.go_project/bin:${PATH}"
hosts:
localhost: 127.0.0.1
@ -24,6 +25,7 @@ dependencies:
test:
override:
- cd "$PROJECT_PATH" && make tools && make get_vendor_deps && make metalinter_test
- cd "$PROJECT_PATH" && set -o pipefail && make test_integrations 2>&1 | tee test_integrations.log:
timeout: 1800
post:


+ 60
- 0
cmd/tendermint/commands/lite.go View File

@ -0,0 +1,60 @@
package commands
import (
"github.com/spf13/cobra"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/lite/proxy"
rpcclient "github.com/tendermint/tendermint/rpc/client"
)
// LiteCmd represents the base command when called without any subcommands
var LiteCmd = &cobra.Command{
Use: "lite",
Short: "Run lite-client proxy server, verifying tendermint rpc",
Long: `This node will run a secure proxy to a tendermint rpc server.
All calls that can be tracked back to a block header by a proof
will be verified before passing them back to the caller. Other that
that it will present the same interface as a full tendermint node,
just with added trust and running locally.`,
RunE: runProxy,
SilenceUsage: true,
}
var (
listenAddr string
nodeAddr string
chainID string
home string
)
func init() {
LiteCmd.Flags().StringVar(&listenAddr, "laddr", ":8888", "Serve the proxy on the given port")
LiteCmd.Flags().StringVar(&nodeAddr, "node", "localhost:46657", "Connect to a Tendermint node at this address")
LiteCmd.Flags().StringVar(&chainID, "chain-id", "tendermint", "Specify the Tendermint chain ID")
LiteCmd.Flags().StringVar(&home, "home-dir", ".tendermint-lite", "Specify the home directory")
}
func runProxy(cmd *cobra.Command, args []string) error {
// First, connect a client
node := rpcclient.NewHTTP(nodeAddr, "/websocket")
cert, err := proxy.GetCertifier(chainID, home, nodeAddr)
if err != nil {
return err
}
sc := proxy.SecureClient(node, cert)
err = proxy.StartProxy(sc, listenAddr, logger)
if err != nil {
return err
}
cmn.TrapSignal(func() {
// TODO: close up shop
})
return nil
}

+ 3
- 0
cmd/tendermint/commands/root_test.go View File

@ -32,6 +32,9 @@ func isolate(cmds ...*cobra.Command) cli.Executable {
if err := os.Unsetenv("TM_HOME"); err != nil {
panic(err)
}
if err := os.RemoveAll(defaultRoot); err != nil {
panic(err)
}
viper.Reset()
config = cfg.DefaultConfig()


+ 1
- 0
cmd/tendermint/main.go View File

@ -15,6 +15,7 @@ func main() {
cmd.GenValidatorCmd,
cmd.InitFilesCmd,
cmd.ProbeUpnpCmd,
cmd.LiteCmd,
cmd.ReplayCmd,
cmd.ReplayConsoleCmd,
cmd.ResetAllCmd,


+ 17
- 1
config/config.go View File

@ -2,6 +2,7 @@ package config
import (
"fmt"
"os"
"path/filepath"
"time"
)
@ -108,7 +109,7 @@ func DefaultBaseConfig() BaseConfig {
return BaseConfig{
Genesis: "genesis.json",
PrivValidator: "priv_validator.json",
Moniker: "anonymous",
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
@ -456,3 +457,18 @@ func rootify(path, root string) string {
}
return filepath.Join(root, path)
}
//-----------------------------------------------------------------------------
// Moniker
var defaultMoniker = getDefaultMoniker()
// getDefaultMoniker returns a default moniker, which is the host name. If runtime
// fails to get the host name, "anonymous" will be returned.
func getDefaultMoniker() string {
moniker, err := os.Hostname()
if err != nil {
moniker = "anonymous"
}
return moniker
}

+ 2
- 5
config/toml.go View File

@ -23,9 +23,7 @@ func EnsureRoot(rootDir string) {
// Write default config file if missing.
if !cmn.FileExists(configFilePath) {
// Ask user for moniker
// moniker := cfg.Prompt("Type hostname: ", "anonymous")
cmn.MustWriteFile(configFilePath, []byte(defaultConfig("anonymous")), 0644)
cmn.MustWriteFile(configFilePath, []byte(defaultConfig(defaultMoniker)), 0644)
}
}
@ -81,8 +79,7 @@ func ResetTestRoot(testName string) *Config {
// Write default config file if missing.
if !cmn.FileExists(configFilePath) {
// Ask user for moniker
cmn.MustWriteFile(configFilePath, []byte(testConfig("anonymous")), 0644)
cmn.MustWriteFile(configFilePath, []byte(testConfig(defaultMoniker)), 0644)
}
if !cmn.FileExists(genesisFilePath) {
cmn.MustWriteFile(genesisFilePath, []byte(testGenesis), 0644)


+ 2
- 2
config/toml_test.go View File

@ -32,7 +32,7 @@ func TestEnsureRoot(t *testing.T) {
// make sure config is set properly
data, err := ioutil.ReadFile(filepath.Join(tmpDir, "config.toml"))
require.Nil(err)
assert.Equal([]byte(defaultConfig("anonymous")), data)
assert.Equal([]byte(defaultConfig(defaultMoniker)), data)
ensureFiles(t, tmpDir, "data")
}
@ -49,7 +49,7 @@ func TestEnsureTestRoot(t *testing.T) {
// make sure config is set properly
data, err := ioutil.ReadFile(filepath.Join(rootDir, "config.toml"))
require.Nil(err)
assert.Equal([]byte(testConfig("anonymous")), data)
assert.Equal([]byte(testConfig(defaultMoniker)), data)
// TODO: make sure the cfg returned and testconfig are the same!


+ 9
- 7
consensus/replay_test.go View File

@ -264,9 +264,12 @@ func (w *crashingWAL) Wait() { w.next.Wait() }
//------------------------------------------------------------------------------------------
// Handshake Tests
const (
NUM_BLOCKS = 6
)
var (
NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal
mempool = types.MockMempool{}
mempool = types.MockMempool{}
)
//---------------------------------------
@ -305,12 +308,12 @@ func TestHandshakeReplayNone(t *testing.T) {
}
}
func writeWAL(walMsgs []byte) string {
func tempWALWithData(data []byte) string {
walFile, err := ioutil.TempFile("", "wal")
if err != nil {
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
}
_, err = walFile.Write(walMsgs)
_, err = walFile.Write(data)
if err != nil {
panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
}
@ -324,12 +327,11 @@ func writeWAL(walMsgs []byte) string {
func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
config := ResetConfig("proxy_test_")
// copy the many_blocks file
walBody, err := cmn.ReadFile(path.Join(data_dir, "many_blocks.cswal"))
walBody, err := WALWithNBlocks(NUM_BLOCKS)
if err != nil {
t.Fatal(err)
}
walFile := writeWAL(walBody)
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile())


+ 0
- 36
consensus/test_data/README.md View File

@ -1,36 +0,0 @@
# Generating test data
To generate the data, run `build.sh`. See that script for more details.
Make sure to adjust the stepChanges in the testCases if the number of messages changes.
This sometimes happens for the `small_block2.cswal`, where the number of block parts changes between 4 and 5.
If you need to change the signatures, you can use a script as follows:
The privBytes comes from `config/tendermint_test/...`:
```
package main
import (
"encoding/hex"
"fmt"
"github.com/tendermint/go-crypto"
)
func main() {
signBytes, err := hex.DecodeString("7B22636861696E5F6964223A2274656E6465726D696E745F74657374222C22766F7465223A7B22626C6F636B5F68617368223A2242453544373939433846353044354645383533364334333932464443384537423342313830373638222C22626C6F636B5F70617274735F686561646572223A506172745365747B543A31204236323237323535464632307D2C22686569676874223A312C22726F756E64223A302C2274797065223A327D7D")
if err != nil {
panic(err)
}
privBytes, err := hex.DecodeString("27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8")
if err != nil {
panic(err)
}
privKey := crypto.PrivKeyEd25519{}
copy(privKey[:], privBytes)
signature := privKey.Sign(signBytes)
fmt.Printf("Signature Bytes: %X\n", signature.Bytes())
}
```

+ 0
- 148
consensus/test_data/build.sh View File

@ -1,148 +0,0 @@
#!/usr/bin/env bash
# Requires: killall command and jq JSON processor.
# Get the parent directory of where this script is.
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done
DIR="$( cd -P "$( dirname "$SOURCE" )/../.." && pwd )"
# Change into that dir because we expect that.
cd "$DIR" || exit 1
# Make sure we have a tendermint command.
if ! hash tendermint 2>/dev/null; then
make install
fi
# Make sure we have a cutWALUntil binary.
cutWALUntil=./scripts/cutWALUntil/cutWALUntil
cutWALUntilDir=$(dirname $cutWALUntil)
if ! hash $cutWALUntil 2>/dev/null; then
cd "$cutWALUntilDir" && go build && cd - || exit 1
fi
TMHOME=$(mktemp -d)
export TMHOME="$TMHOME"
if [[ ! -d "$TMHOME" ]]; then
echo "Could not create temp directory"
exit 1
else
echo "TMHOME: ${TMHOME}"
fi
# TODO: eventually we should replace with `tendermint init --test`
DIR_TO_COPY=$HOME/.tendermint_test/consensus_state_test
if [ ! -d "$DIR_TO_COPY" ]; then
echo "$DIR_TO_COPY does not exist. Please run: go test ./consensus"
exit 1
fi
echo "==> Copying ${DIR_TO_COPY} to ${TMHOME} directory..."
cp -r "$DIR_TO_COPY"/* "$TMHOME"
# preserve original genesis file because later it will be modified (see small_block2)
cp "$TMHOME/genesis.json" "$TMHOME/genesis.json.bak"
function reset(){
echo "==> Resetting tendermint..."
tendermint unsafe_reset_all
cp "$TMHOME/genesis.json.bak" "$TMHOME/genesis.json"
}
reset
# function empty_block(){
# echo "==> Starting tendermint..."
# tendermint node --proxy_app=persistent_dummy &> /dev/null &
# sleep 5
# echo "==> Killing tendermint..."
# killall tendermint
# echo "==> Copying WAL log..."
# $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_empty_block.cswal
# mv consensus/test_data/new_empty_block.cswal consensus/test_data/empty_block.cswal
# reset
# }
function many_blocks(){
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
PID=$!
echo "==> Starting tendermint..."
tendermint node --proxy_app=persistent_dummy &> /dev/null &
sleep 10
echo "==> Killing tendermint..."
kill -9 $PID
killall tendermint
echo "==> Copying WAL log..."
$cutWALUntil "$TMHOME/data/cs.wal/wal" 6 consensus/test_data/new_many_blocks.cswal
mv consensus/test_data/new_many_blocks.cswal consensus/test_data/many_blocks.cswal
reset
}
# function small_block1(){
# bash scripts/txs/random.sh 1000 36657 &> /dev/null &
# PID=$!
# echo "==> Starting tendermint..."
# tendermint node --proxy_app=persistent_dummy &> /dev/null &
# sleep 10
# echo "==> Killing tendermint..."
# kill -9 $PID
# killall tendermint
# echo "==> Copying WAL log..."
# $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block1.cswal
# mv consensus/test_data/new_small_block1.cswal consensus/test_data/small_block1.cswal
# reset
# }
# # block part size = 512
# function small_block2(){
# cat "$TMHOME/genesis.json" | jq '. + {consensus_params: {block_size_params: {max_bytes: 22020096}, block_gossip_params: {block_part_size_bytes: 512}}}' > "$TMHOME/new_genesis.json"
# mv "$TMHOME/new_genesis.json" "$TMHOME/genesis.json"
# bash scripts/txs/random.sh 1000 36657 &> /dev/null &
# PID=$!
# echo "==> Starting tendermint..."
# tendermint node --proxy_app=persistent_dummy &> /dev/null &
# sleep 5
# echo "==> Killing tendermint..."
# kill -9 $PID
# killall tendermint
# echo "==> Copying WAL log..."
# $cutWALUntil "$TMHOME/data/cs.wal/wal" 1 consensus/test_data/new_small_block2.cswal
# mv consensus/test_data/new_small_block2.cswal consensus/test_data/small_block2.cswal
# reset
# }
case "$1" in
# "small_block1")
# small_block1
# ;;
# "small_block2")
# small_block2
# ;;
# "empty_block")
# empty_block
# ;;
"many_blocks")
many_blocks
;;
*)
# small_block1
# small_block2
# empty_block
many_blocks
esac
echo "==> Cleaning up..."
rm -rf "$TMHOME"

BIN
consensus/test_data/many_blocks.cswal View File


+ 1
- 1
consensus/ticker.go View File

@ -68,7 +68,7 @@ func (t *timeoutTicker) Chan() <-chan timeoutInfo {
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is alwaya available to read from tickChan, so this won't block.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti


+ 2
- 30
consensus/wal.go View File

@ -20,17 +20,13 @@ import (
//--------------------------------------------------------
// types and functions for savings consensus messages
var (
walSeparator = []byte{55, 127, 6, 130} // 0x377f0682 - magic number
)
type TimedWALMessage struct {
Time time.Time `json:"time"` // for debugging purposes
Msg WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/cutWALUntil util.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
Height int64 `json:"height"`
}
@ -195,7 +191,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder {
}
// Encode writes the custom encoding of v to the stream.
func (enc *WALEncoder) Encode(v interface{}) error {
func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
data := wire.BinaryBytes(v)
crc := crc32.Checksum(data, crc32c)
@ -209,11 +205,6 @@ func (enc *WALEncoder) Encode(v interface{}) error {
_, err := enc.wr.Write(msg)
if err == nil {
// TODO [Anton Kaliaev 23 Oct 2017]: remove separator
_, err = enc.wr.Write(walSeparator)
}
return err
}
@ -278,28 +269,9 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
return nil, fmt.Errorf("failed to decode data: %v", err)
}
// TODO [Anton Kaliaev 23 Oct 2017]: remove separator
if err = readSeparator(dec.rd); err != nil {
return nil, err
}
return res, err
}
// readSeparator reads a separator from r. It returns any error from underlying
// reader or if it's not a separator.
func readSeparator(r io.Reader) error {
b := make([]byte, len(walSeparator))
_, err := r.Read(b)
if err != nil {
return fmt.Errorf("failed to read separator: %v", err)
}
if !bytes.Equal(b, walSeparator) {
return fmt.Errorf("not a separator: %v", b)
}
return nil
}
type nilWAL struct{}
func (nilWAL) Save(m WALMessage) {}


+ 181
- 0
consensus/wal_generator.go View File

@ -0,0 +1,181 @@
package consensus
import (
"bufio"
"bytes"
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"github.com/tendermint/abci/example/dummy"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
auto "github.com/tendermint/tmlibs/autofile"
"github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
)
// WALWithNBlocks generates a consensus WAL. It does this by spining up a
// stripped down version of node (proxy app, event bus, consensus state) with a
// persistent dummy application and special consensus wal instance
// (byteBufferWAL) and waits until numBlocks are created. Then it returns a WAL
// content.
func WALWithNBlocks(numBlocks int) (data []byte, err error) {
config := getConfig()
app := dummy.NewPersistentDummyApplication(filepath.Join(config.DBDir(), "wal_generator"))
logger := log.NewNopLogger() // log.TestingLogger().With("wal_generator", "wal_generator")
/////////////////////////////////////////////////////////////////////////////
// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
// NOTE: we can't import node package because of circular dependency
privValidatorFile := config.PrivValidatorFile()
privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return nil, errors.Wrap(err, "failed to read genesis file")
}
stateDB := db.NewMemDB()
blockStoreDB := db.NewMemDB()
state, err := sm.MakeGenesisState(stateDB, genDoc)
state.SetLogger(logger.With("module", "state"))
if err != nil {
return nil, errors.Wrap(err, "failed to make genesis state")
}
blockStore := bc.NewBlockStore(blockStoreDB)
handshaker := NewHandshaker(state, blockStore)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start proxy app connections")
}
defer proxyApp.Stop()
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start event bus")
}
mempool := types.MockMempool{}
consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
// END OF COPY PASTE
/////////////////////////////////////////////////////////////////////////////
// set consensus wal to buffered WAL, which will write all incoming msgs to buffer
var b bytes.Buffer
wr := bufio.NewWriter(&b)
numBlocksWritten := make(chan struct{})
wal := &byteBufferWAL{enc: NewWALEncoder(wr), heightToStop: int64(numBlocks), signalWhenStopsTo: numBlocksWritten}
// see wal.go#103
wal.Save(EndHeightMessage{0})
consensusState.wal = wal
if err := consensusState.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start consensus state")
}
defer consensusState.Stop()
select {
case <-numBlocksWritten:
wr.Flush()
return b.Bytes(), nil
case <-time.After(time.Duration(5*numBlocks) * time.Second):
return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks", numBlocks)
}
}
// f**ing long, but unique for each test
func makePathname() string {
// get path
p, err := os.Getwd()
if err != nil {
panic(err)
}
// fmt.Println(p)
sep := string(filepath.Separator)
return strings.Replace(p, sep, "_", -1)
}
func randPort() int {
// returns between base and base + spread
base, spread := 20000, 20000
return base + rand.Intn(spread)
}
func makeAddrs() (string, string, string) {
start := randPort()
return fmt.Sprintf("tcp://0.0.0.0:%d", start),
fmt.Sprintf("tcp://0.0.0.0:%d", start+1),
fmt.Sprintf("tcp://0.0.0.0:%d", start+2)
}
// getConfig returns a config for test cases
func getConfig() *cfg.Config {
pathname := makePathname()
c := cfg.ResetTestRoot(pathname)
// and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs()
c.P2P.ListenAddress = tm
c.RPC.ListenAddress = rpc
c.RPC.GRPCListenAddress = grpc
return c
}
// byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
// when the heightToStop is reached. Client will be notified via
// signalWhenStopsTo channel.
type byteBufferWAL struct {
enc *WALEncoder
stopped bool
heightToStop int64
signalWhenStopsTo chan struct{}
}
// needed for determinism
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
// Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Save(m WALMessage) {
if w.stopped {
return
}
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
w.signalWhenStopsTo <- struct{}{}
w.stopped = true
return
}
}
err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}
}
func (w *byteBufferWAL) Group() *auto.Group {
panic("not implemented")
}
func (w *byteBufferWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil
}
func (w *byteBufferWAL) Start() error { return nil }
func (w *byteBufferWAL) Stop() error { return nil }
func (w *byteBufferWAL) Wait() {}

+ 7
- 2
consensus/wal_test.go View File

@ -3,7 +3,6 @@ package consensus
import (
"bytes"
"crypto/rand"
"path"
"sync"
"testing"
"time"
@ -43,7 +42,13 @@ func TestWALEncoderDecoder(t *testing.T) {
}
func TestSearchForEndHeight(t *testing.T) {
wal, err := NewWAL(path.Join(data_dir, "many_blocks.cswal"), false)
walBody, err := WALWithNBlocks(6)
if err != nil {
t.Fatal(err)
}
walFile := tempWALWithData(walBody)
wal, err := NewWAL(walFile, false)
if err != nil {
t.Fatal(err)
}


+ 103
- 0
docs/architecture/adr-007-trust-metric-usage.md View File

@ -0,0 +1,103 @@
# ADR 007: Trust Metric Usage Guide
## Context
Tendermint is required to monitor peer quality in order to inform its peer dialing and peer exchange strategies.
When a node first connects to the network, it is important that it can quickly find good peers.
Thus, while a node has fewer connections, it should prioritize connecting to higher quality peers.
As the node becomes well connected to the rest of the network, it can dial lesser known or lesser
quality peers and help assess their quality. Similarly, when queried for peers, a node should make
sure they dont return low quality peers.
Peer quality can be tracked using a trust metric that flags certain behaviours as good or bad. When enough
bad behaviour accumulates, we can mark the peer as bad and disconnect.
For example, when the PEXReactor makes a request for peers network addresses from an already known peer, and the returned network addresses are unreachable, this undesirable behavior should be tracked. Returning a few bad network addresses probably shouldn’t cause a peer to be dropped, while excessive amounts of this behavior does qualify the peer for removal. The originally proposed approach and design document for the trust metric can be found in the [ADR 006](adr-006-trust-metric.md) document.
The trust metric implementation allows a developer to obtain a peer's trust metric from a trust metric store, and track good and bad events relevant to a peer's behavior, and at any time, the peer's metric can be queried for a current trust value. The current trust value is calculated with a formula that utilizes current behavior, previous behavior, and change between the two. Current behavior is calculated as the percentage of good behavior within a time interval. The time interval is short; probably set between 30 seconds and 5 minutes. On the other hand, the historic data can estimate a peer's behavior over days worth of tracking. At the end of a time interval, the current behavior becomes part of the historic data, and a new time interval begins with the good and bad counters reset to zero.
These are some important things to keep in mind regarding how the trust metrics handle time intervals and scoring:
- Each new time interval begins with a perfect score
- Bad events quickly bring the score down and good events cause the score to slowly rise
- When the time interval is over, the percentage of good events becomes historic data.
Some useful information about the inner workings of the trust metric:
- When a trust metric is first instantiated, a timer (ticker) periodically fires in order to handle transitions between trust metric time intervals
- If a peer is disconnected from a node, the timer should be paused, since the node is no longer connected to that peer
- The ability to pause the metric is supported with the store **PeerDisconnected** method and the metric **Pause** method
- After a pause, if a good or bad event method is called on a metric, it automatically becomes unpaused and begins a new time interval.
## Decision
The trust metric capability is now available, yet, it still leaves the question of how should it be applied throughout Tendermint in order to properly track the quality of peers?
### Proposed Process
Peers are managed using an address book and a trust metric:
- The address book keeps a record of peers and provides selection methods
- The trust metric tracks the quality of the peers
#### Presence in Address Book
Outbound peers are added to the address book before they are dialed,
and inbound peers are added once the peer connection is set up.
Peers are also added to the address book when they are received in response to
a pexRequestMessage.
While a node has less than `needAddressThreshold`, it will periodically request more,
via pexRequestMessage, from randomly selected peers and from newly dialed outbound peers.
When a new address is added to an address book that has more than `0.5*needAddressThreshold` addresses,
then with some low probability, a randomly chosen low quality peer is removed.
#### Outbound Peers
Peers attempt to maintain a minimum number of outbound connections by
repeatedly querying the address book for peers to connect to.
While a node has few to no outbound connections, the address book is biased to return
higher quality peers. As the node increases the number of outbound connections,
the address book is biased to return less-vetted or lower-quality peers.
#### Inbound Peers
Peers also maintain a maximum number of total connections, MaxNumPeers.
If a peer has MaxNumPeers, new incoming connections will be accepted with low probability.
When such a new connection is accepted, the peer disconnects from a probabilistically chosen low ranking peer
so it does not exceed MaxNumPeers.
#### Peer Exchange
When a peer receives a pexRequestMessage, it returns a random sample of high quality peers from the address book. Peers with no score or low score should not be inclided in a response to pexRequestMessage.
#### Peer Quality
Peer quality is tracked in the connection and across the reactors by storing the TrustMetric in the peer's
thread safe Data store.
Peer behaviour is then defined as one of the following:
- Fatal - something outright malicious that causes us to disconnect the peer and ban it from the address book for some amount of time
- Bad - Any kind of timeout, messages that don't unmarshal, fail other validity checks, or messages we didn't ask for or aren't expecting (usually worth one bad event)
- Neutral - Unknown channels/message types/version upgrades (no good or bad events recorded)
- Correct - Normal correct behavior (worth one good event)
- Good - some random majority of peers per reactor sending us useful messages (worth more than one good event).
Note that Fatal behaviour causes us to remove the peer, and neutral behaviour does not affect the score.
## Status
Proposed.
## Consequences
### Positive
- Bringing the address book and trust metric store together will cause the network to be built in a way that encourages greater security and reliability.
### Negative
- TBD
### Neutral
- Keep in mind that, good events need to be recorded just as bad events do using this implementation.

+ 2
- 1
docs/specification/configuration.rst View File

@ -21,7 +21,8 @@ The main config parameters are defined
- ``genesis_file``: The location of the genesis file. *Default*:
``"$TMHOME/genesis.json"``
- ``log_level``: *Default*: ``"state:info,*:error"``
- ``moniker``: Name of this node. *Default*: ``"anonymous"``
- ``moniker``: Name of this node. *Default*: the host name or ``"anonymous"``
if runtime fails to get the host name
- ``priv_validator_file``: Validator private key file. *Default*:
``"$TMHOME/priv_validator.json"``
- ``prof_laddr``: Profile listen address. *Default*: ``""``


+ 3
- 3
glide.lock View File

@ -1,5 +1,5 @@
hash: 09fc7f59ca6b718fe236368bb55f4801455295cfe455ea5865d544ee4dcfdc08
updated: 2017-12-06T03:31:34.476581624-05:00
hash: f420f1f858100218dad50997d939eaaf129ff654a0648a47ddc60d626ab0b8e9
updated: 2017-12-10T05:37:46.41123196Z
imports:
- name: github.com/btcsuite/btcd
version: 2e60448ffcc6bf78332d1fe590260095f554dd78
@ -129,7 +129,7 @@ imports:
subpackages:
- iavl
- name: github.com/tendermint/tmlibs
version: bfcc0217f120d3bee6730ba0789d2eb72fc2e889
version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
subpackages:
- autofile
- cli


+ 1
- 1
glide.yaml View File

@ -34,7 +34,7 @@ import:
subpackages:
- iavl
- package: github.com/tendermint/tmlibs
version: ~0.5.0
version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
subpackages:
- autofile
- cli


+ 40
- 0
lite/proxy/block.go View File

@ -0,0 +1,40 @@
package proxy
import (
"bytes"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/lite"
certerr "github.com/tendermint/tendermint/lite/errors"
"github.com/tendermint/tendermint/types"
)
func ValidateBlockMeta(meta *types.BlockMeta, check lite.Commit) error {
// TODO: check the BlockID??
return ValidateHeader(meta.Header, check)
}
func ValidateBlock(meta *types.Block, check lite.Commit) error {
err := ValidateHeader(meta.Header, check)
if err != nil {
return err
}
if !bytes.Equal(meta.Data.Hash(), meta.Header.DataHash) {
return errors.New("Data hash doesn't match header")
}
return nil
}
func ValidateHeader(head *types.Header, check lite.Commit) error {
// make sure they are for the same height (obvious fail)
if head.Height != check.Height() {
return certerr.ErrHeightMismatch(head.Height, check.Height())
}
// check if they are equal by using hashes
chead := check.Header
if !bytes.Equal(head.Hash(), chead.Hash()) {
return errors.New("Headers don't match")
}
return nil
}

+ 30
- 0
lite/proxy/certifier.go View File

@ -0,0 +1,30 @@
package proxy
import (
"github.com/tendermint/tendermint/lite"
certclient "github.com/tendermint/tendermint/lite/client"
"github.com/tendermint/tendermint/lite/files"
)
func GetCertifier(chainID, rootDir, nodeAddr string) (*lite.Inquiring, error) {
trust := lite.NewCacheProvider(
lite.NewMemStoreProvider(),
files.NewProvider(rootDir),
)
source := certclient.NewHTTPProvider(nodeAddr)
// XXX: total insecure hack to avoid `init`
fc, err := source.LatestCommit()
/* XXX
// this gets the most recent verified commit
fc, err := trust.LatestCommit()
if certerr.IsCommitNotFoundErr(err) {
return nil, errors.New("Please run init first to establish a root of trust")
}*/
if err != nil {
return nil, err
}
cert := lite.NewInquiring(chainID, fc, trust, source)
return cert, nil
}

+ 22
- 0
lite/proxy/errors.go View File

@ -0,0 +1,22 @@
package proxy
import (
"fmt"
"github.com/pkg/errors"
)
//--------------------------------------------
var errNoData = fmt.Errorf("No data returned for query")
// IsNoDataErr checks whether an error is due to a query returning empty data
func IsNoDataErr(err error) bool {
return errors.Cause(err) == errNoData
}
func ErrNoData() error {
return errors.WithStack(errNoData)
}
//--------------------------------------------

+ 17
- 0
lite/proxy/errors_test.go View File

@ -0,0 +1,17 @@
package proxy
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestErrorNoData(t *testing.T) {
e1 := ErrNoData()
assert.True(t, IsNoDataErr(e1))
e2 := errors.New("foobar")
assert.False(t, IsNoDataErr(e2))
assert.False(t, IsNoDataErr(nil))
}

+ 69
- 0
lite/proxy/proxy.go View File

@ -0,0 +1,69 @@
package proxy
import (
"net/http"
"github.com/tendermint/tmlibs/log"
rpcclient "github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/core"
rpc "github.com/tendermint/tendermint/rpc/lib/server"
)
const (
wsEndpoint = "/websocket"
)
// StartProxy will start the websocket manager on the client,
// set up the rpc routes to proxy via the given client,
// and start up an http/rpc server on the location given by bind (eg. :1234)
func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger) error {
c.Start()
r := RPCRoutes(c)
// build the handler...
mux := http.NewServeMux()
rpc.RegisterRPCFuncs(mux, r, logger)
wm := rpc.NewWebsocketManager(r, rpc.EventSubscriber(c))
wm.SetLogger(logger)
core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
_, err := rpc.StartHTTPServer(listenAddr, mux, logger)
return err
}
// RPCRoutes just routes everything to the given client, as if it were
// a tendermint fullnode.
//
// if we want security, the client must implement it as a secure client
func RPCRoutes(c rpcclient.Client) map[string]*rpc.RPCFunc {
return map[string]*rpc.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events.
// We can just use the core tendermint impl, which uses the
// EventSwitch we registered in NewWebsocketManager above
"subscribe": rpc.NewWSRPCFunc(core.Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(core.Unsubscribe, "query"),
// info API
"status": rpc.NewRPCFunc(c.Status, ""),
"blockchain": rpc.NewRPCFunc(c.BlockchainInfo, "minHeight,maxHeight"),
"genesis": rpc.NewRPCFunc(c.Genesis, ""),
"block": rpc.NewRPCFunc(c.Block, "height"),
"commit": rpc.NewRPCFunc(c.Commit, "height"),
"tx": rpc.NewRPCFunc(c.Tx, "hash,prove"),
"validators": rpc.NewRPCFunc(c.Validators, ""),
// broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(c.BroadcastTxCommit, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(c.BroadcastTxSync, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(c.BroadcastTxAsync, "tx"),
// abci API
"abci_query": rpc.NewRPCFunc(c.ABCIQuery, "path,data,prove"),
"abci_info": rpc.NewRPCFunc(c.ABCIInfo, ""),
}
}

+ 120
- 0
lite/proxy/query.go View File

@ -0,0 +1,120 @@
package proxy
import (
"github.com/pkg/errors"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/iavl"
"github.com/tendermint/tendermint/lite"
"github.com/tendermint/tendermint/lite/client"
certerr "github.com/tendermint/tendermint/lite/errors"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
// GetWithProof will query the key on the given node, and verify it has
// a valid proof, as defined by the certifier.
//
// If there is any error in checking, returns an error.
// If val is non-empty, proof should be KeyExistsProof
// If val is empty, proof should be KeyMissingProof
func GetWithProof(key []byte, reqHeight int64, node rpcclient.Client,
cert lite.Certifier) (
val data.Bytes, height int64, proof iavl.KeyProof, err error) {
if reqHeight < 0 {
err = errors.Errorf("Height cannot be negative")
return
}
_resp, proof, err := GetWithProofOptions("/key", key,
rpcclient.ABCIQueryOptions{Height: int64(reqHeight)},
node, cert)
if _resp != nil {
resp := _resp.Response
val, height = resp.Value, resp.Height
}
return val, height, proof, err
}
// GetWithProofOptions is useful if you want full access to the ABCIQueryOptions
func GetWithProofOptions(path string, key []byte, opts rpcclient.ABCIQueryOptions,
node rpcclient.Client, cert lite.Certifier) (
*ctypes.ResultABCIQuery, iavl.KeyProof, error) {
_resp, err := node.ABCIQueryWithOptions(path, key, opts)
if err != nil {
return nil, nil, err
}
resp := _resp.Response
// make sure the proof is the proper height
if resp.IsErr() {
err = errors.Errorf("Query error %d: %d", resp.Code)
return nil, nil, err
}
if len(resp.Key) == 0 || len(resp.Proof) == 0 {
return nil, nil, ErrNoData()
}
if resp.Height == 0 {
return nil, nil, errors.New("Height returned is zero")
}
// AppHash for height H is in header H+1
commit, err := GetCertifiedCommit(resp.Height+1, node, cert)
if err != nil {
return nil, nil, err
}
if len(resp.Value) > 0 {
// The key was found, construct a proof of existence.
eproof, err := iavl.ReadKeyExistsProof(resp.Proof)
if err != nil {
return nil, nil, errors.Wrap(err, "Error reading proof")
}
// Validate the proof against the certified header to ensure data integrity.
err = eproof.Verify(resp.Key, resp.Value, commit.Header.AppHash)
if err != nil {
return nil, nil, errors.Wrap(err, "Couldn't verify proof")
}
return &ctypes.ResultABCIQuery{resp}, eproof, nil
}
// The key wasn't found, construct a proof of non-existence.
var aproof *iavl.KeyAbsentProof
aproof, err = iavl.ReadKeyAbsentProof(resp.Proof)
if err != nil {
return nil, nil, errors.Wrap(err, "Error reading proof")
}
// Validate the proof against the certified header to ensure data integrity.
err = aproof.Verify(resp.Key, nil, commit.Header.AppHash)
if err != nil {
return nil, nil, errors.Wrap(err, "Couldn't verify proof")
}
return &ctypes.ResultABCIQuery{resp}, aproof, ErrNoData()
}
// GetCertifiedCommit gets the signed header for a given height
// and certifies it. Returns error if unable to get a proven header.
func GetCertifiedCommit(h int64, node rpcclient.Client,
cert lite.Certifier) (empty lite.Commit, err error) {
// FIXME: cannot use cert.GetByHeight for now, as it also requires
// Validators and will fail on querying tendermint for non-current height.
// When this is supported, we should use it instead...
rpcclient.WaitForHeight(node, h, nil)
cresp, err := node.Commit(&h)
if err != nil {
return
}
commit := client.CommitFromResult(cresp)
// validate downloaded checkpoint with our request and trust store.
if commit.Height() != h {
return empty, certerr.ErrHeightMismatch(h, commit.Height())
}
err = cert.Certify(commit)
return commit, nil
}

+ 140
- 0
lite/proxy/query_test.go View File

@ -0,0 +1,140 @@
package proxy
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tendermint/lite"
certclient "github.com/tendermint/tendermint/lite/client"
nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/client"
rpctest "github.com/tendermint/tendermint/rpc/test"
"github.com/tendermint/tendermint/types"
)
var node *nm.Node
// TODO fix tests!!
func TestMain(m *testing.M) {
app := dummy.NewDummyApplication()
node = rpctest.StartTendermint(app)
code := m.Run()
node.Stop()
node.Wait()
os.Exit(code)
}
func dummyTx(k, v []byte) []byte {
return []byte(fmt.Sprintf("%s=%s", k, v))
}
func _TestAppProofs(t *testing.T) {
assert, require := assert.New(t), require.New(t)
cl := client.NewLocal(node)
client.WaitForHeight(cl, 1, nil)
k := []byte("my-key")
v := []byte("my-value")
tx := dummyTx(k, v)
br, err := cl.BroadcastTxCommit(tx)
require.NoError(err, "%+v", err)
require.EqualValues(0, br.CheckTx.Code, "%#v", br.CheckTx)
require.EqualValues(0, br.DeliverTx.Code)
brh := br.Height
// This sets up our trust on the node based on some past point.
source := certclient.NewProvider(cl)
seed, err := source.GetByHeight(brh - 2)
require.NoError(err, "%+v", err)
cert := lite.NewStatic("my-chain", seed.Validators)
client.WaitForHeight(cl, 3, nil)
latest, err := source.LatestCommit()
require.NoError(err, "%+v", err)
rootHash := latest.Header.AppHash
// verify a query before the tx block has no data (and valid non-exist proof)
bs, height, proof, err := GetWithProof(k, brh-1, cl, cert)
fmt.Println(bs, height, proof, err)
require.NotNil(err)
require.True(IsNoDataErr(err), err.Error())
require.Nil(bs)
// but given that block it is good
bs, height, proof, err = GetWithProof(k, brh, cl, cert)
require.NoError(err, "%+v", err)
require.NotNil(proof)
require.True(height >= int64(latest.Header.Height))
// Alexis there is a bug here, somehow the above code gives us rootHash = nil
// and proof.Verify doesn't care, while proofNotExists.Verify fails.
// I am hacking this in to make it pass, but please investigate further.
rootHash = proof.Root()
//err = wire.ReadBinaryBytes(bs, &data)
//require.NoError(err, "%+v", err)
assert.EqualValues(v, bs)
err = proof.Verify(k, bs, rootHash)
assert.NoError(err, "%+v", err)
// Test non-existing key.
missing := []byte("my-missing-key")
bs, _, proof, err = GetWithProof(missing, 0, cl, cert)
require.True(IsNoDataErr(err))
require.Nil(bs)
require.NotNil(proof)
err = proof.Verify(missing, nil, rootHash)
assert.NoError(err, "%+v", err)
err = proof.Verify(k, nil, rootHash)
assert.Error(err)
}
func _TestTxProofs(t *testing.T) {
assert, require := assert.New(t), require.New(t)
cl := client.NewLocal(node)
client.WaitForHeight(cl, 1, nil)
tx := dummyTx([]byte("key-a"), []byte("value-a"))
br, err := cl.BroadcastTxCommit(tx)
require.NoError(err, "%+v", err)
require.EqualValues(0, br.CheckTx.Code, "%#v", br.CheckTx)
require.EqualValues(0, br.DeliverTx.Code)
brh := br.Height
source := certclient.NewProvider(cl)
seed, err := source.GetByHeight(brh - 2)
require.NoError(err, "%+v", err)
cert := lite.NewStatic("my-chain", seed.Validators)
// First let's make sure a bogus transaction hash returns a valid non-existence proof.
key := types.Tx([]byte("bogus")).Hash()
res, err := cl.Tx(key, true)
require.NotNil(err)
require.Contains(err.Error(), "not found")
// Now let's check with the real tx hash.
key = types.Tx(tx).Hash()
res, err = cl.Tx(key, true)
require.NoError(err, "%+v", err)
require.NotNil(res)
err = res.Proof.Validate(key)
assert.NoError(err, "%+v", err)
commit, err := GetCertifiedCommit(br.Height, cl, cert)
require.Nil(err, "%+v", err)
require.Equal(res.Proof.RootHash, commit.Header.DataHash)
}

+ 185
- 0
lite/proxy/wrapper.go View File

@ -0,0 +1,185 @@
package proxy
import (
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/lite"
certclient "github.com/tendermint/tendermint/lite/client"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
)
var _ rpcclient.Client = Wrapper{}
// Wrapper wraps a rpcclient with a Certifier and double-checks any input that is
// provable before passing it along. Allows you to make any rpcclient fully secure.
type Wrapper struct {
rpcclient.Client
cert *lite.Inquiring
}
// SecureClient uses a given certifier to wrap an connection to an untrusted
// host and return a cryptographically secure rpc client.
//
// If it is wrapping an HTTP rpcclient, it will also wrap the websocket interface
func SecureClient(c rpcclient.Client, cert *lite.Inquiring) Wrapper {
wrap := Wrapper{c, cert}
// TODO: no longer possible as no more such interface exposed....
// if we wrap http client, then we can swap out the event switch to filter
// if hc, ok := c.(*rpcclient.HTTP); ok {
// evt := hc.WSEvents.EventSwitch
// hc.WSEvents.EventSwitch = WrappedSwitch{evt, wrap}
// }
return wrap
}
// ABCIQueryWithOptions exposes all options for the ABCI query and verifies the returned proof
func (w Wrapper) ABCIQueryWithOptions(path string, data data.Bytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
res, _, err := GetWithProofOptions(path, data, opts, w.Client, w.cert)
return res, err
}
// ABCIQuery uses default options for the ABCI query and verifies the returned proof
func (w Wrapper) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) {
return w.ABCIQueryWithOptions(path, data, rpcclient.DefaultABCIQueryOptions)
}
// Tx queries for a given tx and verifies the proof if it was requested
func (w Wrapper) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
res, err := w.Client.Tx(hash, prove)
if !prove || err != nil {
return res, err
}
h := int64(res.Height)
check, err := GetCertifiedCommit(h, w.Client, w.cert)
if err != nil {
return res, err
}
err = res.Proof.Validate(check.Header.DataHash)
return res, err
}
// BlockchainInfo requests a list of headers and verifies them all...
// Rather expensive.
//
// TODO: optimize this if used for anything needing performance
func (w Wrapper) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
r, err := w.Client.BlockchainInfo(minHeight, maxHeight)
if err != nil {
return nil, err
}
// go and verify every blockmeta in the result....
for _, meta := range r.BlockMetas {
// get a checkpoint to verify from
c, err := w.Commit(&meta.Header.Height)
if err != nil {
return nil, err
}
check := certclient.CommitFromResult(c)
err = ValidateBlockMeta(meta, check)
if err != nil {
return nil, err
}
}
return r, nil
}
// Block returns an entire block and verifies all signatures
func (w Wrapper) Block(height *int64) (*ctypes.ResultBlock, error) {
r, err := w.Client.Block(height)
if err != nil {
return nil, err
}
// get a checkpoint to verify from
c, err := w.Commit(height)
if err != nil {
return nil, err
}
check := certclient.CommitFromResult(c)
// now verify
err = ValidateBlockMeta(r.BlockMeta, check)
if err != nil {
return nil, err
}
err = ValidateBlock(r.Block, check)
if err != nil {
return nil, err
}
return r, nil
}
// Commit downloads the Commit and certifies it with the lite.
//
// This is the foundation for all other verification in this module
func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) {
rpcclient.WaitForHeight(w.Client, *height, nil)
r, err := w.Client.Commit(height)
// if we got it, then certify it
if err == nil {
check := certclient.CommitFromResult(r)
err = w.cert.Certify(check)
}
return r, err
}
// // WrappedSwitch creates a websocket connection that auto-verifies any info
// // coming through before passing it along.
// //
// // Since the verification takes 1-2 rpc calls, this is obviously only for
// // relatively low-throughput situations that can tolerate a bit extra latency
// type WrappedSwitch struct {
// types.EventSwitch
// client rpcclient.Client
// }
// // FireEvent verifies any block or header returned from the eventswitch
// func (s WrappedSwitch) FireEvent(event string, data events.EventData) {
// tm, ok := data.(types.TMEventData)
// if !ok {
// fmt.Printf("bad type %#v\n", data)
// return
// }
// // check to validate it if possible, and drop if not valid
// switch t := tm.Unwrap().(type) {
// case types.EventDataNewBlockHeader:
// err := verifyHeader(s.client, t.Header)
// if err != nil {
// fmt.Printf("Invalid header: %#v\n", err)
// return
// }
// case types.EventDataNewBlock:
// err := verifyBlock(s.client, t.Block)
// if err != nil {
// fmt.Printf("Invalid block: %#v\n", err)
// return
// }
// // TODO: can we verify tx as well? anything else
// }
// // looks good, we fire it
// s.EventSwitch.FireEvent(event, data)
// }
// func verifyHeader(c rpcclient.Client, head *types.Header) error {
// // get a checkpoint to verify from
// commit, err := c.Commit(&head.Height)
// if err != nil {
// return err
// }
// check := certclient.CommitFromResult(commit)
// return ValidateHeader(head, check)
// }
//
// func verifyBlock(c rpcclient.Client, block *types.Block) error {
// // get a checkpoint to verify from
// commit, err := c.Commit(&block.Height)
// if err != nil {
// return err
// }
// check := certclient.CommitFromResult(commit)
// return ValidateBlock(block, check)
// }

+ 2
- 14
node/node.go View File

@ -2,7 +2,6 @@ package node
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -373,12 +372,7 @@ func (n *Node) OnStart() error {
}
// start tx indexer
err = n.indexerService.Start()
if err != nil {
return err
}
return nil
return n.indexerService.Start()
}
// OnStop stops the Node. It implements cmn.Service.
@ -446,13 +440,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
onDisconnect := rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil {
rpcLogger.Error("Error unsubsribing from all on disconnect", "err", err)
}
})
wm := rpcserver.NewWebsocketManager(rpccore.Routes, onDisconnect)
wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)


+ 9
- 8
p2p/netaddress.go View File

@ -5,8 +5,8 @@
package p2p
import (
"errors"
"flag"
"fmt"
"net"
"strconv"
"time"
@ -45,7 +45,6 @@ func NewNetAddress(addr net.Addr) *NetAddress {
// address in the form of "IP:Port". Also resolves the host if host
// is not an IP.
func NewNetAddressString(addr string) (*NetAddress, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
@ -73,16 +72,18 @@ func NewNetAddressString(addr string) (*NetAddress, error) {
// NewNetAddressStrings returns an array of NetAddress'es build using
// the provided strings.
func NewNetAddressStrings(addrs []string) ([]*NetAddress, error) {
netAddrs := make([]*NetAddress, len(addrs))
for i, addr := range addrs {
func NewNetAddressStrings(addrs []string) ([]*NetAddress, []error) {
netAddrs := make([]*NetAddress, 0)
errs := make([]error, 0)
for _, addr := range addrs {
netAddr, err := NewNetAddressString(addr)
if err != nil {
return nil, errors.New(cmn.Fmt("Error in address %s: %v", addr, err))
errs = append(errs, fmt.Errorf("Error in address %s: %v", addr, err))
} else {
netAddrs = append(netAddrs, netAddr)
}
netAddrs[i] = netAddr
}
return netAddrs, nil
return netAddrs, errs
}
// NewNetAddressIPPort returns a new NetAddress using the provided IP


+ 3
- 5
p2p/netaddress_test.go View File

@ -51,11 +51,9 @@ func TestNewNetAddressString(t *testing.T) {
}
func TestNewNetAddressStrings(t *testing.T) {
assert, require := assert.New(t), require.New(t)
addrs, err := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"})
require.Nil(err)
assert.Equal(2, len(addrs))
addrs, errs := NewNetAddressStrings([]string{"127.0.0.1:8080", "127.0.0.2:8080"})
assert.Len(t, errs, 0)
assert.Equal(t, 2, len(addrs))
}
func TestNewNetAddressIPPort(t *testing.T) {


+ 2
- 1
p2p/pex_reactor.go View File

@ -20,7 +20,7 @@ const (
minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB
// maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
// maximum pex messages one peer can send to us during `msgCountByPeerFlushInterval`
defaultMaxMsgCountByPeer = 1000
msgCountByPeerFlushInterval = 1 * time.Hour
)
@ -247,6 +247,7 @@ func (r *PEXReactor) ensurePeers() {
// bias to prefer more vetted peers when we have fewer connections.
// not perfect, but somewhate ensures that we prioritize connecting to more-vetted
// NOTE: range here is [10, 90]. Too high ?
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
toDial := make(map[string]*NetAddress)


+ 79
- 34
p2p/switch.go View File

@ -2,6 +2,7 @@ package p2p
import (
"fmt"
"math"
"math/rand"
"net"
"time"
@ -14,8 +15,19 @@ import (
)
const (
reconnectAttempts = 30
reconnectInterval = 3 * time.Second
// wait a random amount of time from this interval
// before dialing seeds or reconnecting to help prevent DoS
dialRandomizerIntervalMilliseconds = 3000
// repeatedly try to reconnect for a few minutes
// ie. 5 * 20 = 100s
reconnectAttempts = 20
reconnectInterval = 5 * time.Second
// then move into exponential backoff mode for ~1day
// ie. 3**10 = 16hrs
reconnectBackOffAttempts = 10
reconnectBackOffBaseSeconds = 3
)
type Reactor interface {
@ -74,6 +86,8 @@ type Switch struct {
filterConnByAddr func(net.Addr) error
filterConnByPubKey func(crypto.PubKeyEd25519) error
rng *rand.Rand // seed for randomizing dial times and orders
}
var (
@ -92,6 +106,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
nodeInfo: nil,
}
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
// from a seed that's initialized with OS entropy on process start.
sw.rng = rand.New(rand.NewSource(cmn.RandInt64()))
// TODO: collapse the peerConfig into the config ?
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.SendRate = config.SendRate
@ -298,9 +316,9 @@ func (sw *Switch) startInitPeer(peer *peer) {
// DialSeeds dials a list of seeds asynchronously in random order.
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
netAddrs, err := NewNetAddressStrings(seeds)
if err != nil {
return err
netAddrs, errs := NewNetAddressStrings(seeds)
for _, err := range errs {
sw.Logger.Error("Error in seed's address", "err", err)
}
if addrBook != nil {
@ -317,15 +335,11 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
addrBook.Save()
}
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
// from a seed that's initialized with OS entropy on process start.
rng := rand.New(rand.NewSource(cmn.RandInt64()))
// permute the list, dial them in random order.
perm := rng.Perm(len(netAddrs))
perm := sw.rng.Perm(len(netAddrs))
for i := 0; i < len(perm); i++ {
go func(i int) {
time.Sleep(time.Duration(rng.Int63n(3000)) * time.Millisecond)
sw.randomSleep(0)
j := perm[i]
sw.dialSeed(netAddrs[j])
}(i)
@ -333,6 +347,12 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
return nil
}
// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
func (sw *Switch) randomSleep(interval time.Duration) {
r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
time.Sleep(r + interval)
}
func (sw *Switch) dialSeed(addr *NetAddress) {
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
@ -413,34 +433,59 @@ func (sw *Switch) Peers() IPeerSet {
// If the peer is persistent, it will attempt to reconnect.
// TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr)
sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {
go func() {
sw.Logger.Info("Reconnecting to peer", "peer", peer)
for i := 1; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
if i == reconnectAttempts {
sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "err", err)
return
}
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err)
time.Sleep(reconnectInterval)
continue
}
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}()
go sw.reconnectToPeer(peer)
}
}
// reconnectToPeer tries to reconnect to the peer, first repeatedly
// with a fixed interval, then with exponential backoff.
// If no success after all that, it stops trying, and leaves it
// to the PEX/Addrbook to find the peer again
func (sw *Switch) reconnectToPeer(peer Peer) {
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr)
start := time.Now()
sw.Logger.Info("Reconnecting to peer", "peer", peer)
for i := 0; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
// sleep a set amount
sw.randomSleep(reconnectInterval)
continue
} else {
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}
sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
"peer", peer, "elapsed", time.Since(start))
for i := 0; i < reconnectBackOffAttempts; i++ {
if !sw.IsRunning() {
return
}
// sleep an exponentially increasing amount
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
continue
} else {
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}
sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start))
}
// StopPeerGracefully disconnects from a peer gracefully.


+ 2
- 2
p2p/switch_test.go View File

@ -272,10 +272,10 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
// simulate failure by closing connection
peer.CloseConn()
// TODO: actually detect the disconnection and wait for reconnect
// TODO: remove sleep, detect the disconnection, wait for reconnect
npeers := sw.Peers().Size()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
npeers = sw.Peers().Size()
if npeers > 0 {
break


+ 2
- 2
p2p/trust/store.go View File

@ -138,7 +138,7 @@ func (tms *TrustMetricStore) loadFromDB() bool {
return false
}
peers := make(map[string]MetricHistoryJSON, 0)
peers := make(map[string]MetricHistoryJSON)
err := json.Unmarshal(bytes, &peers)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err))
@ -160,7 +160,7 @@ func (tms *TrustMetricStore) loadFromDB() bool {
func (tms *TrustMetricStore) saveToDB() {
tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size())
peers := make(map[string]MetricHistoryJSON, 0)
peers := make(map[string]MetricHistoryJSON)
for key, tm := range tms.peerMetrics {
// Add an entry for the peer identified by key


+ 4
- 4
rpc/client/helpers.go View File

@ -2,7 +2,6 @@ package client
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
@ -57,19 +56,20 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
evts := make(chan interface{}, 1)
// register for the next event of this type
query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp)
err := c.Subscribe(ctx, query, evts)
query := types.QueryForEvent(evtTyp)
err := c.Subscribe(ctx, subscriber, query, evts)
if err != nil {
return types.TMEventData{}, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer c.Unsubscribe(ctx, query)
defer c.UnsubscribeAll(ctx, subscriber)
select {
case evt := <-evts:


+ 46
- 58
rpc/client/httpclient.go View File

@ -3,7 +3,6 @@ package client
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/pkg/errors"
@ -13,6 +12,7 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
)
/*
@ -204,20 +204,14 @@ type WSEvents struct {
endpoint string
ws *rpcclient.WSClient
subscriptions map[string]chan<- interface{}
mtx sync.RWMutex
// used for signaling the goroutine that feeds ws -> EventSwitch
quit chan bool
done chan bool
subscriptions map[string]chan<- interface{}
}
func newWSEvents(remote, endpoint string) *WSEvents {
wsEvents := &WSEvents{
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
subscriptions: make(map[string]chan<- interface{}),
}
@ -225,87 +219,86 @@ func newWSEvents(remote, endpoint string) *WSEvents {
return wsEvents
}
// Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() error {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
func (w *WSEvents) OnStart() error {
w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
err := ws.Start()
if err == nil {
w.ws = ws
go w.eventListener()
err := w.ws.Start()
if err != nil {
return err
}
return err
}
// Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() error {
// send a message to quit to stop the eventListener
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
go w.eventListener()
return nil
}
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
if ch := w.getSubscription(query); ch != nil {
return errors.New("already subscribed")
// Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) OnStop() {
err := w.ws.Stop()
if err != nil {
w.Logger.Error("failed to stop WSClient", "err", err)
}
}
err := w.ws.Subscribe(ctx, query)
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
q := query.String()
err := w.ws.Subscribe(ctx, q)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
return err
}
w.mtx.Lock()
w.subscriptions[query] = out
// subscriber param is ignored because Tendermint will override it with
// remote IP anyway.
w.subscriptions[q] = out
w.mtx.Unlock()
return nil
}
func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error {
err := w.ws.Unsubscribe(ctx, query)
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
q := query.String()
err := w.ws.Unsubscribe(ctx, q)
if err != nil {
return err
}
w.mtx.Lock()
defer w.mtx.Unlock()
ch, ok := w.subscriptions[query]
ch, ok := w.subscriptions[q]
if ok {
close(ch)
delete(w.subscriptions, query)
delete(w.subscriptions, q)
}
w.mtx.Unlock()
return nil
}
func (w *WSEvents) UnsubscribeAll(ctx context.Context) error {
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
err := w.ws.UnsubscribeAll(ctx)
if err != nil {
return err
}
w.mtx.Lock()
defer w.mtx.Unlock()
for _, ch := range w.subscriptions {
close(ch)
}
w.subscriptions = make(map[string]chan<- interface{})
w.mtx.Unlock()
return nil
}
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() {
for query := range w.subscriptions {
for q := range w.subscriptions {
// NOTE: no timeout for resubscribing
// FIXME: better logging/handling of errors??
w.ws.Subscribe(context.Background(), query)
w.ws.Subscribe(context.Background(), q)
}
}
@ -316,34 +309,29 @@ func (w *WSEvents) redoSubscriptions() {
func (w *WSEvents) eventListener() {
for {
select {
case resp := <-w.ws.ResponsesCh:
// res is json.RawMessage
case resp, ok := <-w.ws.ResponsesCh:
if !ok {
return
}
if resp.Error != nil {
// FIXME: better logging/handling of errors??
fmt.Printf("ws err: %+v\n", resp.Error.Error())
w.Logger.Error("WS error", "err", resp.Error.Error())
continue
}
result := new(ctypes.ResultEvent)
err := json.Unmarshal(resp.Result, result)
if err != nil {
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
// TODO: ?
w.Logger.Error("failed to unmarshal response", "err", err)
continue
}
if ch := w.getSubscription(result.Query); ch != nil {
// NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll.
w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- result.Data
}
case <-w.quit:
// send a message so we can wait for the routine to exit
// before cleaning up the w.ws stuff
w.done <- true
w.mtx.RUnlock()
case <-w.Quit:
return
}
}
}
func (w *WSEvents) getSubscription(query string) chan<- interface{} {
w.mtx.RLock()
defer w.mtx.RUnlock()
return w.subscriptions[query]
}

+ 1
- 5
rpc/client/interface.go View File

@ -20,8 +20,6 @@ implementation.
package client
import (
"context"
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
@ -89,7 +87,5 @@ type NetworkClient interface {
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
type EventsClient interface {
Subscribe(ctx context.Context, query string, out chan<- interface{}) error
Unsubscribe(ctx context.Context, query string) error
UnsubscribeAll(ctx context.Context) error
types.EventBusSubscriber
}

+ 9
- 41
rpc/client/localclient.go View File

@ -3,19 +3,12 @@ package client
import (
"context"
"github.com/pkg/errors"
data "github.com/tendermint/go-wire/data"
nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
const (
// event bus subscriber
subscriber = "rpc-localclient"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
)
/*
@ -33,10 +26,7 @@ For real clients, you probably want to use client.HTTP. For more
powerful control during testing, you probably want the "client/mock" package.
*/
type Local struct {
node *nm.Node
*types.EventBus
subscriptions map[string]*tmquery.Query
}
// NewLocal configures a client that calls the Node directly.
@ -48,9 +38,7 @@ type Local struct {
func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC()
return &Local{
node: node,
EventBus: node.EventBus(),
subscriptions: make(map[string]*tmquery.Query),
EventBus: node.EventBus(),
}
}
@ -68,7 +56,7 @@ func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo()
}
func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) {
func (c *Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
}
@ -128,34 +116,14 @@ func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) {
return core.TxSearch(query, prove)
}
func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil {
return errors.Wrap(err, "failed to subscribe")
}
c.subscriptions[query] = q
return nil
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
return c.EventBus.Subscribe(ctx, subscriber, query, out)
}
func (c *Local) Unsubscribe(ctx context.Context, query string) error {
q, ok := c.subscriptions[query]
if !ok {
return errors.New("subscription not found")
}
if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
delete(c.subscriptions, query)
return nil
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
return c.EventBus.Unsubscribe(ctx, subscriber, query)
}
func (c *Local) UnsubscribeAll(ctx context.Context) error {
if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
c.subscriptions = make(map[string]*tmquery.Query)
return nil
func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error {
return c.EventBus.UnsubscribeAll(ctx, subscriber)
}

+ 22
- 14
rpc/core/events.go View File

@ -44,20 +44,15 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse a query")
}
err = wsCtx.AddSubscription(query, q)
if err != nil {
return nil, errors.Wrap(err, "failed to add subscription")
return nil, errors.Wrap(err, "failed to parse query")
}
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
ch := make(chan interface{})
err = eventBus.Subscribe(ctx, addr, q, ch)
err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch)
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
return nil, err
}
go func() {
@ -100,18 +95,31 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, ok := wsCtx.DeleteSubscription(query)
if !ok {
return nil, errors.New("subscription not found")
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q)
if err != nil {
return nil, err
}
eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query))
return &ctypes.ResultUnsubscribe{}, nil
}
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr)
eventBus.UnsubscribeAll(context.Background(), addr)
wsCtx.DeleteAllSubscriptions()
err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

+ 29
- 29
rpc/core/status.go View File

@ -24,35 +24,35 @@ import (
//
// ```json
// {
// "error": "",
// "result": {
// "latest_block_time": 1.49631773695e+18,
// "latest_block_height": 22924,
// "latest_app_hash": "9D16177BC71E445476174622EA559715C293740C",
// "latest_block_hash": "75B36EEF96C277A592D8B14867098C58F68BB180",
// "pub_key": {
// "data": "68DFDA7E50F82946E7E8546BED37944A422CD1B831E70DF66BA3B8430593944D",
// "type": "ed25519"
// },
// "node_info": {
// "other": [
// "wire_version=0.6.2",
// "p2p_version=0.5.0",
// "consensus_version=v1/0.2.2",
// "rpc_version=0.7.0/3",
// "tx_index=on",
// "rpc_addr=tcp://0.0.0.0:46657"
// ],
// "version": "0.10.0-rc1-aa22bd84",
// "listen_addr": "10.0.2.15:46656",
// "remote_addr": "",
// "network": "test-chain-6UTNIN",
// "moniker": "anonymous",
// "pub_key": "659B9E54DD6EF9FEF28FAD40629AF0E4BD3C2563BB037132B054A176E00F1D94"
// }
// },
// "id": "",
// "jsonrpc": "2.0"
// "result": {
// "syncing": false,
// "latest_block_time": "2017-12-07T18:19:47.617Z",
// "latest_block_height": 6,
// "latest_app_hash": "",
// "latest_block_hash": "A63D0C3307DEDCCFCC82ED411AE9108B70B29E02",
// "pub_key": {
// "data": "8C9A68070CBE33F9C445862BA1E9D96A75CEB68C0CF6ADD3652D07DCAC5D0380",
// "type": "ed25519"
// },
// "node_info": {
// "other": [
// "wire_version=0.7.2",
// "p2p_version=0.5.0",
// "consensus_version=v1/0.2.2",
// "rpc_version=0.7.0/3",
// "tx_index=on",
// "rpc_addr=tcp://0.0.0.0:46657"
// ],
// "version": "0.13.0-14ccc8b",
// "listen_addr": "10.0.2.15:46656",
// "remote_addr": "",
// "network": "test-chain-qhVCa2",
// "moniker": "vagrant-ubuntu-trusty-64",
// "pub_key": "844981FE99ABB19F7816F2D5E94E8A74276AB1153760A7799E925C75401856C6"
// }
// },
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
func Status() (*ctypes.ResultStatus, error) {


+ 7
- 6
rpc/lib/client/ws_client.go View File

@ -169,13 +169,14 @@ func (c *WSClient) OnStop() {}
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
// channel is closed.
func (c *WSClient) Stop() error {
err := c.BaseService.Stop()
if err == nil {
// only close user-facing channels when we can't write to them
c.wg.Wait()
close(c.ResponsesCh)
if err := c.BaseService.Stop(); err != nil {
return err
}
return err
// only close user-facing channels when we can't write to them
c.wg.Wait()
close(c.ResponsesCh)
return nil
}
// IsReconnecting returns true if the client is reconnecting right now.


+ 20
- 38
rpc/lib/server/handlers.go View File

@ -2,6 +2,7 @@ package rpcserver
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -366,8 +367,6 @@ type wsConnection struct {
funcMap map[string]*RPCFunc
subscriptions map[string]interface{}
// write channel capacity
writeChanCapacity int
@ -380,8 +379,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration
// called before stopping the connection.
onDisconnect func(remoteAddr string)
// object that is used to subscribe / unsubscribe from events
eventSub types.EventSubscriber
}
// NewWSConnection wraps websocket.Conn.
@ -395,7 +394,6 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
funcMap: funcMap,
subscriptions: make(map[string]interface{}),
writeWait: defaultWSWriteWait,
writeChanCapacity: defaultWSWriteChanCapacity,
readWait: defaultWSReadWait,
@ -408,6 +406,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
return wsc
}
// EventSubscriber sets object that is used to subscribe / unsubscribe from
// events - not Goroutine-safe. If none given, default node's eventBus will be
// used.
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.eventSub = eventSub
}
}
// WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor - not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*wsConnection) {
@ -440,14 +447,6 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
}
}
// OnDisconnect called before stopping the connection.
// It should only be used in the constructor - not Goroutine-safe.
func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.onDisconnect = cb
}
}
// OnStart implements cmn.Service by starting the read and write routines. It
// blocks until the connection closes.
func (wsc *wsConnection) OnStart() error {
@ -461,12 +460,12 @@ func (wsc *wsConnection) OnStart() error {
return nil
}
// OnStop implements cmn.Service by calling OnDisconnect callback.
// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions.
func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
if wsc.eventSub != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
}
}
@ -476,6 +475,11 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
@ -499,28 +503,6 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
}
}
func (wsc *wsConnection) AddSubscription(query string, data interface{}) error {
if _, ok := wsc.subscriptions[query]; ok {
return errors.New("Already subscribed")
}
wsc.subscriptions[query] = data
return nil
}
func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) {
data, ok := wsc.subscriptions[query]
if ok {
delete(wsc.subscriptions, query)
return data, true
}
return nil, false
}
func (wsc *wsConnection) DeleteAllSubscriptions() {
wsc.subscriptions = make(map[string]interface{})
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
defer func() {


+ 9
- 3
rpc/lib/types/types.go View File

@ -1,11 +1,13 @@
package rpctypes
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/pkg/errors"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
)
//----------------------------------------
@ -135,10 +137,14 @@ type WSRPCConnection interface {
GetRemoteAddr() string
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
}
AddSubscription(string, interface{}) error
DeleteSubscription(string) (interface{}, bool)
DeleteAllSubscriptions()
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// websocket-only RPCFuncs take this as the first parameter.


+ 1
- 1
rpc/test/helpers.go View File

@ -51,7 +51,7 @@ func makePathname() string {
if err != nil {
panic(err)
}
fmt.Println(p)
// fmt.Println(p)
sep := string(filepath.Separator)
return strings.Replace(p, sep, "_", -1)
}


+ 0
- 65
scripts/cutWALUntil/main.go View File

@ -1,65 +0,0 @@
/*
cutWALUntil is a small utility for cutting a WAL until the given height
(inclusively). Note it does not include last cs.EndHeightMessage.
Usage:
cutWALUntil <path-to-wal> height-to-stop <output-wal>
*/
package main
import (
"fmt"
"io"
"os"
"strconv"
cs "github.com/tendermint/tendermint/consensus"
)
func main() {
if len(os.Args) < 4 {
fmt.Println("3 arguments required: <path-to-wal> <height-to-stop> <output-wal>")
os.Exit(1)
}
var heightToStop int64
var err error
if heightToStop, err = strconv.ParseInt(os.Args[2], 10, 64); err != nil {
panic(fmt.Errorf("failed to parse height: %v", err))
}
in, err := os.Open(os.Args[1])
if err != nil {
panic(fmt.Errorf("failed to open input WAL file: %v", err))
}
defer in.Close()
out, err := os.Create(os.Args[3])
if err != nil {
panic(fmt.Errorf("failed to open output WAL file: %v", err))
}
defer out.Close()
enc := cs.NewWALEncoder(out)
dec := cs.NewWALDecoder(in)
for {
msg, err := dec.Decode()
if err == io.EOF {
break
} else if err != nil {
panic(fmt.Errorf("failed to decode msg: %v", err))
}
if m, ok := msg.Msg.(cs.EndHeightMessage); ok {
if m.Height == heightToStop {
break
}
}
err = enc.Encode(msg)
if err != nil {
panic(fmt.Errorf("failed to encode msg: %v", err))
}
}
}

+ 1
- 1
state/txindex/kv/kv.go View File

@ -342,7 +342,7 @@ func startKey(c query.Condition, height int64) []byte {
func startKeyForRange(r queryRange, height int64) []byte {
if r.lowerBound == nil {
return []byte(fmt.Sprintf("%s", r.key))
return []byte(r.key)
}
var lowerBound interface{}


+ 6
- 0
types/event_bus.go View File

@ -12,6 +12,12 @@ import (
const defaultCapacity = 1000
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// EventBus is a common bus for all events going through the system. All calls
// are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types.


+ 20
- 20
types/events.go View File

@ -146,32 +146,32 @@ const (
)
var (
EventQueryBond = queryForEvent(EventBond)
EventQueryUnbond = queryForEvent(EventUnbond)
EventQueryRebond = queryForEvent(EventRebond)
EventQueryDupeout = queryForEvent(EventDupeout)
EventQueryFork = queryForEvent(EventFork)
EventQueryNewBlock = queryForEvent(EventNewBlock)
EventQueryNewBlockHeader = queryForEvent(EventNewBlockHeader)
EventQueryNewRound = queryForEvent(EventNewRound)
EventQueryNewRoundStep = queryForEvent(EventNewRoundStep)
EventQueryTimeoutPropose = queryForEvent(EventTimeoutPropose)
EventQueryCompleteProposal = queryForEvent(EventCompleteProposal)
EventQueryPolka = queryForEvent(EventPolka)
EventQueryUnlock = queryForEvent(EventUnlock)
EventQueryLock = queryForEvent(EventLock)
EventQueryRelock = queryForEvent(EventRelock)
EventQueryTimeoutWait = queryForEvent(EventTimeoutWait)
EventQueryVote = queryForEvent(EventVote)
EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat)
EventQueryTx = queryForEvent(EventTx)
EventQueryBond = QueryForEvent(EventBond)
EventQueryUnbond = QueryForEvent(EventUnbond)
EventQueryRebond = QueryForEvent(EventRebond)
EventQueryDupeout = QueryForEvent(EventDupeout)
EventQueryFork = QueryForEvent(EventFork)
EventQueryNewBlock = QueryForEvent(EventNewBlock)
EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader)
EventQueryNewRound = QueryForEvent(EventNewRound)
EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep)
EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose)
EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal)
EventQueryPolka = QueryForEvent(EventPolka)
EventQueryUnlock = QueryForEvent(EventUnlock)
EventQueryLock = QueryForEvent(EventLock)
EventQueryRelock = QueryForEvent(EventRelock)
EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait)
EventQueryVote = QueryForEvent(EventVote)
EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat)
EventQueryTx = QueryForEvent(EventTx)
)
func EventQueryTxFor(tx Tx) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash()))
}
func queryForEvent(eventType string) tmpubsub.Query {
func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType))
}


+ 2
- 2
version/version.go View File

@ -1,13 +1,13 @@
package version
const Maj = "0"
const Min = "13"
const Min = "14"
const Fix = "0"
var (
// Version is the current version of Tendermint
// Must be a string because scripts like dist.sh read this file.
Version = "0.13.0"
Version = "0.14.0"
// GitCommit is the current HEAD set using ldflags.
GitCommit string


Loading…
Cancel
Save