Browse Source

maverick: reduce some duplication (#6052)

## Description 

- Reduce duplication in messages and metrics. 
- merge WAL interfaces. Meant to push the developer to make changes in both places.
Marko 3 years ago
committed by GitHub
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 124 additions and 973 deletions
  1. +0
  2. +4
  3. +8
  4. +45
  5. +9
  6. +3
  7. +23
  8. +20
  9. +0
  10. +7
  11. +5

+ 0
- 220
test/maverick/consensus/metrics.go View File

@ -1,220 +0,0 @@
package consensus
import (
prometheus ""
stdprometheus ""
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "consensus"
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Height of the chain.
Height metrics.Gauge
// ValidatorLastSignedHeight of a validator.
ValidatorLastSignedHeight metrics.Gauge
// Number of rounds.
Rounds metrics.Gauge
// Number of validators.
Validators metrics.Gauge
// Total power of all validators.
ValidatorsPower metrics.Gauge
// Power of a validator.
ValidatorPower metrics.Gauge
// Amount of blocks missed by a validator.
ValidatorMissedBlocks metrics.Gauge
// Number of validators who did not sign.
MissingValidators metrics.Gauge
// Total power of the missing validators.
MissingValidatorsPower metrics.Gauge
// Number of validators who tried to double sign.
ByzantineValidators metrics.Gauge
// Total power of the byzantine validators.
ByzantineValidatorsPower metrics.Gauge
// Time between this and the last block.
BlockIntervalSeconds metrics.Histogram
// Number of transactions.
NumTxs metrics.Gauge
// Size of the block.
BlockSizeBytes metrics.Gauge
// Total number of transactions.
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
FastSyncing metrics.Gauge
// Whether or not a node is state syncing. 1 if yes, 0 if no.
StateSyncing metrics.Gauge
// Number of blockparts transmitted by peer.
BlockParts metrics.Counter
// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
return &Metrics{
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "height",
Help: "Height of the chain.",
}, labels).With(labelsAndValues...),
Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "rounds",
Help: "Number of rounds.",
}, labels).With(labelsAndValues...),
Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators",
Help: "Number of validators.",
}, labels).With(labelsAndValues...),
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_last_signed_height",
Help: "Last signed height for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_missed_blocks",
Help: "Total missed blocks for a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators_power",
Help: "Total power of all validators.",
}, labels).With(labelsAndValues...),
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validator_power",
Help: "Power of a validator",
}, append(labels, "validator_address")).With(labelsAndValues...),
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators",
Help: "Number of validators who did not sign.",
}, labels).With(labelsAndValues...),
MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators_power",
Help: "Total power of the missing validators.",
}, labels).With(labelsAndValues...),
ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators",
Help: "Number of validators who tried to double sign.",
}, labels).With(labelsAndValues...),
ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators_power",
Help: "Total power of the byzantine validators.",
}, labels).With(labelsAndValues...),
BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_interval_seconds",
Help: "Time between this and the last block.",
}, labels).With(labelsAndValues...),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions.",
}, labels).With(labelsAndValues...),
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_size_bytes",
Help: "Size of the block.",
}, labels).With(labelsAndValues...),
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "total_txs",
Help: "Total number of transactions.",
}, labels).With(labelsAndValues...),
CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "latest_block_height",
Help: "The latest block height.",
}, labels).With(labelsAndValues...),
FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "fast_syncing",
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "state_syncing",
Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Height: discard.NewGauge(),
ValidatorLastSignedHeight: discard.NewGauge(),
Rounds: discard.NewGauge(),
Validators: discard.NewGauge(),
ValidatorsPower: discard.NewGauge(),
ValidatorPower: discard.NewGauge(),
ValidatorMissedBlocks: discard.NewGauge(),
MissingValidators: discard.NewGauge(),
MissingValidatorsPower: discard.NewGauge(),
ByzantineValidators: discard.NewGauge(),
ByzantineValidatorsPower: discard.NewGauge(),
BlockIntervalSeconds: discard.NewHistogram(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
StateSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),

