From fa3287c0127207db2d323f6fcf81e51a2ac9938a Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 5 Feb 2021 09:29:26 +0000 Subject: [PATCH] maverick: reduce some duplication (#6052) - Reduce duplication in messages and metrics. - merge WAL interfaces. Meant to push the developer to make changes in both places. --- test/maverick/consensus/metrics.go | 220 ------------- test/maverick/consensus/misbehavior.go | 7 +- test/maverick/consensus/msgs.go | 278 +--------------- test/maverick/consensus/reactor.go | 391 +++-------------------- test/maverick/consensus/replay.go | 17 +- test/maverick/consensus/replay_file.go | 5 +- test/maverick/consensus/state.go | 45 +-- test/maverick/consensus/wal.go | 70 ++-- test/maverick/consensus/wal_fuzz.go | 31 -- test/maverick/consensus/wal_generator.go | 13 +- test/maverick/node/node.go | 10 +- 11 files changed, 124 insertions(+), 963 deletions(-) delete mode 100644 test/maverick/consensus/metrics.go delete mode 100644 test/maverick/consensus/wal_fuzz.go diff --git a/test/maverick/consensus/metrics.go b/test/maverick/consensus/metrics.go deleted file mode 100644 index bbd823a3f..000000000 --- a/test/maverick/consensus/metrics.go +++ /dev/null @@ -1,220 +0,0 @@ -package consensus - -import ( - "github.com/go-kit/kit/metrics" - "github.com/go-kit/kit/metrics/discard" - - prometheus "github.com/go-kit/kit/metrics/prometheus" - stdprometheus "github.com/prometheus/client_golang/prometheus" -) - -const ( - // MetricsSubsystem is a subsystem shared by all metrics exposed by this - // package. - MetricsSubsystem = "consensus" -) - -// Metrics contains metrics exposed by this package. -type Metrics struct { - // Height of the chain. - Height metrics.Gauge - - // ValidatorLastSignedHeight of a validator. - ValidatorLastSignedHeight metrics.Gauge - - // Number of rounds. - Rounds metrics.Gauge - - // Number of validators. - Validators metrics.Gauge - // Total power of all validators. - ValidatorsPower metrics.Gauge - // Power of a validator. - ValidatorPower metrics.Gauge - // Amount of blocks missed by a validator. - ValidatorMissedBlocks metrics.Gauge - // Number of validators who did not sign. - MissingValidators metrics.Gauge - // Total power of the missing validators. - MissingValidatorsPower metrics.Gauge - // Number of validators who tried to double sign. - ByzantineValidators metrics.Gauge - // Total power of the byzantine validators. - ByzantineValidatorsPower metrics.Gauge - - // Time between this and the last block. - BlockIntervalSeconds metrics.Histogram - - // Number of transactions. - NumTxs metrics.Gauge - // Size of the block. - BlockSizeBytes metrics.Gauge - // Total number of transactions. - TotalTxs metrics.Gauge - // The latest block height. - CommittedHeight metrics.Gauge - // Whether or not a node is fast syncing. 1 if yes, 0 if no. - FastSyncing metrics.Gauge - // Whether or not a node is state syncing. 1 if yes, 0 if no. - StateSyncing metrics.Gauge - - // Number of blockparts transmitted by peer. - BlockParts metrics.Counter -} - -// PrometheusMetrics returns Metrics build using Prometheus client library. -// Optionally, labels can be provided along with their values ("foo", -// "fooValue"). -func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { - labels := []string{} - for i := 0; i < len(labelsAndValues); i += 2 { - labels = append(labels, labelsAndValues[i]) - } - return &Metrics{ - Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "height", - Help: "Height of the chain.", - }, labels).With(labelsAndValues...), - Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "rounds", - Help: "Number of rounds.", - }, labels).With(labelsAndValues...), - - Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "validators", - Help: "Number of validators.", - }, labels).With(labelsAndValues...), - ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "validator_last_signed_height", - Help: "Last signed height for a validator", - }, append(labels, "validator_address")).With(labelsAndValues...), - ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "validator_missed_blocks", - Help: "Total missed blocks for a validator", - }, append(labels, "validator_address")).With(labelsAndValues...), - ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "validators_power", - Help: "Total power of all validators.", - }, labels).With(labelsAndValues...), - ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "validator_power", - Help: "Power of a validator", - }, append(labels, "validator_address")).With(labelsAndValues...), - MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "missing_validators", - Help: "Number of validators who did not sign.", - }, labels).With(labelsAndValues...), - MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "missing_validators_power", - Help: "Total power of the missing validators.", - }, labels).With(labelsAndValues...), - ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "byzantine_validators", - Help: "Number of validators who tried to double sign.", - }, labels).With(labelsAndValues...), - ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "byzantine_validators_power", - Help: "Total power of the byzantine validators.", - }, labels).With(labelsAndValues...), - BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "block_interval_seconds", - Help: "Time between this and the last block.", - }, labels).With(labelsAndValues...), - NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "num_txs", - Help: "Number of transactions.", - }, labels).With(labelsAndValues...), - BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "block_size_bytes", - Help: "Size of the block.", - }, labels).With(labelsAndValues...), - TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "total_txs", - Help: "Total number of transactions.", - }, labels).With(labelsAndValues...), - CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "latest_block_height", - Help: "The latest block height.", - }, labels).With(labelsAndValues...), - FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "fast_syncing", - Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), - StateSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "state_syncing", - Help: "Whether or not a node is state syncing. 1 if yes, 0 if no.", - }, labels).With(labelsAndValues...), - BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "block_parts", - Help: "Number of blockparts transmitted by peer.", - }, append(labels, "peer_id")).With(labelsAndValues...), - } -} - -// NopMetrics returns no-op Metrics. -func NopMetrics() *Metrics { - return &Metrics{ - Height: discard.NewGauge(), - - ValidatorLastSignedHeight: discard.NewGauge(), - - Rounds: discard.NewGauge(), - - Validators: discard.NewGauge(), - ValidatorsPower: discard.NewGauge(), - ValidatorPower: discard.NewGauge(), - ValidatorMissedBlocks: discard.NewGauge(), - MissingValidators: discard.NewGauge(), - MissingValidatorsPower: discard.NewGauge(), - ByzantineValidators: discard.NewGauge(), - ByzantineValidatorsPower: discard.NewGauge(), - - BlockIntervalSeconds: discard.NewHistogram(), - - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewGauge(), - TotalTxs: discard.NewGauge(), - CommittedHeight: discard.NewGauge(), - FastSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), - BlockParts: discard.NewCounter(), - } -} diff --git a/test/maverick/consensus/misbehavior.go b/test/maverick/consensus/misbehavior.go index 75d2bd278..ce78173a9 100644 --- a/test/maverick/consensus/misbehavior.go +++ b/test/maverick/consensus/misbehavior.go @@ -3,6 +3,7 @@ package consensus import ( "fmt" + tmcon "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/types" @@ -89,16 +90,16 @@ func DoublePrevoteMisbehavior() Misbehavior { } // add our own vote - cs.sendInternalMessage(msgInfo{&VoteMessage{prevote}, ""}) + cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: prevote}, ""}) cs.Logger.Info("Sending conflicting votes") peers := cs.sw.Peers().List() // there has to be at least two other peers connected else this behavior works normally for idx, peer := range peers { if idx%2 == 0 { // sign the proposal block - peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote})) + peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: prevote})) } else { // sign a nil block - peer.Send(VoteChannel, MustEncode(&VoteMessage{nilPrevote})) + peer.Send(VoteChannel, tmcon.MustEncode(&tmcon.VoteMessage{Vote: nilPrevote})) } } } diff --git a/test/maverick/consensus/msgs.go b/test/maverick/consensus/msgs.go index 4de96b5f4..de78cad36 100644 --- a/test/maverick/consensus/msgs.go +++ b/test/maverick/consensus/msgs.go @@ -4,10 +4,8 @@ import ( "errors" "fmt" - "github.com/gogo/protobuf/proto" - + tmcon "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" - "github.com/tendermint/tendermint/libs/bits" tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/p2p" tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus" @@ -15,267 +13,7 @@ import ( "github.com/tendermint/tendermint/types" ) -// MsgToProto takes a consensus message type and returns the proto defined consensus message -func MsgToProto(msg Message) (*tmcons.Message, error) { - if msg == nil { - return nil, errors.New("consensus: message is nil") - } - var pb tmcons.Message - - switch msg := msg.(type) { - case *NewRoundStepMessage: - pb = tmcons.Message{ - Sum: &tmcons.Message_NewRoundStep{ - NewRoundStep: &tmcons.NewRoundStep{ - Height: msg.Height, - Round: msg.Round, - Step: uint32(msg.Step), - SecondsSinceStartTime: msg.SecondsSinceStartTime, - LastCommitRound: msg.LastCommitRound, - }, - }, - } - case *NewValidBlockMessage: - pbPartSetHeader := msg.BlockPartSetHeader.ToProto() - pbBits := msg.BlockParts.ToProto() - pb = tmcons.Message{ - Sum: &tmcons.Message_NewValidBlock{ - NewValidBlock: &tmcons.NewValidBlock{ - Height: msg.Height, - Round: msg.Round, - BlockPartSetHeader: pbPartSetHeader, - BlockParts: pbBits, - IsCommit: msg.IsCommit, - }, - }, - } - case *ProposalMessage: - pbP := msg.Proposal.ToProto() - pb = tmcons.Message{ - Sum: &tmcons.Message_Proposal{ - Proposal: &tmcons.Proposal{ - Proposal: *pbP, - }, - }, - } - case *ProposalPOLMessage: - pbBits := msg.ProposalPOL.ToProto() - pb = tmcons.Message{ - Sum: &tmcons.Message_ProposalPol{ - ProposalPol: &tmcons.ProposalPOL{ - Height: msg.Height, - ProposalPolRound: msg.ProposalPOLRound, - ProposalPol: *pbBits, - }, - }, - } - case *BlockPartMessage: - parts, err := msg.Part.ToProto() - if err != nil { - return nil, fmt.Errorf("msg to proto error: %w", err) - } - pb = tmcons.Message{ - Sum: &tmcons.Message_BlockPart{ - BlockPart: &tmcons.BlockPart{ - Height: msg.Height, - Round: msg.Round, - Part: *parts, - }, - }, - } - case *VoteMessage: - vote := msg.Vote.ToProto() - pb = tmcons.Message{ - Sum: &tmcons.Message_Vote{ - Vote: &tmcons.Vote{ - Vote: vote, - }, - }, - } - case *HasVoteMessage: - pb = tmcons.Message{ - Sum: &tmcons.Message_HasVote{ - HasVote: &tmcons.HasVote{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - Index: msg.Index, - }, - }, - } - case *VoteSetMaj23Message: - bi := msg.BlockID.ToProto() - pb = tmcons.Message{ - Sum: &tmcons.Message_VoteSetMaj23{ - VoteSetMaj23: &tmcons.VoteSetMaj23{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: bi, - }, - }, - } - case *VoteSetBitsMessage: - bi := msg.BlockID.ToProto() - bits := msg.Votes.ToProto() - - vsb := &tmcons.Message_VoteSetBits{ - VoteSetBits: &tmcons.VoteSetBits{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: bi, - }, - } - - if bits != nil { - vsb.VoteSetBits.Votes = *bits - } - - pb = tmcons.Message{ - Sum: vsb, - } - - default: - return nil, fmt.Errorf("consensus: message not recognized: %T", msg) - } - - return &pb, nil -} - -// MsgFromProto takes a consensus proto message and returns the native go type -func MsgFromProto(msg *tmcons.Message) (Message, error) { - if msg == nil { - return nil, errors.New("consensus: nil message") - } - var pb Message - - switch msg := msg.Sum.(type) { - case *tmcons.Message_NewRoundStep: - rs, err := tmmath.SafeConvertUint8(int64(msg.NewRoundStep.Step)) - // deny message based on possible overflow - if err != nil { - return nil, fmt.Errorf("denying message due to possible overflow: %w", err) - } - pb = &NewRoundStepMessage{ - Height: msg.NewRoundStep.Height, - Round: msg.NewRoundStep.Round, - Step: cstypes.RoundStepType(rs), - SecondsSinceStartTime: msg.NewRoundStep.SecondsSinceStartTime, - LastCommitRound: msg.NewRoundStep.LastCommitRound, - } - case *tmcons.Message_NewValidBlock: - pbPartSetHeader, err := types.PartSetHeaderFromProto(&msg.NewValidBlock.BlockPartSetHeader) - if err != nil { - return nil, fmt.Errorf("parts to proto error: %w", err) - } - - pbBits := new(bits.BitArray) - pbBits.FromProto(msg.NewValidBlock.BlockParts) - - pb = &NewValidBlockMessage{ - Height: msg.NewValidBlock.Height, - Round: msg.NewValidBlock.Round, - BlockPartSetHeader: *pbPartSetHeader, - BlockParts: pbBits, - IsCommit: msg.NewValidBlock.IsCommit, - } - case *tmcons.Message_Proposal: - pbP, err := types.ProposalFromProto(&msg.Proposal.Proposal) - if err != nil { - return nil, fmt.Errorf("proposal msg to proto error: %w", err) - } - - pb = &ProposalMessage{ - Proposal: pbP, - } - case *tmcons.Message_ProposalPol: - pbBits := new(bits.BitArray) - pbBits.FromProto(&msg.ProposalPol.ProposalPol) - pb = &ProposalPOLMessage{ - Height: msg.ProposalPol.Height, - ProposalPOLRound: msg.ProposalPol.ProposalPolRound, - ProposalPOL: pbBits, - } - case *tmcons.Message_BlockPart: - parts, err := types.PartFromProto(&msg.BlockPart.Part) - if err != nil { - return nil, fmt.Errorf("blockpart msg to proto error: %w", err) - } - pb = &BlockPartMessage{ - Height: msg.BlockPart.Height, - Round: msg.BlockPart.Round, - Part: parts, - } - case *tmcons.Message_Vote: - vote, err := types.VoteFromProto(msg.Vote.Vote) - if err != nil { - return nil, fmt.Errorf("vote msg to proto error: %w", err) - } - - pb = &VoteMessage{ - Vote: vote, - } - case *tmcons.Message_HasVote: - pb = &HasVoteMessage{ - Height: msg.HasVote.Height, - Round: msg.HasVote.Round, - Type: msg.HasVote.Type, - Index: msg.HasVote.Index, - } - case *tmcons.Message_VoteSetMaj23: - bi, err := types.BlockIDFromProto(&msg.VoteSetMaj23.BlockID) - if err != nil { - return nil, fmt.Errorf("voteSetMaj23 msg to proto error: %w", err) - } - pb = &VoteSetMaj23Message{ - Height: msg.VoteSetMaj23.Height, - Round: msg.VoteSetMaj23.Round, - Type: msg.VoteSetMaj23.Type, - BlockID: *bi, - } - case *tmcons.Message_VoteSetBits: - bi, err := types.BlockIDFromProto(&msg.VoteSetBits.BlockID) - if err != nil { - return nil, fmt.Errorf("voteSetBits msg to proto error: %w", err) - } - bits := new(bits.BitArray) - bits.FromProto(&msg.VoteSetBits.Votes) - - pb = &VoteSetBitsMessage{ - Height: msg.VoteSetBits.Height, - Round: msg.VoteSetBits.Round, - Type: msg.VoteSetBits.Type, - BlockID: *bi, - Votes: bits, - } - default: - return nil, fmt.Errorf("consensus: message not recognized: %T", msg) - } - - if err := pb.ValidateBasic(); err != nil { - return nil, err - } - - return pb, nil -} - -// MustEncode takes the reactors msg, makes it proto and marshals it -// this mimics `MustMarshalBinaryBare` in that is panics on error -func MustEncode(msg Message) []byte { - pb, err := MsgToProto(msg) - if err != nil { - panic(err) - } - enc, err := proto.Marshal(pb) - if err != nil { - panic(err) - } - return enc -} - -// WALToProto takes a WAL message and return a proto walMessage and error -func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) { +func WALToProto(msg tmcon.WALMessage) (*tmcons.WALMessage, error) { var pb tmcons.WALMessage switch msg := msg.(type) { @@ -290,7 +28,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) { }, } case msgInfo: - consMsg, err := MsgToProto(msg.Msg) + consMsg, err := tmcon.MsgToProto(msg.Msg) if err != nil { return nil, err } @@ -313,7 +51,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) { }, }, } - case EndHeightMessage: + case tmcon.EndHeightMessage: pb = tmcons.WALMessage{ Sum: &tmcons.WALMessage_EndHeight{ EndHeight: &tmcons.EndHeight{ @@ -329,11 +67,11 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) { } // WALFromProto takes a proto wal message and return a consensus walMessage and error -func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) { +func WALFromProto(msg *tmcons.WALMessage) (tmcon.WALMessage, error) { if msg == nil { return nil, errors.New("nil WAL message") } - var pb WALMessage + var pb tmcon.WALMessage switch msg := msg.Sum.(type) { case *tmcons.WALMessage_EventDataRoundState: @@ -343,7 +81,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) { Step: msg.EventDataRoundState.Step, } case *tmcons.WALMessage_MsgInfo: - walMsg, err := MsgFromProto(&msg.MsgInfo.Msg) + walMsg, err := tmcon.MsgFromProto(&msg.MsgInfo.Msg) if err != nil { return nil, fmt.Errorf("msgInfo from proto error: %w", err) } @@ -366,7 +104,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) { } return pb, nil case *tmcons.WALMessage_EndHeight: - pb := EndHeightMessage{ + pb := tmcon.EndHeightMessage{ Height: msg.EndHeight.Height, } return pb, nil diff --git a/test/maverick/consensus/reactor.go b/test/maverick/consensus/reactor.go index e623aebee..bd303a2ee 100644 --- a/test/maverick/consensus/reactor.go +++ b/test/maverick/consensus/reactor.go @@ -9,6 +9,7 @@ import ( "github.com/gogo/protobuf/proto" + tmcon "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/libs/bits" tmevents "github.com/tendermint/tendermint/libs/events" @@ -47,7 +48,7 @@ type Reactor struct { waitSync bool eventBus *types.EventBus - Metrics *Metrics + Metrics *tmcon.Metrics } type ReactorOption func(*Reactor) @@ -58,7 +59,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) conR := &Reactor{ conS: consensusState, waitSync: waitSync, - Metrics: NopMetrics(), + Metrics: tmcon.NopMetrics(), } conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) @@ -252,7 +253,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch chID { case StateChannel: switch msg := msg.(type) { - case *NewRoundStepMessage: + case *tmcon.NewRoundStepMessage: conR.conS.mtx.Lock() initialHeight := conR.conS.state.InitialHeight conR.conS.mtx.Unlock() @@ -262,11 +263,11 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } ps.ApplyNewRoundStepMessage(msg) - case *NewValidBlockMessage: + case *tmcon.NewValidBlockMessage: ps.ApplyNewValidBlockMessage(msg) - case *HasVoteMessage: + case *tmcon.HasVoteMessage: ps.ApplyHasVoteMessage(msg) - case *VoteSetMaj23Message: + case *tmcon.VoteSetMaj23Message: cs := conR.conS cs.mtx.Lock() height, votes := cs.Height, cs.Votes @@ -291,7 +292,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { default: panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?") } - src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{ + src.TrySend(VoteSetBitsChannel, tmcon.MustEncode(&tmcon.VoteSetBitsMessage{ Height: msg.Height, Round: msg.Round, Type: msg.Type, @@ -308,12 +309,12 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } switch msg := msg.(type) { - case *ProposalMessage: + case *tmcon.ProposalMessage: ps.SetHasProposal(msg.Proposal) conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} - case *ProposalPOLMessage: + case *tmcon.ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) - case *BlockPartMessage: + case *tmcon.BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1) conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -327,7 +328,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } switch msg := msg.(type) { - case *VoteMessage: + case *tmcon.VoteMessage: cs := conR.conS cs.mtx.RLock() height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() @@ -349,7 +350,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } switch msg := msg.(type) { - case *VoteSetBitsMessage: + case *tmcon.VoteSetBitsMessage: cs := conR.conS cs.mtx.Lock() height, votes := cs.Height, cs.Votes @@ -429,29 +430,29 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() { func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg := makeRoundStepMessage(rs) - conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg)) + conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(nrsMsg)) } func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) { - csMsg := &NewValidBlockMessage{ + csMsg := &tmcon.NewValidBlockMessage{ Height: rs.Height, Round: rs.Round, BlockPartSetHeader: rs.ProposalBlockParts.Header(), BlockParts: rs.ProposalBlockParts.BitArray(), IsCommit: rs.Step == cstypes.RoundStepCommit, } - conR.Switch.Broadcast(StateChannel, MustEncode(csMsg)) + conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(csMsg)) } // Broadcasts HasVoteMessage to peers that care. func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { - msg := &HasVoteMessage{ + msg := &tmcon.HasVoteMessage{ Height: vote.Height, Round: vote.Round, Type: vote.Type, Index: vote.ValidatorIndex, } - conR.Switch.Broadcast(StateChannel, MustEncode(msg)) + conR.Switch.Broadcast(StateChannel, tmcon.MustEncode(msg)) /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().List() { @@ -472,8 +473,8 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) { */ } -func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) { - nrsMsg = &NewRoundStepMessage{ +func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcon.NewRoundStepMessage) { + nrsMsg = &tmcon.NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, Step: rs.Step, @@ -486,7 +487,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg := makeRoundStepMessage(rs) - peer.Send(StateChannel, MustEncode(nrsMsg)) + peer.Send(StateChannel, tmcon.MustEncode(nrsMsg)) } func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { @@ -506,13 +507,13 @@ OUTER_LOOP: if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) - msg := &BlockPartMessage{ + msg := &tmcon.BlockPartMessage{ Height: rs.Height, // This tells peer that this part applies to us. Round: rs.Round, // This tells peer that this part applies to us. Part: part, } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) - if peer.Send(DataChannel, MustEncode(msg)) { + if peer.Send(DataChannel, tmcon.MustEncode(msg)) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } continue OUTER_LOOP @@ -555,9 +556,9 @@ OUTER_LOOP: if rs.Proposal != nil && !prs.Proposal { // Proposal: share the proposal metadata with peer. { - msg := &ProposalMessage{Proposal: rs.Proposal} + msg := &tmcon.ProposalMessage{Proposal: rs.Proposal} logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) - if peer.Send(DataChannel, MustEncode(msg)) { + if peer.Send(DataChannel, tmcon.MustEncode(msg)) { // NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected! ps.SetHasProposal(rs.Proposal) } @@ -567,13 +568,13 @@ OUTER_LOOP: // rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round, // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). if 0 <= rs.Proposal.POLRound { - msg := &ProposalPOLMessage{ + msg := &tmcon.ProposalPOLMessage{ Height: rs.Height, ProposalPOLRound: rs.Proposal.POLRound, ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), } logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) - peer.Send(DataChannel, MustEncode(msg)) + peer.Send(DataChannel, tmcon.MustEncode(msg)) } continue OUTER_LOOP } @@ -610,13 +611,13 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt return } // Send the part - msg := &BlockPartMessage{ + msg := &tmcon.BlockPartMessage{ Height: prs.Height, // Not our height, so it doesn't matter. Round: prs.Round, // Not our height, so it doesn't matter. Part: part, } logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index) - if peer.Send(DataChannel, MustEncode(msg)) { + if peer.Send(DataChannel, tmcon.MustEncode(msg)) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } else { logger.Debug("Sending block part for catchup failed") @@ -773,7 +774,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{ + peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{ Height: prs.Height, Round: prs.Round, Type: tmproto.PrevoteType, @@ -790,7 +791,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{ + peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{ Height: prs.Height, Round: prs.Round, Type: tmproto.PrecommitType, @@ -807,7 +808,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{ + peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{ Height: prs.Height, Round: prs.ProposalPOLRound, Type: tmproto.PrevoteType, @@ -827,7 +828,7 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{ + peer.TrySend(StateChannel, tmcon.MustEncode(&tmcon.VoteSetMaj23Message{ Height: prs.Height, Round: commit.Round, Type: tmproto.PrecommitType, @@ -866,11 +867,11 @@ func (conR *Reactor) peerStatsRoutine() { panic(fmt.Sprintf("Peer %v has no state", peer)) } switch msg.Msg.(type) { - case *VoteMessage: + case *tmcon.VoteMessage: if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { conR.Switch.MarkPeerAsGood(peer) } - case *BlockPartMessage: + case *tmcon.BlockPartMessage: if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { conR.Switch.MarkPeerAsGood(peer) } @@ -908,7 +909,7 @@ func (conR *Reactor) StringIndented(indent string) string { } // ReactorMetrics sets the metrics -func ReactorMetrics(metrics *Metrics) ReactorOption { +func ReactorMetrics(metrics *tmcon.Metrics) ReactorOption { return func(conR *Reactor) { conR.Metrics = metrics } } @@ -1046,9 +1047,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in // Returns true if vote was sent. func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { - msg := &VoteMessage{vote} + msg := &tmcon.VoteMessage{Vote: vote} ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) - if ps.peer.Send(VoteChannel, MustEncode(msg)) { + if ps.peer.Send(VoteChannel, tmcon.MustEncode(msg)) { ps.SetHasVote(vote) return true } @@ -1255,7 +1256,7 @@ func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.Sign } // ApplyNewRoundStepMessage updates the peer state for the new round. -func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { +func (ps *PeerState) ApplyNewRoundStepMessage(msg *tmcon.NewRoundStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1308,7 +1309,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { } // ApplyNewValidBlockMessage updates the peer state for the new valid block. -func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) { +func (ps *PeerState) ApplyNewValidBlockMessage(msg *tmcon.NewValidBlockMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1325,7 +1326,7 @@ func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) { } // ApplyProposalPOLMessage updates the peer state for the new proposal POL. -func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { +func (ps *PeerState) ApplyProposalPOLMessage(msg *tmcon.ProposalPOLMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1342,7 +1343,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { } // ApplyHasVoteMessage updates the peer state for the new vote. -func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { +func (ps *PeerState) ApplyHasVoteMessage(msg *tmcon.HasVoteMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1358,7 +1359,7 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { // `ourVotes` is a BitArray of votes we have for msg.BlockID // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), // we conservatively overwrite ps's votes w/ msg.Votes. -func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) { +func (ps *PeerState) ApplyVoteSetBitsMessage(msg *tmcon.VoteSetBitsMessage, ourVotes *bits.BitArray) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1395,12 +1396,6 @@ func (ps *PeerState) StringIndented(indent string) string { } //----------------------------------------------------------------------------- -// Messages - -// Message is a message that can be sent and received on the Reactor -type Message interface { - ValidateBasic() error -} // func init() { // tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage") @@ -1414,307 +1409,11 @@ type Message interface { // tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits") // } -func decodeMsg(bz []byte) (msg Message, err error) { +func decodeMsg(bz []byte) (msg tmcon.Message, err error) { pb := &tmcons.Message{} if err = proto.Unmarshal(bz, pb); err != nil { return msg, err } - return MsgFromProto(pb) -} - -//------------------------------------- - -// NewRoundStepMessage is sent for every step taken in the ConsensusState. -// For every height/round/step transition -type NewRoundStepMessage struct { - Height int64 - Round int32 - Step cstypes.RoundStepType - SecondsSinceStartTime int64 - LastCommitRound int32 -} - -// ValidateBasic performs basic validation. -func (m *NewRoundStepMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.Round < 0 { - return errors.New("negative Round") - } - if !m.Step.IsValid() { - return errors.New("invalid Step") - } - - // NOTE: SecondsSinceStartTime may be negative - - // LastCommitRound will be -1 for the initial height, but we don't know what height this is - // since it can be specified in genesis. The reactor will have to validate this via - // ValidateHeight(). - if m.LastCommitRound < -1 { - return errors.New("invalid LastCommitRound (cannot be < -1)") - } - - return nil -} - -// ValidateHeight validates the height given the chain's initial height. -func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error { - if m.Height < initialHeight { - return fmt.Errorf("invalid Height %v (lower than initial height %v)", - m.Height, initialHeight) - } - if m.Height == initialHeight && m.LastCommitRound != -1 { - return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)", - m.LastCommitRound, initialHeight) - } - if m.Height > initialHeight && m.LastCommitRound < 0 { - return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint - initialHeight) - } - return nil -} - -// String returns a string representation. -func (m *NewRoundStepMessage) String() string { - return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]", - m.Height, m.Round, m.Step, m.LastCommitRound) -} - -//------------------------------------- - -// NewValidBlockMessage is sent when a validator observes a valid block B in some round r, -// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r. -// In case the block is also committed, then IsCommit flag is set to true. -type NewValidBlockMessage struct { - Height int64 - Round int32 - BlockPartSetHeader types.PartSetHeader - BlockParts *bits.BitArray - IsCommit bool -} - -// ValidateBasic performs basic validation. -func (m *NewValidBlockMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.Round < 0 { - return errors.New("negative Round") - } - if err := m.BlockPartSetHeader.ValidateBasic(); err != nil { - return fmt.Errorf("wrong BlockPartSetHeader: %v", err) - } - if m.BlockParts.Size() == 0 { - return errors.New("empty blockParts") - } - if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) { - return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d", - m.BlockParts.Size(), - m.BlockPartSetHeader.Total) - } - if m.BlockParts.Size() > int(types.MaxBlockPartsCount) { - return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount) - } - return nil -} - -// String returns a string representation. -func (m *NewValidBlockMessage) String() string { - return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]", - m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit) -} - -//------------------------------------- - -// ProposalMessage is sent when a new block is proposed. -type ProposalMessage struct { - Proposal *types.Proposal -} - -// ValidateBasic performs basic validation. -func (m *ProposalMessage) ValidateBasic() error { - return m.Proposal.ValidateBasic() -} - -// String returns a string representation. -func (m *ProposalMessage) String() string { - return fmt.Sprintf("[Proposal %v]", m.Proposal) -} - -//------------------------------------- - -// ProposalPOLMessage is sent when a previous proposal is re-proposed. -type ProposalPOLMessage struct { - Height int64 - ProposalPOLRound int32 - ProposalPOL *bits.BitArray -} - -// ValidateBasic performs basic validation. -func (m *ProposalPOLMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.ProposalPOLRound < 0 { - return errors.New("negative ProposalPOLRound") - } - if m.ProposalPOL.Size() == 0 { - return errors.New("empty ProposalPOL bit array") - } - if m.ProposalPOL.Size() > types.MaxVotesCount { - return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount) - } - return nil -} - -// String returns a string representation. -func (m *ProposalPOLMessage) String() string { - return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL) -} - -//------------------------------------- - -// BlockPartMessage is sent when gossipping a piece of the proposed block. -type BlockPartMessage struct { - Height int64 - Round int32 - Part *types.Part -} - -// ValidateBasic performs basic validation. -func (m *BlockPartMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.Round < 0 { - return errors.New("negative Round") - } - if err := m.Part.ValidateBasic(); err != nil { - return fmt.Errorf("wrong Part: %v", err) - } - return nil -} - -// String returns a string representation. -func (m *BlockPartMessage) String() string { - return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part) -} - -//------------------------------------- - -// VoteMessage is sent when voting for a proposal (or lack thereof). -type VoteMessage struct { - Vote *types.Vote -} - -// ValidateBasic performs basic validation. -func (m *VoteMessage) ValidateBasic() error { - return m.Vote.ValidateBasic() -} - -// String returns a string representation. -func (m *VoteMessage) String() string { - return fmt.Sprintf("[Vote %v]", m.Vote) -} - -//------------------------------------- - -// HasVoteMessage is sent to indicate that a particular vote has been received. -type HasVoteMessage struct { - Height int64 - Round int32 - Type tmproto.SignedMsgType - Index int32 -} - -// ValidateBasic performs basic validation. -func (m *HasVoteMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.Round < 0 { - return errors.New("negative Round") - } - if !types.IsVoteTypeValid(m.Type) { - return errors.New("invalid Type") - } - if m.Index < 0 { - return errors.New("negative Index") - } - return nil + return tmcon.MsgFromProto(pb) } - -// String returns a string representation. -func (m *HasVoteMessage) String() string { - return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type) -} - -//------------------------------------- - -// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes. -type VoteSetMaj23Message struct { - Height int64 - Round int32 - Type tmproto.SignedMsgType - BlockID types.BlockID -} - -// ValidateBasic performs basic validation. -func (m *VoteSetMaj23Message) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if m.Round < 0 { - return errors.New("negative Round") - } - if !types.IsVoteTypeValid(m.Type) { - return errors.New("invalid Type") - } - if err := m.BlockID.ValidateBasic(); err != nil { - return fmt.Errorf("wrong BlockID: %v", err) - } - return nil -} - -// String returns a string representation. -func (m *VoteSetMaj23Message) String() string { - return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) -} - -//------------------------------------- - -// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID. -type VoteSetBitsMessage struct { - Height int64 - Round int32 - Type tmproto.SignedMsgType - BlockID types.BlockID - Votes *bits.BitArray -} - -// ValidateBasic performs basic validation. -func (m *VoteSetBitsMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("negative Height") - } - if !types.IsVoteTypeValid(m.Type) { - return errors.New("invalid Type") - } - if err := m.BlockID.ValidateBasic(); err != nil { - return fmt.Errorf("wrong BlockID: %v", err) - } - // NOTE: Votes.Size() can be zero if the node does not have any - if m.Votes.Size() > types.MaxVotesCount { - return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount) - } - return nil -} - -// String returns a string representation. -func (m *VoteSetBitsMessage) String() string { - return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) -} - -//------------------------------------- diff --git a/test/maverick/consensus/replay.go b/test/maverick/consensus/replay.go index bfec9e96d..22709b6ab 100644 --- a/test/maverick/consensus/replay.go +++ b/test/maverick/consensus/replay.go @@ -9,6 +9,7 @@ import ( "time" abci "github.com/tendermint/tendermint/abci/types" + tmcon "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/proxy" @@ -35,9 +36,9 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli) // Unmarshal and apply a single message to the consensus state as if it were // received in receiveRoutine. Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running. -func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error { +func (cs *State) readReplayMessage(msg *tmcon.TimedWALMessage, newStepSub types.Subscription) error { // Skip meta messages which exist for demarcating boundaries. - if _, ok := msg.Msg.(EndHeightMessage); ok { + if _, ok := msg.Msg.(tmcon.EndHeightMessage); ok { return nil } @@ -66,13 +67,13 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr peerID = "local" } switch msg := m.Msg.(type) { - case *ProposalMessage: + case *tmcon.ProposalMessage: p := msg.Proposal cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header", p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID) - case *BlockPartMessage: + case *tmcon.BlockPartMessage: cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID) - case *VoteMessage: + case *tmcon.VoteMessage: v := msg.Vote cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type, "blockID", v.BlockID, "peer", peerID) @@ -102,7 +103,7 @@ func (cs *State) catchupReplay(csHeight int64) error { // this check (since we can crash after writing #ENDHEIGHT). // // Ignore data corruption errors since this is a sanity check. - gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true}) + gr, found, err := cs.wal.SearchForEndHeight(csHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true}) if err != nil { return err } @@ -125,7 +126,7 @@ func (cs *State) catchupReplay(csHeight int64) error { if csHeight == cs.state.InitialHeight { endHeight = 0 } - gr, found, err = cs.wal.SearchForEndHeight(endHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true}) + gr, found, err = cs.wal.SearchForEndHeight(endHeight, &tmcon.WALSearchOptions{IgnoreDataCorruptionErrors: true}) if err == io.EOF { cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", endHeight) } else if err != nil { @@ -138,7 +139,7 @@ func (cs *State) catchupReplay(csHeight int64) error { cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight) - var msg *TimedWALMessage + var msg *tmcon.TimedWALMessage dec := WALDecoder{gr} LOOP: diff --git a/test/maverick/consensus/replay_file.go b/test/maverick/consensus/replay_file.go index 0a02031f8..2dbc5cf37 100644 --- a/test/maverick/consensus/replay_file.go +++ b/test/maverick/consensus/replay_file.go @@ -13,6 +13,7 @@ import ( dbm "github.com/tendermint/tm-db" cfg "github.com/tendermint/tendermint/config" + tmcon "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/libs/log" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/proxy" @@ -73,7 +74,7 @@ func (cs *State) ReplayFile(file string, console bool) error { defer pb.fp.Close() var nextN int // apply N msgs in a row - var msg *TimedWALMessage + var msg *tmcon.TimedWALMessage for { if nextN == 0 && console { nextN = pb.replayConsoleLoop() @@ -147,7 +148,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error fmt.Printf("Reseting from %d to %d\n", pb.count, count) pb.count = 0 pb.cs = newCS - var msg *TimedWALMessage + var msg *tmcon.TimedWALMessage for i := 0; i < count; i++ { msg, err = pb.dec.Decode() if err == io.EOF { diff --git a/test/maverick/consensus/state.go b/test/maverick/consensus/state.go index bec89e1b1..f83dc56c6 100644 --- a/test/maverick/consensus/state.go +++ b/test/maverick/consensus/state.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/proto" cfg "github.com/tendermint/tendermint/config" + tmcon "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/crypto" tmevents "github.com/tendermint/tendermint/libs/events" @@ -78,7 +79,7 @@ type State struct { // a Write-Ahead Log ensures we can recover from any kind of crash // and helps us avoid signing conflicting votes - wal WAL + wal tmcon.WAL replayMode bool // so we don't log signing errors during replay doWALCatchup bool // determines if we even try to do the catchup @@ -96,7 +97,7 @@ type State struct { evsw tmevents.EventSwitch // for reporting metrics - metrics *Metrics + metrics *tmcon.Metrics // misbehaviors mapped for each height (can't have more than one misbehavior per height) misbehaviors map[int64]Misbehavior @@ -134,7 +135,7 @@ func NewState( wal: nilWAL{}, evpool: evpool, evsw: tmevents.NewEventSwitch(), - metrics: NopMetrics(), + metrics: tmcon.NopMetrics(), misbehaviors: misbehaviors, } // set function defaults (may be overwritten before calling Start) @@ -173,7 +174,7 @@ func (cs *State) handleMsg(mi msgInfo) { ) msg, peerID := mi.Msg, mi.PeerID switch msg := msg.(type) { - case *ProposalMessage: + case *tmcon.ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts // err = cs.setProposal(msg.Proposal) @@ -182,7 +183,7 @@ func (cs *State) handleMsg(mi msgInfo) { } else { err = defaultReceiveProposal(cs, msg.Proposal) } - case *BlockPartMessage: + case *tmcon.BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit added, err = cs.addProposalBlockPart(msg, peerID) if added { @@ -200,7 +201,7 @@ func (cs *State) handleMsg(mi msgInfo) { msg.Round) err = nil } - case *VoteMessage: + case *tmcon.VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition added, err = cs.tryAddVote(msg.Vote, peerID) @@ -442,8 +443,8 @@ var ( // msgs from the reactor which may update the state type msgInfo struct { - Msg Message `json:"msg"` - PeerID p2p.ID `json:"peer_key"` + Msg tmcon.Message `json:"msg"` + PeerID p2p.ID `json:"peer_key"` } // internally generated messages which may update the state @@ -485,7 +486,7 @@ func (cs *State) SetEventBus(b *types.EventBus) { } // StateMetrics sets the metrics. -func StateMetrics(metrics *Metrics) StateOption { +func StateMetrics(metrics *tmcon.Metrics) StateOption { return func(cs *State) { cs.metrics = metrics } } @@ -685,7 +686,7 @@ func (cs *State) Wait() { // OpenWAL opens a file to log all consensus messages and timeouts for // deterministic accountability. -func (cs *State) OpenWAL(walFile string) (WAL, error) { +func (cs *State) OpenWAL(walFile string) (tmcon.WAL, error) { wal, err := NewWAL(walFile) if err != nil { cs.Logger.Error("Failed to open WAL", "file", walFile, "err", err) @@ -709,9 +710,9 @@ func (cs *State) OpenWAL(walFile string) (WAL, error) { // AddVote inputs a vote. func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""} + cs.internalMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, ""} } else { - cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID} + cs.peerMsgQueue <- msgInfo{&tmcon.VoteMessage{Vote: vote}, peerID} } // TODO: wait for event?! @@ -722,9 +723,9 @@ func (cs *State) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + cs.internalMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""} } else { - cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID} + cs.peerMsgQueue <- msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, peerID} } // TODO: wait for event?! @@ -735,9 +736,9 @@ func (cs *State) SetProposal(proposal *types.Proposal, peerID p2p.ID) error { func (cs *State) AddProposalBlockPart(height int64, round int32, part *types.Part, peerID p2p.ID) error { if peerID == "" { - cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""} + cs.internalMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, ""} } else { - cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID} + cs.peerMsgQueue <- msgInfo{&tmcon.BlockPartMessage{Height: height, Round: round, Part: part}, peerID} } // TODO: wait for event?! @@ -998,7 +999,7 @@ func (cs *State) receiveRoutine(maxSteps int) { panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err)) } - if _, ok := mi.Msg.(*VoteMessage); ok { + if _, ok := mi.Msg.(*tmcon.VoteMessage); ok { // we actually want to simulate failing during // the previous WriteSync, but this isn't easy to do. // Equivalent would be to fail here and manually remove @@ -1210,10 +1211,10 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { proposal.Signature = p.Signature // send proposal and block parts on internal msg queue - cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) + cs.sendInternalMessage(msgInfo{&tmcon.ProposalMessage{Proposal: proposal}, ""}) for i := 0; i < int(blockParts.Total()); i++ { part := blockParts.GetPart(i) - cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) + cs.sendInternalMessage(msgInfo{&tmcon.BlockPartMessage{Height: cs.Height, Round: cs.Round, Part: part}, ""}) } cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block)) @@ -1490,7 +1491,7 @@ func (cs *State) finalizeCommit(height int64) { // Either way, the State should not be resumed until we // successfully call ApplyBlock (ie. later here, or in Handshake after // restart). - endMsg := EndHeightMessage{height} + endMsg := tmcon.EndHeightMessage{Height: height} if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", endMsg, err)) @@ -1658,7 +1659,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) { // NOTE: block is not necessarily valid. // Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, // once we have the full block. -func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) { +func (cs *State) addProposalBlockPart(msg *tmcon.BlockPartMessage, peerID p2p.ID) (added bool, err error) { height, round, part := msg.Height, msg.Round, msg.Part // Blocks might be reused, so round mismatch is OK @@ -1858,7 +1859,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header // TODO: pass pubKey to signVote vote, err := cs.signVote(msgType, hash, header) if err == nil { - cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""}) + cs.sendInternalMessage(msgInfo{&tmcon.VoteMessage{Vote: vote}, ""}) cs.Logger.Info("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote) return vote } diff --git a/test/maverick/consensus/wal.go b/test/maverick/consensus/wal.go index 7d698713f..99b929456 100644 --- a/test/maverick/consensus/wal.go +++ b/test/maverick/consensus/wal.go @@ -11,8 +11,9 @@ import ( "github.com/gogo/protobuf/proto" - auto "github.com/tendermint/tendermint/libs/autofile" // tmjson "github.com/tendermint/tendermint/libs/json" + tmcon "github.com/tendermint/tendermint/consensus" + auto "github.com/tendermint/tendermint/libs/autofile" "github.com/tendermint/tendermint/libs/log" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/libs/service" @@ -30,44 +31,12 @@ const ( //-------------------------------------------------------- // types and functions for savings consensus messages - -// TimedWALMessage wraps WALMessage and adds Time for debugging purposes. -type TimedWALMessage struct { - Time time.Time `json:"time"` - Msg WALMessage `json:"msg"` -} - -// EndHeightMessage marks the end of the given height inside WAL. -// @internal used by scripts/wal2json util. -type EndHeightMessage struct { - Height int64 `json:"height"` -} - -type WALMessage interface{} - // func init() { // tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo") // tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo") -// tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage") +// tmjson.RegisterType(tmcon.EndHeightMessage {}, "tendermint/wal/EndHeightMessage ") // } -//-------------------------------------------------------- -// Simple write-ahead logger - -// WAL is an interface for any write-ahead logger. -type WAL interface { - Write(WALMessage) error - WriteSync(WALMessage) error - FlushAndSync() error - - SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) - - // service methods - Start() error - Stop() error - Wait() -} - // Write ahead logger writes msgs to disk before they are processed. // Can be used for crash-recovery and deterministic replay. // TODO: currently the wal is overwritten during replay catchup, give it a mode @@ -84,7 +53,7 @@ type BaseWAL struct { flushInterval time.Duration } -var _ WAL = &BaseWAL{} +var _ tmcon.WAL = &BaseWAL{} // NewWAL returns a new write-ahead logger based on `baseWAL`, which implements // WAL. It's flushed and synced to disk every 2s and once when stopped. @@ -126,7 +95,7 @@ func (wal *BaseWAL) OnStart() error { if err != nil { return err } else if size == 0 { - if err := wal.WriteSync(EndHeightMessage{0}); err != nil { + if err := wal.WriteSync(tmcon.EndHeightMessage{Height: 0}); err != nil { return err } } @@ -181,12 +150,12 @@ func (wal *BaseWAL) Wait() { // Write is called in newStep and for each receive on the // peerMsgQueue and the timeoutTicker. // NOTE: does not call fsync() -func (wal *BaseWAL) Write(msg WALMessage) error { +func (wal *BaseWAL) Write(msg tmcon.WALMessage) error { if wal == nil { return nil } - if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil { + if err := wal.enc.Encode(&tmcon.TimedWALMessage{Time: tmtime.Now(), Msg: msg}); err != nil { wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height", "err", err, "msg", msg) return err @@ -198,7 +167,7 @@ func (wal *BaseWAL) Write(msg WALMessage) error { // WriteSync is called when we receive a msg from ourselves // so that we write to disk before sending signed messages. // NOTE: calls fsync() -func (wal *BaseWAL) WriteSync(msg WALMessage) error { +func (wal *BaseWAL) WriteSync(msg tmcon.WALMessage) error { if wal == nil { return nil } @@ -230,9 +199,9 @@ type WALSearchOptions struct { // CONTRACT: caller must close group reader. func (wal *BaseWAL) SearchForEndHeight( height int64, - options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { + options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) { var ( - msg *TimedWALMessage + msg *tmcon.TimedWALMessage gr *auto.GroupReader ) lastHeightFound := int64(-1) @@ -268,7 +237,7 @@ func (wal *BaseWAL) SearchForEndHeight( return nil, false, err } - if m, ok := msg.Msg.(EndHeightMessage); ok { + if m, ok := msg.Msg.(tmcon.EndHeightMessage); ok { lastHeightFound = m.Height if m.Height == height { // found wal.Logger.Info("Found", "height", height, "index", index) @@ -299,7 +268,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder { // Encode writes the custom encoding of v to the stream. It returns an error if // the encoded size of v is greater than 1MB. Any error encountered // during the write is also returned. -func (enc *WALEncoder) Encode(v *TimedWALMessage) error { +func (enc *WALEncoder) Encode(v *tmcon.TimedWALMessage) error { pbMsg, err := WALToProto(v.Msg) if err != nil { return err @@ -366,7 +335,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder { } // Decode reads the next custom-encoded value from its reader and returns it. -func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { +func (dec *WALDecoder) Decode() (*tmcon.TimedWALMessage, error) { b := make([]byte, 4) _, err := dec.rd.Read(b) @@ -414,7 +383,7 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { if err != nil { return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)} } - tMsgWal := &TimedWALMessage{ + tMsgWal := &tmcon.TimedWALMessage{ Time: res.Time, Msg: walMsg, } @@ -424,12 +393,13 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { type nilWAL struct{} -var _ WAL = nilWAL{} +var _ tmcon.WAL = nilWAL{} -func (nilWAL) Write(m WALMessage) error { return nil } -func (nilWAL) WriteSync(m WALMessage) error { return nil } -func (nilWAL) FlushAndSync() error { return nil } -func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { +func (nilWAL) Write(m tmcon.WALMessage) error { return nil } +func (nilWAL) WriteSync(m tmcon.WALMessage) error { return nil } +func (nilWAL) FlushAndSync() error { return nil } +func (nilWAL) SearchForEndHeight(height int64, + options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return nil, false, nil } func (nilWAL) Start() error { return nil } diff --git a/test/maverick/consensus/wal_fuzz.go b/test/maverick/consensus/wal_fuzz.go deleted file mode 100644 index e15097c30..000000000 --- a/test/maverick/consensus/wal_fuzz.go +++ /dev/null @@ -1,31 +0,0 @@ -// +build gofuzz - -package consensus - -import ( - "bytes" - "io" -) - -func Fuzz(data []byte) int { - dec := NewWALDecoder(bytes.NewReader(data)) - for { - msg, err := dec.Decode() - if err == io.EOF { - break - } - if err != nil { - if msg != nil { - panic("msg != nil on error") - } - return 0 - } - var w bytes.Buffer - enc := NewWALEncoder(&w) - err = enc.Encode(msg) - if err != nil { - panic(err) - } - } - return 1 -} diff --git a/test/maverick/consensus/wal_generator.go b/test/maverick/consensus/wal_generator.go index fde9064b8..6997db14e 100644 --- a/test/maverick/consensus/wal_generator.go +++ b/test/maverick/consensus/wal_generator.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" cfg "github.com/tendermint/tendermint/config" + tmcon "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/privval" @@ -98,7 +99,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { numBlocksWritten := make(chan struct{}) wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten) // see wal.go#103 - if err := wal.Write(EndHeightMessage{0}); err != nil { + if err := wal.Write(tmcon.EndHeightMessage{Height: 0}); err != nil { t.Error(err) } @@ -187,13 +188,13 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS // Save writes message to the internal buffer except when heightToStop is // reached, in which case it will signal the caller via signalWhenStopsTo and // skip writing. -func (w *byteBufferWAL) Write(m WALMessage) error { +func (w *byteBufferWAL) Write(m tmcon.WALMessage) error { if w.stopped { w.logger.Debug("WAL already stopped. Not writing message", "msg", m) return nil } - if endMsg, ok := m.(EndHeightMessage); ok { + if endMsg, ok := m.(tmcon.EndHeightMessage); ok { w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop) if endMsg.Height == w.heightToStop { w.logger.Debug("Stopping WAL at height", "height", endMsg.Height) @@ -204,7 +205,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error { } w.logger.Debug("WAL Write Message", "msg", m) - err := w.enc.Encode(&TimedWALMessage{fixedTime, m}) + err := w.enc.Encode(&tmcon.TimedWALMessage{Time: fixedTime, Msg: m}) if err != nil { panic(fmt.Sprintf("failed to encode the msg %v", m)) } @@ -212,7 +213,7 @@ func (w *byteBufferWAL) Write(m WALMessage) error { return nil } -func (w *byteBufferWAL) WriteSync(m WALMessage) error { +func (w *byteBufferWAL) WriteSync(m tmcon.WALMessage) error { return w.Write(m) } @@ -220,7 +221,7 @@ func (w *byteBufferWAL) FlushAndSync() error { return nil } func (w *byteBufferWAL) SearchForEndHeight( height int64, - options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { + options *tmcon.WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return nil, false, nil } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index e1f41b6fb..22853de5f 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -138,19 +138,19 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger, misbehaviors map[int6 } // MetricsProvider returns a consensus, p2p and mempool Metrics. -type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) +type MetricsProvider func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) // DefaultMetricsProvider returns Metrics build using Prometheus client library // if Prometheus is enabled. Otherwise, it returns no-op Metrics. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) { + return func(chainID string) (*consensus.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) { if config.Prometheus { - return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), + return consensus.PrometheusMetrics(config.Namespace, "chain_id", chainID), p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), sm.PrometheusMetrics(config.Namespace, "chain_id", chainID) } - return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics() + return consensus.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics() } } @@ -427,7 +427,7 @@ func createConsensusReactor(config *cfg.Config, mempool *mempl.CListMempool, evidencePool *evidence.Pool, privValidator types.PrivValidator, - csMetrics *cs.Metrics, + csMetrics *consensus.Metrics, waitSync bool, eventBus *types.EventBus, consensusLogger log.Logger,