@@ -0,0 +1,10 @@
#!/usr/bin/env bash

# Forcibly remove any stray UNIX sockets left behind from previous runs
rm -rf /var/run/privval.sock /var/run/app.sock

/usr/bin/app /tendermint/config/app.toml &
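# Give the backgrounded app a moment to start before launching the maverick node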
sleep 1
/usr/bin/maverick "$@"
@@ -0,0 +1,50 @@
package e2e_test

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/require"

	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
	"github.com/tendermint/tendermint/types"
)

// assert that all nodes that have blocks at the height (or height + 1) of a
// misbehavior have evidence for that misbehavior
func TestEvidence_Misbehavior(t *testing.T) {
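	// fetchBlockChain and testNode are shared e2e helpers defined elsewhere in
	// this test suite: the former retrieves the full chain of blocks once, the
	// latter runs the given assertion against every node.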
	blocks := fetchBlockChain(t)
	testNode(t, func(t *testing.T, node e2e.Node) {
		for _, block := range blocks {
			// Find any evidence blaming this node in this block
			var nodeEvidence types.Evidence
			for _, evidence := range block.Evidence.Evidence {
				switch evidence := evidence.(type) {
				case *types.DuplicateVoteEvidence:
					if bytes.Equal(evidence.VoteA.ValidatorAddress, node.Key.PubKey().Address()) {
						nodeEvidence = evidence
					}
				default:
					t.Fatalf("unexpected evidence type %T", evidence)
				}
			}

			// Check that evidence was as expected (evidence is submitted in the following height)
			misbehavior, ok := node.Misbehaviors[block.Height-1]
			if !ok {
				require.Nil(t, nodeEvidence, "found unexpected evidence %v in height %v",
					nodeEvidence, block.Height)
				continue
			}

			require.NotNil(t, nodeEvidence, "no evidence found for misbehavior %v in height %v",
				misbehavior, block.Height)

			switch misbehavior {
			case "double-prevote":
				require.IsType(t, &types.DuplicateVoteEvidence{}, nodeEvidence, "unexpected evidence type")
			default:
				t.Fatalf("unknown misbehavior %v", misbehavior)
			}
		}
	})
}
@ -0,0 +1,51 @@ | |||
# Maverick | |||
![](https://assets.rollingstone.com/assets/2015/article/tom-cruise-to-fight-drones-in-top-gun-sequel-20150629/201166/large_rect/1435581755/1401x788-Top-Gun-3.jpg) | |||
A byzantine node used to test Tendermint consensus against a plethora of different faulty misbehaviors. Designed to easily create new faulty misbehaviors to examine how a Tendermint network reacts to the misbehavior. Can also be used for fuzzy testing with different network arrangements. | |||
## Misbehaviors | |||
A misbehavior allows control at the following stages as highlighted by the struct below | |||
```go | |||
type Misbehavior struct { | |||
String string | |||
EnterPropose func(cs *State, height int64, round int32) | |||
EnterPrevote func(cs *State, height int64, round int32) | |||
EnterPrecommit func(cs *State, height int64, round int32) | |||
ReceivePrevote func(cs *State, prevote *types.Vote) | |||
ReceivePrecommit func(cs *State, precommit *types.Vote) | |||
ReceiveProposal func(cs *State, proposal *types.Proposal) error | |||
} | |||
``` | |||
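The three `Enter*` functions run when the node enters the corresponding consensus step (propose, prevote, precommit), while the three `Receive*` functions run when the node processes an incoming prevote, precommit, or proposal.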
At each of these events, the node can exhibit a different misbehavior. To create a new misbehavior, define a function that builds off the existing default misbehavior and overrides one or more of these functions. Then append it to the misbehaviors list so that the node recognizes it, like so:

```go
var MisbehaviorList = map[string]Misbehavior{
	"double-prevote": DoublePrevoteMisbehavior(),
}
```
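For example, a minimal sketch of a new misbehavior that withholds prevotes entirely might look as follows (the `silent-prevote` name and behavior are illustrative, not part of the package):

```go
// SilentPrevoteMisbehavior is a hypothetical example: it builds on the
// default behavior and overrides EnterPrevote so the node never prevotes.
func SilentPrevoteMisbehavior() Misbehavior {
	b := DefaultMisbehavior()
	b.Name = "silent-prevote"
	b.EnterPrevote = func(cs *State, height int64, round int32) {
		// Withhold the prevote entirely instead of signing and broadcasting one.
		cs.Logger.Info("enterPrevote: withholding prevote", "height", height, "round", round)
	}
	return b
}

var MisbehaviorList = map[string]Misbehavior{
	"double-prevote": DoublePrevoteMisbehavior(),
	"silent-prevote": SilentPrevoteMisbehavior(),
}
```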
## Setup

The maverick node takes most of its functionality from the existing Tendermint CLI. To install it, run the following in the directory of this readme:

```bash
go build
```

Use `maverick init` to initialize a single node and `maverick node` to run it. This will run normally unless you use the misbehaviors flag, as follows:

```bash
maverick node --proxy_app persistent_kvstore --misbehaviors double-prevote,10
```

This would cause the node to prevote both nil and the proposed block in every round at height 10. To add more misbehaviors at different heights, append the next misbehavior and height after the first (with comma separation).
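For instance, the following (reusing the same `double-prevote` misbehavior at two heights purely for illustration) would make the node equivocate at heights 10 and 15:

```bash
maverick node --proxy_app persistent_kvstore --misbehaviors double-prevote,10,double-prevote,15
```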
@@ -0,0 +1,220 @@
package consensus

import (
	"github.com/go-kit/kit/metrics"
	"github.com/go-kit/kit/metrics/discard"

	prometheus "github.com/go-kit/kit/metrics/prometheus"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
	// MetricsSubsystem is a subsystem shared by all metrics exposed by this
	// package.
	MetricsSubsystem = "consensus"
)

// Metrics contains metrics exposed by this package.
type Metrics struct {
	// Height of the chain.
	Height metrics.Gauge

	// ValidatorLastSignedHeight of a validator.
	ValidatorLastSignedHeight metrics.Gauge

	// Number of rounds.
	Rounds metrics.Gauge

	// Number of validators.
	Validators metrics.Gauge
	// Total power of all validators.
	ValidatorsPower metrics.Gauge
	// Power of a validator.
	ValidatorPower metrics.Gauge
	// Number of blocks missed by a validator.
	ValidatorMissedBlocks metrics.Gauge
	// Number of validators who did not sign.
	MissingValidators metrics.Gauge
	// Total power of the missing validators.
	MissingValidatorsPower metrics.Gauge
	// Number of validators who tried to double sign.
	ByzantineValidators metrics.Gauge
	// Total power of the byzantine validators.
	ByzantineValidatorsPower metrics.Gauge

	// Time between this and the last block.
	BlockIntervalSeconds metrics.Histogram

	// Number of transactions.
	NumTxs metrics.Gauge
	// Size of the block.
	BlockSizeBytes metrics.Gauge
	// Total number of transactions.
	TotalTxs metrics.Gauge
	// The latest block height.
	CommittedHeight metrics.Gauge
	// Whether or not a node is fast syncing. 1 if yes, 0 if no.
	FastSyncing metrics.Gauge
	// Whether or not a node is state syncing. 1 if yes, 0 if no.
	StateSyncing metrics.Gauge

	// Number of blockparts transmitted by peer.
	BlockParts metrics.Counter
}
// PrometheusMetrics returns Metrics built using the Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
	labels := []string{}
	for i := 0; i < len(labelsAndValues); i += 2 {
		labels = append(labels, labelsAndValues[i])
	}
	return &Metrics{
		Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "height",
			Help:      "Height of the chain.",
		}, labels).With(labelsAndValues...),
		Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "rounds",
			Help:      "Number of rounds.",
		}, labels).With(labelsAndValues...),
		Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "validators",
			Help:      "Number of validators.",
		}, labels).With(labelsAndValues...),
		ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "validator_last_signed_height",
			Help:      "Last signed height for a validator",
		}, append(labels, "validator_address")).With(labelsAndValues...),
		ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "validator_missed_blocks",
			Help:      "Total missed blocks for a validator",
		}, append(labels, "validator_address")).With(labelsAndValues...),
		ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "validators_power",
			Help:      "Total power of all validators.",
		}, labels).With(labelsAndValues...),
		ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "validator_power",
			Help:      "Power of a validator",
		}, append(labels, "validator_address")).With(labelsAndValues...),
		MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "missing_validators",
			Help:      "Number of validators who did not sign.",
		}, labels).With(labelsAndValues...),
		MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "missing_validators_power",
			Help:      "Total power of the missing validators.",
		}, labels).With(labelsAndValues...),
		ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "byzantine_validators",
			Help:      "Number of validators who tried to double sign.",
		}, labels).With(labelsAndValues...),
		ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "byzantine_validators_power",
			Help:      "Total power of the byzantine validators.",
		}, labels).With(labelsAndValues...),
		BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "block_interval_seconds",
			Help:      "Time between this and the last block.",
		}, labels).With(labelsAndValues...),
		NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "num_txs",
			Help:      "Number of transactions.",
		}, labels).With(labelsAndValues...),
		BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "block_size_bytes",
			Help:      "Size of the block.",
		}, labels).With(labelsAndValues...),
		TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "total_txs",
			Help:      "Total number of transactions.",
		}, labels).With(labelsAndValues...),
		CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "latest_block_height",
			Help:      "The latest block height.",
		}, labels).With(labelsAndValues...),
		FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "fast_syncing",
			Help:      "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
		}, labels).With(labelsAndValues...),
		StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "state_syncing",
			Help:      "Whether or not a node is state syncing. 1 if yes, 0 if no.",
		}, labels).With(labelsAndValues...),
		BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: MetricsSubsystem,
			Name:      "block_parts",
			Help:      "Number of blockparts transmitted by peer.",
		}, append(labels, "peer_id")).With(labelsAndValues...),
	}
}

// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
	return &Metrics{
		Height: discard.NewGauge(),

		ValidatorLastSignedHeight: discard.NewGauge(),

		Rounds: discard.NewGauge(),

		Validators:               discard.NewGauge(),
		ValidatorsPower:          discard.NewGauge(),
		ValidatorPower:           discard.NewGauge(),
		ValidatorMissedBlocks:    discard.NewGauge(),
		MissingValidators:        discard.NewGauge(),
		MissingValidatorsPower:   discard.NewGauge(),
		ByzantineValidators:      discard.NewGauge(),
		ByzantineValidatorsPower: discard.NewGauge(),

		BlockIntervalSeconds: discard.NewHistogram(),

		NumTxs:          discard.NewGauge(),
		BlockSizeBytes:  discard.NewGauge(),
		TotalTxs:        discard.NewGauge(),
		CommittedHeight: discard.NewGauge(),
		FastSyncing:     discard.NewGauge(),
		StateSyncing:    discard.NewGauge(),
		BlockParts:      discard.NewCounter(),
	}
}
@@ -0,0 +1,398 @@
package consensus

import (
	"fmt"

	cstypes "github.com/tendermint/tendermint/consensus/types"
	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
	"github.com/tendermint/tendermint/types"
)

// MisbehaviorList encompasses a list of all possible behaviors
var MisbehaviorList = map[string]Misbehavior{
	"double-prevote": DoublePrevoteMisbehavior(),
}

type Misbehavior struct {
	Name string

	EnterPropose   func(cs *State, height int64, round int32)
	EnterPrevote   func(cs *State, height int64, round int32)
	EnterPrecommit func(cs *State, height int64, round int32)

	ReceivePrevote   func(cs *State, prevote *types.Vote)
	ReceivePrecommit func(cs *State, precommit *types.Vote)
	ReceiveProposal  func(cs *State, proposal *types.Proposal) error
}

// BEHAVIORS

func DefaultMisbehavior() Misbehavior {
	return Misbehavior{
		Name:             "default",
		EnterPropose:     defaultEnterPropose,
		EnterPrevote:     defaultEnterPrevote,
		EnterPrecommit:   defaultEnterPrecommit,
		ReceivePrevote:   defaultReceivePrevote,
		ReceivePrecommit: defaultReceivePrecommit,
		ReceiveProposal:  defaultReceiveProposal,
	}
}

// DoublePrevoteMisbehavior will make a node prevote both nil and a block in the same
// height and round.
func DoublePrevoteMisbehavior() Misbehavior {
	b := DefaultMisbehavior()
	b.Name = "double-prevote"
	b.EnterPrevote = func(cs *State, height int64, round int32) {

		// If a block is locked, prevote that.
		if cs.LockedBlock != nil {
			cs.Logger.Info("enterPrevote: Already locked on a block, prevoting locked block")
			cs.signAddVote(tmproto.PrevoteType, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
			return
		}

		// If ProposalBlock is nil, prevote nil.
		if cs.ProposalBlock == nil {
			cs.Logger.Info("enterPrevote: ProposalBlock is nil")
			cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
			return
		}

		// Validate proposal block
		err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
		if err != nil {
			// ProposalBlock is invalid, prevote nil.
			cs.Logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
			cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
			return
		}

		if cs.sw == nil {
			cs.Logger.Error("nil switch")
			return
		}

		prevote, err := cs.signVote(tmproto.PrevoteType, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
		if err != nil {
			cs.Logger.Error("enterPrevote: Unable to sign block", "err", err)
		}

		nilPrevote, err := cs.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
		if err != nil {
			cs.Logger.Error("enterPrevote: Unable to sign block", "err", err)
		}

		// add our own vote
		cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""})

		cs.Logger.Info("Sending conflicting votes")
		peers := cs.sw.Peers().List()
		// There must be at least two other peers connected, otherwise this
		// behavior degenerates to normal prevoting (every peer sees the same vote).
		for idx, peer := range peers {
			if idx%2 == 0 { // send the prevote for the proposal block
				peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
			} else { // send the nil prevote
				peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote}))
			}
		}
	}
	return b
}

// DEFAULTS

func defaultEnterPropose(cs *State, height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)
	// If we don't get the proposal and all block parts quickly enough, enterPrevote
	cs.scheduleTimeout(cs.config.Propose(round), height, round, cstypes.RoundStepPropose)

	// Nothing more to do if we're not a validator
	if cs.privValidator == nil {
		logger.Debug("This node is not a validator")
		return
	}
	logger.Debug("This node is a validator")

	pubKey, err := cs.privValidator.GetPubKey()
	if err != nil {
		// If this node is a validator & proposer in the current round, it will
		// miss the opportunity to create a block.
		logger.Error("Error on retrieval of pubkey", "err", err)
		return
	}
	address := pubKey.Address()

	// if not a validator, we're done
	if !cs.Validators.HasAddress(address) {
		logger.Debug("This node is not a validator", "addr", address, "vals", cs.Validators)
		return
	}

	if cs.isProposer(address) {
		logger.Info("enterPropose: Our turn to propose",
			"proposer",
			address,
			"privValidator",
			cs.privValidator)
		cs.decideProposal(height, round)
	} else {
		logger.Info("enterPropose: Not our turn to propose",
			"proposer",
			cs.Validators.GetProposer().Address,
			"privValidator",
			cs.privValidator)
	}
}

func defaultEnterPrevote(cs *State, height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)

	// If a block is locked, prevote that.
	if cs.LockedBlock != nil {
		logger.Info("enterPrevote: Already locked on a block, prevoting locked block")
		cs.signAddVote(tmproto.PrevoteType, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
		return
	}

	// If ProposalBlock is nil, prevote nil.
	if cs.ProposalBlock == nil {
		logger.Info("enterPrevote: ProposalBlock is nil")
		cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
		return
	}

	// Validate proposal block
	err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
	if err != nil {
		// ProposalBlock is invalid, prevote nil.
		logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
		cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
		return
	}

	// Prevote cs.ProposalBlock
	// NOTE: the proposal signature is validated when it is received,
	// and the proposal block parts are validated as they are received (against the merkle hash in the proposal)
	logger.Info("enterPrevote: ProposalBlock is valid")
	cs.signAddVote(tmproto.PrevoteType, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
}

func defaultEnterPrecommit(cs *State, height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)

	// check for a polka
	blockID, ok := cs.Votes.Prevotes(round).TwoThirdsMajority()

	// If we don't have a polka, we must precommit nil.
	if !ok {
		if cs.LockedBlock != nil {
			logger.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit while we're locked. Precommitting nil")
		} else {
			logger.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit. Precommitting nil.")
		}
		cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
		return
	}

	// At this point +2/3 prevoted for a particular block or nil.
	_ = cs.eventBus.PublishEventPolka(cs.RoundStateEvent())

	// the latest POLRound should be this round.
	polRound, _ := cs.Votes.POLInfo()
	if polRound < round {
		panic(fmt.Sprintf("This POLRound should be %v but got %v", round, polRound))
	}

	// +2/3 prevoted nil. Unlock and precommit nil.
	if len(blockID.Hash) == 0 {
		if cs.LockedBlock == nil {
			logger.Info("enterPrecommit: +2/3 prevoted for nil.")
		} else {
			logger.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking")
			cs.LockedRound = -1
			cs.LockedBlock = nil
			cs.LockedBlockParts = nil
			_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
		}
		cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
		return
	}

	// At this point, +2/3 prevoted for a particular block.

	// If we're already locked on that block, precommit it, and update the LockedRound
	if cs.LockedBlock.HashesTo(blockID.Hash) {
		logger.Info("enterPrecommit: +2/3 prevoted locked block. Relocking")
		cs.LockedRound = round
		_ = cs.eventBus.PublishEventRelock(cs.RoundStateEvent())
		cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader)
		return
	}

	// If +2/3 prevoted for proposal block, stage and precommit it
	if cs.ProposalBlock.HashesTo(blockID.Hash) {
		logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash)
		// Validate the block.
		if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil {
			panic(fmt.Sprintf("enterPrecommit: +2/3 prevoted for an invalid block: %v", err))
		}
		cs.LockedRound = round
		cs.LockedBlock = cs.ProposalBlock
		cs.LockedBlockParts = cs.ProposalBlockParts
		_ = cs.eventBus.PublishEventLock(cs.RoundStateEvent())
		cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader)
		return
	}

	// There was a polka in this round for a block we don't have.
	// Fetch that block, unlock, and precommit nil.
	// The +2/3 prevotes for this round is the POL for our unlock.
	logger.Info("enterPrecommit: +2/3 prevotes for a block we don't have. Voting nil", "blockID", blockID)
	cs.LockedRound = -1
	cs.LockedBlock = nil
	cs.LockedBlockParts = nil
	if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
		cs.ProposalBlock = nil
		cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
	}
	_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
	cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{})
}

func defaultReceivePrevote(cs *State, vote *types.Vote) {
	height := cs.Height
	prevotes := cs.Votes.Prevotes(vote.Round)

	// If +2/3 prevotes for a block or nil for *any* round:
	if blockID, ok := prevotes.TwoThirdsMajority(); ok {

		// There was a polka!
		// If we're locked but this is a recent polka, unlock.
		// If it matches our ProposalBlock, update the ValidBlock

		// Unlock if `cs.LockedRound < vote.Round <= cs.Round`
		// NOTE: If vote.Round > cs.Round, we'll deal with it when we get to vote.Round
		if (cs.LockedBlock != nil) &&
			(cs.LockedRound < vote.Round) &&
			(vote.Round <= cs.Round) &&
			!cs.LockedBlock.HashesTo(blockID.Hash) {

			cs.Logger.Info("Unlocking because of POL.", "lockedRound", cs.LockedRound, "POLRound", vote.Round)
			cs.LockedRound = -1
			cs.LockedBlock = nil
			cs.LockedBlockParts = nil
			_ = cs.eventBus.PublishEventUnlock(cs.RoundStateEvent())
		}

		// Update Valid* if we can.
		// NOTE: our proposal block may be nil, or may not be the block that received a polka.
		if len(blockID.Hash) != 0 && (cs.ValidRound < vote.Round) && (vote.Round == cs.Round) {
			if cs.ProposalBlock.HashesTo(blockID.Hash) {
				cs.Logger.Info(
					"Updating ValidBlock because of POL.", "validRound", cs.ValidRound, "POLRound", vote.Round)
				cs.ValidRound = vote.Round
				cs.ValidBlock = cs.ProposalBlock
				cs.ValidBlockParts = cs.ProposalBlockParts
			} else {
				cs.Logger.Info(
					"Valid block we don't know about. Set ProposalBlock=nil",
					"proposal", cs.ProposalBlock.Hash(), "blockID", blockID.Hash)
				// We're getting the wrong block.
				cs.ProposalBlock = nil
			}
			if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
				cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
			}
			cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState)
			_ = cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent())
		}
	}

	// If +2/3 prevotes for *anything* for future round:
	switch {
	case cs.Round < vote.Round && prevotes.HasTwoThirdsAny():
		// Round-skip if there is any 2/3+ of votes ahead of us
		cs.enterNewRound(height, vote.Round)
	case cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step: // current round
		blockID, ok := prevotes.TwoThirdsMajority()
		if ok && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
			cs.enterPrecommit(height, vote.Round)
		} else if prevotes.HasTwoThirdsAny() {
			cs.enterPrevoteWait(height, vote.Round)
		}
	case cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round:
		// If the proposal is now complete, enter prevote of cs.Round.
		if cs.isProposalComplete() {
			cs.enterPrevote(height, cs.Round)
		}
	}
}

func defaultReceivePrecommit(cs *State, vote *types.Vote) {
	height := cs.Height
	precommits := cs.Votes.Precommits(vote.Round)
	cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())

	blockID, ok := precommits.TwoThirdsMajority()
	if ok {
		// Executed as TwoThirdsMajority could be from a higher round
		cs.enterNewRound(height, vote.Round)
		cs.enterPrecommit(height, vote.Round)
		if len(blockID.Hash) != 0 {
			cs.enterCommit(height, vote.Round)
			if cs.config.SkipTimeoutCommit && precommits.HasAll() {
				cs.enterNewRound(cs.Height, 0)
			}
		} else {
			cs.enterPrecommitWait(height, vote.Round)
		}
	} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
		cs.enterNewRound(height, vote.Round)
		cs.enterPrecommitWait(height, vote.Round)
	}
}

func defaultReceiveProposal(cs *State, proposal *types.Proposal) error {
	// Already have one
	// TODO: possibly catch double proposals
	if cs.Proposal != nil {
		return nil
	}

	// Does not apply
	if proposal.Height != cs.Height || proposal.Round != cs.Round {
		return nil
	}

	// Verify POLRound, which must be -1 or in range [0, proposal.Round).
	if proposal.POLRound < -1 ||
		(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
		return ErrInvalidProposalPOLRound
	}

	p := proposal.ToProto()
	// Verify signature
	if !cs.Validators.GetProposer().PubKey.VerifySignature(
		types.ProposalSignBytes(cs.state.ChainID, p), proposal.Signature) {
		return ErrInvalidProposalSignature
	}

	proposal.Signature = p.Signature
	cs.Proposal = proposal
	// We don't update cs.ProposalBlockParts if it is already set.
	// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
	// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
	if cs.ProposalBlockParts == nil {
		cs.ProposalBlockParts = types.NewPartSetFromHeader(proposal.BlockID.PartSetHeader)
	}
	cs.Logger.Info("Received proposal", "proposal", proposal)
	return nil
}
@@ -0,0 +1,377 @@
package consensus

import (
	"errors"
	"fmt"

	"github.com/gogo/protobuf/proto"

	cstypes "github.com/tendermint/tendermint/consensus/types"
	"github.com/tendermint/tendermint/libs/bits"
	tmmath "github.com/tendermint/tendermint/libs/math"
	"github.com/tendermint/tendermint/p2p"
	tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
	"github.com/tendermint/tendermint/types"
)

// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
	if msg == nil {
		return nil, errors.New("consensus: message is nil")
	}
	var pb tmcons.Message

	switch msg := msg.(type) {
	case *NewRoundStepMessage:
		pb = tmcons.Message{
			Sum: &tmcons.Message_NewRoundStep{
				NewRoundStep: &tmcons.NewRoundStep{
					Height:                msg.Height,
					Round:                 msg.Round,
					Step:                  uint32(msg.Step),
					SecondsSinceStartTime: msg.SecondsSinceStartTime,
					LastCommitRound:       msg.LastCommitRound,
				},
			},
		}
	case *NewValidBlockMessage:
		pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
		pbBits := msg.BlockParts.ToProto()
		pb = tmcons.Message{
			Sum: &tmcons.Message_NewValidBlock{
				NewValidBlock: &tmcons.NewValidBlock{
					Height:             msg.Height,
					Round:              msg.Round,
					BlockPartSetHeader: pbPartSetHeader,
					BlockParts:         pbBits,
					IsCommit:           msg.IsCommit,
				},
			},
		}
	case *ProposalMessage:
		pbP := msg.Proposal.ToProto()
		pb = tmcons.Message{
			Sum: &tmcons.Message_Proposal{
				Proposal: &tmcons.Proposal{
					Proposal: *pbP,
				},
			},
		}
	case *ProposalPOLMessage:
		pbBits := msg.ProposalPOL.ToProto()
		pb = tmcons.Message{
			Sum: &tmcons.Message_ProposalPol{
				ProposalPol: &tmcons.ProposalPOL{
					Height:           msg.Height,
					ProposalPolRound: msg.ProposalPOLRound,
					ProposalPol:      *pbBits,
				},
			},
		}
	case *BlockPartMessage:
		parts, err := msg.Part.ToProto()
		if err != nil {
			return nil, fmt.Errorf("msg to proto error: %w", err)
		}
		pb = tmcons.Message{
			Sum: &tmcons.Message_BlockPart{
				BlockPart: &tmcons.BlockPart{
					Height: msg.Height,
					Round:  msg.Round,
					Part:   *parts,
				},
			},
		}
	case *VoteMessage:
		vote := msg.Vote.ToProto()
		pb = tmcons.Message{
			Sum: &tmcons.Message_Vote{
				Vote: &tmcons.Vote{
					Vote: vote,
				},
			},
		}
	case *HasVoteMessage:
		pb = tmcons.Message{
			Sum: &tmcons.Message_HasVote{
				HasVote: &tmcons.HasVote{
					Height: msg.Height,
					Round:  msg.Round,
					Type:   msg.Type,
					Index:  msg.Index,
				},
			},
		}
	case *VoteSetMaj23Message:
		bi := msg.BlockID.ToProto()
		pb = tmcons.Message{
			Sum: &tmcons.Message_VoteSetMaj23{
				VoteSetMaj23: &tmcons.VoteSetMaj23{
					Height:  msg.Height,
					Round:   msg.Round,
					Type:    msg.Type,
					BlockID: bi,
				},
			},
		}
	case *VoteSetBitsMessage:
		bi := msg.BlockID.ToProto()
		bits := msg.Votes.ToProto()

		vsb := &tmcons.Message_VoteSetBits{
			VoteSetBits: &tmcons.VoteSetBits{
				Height:  msg.Height,
				Round:   msg.Round,
				Type:    msg.Type,
				BlockID: bi,
			},
		}

		if bits != nil {
			vsb.VoteSetBits.Votes = *bits
		}

		pb = tmcons.Message{
			Sum: vsb,
		}

	default:
		return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
	}

	return &pb, nil
}
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
	if msg == nil {
		return nil, errors.New("consensus: nil message")
	}
	var pb Message

	switch msg := msg.Sum.(type) {
	case *tmcons.Message_NewRoundStep:
		rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step))
		// deny message based on possible overflow
		if err != nil {
			return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
		}
		pb = &NewRoundStepMessage{
			Height:                msg.NewRoundStep.Height,
			Round:                 msg.NewRoundStep.Round,
			Step:                  cstypes.RoundStepType(rs),
			SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime,
			LastCommitRound:       msg.NewRoundStep.LastCommitRound,
		}
	case *tmcons.Message_NewValidBlock:
		pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
		if err != nil {
			return nil, fmt.Errorf("parts from proto error: %w", err)
		}

		pbBits := new(bits.BitArray)
		pbBits.FromProto(msg.NewValidBlock.BlockParts)

		pb = &NewValidBlockMessage{
			Height:             msg.NewValidBlock.Height,
			Round:              msg.NewValidBlock.Round,
			BlockPartSetHeader: *pbPartSetHeader,
			BlockParts:         pbBits,
			IsCommit:           msg.NewValidBlock.IsCommit,
		}
	case *tmcons.Message_Proposal:
		pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
		if err != nil {
			return nil, fmt.Errorf("proposal msg from proto error: %w", err)
		}
		pb = &ProposalMessage{
			Proposal: pbP,
		}
	case *tmcons.Message_ProposalPol:
		pbBits := new(bits.BitArray)
		pbBits.FromProto(&msg.ProposalPol.ProposalPol)
		pb = &ProposalPOLMessage{
			Height:           msg.ProposalPol.Height,
			ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
			ProposalPOL:      pbBits,
		}
	case *tmcons.Message_BlockPart:
		parts, err := types.PartFromProto(&msg.BlockPart.Part)
		if err != nil {
			return nil, fmt.Errorf("blockpart msg from proto error: %w", err)
		}
		pb = &BlockPartMessage{
			Height: msg.BlockPart.Height,
			Round:  msg.BlockPart.Round,
			Part:   parts,
		}
	case *tmcons.Message_Vote:
		vote, err := types.VoteFromProto(msg.Vote.Vote)
		if err != nil {
			return nil, fmt.Errorf("vote msg from proto error: %w", err)
		}
		pb = &VoteMessage{
			Vote: vote,
		}
	case *tmcons.Message_HasVote:
		pb = &HasVoteMessage{
			Height: msg.HasVote.Height,
			Round:  msg.HasVote.Round,
			Type:   msg.HasVote.Type,
			Index:  msg.HasVote.Index,
		}
	case *tmcons.Message_VoteSetMaj23:
		bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID)
		if err != nil {
			return nil, fmt.Errorf("voteSetMaj23 msg from proto error: %w", err)
		}
		pb = &VoteSetMaj23Message{
			Height:  msg.VoteSetMaj23.Height,
			Round:   msg.VoteSetMaj23.Round,
			Type:    msg.VoteSetMaj23.Type,
			BlockID: *bi,
		}
	case *tmcons.Message_VoteSetBits:
		bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID)
		if err != nil {
			return nil, fmt.Errorf("voteSetBits msg from proto error: %w", err)
		}
		votes := new(bits.BitArray)
		votes.FromProto(&msg.VoteSetBits.Votes)

		pb = &VoteSetBitsMessage{
			Height:  msg.VoteSetBits.Height,
			Round:   msg.VoteSetBits.Round,
			Type:    msg.VoteSetBits.Type,
			BlockID: *bi,
			Votes:   votes,
		}
	default:
		return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
	}

	if err := pb.ValidateBasic(); err != nil {
		return nil, err
	}

	return pb, nil
}
// MustEncode takes the reactor's msg, converts it to proto and marshals it.
// It mimics `MustMarshalBinaryBare` in that it panics on error.
func MustEncode(msg Message) []byte {
	pb, err := MsgToProto(msg)
	if err != nil {
		panic(err)
	}
	enc, err := proto.Marshal(pb)
	if err != nil {
		panic(err)
	}
	return enc
}

// WALToProto takes a WAL message and returns a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
	var pb tmcons.WALMessage

	switch msg := msg.(type) {
	case types.EventDataRoundState:
		pb = tmcons.WALMessage{
			Sum: &tmcons.WALMessage_EventDataRoundState{
				EventDataRoundState: &tmproto.EventDataRoundState{
					Height: msg.Height,
					Round:  msg.Round,
					Step:   msg.Step,
				},
			},
		}
	case msgInfo:
		consMsg, err := MsgToProto(msg.Msg)
		if err != nil {
			return nil, err
		}
		pb = tmcons.WALMessage{
			Sum: &tmcons.WALMessage_MsgInfo{
				MsgInfo: &tmcons.MsgInfo{
					Msg:    *consMsg,
					PeerID: string(msg.PeerID),
				},
			},
		}
	case timeoutInfo:
		pb = tmcons.WALMessage{
			Sum: &tmcons.WALMessage_TimeoutInfo{
				TimeoutInfo: &tmcons.TimeoutInfo{
					Duration: msg.Duration,
					Height:   msg.Height,
					Round:    msg.Round,
					Step:     uint32(msg.Step),
				},
			},
		}
	case EndHeightMessage:
		pb = tmcons.WALMessage{
			Sum: &tmcons.WALMessage_EndHeight{
				EndHeight: &tmcons.EndHeight{
					Height: msg.Height,
				},
			},
		}
	default:
		return nil, fmt.Errorf("to proto: wal message not recognized: %T", msg)
	}

	return &pb, nil
}

// WALFromProto takes a proto wal message and returns a consensus walMessage and error
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
	if msg == nil {
		return nil, errors.New("nil WAL message")
	}
	var pb WALMessage

	switch msg := msg.Sum.(type) {
	case *tmcons.WALMessage_EventDataRoundState:
		pb = types.EventDataRoundState{
			Height: msg.EventDataRoundState.Height,
			Round:  msg.EventDataRoundState.Round,
			Step:   msg.EventDataRoundState.Step,
		}
	case *tmcons.WALMessage_MsgInfo:
		walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
		if err != nil {
			return nil, fmt.Errorf("msgInfo from proto error: %w", err)
		}
		pb = msgInfo{
			Msg:    walMsg,
			PeerID: p2p.ID(msg.MsgInfo.PeerID),
		}
	case *tmcons.WALMessage_TimeoutInfo:
		tis, err := tmmath.SafeConvertUint8(int64(msg.TimeoutInfo.Step))
		// deny message based on possible overflow
		if err != nil {
			return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
		}
		pb = timeoutInfo{
			Duration: msg.TimeoutInfo.Duration,
			Height:   msg.TimeoutInfo.Height,
			Round:    msg.TimeoutInfo.Round,
			Step:     cstypes.RoundStepType(tis),
		}
		return pb, nil
	case *tmcons.WALMessage_EndHeight:
		pb := EndHeightMessage{
			Height: msg.EndHeight.Height,
		}
		return pb, nil
	default:
		return nil, fmt.Errorf("from proto: wal message not recognized: %T", msg)
	}
	return pb, nil
}
@@ -0,0 +1,533 @@
package consensus

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
	"reflect"
	"time"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/crypto/merkle"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

var crc32c = crc32.MakeTable(crc32.Castagnoli)

// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios:
//
//  1. failure during consensus
//  2. failure while applying the block
//
// The former is handled by the WAL, the latter by the proxyApp Handshake on
// restart, which ultimately hands off the work to the WAL.

//-----------------------------------------
// 1. Recover from failure during consensus
// (by replaying messages from the WAL)
//-----------------------------------------

// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
	// Skip meta messages which exist for demarcating boundaries.
	if _, ok := msg.Msg.(EndHeightMessage); ok {
		return nil
	}

	// for logging
	switch m := msg.Msg.(type) {
	case types.EventDataRoundState:
		cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
		// these are playback checks
		ticker := time.After(time.Second * 2)
		if newStepSub != nil {
			select {
			case stepMsg := <-newStepSub.Out():
				m2 := stepMsg.Data().(types.EventDataRoundState)
				if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
					return fmt.Errorf("roundState mismatch. Got %v; Expected %v", m2, m)
				}
			case <-newStepSub.Cancelled():
				return fmt.Errorf("failed to read off newStepSub.Out(). newStepSub was cancelled")
			case <-ticker:
				return fmt.Errorf("failed to read off newStepSub.Out()")
			}
		}
	case msgInfo:
		peerID := m.PeerID
		if peerID == "" {
			peerID = "local"
		}
		switch msg := m.Msg.(type) {
		case *ProposalMessage:
			p := msg.Proposal
			cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
				p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
		case *BlockPartMessage:
			cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
		case *VoteMessage:
			v := msg.Vote
			cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
				"blockID", v.BlockID, "peer", peerID)
		}

		cs.handleMsg(m)
	case timeoutInfo:
		cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
		cs.handleTimeout(m, cs.RoundState)
	default:
		return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
	}
	return nil
}

// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *State) catchupReplay(csHeight int64) error {

	// Set replayMode to true so we don't log signing errors.
	cs.replayMode = true
	defer func() { cs.replayMode = false }()

	// Ensure that #ENDHEIGHT for this height doesn't exist.
	// NOTE: This is just a sanity check. As far as we know things work fine
	// without it, and Handshake could reuse State if it weren't for
	// this check (since we can crash after writing #ENDHEIGHT).
	//
	// Ignore data corruption errors since this is a sanity check.
	gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
	if err != nil {
		return err
	}
	if gr != nil {
		if err := gr.Close(); err != nil {
			return err
		}
	}
	if found {
		return fmt.Errorf("wal should not contain #ENDHEIGHT %d", csHeight)
	}

	// Search for last height marker.
	//
	// Ignore data corruption errors in previous heights because we only care about last height
	if csHeight < cs.state.InitialHeight {
		return fmt.Errorf("cannot replay height %v, below initial height %v", csHeight, cs.state.InitialHeight)
	}
	endHeight := csHeight - 1
	if csHeight == cs.state.InitialHeight {
		endHeight = 0
	}
	gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
	if err == io.EOF {
		cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight)
	} else if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, endHeight)
	}
	defer gr.Close()

	cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)

	var msg *TimedWALMessage
	dec := WALDecoder{gr}

LOOP:
	for {
		msg, err = dec.Decode()
		switch {
		case err == io.EOF:
			break LOOP
		case IsDataCorruptionError(err):
			cs.Logger.Error("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
			return err
		case err != nil:
			return err
		}
		// NOTE: since the priv key is set when the msgs are received, the node
		// will attempt to e.g. double sign, but we can just ignore it since the
		// votes will be replayed and we'll get to the next step
		if err := cs.readReplayMessage(msg, nil); err != nil {
			return err
		}
	}
	cs.Logger.Info("Replay: Done")
	return nil
}

//--------------------------------------------------------------------------------

// Parses marker lines of the form:
// #ENDHEIGHT: 12345
/*
func makeHeightSearchFunc(height int64) auto.SearchFunc {
	return func(line string) (int, error) {
		line = strings.TrimRight(line, "\n")
		parts := strings.Split(line, " ")
		if len(parts) != 2 {
			return -1, errors.New("line did not have 2 parts")
		}
		i, err := strconv.Atoi(parts[1])
		if err != nil {
			return -1, errors.New("failed to parse INFO: " + err.Error())
		}
		if height < i {
			return 1, nil
		} else if height == i {
			return 0, nil
		} else {
			return -1, nil
		}
	}
}*/

//---------------------------------------------------
// 2. Recover from failure while applying the block.
// (by handshaking with the app to figure out where
// we were last, and using the WAL to recover there.)
//---------------------------------------------------

type Handshaker struct {
	stateStore   sm.Store
	initialState sm.State
	store        sm.BlockStore
	eventBus     types.BlockEventPublisher
	genDoc       *types.GenesisDoc
	logger       log.Logger

	nBlocks int // number of blocks applied to the state
}

func NewHandshaker(stateStore sm.Store, state sm.State,
	store sm.BlockStore, genDoc *types.GenesisDoc) *Handshaker {

	return &Handshaker{
		stateStore:   stateStore,
		initialState: state,
		store:        store,
		eventBus:     types.NopEventBus{},
		genDoc:       genDoc,
		logger:       log.NewNopLogger(),
		nBlocks:      0,
	}
}

func (h *Handshaker) SetLogger(l log.Logger) {
	h.logger = l
}

// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (h *Handshaker) SetEventBus(eventBus types.BlockEventPublisher) {
	h.eventBus = eventBus
}

// NBlocks returns the number of blocks applied to the state.
func (h *Handshaker) NBlocks() int {
	return h.nBlocks
}

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {

	// Handshake is done via ABCI Info on the query conn.
	res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
	if err != nil {
		return fmt.Errorf("error calling Info: %v", err)
	}

	blockHeight := res.LastBlockHeight
	if blockHeight < 0 {
		return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
	}
	appHash := res.LastBlockAppHash

	h.logger.Info("ABCI Handshake App Info",
		"height", blockHeight,
		"hash", fmt.Sprintf("%X", appHash),
		"software-version", res.Version,
		"protocol-version", res.AppVersion,
	)

	// Only set the version if there is no existing state.
	if h.initialState.LastBlockHeight == 0 {
		h.initialState.Version.Consensus.App = res.AppVersion
	}

	// Replay blocks up to the latest in the blockstore.
	_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
	if err != nil {
		return fmt.Errorf("error on replay: %v", err)
	}

	h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
		"appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))

	// TODO: (on restart) replay mempool

	return nil
}

// ReplayBlocks replays all blocks since appBlockHeight and ensures the result
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
	state sm.State,
	appHash []byte,
	appBlockHeight int64,
	proxyApp proxy.AppConns,
) ([]byte, error) {
	storeBlockBase := h.store.Base()
	storeBlockHeight := h.store.Height()
	stateBlockHeight := state.LastBlockHeight
	h.logger.Info(
		"ABCI Replay Blocks",
		"appHeight",
		appBlockHeight,
		"storeHeight",
		storeBlockHeight,
		"stateHeight",
		stateBlockHeight)

	// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
	if appBlockHeight == 0 {
		validators := make([]*types.Validator, len(h.genDoc.Validators))
		for i, val := range h.genDoc.Validators {
			validators[i] = types.NewValidator(val.PubKey, val.Power)
		}
		validatorSet := types.NewValidatorSet(validators)
		nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
		csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams)
		req := abci.RequestInitChain{
			Time:            h.genDoc.GenesisTime,
			ChainId:         h.genDoc.ChainID,
			InitialHeight:   h.genDoc.InitialHeight,
			ConsensusParams: csParams,
			Validators:      nextVals,
			AppStateBytes:   h.genDoc.AppState,
		}
		res, err := proxyApp.Consensus().InitChainSync(req)
		if err != nil {
			return nil, err
		}

		appHash = res.AppHash

		if stateBlockHeight == 0 { // we only update state when we are in initial state
			// If the app did not return an app hash, we keep the one set from the genesis doc in
			// the state. We don't set appHash since we don't want the genesis doc app hash
			// recorded in the genesis block. We should probably just remove GenesisDoc.AppHash.
			if len(res.AppHash) > 0 {
				state.AppHash = res.AppHash
			}
			// If the app returned validators or consensus params, update the state.
			if len(res.Validators) > 0 {
				vals, err := types.PB2TM.ValidatorUpdates(res.Validators)
				if err != nil {
					return nil, err
				}
				state.Validators = types.NewValidatorSet(vals)
				state.NextValidators = types.NewValidatorSet(vals).CopyIncrementProposerPriority(1)
			} else if len(h.genDoc.Validators) == 0 {
				// If validator set is not set in genesis and still empty after InitChain, exit.
				return nil, fmt.Errorf("validator set is nil in genesis and still empty after InitChain")
			}

			if res.ConsensusParams != nil {
				state.ConsensusParams = types.UpdateConsensusParams(state.ConsensusParams, res.ConsensusParams)
				state.Version.Consensus.App = state.ConsensusParams.Version.AppVersion
			}
			// We update the last results hash with the empty hash, to conform with RFC-6962.
			state.LastResultsHash = merkle.HashFromByteSlices(nil)
			if err := h.stateStore.Save(state); err != nil {
				return nil, err
			}
		}
	}

	// First handle edge cases and constraints on the storeBlockHeight and storeBlockBase.
	switch {
	case storeBlockHeight == 0:
		assertAppHashEqualsOneFromState(appHash, state)
		return appHash, nil

	case appBlockHeight == 0 && state.InitialHeight < storeBlockBase:
		// the app has no state, and the block store is truncated above the initial height
		return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}

	case appBlockHeight > 0 && appBlockHeight < storeBlockBase-1:
		// the app is too far behind truncated store (can be 1 behind since we replay the next)
		return appHash, sm.ErrAppBlockHeightTooLow{AppHeight: appBlockHeight, StoreBase: storeBlockBase}

	case storeBlockHeight < appBlockHeight:
		// the app should never be ahead of the store (but this is under app's control)
		return appHash, sm.ErrAppBlockHeightTooHigh{CoreHeight: storeBlockHeight, AppHeight: appBlockHeight}

	case storeBlockHeight < stateBlockHeight:
		// the state should never be ahead of the store (this is under tendermint's control)
		panic(fmt.Sprintf("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))

	case storeBlockHeight > stateBlockHeight+1:
		// store should be at most one ahead of the state (this is under tendermint's control)
		panic(fmt.Sprintf("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
	}

	var err error
	// Now either store is equal to state, or one ahead.
	// For each, consider all cases of where the app could be, given app <= store
	if storeBlockHeight == stateBlockHeight {
		// Tendermint ran Commit and saved the state.
		// Either the app is asking for replay, or we're all synced up.
		if appBlockHeight < storeBlockHeight {
			// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
			return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)

		} else if appBlockHeight == storeBlockHeight {
			// We're good!
			assertAppHashEqualsOneFromState(appHash, state)
			return appHash, nil
		}

	} else if storeBlockHeight == stateBlockHeight+1 {
		// We saved the block in the store but haven't updated the state,
		// so we'll need to replay a block using the WAL.
		switch {
		case appBlockHeight < stateBlockHeight:
			// the app is further behind than it should be, so replay blocks
			// but leave the last block to go through the WAL
			return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)

		case appBlockHeight == stateBlockHeight:
			// We haven't run Commit (both the state and app are one block behind),
			// so replayBlock with the real app.
			// NOTE: We could instead use the cs.WAL on cs.Start,
			// but we'd have to allow the WAL to replay a block that wrote its #ENDHEIGHT
h.logger.Info("Replay last block using real app") | |||
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) | |||
return state.AppHash, err | |||
case appBlockHeight == storeBlockHeight: | |||
// We ran Commit, but didn't save the state, so replayBlock with mock app. | |||
abciResponses, err := h.stateStore.LoadABCIResponses(storeBlockHeight) | |||
if err != nil { | |||
return nil, err | |||
} | |||
mockApp := newMockProxyApp(appHash, abciResponses) | |||
h.logger.Info("Replay last block using mock app") | |||
state, err = h.replayBlock(state, storeBlockHeight, mockApp) | |||
return state.AppHash, err | |||
} | |||
} | |||
panic(fmt.Sprintf("uncovered case! appHeight: %d, storeHeight: %d, stateHeight: %d", | |||
appBlockHeight, storeBlockHeight, stateBlockHeight)) | |||
} | |||
func (h *Handshaker) replayBlocks( | |||
state sm.State, | |||
proxyApp proxy.AppConns, | |||
appBlockHeight, | |||
storeBlockHeight int64, | |||
mutateState bool) ([]byte, error) { | |||
// App is further behind than it should be, so we need to replay blocks. | |||
// We replay all blocks from appBlockHeight+1. | |||
// | |||
// Note that we don't have an old version of the state, | |||
// so we by-pass state validation/mutation using sm.ExecCommitBlock. | |||
// This also means we won't be saving validator sets if they change during this period. | |||
// TODO: Load the historical information to fix this and just use state.ApplyBlock | |||
// | |||
// If mutateState == true, the final block is replayed with h.replayBlock() | |||
var appHash []byte | |||
var err error | |||
finalBlock := storeBlockHeight | |||
if mutateState { | |||
finalBlock-- | |||
} | |||
firstBlock := appBlockHeight + 1 | |||
if firstBlock == 1 { | |||
firstBlock = state.InitialHeight | |||
} | |||
for i := firstBlock; i <= finalBlock; i++ { | |||
h.logger.Info("Applying block", "height", i) | |||
block := h.store.LoadBlock(i) | |||
// Extra check to ensure the app was not changed in a way it shouldn't have. | |||
if len(appHash) > 0 { | |||
assertAppHashEqualsOneFromBlock(appHash, block) | |||
} | |||
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight) | |||
if err != nil { | |||
return nil, err | |||
} | |||
h.nBlocks++ | |||
} | |||
if mutateState { | |||
// sync the final block | |||
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus()) | |||
if err != nil { | |||
return nil, err | |||
} | |||
appHash = state.AppHash | |||
} | |||
assertAppHashEqualsOneFromState(appHash, state) | |||
return appHash, nil | |||
} | |||
// ApplyBlock on the proxyApp with the last block. | |||
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { | |||
block := h.store.LoadBlock(height) | |||
meta := h.store.LoadBlockMeta(height) | |||
// Use stubs for both mempool and evidence pool since no transactions nor | |||
// evidence are needed here - block already exists. | |||
blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) | |||
blockExec.SetEventBus(h.eventBus) | |||
var err error | |||
state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) | |||
if err != nil { | |||
return sm.State{}, err | |||
} | |||
h.nBlocks++ | |||
return state, nil | |||
} | |||
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) { | |||
if !bytes.Equal(appHash, block.AppHash) { | |||
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X. | |||
Block: %v | |||
`, | |||
appHash, block.AppHash, block)) | |||
} | |||
} | |||
func assertAppHashEqualsOneFromState(appHash []byte, state sm.State) { | |||
if !bytes.Equal(appHash, state.AppHash) { | |||
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got | |||
%X, expected %X. | |||
State: %v | |||
Did you reset Tendermint without resetting your application's data?`, | |||
appHash, state.AppHash, state)) | |||
} | |||
} |
@ -0,0 +1,338 @@ | |||
package consensus | |||
import ( | |||
"bufio" | |||
"context" | |||
"errors" | |||
"fmt" | |||
"io" | |||
"os" | |||
"strconv" | |||
"strings" | |||
dbm "github.com/tendermint/tm-db" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmos "github.com/tendermint/tendermint/libs/os" | |||
"github.com/tendermint/tendermint/proxy" | |||
sm "github.com/tendermint/tendermint/state" | |||
"github.com/tendermint/tendermint/store" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
const ( | |||
// event bus subscriber | |||
subscriber = "replay-file" | |||
) | |||
//-------------------------------------------------------- | |||
// replay messages interactively or all at once | |||
// replay the wal file | |||
func RunReplayFile(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig, console bool) { | |||
consensusState := newConsensusStateForReplay(config, csConfig) | |||
if err := consensusState.ReplayFile(csConfig.WalFile(), console); err != nil { | |||
tmos.Exit(fmt.Sprintf("Error during consensus replay: %v", err)) | |||
} | |||
} | |||
// Replay msgs in file or start the console | |||
func (cs *State) ReplayFile(file string, console bool) error { | |||
if cs.IsRunning() { | |||
return errors.New("cs is already running, cannot replay") | |||
} | |||
if cs.wal != nil { | |||
return errors.New("cs wal is open, cannot replay") | |||
} | |||
cs.startForReplay() | |||
// ensure all new step events are regenerated as expected | |||
ctx := context.Background() | |||
newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep) | |||
if err != nil { | |||
return fmt.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) | |||
} | |||
defer func() { | |||
if err := cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil { | |||
cs.Logger.Error("Error unsubscribing to event bus", "err", err) | |||
} | |||
}() | |||
// just open the file for reading, no need to use wal | |||
fp, err := os.OpenFile(file, os.O_RDONLY, 0600) | |||
if err != nil { | |||
return err | |||
} | |||
pb := newPlayback(file, fp, cs, cs.state.Copy()) | |||
defer pb.fp.Close() | |||
var nextN int // apply N msgs in a row | |||
var msg *TimedWALMessage | |||
for { | |||
if nextN == 0 && console { | |||
nextN = pb.replayConsoleLoop() | |||
} | |||
msg, err = pb.dec.Decode() | |||
if err == io.EOF { | |||
return nil | |||
} else if err != nil { | |||
return err | |||
} | |||
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { | |||
return err | |||
} | |||
if nextN > 0 { | |||
nextN-- | |||
} | |||
pb.count++ | |||
} | |||
} | |||
//------------------------------------------------ | |||
// playback manager | |||
type playback struct { | |||
cs *State | |||
fp *os.File | |||
dec *WALDecoder | |||
count int // how many lines/msgs into the file are we | |||
// replays can be reset to beginning | |||
fileName string // so we can close/reopen the file | |||
genesisState sm.State // so the replay session knows where to restart from | |||
} | |||
func newPlayback(fileName string, fp *os.File, cs *State, genState sm.State) *playback { | |||
return &playback{ | |||
cs: cs, | |||
fp: fp, | |||
fileName: fileName, | |||
genesisState: genState, | |||
dec: NewWALDecoder(fp), | |||
} | |||
} | |||
// go back count steps by resetting the state and running (pb.count - count) steps | |||
func (pb *playback) replayReset(count int, newStepSub types.Subscription) error { | |||
if err := pb.cs.Stop(); err != nil { | |||
return err | |||
} | |||
pb.cs.Wait() | |||
newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, | |||
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool, map[int64]Misbehavior{}) | |||
newCS.SetEventBus(pb.cs.eventBus) | |||
newCS.startForReplay() | |||
if err := pb.fp.Close(); err != nil { | |||
return err | |||
} | |||
fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0600) | |||
if err != nil { | |||
return err | |||
} | |||
pb.fp = fp | |||
pb.dec = NewWALDecoder(fp) | |||
count = pb.count - count | |||
fmt.Printf("Reseting from %d to %d\n", pb.count, count) | |||
pb.count = 0 | |||
pb.cs = newCS | |||
var msg *TimedWALMessage | |||
for i := 0; i < count; i++ { | |||
msg, err = pb.dec.Decode() | |||
if err == io.EOF { | |||
return nil | |||
} else if err != nil { | |||
return err | |||
} | |||
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { | |||
return err | |||
} | |||
pb.count++ | |||
} | |||
return nil | |||
} | |||
func (cs *State) startForReplay() { | |||
cs.Logger.Error("Replay commands are disabled until someone updates them and writes tests") | |||
/* TODO:! | |||
// since we replay tocks we just ignore ticks | |||
go func() { | |||
for { | |||
select { | |||
case <-cs.tickChan: | |||
case <-cs.Quit: | |||
return | |||
} | |||
} | |||
}()*/ | |||
} | |||
// console function for parsing input and running commands | |||
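// Supported commands (as handled in the switch below):
//
//	next [N]   - replay the next message (or the next N messages)
//	back [N]   - rewind by one message (or by N messages)
//	rs [field] - print the full round state, "rs short", or a single field
//	n          - print how many messages have been replayed so far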
func (pb *playback) replayConsoleLoop() int { | |||
for { | |||
fmt.Printf("> ") | |||
bufReader := bufio.NewReader(os.Stdin) | |||
line, more, err := bufReader.ReadLine() | |||
if more { | |||
tmos.Exit("input is too long") | |||
} else if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
tokens := strings.Split(string(line), " ") | |||
if len(tokens) == 0 { | |||
continue | |||
} | |||
switch tokens[0] { | |||
case "next": | |||
// "next" -> replay next message | |||
// "next N" -> replay next N messages | |||
if len(tokens) == 1 { | |||
return 0 | |||
} | |||
i, err := strconv.Atoi(tokens[1]) | |||
if err != nil { | |||
fmt.Println("next takes an integer argument") | |||
} else { | |||
return i | |||
} | |||
case "back": | |||
// "back" -> go back one message | |||
// "back N" -> go back N messages | |||
// NOTE: "back" is not supported in the state machine design, | |||
// so we restart from the genesis state and replay up to the desired message
ctx := context.Background() | |||
// ensure all new step events are regenerated as expected | |||
newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep) | |||
if err != nil { | |||
tmos.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) | |||
} | |||
defer func() { | |||
if err := pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep); err != nil { | |||
pb.cs.Logger.Error("Error unsubscribing from eventBus", "err", err) | |||
} | |||
}() | |||
if len(tokens) == 1 { | |||
if err := pb.replayReset(1, newStepSub); err != nil { | |||
pb.cs.Logger.Error("Replay reset error", "err", err) | |||
} | |||
} else { | |||
i, err := strconv.Atoi(tokens[1]) | |||
if err != nil { | |||
fmt.Println("back takes an integer argument") | |||
} else if i > pb.count { | |||
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) | |||
} else if err := pb.replayReset(i, newStepSub); err != nil { | |||
pb.cs.Logger.Error("Replay reset error", "err", err) | |||
} | |||
} | |||
case "rs": | |||
// "rs" -> print entire round state | |||
// "rs short" -> print height/round/step | |||
// "rs <field>" -> print another field of the round state | |||
rs := pb.cs.RoundState | |||
if len(tokens) == 1 { | |||
fmt.Println(rs) | |||
} else { | |||
switch tokens[1] { | |||
case "short": | |||
fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step) | |||
case "validators": | |||
fmt.Println(rs.Validators) | |||
case "proposal": | |||
fmt.Println(rs.Proposal) | |||
case "proposal_block": | |||
fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort()) | |||
case "locked_round": | |||
fmt.Println(rs.LockedRound) | |||
case "locked_block": | |||
fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort()) | |||
case "votes": | |||
fmt.Println(rs.Votes.StringIndented(" ")) | |||
default: | |||
fmt.Println("Unknown option", tokens[1]) | |||
} | |||
} | |||
case "n": | |||
fmt.Println(pb.count) | |||
} | |||
} | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// convenience for replay mode | |||
func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusConfig) *State { | |||
dbType := dbm.BackendType(config.DBBackend) | |||
// Get BlockStore | |||
blockStoreDB, err := dbm.NewDB("blockstore", dbType, config.DBDir()) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
blockStore := store.NewBlockStore(blockStoreDB) | |||
// Get State | |||
stateDB, err := dbm.NewDB("state", dbType, config.DBDir()) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
stateStore := sm.NewStore(stateDB) | |||
gdoc, err := sm.MakeGenesisDocFromFile(config.GenesisFile()) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
state, err := sm.MakeGenesisState(gdoc) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
// Create proxyAppConn connection (consensus, mempool, query) | |||
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) | |||
proxyApp := proxy.NewAppConns(clientCreator) | |||
err = proxyApp.Start() | |||
if err != nil { | |||
tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err)) | |||
} | |||
eventBus := types.NewEventBus() | |||
if err := eventBus.Start(); err != nil { | |||
tmos.Exit(fmt.Sprintf("Failed to start event bus: %v", err)) | |||
} | |||
handshaker := NewHandshaker(stateStore, state, blockStore, gdoc) | |||
handshaker.SetEventBus(eventBus) | |||
err = handshaker.Handshake(proxyApp) | |||
if err != nil { | |||
tmos.Exit(fmt.Sprintf("Error on handshake: %v", err)) | |||
} | |||
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} | |||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) | |||
consensusState := NewState(csConfig, state.Copy(), blockExec, | |||
blockStore, mempool, evpool, map[int64]Misbehavior{}) | |||
consensusState.SetEventBus(eventBus) | |||
return consensusState | |||
} |
@ -0,0 +1,90 @@ | |||
package consensus | |||
import ( | |||
abci "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/libs/clist" | |||
mempl "github.com/tendermint/tendermint/mempool" | |||
tmstate "github.com/tendermint/tendermint/proto/tendermint/state" | |||
"github.com/tendermint/tendermint/proxy" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
//----------------------------------------------------------------------------- | |||
type emptyMempool struct{} | |||
var _ mempl.Mempool = emptyMempool{} | |||
func (emptyMempool) Lock() {} | |||
func (emptyMempool) Unlock() {} | |||
func (emptyMempool) Size() int { return 0 } | |||
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { | |||
return nil | |||
} | |||
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } | |||
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } | |||
func (emptyMempool) Update( | |||
_ int64, | |||
_ types.Txs, | |||
_ []*abci.ResponseDeliverTx, | |||
_ mempl.PreCheckFunc, | |||
_ mempl.PostCheckFunc, | |||
) error { | |||
return nil | |||
} | |||
func (emptyMempool) Flush() {} | |||
func (emptyMempool) FlushAppConn() error { return nil } | |||
func (emptyMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } | |||
func (emptyMempool) EnableTxsAvailable() {} | |||
func (emptyMempool) TxsBytes() int64 { return 0 } | |||
func (emptyMempool) TxsFront() *clist.CElement { return nil } | |||
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil } | |||
func (emptyMempool) InitWAL() error { return nil } | |||
func (emptyMempool) CloseWAL() {} | |||
//----------------------------------------------------------------------------- | |||
// mockProxyApp uses ABCIResponses to give the right results. | |||
// | |||
// Useful because we don't want to call Commit() twice for the same block on | |||
// the real app. | |||
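// DeliverTx and EndBlock responses are served from the stored ABCIResponses,
// and Commit simply returns the stored appHash, so replaying a block against
// this app has no side effects on the real application.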
func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy.AppConnConsensus { | |||
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{ | |||
appHash: appHash, | |||
abciResponses: abciResponses, | |||
}) | |||
cli, err := clientCreator.NewABCIClient()
if err != nil {
panic(err)
}
if err := cli.Start(); err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli) | |||
} | |||
type mockProxyApp struct { | |||
abci.BaseApplication | |||
appHash []byte | |||
txCount int | |||
abciResponses *tmstate.ABCIResponses | |||
} | |||
func (mock *mockProxyApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { | |||
r := mock.abciResponses.DeliverTxs[mock.txCount] | |||
mock.txCount++ | |||
if r == nil { | |||
return abci.ResponseDeliverTx{} | |||
} | |||
return *r | |||
} | |||
func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { | |||
mock.txCount = 0 | |||
return *mock.abciResponses.EndBlock | |||
} | |||
func (mock *mockProxyApp) Commit() abci.ResponseCommit { | |||
return abci.ResponseCommit{Data: mock.appHash} | |||
} |
@ -0,0 +1,134 @@ | |||
package consensus | |||
import ( | |||
"time" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
) | |||
var ( | |||
tickTockBufferSize = 10 | |||
) | |||
// TimeoutTicker is a timer that schedules timeouts | |||
// conditional on the height/round/step in the timeoutInfo. | |||
// The timeoutInfo.Duration may be non-positive. | |||
type TimeoutTicker interface { | |||
Start() error | |||
Stop() error | |||
Chan() <-chan timeoutInfo // on which to receive a timeout | |||
ScheduleTimeout(ti timeoutInfo) // reset the timer | |||
SetLogger(log.Logger) | |||
} | |||
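// A minimal usage sketch (illustrative; timeoutInfo is defined elsewhere in
// this package):
//
//	ticker := NewTimeoutTicker()
//	if err := ticker.Start(); err != nil {
//		panic(err)
//	}
//	ticker.ScheduleTimeout(timeoutInfo{Duration: time.Second, Height: 1})
//	ti := <-ticker.Chan() // fires once the duration has elapsed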
// timeoutTicker wraps time.Timer, | |||
// scheduling timeouts only for greater height/round/step | |||
// than what it's already seen. | |||
// Timeouts are scheduled along the tickChan, | |||
// and fired on the tockChan. | |||
type timeoutTicker struct { | |||
service.BaseService | |||
timer *time.Timer | |||
tickChan chan timeoutInfo // for scheduling timeouts | |||
tockChan chan timeoutInfo // for notifying about them | |||
} | |||
// NewTimeoutTicker returns a new TimeoutTicker. | |||
func NewTimeoutTicker() TimeoutTicker { | |||
tt := &timeoutTicker{ | |||
timer: time.NewTimer(0), | |||
tickChan: make(chan timeoutInfo, tickTockBufferSize), | |||
tockChan: make(chan timeoutInfo, tickTockBufferSize), | |||
} | |||
tt.BaseService = *service.NewBaseService(nil, "TimeoutTicker", tt) | |||
tt.stopTimer() // don't want to fire until the first scheduled timeout | |||
return tt | |||
} | |||
// OnStart implements service.Service. It starts the timeout routine. | |||
func (t *timeoutTicker) OnStart() error { | |||
go t.timeoutRoutine() | |||
return nil | |||
} | |||
// OnStop implements service.Service. It stops the timeout routine. | |||
func (t *timeoutTicker) OnStop() { | |||
t.BaseService.OnStop() | |||
t.stopTimer() | |||
} | |||
// Chan returns a channel on which timeouts are sent. | |||
func (t *timeoutTicker) Chan() <-chan timeoutInfo { | |||
return t.tockChan | |||
} | |||
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan. | |||
// The timeoutRoutine is always available to read from tickChan, so this won't block. | |||
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. | |||
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) { | |||
t.tickChan <- ti | |||
} | |||
//------------------------------------------------------------- | |||
// stop the timer and drain if necessary | |||
func (t *timeoutTicker) stopTimer() { | |||
// Stop() returns false if it was already fired or was stopped | |||
if !t.timer.Stop() { | |||
select { | |||
case <-t.timer.C: | |||
default: | |||
t.Logger.Debug("Timer already stopped") | |||
} | |||
} | |||
} | |||
// send on tickChan to start a new timer. | |||
// timers are interrupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan | |||
func (t *timeoutTicker) timeoutRoutine() { | |||
t.Logger.Debug("Starting timeout routine") | |||
var ti timeoutInfo | |||
for { | |||
select { | |||
case newti := <-t.tickChan: | |||
t.Logger.Debug("Received tick", "old_ti", ti, "new_ti", newti) | |||
// ignore tickers for old height/round/step | |||
if newti.Height < ti.Height { | |||
continue | |||
} else if newti.Height == ti.Height { | |||
if newti.Round < ti.Round { | |||
continue | |||
} else if newti.Round == ti.Round { | |||
if ti.Step > 0 && newti.Step <= ti.Step { | |||
continue | |||
} | |||
} | |||
} | |||
// stop the last timer | |||
t.stopTimer() | |||
// update timeoutInfo and reset timer | |||
// NOTE time.Timer allows duration to be non-positive | |||
ti = newti | |||
t.timer.Reset(ti.Duration) | |||
t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) | |||
case <-t.timer.C: | |||
t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) | |||
// go routine here guarantees timeoutRoutine doesn't block. | |||
// Determinism comes from playback in the receiveRoutine. | |||
// We can eliminate it by merging the timeoutRoutine into receiveRoutine | |||
// and managing the timeouts ourselves with a millisecond ticker | |||
go func(toi timeoutInfo) { t.tockChan <- toi }(ti) | |||
case <-t.Quit(): | |||
return | |||
} | |||
} | |||
} |
@ -0,0 +1,437 @@ | |||
package consensus | |||
import ( | |||
"encoding/binary" | |||
"errors" | |||
"fmt" | |||
"hash/crc32" | |||
"io" | |||
"path/filepath" | |||
"time" | |||
"github.com/gogo/protobuf/proto" | |||
auto "github.com/tendermint/tendermint/libs/autofile" | |||
// tmjson "github.com/tendermint/tendermint/libs/json" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmos "github.com/tendermint/tendermint/libs/os" | |||
"github.com/tendermint/tendermint/libs/service" | |||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" | |||
tmtime "github.com/tendermint/tendermint/types/time" | |||
) | |||
const ( | |||
// time.Time + max consensus msg size | |||
maxMsgSizeBytes = maxMsgSize + 24 | |||
// how often the WAL should be sync'd during periodic sync'ing
walDefaultFlushInterval = 2 * time.Second | |||
) | |||
//-------------------------------------------------------- | |||
// types and functions for saving consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes. | |||
type TimedWALMessage struct { | |||
Time time.Time `json:"time"` | |||
Msg WALMessage `json:"msg"` | |||
} | |||
// EndHeightMessage marks the end of the given height inside WAL. | |||
// @internal used by scripts/wal2json util. | |||
type EndHeightMessage struct { | |||
Height int64 `json:"height"` | |||
} | |||
type WALMessage interface{} | |||
// func init() { | |||
// tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo") | |||
// tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo") | |||
// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage") | |||
// } | |||
//-------------------------------------------------------- | |||
// Simple write-ahead logger | |||
// WAL is an interface for any write-ahead logger. | |||
type WAL interface { | |||
Write(WALMessage) error | |||
WriteSync(WALMessage) error | |||
FlushAndSync() error | |||
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) | |||
// service methods | |||
Start() error | |||
Stop() error | |||
Wait() | |||
} | |||
// Write ahead logger writes msgs to disk before they are processed. | |||
// Can be used for crash-recovery and deterministic replay. | |||
// TODO: currently the wal is overwritten during replay catchup, give it a mode | |||
// so it's either reading or appending - must read to end to start appending | |||
// again. | |||
type BaseWAL struct { | |||
service.BaseService | |||
group *auto.Group | |||
enc *WALEncoder | |||
flushTicker *time.Ticker | |||
flushInterval time.Duration | |||
} | |||
var _ WAL = &BaseWAL{} | |||
// NewWAL returns a new write-ahead logger based on BaseWAL, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped. | |||
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) { | |||
err := tmos.EnsureDir(filepath.Dir(walFile), 0700) | |||
if err != nil { | |||
return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err) | |||
} | |||
group, err := auto.OpenGroup(walFile, groupOptions...) | |||
if err != nil { | |||
return nil, err | |||
} | |||
wal := &BaseWAL{ | |||
group: group, | |||
enc: NewWALEncoder(group), | |||
flushInterval: walDefaultFlushInterval, | |||
} | |||
wal.BaseService = *service.NewBaseService(nil, "baseWAL", wal) | |||
return wal, nil | |||
} | |||
// SetFlushInterval allows us to override the periodic flush interval for the WAL. | |||
func (wal *BaseWAL) SetFlushInterval(i time.Duration) { | |||
wal.flushInterval = i | |||
} | |||
func (wal *BaseWAL) Group() *auto.Group { | |||
return wal.group | |||
} | |||
func (wal *BaseWAL) SetLogger(l log.Logger) { | |||
wal.BaseService.Logger = l | |||
wal.group.SetLogger(l) | |||
} | |||
func (wal *BaseWAL) OnStart() error { | |||
size, err := wal.group.Head.Size() | |||
if err != nil { | |||
return err | |||
} else if size == 0 { | |||
if err := wal.WriteSync(EndHeightMessage{0}); err != nil { | |||
return err | |||
} | |||
} | |||
err = wal.group.Start() | |||
if err != nil { | |||
return err | |||
} | |||
wal.flushTicker = time.NewTicker(wal.flushInterval) | |||
go wal.processFlushTicks() | |||
return nil | |||
} | |||
func (wal *BaseWAL) processFlushTicks() { | |||
for { | |||
select { | |||
case <-wal.flushTicker.C: | |||
if err := wal.FlushAndSync(); err != nil { | |||
wal.Logger.Error("Periodic WAL flush failed", "err", err) | |||
} | |||
case <-wal.Quit(): | |||
return | |||
} | |||
} | |||
} | |||
// FlushAndSync flushes and fsync's the underlying group's data to disk. | |||
// See auto#FlushAndSync | |||
func (wal *BaseWAL) FlushAndSync() error { | |||
return wal.group.FlushAndSync() | |||
} | |||
// Stop the underlying autofile group. | |||
// Use Wait() to ensure it's finished shutting down | |||
// before cleaning up files. | |||
func (wal *BaseWAL) OnStop() { | |||
wal.flushTicker.Stop() | |||
if err := wal.FlushAndSync(); err != nil { | |||
wal.Logger.Error("error on flush data to disk", "error", err) | |||
} | |||
if err := wal.group.Stop(); err != nil { | |||
wal.Logger.Error("error trying to stop wal", "error", err) | |||
} | |||
wal.group.Close() | |||
} | |||
// Wait for the underlying autofile group to finish shutting down | |||
// so it's safe to cleanup files. | |||
func (wal *BaseWAL) Wait() { | |||
wal.group.Wait() | |||
} | |||
// Write is called in newStep and for each receive on the | |||
// peerMsgQueue and the timeoutTicker. | |||
// NOTE: does not call fsync() | |||
func (wal *BaseWAL) Write(msg WALMessage) error { | |||
if wal == nil { | |||
return nil | |||
} | |||
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil { | |||
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height", | |||
"err", err, "msg", msg) | |||
return err | |||
} | |||
return nil | |||
} | |||
// WriteSync is called when we receive a msg from ourselves | |||
// so that we write to disk before sending signed messages. | |||
// NOTE: calls fsync() | |||
func (wal *BaseWAL) WriteSync(msg WALMessage) error { | |||
if wal == nil { | |||
return nil | |||
} | |||
if err := wal.Write(msg); err != nil { | |||
return err | |||
} | |||
if err := wal.FlushAndSync(); err != nil { | |||
wal.Logger.Error(`WriteSync failed to flush consensus wal. | |||
WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted`, | |||
"err", err) | |||
return err | |||
} | |||
return nil | |||
} | |||
// WALSearchOptions are optional arguments to SearchForEndHeight. | |||
type WALSearchOptions struct { | |||
// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors. | |||
IgnoreDataCorruptionErrors bool | |||
} | |||
// SearchForEndHeight searches for the EndHeightMessage with the given height | |||
// and returns an auto.GroupReader, whether or not it was found, and an error.
// The group reader will be nil if found equals false.
// | |||
// CONTRACT: caller must close group reader. | |||
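//
// A minimal usage sketch (illustrative):
//
//	gr, found, err := wal.SearchForEndHeight(height, &WALSearchOptions{})
//	if err != nil || !found {
//		// handle the missing height
//	}
//	defer gr.Close()
//	dec := NewWALDecoder(gr) // messages for height+1 follow the marker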
func (wal *BaseWAL) SearchForEndHeight( | |||
height int64, | |||
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { | |||
var ( | |||
msg *TimedWALMessage | |||
gr *auto.GroupReader | |||
) | |||
lastHeightFound := int64(-1) | |||
// NOTE: starting from the last file in the group because we're usually | |||
// searching for the last height. See replay.go | |||
min, max := wal.group.MinIndex(), wal.group.MaxIndex() | |||
wal.Logger.Info("Searching for height", "height", height, "min", min, "max", max) | |||
for index := max; index >= min; index-- { | |||
gr, err = wal.group.NewReader(index) | |||
if err != nil { | |||
return nil, false, err | |||
} | |||
dec := NewWALDecoder(gr) | |||
for { | |||
msg, err = dec.Decode() | |||
if err == io.EOF { | |||
// OPTIMISATION: no need to look for height in older files if we've seen h < height | |||
if lastHeightFound > 0 && lastHeightFound < height { | |||
gr.Close() | |||
return nil, false, nil | |||
} | |||
// check next file | |||
break | |||
} | |||
if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) { | |||
wal.Logger.Error("Corrupted entry. Skipping...", "err", err) | |||
// do nothing | |||
continue | |||
} else if err != nil { | |||
gr.Close() | |||
return nil, false, err | |||
} | |||
if m, ok := msg.Msg.(EndHeightMessage); ok { | |||
lastHeightFound = m.Height | |||
if m.Height == height { // found | |||
wal.Logger.Info("Found", "height", height, "index", index) | |||
return gr, true, nil | |||
} | |||
} | |||
} | |||
gr.Close() | |||
} | |||
return nil, false, nil | |||
} | |||
// ///////////////////////////////////////////////////////////////////////////// | |||
// A WALEncoder writes custom-encoded WAL messages to an output stream. | |||
// | |||
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value | |||
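//
// For example, a 100-byte payload is written as a 108-byte record:
//
//	crc32c(payload) (4 bytes) | length=100 (4 bytes) | payload (100 bytes)
//
// with both fixed-width fields encoded big-endian (see Encode below).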
type WALEncoder struct { | |||
wr io.Writer | |||
} | |||
// NewWALEncoder returns a new encoder that writes to wr. | |||
func NewWALEncoder(wr io.Writer) *WALEncoder { | |||
return &WALEncoder{wr} | |||
} | |||
// Encode writes the custom encoding of v to the stream. It returns an error if | |||
// the encoded size of v is greater than 1MB. Any error encountered | |||
// during the write is also returned. | |||
func (enc *WALEncoder) Encode(v *TimedWALMessage) error { | |||
pbMsg, err := WALToProto(v.Msg) | |||
if err != nil { | |||
return err | |||
} | |||
pv := tmcons.TimedWALMessage{ | |||
Time: v.Time, | |||
Msg: pbMsg, | |||
} | |||
data, err := proto.Marshal(&pv) | |||
if err != nil { | |||
panic(fmt.Errorf("encode timed wall message failure: %w", err)) | |||
} | |||
crc := crc32.Checksum(data, crc32c) | |||
length := uint32(len(data)) | |||
if length > maxMsgSizeBytes { | |||
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes) | |||
} | |||
totalLength := 8 + int(length) | |||
msg := make([]byte, totalLength) | |||
binary.BigEndian.PutUint32(msg[0:4], crc) | |||
binary.BigEndian.PutUint32(msg[4:8], length) | |||
copy(msg[8:], data) | |||
_, err = enc.wr.Write(msg) | |||
return err | |||
} | |||
// ///////////////////////////////////////////////////////////////////////////// | |||
// IsDataCorruptionError returns true if data has been corrupted inside WAL. | |||
func IsDataCorruptionError(err error) bool { | |||
_, ok := err.(DataCorruptionError) | |||
return ok | |||
} | |||
// DataCorruptionError is an error that occurs if data on disk was corrupted.
type DataCorruptionError struct { | |||
cause error | |||
} | |||
func (e DataCorruptionError) Error() string { | |||
return fmt.Sprintf("DataCorruptionError[%v]", e.cause) | |||
} | |||
func (e DataCorruptionError) Cause() error { | |||
return e.cause | |||
} | |||
// A WALDecoder reads and decodes custom-encoded WAL messages from an input | |||
// stream. See WALEncoder for the format used. | |||
// | |||
// It will also compare the checksums and make sure data size is equal to the | |||
// length from the header. If that is not the case, error will be returned. | |||
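//
// A minimal read loop (illustrative):
//
//	dec := NewWALDecoder(rd)
//	for {
//		msg, err := dec.Decode()
//		if err == io.EOF {
//			break
//		} else if err != nil {
//			// handle corruption or conversion errors
//		}
//		_ = msg // process the message
//	}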
type WALDecoder struct { | |||
rd io.Reader | |||
} | |||
// NewWALDecoder returns a new decoder that reads from rd. | |||
func NewWALDecoder(rd io.Reader) *WALDecoder { | |||
return &WALDecoder{rd} | |||
} | |||
// Decode reads the next custom-encoded value from its reader and returns it. | |||
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { | |||
b := make([]byte, 4) | |||
_, err := io.ReadFull(dec.rd, b) // io.ReadFull guards against short reads
if errors.Is(err, io.EOF) { | |||
return nil, err | |||
} | |||
if err != nil { | |||
return nil, DataCorruptionError{fmt.Errorf("failed to read checksum: %v", err)} | |||
} | |||
crc := binary.BigEndian.Uint32(b) | |||
b = make([]byte, 4) | |||
_, err = io.ReadFull(dec.rd, b)
if err != nil { | |||
return nil, DataCorruptionError{fmt.Errorf("failed to read length: %v", err)} | |||
} | |||
length := binary.BigEndian.Uint32(b) | |||
if length > maxMsgSizeBytes { | |||
return nil, DataCorruptionError{fmt.Errorf( | |||
"length %d exceeded maximum possible value of %d bytes", | |||
length, | |||
maxMsgSizeBytes)} | |||
} | |||
data := make([]byte, length) | |||
n, err := io.ReadFull(dec.rd, data)
if err != nil { | |||
return nil, DataCorruptionError{fmt.Errorf("failed to read data: %v (read: %d, wanted: %d)", err, n, length)} | |||
} | |||
// check checksum before decoding data | |||
actualCRC := crc32.Checksum(data, crc32c) | |||
if actualCRC != crc { | |||
return nil, DataCorruptionError{fmt.Errorf("checksums do not match: read: %v, actual: %v", crc, actualCRC)} | |||
} | |||
var res = new(tmcons.TimedWALMessage) | |||
err = proto.Unmarshal(data, res) | |||
if err != nil { | |||
return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)} | |||
} | |||
walMsg, err := WALFromProto(res.Msg) | |||
if err != nil { | |||
return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)} | |||
} | |||
tMsgWal := &TimedWALMessage{ | |||
Time: res.Time, | |||
Msg: walMsg, | |||
} | |||
return tMsgWal, err | |||
} | |||
type nilWAL struct{} | |||
var _ WAL = nilWAL{} | |||
func (nilWAL) Write(m WALMessage) error { return nil } | |||
func (nilWAL) WriteSync(m WALMessage) error { return nil } | |||
func (nilWAL) FlushAndSync() error { return nil } | |||
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { | |||
return nil, false, nil | |||
} | |||
func (nilWAL) Start() error { return nil } | |||
func (nilWAL) Stop() error { return nil } | |||
func (nilWAL) Wait() {} |
@ -0,0 +1,31 @@ | |||
// +build gofuzz | |||
package consensus | |||
import ( | |||
"bytes" | |||
"io" | |||
) | |||
func Fuzz(data []byte) int { | |||
dec := NewWALDecoder(bytes.NewReader(data)) | |||
for { | |||
msg, err := dec.Decode() | |||
if err == io.EOF { | |||
break | |||
} | |||
if err != nil { | |||
if msg != nil { | |||
panic("msg != nil on error") | |||
} | |||
return 0 | |||
} | |||
var w bytes.Buffer | |||
enc := NewWALEncoder(&w) | |||
err = enc.Encode(msg) | |||
if err != nil { | |||
panic(err) | |||
} | |||
} | |||
return 1 | |||
} |
@ -0,0 +1,229 @@ | |||
package consensus | |||
import ( | |||
"bufio" | |||
"bytes" | |||
"fmt" | |||
"io" | |||
"path/filepath" | |||
"testing" | |||
"time" | |||
db "github.com/tendermint/tm-db" | |||
"github.com/tendermint/tendermint/abci/example/kvstore" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmrand "github.com/tendermint/tendermint/libs/rand" | |||
"github.com/tendermint/tendermint/privval" | |||
"github.com/tendermint/tendermint/proxy" | |||
sm "github.com/tendermint/tendermint/state" | |||
"github.com/tendermint/tendermint/store" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a | |||
// stripped-down version of a node (proxy app, event bus, consensus state) with a
// persistent kvstore application and special consensus wal instance
// (byteBufferWAL) and waits until numBlocks are created.
// If the node fails to produce the given numBlocks, it returns an error.
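//
// Typical usage goes through WALWithNBlocks below, e.g.:
//
//	walBody, err := WALWithNBlocks(t, 6)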
func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { | |||
config := getConfig(t) | |||
app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator")) | |||
logger := log.TestingLogger().With("wal_generator", "wal_generator") | |||
logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks) | |||
// /////////////////////////////////////////////////////////////////////////// | |||
// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS | |||
// NOTE: we can't import node package because of circular dependency. | |||
// NOTE: we don't do handshake so need to set state.Version.Consensus.App directly. | |||
privValidatorKeyFile := config.PrivValidatorKeyFile() | |||
privValidatorStateFile := config.PrivValidatorStateFile() | |||
privValidator := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile) | |||
genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) | |||
if err != nil { | |||
return fmt.Errorf("failed to read genesis file: %w", err) | |||
} | |||
blockStoreDB := db.NewMemDB() | |||
stateDB := blockStoreDB | |||
stateStore := sm.NewStore(stateDB) | |||
state, err := sm.MakeGenesisState(genDoc) | |||
if err != nil { | |||
return fmt.Errorf("failed to make genesis state: %w", err) | |||
} | |||
state.Version.Consensus.App = kvstore.ProtocolVersion | |||
if err = stateStore.Save(state); err != nil { | |||
t.Error(err) | |||
} | |||
blockStore := store.NewBlockStore(blockStoreDB) | |||
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) | |||
proxyApp.SetLogger(logger.With("module", "proxy")) | |||
if err := proxyApp.Start(); err != nil { | |||
return fmt.Errorf("failed to start proxy app connections: %w", err) | |||
} | |||
t.Cleanup(func() { | |||
if err := proxyApp.Stop(); err != nil { | |||
t.Error(err) | |||
} | |||
}) | |||
eventBus := types.NewEventBus() | |||
eventBus.SetLogger(logger.With("module", "events")) | |||
if err := eventBus.Start(); err != nil { | |||
return fmt.Errorf("failed to start event bus: %w", err) | |||
} | |||
t.Cleanup(func() { | |||
if err := eventBus.Stop(); err != nil { | |||
t.Error(err) | |||
} | |||
}) | |||
mempool := emptyMempool{} | |||
evpool := sm.EmptyEvidencePool{} | |||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) | |||
consensusState := NewState(config.Consensus, state.Copy(), | |||
blockExec, blockStore, mempool, evpool, map[int64]Misbehavior{}) | |||
consensusState.SetLogger(logger) | |||
consensusState.SetEventBus(eventBus) | |||
if privValidator != nil { | |||
consensusState.SetPrivValidator(privValidator) | |||
} | |||
// END OF COPY PASTE | |||
// /////////////////////////////////////////////////////////////////////////// | |||
// set consensus wal to buffered WAL, which will write all incoming msgs to buffer | |||
numBlocksWritten := make(chan struct{}) | |||
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) | |||
// see wal.go#103 | |||
if err := wal.Write(EndHeightMessage{0}); err != nil { | |||
t.Error(err) | |||
} | |||
consensusState.wal = wal | |||
if err := consensusState.Start(); err != nil { | |||
return fmt.Errorf("failed to start consensus state: %w", err) | |||
} | |||
select { | |||
case <-numBlocksWritten: | |||
if err := consensusState.Stop(); err != nil { | |||
t.Error(err) | |||
} | |||
return nil | |||
case <-time.After(1 * time.Minute): | |||
if err := consensusState.Stop(); err != nil { | |||
t.Error(err) | |||
} | |||
return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks) | |||
} | |||
} | |||
// WALWithNBlocks returns the content of a WAL with numBlocks blocks in it.
func WALWithNBlocks(t *testing.T, numBlocks int) (data []byte, err error) { | |||
var b bytes.Buffer | |||
wr := bufio.NewWriter(&b) | |||
if err := WALGenerateNBlocks(t, wr, numBlocks); err != nil { | |||
return []byte{}, err | |||
} | |||
wr.Flush() | |||
return b.Bytes(), nil | |||
} | |||
func randPort() int { | |||
// returns between base and base + spread | |||
base, spread := 20000, 20000 | |||
return base + tmrand.Intn(spread) | |||
} | |||
func makeAddrs() (string, string, string) { | |||
start := randPort() | |||
return fmt.Sprintf("tcp://127.0.0.1:%d", start), | |||
fmt.Sprintf("tcp://127.0.0.1:%d", start+1), | |||
fmt.Sprintf("tcp://127.0.0.1:%d", start+2) | |||
} | |||
// getConfig returns a config for test cases | |||
func getConfig(t *testing.T) *cfg.Config { | |||
c := cfg.ResetTestRoot(t.Name()) | |||
// and we use random ports to run in parallel | |||
tm, rpc, grpc := makeAddrs() | |||
c.P2P.ListenAddress = tm | |||
c.RPC.ListenAddress = rpc | |||
c.RPC.GRPCListenAddress = grpc | |||
return c | |||
} | |||
// byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops | |||
// when the heightToStop is reached. Client will be notified via | |||
// signalWhenStopsTo channel. | |||
type byteBufferWAL struct { | |||
enc *WALEncoder | |||
stopped bool | |||
heightToStop int64 | |||
signalWhenStopsTo chan<- struct{} | |||
logger log.Logger | |||
} | |||
// needed for determinism | |||
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z") | |||
func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL { | |||
return &byteBufferWAL{ | |||
enc: enc, | |||
heightToStop: nBlocks, | |||
signalWhenStopsTo: signalStop, | |||
logger: logger, | |||
} | |||
} | |||
// Write writes the message to the internal buffer, except when heightToStop is
// reached, in which case it signals the caller via signalWhenStopsTo and
// skips writing.
func (w *byteBufferWAL) Write(m WALMessage) error { | |||
if w.stopped { | |||
w.logger.Debug("WAL already stopped. Not writing message", "msg", m) | |||
return nil | |||
} | |||
if endMsg, ok := m.(EndHeightMessage); ok { | |||
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop) | |||
if endMsg.Height == w.heightToStop { | |||
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height) | |||
w.signalWhenStopsTo <- struct{}{} | |||
w.stopped = true | |||
return nil | |||
} | |||
} | |||
w.logger.Debug("WAL Write Message", "msg", m) | |||
err := w.enc.Encode(&TimedWALMessage{fixedTime, m}) | |||
if err != nil { | |||
panic(fmt.Sprintf("failed to encode the msg %v", m)) | |||
} | |||
return nil | |||
} | |||
func (w *byteBufferWAL) WriteSync(m WALMessage) error { | |||
return w.Write(m) | |||
} | |||
func (w *byteBufferWAL) FlushAndSync() error { return nil } | |||
func (w *byteBufferWAL) SearchForEndHeight( | |||
height int64, | |||
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { | |||
return nil, false, nil | |||
} | |||
func (w *byteBufferWAL) Start() error { return nil } | |||
func (w *byteBufferWAL) Stop() error { return nil } | |||
func (w *byteBufferWAL) Wait() {} |
@ -0,0 +1,237 @@ | |||
package main | |||
import ( | |||
"fmt" | |||
"os" | |||
"path/filepath" | |||
"github.com/spf13/cobra" | |||
"github.com/spf13/viper" | |||
cmd "github.com/tendermint/tendermint/cmd/tendermint/commands" | |||
"github.com/tendermint/tendermint/cmd/tendermint/commands/debug" | |||
cfg "github.com/tendermint/tendermint/config" | |||
"github.com/tendermint/tendermint/libs/cli" | |||
tmflags "github.com/tendermint/tendermint/libs/cli/flags" | |||
"github.com/tendermint/tendermint/libs/log" | |||
tmos "github.com/tendermint/tendermint/libs/os" | |||
tmrand "github.com/tendermint/tendermint/libs/rand" | |||
"github.com/tendermint/tendermint/p2p" | |||
cs "github.com/tendermint/tendermint/test/maverick/consensus" | |||
nd "github.com/tendermint/tendermint/test/maverick/node" | |||
"github.com/tendermint/tendermint/types" | |||
tmtime "github.com/tendermint/tendermint/types/time" | |||
) | |||
var ( | |||
config = cfg.DefaultConfig() | |||
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) | |||
misbehaviorFlag = "" | |||
) | |||
func init() { | |||
registerFlagsRootCmd(RootCmd) | |||
} | |||
func registerFlagsRootCmd(command *cobra.Command) { | |||
command.PersistentFlags().String("log_level", config.LogLevel, "Log level") | |||
} | |||
func ParseConfig() (*cfg.Config, error) { | |||
conf := cfg.DefaultConfig() | |||
err := viper.Unmarshal(conf) | |||
if err != nil { | |||
return nil, err | |||
} | |||
conf.SetRoot(conf.RootDir) | |||
cfg.EnsureRoot(conf.RootDir) | |||
if err = conf.ValidateBasic(); err != nil { | |||
return nil, fmt.Errorf("error in config file: %v", err) | |||
} | |||
return conf, err | |||
} | |||
// RootCmd is the root command for Tendermint core. | |||
var RootCmd = &cobra.Command{ | |||
Use: "maverick", | |||
Short: "Tendermint Maverick Node", | |||
Long: "Tendermint Maverick Node for testing with faulty consensus misbehaviors in a testnet. Contains " + | |||
"all the functionality of a normal node but custom misbehaviors can be injected when running the node " + | |||
"through a flag. See maverick node --help for how the misbehavior flag is constructured", | |||
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { | |||
fmt.Printf("use: %v, args: %v", cmd.Use, cmd.Args) | |||
config, err = ParseConfig() | |||
if err != nil { | |||
return err | |||
} | |||
if config.LogFormat == cfg.LogFormatJSON { | |||
logger = log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout)) | |||
} | |||
logger, err = tmflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel()) | |||
if err != nil { | |||
return err | |||
} | |||
if viper.GetBool(cli.TraceFlag) { | |||
logger = log.NewTracingLogger(logger) | |||
} | |||
logger = logger.With("module", "main") | |||
return nil | |||
}, | |||
} | |||
func main() { | |||
rootCmd := RootCmd | |||
rootCmd.AddCommand( | |||
ListMisbehaviorCmd, | |||
cmd.GenValidatorCmd, | |||
InitFilesCmd, | |||
cmd.ProbeUpnpCmd, | |||
cmd.ReplayCmd, | |||
cmd.ReplayConsoleCmd, | |||
cmd.ResetAllCmd, | |||
cmd.ResetPrivValidatorCmd, | |||
cmd.ShowValidatorCmd, | |||
cmd.ShowNodeIDCmd, | |||
cmd.GenNodeKeyCmd, | |||
cmd.VersionCmd, | |||
debug.DebugCmd, | |||
cli.NewCompletionCmd(rootCmd, true), | |||
) | |||
nodeCmd := &cobra.Command{ | |||
Use: "node", | |||
Short: "Run the maverick node", | |||
RunE: func(command *cobra.Command, args []string) error { | |||
return startNode(config, logger, misbehaviorFlag) | |||
}, | |||
} | |||
cmd.AddNodeFlags(nodeCmd) | |||
// Create & start node | |||
rootCmd.AddCommand(nodeCmd) | |||
// add special flag for misbehaviors | |||
nodeCmd.Flags().StringVar( | |||
&misbehaviorFlag, | |||
"misbehaviors", | |||
"", | |||
"Select the misbehaviors of the node (comma-separated, no spaces in between): \n"+ | |||
"e.g. --misbehaviors double-prevote,3\n"+ | |||
"You can also have multiple misbehaviors: e.g. double-prevote,3,no-vote,5") | |||
cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", cfg.DefaultTendermintDir))) | |||
if err := cmd.Execute(); err != nil { | |||
panic(err) | |||
} | |||
} | |||
func startNode(config *cfg.Config, logger log.Logger, misbehaviorFlag string) error { | |||
misbehaviors, err := nd.ParseMisbehaviors(misbehaviorFlag) | |||
if err != nil { | |||
return err | |||
} | |||
node, err := nd.DefaultNewNode(config, logger, misbehaviors) | |||
if err != nil { | |||
return fmt.Errorf("failed to create node: %w", err) | |||
} | |||
if err := node.Start(); err != nil { | |||
return fmt.Errorf("failed to start node: %w", err) | |||
} | |||
logger.Info("Started node", "nodeInfo", node.Switch().NodeInfo()) | |||
// Stop upon receiving SIGTERM or CTRL-C. | |||
tmos.TrapSignal(logger, func() { | |||
if node.IsRunning() { | |||
if err := node.Stop(); err != nil { | |||
logger.Error("unable to stop the node", "error", err) | |||
} | |||
} | |||
}) | |||
// Run forever. | |||
select {} | |||
} | |||
var InitFilesCmd = &cobra.Command{ | |||
Use: "init", | |||
Short: "Initialize Tendermint", | |||
RunE: initFiles, | |||
} | |||
func initFiles(cmd *cobra.Command, args []string) error { | |||
return initFilesWithConfig(config) | |||
} | |||
func initFilesWithConfig(config *cfg.Config) error { | |||
// private validator | |||
privValKeyFile := config.PrivValidatorKeyFile() | |||
privValStateFile := config.PrivValidatorStateFile() | |||
var pv *nd.FilePV | |||
if tmos.FileExists(privValKeyFile) { | |||
pv = nd.LoadFilePV(privValKeyFile, privValStateFile) | |||
logger.Info("Found private validator", "keyFile", privValKeyFile, | |||
"stateFile", privValStateFile) | |||
} else { | |||
pv = nd.GenFilePV(privValKeyFile, privValStateFile) | |||
pv.Save() | |||
logger.Info("Generated private validator", "keyFile", privValKeyFile, | |||
"stateFile", privValStateFile) | |||
} | |||
nodeKeyFile := config.NodeKeyFile() | |||
if tmos.FileExists(nodeKeyFile) { | |||
logger.Info("Found node key", "path", nodeKeyFile) | |||
} else { | |||
if _, err := p2p.LoadOrGenNodeKey(nodeKeyFile); err != nil { | |||
return err | |||
} | |||
logger.Info("Generated node key", "path", nodeKeyFile) | |||
} | |||
// genesis file | |||
genFile := config.GenesisFile() | |||
if tmos.FileExists(genFile) { | |||
logger.Info("Found genesis file", "path", genFile) | |||
} else { | |||
genDoc := types.GenesisDoc{ | |||
ChainID: fmt.Sprintf("test-chain-%v", tmrand.Str(6)), | |||
GenesisTime: tmtime.Now(), | |||
ConsensusParams: types.DefaultConsensusParams(), | |||
} | |||
pubKey, err := pv.GetPubKey() | |||
if err != nil { | |||
return fmt.Errorf("can't get pubkey: %w", err) | |||
} | |||
genDoc.Validators = []types.GenesisValidator{{ | |||
Address: pubKey.Address(), | |||
PubKey: pubKey, | |||
Power: 10, | |||
}} | |||
if err := genDoc.SaveAs(genFile); err != nil { | |||
return err | |||
} | |||
logger.Info("Generated genesis file", "path", genFile) | |||
} | |||
return nil | |||
} | |||
var ListMisbehaviorCmd = &cobra.Command{ | |||
Use: "misbehaviors", | |||
Short: "Lists possible misbehaviors", | |||
RunE: listMisbehaviors, | |||
} | |||
func listMisbehaviors(cmd *cobra.Command, args []string) error { | |||
str := "Currently registered misbehaviors: \n" | |||
for key := range cs.MisbehaviorList { | |||
str += fmt.Sprintf("- %s\n", key) | |||
} | |||
fmt.Println(str) | |||
return nil | |||
} |
@ -0,0 +1,358 @@ | |||
package node | |||
import ( | |||
"errors" | |||
"fmt" | |||
"io/ioutil" | |||
"github.com/tendermint/tendermint/crypto" | |||
"github.com/tendermint/tendermint/crypto/ed25519" | |||
tmbytes "github.com/tendermint/tendermint/libs/bytes" | |||
tmjson "github.com/tendermint/tendermint/libs/json" | |||
tmos "github.com/tendermint/tendermint/libs/os" | |||
"github.com/tendermint/tendermint/libs/tempfile" | |||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// ******************************************************************************************************************* | |||
// | |||
// WARNING: FOR TESTING ONLY. DO NOT USE THIS FILE OUTSIDE MAVERICK | |||
// | |||
// ******************************************************************************************************************* | |||
const ( | |||
stepNone int8 = 0 // Used to distinguish the initial state | |||
stepPropose int8 = 1 | |||
stepPrevote int8 = 2 | |||
stepPrecommit int8 = 3 | |||
) | |||
// A vote is either stepPrevote or stepPrecommit. | |||
func voteToStep(vote *tmproto.Vote) int8 { | |||
switch vote.Type { | |||
case tmproto.PrevoteType: | |||
return stepPrevote | |||
case tmproto.PrecommitType: | |||
return stepPrecommit | |||
default: | |||
panic(fmt.Sprintf("Unknown vote type: %v", vote.Type)) | |||
} | |||
} | |||
//------------------------------------------------------------------------------- | |||
// FilePVKey stores the immutable part of PrivValidator. | |||
type FilePVKey struct { | |||
Address types.Address `json:"address"` | |||
PubKey crypto.PubKey `json:"pub_key"` | |||
PrivKey crypto.PrivKey `json:"priv_key"` | |||
filePath string | |||
} | |||
// Save persists the FilePVKey to its filePath. | |||
func (pvKey FilePVKey) Save() { | |||
outFile := pvKey.filePath | |||
if outFile == "" { | |||
panic("cannot save PrivValidator key: filePath not set") | |||
} | |||
jsonBytes, err := tmjson.MarshalIndent(pvKey, "", " ") | |||
if err != nil { | |||
panic(err) | |||
} | |||
err = tempfile.WriteFileAtomic(outFile, jsonBytes, 0600) | |||
if err != nil { | |||
panic(err) | |||
} | |||
} | |||
//------------------------------------------------------------------------------- | |||
// FilePVLastSignState stores the mutable part of PrivValidator. | |||
type FilePVLastSignState struct { | |||
Height int64 `json:"height"` | |||
Round int32 `json:"round"` | |||
Step int8 `json:"step"` | |||
Signature []byte `json:"signature,omitempty"` | |||
SignBytes tmbytes.HexBytes `json:"signbytes,omitempty"` | |||
filePath string | |||
} | |||
// CheckHRS checks the given height, round, step (HRS) against that of the | |||
// FilePVLastSignState. It returns an error if the arguments constitute a regression, | |||
// or if they match but the SignBytes are empty. | |||
// The returned boolean indicates whether the last Signature should be reused - | |||
// it returns true if the HRS matches the arguments and the SignBytes are not empty (indicating | |||
// we have already signed for this HRS, and can reuse the existing signature). | |||
// It panics if the HRS matches the arguments, there's a SignBytes, but no Signature. | |||
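//
// For example, if the last signed state is (height=10, round=2, step=stepPrecommit),
// then (10, 2, stepPrevote) is a step regression and returns an error, while
// (10, 2, stepPrecommit) with non-empty SignBytes returns (true, nil).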
func (lss *FilePVLastSignState) CheckHRS(height int64, round int32, step int8) (bool, error) { | |||
if lss.Height > height { | |||
return false, fmt.Errorf("height regression. Got %v, last height %v", height, lss.Height) | |||
} | |||
if lss.Height == height { | |||
if lss.Round > round { | |||
return false, fmt.Errorf("round regression at height %v. Got %v, last round %v", height, round, lss.Round) | |||
} | |||
if lss.Round == round { | |||
if lss.Step > step { | |||
return false, fmt.Errorf( | |||
"step regression at height %v round %v. Got %v, last step %v", | |||
height, | |||
round, | |||
step, | |||
lss.Step, | |||
) | |||
} else if lss.Step == step { | |||
if lss.SignBytes != nil { | |||
if lss.Signature == nil { | |||
panic("pv: Signature is nil but SignBytes is not!") | |||
} | |||
return true, nil | |||
} | |||
return false, errors.New("no SignBytes found") | |||
} | |||
} | |||
} | |||
return false, nil | |||
} | |||
// Save persists the FilePVLastSignState to its filePath.
func (lss *FilePVLastSignState) Save() { | |||
outFile := lss.filePath | |||
if outFile == "" { | |||
panic("cannot save FilePVLastSignState: filePath not set") | |||
} | |||
jsonBytes, err := tmjson.MarshalIndent(lss, "", " ") | |||
if err != nil { | |||
panic(err) | |||
} | |||
err = tempfile.WriteFileAtomic(outFile, jsonBytes, 0600) | |||
if err != nil { | |||
panic(err) | |||
} | |||
} | |||
//------------------------------------------------------------------------------- | |||
// FilePV implements PrivValidator using data persisted to disk | |||
// to prevent double signing. | |||
// It includes the LastSignature and LastSignBytes so we don't lose the signature
// if the process crashes after signing but before the resulting consensus message is processed.
// NOTE: the directories containing pv.Key.filePath and pv.LastSignState.filePath must already exist.
type FilePV struct { | |||
Key FilePVKey | |||
LastSignState FilePVLastSignState | |||
} | |||
// GenFilePV generates a new validator with randomly generated private key | |||
// and sets the filePaths, but does not call Save(). | |||
func GenFilePV(keyFilePath, stateFilePath string) *FilePV { | |||
privKey := ed25519.GenPrivKey() | |||
return &FilePV{ | |||
Key: FilePVKey{ | |||
Address: privKey.PubKey().Address(), | |||
PubKey: privKey.PubKey(), | |||
PrivKey: privKey, | |||
filePath: keyFilePath, | |||
}, | |||
LastSignState: FilePVLastSignState{ | |||
Step: stepNone, | |||
filePath: stateFilePath, | |||
}, | |||
} | |||
} | |||
// LoadFilePV loads a FilePV from the filePaths. The FilePV handles double | |||
// signing prevention by persisting data to the stateFilePath. If either file path | |||
// does not exist, the program will exit. | |||
func LoadFilePV(keyFilePath, stateFilePath string) *FilePV { | |||
return loadFilePV(keyFilePath, stateFilePath, true) | |||
} | |||
// LoadFilePVEmptyState loads a FilePV from the given keyFilePath, with an empty LastSignState. | |||
// If the keyFilePath does not exist, the program will exit. | |||
func LoadFilePVEmptyState(keyFilePath, stateFilePath string) *FilePV { | |||
return loadFilePV(keyFilePath, stateFilePath, false) | |||
} | |||
// If loadState is true, we load from the stateFilePath. Otherwise, we use an empty LastSignState. | |||
func loadFilePV(keyFilePath, stateFilePath string, loadState bool) *FilePV { | |||
keyJSONBytes, err := ioutil.ReadFile(keyFilePath) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
pvKey := FilePVKey{} | |||
err = tmjson.Unmarshal(keyJSONBytes, &pvKey) | |||
if err != nil { | |||
tmos.Exit(fmt.Sprintf("Error reading PrivValidator key from %v: %v\n", keyFilePath, err)) | |||
} | |||
// overwrite pubkey and address for convenience | |||
pvKey.PubKey = pvKey.PrivKey.PubKey() | |||
pvKey.Address = pvKey.PubKey.Address() | |||
pvKey.filePath = keyFilePath | |||
pvState := FilePVLastSignState{} | |||
if loadState { | |||
stateJSONBytes, err := ioutil.ReadFile(stateFilePath) | |||
if err != nil { | |||
tmos.Exit(err.Error()) | |||
} | |||
err = tmjson.Unmarshal(stateJSONBytes, &pvState) | |||
if err != nil { | |||
tmos.Exit(fmt.Sprintf("Error reading PrivValidator state from %v: %v\n", stateFilePath, err)) | |||
} | |||
} | |||
pvState.filePath = stateFilePath | |||
return &FilePV{ | |||
Key: pvKey, | |||
LastSignState: pvState, | |||
} | |||
} | |||
// LoadOrGenFilePV loads a FilePV from the given filePaths | |||
// or else generates a new one and saves it to the filePaths. | |||
func LoadOrGenFilePV(keyFilePath, stateFilePath string) *FilePV { | |||
var pv *FilePV | |||
if tmos.FileExists(keyFilePath) { | |||
pv = LoadFilePV(keyFilePath, stateFilePath) | |||
} else { | |||
pv = GenFilePV(keyFilePath, stateFilePath) | |||
pv.Save() | |||
} | |||
return pv | |||
} | |||
// GetAddress returns the address of the validator. | |||
// Implements PrivValidator. | |||
func (pv *FilePV) GetAddress() types.Address { | |||
return pv.Key.Address | |||
} | |||
// GetPubKey returns the public key of the validator. | |||
// Implements PrivValidator. | |||
func (pv *FilePV) GetPubKey() (crypto.PubKey, error) { | |||
return pv.Key.PubKey, nil | |||
} | |||
// SignVote signs a canonical representation of the vote, along with the | |||
// chainID. Implements PrivValidator. | |||
func (pv *FilePV) SignVote(chainID string, vote *tmproto.Vote) error { | |||
if err := pv.signVote(chainID, vote); err != nil { | |||
return fmt.Errorf("error signing vote: %v", err) | |||
} | |||
return nil | |||
} | |||
// SignProposal signs a canonical representation of the proposal, along with | |||
// the chainID. Implements PrivValidator. | |||
func (pv *FilePV) SignProposal(chainID string, proposal *tmproto.Proposal) error { | |||
if err := pv.signProposal(chainID, proposal); err != nil { | |||
return fmt.Errorf("error signing proposal: %v", err) | |||
} | |||
return nil | |||
} | |||
// Save persists the FilePV to disk. | |||
func (pv *FilePV) Save() { | |||
pv.Key.Save() | |||
pv.LastSignState.Save() | |||
} | |||
// Reset resets all fields in the FilePV. | |||
// NOTE: Unsafe! | |||
func (pv *FilePV) Reset() { | |||
var sig []byte | |||
pv.LastSignState.Height = 0 | |||
pv.LastSignState.Round = 0 | |||
pv.LastSignState.Step = 0 | |||
pv.LastSignState.Signature = sig | |||
pv.LastSignState.SignBytes = nil | |||
pv.Save() | |||
} | |||
// String returns a string representation of the FilePV. | |||
func (pv *FilePV) String() string { | |||
return fmt.Sprintf( | |||
"PrivValidator{%v LH:%v, LR:%v, LS:%v}", | |||
pv.GetAddress(), | |||
pv.LastSignState.Height, | |||
pv.LastSignState.Round, | |||
pv.LastSignState.Step, | |||
) | |||
} | |||
//------------------------------------------------------------------------------------ | |||
// signVote checks if the vote is good to sign and sets the vote signature. | |||
// It may need to set the timestamp as well if the vote is otherwise the same as | |||
// a previously signed vote (ie. we crashed after signing but before the vote hit the WAL). | |||
func (pv *FilePV) signVote(chainID string, vote *tmproto.Vote) error { | |||
height, round, step := vote.Height, vote.Round, voteToStep(vote) | |||
lss := pv.LastSignState | |||
_, err := lss.CheckHRS(height, round, step) | |||
if err != nil { | |||
return err | |||
} | |||
signBytes := types.VoteSignBytes(chainID, vote) | |||
// It passed the checks. Sign the vote | |||
sig, err := pv.Key.PrivKey.Sign(signBytes) | |||
if err != nil { | |||
return err | |||
} | |||
pv.saveSigned(height, round, step, signBytes, sig) | |||
vote.Signature = sig | |||
return nil | |||
} | |||
// signProposal checks if the proposal is good to sign and sets the proposal signature. | |||
// It may need to set the timestamp as well if the proposal is otherwise the same as | |||
// a previously signed proposal (ie. we crashed after signing but before the proposal hit the WAL).
func (pv *FilePV) signProposal(chainID string, proposal *tmproto.Proposal) error { | |||
height, round, step := proposal.Height, proposal.Round, stepPropose | |||
lss := pv.LastSignState | |||
_, err := lss.CheckHRS(height, round, step) | |||
if err != nil { | |||
return err | |||
} | |||
signBytes := types.ProposalSignBytes(chainID, proposal) | |||
// It passed the checks. Sign the proposal | |||
sig, err := pv.Key.PrivKey.Sign(signBytes) | |||
if err != nil { | |||
return err | |||
} | |||
pv.saveSigned(height, round, step, signBytes, sig) | |||
proposal.Signature = sig | |||
return nil | |||
} | |||
// Persist height/round/step and signature | |||
func (pv *FilePV) saveSigned(height int64, round int32, step int8, | |||
signBytes []byte, sig []byte) { | |||
pv.LastSignState.Height = height | |||
pv.LastSignState.Round = round | |||
pv.LastSignState.Step = step | |||
pv.LastSignState.Signature = sig | |||
pv.LastSignState.SignBytes = signBytes | |||
pv.LastSignState.Save() | |||
} |