+ 4
- 3
test/maverick/consensus/misbehavior.go View File

@ -3,6 +3,7 @@ package consensus
import (
tmcon ""
cstypes ""
tmproto ""
@ -89,16 +90,16 @@ func DoublePrevoteMisbehavior() Misbehavior {
// add our own vote
cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: prevote}, ""})
cs.Logger.Info("Sending conflicting votes")
peers := cs.sw.Peers().List()
// there has to be at least two other peers connected else this behavior works normally
for idx, peer := range peers {
if idx%2 == 0 { // sign the proposal block
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: prevote}))
} else { // sign a nil block
peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote}))
peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: nilPrevote}))

+ 8
- 280
test/maverick/consensus/msgs.go View File

@ -4,10 +4,8 @@ import (
tmcon ""
cstypes ""
tmmath ""
tmcons ""
@ -15,277 +13,7 @@ import (
// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
var pb tmcons.Message
switch msg := msg.(type) {
case *NewRoundStepMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_NewRoundStep{
NewRoundStep: &tmcons.NewRoundStep{
Height: msg.Height,
Round: msg.Round,
Step: uint32(msg.Step),
SecondsSinceStartTime: msg.SecondsSinceStartTime,
LastCommitRound: msg.LastCommitRound,
case *NewValidBlockMessage:
pbPartSetHeader := msg.BlockPartSetHeader.ToProto()
pbBits := msg.BlockParts.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_NewValidBlock{
NewValidBlock: &tmcons.NewValidBlock{
Height: msg.Height,
Round: msg.Round,
BlockPartSetHeader: pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.IsCommit,
case *ProposalMessage:
pbP := msg.Proposal.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Proposal{
Proposal: &tmcons.Proposal{
Proposal: *pbP,
case *ProposalPOLMessage:
pbBits := msg.ProposalPOL.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_ProposalPol{
ProposalPol: &tmcons.ProposalPOL{
Height: msg.Height,
ProposalPolRound: msg.ProposalPOLRound,
ProposalPol: *pbBits,
case *BlockPartMessage:
parts, err := msg.Part.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
pb = tmcons.Message{
Sum: &tmcons.Message_BlockPart{
BlockPart: &tmcons.BlockPart{
Height: msg.Height,
Round: msg.Round,
Part: *parts,
case *VoteMessage:
vote := msg.Vote.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_Vote{
Vote: &tmcons.Vote{
Vote: vote,
case *HasVoteMessage:
pb = tmcons.Message{
Sum: &tmcons.Message_HasVote{
HasVote: &tmcons.HasVote{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
Index: msg.Index,
case *VoteSetMaj23Message:
bi := msg.BlockID.ToProto()
pb = tmcons.Message{
Sum: &tmcons.Message_VoteSetMaj23{
VoteSetMaj23: &tmcons.VoteSetMaj23{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
case *VoteSetBitsMessage:
bi := msg.BlockID.ToProto()
bits := msg.Votes.ToProto()
vsb := &tmcons.Message_VoteSetBits{
VoteSetBits: &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: bi,
if bits != nil {
vsb.VoteSetBits.Votes = *bits
pb = tmcons.Message{
Sum: vsb,
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
return &pb, nil
// MsgFromProto takes a consensus proto message and returns the native go type
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
return nil, errors.New("consensus: nil message")
var pb Message
switch msg := msg.Sum.(type) {
case *tmcons.Message_NewRoundStep:
rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step))
// deny message based on possible overflow
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
pb = &NewRoundStepMessage{
Height: msg.NewRoundStep.Height,
Round: msg.NewRoundStep.Round,
Step: cstypes.RoundStepType(rs),
SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime,
LastCommitRound: msg.NewRoundStep.LastCommitRound,
case *tmcons.Message_NewValidBlock:
pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader)
if err != nil {
return nil, fmt.Errorf("parts header to proto error: %w", err)
pbBits := new(bits.BitArray)
err = pbBits.FromProto(msg.NewValidBlock.BlockParts)
if err != nil {
return nil, fmt.Errorf("parts to proto error: %w", err)
pb = &NewValidBlockMessage{
Height: msg.NewValidBlock.Height,
Round: msg.NewValidBlock.Round,
BlockPartSetHeader: *pbPartSetHeader,
BlockParts: pbBits,
IsCommit: msg.NewValidBlock.IsCommit,
case *tmcons.Message_Proposal:
pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal)
if err != nil {
return nil, fmt.Errorf("proposal msg to proto error: %w", err)
pb = &ProposalMessage{
Proposal: pbP,
case *tmcons.Message_ProposalPol:
pbBits := new(bits.BitArray)
err := pbBits.FromProto(&msg.ProposalPol.ProposalPol)
if err != nil {
return nil, fmt.Errorf("proposal PoL to proto error: %w", err)
pb = &ProposalPOLMessage{
Height: msg.ProposalPol.Height,
ProposalPOLRound: msg.ProposalPol.ProposalPolRound,
ProposalPOL: pbBits,
case *tmcons.Message_BlockPart:
parts, err := types.PartFromProto(&msg.BlockPart.Part)
if err != nil {
return nil, fmt.Errorf("blockpart msg to proto error: %w", err)
pb = &BlockPartMessage{
Height: msg.BlockPart.Height,
Round: msg.BlockPart.Round,
Part: parts,
case *tmcons.Message_Vote:
vote, err := types.VoteFromProto(msg.Vote.Vote)
if err != nil {
return nil, fmt.Errorf("vote msg to proto error: %w", err)
pb = &VoteMessage{
Vote: vote,
case *tmcons.Message_HasVote:
pb = &HasVoteMessage{
Height: msg.HasVote.Height,
Round: msg.HasVote.Round,
Type: msg.HasVote.Type,
Index: msg.HasVote.Index,
case *tmcons.Message_VoteSetMaj23:
bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID)
if err != nil {
return nil, fmt.Errorf("voteSetMaj23 msg to proto error: %w", err)
pb = &VoteSetMaj23Message{
Height: msg.VoteSetMaj23.Height,
Round: msg.VoteSetMaj23.Round,
Type: msg.VoteSetMaj23.Type,
BlockID: *bi,
case *tmcons.Message_VoteSetBits:
bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID)
if err != nil {
return nil, fmt.Errorf("block ID msg to proto error: %w", err)
bits := new(bits.BitArray)
err = bits.FromProto(&msg.VoteSetBits.Votes)
if err != nil {
return nil, fmt.Errorf("votes to proto error: %w", err)
pb = &VoteSetBitsMessage{
Height: msg.VoteSetBits.Height,
Round: msg.VoteSetBits.Round,
Type: msg.VoteSetBits.Type,
BlockID: *bi,
Votes: bits,
return nil, fmt.Errorf("consensus: message not recognized: %T", msg)
if err := pb.ValidateBasic(); err != nil {
return nil, err
return pb, nil
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
enc, err := proto.Marshal(pb)
if err != nil {
return enc
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
func WALToProto(msg tmcon.WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage
switch msg := msg.(type) {
@ -300,7 +28,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
case msgInfo:
consMsg, err := MsgToProto(msg.Msg)
consMsg, err := tmcon.MsgToProto(msg.Msg)
if err != nil {
return nil, err
@ -323,7 +51,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
case EndHeightMessage:
case tmcon.EndHeightMessage:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EndHeight{
EndHeight: &tmcons.EndHeight{
@ -339,11 +67,11 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
// WALFromProto takes a proto wal message and return a consensus walMessage and error
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
func WALFromProto(msg *tmcons.WALMessage) (tmcon.WALMessage, error) {
if msg == nil {
return nil, errors.New("nil WAL message")
var pb WALMessage
var pb tmcon.WALMessage
switch msg := msg.Sum.(type) {
case *tmcons.WALMessage_EventDataRoundState:
@ -353,7 +81,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
Step: msg.EventDataRoundState.Step,
case *tmcons.WALMessage_MsgInfo:
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
walMsg, err := tmcon.MsgFromProto(&msg.MsgInfo.Msg)
if err != nil {
return nil, fmt.Errorf("msgInfo from proto error: %w", err)
@ -376,7 +104,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
return pb, nil
case *tmcons.WALMessage_EndHeight:
pb := EndHeightMessage{
pb := tmcon.EndHeightMessage{
Height: msg.EndHeight.Height,
return pb, nil

+ 45
- 346
test/maverick/consensus/reactor.go View File

@ -9,6 +9,7 @@ import (
tmcon ""
cstypes ""
tmevents ""
@ -47,7 +48,7 @@ type Reactor struct {
waitSync bool
eventBus *types.EventBus
Metrics *Metrics
Metrics *tmcon.Metrics
type ReactorOption func(*Reactor)
@ -58,7 +59,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
Metrics: NopMetrics(),
Metrics: tmcon.NopMetrics(),
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@ -252,7 +253,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
case *tmcon.NewRoundStepMessage:
initialHeight := conR.conS.state.InitialHeight
@ -262,11 +263,11 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
case *NewValidBlockMessage:
case *tmcon.NewValidBlockMessage:
case *HasVoteMessage:
case *tmcon.HasVoteMessage:
case *VoteSetMaj23Message:
case *tmcon.VoteSetMaj23Message:
cs := conR.conS
height, votes := cs.Height, cs.Votes
@ -291,7 +292,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
src.TrySend(VoteSetBitsChannel, tmcon.MustEncode(&tmcon.VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
@ -308,12 +309,12 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
case *tmcon.ProposalPOLMessage:
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
@ -327,7 +328,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *VoteMessage:
case *tmcon.VoteMessage:
cs := conR.conS
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
@ -349,7 +350,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *VoteSetBitsMessage:
case *tmcon.VoteSetBitsMessage:
cs := conR.conS
height, votes := cs.Height, cs.Votes
@ -429,29 +430,29 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(nrsMsg))
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
csMsg := &tmcon.NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(csMsg))
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
msg := &tmcon.HasVoteMessage{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
Index: vote.ValidatorIndex,
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(msg))
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
@ -472,8 +473,8 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcon.NewRoundStepMessage) {
nrsMsg = &tmcon.NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
@ -486,7 +487,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, MustEncode(nrsMsg))
peer.Send(StateChannel, tmcon.MustEncode(nrsMsg))
func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
@ -506,13 +507,13 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
msg := &tmcon.BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
continue OUTER_LOOP
@ -555,9 +556,9 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
msg := &ProposalMessage{Proposal: rs.Proposal}
msg := &tmcon.ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
@ -567,13 +568,13 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
msg := &tmcon.ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, MustEncode(msg))
peer.Send(DataChannel, tmcon.MustEncode(msg))
continue OUTER_LOOP
@ -610,13 +611,13 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
// Send the part
msg := &BlockPartMessage{
msg := &tmcon.BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(DataChannel, tmcon.MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@ -773,7 +774,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
@ -790,7 +791,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
@ -807,7 +808,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
@ -827,7 +828,7 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
@ -866,11 +867,11 @@ func (conR *Reactor) peerStatsRoutine() {
panic(fmt.Sprintf("Peer %v has no state", peer))
switch msg.Msg.(type) {
case *VoteMessage:
case *tmcon.VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
@ -908,7 +909,7 @@ func (conR *Reactor) StringIndented(indent string) string {
// ReactorMetrics sets the metrics
func ReactorMetrics(metrics *Metrics) ReactorOption {
func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
@ -1046,9 +1047,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
msg := &tmcon.VoteMessage{Vote: vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
if ps.peer.Send(VoteChannel, tmcon.MustEncode(msg)) {
return true
@ -1255,7 +1256,7 @@ func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.Sign
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) {
defer ps.mtx.Unlock()
@ -1308,7 +1309,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) {
defer ps.mtx.Unlock()
@ -1325,7 +1326,7 @@ func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) {
defer ps.mtx.Unlock()
@ -1342,7 +1343,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) {
defer ps.mtx.Unlock()
@ -1358,7 +1359,7 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) {
defer ps.mtx.Unlock()
@ -1395,12 +1396,6 @@ func (ps *PeerState) StringIndented(indent string) string {
// Messages
// Message is a message that can be sent and received on the Reactor
type Message interface {
ValidateBasic() error
// func init() {
// tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
@ -1414,307 +1409,11 @@ type Message interface {
// tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
// }
func decodeMsg(bz []byte) (msg Message, err error) {
func decodeMsg(bz []byte) (msg tmcon.Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
return MsgFromProto(pb)
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
Height int64
Round int32
Step cstypes.RoundStepType
SecondsSinceStartTime int64
LastCommitRound int32
// ValidateBasic performs basic validation.
func (m *NewRoundStepMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.Round < 0 {
return errors.New("negative Round")
if !m.Step.IsValid() {
return errors.New("invalid Step")
// NOTE: SecondsSinceStartTime may be negative
// LastCommitRound will be -1 for the initial height, but we don't know what height this is
// since it can be specified in genesis. The reactor will have to validate this via
// ValidateHeight().
if m.LastCommitRound < -1 {
return errors.New("invalid LastCommitRound (cannot be < -1)")
return nil
// ValidateHeight validates the height given the chain's initial height.
func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error {
if m.Height < initialHeight {
return fmt.Errorf("invalid Height %v (lower than initial height %v)",
m.Height, initialHeight)
if m.Height == initialHeight && m.LastCommitRound != -1 {
return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)",
m.LastCommitRound, initialHeight)
if m.Height > initialHeight && m.LastCommitRound < 0 {
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
return nil
// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
// In case the block is also committed, then IsCommit flag is set to true.
type NewValidBlockMessage struct {
Height int64
Round int32
BlockPartSetHeader types.PartSetHeader
BlockParts *bits.BitArray
IsCommit bool
// ValidateBasic performs basic validation.
func (m *NewValidBlockMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.Round < 0 {
return errors.New("negative Round")
if err := m.BlockPartSetHeader.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockPartSetHeader: %v", err)
if m.BlockParts.Size() == 0 {
return errors.New("empty blockParts")
if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) {
return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d",
if m.BlockParts.Size() > int(types.MaxBlockPartsCount) {
return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
return nil
// String returns a string representation.
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit)
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
// ValidateBasic performs basic validation.
func (m *ProposalMessage) ValidateBasic() error {
return m.Proposal.ValidateBasic()
// String returns a string representation.
func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal)
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height int64
ProposalPOLRound int32
ProposalPOL *bits.BitArray
// ValidateBasic performs basic validation.
func (m *ProposalPOLMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.ProposalPOLRound < 0 {
return errors.New("negative ProposalPOLRound")
if m.ProposalPOL.Size() == 0 {
return errors.New("empty ProposalPOL bit array")
if m.ProposalPOL.Size() > types.MaxVotesCount {
return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
return nil
// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height int64
Round int32
Part *types.Part
// ValidateBasic performs basic validation.
func (m *BlockPartMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.Round < 0 {
return errors.New("negative Round")
if err := m.Part.ValidateBasic(); err != nil {
return fmt.Errorf("wrong Part: %v", err)
return nil
// String returns a string representation.
func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
// ValidateBasic performs basic validation.
func (m *VoteMessage) ValidateBasic() error {
return m.Vote.ValidateBasic()
// String returns a string representation.
func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote)
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
Index int32
// ValidateBasic performs basic validation.
func (m *HasVoteMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.Round < 0 {
return errors.New("negative Round")
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
if m.Index < 0 {
return errors.New("negative Index")
return nil
return tmcon.MsgFromProto(pb)
// String returns a string representation.
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
// ValidateBasic performs basic validation.
func (m *VoteSetMaj23Message) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if m.Round < 0 {
return errors.New("negative Round")
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
return nil
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
Votes *bits.BitArray
// ValidateBasic performs basic validation.
func (m *VoteSetBitsMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
// NOTE: Votes.Size() can be zero if the node does not have any
if m.Votes.Size() > types.MaxVotesCount {
return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
return nil
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)

+ 9
- 8
test/maverick/consensus/replay.go View File

@ -10,6 +10,7 @@ import (
abci ""
tmcon ""
@ -36,9 +37,9 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
func (cs *State) readReplayMessage(msg *tmcon.TimedWALMessage, newStepSub types.Subscription) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
if _, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
return nil
@ -67,13 +68,13 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr
peerID = "local"
switch msg := m.Msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
case *tmcon.VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
@ -103,7 +104,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
// this check (since we can crash after writing #ENDHEIGHT).
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
@ -126,7 +127,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
if csHeight == cs.state.InitialHeight {
endHeight = 0
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
gr, found, err = cs.wal.SearchForEndHeight(endHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.Logger.Error("Replay: returned EOF", "#ENDHEIGHT", endHeight)
} else if err != nil {
@ -139,7 +140,7 @@ func (cs *State) catchupReplay(csHeight int64) error {
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
dec := WALDecoder{gr}

+ 3
- 2
test/maverick/consensus/replay_file.go View File

@ -13,6 +13,7 @@ import (
dbm ""
cfg ""
tmcon ""
tmos ""
@ -73,7 +74,7 @@ func (cs *State) ReplayFile(file string, console bool) error {
defer pb.fp.Close()
var nextN int // apply N msgs in a row
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
for {
if nextN == 0 && console {
nextN = pb.replayConsoleLoop()
@ -147,7 +148,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
fmt.Printf("Reseting from %d to %d\n", pb.count, count)
pb.count = 0
pb.cs = newCS
var msg *TimedWALMessage
var msg *tmcon.TimedWALMessage
for i := 0; i < count; i++ {
msg, err = pb.dec.Decode()
if err == io.EOF {

+ 23
- 22
test/maverick/consensus/state.go View File

@ -14,6 +14,7 @@ import (
cfg ""
tmcon ""
cstypes ""
tmevents ""
@ -78,7 +79,7 @@ type State struct {
// a Write-Ahead Log ensures we can recover from any kind of crash
// and helps us avoid signing conflicting votes
wal WAL
wal tmcon.WAL
replayMode bool // so we don't log signing errors during replay
doWALCatchup bool // determines if we even try to do the catchup
@ -96,7 +97,7 @@ type State struct {
evsw tmevents.EventSwitch
// for reporting metrics
metrics *Metrics
metrics *tmcon.Metrics
// misbehaviors mapped for each height (can't have more than one misbehavior per height)
misbehaviors map[int64]Misbehavior
@ -134,7 +135,7 @@ func NewState(
wal: nilWAL{},
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
metrics: tmcon.NopMetrics(),
misbehaviors: misbehaviors,
// set function defaults (may be overwritten before calling Start)
@ -173,7 +174,7 @@ func (cs *State) handleMsg(mi msgInfo) {
msg, peerID := mi.Msg, mi.PeerID
switch msg := msg.(type) {
case *ProposalMessage:
case *tmcon.ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
// err = cs.setProposal(msg.Proposal)
@ -182,7 +183,7 @@ func (cs *State) handleMsg(mi msgInfo) {
} else {
err = defaultReceiveProposal(cs, msg.Proposal)
case *BlockPartMessage:
case *tmcon.BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID)
if added {
@ -200,7 +201,7 @@ func (cs *State) handleMsg(mi msgInfo) {
err = nil
case *VoteMessage:
case *tmcon.VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err = cs.tryAddVote(msg.Vote, peerID)
@ -442,8 +443,8 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Message `json:"msg"`
PeerID p2p.NodeID `json:"peer_key"`
Msg tmcon.Message `json:"msg"`
PeerID p2p.NodeID `json:"peer_key"`
// internally generated messages which may update the state
@ -485,7 +486,7 @@ func (cs *State) SetEventBus(b *types.EventBus) {
// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
func StateMetrics(metrics *tmcon.Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
@ -685,7 +686,7 @@ func (cs *State) Wait() {
// OpenWAL opens a file to log all consensus messages and timeouts for
// deterministic accountability.
func (cs *State) OpenWAL(walFile string) (WAL, error) {
func (cs *State) OpenWAL(walFile string) (tmcon.WAL, error) {
wal, err := NewWAL(walFile)
if err != nil {
cs.Logger.Error("Failed to open WAL", "file", walFile, "err", err)
@ -709,9 +710,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) {
// AddVote inputs a vote.
func (cs *State) AddVote(vote *types.Vote, peerID p2p.NodeID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, peerID}
// TODO: wait for event?!
@ -722,9 +723,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.NodeID) (added bool, err e
func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.NodeID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, peerID}
// TODO: wait for event?!
@ -735,9 +736,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.NodeID) error
func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.NodeID) error {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
cs.internalMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
cs.peerMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, peerID}
// TODO: wait for event?!
@ -998,7 +999,7 @@ func (cs *State) receiveRoutine(maxSteps int) {
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
if _, ok := mi.Msg.(*VoteMessage); ok {
if _, ok := mi.Msg.(*tmcon.VoteMessage); ok {
// we actually want to simulate failing during
// the previous WriteSync, but this isn't easy to do.
// Equivalent would be to fail here and manually remove
@ -1210,10 +1211,10 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
proposal.Signature = p.Signature
// send proposal and block parts on internal msg queue
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.BlockPartMessage{Height: cs.Height, Round: cs.Round, Part: part}, ""})
cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal)
cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block))
@ -1491,7 +1492,7 @@ func (cs *State) finalizeCommit(height int64) {
// Either way, the State should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
endMsg := EndHeightMessage{height}
endMsg := tmcon.EndHeightMessage{Height: height}
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node",
endMsg, err))
@ -1660,7 +1661,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {
func (cs *State) addProposalBlockPart(msg *tmcon.BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {
height, round, part := msg.Height, msg.Round, msg.Part
// Blocks might be reused, so round mismatch is OK
@ -1860,7 +1861,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: vote}, ""})
cs.Logger.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote

+ 20
- 50
test/maverick/consensus/wal.go View File

@ -11,8 +11,9 @@ import (
auto ""
// tmjson ""
tmcon ""
auto ""
tmos ""
@ -30,44 +31,12 @@ const (
// types and functions for savings consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
Height int64 `json:"height"`
type WALMessage interface{}
// func init() {
// tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo")
// tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo")
// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage")
// tmjson.RegisterType(tmcon.EndHeightMessage {}, "tendermint/wal/EndHeightMessage ")
// }
// Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
// service methods
Start() error
Stop() error
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay.
// TODO: currently the wal is overwritten during replay catchup, give it a mode
@ -84,7 +53,7 @@ type BaseWAL struct {
flushInterval time.Duration
var _ WAL = &BaseWAL{}
var _ tmcon.WAL = &BaseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
@ -126,7 +95,7 @@ func (wal *BaseWAL) OnStart() error {
if err != nil {
return err
} else if size == 0 {
if err := wal.WriteSync(EndHeightMessage{0}); err != nil {
if err := wal.WriteSync(tmcon.EndHeightMessage{Height: 0}); err != nil {
return err
@ -181,12 +150,12 @@ func (wal *BaseWAL) Wait() {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *BaseWAL) Write(msg WALMessage) error {
func (wal *BaseWAL) Write(msg tmcon.WALMessage) error {
if wal == nil {
return nil
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
if err := wal.enc.Encode(&tmcon.TimedWALMessage{Time: tmtime.Now(), Msg: msg}); err != nil {
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
@ -198,7 +167,7 @@ func (wal *BaseWAL) Write(msg WALMessage) error {
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *BaseWAL) WriteSync(msg WALMessage) error {
func (wal *BaseWAL) WriteSync(msg tmcon.WALMessage) error {
if wal == nil {
return nil
@ -230,9 +199,9 @@ type WALSearchOptions struct {
// CONTRACT: caller must close group reader.
func (wal *BaseWAL) SearchForEndHeight(
height int64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
var (
msg *TimedWALMessage
msg *tmcon.TimedWALMessage
gr *auto.GroupReader
lastHeightFound := int64(-1)
@ -268,7 +237,7 @@ func (wal *BaseWAL) SearchForEndHeight(
return nil, false, err
if m, ok := msg.Msg.(EndHeightMessage); ok {
if m, ok := msg.Msg.(tmcon.EndHeightMessage); ok {
lastHeightFound = m.Height
if m.Height == height { // found
wal.Logger.Info("Found", "height", height, "index", index)
@ -299,7 +268,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder {
// Encode writes the custom encoding of v to the stream. It returns an error if
// the encoded size of v is greater than 1MB. Any error encountered
// during the write is also returned.
func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
func (enc *WALEncoder) Encode(v *tmcon.TimedWALMessage) error {
pbMsg, err := WALToProto(v.Msg)
if err != nil {
return err
@ -366,7 +335,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder {
// Decode reads the next custom-encoded value from its reader and returns it.
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
func (dec *WALDecoder) Decode() (*tmcon.TimedWALMessage, error) {
b := make([]byte, 4)
_, err := dec.rd.Read(b)
@ -414,7 +383,7 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)}
tMsgWal := &TimedWALMessage{
tMsgWal := &tmcon.TimedWALMessage{
Time: res.Time,
Msg: walMsg,
@ -424,12 +393,13 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
type nilWAL struct{}
var _ WAL = nilWAL{}
var _ tmcon.WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
func (nilWAL) Write(m tmcon.WALMessage) error { return nil }
func (nilWAL) WriteSync(m tmcon.WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64,
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
func (nilWAL) Start() error { return nil }

+ 0
- 31
test/maverick/consensus/wal_fuzz.go View File

@ -1,31 +0,0 @@
// +build gofuzz
package consensus
import (
func Fuzz(data []byte) int {
dec := NewWALDecoder(bytes.NewReader(data))
for {
msg, err := dec.Decode()
if err == io.EOF {
if err != nil {
if msg != nil {
panic("msg != nil on error")
return 0
var w bytes.Buffer
enc := NewWALEncoder(&w)
err = enc.Encode(msg)
if err != nil {
return 1

+ 7
- 6
test/maverick/consensus/wal_generator.go View File

@ -13,6 +13,7 @@ import (
cfg ""
tmcon ""
tmrand ""
@ -101,7 +102,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103
if err := wal.Write(EndHeightMessage{0}); err != nil {
if err := wal.Write(tmcon.EndHeightMessage{Height: 0}); err != nil {
@ -190,13 +191,13 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) error {
func (w *byteBufferWAL) Write(m tmcon.WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return nil
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg, ok := m.(tmcon.EndHeightMessage); ok {
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
if endMsg.Height == w.heightToStop {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
@ -207,7 +208,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
w.logger.Debug("WAL Write Message", "msg", m)
err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
err := w.enc.Encode(&tmcon.TimedWALMessage{Time: fixedTime, Msg: m})
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
@ -215,7 +216,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error {
return nil
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
func (w *byteBufferWAL) WriteSync(m tmcon.WALMessage) error {
return w.Write(m)
@ -223,7 +224,7 @@ func (w *byteBufferWAL) FlushAndSync() error { return nil }
func (w *byteBufferWAL) SearchForEndHeight(
height int64,
options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil

+ 5
- 5
test/maverick/node/node.go View File

@ -137,19 +137,19 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger, misbehaviors map[int6
// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
type MetricsProvider func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
return func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
return consensus.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
return consensus.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
@ -472,7 +472,7 @@ func createConsensusReactor(config *cfg.Config,
mempool *mempl.CListMempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
csMetrics *consensus.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,
