
test: add evidence e2e tests (#5488)

pull/5568/head
Callum Waters 4 years ago
committed by Erik Grinaker
commit dacbfbe1fe
38 changed files with 8864 additions and 39 deletions
  1. +1 -0 .gitignore
  2. +6 -1 test/e2e/Makefile
  3. +1 -0 test/e2e/app/config.go
  4. +82 -27 test/e2e/app/main.go
  5. +1 -0 test/e2e/docker/Dockerfile
  6. +10 -0 test/e2e/docker/entrypoint-maverick
  7. +29 -2 test/e2e/generator/generate.go
  8. +1 -0 test/e2e/generator/main.go
  9. +1 -1 test/e2e/generator/random.go
  10. +1 -0 test/e2e/networks/ci.toml
  11. +1 -0 test/e2e/networks/simple.toml
  12. +10 -0 test/e2e/pkg/manifest.go
  13. +30 -0 test/e2e/pkg/testnet.go
  14. +20 -7 test/e2e/runner/main.go
  15. +25 -1 test/e2e/runner/setup.go
  16. +21 -0 test/e2e/runner/wait.go
  17. +1 -0 test/e2e/tests/app_test.go
  18. +1 -0 test/e2e/tests/block_test.go
  19. +1 -0 test/e2e/tests/e2e_test.go
  20. +50 -0 test/e2e/tests/evidence_test.go
  21. +1 -0 test/e2e/tests/net_test.go
  22. +1 -0 test/e2e/tests/validator_test.go
  23. +51 -0 test/maverick/README.md
  24. +220 -0 test/maverick/consensus/metrics.go
  25. +398 -0 test/maverick/consensus/misbehavior.go
  26. +377 -0 test/maverick/consensus/msgs.go
  27. +1720 -0 test/maverick/consensus/reactor.go
  28. +533 -0 test/maverick/consensus/replay.go
  29. +338 -0 test/maverick/consensus/replay_file.go
  30. +90 -0 test/maverick/consensus/replay_stubs.go
  31. +1976 -0 test/maverick/consensus/state.go
  32. +134 -0 test/maverick/consensus/ticker.go
  33. +437 -0 test/maverick/consensus/wal.go
  34. +31 -0 test/maverick/consensus/wal_fuzz.go
  35. +229 -0 test/maverick/consensus/wal_generator.go
  36. +237 -0 test/maverick/main.go
  37. +1440 -0 test/maverick/node/node.go
  38. +358 -0 test/maverick/node/privval.go

+1 -0 .gitignore

@@ -11,6 +11,7 @@ remote_dump
vendor
.vagrant
test/e2e/build
test/maverick/maverick
test/e2e/networks/*/
test/p2p/data/
test/logs


+6 -1 test/e2e/Makefile

@@ -8,6 +8,11 @@ docker:
# ABCI testing).
app:
go build -o build/app -tags badgerdb,boltdb,cleveldb,rocksdb ./app
# To be used primarily by the e2e docker instance. If you want to produce this binary
# elsewhere, then run go build in the maverick directory.
maverick:
go build -o build/maverick -tags badgerdb,boltdb,cleveldb,rocksdb ../maverick
generator:
go build -o build/generator ./generator
@@ -15,4 +20,4 @@ generator:
runner:
go build -o build/runner ./runner
.PHONY: all app docker generator runner
.PHONY: all app docker generator maverick runner

+1 -0 test/e2e/app/config.go

@@ -21,6 +21,7 @@ type Config struct {
PrivValServer string `toml:"privval_server"`
PrivValKey string `toml:"privval_key"`
PrivValState string `toml:"privval_state"`
Misbehaviors map[string]string `toml:"misbehaviors"`
}
// LoadConfig loads the configuration from disk.


+82 -27 test/e2e/app/main.go

@@ -5,9 +5,11 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"
"github.com/spf13/viper"
"github.com/tendermint/tendermint/abci/server"
"github.com/tendermint/tendermint/config"
tmflags "github.com/tendermint/tendermint/libs/cli/flags"
@@ -17,6 +19,8 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
mcs "github.com/tendermint/tendermint/test/maverick/consensus"
maverick "github.com/tendermint/tendermint/test/maverick/node"
)
var logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
@@ -60,7 +64,11 @@ func run(configFile string) error {
case "socket", "grpc":
err = startApp(cfg)
case "builtin":
err = startNode(cfg)
if len(cfg.Misbehaviors) == 0 {
err = startNode(cfg)
} else {
err = startMaverick(cfg)
}
default:
err = fmt.Errorf("invalid protocol %q", cfg.Protocol)
}
@@ -102,51 +110,59 @@ func startNode(cfg *Config) error {
return err
}
home := os.Getenv("TMHOME")
if home == "" {
return errors.New("TMHOME not set")
}
viper.AddConfigPath(filepath.Join(home, "config"))
viper.SetConfigName("config")
err = viper.ReadInConfig()
tmcfg, nodeLogger, nodeKey, err := setupNode()
if err != nil {
return err
return fmt.Errorf("failed to setup config: %w", err)
}
tmcfg := config.DefaultConfig()
err = viper.Unmarshal(tmcfg)
n, err := node.NewNode(tmcfg,
privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),
nodeLogger,
)
if err != nil {
return err
}
tmcfg.SetRoot(home)
if err = tmcfg.ValidateBasic(); err != nil {
return fmt.Errorf("error in config file: %v", err)
}
if tmcfg.LogFormat == config.LogFormatJSON {
logger = log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout))
}
logger, err = tmflags.ParseLogLevel(tmcfg.LogLevel, logger, config.DefaultLogLevel())
return n.Start()
}
// startMaverick starts a Maverick node that runs the application directly. It assumes the Tendermint
// configuration is in $TMHOME/config/config.toml.
func startMaverick(cfg *Config) error {
app, err := NewApplication(cfg)
if err != nil {
return err
}
logger = logger.With("module", "main")
nodeKey, err := p2p.LoadOrGenNodeKey(tmcfg.NodeKeyFile())
tmcfg, logger, nodeKey, err := setupNode()
if err != nil {
return fmt.Errorf("failed to load or gen node key %s: %w", tmcfg.NodeKeyFile(), err)
return fmt.Errorf("failed to setup config: %w", err)
}
n, err := node.NewNode(tmcfg,
privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
misbehaviors := make(map[int64]mcs.Misbehavior, len(cfg.Misbehaviors))
for heightString, misbehaviorString := range cfg.Misbehaviors {
height, _ := strconv.ParseInt(heightString, 10, 64)
misbehaviors[height] = mcs.MisbehaviorList[misbehaviorString]
}
n, err := maverick.NewNode(tmcfg,
maverick.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),
maverick.DefaultGenesisDocProviderFunc(tmcfg),
maverick.DefaultDBProvider,
maverick.DefaultMetricsProvider(tmcfg.Instrumentation),
logger,
misbehaviors,
)
if err != nil {
return err
}
return n.Start()
}
@@ -175,3 +191,42 @@ func startSigner(cfg *Config) error {
logger.Info(fmt.Sprintf("Remote signer connecting to %v", cfg.PrivValServer))
return nil
}
func setupNode() (*config.Config, log.Logger, *p2p.NodeKey, error) {
var tmcfg *config.Config
home := os.Getenv("TMHOME")
if home == "" {
return nil, nil, nil, errors.New("TMHOME not set")
}
viper.AddConfigPath(filepath.Join(home, "config"))
viper.SetConfigName("config")
err := viper.ReadInConfig()
if err != nil {
return nil, nil, nil, err
}
tmcfg = config.DefaultConfig()
err = viper.Unmarshal(tmcfg)
if err != nil {
return nil, nil, nil, err
}
tmcfg.SetRoot(home)
if err = tmcfg.ValidateBasic(); err != nil {
return nil, nil, nil, fmt.Errorf("error in config file: %w", err)
}
if tmcfg.LogFormat == config.LogFormatJSON {
logger = log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout))
}
nodeLogger, err := tmflags.ParseLogLevel(tmcfg.LogLevel, logger, config.DefaultLogLevel())
if err != nil {
return nil, nil, nil, err
}
nodeLogger = nodeLogger.With("module", "main")
nodeKey, err := p2p.LoadOrGenNodeKey(tmcfg.NodeKeyFile())
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to load or gen node key %s: %w", tmcfg.NodeKeyFile(), err)
}
return tmcfg, nodeLogger, nodeKey, nil
}

+1 -0 test/e2e/docker/Dockerfile

@@ -18,6 +18,7 @@ RUN go mod download
COPY . .
RUN make build && cp build/tendermint /usr/bin/tendermint
COPY test/e2e/docker/entrypoint* /usr/bin/
RUN cd test/e2e && make maverick && cp build/maverick /usr/bin/maverick
RUN cd test/e2e && make app && cp build/app /usr/bin/app
# Set up runtime directory. We don't use a separate runtime image since we need


+10 -0 test/e2e/docker/entrypoint-maverick

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Forcibly remove any stray UNIX sockets left behind from previous runs
rm -rf /var/run/privval.sock /var/run/app.sock
/usr/bin/app /tendermint/config/app.toml &
sleep 1
/usr/bin/maverick "$@"

+29 -2 test/e2e/generator/generate.go

@@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
@@ -39,6 +40,10 @@ var (
"kill": 0.1,
"restart": 0.1,
}
nodeMisbehaviors = weightedChoice{
misbehaviorOption{"double-prevote"}: 1,
misbehaviorOption{}: 9,
}
)
// Generate generates random testnets using the given RNG.
@@ -91,7 +96,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
nextStartAt := manifest.InitialHeight + 5
quorum := numValidators*2/3 + 1
for i := 1; i <= numValidators; i++ {
startAt := int64(0)
startAt := manifest.InitialHeight
if i > quorum {
startAt = nextStartAt
nextStartAt += 5
@@ -174,7 +179,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// generating invalid configurations. We do not set Seeds or PersistentPeers
// here, since we need to know the overall network topology and startup
// sequencing.
func generateNode(r *rand.Rand, mode e2e.Mode, startAt int64, forceArchive bool) *e2e.ManifestNode {
func generateNode(
r *rand.Rand, mode e2e.Mode, startAt int64, forceArchive bool) *e2e.ManifestNode {
node := e2e.ManifestNode{
Mode: string(mode),
StartAt: startAt,
@@ -196,6 +202,14 @@ func generateNode(r *rand.Rand, mode e2e.Mode, startAt int64, forceArchive bool)
node.SnapshotInterval = 3
}
if node.Mode == "validator" {
node.Misbehaviors = nodeMisbehaviors.Choose(r).(misbehaviorOption).
atHeight(startAt + 5 + int64(r.Intn(10)))
if len(node.Misbehaviors) != 0 {
node.PrivvalProtocol = "file"
}
}
// If a node which does not persist state also does not retain blocks, randomly
// choose to either persist state or retain all blocks.
if node.PersistInterval != nil && *node.PersistInterval == 0 && node.RetainBlocks > 0 {
@@ -223,3 +237,16 @@ func generateNode(r *rand.Rand, mode e2e.Mode, startAt int64, forceArchive bool)
func ptrUint64(i uint64) *uint64 {
return &i
}
type misbehaviorOption struct {
misbehavior string
}
func (m misbehaviorOption) atHeight(height int64) map[string]string {
misbehaviorMap := make(map[string]string)
if m.misbehavior == "" {
return misbehaviorMap
}
misbehaviorMap[strconv.Itoa(int(height))] = m.misbehavior
return misbehaviorMap
}

+1 -0 test/e2e/generator/main.go

@@ -9,6 +9,7 @@ import (
"path/filepath"
"github.com/spf13/cobra"
"github.com/tendermint/tendermint/libs/log"
)


+1 -1 test/e2e/generator/random.go

@@ -57,7 +57,7 @@ func (uc uniformChoice) Choose(r *rand.Rand) interface{} {
}
// weightedChoice chooses a single random key from a map of keys and weights.
type weightedChoice map[interface{}]uint // nolint:unused
type weightedChoice map[interface{}]uint
func (wc weightedChoice) Choose(r *rand.Rand) interface{} {
total := 0


+1 -0 test/e2e/networks/ci.toml

@@ -36,6 +36,7 @@ seeds = ["seed01"]
seeds = ["seed01"]
snapshot_interval = 5
perturb = ["disconnect"]
misbehaviors = { 1012 = "double-prevote", 1018 = "double-prevote" }
[node.validator02]
seeds = ["seed02"]


+1 -0 test/e2e/networks/simple.toml

@@ -2,3 +2,4 @@
[node.validator02]
[node.validator03]
[node.validator04]

+10 -0 test/e2e/pkg/manifest.go

@@ -115,6 +115,16 @@ type ManifestNode struct {
// pause: temporarily pauses (freezes) the node
// restart: restarts the node, shutting it down with SIGTERM
Perturb []string `toml:"perturb"`
// Misbehaviors sets how a validator behaves during consensus at a
// certain height. Multiple misbehaviors at different heights can be used.
//
// An example of misbehaviors:
// { 10 = "double-prevote", 20 = "double-prevote" }
//
// For more information, see the README in the maverick folder.
// A list of all misbehaviors can be found in ../maverick/consensus/misbehavior.go
Misbehaviors map[string]string `toml:"misbehaviors"`
}
// Save saves the testnet manifest to a file.


+30 -0 test/e2e/pkg/testnet.go

@@ -15,6 +15,7 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
mcs "github.com/tendermint/tendermint/test/maverick/consensus"
)
const (
@@ -78,6 +79,7 @@ type Node struct {
Seeds []*Node
PersistentPeers []*Node
Perturbations []Perturbation
Misbehaviors map[int64]string
}
// LoadTestnet loads a testnet from a manifest file, using the filename to
@@ -147,6 +149,7 @@ func LoadTestnet(file string) (*Testnet, error) {
SnapshotInterval: nodeManifest.SnapshotInterval,
RetainBlocks: nodeManifest.RetainBlocks,
Perturbations: []Perturbation{},
Misbehaviors: make(map[int64]string),
}
if nodeManifest.Mode != "" {
node.Mode = Mode(nodeManifest.Mode)
@@ -166,6 +169,13 @@ func LoadTestnet(file string) (*Testnet, error) {
for _, p := range nodeManifest.Perturb {
node.Perturbations = append(node.Perturbations, Perturbation(p))
}
for heightString, misbehavior := range nodeManifest.Misbehaviors {
height, err := strconv.ParseInt(heightString, 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse height %s to int64: %w", heightString, err)
}
node.Misbehaviors[height] = misbehavior
}
testnet.Nodes = append(testnet.Nodes, node)
}
@@ -324,6 +334,26 @@ func (n Node) Validate(testnet Testnet) error {
return fmt.Errorf("invalid perturbation %q", perturbation)
}
}
if (n.PrivvalProtocol != "file" || n.Mode != "validator") && len(n.Misbehaviors) != 0 {
return errors.New("must be using \"file\" privval protocol to implement misbehaviors")
}
for height, misbehavior := range n.Misbehaviors {
if height < n.StartAt {
return fmt.Errorf("misbehavior height %d is before start height %d", height, n.StartAt)
}
exists := false
for possibleBehaviors := range mcs.MisbehaviorList {
if possibleBehaviors == misbehavior {
exists = true
}
}
if !exists {
return fmt.Errorf("misbehavior %s does not exist", misbehavior)
}
}
return nil
}


+20 -7 test/e2e/runner/main.go

@@ -6,11 +6,14 @@ import (
"os"
"github.com/spf13/cobra"
"github.com/tendermint/tendermint/libs/log"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
var logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
var (
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
)
func main() {
NewCLI().Run()
@@ -18,8 +21,9 @@ func main() {
// CLI is the Cobra-based command-line interface.
type CLI struct {
root *cobra.Command
testnet *e2e.Testnet
root *cobra.Command
testnet *e2e.Testnet
preserve bool
}
// NewCLI sets up the CLI.
@@ -65,10 +69,13 @@ func NewCLI() *CLI {
if err := Start(cli.testnet); err != nil {
return err
}
if err := waitForAllMisbehaviors(cli.testnet); err != nil {
return err
}
if err := Perturb(cli.testnet); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
if err := Wait(cli.testnet, interphaseWaitPeriod); err != nil { // allow some txs to go through
return err
}
@@ -76,14 +83,17 @@ func NewCLI() *CLI {
if err := <-chLoadResult; err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // wait for network to settle before tests
// wait for network to settle before tests
if err := Wait(cli.testnet, interphaseWaitPeriod); err != nil {
return err
}
if err := Test(cli.testnet); err != nil {
return err
}
if err := Cleanup(cli.testnet); err != nil {
return err
if !cli.preserve {
if err := Cleanup(cli.testnet); err != nil {
return err
}
}
return nil
},
@@ -92,6 +102,9 @@ func NewCLI() *CLI {
cli.root.PersistentFlags().StringP("file", "f", "", "Testnet TOML manifest")
_ = cli.root.MarkPersistentFlagRequired("file")
cli.root.Flags().BoolVarP(&cli.preserve, "preserve", "p", false,
"Preserves the running of the test net after tests are completed")
cli.root.AddCommand(&cobra.Command{
Use: "setup",
Short: "Generates the testnet directory and configuration",


+25 -1 test/e2e/runner/setup.go

@@ -12,11 +12,13 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"text/template"
"time"
"github.com/BurntSushi/toml"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/p2p"
@@ -118,7 +120,20 @@ func Setup(testnet *e2e.Testnet) error {
// MakeDockerCompose generates a Docker Compose config for a testnet.
func MakeDockerCompose(testnet *e2e.Testnet) ([]byte, error) {
// Must use version 2 Docker Compose format, to support IPv6.
tmpl, err := template.New("docker-compose").Parse(`version: '2.4'
tmpl, err := template.New("docker-compose").Funcs(template.FuncMap{
"misbehaviorsToString": func(misbehaviors map[int64]string) string {
str := ""
for height, misbehavior := range misbehaviors {
// after the first behavior set, a comma must be prepended
if str != "" {
str += ","
}
heightString := strconv.Itoa(int(height))
str += misbehavior + "," + heightString
}
return str
},
}).Parse(`version: '2.4'
networks:
{{ .Name }}:
@@ -142,6 +157,9 @@ services:
image: tendermint/e2e-node
{{- if eq .ABCIProtocol "builtin" }}
entrypoint: /usr/bin/entrypoint-builtin
{{- else if .Misbehaviors }}
entrypoint: /usr/bin/entrypoint-maverick
command: ["node", "--misbehaviors", "{{ misbehaviorsToString .Misbehaviors }}"]
{{- end }}
init: true
ports:
@@ -330,6 +348,12 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
}
}
misbehaviors := make(map[string]string)
for height, misbehavior := range node.Misbehaviors {
misbehaviors[strconv.Itoa(int(height))] = misbehavior
}
cfg["misbehaviors"] = misbehaviors
if len(node.Testnet.ValidatorUpdates) > 0 {
validatorUpdates := map[string]map[string]int64{}
for height, validators := range node.Testnet.ValidatorUpdates {


+21 -0 test/e2e/runner/wait.go

@@ -7,6 +7,8 @@ import (
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
const interphaseWaitPeriod = 5
// Wait waits for a number of blocks to be produced, and for all nodes to catch
// up with it.
func Wait(testnet *e2e.Testnet, blocks int64) error {
@@ -22,3 +24,22 @@ func Wait(testnet *e2e.Testnet, blocks int64) error {
}
return nil
}
// waitForAllMisbehaviors calculates the height of the last misbehavior and ensures the entire
// testnet has surpassed this height before moving on to the next phase.
func waitForAllMisbehaviors(testnet *e2e.Testnet) error {
_, _, err := waitForHeight(testnet, lastMisbehaviorHeight(testnet))
return err
}
func lastMisbehaviorHeight(testnet *e2e.Testnet) int64 {
lastHeight := testnet.InitialHeight
for _, n := range testnet.Nodes {
for height := range n.Misbehaviors {
if height > lastHeight {
lastHeight = height
}
}
}
return lastHeight + interphaseWaitPeriod
}

+1 -0 test/e2e/tests/app_test.go

@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)


+1 -0 test/e2e/tests/block_test.go

@@ -5,6 +5,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)


+1 -0 test/e2e/tests/e2e_test.go

@@ -8,6 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
rpctypes "github.com/tendermint/tendermint/rpc/core/types"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"


+50 -0 test/e2e/tests/evidence_test.go

@@ -0,0 +1,50 @@
package e2e_test
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)
// assert that all nodes that have blocks at the height (or height + 1) of a misbehavior have evidence
// for that misbehavior
func TestEvidence_Misbehavior(t *testing.T) {
blocks := fetchBlockChain(t)
testNode(t, func(t *testing.T, node e2e.Node) {
for _, block := range blocks {
// Find any evidence blaming this node in this block
var nodeEvidence types.Evidence
for _, evidence := range block.Evidence.Evidence {
switch evidence := evidence.(type) {
case *types.DuplicateVoteEvidence:
if bytes.Equal(evidence.VoteA.ValidatorAddress, node.Key.PubKey().Address()) {
nodeEvidence = evidence
}
default:
t.Fatalf("unexpected evidence type %T", evidence)
}
}
// Check that evidence was as expected (evidence is submitted in following height)
misbehavior, ok := node.Misbehaviors[block.Height-1]
if !ok {
require.Nil(t, nodeEvidence, "found unexpected evidence %v in height %v",
nodeEvidence, block.Height)
continue
}
require.NotNil(t, nodeEvidence, "no evidence found for misbehavior %v in height %v",
misbehavior, block.Height)
switch misbehavior {
case "double-prevote":
require.IsType(t, &types.DuplicateVoteEvidence{}, nodeEvidence, "unexpected evidence type")
default:
t.Fatalf("unknown misbehavior %v", misbehavior)
}
}
})
}

+1 -0 test/e2e/tests/net_test.go

@@ -4,6 +4,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)


+1 -0 test/e2e/tests/validator_test.go

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/require"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)


+51 -0 test/maverick/README.md

@@ -0,0 +1,51 @@
# Maverick
![](https://assets.rollingstone.com/assets/2015/article/tom-cruise-to-fight-drones-in-top-gun-sequel-20150629/201166/large_rect/1435581755/1401x788-Top-Gun-3.jpg)
A byzantine node used to test Tendermint consensus against a plethora of different faulty misbehaviors. It is designed to make it easy to create new misbehaviors and examine how a Tendermint network reacts to them. It can also be used for fuzz testing with different network arrangements.
## Misbehaviors
A misbehavior gives control over the node's behavior at the following stages of consensus, as highlighted by the struct below:
```go
type Misbehavior struct {
String string
EnterPropose func(cs *State, height int64, round int32)
EnterPrevote func(cs *State, height int64, round int32)
EnterPrecommit func(cs *State, height int64, round int32)
ReceivePrevote func(cs *State, prevote *types.Vote)
ReceivePrecommit func(cs *State, precommit *types.Vote)
ReceiveProposal func(cs *State, proposal *types.Proposal) error
}
```
At each of these events, the node can exhibit a different misbehavior. To create a new misbehavior, define a function that builds on the default misbehavior and overrides one or more of these functions, then add it to the misbehavior list so the node recognizes it:
```go
var MisbehaviorList = map[string]Misbehavior{
"double-prevote": DoublePrevoteMisbehavior(),
}
```
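For instance, here is a minimal sketch of a new misbehavior, assuming the `Misbehavior` struct and the `signAddVote` helper shown in `misbehavior.go` below; the `nil-precommit` name and `NilPrecommitMisbehavior` function are illustrative only, not part of this change:
```go
// Hypothetical example: a misbehavior that precommits nil in every round,
// regardless of whether a polka formed during the prevote step.
func NilPrecommitMisbehavior() Misbehavior {
	b := DefaultMisbehavior()
	b.Name = "nil-precommit"
	b.EnterPrecommit = func(cs *State, height int64, round int32) {
		// Skip the usual polka checks and unconditionally precommit nil.
		cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
	}
	return b
}

var MisbehaviorList = map[string]Misbehavior{
	"double-prevote": DoublePrevoteMisbehavior(),
	"nil-precommit":  NilPrecommitMisbehavior(),
}
```
Since only `EnterPrecommit` is overridden, every other stage keeps the default behavior.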
## Setup
The maverick node takes most of its functionality from the existing Tendermint CLI. To install it, run the following in the directory of this README:
```bash
go build
```
Use `maverick init` to initialize a single node and `maverick node` to run it. This will run it normally unless you use the misbehaviors flag as follows:
```bash
maverick node --proxy_app persistent_kvstore --misbehaviors double-prevote,10
```
This would cause the node to prevote twice (once for the proposal block and once for nil) in every round at height 10. To add more misbehaviors at different heights, append each further misbehavior and its height after the first, comma-separated, as in the example below.
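For example, a hypothetical invocation scheduling the same misbehavior at two arbitrary heights (each entry is a `misbehavior,height` pair, the same format the e2e runner's `misbehaviorsToString` template function emits):
```bash
maverick node --proxy_app persistent_kvstore --misbehaviors double-prevote,10,double-prevote,20
```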

+220 -0 test/maverick/consensus/metrics.go

@@ -0,0 +1,220 @@
package consensus
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "consensus"
)
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Height of the chain.
Height metrics.Gauge
// ValidatorLastSignedHeight of a validator.
ValidatorLastSignedHeight metrics.Gauge
// Number of rounds.
Rounds metrics.Gauge
// Number of validators.
Validators metrics.Gauge
// Total power of all validators.
ValidatorsPower metrics.Gauge
// Power of a validator.
ValidatorPower metrics.Gauge
// Amount of blocks missed by a validator.
ValidatorMissedBlocks metrics.Gauge
// Number of validators who did not sign.
MissingValidators metrics.Gauge
// Total power of the missing validators.
MissingValidatorsPower metrics.Gauge
// Number of validators who tried to double sign.
ByzantineValidators metrics.Gauge
// Total power of the byzantine validators.
ByzantineValidatorsPower metrics.Gauge
// Time between this and the last block.
BlockIntervalSeconds metrics.Histogram
// Number of transactions.
NumTxs metrics.Gauge
// Size of the block.
BlockSizeBytes metrics.Gauge
// Total number of transactions.
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
FastSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge
// Number of blockparts transmitted by peer.
BlockParts metrics.Counter
}
// PrometheusMetrics returns Metrics built using the Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "height",
Help: "Height of the chain.",
}, labels).With(labelsAndValues...),
Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "rounds",
Help: "Number of rounds.",
}, labels).With(labelsAndValues...),
Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators",
Help: "Number of validators.",
}, labels).With(labelsAndValues...),
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_last_signed_height",
Help: "Last signed height for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_missed_blocks",
Help: "Total missed blocks for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators_power",
Help: "Total power of all validators.",
}, labels).With(labelsAndValues...),
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_power",
Help: "Power of a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators",
Help: "Number of validators who did not sign.",
}, labels).With(labelsAndValues...),
MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators_power",
Help: "Total power of the missing validators.",
}, labels).With(labelsAndValues...),
ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators",
Help: "Number of validators who tried to double sign.",
}, labels).With(labelsAndValues...),
ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators_power",
Help: "Total power of the byzantine validators.",
}, labels).With(labelsAndValues...),
BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_interval_seconds",
Help: "Time between this and the last block.",
}, labels).With(labelsAndValues...),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions.",
}, labels).With(labelsAndValues...),
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_size_bytes",
Help: "Size of the block.",
}, labels).With(labelsAndValues...),
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "total_txs",
Help: "Total number of transactions.",
}, labels).With(labelsAndValues...),
CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "latest_block_height",
Help: "The latest block height.",
}, labels).With(labelsAndValues...),
FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "fast_syncing",
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "state_syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
}
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Height: discard.NewGauge(),
ValidatorLastSignedHeight: discard.NewGauge(),
Rounds: discard.NewGauge(),
Validators: discard.NewGauge(),
ValidatorsPower: discard.NewGauge(),
ValidatorPower: discard.NewGauge(),
ValidatorMissedBlocks: discard.NewGauge(),
MissingValidators: discard.NewGauge(),
MissingValidatorsPower: discard.NewGauge(),
ByzantineValidators: discard.NewGauge(),
ByzantineValidatorsPower: discard.NewGauge(),
BlockIntervalSeconds: discard.NewHistogram(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
}
}

+398 -0 test/maverick/consensus/misbehavior.go

@@ -0,0 +1,398 @@
package consensus
import (
"fmt"
cstypes "github.com/tendermint/tendermint/consensus/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
// MisbehaviorList encompasses a list of all possible misbehaviors
var MisbehaviorList = map[string]Misbehavior{
"double-prevote": DoublePrevoteMisbehavior(),
}
type Misbehavior struct {
Name string
EnterPropose func(cs *State, height int64, round int32)
EnterPrevote func(cs *State, height int64, round int32)
EnterPrecommit func(cs *State, height int64, round int32)
ReceivePrevote func(cs *State, prevote *types.Vote)
ReceivePrecommit func(cs *State, precommit *types.Vote)
ReceiveProposal func(cs *State, proposal *types.Proposal) error
}
// BEHAVIORS
func DefaultMisbehavior() Misbehavior {
return Misbehavior{
Name: "default",
EnterPropose: defaultEnterPropose,
EnterPrevote: defaultEnterPrevote,
EnterPrecommit: defaultEnterPrecommit,
ReceivePrevote: defaultReceivePrevote,
ReceivePrecommit: defaultReceivePrecommit,
ReceiveProposal: defaultReceiveProposal,
}
}
// DoublePrevoteMisbehavior will make a node prevote both nil and a block in the same
// height and round.
func DoublePrevoteMisbehavior() Misbehavior {
b := DefaultMisbehavior()
b.Name = "double-prevote"
b.EnterPrevote = func(cs *State, height int64, round int32) {
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
cs.Logger.Info("enterPrevote: Already locked on a block, prevoting locked block")
cs.signAddVote(tmproto.PrevoteType, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
return
}
// If ProposalBlock is nil, prevote nil.
if cs.ProposalBlock == nil {
cs.Logger.Info("enterPrevote: ProposalBlock is nil")
cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
return
}
// Validate proposal block
err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
if err != nil {
// ProposalBlock is invalid, prevote nil.
cs.Logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
return
}
if cs.sw == nil {
cs.Logger.Error("nil switch")
return
}
prevote, err := cs.signVote(tmproto.PrevoteType, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
if err != nil {
cs.Logger.Error("enterPrevote: Unable to sign block", "err", err)
}
nilPrevote, err := cs.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
if err != nil {
cs.Logger.Error("enterPrevote: Unable to sign block", "err", err)
}
// add our own vote
cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""})
cs.Logger.Info("Sending conflicting votes")
peers := cs.sw.Peers().List()
// there have to be at least two other peers connected, else this misbehavior works normally
for idx, peer := range peers {
if idx%2 == 0 { // send the prevote for the proposal block
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
} else { // send the prevote for nil
peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote}))
}
}
}
return b
}
// DEFAULTS
func defaultEnterPropose(cs *State, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)
// If we don't get the proposal and all block parts quick enough, enterPrevote
cs.scheduleTimeout(cs.config.Propose(round), height, round, cstypes.RoundStepPropose)
// Nothing more to do if we're not a validator
if cs.privValidator == nil {
logger.Debug("This node is not a validator")
return
}
logger.Debug("This node is a validator")
pubKey, err := cs.privValidator.GetPubKey()
if err != nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
logger.Error("Error on retrieval of pubkey", "err", err)
return
}
address := pubKey.Address()
// if not a validator, we're done
if !cs.Validators.HasAddress(address) {
logger.Debug("This node is not a validator", "addr", address, "vals", cs.Validators)
return
}
if cs.isProposer(address) {
logger.Info("enterPropose: Our turn to propose",
"proposer",
address,
"privValidator",
cs.privValidator)
cs.decideProposal(height, round)
} else {
logger.Info("enterPropose: Not our turn to propose",
"proposer",
cs.Validators.GetProposer().Address,
"privValidator",
cs.privValidator)
}
}
func defaultEnterPrevote(cs *State, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
logger.Info("enterPrevote: Already locked on a block, prevoting locked block")
cs.signAddVote(tmproto.PrevoteType, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
return
}
// If ProposalBlock is nil, prevote nil.
if cs.ProposalBlock == nil {
logger.Info("enterPrevote: ProposalBlock is nil")
cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
return
}
// Validate proposal block
err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
if err != nil {
// ProposalBlock is invalid, prevote nil.
logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
return
}
// Prevote cs.ProposalBlock
// NOTE: the proposal signature is validated when it is received,
// and the proposal block parts are validated as they are received (against the merkle hash in the proposal)
logger.Info("enterPrevote: ProposalBlock is valid")
cs.signAddVote(tmproto.PrevoteType, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
}
func defaultEnterPrecommit(cs *State, height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)
// check for a polka
blockID, ok := cs.Votes.Prevotes(round).TwoThirdsMajority()
// If we don't have a polka, we must precommit nil.
if !ok {
if cs.LockedBlock != nil {
logger.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit while we're locked. Precommitting nil")
} else {
logger.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit. Precommitting nil.")
}
cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
return
}
// At this point +2/3 prevoted for a particular block or nil.
_ = cs.eventBus.PublishEventPolka(cs.RoundStateEvent())
// the latest POLRound should be this round.
polRound, _ := cs.Votes.POLInfo()
if polRound < round {
panic(fmt.Sprintf("This POLRound should be %v but got %v", round, polRound))
}
// +2/3 prevoted nil. Unlock and precommit nil.
if len(blockID.Hash) == 0 {
if cs.LockedBlock == nil {
logger.Info("enterPrecommit: +2/3 prevoted for nil.")
} else {
logger.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking")
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
}
cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
return
}
// At this point, +2/3 prevoted for a particular block.
// If we're already locked on that block, precommit it, and update the LockedRound
if cs.LockedBlock.HashesTo(blockID.Hash) {
logger.Info("enterPrecommit: +2/3 prevoted locked block. Relocking")
cs.LockedRound = round
_ = cs.eventBus.PublishEventRelock(cs.RoundStateEvent())
cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader)
return
}
// If +2/3 prevoted for proposal block, stage and precommit it
if cs.ProposalBlock.HashesTo(blockID.Hash) {
logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash)
// Validate the block.
if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil {
panic(fmt.Sprintf("enterPrecommit: +2/3 prevoted for an invalid block: %v", err))
}
cs.LockedRound = round
cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts
_ = cs.eventBus.PublishEventLock(cs.RoundStateEvent())
cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader)
return
}
// There was a polka in this round for a block we don't have.
// Fetch that block, unlock, and precommit nil.
// The +2/3 prevotes for this round is the POL for our unlock.
logger.Info("enterPrecommit: +2/3 prevotes for a block we don't have. Voting nil", "blockID", blockID)
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.ProposalBlock = nil
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
}
_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
}
func defaultReceivePrevote(cs *State, vote *types.Vote) {
height := cs.Height
prevotes := cs.Votes.Prevotes(vote.Round)
// If +2/3 prevotes for a block or nil for *any* round:
if blockID, ok := prevotes.TwoThirdsMajority(); ok {
// There was a polka!
// If we're locked but this is a recent polka, unlock.
// If it matches our ProposalBlock, update the ValidBlock
// Unlock if `cs.LockedRound < vote.Round <= cs.Round`
// NOTE: If vote.Round > cs.Round, we'll deal with it when we get to vote.Round
if (cs.LockedBlock != nil) &&
(cs.LockedRound < vote.Round) &&
(vote.Round <= cs.Round) &&
!cs.LockedBlock.HashesTo(blockID.Hash) {
cs.Logger.Info("Unlocking because of POL.", "lockedRound", cs.LockedRound, "POLRound", vote.Round)
cs.LockedRound = -1
cs.LockedBlock = nil
cs.LockedBlockParts = nil
_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
}
// Update Valid* if we can.
// NOTE: our proposal block may be nil, or may not be the block that received the polka.
if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round == cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info(
"Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
cs.ValidRound = vote.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
} else {
cs.Logger.Info(
"Valid block we don't know about. Set ProposalBlock=nil",
"proposal", cs.ProposalBlock.Hash(), "blockID", blockID.Hash)
// We're getting the wrong block.
cs.ProposalBlock = nil
}
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
}
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
_ = cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
}
}
// If +2/3 prevotes for *anything* for future round:
switch {
case cs.Round < vote.Round && prevotes.HasTwoThirdsAny():
// Round-skip if there is any 2/3+ of votes ahead of us
cs.enterNewRound(height, vote.Round)
case cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step: // current round
blockID, ok := prevotes.TwoThirdsMajority()
if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
cs.enterPrecommit(height, vote.Round)
} else if prevotes.HasTwoThirdsAny() {
cs.enterPrevoteWait(height, vote.Round)
}
case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round:
// If the proposal is now complete, enter prevote of cs.Round.
if cs.isProposalComplete() {
cs.enterPrevote(height, cs.Round)
}
}
}
func defaultReceivePrecommit(cs *State, vote *types.Vote) {
height := cs.Height
precommits := cs.Votes.Precommits(vote.Round)
cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
blockID, ok := precommits.TwoThirdsMajority()
if ok {
// Executed as TwoThirdsMajority could be from a higher round
cs.enterNewRound(height, vote.Round)
cs.enterPrecommit(height, vote.Round)
if len(blockID.Hash) != 0 {
cs.enterCommit(height, vote.Round)
if cs.config.SkipTimeoutCommit && precommits.HasAll() {
cs.enterNewRound(cs.Height, 0)
}
} else {
cs.enterPrecommitWait(height, vote.Round)
}
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
cs.enterNewRound(height, vote.Round)
cs.enterPrecommitWait(height, vote.Round)
}
}
func defaultReceiveProposal(cs *State, proposal *types.Proposal) error {
// Already have one
// TODO: possibly catch double proposals
if cs.Proposal != nil {
return nil
}
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return nil
}
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return ErrInvalidProposalPOLRound
}
p := proposal.ToProto()
// Verify signature
if !cs.Validators.GetProposer().PubKey.VerifySignature(
types.ProposalSignBytes(cs.state.ChainID, p), proposal.Signature) {
return ErrInvalidProposalSignature
}
proposal.Signature = p.Signature
cs.Proposal = proposal
// We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
if cs.ProposalBlockParts == nil {
cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
}
cs.Logger.Info("Received proposal", "proposal", proposal)
return nil
}

+377 -0 test/maverick/consensus/msgs.go

@@ -0,0 +1,377 @@
package consensus
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
}
var pb tmcons.Message
switch msg := msg.(type) {
case *NewRoundStepMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_NewRoundStep{
NewRoundStep: &tmcons.NewRoundStep{
Height: msg.Height,
Round: msg.Round,
Step: uint32(msg.Step),
SecondsSinceStartTime: msg.SecondsSinceStartTime,
LastCommitRound: msg.LastCommitRound,
},
},
}
case *NewValidBlockMessage:
pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
pbBits := msg.BlockParts.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_NewValidBlock{
NewValidBlock: &tmcons.NewValidBlock{
Height: msg.Height,
Round: msg.Round,
BlockPartSetHeader: pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.IsCommit,
},
},
}
case *ProposalMessage:
pbP := msg.Proposal.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Proposal{
Proposal: &tmcons.Proposal{
Proposal: *pbP,
},
},
}
case *ProposalPOLMessage:
pbBits := msg.ProposalPOL.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_ProposalPol{
ProposalPol: &tmcons.ProposalPOL{
Height: msg.Height,
ProposalPolRound: msg.ProposalPOLRound,
ProposalPol: *pbBits,
},
},
}
case *BlockPartMessage:
parts, err := msg.Part.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
}
pb = tmcons.Message{
Sum: &tmcons.Message_BlockPart{
BlockPart: &tmcons.BlockPart{
Height: msg.Height,
Round: msg.Round,
Part: *parts,
},
},
}
case *VoteMessage:
vote := msg.Vote.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Vote{
Vote: &tmcons.Vote{
Vote: vote,
},
},
}
case *HasVoteMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_HasVote{
HasVote: &tmcons.HasVote{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
Index: msg.Index,
},
},
}
case *VoteSetMaj23Message:
bi := msg.BlockID.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_VoteSetMaj23{
VoteSetMaj23: &tmcons.VoteSetMaj23{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
},
},
}
case *VoteSetBitsMessage:
bi := msg.BlockID.ToProto()
bits := msg.Votes.ToProto()
vsb := &tmcons.Message_VoteSetBits{
VoteSetBits: &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
},
}
if bits != nil {
vsb.VoteSetBits.Votes = *bits
}
pb = tmcons.Message{
Sum: vsb,
}
default:
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
}
return &pb, nil
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
return nil, errors.New("consensus: nil message")
}
var pb Message
switch msg := msg.Sum.(type) {
case *tmcons.Message_NewRoundStep:
rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step))
// deny message based on possible overflow
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
}
pb = &NewRoundStepMessage{
Height: msg.NewRoundStep.Height,
Round: msg.NewRoundStep.Round,
Step: cstypes.RoundStepType(rs),
SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime,
LastCommitRound: msg.NewRoundStep.LastCommitRound,
}
case *tmcons.Message_NewValidBlock:
pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
if err != nil {
return nil, fmt.Errorf("parts to proto error: %w", err)
}
pbBits := new(bits.BitArray)
pbBits.FromProto(msg.NewValidBlock.BlockParts)
pb = &NewValidBlockMessage{
Height: msg.NewValidBlock.Height,
Round: msg.NewValidBlock.Round,
BlockPartSetHeader: *pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.NewValidBlock.IsCommit,
}
case *tmcons.Message_Proposal:
pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
if err != nil {
return nil, fmt.Errorf("proposal msg to proto error: %w", err)
}
pb = &ProposalMessage{
Proposal: pbP,
}
case *tmcons.Message_ProposalPol:
pbBits := new(bits.BitArray)
pbBits.FromProto(&msg.ProposalPol.ProposalPol)
pb = &ProposalPOLMessage{
Height: msg.ProposalPol.Height,
ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
ProposalPOL: pbBits,
}
case *tmcons.Message_BlockPart:
parts, err := types.PartFromProto(&msg.BlockPart.Part)
if err != nil {
return nil, fmt.Errorf("blockpart msg to proto error: %w", err)
}
pb = &BlockPartMessage{
Height: msg.BlockPart.Height,
Round: msg.BlockPart.Round,
Part: parts,
}
case *tmcons.Message_Vote:
vote, err := types.VoteFromProto(msg.Vote.Vote)
if err != nil {
return nil, fmt.Errorf("vote msg to proto error: %w", err)
}
pb = &VoteMessage{
Vote: vote,
}
case *tmcons.Message_HasVote:
pb = &HasVoteMessage{
Height: msg.HasVote.Height,
Round: msg.HasVote.Round,
Type: msg.HasVote.Type,
Index: msg.HasVote.Index,
}
case *tmcons.Message_VoteSetMaj23:
bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID)
if err != nil {
return nil, fmt.Errorf("voteSetMaj23 msg to proto error: %w", err)
}
pb = &VoteSetMaj23Message{
Height: msg.VoteSetMaj23.Height,
Round: msg.VoteSetMaj23.Round,
Type: msg.VoteSetMaj23.Type,
BlockID: *bi,
}
case *tmcons.Message_VoteSetBits:
bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID)
if err != nil {
return nil, fmt.Errorf("voteSetBits msg to proto error: %w", err)
}
bits := new(bits.BitArray)
bits.FromProto(&msg.VoteSetBits.Votes)
pb = &VoteSetBitsMessage{
Height: msg.VoteSetBits.Height,
Round: msg.VoteSetBits.Round,
Type: msg.VoteSetBits.Type,
BlockID: *bi,
Votes: bits,
}
default:
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
}
if err := pb.ValidateBasic(); err != nil {
return nil, err
}
return pb, nil
}
// MustEncode takes the reactor's msg, converts it to proto and marshals it.
// This mimics `MustMarshalBinaryBare` in that it panics on error.
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and returns a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage
switch msg := msg.(type) {
case types.EventDataRoundState:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EventDataRoundState{
EventDataRoundState: &tmproto.EventDataRoundState{
Height: msg.Height,
Round: msg.Round,
Step: msg.Step,
},
},
}
case msgInfo:
consMsg, err := MsgToProto(msg.Msg)
if err != nil {
return nil, err
}
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_MsgInfo{
MsgInfo: &tmcons.MsgInfo{
Msg: *consMsg,
PeerID: string(msg.PeerID),
},
},
}
case timeoutInfo:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_TimeoutInfo{
TimeoutInfo: &tmcons.TimeoutInfo{
Duration: msg.Duration,
Height: msg.Height,
Round: msg.Round,
Step: uint32(msg.Step),
},
},
}
case EndHeightMessage:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EndHeight{
EndHeight: &tmcons.EndHeight{
Height: msg.Height,
},
},
}
default:
return nil, fmt.Errorf("to proto: wal message not recognized: %T", msg)
}
return &pb, nil
}
// WALFromProto takes a proto wal message and returns a consensus walMessage and error
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
if msg == nil {
return nil, errors.New("nil WAL message")
}
var pb WALMessage
switch msg := msg.Sum.(type) {
case *tmcons.WALMessage_EventDataRoundState:
pb = types.EventDataRoundState{
Height: msg.EventDataRoundState.Height,
Round: msg.EventDataRoundState.Round,
Step: msg.EventDataRoundState.Step,
}
case *tmcons.WALMessage_MsgInfo:
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
if err != nil {
return nil, fmt.Errorf("msgInfo from proto error: %w", err)
}
pb = msgInfo{
Msg: walMsg,
PeerID: p2p.ID(msg.MsgInfo.PeerID),
}
case *tmcons.WALMessage_TimeoutInfo:
tis, err := tmmath.SafeConvertUint8(int64(msg.TimeoutInfo.Step))
// deny message based on possible overflow
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
}
pb = timeoutInfo{
Duration: msg.TimeoutInfo.Duration,
Height: msg.TimeoutInfo.Height,
Round: msg.TimeoutInfo.Round,
Step: cstypes.RoundStepType(tis),
}
return pb, nil
case *tmcons.WALMessage_EndHeight:
pb := EndHeightMessage{
Height: msg.EndHeight.Height,
}
return pb, nil
default:
return nil, fmt.Errorf("from proto: wal message not recognized: %T", msg)
}
return pb, nil
}

+1720 -0 test/maverick/consensus/reactor.go (file diff suppressed because it is too large)


+533 -0 test/maverick/consensus/replay.go

@@ -0,0 +1,533 @@
package consensus
import (
"bytes"
"fmt"
"hash/crc32"
"io"
"reflect"
"time"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios:
//
// 1. failure during consensus
// 2. failure while applying the block
//
// The former is handled by the WAL, the latter by the proxyApp Handshake on
// restart, which ultimately hands off the work to the WAL.
//-----------------------------------------
// 1. Recover from failure during consensus
// (by replaying messages from the WAL)
//-----------------------------------------
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepSub != nil {
select {
case stepMsg := <-newStepSub.Out():
m2 := stepMsg.Data().(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("roundState mismatch. Got %v; Expected %v", m2, m)
}
case <-newStepSub.Cancelled():
return fmt.Errorf("failed to read off newStepSub.Out(). newStepSub was cancelled")
case <-ticker:
return fmt.Errorf("failed to read off newStepSub.Out()")
}
}
case msgInfo:
peerID := m.PeerID
if peerID == "" {
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
}
cs.handleMsg(m)
case timeoutInfo:
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *State) catchupReplay(csHeight int64) error {
// Set replayMode to true so we don't log signing errors.
cs.replayMode = true
defer func() { cs.replayMode = false }()
// Ensure that #ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine
// without it, and Handshake could reuse State if it weren't for
// this check (since we can crash after writing #ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
}
if gr != nil {
if err := gr.Close(); err != nil {
return err
}
}
if found {
return fmt.Errorf("wal should not contain #ENDHEIGHT %d", csHeight)
}
// Search for last height marker.
//
// Ignore data corruption errors in previous heights because we only care about last height
if csHeight < cs.state.InitialHeight {
return fmt.Errorf("cannot replay height %v, below initial height %v", csHeight, cs.state.InitialHeight)
}
endHeight := csHeight - 1
if csHeight == cs.state.InitialHeight {
endHeight = 0
}
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight)
} else if err != nil {
return err
}
if !found {
return fmt.Errorf("cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, endHeight)
}
defer gr.Close()
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
dec := WALDecoder{gr}
LOOP:
for {
msg, err = dec.Decode()
switch {
case err == io.EOF:
break LOOP
case IsDataCorruptionError(err):
cs.Logger.Error("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
return err
case err != nil:
return err
}
// NOTE: since the priv key is set when the msgs are received,
// the node will attempt to e.g. double sign, but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msg, nil); err != nil {
return err
}
}
cs.Logger.Info("Replay: Done")
return nil
}
//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #ENDHEIGHT: 12345
/*
func makeHeightSearchFunc(height int64) auto.SearchFunc {
return func(line string) (int, error) {
line = strings.TrimRight(line, "\n")
parts := strings.Split(line, " ")
if len(parts) != 2 {
return -1, errors.New("line did not have 2 parts")
}
i, err := strconv.Atoi(parts[1])
if err != nil {
return -1, errors.New("failed to parse INFO: " + err.Error())
}
if height < i {
return 1, nil
} else if height == i {
return 0, nil
} else {
return -1, nil
}
}
}*/
//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------
type Handshaker struct {
stateStore sm.Store
initialState sm.State
store sm.BlockStore
eventBus types.BlockEventPublisher
genDoc *types.GenesisDoc
logger log.Logger
nBlocks int // number of blocks applied to the state
}
func NewHandshaker(stateStore sm.Store, state sm.State,
store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {
return &Handshaker{
stateStore: stateStore,
initialState: state,
store: store,
eventBus: types.NopEventBus{},
genDoc: genDoc,
logger: log.NewNopLogger(),
nBlocks: 0,
}
}
func (h *Handshaker) SetLogger(l log.Logger) {
h.logger = l
}
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
h.eventBus = eventBus
}
// NBlocks returns the number of blocks applied to the state.
func (h *Handshaker) NBlocks() int {
return h.nBlocks
}
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}
blockHeight := res.LastBlockHeight
if blockHeight < 0 {
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
}
appHash := res.LastBlockAppHash
h.logger.Info("ABCI Handshake App Info",
"height", blockHeight,
"hash", fmt.Sprintf("%X", appHash),
"software-version", res.Version,
"protocol-version", res.AppVersion,
)
// Only set the version if there is no existing state.
if h.initialState.LastBlockHeight == 0 {
h.initialState.Version.Consensus.App = res.AppVersion
}
// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
// TODO: (on restart) replay mempool
return nil
}
// ReplayBlocks replays all blocks since appBlockHeight and ensures the result
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
) ([]byte, error) {
storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height()
stateBlockHeight := state.LastBlockHeight
h.logger.Info(
"ABCI Replay Blocks",
"appHeight",
appBlockHeight,
"storeHeight",
storeBlockHeight,
"stateHeight",
stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
if appBlockHeight == 0 {
validators := make([]*types.Validator, len(h.genDoc.Validators))
for i, val := range h.genDoc.Validators {
validators[i] = types.NewValidator(val.PubKey, val.Power)
}
validatorSet := types.NewValidatorSet(validators)
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams)
req := abci.RequestInitChain{
Time: h.genDoc.GenesisTime,
ChainId: h.genDoc.ChainID,
InitialHeight: h.genDoc.InitialHeight,
ConsensusParams: csParams,
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChainSync(req)
if err != nil {
return nil, err
}
appHash = res.AppHash
if stateBlockHeight == 0 { // we only update state when we are in initial state
// If the app did not return an app hash, we keep the one set from the genesis doc in
// the state. We don't set appHash since we don't want the genesis doc app hash
// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
if len(res.AppHash) > 0 {
state.AppHash = res.AppHash
}
// If the app returned validators or consensus params, update the state.
if len(res.Validators) > 0 {
vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
if err != nil {
return nil, err
}
state.Validators = types.NewValidatorSet(vals)
state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
} else if len(h.genDoc.Validators) == 0 {
// If validator set is not set in genesis and still empty after InitChain, exit.
return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
}
if res.ConsensusParams != nil {
state.ConsensusParams = types.UpdateConsensusParams(state.ConsensusParams, res.ConsensusParams)
state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
state.LastResultsHash = merkle.HashFromByteSlices(nil)
if err := h.stateStore.Save(state); err != nil {
return nil, err
}
}
}
// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
switch {
case storeBlockHeight == 0:
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
// the app has no state, and the block store is truncated above the initial height
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
// the app is too far behind the truncated store (it can be 1 behind, since we replay the next block)
return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}
case storeBlockHeight < appBlockHeight:
// the app should never be ahead of the store (but this is under app's control)
return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}
case storeBlockHeight < stateBlockHeight:
// the state should never be ahead of the store (this is under tendermint's control)
panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
case storeBlockHeight > stateBlockHeight+1:
// store should be at most one ahead of the state (this is under tendermint's control)
panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
}
var err error
// Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight {
// Tendermint ran Commit and saved the state.
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight {
// We're good!
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
} else if storeBlockHeight == stateBlockHeight+1 {
// We saved the block in the store but haven't updated the state,
// so we'll need to replay a block using the WAL.
switch {
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
// so replayBlock with the real app.
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote its #ENDHEIGHT
h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
case appBlockHeight == storeBlockHeight:
// We ran Commit, but didn't save the state, so replayBlock with mock app.
abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight)
if err != nil {
return nil, err
}
mockApp := newMockProxyApp(appHash, abciResponses)
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err
}
}
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d",
appBlockHeight, storeBlockHeight, stateBlockHeight))
}
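// Editor's note (illustrative example, not part of the changeset): suppose
// storeHeight=11, stateHeight=10 and appHeight=10. The store is one block
// ahead of the state and the app matches the state, so the
// appBlockHeight == stateBlockHeight case above applies: block 11 is replayed
// against the real app via replayBlock, which also advances and saves the state.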
func (h *Handshaker) replayBlocks(
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
storeBlockHeight int64,
mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1.
//
// Note that we don't have an old version of the state,
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
// This also means we won't be saving validator sets if they change during this period.
// TODO: Load the historical information to fix this and just use state.ApplyBlock
//
// If mutateState == true, the final block is replayed with h.replayBlock()
var appHash []byte
var err error
finalBlock := storeBlockHeight
if mutateState {
finalBlock--
}
firstBlock := appBlockHeight + 1
if firstBlock == 1 {
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have been.
if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block)
}
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight)
if err != nil {
return nil, err
}
h.nBlocks++
}
if mutateState {
// sync the final block
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
}
assertAppHashEqualsOneFromState(appHash, state)
return appHash, nil
}
// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)
// Use stubs for both mempool and evidence pool since no transactions or
// evidence are needed here (the block already exists).
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{})
blockExec.SetEventBus(h.eventBus)
var err error
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
}
h.nBlocks++
return state, nil
}
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) {
if !bytes.Equal(appHash, block.AppHash) {
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X.
Block: %v
`,
appHash, block.AppHash, block))
}
}
func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) {
if !bytes.Equal(appHash, state.AppHash) {
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got
%X, expected %X.
State: %v
Did you reset Tendermint without resetting your application's data?`,
appHash, state.AppHash, state))
}
}
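For orientation, here is a minimal sketch of how the Handshaker above is typically wired up on startup (an editor's illustration, not part of this changeset; the helper name runHandshake and the choice of log.TestingLogger are assumptions):

func runHandshake(
	stateStore sm.Store,
	state sm.State,
	blockStore sm.BlockStore,
	genDoc *types.GenesisDoc,
	proxyApp proxy.AppConns,
) error {
	// Create the handshaker and replay whatever blocks the app is missing.
	h := NewHandshaker(stateStore, state, blockStore, genDoc)
	h.SetLogger(log.TestingLogger())
	if err := h.Handshake(proxyApp); err != nil {
		return fmt.Errorf("handshake failed: %w", err)
	}
	return nil
}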

+ 338
- 0
test/maverick/consensus/replay_file.go View File

@ -0,0 +1,338 @@
package consensus
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"
dbm "github.com/tendermint/tm-db"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
const (
// event bus subscriber
subscriber = "replay-file"
)
//--------------------------------------------------------
// replay messages interactively or all at once
// replay the wal file
func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) {
consensusState := newConsensusStateForReplay(config, csConfig)
if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil {
tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err))
}
}
// Replay msgs in file or start the console
func (cs *State) ReplayFile(file string, console bool) error {
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
if cs.wal != nil {
return errors.New("cs wal is open, cannot replay")
}
cs.startForReplay()
// ensure all new step events are regenerated as expected
ctx := context.Background()
newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
return fmt.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
}
defer func() {
if err := cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
cs.Logger.Error("Error unsubscribing to event bus", "err", err)
}
}()
// just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
if err != nil {
return err
}
pb := newPlayback(file, fp, cs, cs.state.Copy())
defer pb.fp.Close()
var nextN int // apply N msgs in a row
var msg *TimedWALMessage
for {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
}
msg, err = pb.dec.Decode()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
if nextN > 0 {
nextN--
}
pb.count++
}
}
//------------------------------------------------
// playback manager
type playback struct {
cs *State
fp *os.File
dec *WALDecoder
count int // how many lines/msgs into the file are we
// replays can be reset to beginning
fileName string // so we can close/reopen the file
genesisState sm.State // so the replay session knows where to restart from
}
func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback {
return &playback{
cs: cs,
fp: fp,
fileName: fileName,
genesisState: genState,
dec: NewWALDecoder(fp),
}
}
// go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
if err := pb.cs.Stop(); err != nil {
return err
}
pb.cs.Wait()
newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, map[int64]Misbehavior{})
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()
if err := pb.fp.Close(); err != nil {
return err
}
fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0600)
if err != nil {
return err
}
pb.fp = fp
pb.dec = NewWALDecoder(fp)
count = pb.count - count
fmt.Printf("Reseting from %d to %d\n", pb.count, count)
pb.count = 0
pb.cs = newCS
var msg *TimedWALMessage
for i := 0; i < count; i++ {
msg, err = pb.dec.Decode()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
return err
}
pb.count++
}
return nil
}
func (cs *State) startForReplay() {
cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests")
/* TODO:!
// since we replay tocks we just ignore ticks
go func() {
for {
select {
case <-cs.tickChan:
case <-cs.Quit:
return
}
}
}()*/
}
// console function for parsing input and running commands
func (pb *playback) replayConsoleLoop() int {
for {
fmt.Printf("> ")
bufReader := bufio.NewReader(os.Stdin)
line, more, err := bufReader.ReadLine()
if more {
tmos.Exit("input is too long")
} else if err != nil {
tmos.Exit(err.Error())
}
tokens := strings.Split(string(line), " ")
if len(tokens) == 0 {
continue
}
switch tokens[0] {
case "next":
// "next" -> replay next message
// "next N" -> replay next N messages
if len(tokens) == 1 {
return 0
}
i, err := strconv.Atoi(tokens[1])
if err != nil {
fmt.Println("next takes an integer argument")
} else {
return i
}
case "back":
// "back" -> go back one message
// "back N" -> go back N messages
// NOTE: "back" is not supported in the state machine design,
// so we restart and replay up to
ctx := context.Background()
// ensure all new step events are regenerated as expected
newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if err != nil {
tmos.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
}
defer func() {
if err := pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil {
pb.cs.Logger.Error("Error unsubscribing from eventBus", "err", err)
}
}()
if len(tokens) == 1 {
if err := pb.replayReset(1, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
} else {
i, err := strconv.Atoi(tokens[1])
if err != nil {
fmt.Println("back takes an integer argument")
} else if i > pb.count {
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
} else if err := pb.replayReset(i, newStepSub); err != nil {
pb.cs.Logger.Error("Replay reset error", "err", err)
}
}
case "rs":
// "rs" -> print entire round state
// "rs short" -> print height/round/step
// "rs <field>" -> print another field of the round state
rs := pb.cs.RoundState
if len(tokens) == 1 {
fmt.Println(rs)
} else {
switch tokens[1] {
case "short":
fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step)
case "validators":
fmt.Println(rs.Validators)
case "proposal":
fmt.Println(rs.Proposal)
case "proposal_block":
fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort())
case "locked_round":
fmt.Println(rs.LockedRound)
case "locked_block":
fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort())
case "votes":
fmt.Println(rs.Votes.StringIndented(" "))
default:
fmt.Println("Unknown option", tokens[1])
}
}
case "n":
fmt.Println(pb.count)
}
}
}
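// An illustrative console session (editor's sketch of the commands handled
// above):
//
//	> next 5     apply the next five WAL messages
//	> back 2     rewind two messages (resets and replays from the start)
//	> rs short   print the current height/round/step
//	> n          print how many messages have been applied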
//--------------------------------------------------------------------------------
// convenience for replay mode
func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig) *State {
dbType := dbm.BackendType(config.DBBackend)
// Get BlockStore
blockStoreDB, err := dbm.NewDB("blockstore", dbType, config.DBDir())
if err != nil {
tmos.Exit(err.Error())
}
blockStore := store.NewBlockStore(blockStoreDB)
// Get State
stateDB, err := dbm.NewDB("state", dbType, config.DBDir())
if err != nil {
tmos.Exit(err.Error())
}
stateStore := sm.NewStore(stateDB)
gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile())
if err != nil {
tmos.Exit(err.Error())
}
state, err := sm.MakeGenesisState(gdoc)
if err != nil {
tmos.Exit(err.Error())
}
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator)
err = proxyApp.Start()
if err != nil {
tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
}
eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil {
tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err))
}
handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
if err != nil {
tmos.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool, map[int64]Misbehavior{})
consensusState.SetEventBus(eventBus)
return consensusState
}
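A minimal usage sketch for the replay entry point above (an editor's illustration; the cfg.DefaultConfig-based wiring, including the embedded BaseConfig field, is an assumption):

func replayInteractively() {
	// Replay the configured WAL, dropping into the interactive console
	// ("next", "back", "rs", ...).
	c := cfg.DefaultConfig()
	RunReplayFile(c.BaseConfig, c.Consensus, true)
}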

+ 90
- 0
test/maverick/consensus/replay_stubs.go View File

@ -0,0 +1,90 @@
package consensus
import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/clist"
mempl "github.com/tendermint/tendermint/mempool"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
//-----------------------------------------------------------------------------
type emptyMempool struct{}
var _ mempl.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
return nil
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) Update(
_ int64,
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
func (emptyMempool) Flush() {}
func (emptyMempool) FlushAppConn() error { return nil }
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
func (emptyMempool) EnableTxsAvailable() {}
func (emptyMempool) TxsBytes() int64 { return 0 }
func (emptyMempool) TxsFront() *clist.CElement { return nil }
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}
//-----------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results.
//
// Useful because we don't want to call Commit() twice for the same block on
// the real app.
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus {
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, _ := clientCreator.NewABCIClient()
err := cli.Start()
if err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli)
}
type mockProxyApp struct {
abci.BaseApplication
appHash []byte
txCount int
abciResponses *tmstate.ABCIResponses
}
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTxs[mock.txCount]
mock.txCount++
if r == nil {
return abci.ResponseDeliverTx{}
}
return *r
}
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
mock.txCount = 0
return *mock.abciResponses.EndBlock
}
func (mock *mockProxyApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{Data: mock.appHash}
}

+ 1976
- 0
test/maverick/consensus/state.go
File diff suppressed because it is too large
View File


+ 134
- 0
test/maverick/consensus/ticker.go View File

@ -0,0 +1,134 @@
package consensus
import (
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
var (
tickTockBufferSize = 10
)
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface {
Start() error
Stop() error
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
SetLogger(log.Logger)
}
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct {
service.BaseService
timer *time.Timer
tickChan chan timeoutInfo // for scheduling timeouts
tockChan chan timeoutInfo // for notifying about them
}
// NewTimeoutTicker returns a new TimeoutTicker.
func NewTimeoutTicker() TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
tt.BaseService = *service.NewBaseService(nil, "TimeoutTicker", tt)
tt.stopTimer() // don't want to fire until the first scheduled timeout
return tt
}
// OnStart implements service.Service. It starts the timeout routine.
func (t *timeoutTicker) OnStart() error {
go t.timeoutRoutine()
return nil
}
// OnStop implements service.Service. It stops the timeout routine.
func (t *timeoutTicker) OnStop() {
t.BaseService.OnStop()
t.stopTimer()
}
// Chan returns a channel on which timeouts are sent.
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
return t.tockChan
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
//-------------------------------------------------------------
// stop the timer and drain if necessary
func (t *timeoutTicker) stopTimer() {
// Stop() returns false if it was already fired or was stopped
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
t.Logger.Debug("Timer already stopped")
}
}
}
// send on tickChan to start a new timer.
// timers are interrupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) timeoutRoutine() {
t.Logger.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-t.tickChan:
t.Logger.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.Height < ti.Height {
continue
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
continue
}
}
}
// stop the last timer
t.stopTimer()
// update timeoutInfo and reset timer
// NOTE time.Timer allows duration to be non-positive
ti = newti
t.timer.Reset(ti.Duration)
t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
case <-t.timer.C:
t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// the goroutine here guarantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
case <-t.Quit():
return
}
}
}
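A short usage sketch for the ticker (an editor's illustration; only the timeoutInfo fields visible in the logging calls above are assumed):

func exampleTicker() {
	t := NewTimeoutTicker()
	if err := t.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = t.Stop() }()
	// Schedule a timeout; a tick for a later height/round/step would supersede it.
	t.ScheduleTimeout(timeoutInfo{Duration: 50 * time.Millisecond, Height: 1, Round: 0})
	<-t.Chan() // fires once the 50ms have elapsed
}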

+ 437
- 0
test/maverick/consensus/wal.go View File

@ -0,0 +1,437 @@
package consensus
import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"path/filepath"
"time"
"github.com/gogo/protobuf/proto"
auto "github.com/tendermint/tendermint/libs/autofile"
// tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/libs/service"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmtime "github.com/tendermint/tendermint/types/time"
)
const (
// time.Time + max consensus msg size
maxMsgSizeBytes = maxMsgSize + 24
// how often the WAL should be synced during periodic syncing
walDefaultFlushInterval = 2 * time.Second
)
//--------------------------------------------------------
// types and functions for saving consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
Height int64 `json:"height"`
}
type WALMessage interface{}
// func init() {
// tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo")
// tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo")
// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage")
// }
//--------------------------------------------------------
// Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
// service methods
Start() error
Stop() error
Wait()
}
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay.
// TODO: currently the wal is overwritten during replay catchup, give it a mode
// so it's either reading or appending - must read to end to start appending
// again.
type BaseWAL struct {
service.BaseService
group *auto.Group
enc *WALEncoder
flushTicker *time.Ticker
flushInterval time.Duration
}
var _ WAL = &BaseWAL{}
// NewWAL returns a new write-ahead logger based on BaseWAL, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
err := tmos.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err)
}
group, err := auto.OpenGroup(walFile, groupOptions...)
if err != nil {
return nil, err
}
wal := &BaseWAL{
group: group,
enc: NewWALEncoder(group),
flushInterval: walDefaultFlushInterval,
}
wal.BaseService = *service.NewBaseService(nil, "baseWAL", wal)
return wal, nil
}
// SetFlushInterval allows us to override the periodic flush interval for the WAL.
func (wal *BaseWAL) SetFlushInterval(i time.Duration) {
wal.flushInterval = i
}
func (wal *BaseWAL) Group() *auto.Group {
return wal.group
}
func (wal *BaseWAL) SetLogger(l log.Logger) {
wal.BaseService.Logger = l
wal.group.SetLogger(l)
}
func (wal *BaseWAL) OnStart() error {
size, err := wal.group.Head.Size()
if err != nil {
return err
} else if size == 0 {
if err := wal.WriteSync(EndHeightMessage{0}); err != nil {
return err
}
}
err = wal.group.Start()
if err != nil {
return err
}
wal.flushTicker = time.NewTicker(wal.flushInterval)
go wal.processFlushTicks()
return nil
}
func (wal *BaseWAL) processFlushTicks() {
for {
select {
case <-wal.flushTicker.C:
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("Periodic WAL flush failed", "err", err)
}
case <-wal.Quit():
return
}
}
}
// FlushAndSync flushes and fsync's the underlying group's data to disk.
// See auto#FlushAndSync
func (wal *BaseWAL) FlushAndSync() error {
return wal.group.FlushAndSync()
}
// Stop the underlying autofile group.
// Use Wait() to ensure it's finished shutting down
// before cleaning up files.
func (wal *BaseWAL) OnStop() {
wal.flushTicker.Stop()
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("error on flush data to disk", "error", err)
}
if err := wal.group.Stop(); err != nil {
wal.Logger.Error("error trying to stop wal", "error", err)
}
wal.group.Close()
}
// Wait for the underlying autofile group to finish shutting down
// so it's safe to clean up files.
func (wal *BaseWAL) Wait() {
wal.group.Wait()
}
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *BaseWAL) Write(msg WALMessage) error {
if wal == nil {
return nil
}
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
}
return nil
}
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *BaseWAL) WriteSync(msg WALMessage) error {
if wal == nil {
return nil
}
if err := wal.Write(msg); err != nil {
return err
}
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error(`WriteSync failed to flush consensus wal.
WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted`,
"err", err)
return err
}
return nil
}
// WALSearchOptions are optional arguments to SearchForEndHeight.
type WALSearchOptions struct {
// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors.
IgnoreDataCorruptionErrors bool
}
// SearchForEndHeight searches for the EndHeightMessage with the given height
// and returns an auto.GroupReader, whether or not it was found, and an error.
// Group reader will be nil if found equals false.
//
// CONTRACT: caller must close group reader.
func (wal *BaseWAL) SearchForEndHeight(
height int64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
var (
msg *TimedWALMessage
gr *auto.GroupReader
)
lastHeightFound := int64(-1)
// NOTE: starting from the last file in the group because we're usually
// searching for the last height. See replay.go
min, max := wal.group.MinIndex(), wal.group.MaxIndex()
wal.Logger.Info("Searching for height", "height", height, "min", min, "max", max)
for index := max; index >= min; index-- {
gr, err = wal.group.NewReader(index)
if err != nil {
return nil, false, err
}
dec := NewWALDecoder(gr)
for {
msg, err = dec.Decode()
if err == io.EOF {
// OPTIMISATION: no need to look for height in older files if we've seen h < height
if lastHeightFound > 0 && lastHeightFound < height {
gr.Close()
return nil, false, nil
}
// check next file
break
}
if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) {
wal.Logger.Error("Corrupted entry. Skipping...", "err", err)
// do nothing
continue
} else if err != nil {
gr.Close()
return nil, false, err
}
if m, ok := msg.Msg.(EndHeightMessage); ok {
lastHeightFound = m.Height
if m.Height == height { // found
wal.Logger.Info("Found", "height", height, "index", index)
return gr, true, nil
}
}
}
gr.Close()
}
return nil, false, nil
}
// /////////////////////////////////////////////////////////////////////////////
// A WALEncoder writes custom-encoded WAL messages to an output stream.
//
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value
type WALEncoder struct {
wr io.Writer
}
// NewWALEncoder returns a new encoder that writes to wr.
func NewWALEncoder(wr io.Writer) *WALEncoder {
return &WALEncoder{wr}
}
// Encode writes the custom encoding of v to the stream. It returns an error if
// the encoded size of v is greater than 1MB. Any error encountered
// during the write is also returned.
func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
pbMsg, err := WALToProto(v.Msg)
if err != nil {
return err
}
pv := tmcons.TimedWALMessage{
Time: v.Time,
Msg: pbMsg,
}
data, err := proto.Marshal(&pv)
if err != nil {
panic(fmt.Errorf("encode timed wall message failure: %w", err))
}
crc := crc32.Checksum(data, crc32c)
length := uint32(len(data))
if length > maxMsgSizeBytes {
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
}
totalLength := 8 + int(length)
msg := make([]byte, totalLength)
binary.BigEndian.PutUint32(msg[0:4], crc)
binary.BigEndian.PutUint32(msg[4:8], length)
copy(msg[8:], data)
_, err = enc.wr.Write(msg)
return err
}
// /////////////////////////////////////////////////////////////////////////////
// IsDataCorruptionError returns true if data has been corrupted inside WAL.
func IsDataCorruptionError(err error) bool {
_, ok := err.(DataCorruptionError)
return ok
}
// DataCorruptionError is an error that occurs if data on disk was corrupted.
type DataCorruptionError struct {
cause error
}
func (e DataCorruptionError) Error() string {
return fmt.Sprintf("DataCorruptionError[%v]", e.cause)
}
func (e DataCorruptionError) Cause() error {
return e.cause
}
// A WALDecoder reads and decodes custom-encoded WAL messages from an input
// stream. See WALEncoder for the format used.
//
// It will also compare the checksums and make sure the data size is equal to the
// length from the header. If that is not the case, an error will be returned.
type WALDecoder struct {
rd io.Reader
}
// NewWALDecoder returns a new decoder that reads from rd.
func NewWALDecoder(rd io.Reader) *WALDecoder {
return &WALDecoder{rd}
}
// Decode reads the next custom-encoded value from its reader and returns it.
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
b := make([]byte, 4)
_, err := dec.rd.Read(b)
if errors.Is(err, io.EOF) {
return nil, err
}
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to read checksum: %v", err)}
}
crc := binary.BigEndian.Uint32(b)
b = make([]byte, 4)
_, err = dec.rd.Read(b)
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to read length: %v", err)}
}
length := binary.BigEndian.Uint32(b)
if length > maxMsgSizeBytes {
return nil, DataCorruptionError{fmt.Errorf(
"length %d exceeded maximum possible value of %d bytes",
length,
maxMsgSizeBytes)}
}
data := make([]byte, length)
n, err := dec.rd.Read(data)
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to read data: %v (read: %d, wanted: %d)", err, n, length)}
}
// check checksum before decoding data
actualCRC := crc32.Checksum(data, crc32c)
if actualCRC != crc {
return nil, DataCorruptionError{fmt.Errorf("checksums do not match: read: %v, actual: %v", crc, actualCRC)}
}
var res = new(tmcons.TimedWALMessage)
err = proto.Unmarshal(data, res)
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)}
}
walMsg, err := WALFromProto(res.Msg)
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)}
}
tMsgWal := &TimedWALMessage{
Time: res.Time,
Msg: walMsg,
}
return tMsgWal, err
}
type nilWAL struct{}
var _ WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (nilWAL) Start() error { return nil }
func (nilWAL) Stop() error { return nil }
func (nilWAL) Wait() {}
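To make the framing concrete, here is a round-trip sketch (an editor's illustration, not part of the changeset; the bytes import is an assumption on top of this file's imports):

func exampleWALRoundTrip() error {
	var buf bytes.Buffer
	enc := NewWALEncoder(&buf)
	// Each record is framed as: 4-byte crc32c checksum, 4-byte big-endian
	// length, then the proto-encoded TimedWALMessage payload.
	if err := enc.Encode(&TimedWALMessage{Time: tmtime.Now(), Msg: EndHeightMessage{Height: 1}}); err != nil {
		return err
	}
	msg, err := NewWALDecoder(&buf).Decode()
	if err != nil {
		return err
	}
	if _, ok := msg.Msg.(EndHeightMessage); !ok {
		return fmt.Errorf("unexpected message type %T", msg.Msg)
	}
	return nil
}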

+ 31
- 0
test/maverick/consensus/wal_fuzz.go View File

@ -0,0 +1,31 @@
// +build gofuzz
package consensus
import (
"bytes"
"io"
)
func Fuzz(data []byte) int {
dec := NewWALDecoder(bytes.NewReader(data))
for {
msg, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
if msg != nil {
panic("msg != nil on error")
}
return 0
}
var w bytes.Buffer
enc := NewWALEncoder(&w)
err = enc.Encode(msg)
if err != nil {
panic(err)
}
}
return 1
}

+ 229
- 0
test/maverick/consensus/wal_generator.go View File

@ -0,0 +1,229 @@
package consensus
import (
"bufio"
"bytes"
"fmt"
"io"
"path/filepath"
"testing"
"time"
db "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/abci/example/kvstore"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
// stripped down version of a node (proxy app, event bus, consensus state) with a
// persistent kvstore application and a special consensus WAL instance
// (byteBufferWAL) and waits until numBlocks are created.
// If the node fails to produce the given numBlocks, it returns an error.
func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
config := getConfig(t)
app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))
logger := log.TestingLogger().With("wal_generator", "wal_generator")
logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)
// ///////////////////////////////////////////////////////////////////////////
// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
// NOTE: we can't import node package because of circular dependency.
// NOTE: we don't do handshake so need to set state.Version.Consensus.App directly.
privValidatorKeyFile := config.PrivValidatorKeyFile()
privValidatorStateFile := config.PrivValidatorStateFile()
privValidator := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return fmt.Errorf("failed to read genesis file: %w", err)
}
blockStoreDB := db.NewMemDB()
stateDB := blockStoreDB
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return fmt.Errorf("failed to make genesis state: %w", err)
}
state.Version.Consensus.App = kvstore.ProtocolVersion
if err = stateStore.Save(state); err != nil {
t.Error(err)
}
blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return fmt.Errorf("failed to start proxy app connections: %w", err)
}
t.Cleanup(func() {
if err := proxyApp.Stop(); err != nil {
t.Error(err)
}
})
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return fmt.Errorf("failed to start event bus: %w", err)
}
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewState(config.Consensus, state.Copy(),
blockExec, blockStore, mempool, evpool, map[int64]Misbehavior{})
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
// END OF COPY PASTE
// ///////////////////////////////////////////////////////////////////////////
// set consensus wal to buffered WAL, which will write all incoming msgs to buffer
numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103
if err := wal.Write(EndHeightMessage{0}); err != nil {
t.Error(err)
}
consensusState.wal = wal
if err := consensusState.Start(); err != nil {
return fmt.Errorf("failed to start consensus state: %w", err)
}
select {
case <-numBlocksWritten:
if err := consensusState.Stop(); err != nil {
t.Error(err)
}
return nil
case <-time.After(1 * time.Minute):
if err := consensusState.Stop(); err != nil {
t.Error(err)
}
return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
}
}
// WALWithNBlocks returns a WAL content with numBlocks.
func WALWithNBlocks(t *testing.T, numBlocks int) (data []byte, err error) {
var b bytes.Buffer
wr := bufio.NewWriter(&b)
if err := WALGenerateNBlocks(t, wr, numBlocks); err != nil {
return []byte{}, err
}
wr.Flush()
return b.Bytes(), nil
}
func randPort() int {
// returns between base and base + spread
base, spread := 20000, 20000
return base + tmrand.Intn(spread)
}
func makeAddrs() (string, string, string) {
start := randPort()
return fmt.Sprintf("tcp://127.0.0.1:%d", start),
fmt.Sprintf("tcp://127.0.0.1:%d", start+1),
fmt.Sprintf("tcp://127.0.0.1:%d", start+2)
}
// getConfig returns a config for test cases
func getConfig(t *testing.T) *cfg.Config {
c := cfg.ResetTestRoot(t.Name())
// and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs()
c.P2P.ListenAddress = tm
c.RPC.ListenAddress = rpc
c.RPC.GRPCListenAddress = grpc
return c
}
// byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
// when the heightToStop is reached. Client will be notified via
// signalWhenStopsTo channel.
type byteBufferWAL struct {
enc *WALEncoder
stopped bool
heightToStop int64
signalWhenStopsTo chan<- struct{}
logger log.Logger
}
// needed for determinism
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL {
return &byteBufferWAL{
enc: enc,
heightToStop: nBlocks,
signalWhenStopsTo: signalStop,
logger: logger,
}
}
// Write writes the message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return nil
}
if endMsg, ok := m.(EndHeightMessage); ok {
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
if endMsg.Height == w.heightToStop {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
w.signalWhenStopsTo <- struct{}{}
w.stopped = true
return nil
}
}
w.logger.Debug("WAL Write Message", "msg", m)
err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}
return nil
}
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}
func (w *byteBufferWAL) FlushAndSync() error { return nil }
func (w *byteBufferWAL) SearchForEndHeight(
height int64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (w *byteBufferWAL) Start() error { return nil }
func (w *byteBufferWAL) Stop() error { return nil }
func (w *byteBufferWAL) Wait() {}
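A sketch of how a test can consume the generator above (an editor's illustration; the test name is an assumption):

func TestWALWithNBlocksExample(t *testing.T) {
	walBody, err := WALWithNBlocks(t, 3) // WAL covering three blocks
	if err != nil {
		t.Fatal(err)
	}
	// The returned bytes decode cleanly with the package's WALDecoder.
	dec := NewWALDecoder(bytes.NewReader(walBody))
	for {
		if _, err := dec.Decode(); err == io.EOF {
			break
		} else if err != nil {
			t.Fatal(err)
		}
	}
}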

+ 237
- 0
test/maverick/main.go View File

@ -0,0 +1,237 @@
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/spf13/cobra"
"github.com/spf13/viper"
cmd "github.com/tendermint/tendermint/cmd/tendermint/commands"
"github.com/tendermint/tendermint/cmd/tendermint/commands/debug"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cli"
tmflags "github.com/tendermint/tendermint/libs/cli/flags"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
cs "github.com/tendermint/tendermint/test/maverick/consensus"
nd "github.com/tendermint/tendermint/test/maverick/node"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var (
config = cfg.DefaultConfig()
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
misbehaviorFlag = ""
)
func init() {
registerFlagsRootCmd(RootCmd)
}
func registerFlagsRootCmd(command *cobra.Command) {
command.PersistentFlags().String("log_level", config.LogLevel, "Log level")
}
func ParseConfig() (*cfg.Config, error) {
conf := cfg.DefaultConfig()
err := viper.Unmarshal(conf)
if err != nil {
return nil, err
}
conf.SetRoot(conf.RootDir)
cfg.EnsureRoot(conf.RootDir)
if err = conf.ValidateBasic(); err != nil {
return nil, fmt.Errorf("error in config file: %v", err)
}
return conf, err
}
// RootCmd is the root command for Tendermint core.
var RootCmd = &cobra.Command{
Use: "maverick",
Short: "Tendermint Maverick Node",
Long: "Tendermint Maverick Node for testing with faulty consensus misbehaviors in a testnet. Contains " +
"all the functionality of a normal node but custom misbehaviors can be injected when running the node " +
"through a flag. See maverick node --help for how the misbehavior flag is constructured",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
fmt.Printf("use: %v, args: %v", cmd.Use, cmd.Args)
config, err = ParseConfig()
if err != nil {
return err
}
if config.LogFormat == cfg.LogFormatJSON {
logger = log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout))
}
logger, err = tmflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel())
if err != nil {
return err
}
if viper.GetBool(cli.TraceFlag) {
logger = log.NewTracingLogger(logger)
}
logger = logger.With("module", "main")
return nil
},
}
func main() {
rootCmd := RootCmd
rootCmd.AddCommand(
ListMisbehaviorCmd,
cmd.GenValidatorCmd,
InitFilesCmd,
cmd.ProbeUpnpCmd,
cmd.ReplayCmd,
cmd.ReplayConsoleCmd,
cmd.ResetAllCmd,
cmd.ResetPrivValidatorCmd,
cmd.ShowValidatorCmd,
cmd.ShowNodeIDCmd,
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)
nodeCmd := &cobra.Command{
Use: "node",
Short: "Run the maverick node",
RunE: func(command *cobra.Command, args []string) error {
return startNode(config, logger, misbehaviorFlag)
},
}
cmd.AddNodeFlags(nodeCmd)
// Create & start node
rootCmd.AddCommand(nodeCmd)
// add special flag for misbehaviors
nodeCmd.Flags().StringVar(
&misbehaviorFlag,
"misbehaviors",
"",
"Select the misbehaviors of the node (comma-separated, no spaces in between): \n"+
"e.g. --misbehaviors double-prevote,3\n"+
"You can also have multiple misbehaviors: e.g. double-prevote,3,no-vote,5")
cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", cfg.DefaultTendermintDir)))
if err := cmd.Execute(); err != nil {
panic(err)
}
}
func startNode(config *cfg.Config, logger log.Logger, misbehaviorFlag string) error {
misbehaviors, err := nd.ParseMisbehaviors(misbehaviorFlag)
if err != nil {
return err
}
node, err := nd.DefaultNewNode(config, logger, misbehaviors)
if err != nil {
return fmt.Errorf("failed to create node: %w", err)
}
if err := node.Start(); err != nil {
return fmt.Errorf("failed to start node: %w", err)
}
logger.Info("Started node", "nodeInfo", node.Switch().NodeInfo())
// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {
if node.IsRunning() {
if err := node.Stop(); err != nil {
logger.Error("unable to stop the node", "error", err)
}
}
})
// Run forever.
select {}
}
var InitFilesCmd = &cobra.Command{
Use: "init",
Short: "Initialize Tendermint",
RunE: initFiles,
}
func initFiles(cmd *cobra.Command, args []string) error {
return initFilesWithConfig(config)
}
func initFilesWithConfig(config *cfg.Config) error {
// private validator
privValKeyFile := config.PrivValidatorKeyFile()
privValStateFile := config.PrivValidatorStateFile()
var pv *nd.FilePV
if tmos.FileExists(privValKeyFile) {
pv = nd.LoadFilePV(privValKeyFile, privValStateFile)
logger.Info("Found private validator", "keyFile", privValKeyFile,
"stateFile", privValStateFile)
} else {
pv = nd.GenFilePV(privValKeyFile, privValStateFile)
pv.Save()
logger.Info("Generated private validator", "keyFile", privValKeyFile,
"stateFile", privValStateFile)
}
nodeKeyFile := config.NodeKeyFile()
if tmos.FileExists(nodeKeyFile) {
logger.Info("Found node key", "path", nodeKeyFile)
} else {
if _, err := p2p.LoadOrGenNodeKey(nodeKeyFile); err != nil {
return err
}
logger.Info("Generated node key", "path", nodeKeyFile)
}
// genesis file
genFile := config.GenesisFile()
if tmos.FileExists(genFile) {
logger.Info("Found genesis file", "path", genFile)
} else {
genDoc := types.GenesisDoc{
ChainID: fmt.Sprintf("test-chain-%v", tmrand.Str(6)),
GenesisTime: tmtime.Now(),
ConsensusParams: types.DefaultConsensusParams(),
}
pubKey, err := pv.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
genDoc.Validators = []types.GenesisValidator{{
Address: pubKey.Address(),
PubKey: pubKey,
Power: 10,
}}
if err := genDoc.SaveAs(genFile); err != nil {
return err
}
logger.Info("Generated genesis file", "path", genFile)
}
return nil
}
var ListMisbehaviorCmd = &cobra.Command{
Use: "misbehaviors",
Short: "Lists possible misbehaviors",
RunE: listMisbehaviors,
}
func listMisbehaviors(cmd *cobra.Command, args []string) error {
str := "Currently registered misbehaviors: \n"
for key := range cs.MisbehaviorList {
str += fmt.Sprintf("- %s\n", key)
}
fmt.Println(str)
return nil
}
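Putting the commands together, a typical maverick session looks like this (an editor's sketch based on the commands and flag registered above):

maverick init
maverick misbehaviors                          # list registered misbehaviors
maverick node --misbehaviors double-prevote,3  # double prevote at height 3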

+ 1440
- 0
test/maverick/node/node.go
File diff suppressed because it is too large
View File


+ 358
- 0
test/maverick/node/privval.go View File

@ -0,0 +1,358 @@
package node
import (
"errors"
"fmt"
"io/ioutil"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
tmjson "github.com/tendermint/tendermint/libs/json"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/libs/tempfile"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
// *******************************************************************************************************************
//
// WARNING: FOR TESTING ONLY. DO NOT USE THIS FILE OUTSIDE MAVERICK
//
// *******************************************************************************************************************
const (
stepNone int8 = 0 // Used to distinguish the initial state
stepPropose int8 = 1
stepPrevote int8 = 2
stepPrecommit int8 = 3
)
// A vote is either stepPrevote or stepPrecommit.
func voteToStep(vote *tmproto.Vote) int8 {
switch vote.Type {
case tmproto.PrevoteType:
return stepPrevote
case tmproto.PrecommitType:
return stepPrecommit
default:
panic(fmt.Sprintf("Unknown vote type: %v", vote.Type))
}
}
//-------------------------------------------------------------------------------
// FilePVKey stores the immutable part of PrivValidator.
type FilePVKey struct {
Address types.Address `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
PrivKey crypto.PrivKey `json:"priv_key"`
filePath string
}
// Save persists the FilePVKey to its filePath.
func (pvKey FilePVKey) Save() {
outFile := pvKey.filePath
if outFile == "" {
panic("cannot save PrivValidator key: filePath not set")
}
jsonBytes, err := tmjson.MarshalIndent(pvKey, "", " ")
if err != nil {
panic(err)
}
err = tempfile.WriteFileAtomic(outFile, jsonBytes, 0600)
if err != nil {
panic(err)
}
}
//-------------------------------------------------------------------------------
// FilePVLastSignState stores the mutable part of PrivValidator.
type FilePVLastSignState struct {
Height int64 `json:"height"`
Round int32 `json:"round"`
Step int8 `json:"step"`
Signature []byte `json:"signature,omitempty"`
SignBytes tmbytes.HexBytes `json:"signbytes,omitempty"`
filePath string
}
// CheckHRS checks the given height, round, step (HRS) against that of the
// FilePVLastSignState. It returns an error if the arguments constitute a regression,
// or if they match but the SignBytes are empty.
// The returned boolean indicates whether the last Signature should be reused -
// it returns true if the HRS matches the arguments and the SignBytes are not empty (indicating
// we have already signed for this HRS, and can reuse the existing signature).
// It panics if the HRS matches the arguments, there's a SignBytes, but no Signature.
func (lss *FilePVLastSignState) CheckHRS(height int64, round int32, step int8) (bool, error) {
if lss.Height > height {
return false, fmt.Errorf("height regression. Got %v, last height %v", height, lss.Height)
}
if lss.Height == height {
if lss.Round > round {
return false, fmt.Errorf("round regression at height %v. Got %v, last round %v", height, round, lss.Round)
}
if lss.Round == round {
if lss.Step > step {
return false, fmt.Errorf(
"step regression at height %v round %v. Got %v, last step %v",
height,
round,
step,
lss.Step,
)
} else if lss.Step == step {
if lss.SignBytes != nil {
if lss.Signature == nil {
panic("pv: Signature is nil but SignBytes is not!")
}
return true, nil
}
return false, errors.New("no SignBytes found")
}
}
}
return false, nil
}
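// exampleCheckHRS (editor's sketch, not part of the changeset) walks through
// the CheckHRS contract documented above.
func exampleCheckHRS() {
	lss := FilePVLastSignState{
		Height:    10,
		Round:     0,
		Step:      stepPrevote,
		SignBytes: tmbytes.HexBytes{0x01},
		Signature: []byte{0x02},
	}
	// Same HRS with stored SignBytes and Signature: reuse the last signature.
	if ok, err := lss.CheckHRS(10, 0, stepPrevote); !ok || err != nil {
		panic("expected signature reuse")
	}
	// Advancing to a later step at the same height is not a regression: sign fresh.
	if ok, _ := lss.CheckHRS(10, 0, stepPrecommit); ok {
		panic("expected fresh signing")
	}
	// Going backwards in height is a regression and returns an error.
	if _, err := lss.CheckHRS(9, 0, stepPrevote); err == nil {
		panic("expected height regression error")
	}
}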
// Save persists the FilePVLastSignState to its filePath.
func (lss *FilePVLastSignState) Save() {
outFile := lss.filePath
if outFile == "" {
panic("cannot save FilePVLastSignState: filePath not set")
}
jsonBytes, err := tmjson.MarshalIndent(lss, "", " ")
if err != nil {
panic(err)
}
err = tempfile.WriteFileAtomic(outFile, jsonBytes, 0600)
if err != nil {
panic(err)
}
}
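// For reference, a freshly generated last-sign-state file is expected to look
// roughly like the following (a sketch, assuming tmjson's convention of
// encoding 64-bit integers as strings; Signature and SignBytes are omitted
// while empty):
//
//	{
//	  "height": "0",
//	  "round": 0,
//	  "step": 0
//	}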
//-------------------------------------------------------------------------------
// FilePV implements PrivValidator using data persisted to disk
// to prevent double signing.
// It keeps the last signature and sign bytes so we don't lose the signature
// if the process crashes after signing but before the resulting consensus message is processed.
// NOTE: the directories containing pv.Key.filePath and pv.LastSignState.filePath must already exist.
type FilePV struct {
Key FilePVKey
LastSignState FilePVLastSignState
}
// GenFilePV generates a new validator with randomly generated private key
// and sets the filePaths, but does not call Save().
func GenFilePV(keyFilePath, stateFilePath string) *FilePV {
privKey := ed25519.GenPrivKey()
return &FilePV{
Key: FilePVKey{
Address: privKey.PubKey().Address(),
PubKey: privKey.PubKey(),
PrivKey: privKey,
filePath: keyFilePath,
},
LastSignState: FilePVLastSignState{
Step: stepNone,
filePath: stateFilePath,
},
}
}
// LoadFilePV loads a FilePV from the filePaths. The FilePV handles double
// signing prevention by persisting data to the stateFilePath. If either file path
// does not exist, the program will exit.
func LoadFilePV(keyFilePath, stateFilePath string) *FilePV {
return loadFilePV(keyFilePath, stateFilePath, true)
}
// LoadFilePVEmptyState loads a FilePV from the given keyFilePath, with an empty LastSignState.
// If the keyFilePath does not exist, the program will exit.
func LoadFilePVEmptyState(keyFilePath, stateFilePath string) *FilePV {
return loadFilePV(keyFilePath, stateFilePath, false)
}
// If loadState is true, we load from the stateFilePath. Otherwise, we use an empty LastSignState.
func loadFilePV(keyFilePath, stateFilePath string, loadState bool) *FilePV {
keyJSONBytes, err := ioutil.ReadFile(keyFilePath)
if err != nil {
tmos.Exit(err.Error())
}
pvKey := FilePVKey{}
err = tmjson.Unmarshal(keyJSONBytes, &pvKey)
if err != nil {
tmos.Exit(fmt.Sprintf("Error reading PrivValidator key from %v: %v\n", keyFilePath, err))
}
// overwrite pubkey and address for convenience
pvKey.PubKey = pvKey.PrivKey.PubKey()
pvKey.Address = pvKey.PubKey.Address()
pvKey.filePath = keyFilePath
pvState := FilePVLastSignState{}
if loadState {
stateJSONBytes, err := ioutil.ReadFile(stateFilePath)
if err != nil {
tmos.Exit(err.Error())
}
err = tmjson.Unmarshal(stateJSONBytes, &pvState)
if err != nil {
tmos.Exit(fmt.Sprintf("Error reading PrivValidator state from %v: %v\n", stateFilePath, err))
}
}
pvState.filePath = stateFilePath
return &FilePV{
Key: pvKey,
LastSignState: pvState,
}
}
// LoadOrGenFilePV loads a FilePV from the given filePaths
// or else generates a new one and saves it to the filePaths.
func LoadOrGenFilePV(keyFilePath, stateFilePath string) *FilePV {
var pv *FilePV
if tmos.FileExists(keyFilePath) {
pv = LoadFilePV(keyFilePath, stateFilePath)
} else {
pv = GenFilePV(keyFilePath, stateFilePath)
pv.Save()
}
return pv
}
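// exampleLoadOrGen is an illustrative sketch, not part of the original file;
// the file paths are hypothetical. On the first run it generates and saves a
// new key/state pair; on subsequent runs it loads the existing one.
func exampleLoadOrGen() {
	pv := LoadOrGenFilePV(
		"/tmp/priv_validator_key.json",
		"/tmp/priv_validator_state.json",
	)
	fmt.Printf("validator address: %v\n", pv.GetAddress())
}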
// GetAddress returns the address of the validator.
// Implements PrivValidator.
func (pv *FilePV) GetAddress() types.Address {
return pv.Key.Address
}
// GetPubKey returns the public key of the validator.
// Implements PrivValidator.
func (pv *FilePV) GetPubKey() (crypto.PubKey, error) {
return pv.Key.PubKey, nil
}
// SignVote signs a canonical representation of the vote, along with the
// chainID. Implements PrivValidator.
func (pv *FilePV) SignVote(chainID string, vote *tmproto.Vote) error {
if err := pv.signVote(chainID, vote); err != nil {
return fmt.Errorf("error signing vote: %v", err)
}
return nil
}
// SignProposal signs a canonical representation of the proposal, along with
// the chainID. Implements PrivValidator.
func (pv *FilePV) SignProposal(chainID string, proposal *tmproto.Proposal) error {
if err := pv.signProposal(chainID, proposal); err != nil {
return fmt.Errorf("error signing proposal: %v", err)
}
return nil
}
// Save persists the FilePV to disk.
func (pv *FilePV) Save() {
pv.Key.Save()
pv.LastSignState.Save()
}
// Reset resets all fields in the FilePV.
// NOTE: Unsafe!
func (pv *FilePV) Reset() {
var sig []byte
pv.LastSignState.Height = 0
pv.LastSignState.Round = 0
pv.LastSignState.Step = 0
pv.LastSignState.Signature = sig
pv.LastSignState.SignBytes = nil
pv.Save()
}
// String returns a string representation of the FilePV.
func (pv *FilePV) String() string {
return fmt.Sprintf(
"PrivValidator{%v LH:%v, LR:%v, LS:%v}",
pv.GetAddress(),
pv.LastSignState.Height,
pv.LastSignState.Round,
pv.LastSignState.Step,
)
}
//------------------------------------------------------------------------------------
// signVote checks that the vote does not regress on height/round/step, signs
// it, and sets the vote signature. Unlike the production FilePV, it discards
// the same-HRS result from CheckHRS, so a second, conflicting vote at the same
// height/round/step is signed as well (enabling equivocation tests).
func (pv *FilePV) signVote(chainID string, vote *tmproto.Vote) error {
height, round, step := vote.Height, vote.Round, voteToStep(vote)
lss := pv.LastSignState
_, err := lss.CheckHRS(height, round, step)
if err != nil {
return err
}
signBytes := types.VoteSignBytes(chainID, vote)
// It passed the checks. Sign the vote
sig, err := pv.Key.PrivKey.Sign(signBytes)
if err != nil {
return err
}
pv.saveSigned(height, round, step, signBytes, sig)
vote.Signature = sig
return nil
}
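// exampleDoubleSign is an illustrative sketch, not part of the original file
// (chain ID, paths, and block hash are made up). Both calls succeed because
// signVote discards the same-HRS result from CheckHRS, which is exactly the
// check a production FilePV uses to refuse conflicting data at the same HRS.
func exampleDoubleSign() {
	pv := GenFilePV("/tmp/maverick_key.json", "/tmp/maverick_state.json")
	voteA := &tmproto.Vote{Type: tmproto.PrevoteType, Height: 5, Round: 0}
	if err := pv.SignVote("test-chain", voteA); err != nil {
		panic(err)
	}
	voteB := &tmproto.Vote{
		Type:    tmproto.PrevoteType,
		Height:  5,
		Round:   0,
		BlockID: tmproto.BlockID{Hash: []byte("conflicting-block")},
	}
	if err := pv.SignVote("test-chain", voteB); err != nil {
		panic(err) // a production FilePV would refuse to sign this conflicting vote
	}
}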
// signProposal checks that the proposal does not regress on height/round/step,
// signs it, and sets the proposal signature. As with signVote, the same-HRS
// result from CheckHRS is ignored, so a conflicting proposal at the same
// height/round/step will also be signed.
func (pv *FilePV) signProposal(chainID string, proposal *tmproto.Proposal) error {
height, round, step := proposal.Height, proposal.Round, stepPropose
lss := pv.LastSignState
_, err := lss.CheckHRS(height, round, step)
if err != nil {
return err
}
signBytes := types.ProposalSignBytes(chainID, proposal)
// It passed the checks. Sign the proposal
sig, err := pv.Key.PrivKey.Sign(signBytes)
if err != nil {
return err
}
pv.saveSigned(height, round, step, signBytes, sig)
proposal.Signature = sig
return nil
}
// saveSigned persists the height/round/step, sign bytes, and signature to the last sign state.
func (pv *FilePV) saveSigned(height int64, round int32, step int8,
signBytes []byte, sig []byte) {
pv.LastSignState.Height = height
pv.LastSignState.Round = round
pv.LastSignState.Step = step
pv.LastSignState.Signature = sig
pv.LastSignState.SignBytes = signBytes
pv.LastSignState.Save()
}
