|
|
@ -329,7 +329,9 @@ func (cs *State) OnStart() error { |
|
|
|
cs.Logger.Info("WAL file is corrupted. Attempting repair", "err", err) |
|
|
|
|
|
|
|
// 1) prep work
|
|
|
|
cs.wal.Stop() |
|
|
|
if err := cs.wal.Stop(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
repairAttempted = true |
|
|
|
|
|
|
|
// 2) backup original WAL file
|
|
|
@ -658,11 +660,15 @@ func (cs *State) updateToState(state sm.State) { |
|
|
|
|
|
|
|
func (cs *State) newStep() { |
|
|
|
rs := cs.RoundStateEvent() |
|
|
|
cs.wal.Write(rs) |
|
|
|
if err := cs.wal.Write(rs); err != nil { |
|
|
|
cs.Logger.Error("Error writing to wal", "err", err) |
|
|
|
} |
|
|
|
cs.nSteps++ |
|
|
|
// newStep is called by updateToState in NewState before the eventBus is set!
|
|
|
|
if cs.eventBus != nil { |
|
|
|
cs.eventBus.PublishEventNewRoundStep(rs) |
|
|
|
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil { |
|
|
|
cs.Logger.Error("Error publishing new round step", "err", err) |
|
|
|
} |
|
|
|
cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState) |
|
|
|
} |
|
|
|
} |
|
|
@ -720,7 +726,9 @@ func (cs *State) receiveRoutine(maxSteps int) { |
|
|
|
case <-cs.txNotifier.TxsAvailable(): |
|
|
|
cs.handleTxsAvailable() |
|
|
|
case mi = <-cs.peerMsgQueue: |
|
|
|
cs.wal.Write(mi) |
|
|
|
if err := cs.wal.Write(mi); err != nil { |
|
|
|
cs.Logger.Error("Error writing to wal", "err", err) |
|
|
|
} |
|
|
|
// handles proposals, block parts, votes
|
|
|
|
// may generate internal events (votes, complete proposals, 2/3 majorities)
|
|
|
|
cs.handleMsg(mi) |
|
|
@ -741,7 +749,9 @@ func (cs *State) receiveRoutine(maxSteps int) { |
|
|
|
// handles proposals, block parts, votes
|
|
|
|
cs.handleMsg(mi) |
|
|
|
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
|
|
|
|
cs.wal.Write(ti) |
|
|
|
if err := cs.wal.Write(ti); err != nil { |
|
|
|
cs.Logger.Error("Error writing to wal", "err", err) |
|
|
|
} |
|
|
|
// if the timeout is relevant to the rs
|
|
|
|
// go to the next step
|
|
|
|
cs.handleTimeout(ti, rs) |
|
|
@ -839,13 +849,19 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { |
|
|
|
case cstypes.RoundStepNewRound: |
|
|
|
cs.enterPropose(ti.Height, 0) |
|
|
|
case cstypes.RoundStepPropose: |
|
|
|
cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing timeout propose", "err", err) |
|
|
|
} |
|
|
|
cs.enterPrevote(ti.Height, ti.Round) |
|
|
|
case cstypes.RoundStepPrevoteWait: |
|
|
|
cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing timeout wait", "err", err) |
|
|
|
} |
|
|
|
cs.enterPrecommit(ti.Height, ti.Round) |
|
|
|
case cstypes.RoundStepPrecommitWait: |
|
|
|
cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing timeout wait", "err", err) |
|
|
|
} |
|
|
|
cs.enterPrecommit(ti.Height, ti.Round) |
|
|
|
cs.enterNewRound(ti.Height, ti.Round+1) |
|
|
|
default: |
|
|
@ -933,7 +949,9 @@ func (cs *State) enterNewRound(height int64, round int32) { |
|
|
|
cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
|
|
|
|
cs.TriggeredTimeoutPrecommit = false |
|
|
|
|
|
|
|
cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()) |
|
|
|
if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing new round", "err", err) |
|
|
|
} |
|
|
|
cs.metrics.Rounds.Set(float64(round)) |
|
|
|
|
|
|
|
// Wait for txs to be available in the mempool
|
|
|
@ -1058,7 +1076,9 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { |
|
|
|
|
|
|
|
// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
|
|
|
|
// and the privValidator will refuse to sign anything.
|
|
|
|
cs.wal.FlushAndSync() |
|
|
|
if err := cs.wal.FlushAndSync(); err != nil { |
|
|
|
cs.Logger.Error("Error flushing to disk") |
|
|
|
} |
|
|
|
|
|
|
|
// Make proposal
|
|
|
|
propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()} |
|
|
@ -1269,7 +1289,9 @@ func (cs *State) enterPrecommit(height int64, round int32) { |
|
|
|
} |
|
|
|
|
|
|
|
// At this point +2/3 prevoted for a particular block or nil.
|
|
|
|
cs.eventBus.PublishEventPolka(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventPolka(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing polka", "err", err) |
|
|
|
} |
|
|
|
|
|
|
|
// the latest POLRound should be this round.
|
|
|
|
polRound, _ := cs.Votes.POLInfo() |
|
|
@ -1286,7 +1308,9 @@ func (cs *State) enterPrecommit(height int64, round int32) { |
|
|
|
cs.LockedRound = -1 |
|
|
|
cs.LockedBlock = nil |
|
|
|
cs.LockedBlockParts = nil |
|
|
|
cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing event unlock", "err", err) |
|
|
|
} |
|
|
|
} |
|
|
|
cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{}) |
|
|
|
return |
|
|
@ -1298,7 +1322,9 @@ func (cs *State) enterPrecommit(height int64, round int32) { |
|
|
|
if cs.LockedBlock.HashesTo(blockID.Hash) { |
|
|
|
logger.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") |
|
|
|
cs.LockedRound = round |
|
|
|
cs.eventBus.PublishEventRelock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventRelock(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing event relock", "err", err) |
|
|
|
} |
|
|
|
cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) |
|
|
|
return |
|
|
|
} |
|
|
@ -1313,7 +1339,9 @@ func (cs *State) enterPrecommit(height int64, round int32) { |
|
|
|
cs.LockedRound = round |
|
|
|
cs.LockedBlock = cs.ProposalBlock |
|
|
|
cs.LockedBlockParts = cs.ProposalBlockParts |
|
|
|
cs.eventBus.PublishEventLock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventLock(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing event lock", "err", err) |
|
|
|
} |
|
|
|
cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) |
|
|
|
return |
|
|
|
} |
|
|
@ -1329,7 +1357,9 @@ func (cs *State) enterPrecommit(height int64, round int32) { |
|
|
|
cs.ProposalBlock = nil |
|
|
|
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) |
|
|
|
} |
|
|
|
cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing event unlock", "err", err) |
|
|
|
} |
|
|
|
cs.signAddVote(tmproto.PrecommitType, nil, types.PartSetHeader{}) |
|
|
|
} |
|
|
|
|
|
|
@ -1415,7 +1445,9 @@ func (cs *State) enterCommit(height int64, commitRound int32) { |
|
|
|
// Set up ProposalBlockParts and keep waiting.
|
|
|
|
cs.ProposalBlock = nil |
|
|
|
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) |
|
|
|
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing valid block", "err", err) |
|
|
|
} |
|
|
|
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) |
|
|
|
} |
|
|
|
// else {
|
|
|
@ -1756,7 +1788,9 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add |
|
|
|
cs.ProposalBlock = block |
|
|
|
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
|
|
|
|
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) |
|
|
|
cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()) |
|
|
|
if err := cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()); err != nil { |
|
|
|
cs.Logger.Error("Error publishing event complete proposal", "err", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Update Valid* if we can.
|
|
|
|
prevotes := cs.Votes.Prevotes(cs.Round) |
|
|
@ -1872,7 +1906,9 @@ func (cs *State) addVote( |
|
|
|
} |
|
|
|
|
|
|
|
cs.Logger.Info(fmt.Sprintf("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) |
|
|
|
cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}) |
|
|
|
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { |
|
|
|
return added, err |
|
|
|
} |
|
|
|
cs.evsw.FireEvent(types.EventVote, vote) |
|
|
|
|
|
|
|
// if we can skip timeoutCommit and have all the votes now,
|
|
|
@ -1899,7 +1935,9 @@ func (cs *State) addVote( |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}) |
|
|
|
if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { |
|
|
|
return added, err |
|
|
|
} |
|
|
|
cs.evsw.FireEvent(types.EventVote, vote) |
|
|
|
|
|
|
|
switch vote.Type { |
|
|
@ -1925,7 +1963,9 @@ func (cs *State) addVote( |
|
|
|
cs.LockedRound = -1 |
|
|
|
cs.LockedBlock = nil |
|
|
|
cs.LockedBlockParts = nil |
|
|
|
cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventUnlock(cs.RoundStateEvent()); err != nil { |
|
|
|
return added, err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Update Valid* if we can.
|
|
|
@ -1949,7 +1989,9 @@ func (cs *State) addVote( |
|
|
|
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) |
|
|
|
} |
|
|
|
cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) |
|
|
|
cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()) |
|
|
|
if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil { |
|
|
|
return added, err |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -2009,7 +2051,9 @@ func (cs *State) signVote( |
|
|
|
) (*types.Vote, error) { |
|
|
|
// Flush the WAL. Otherwise, we may not recompute the same vote to sign,
|
|
|
|
// and the privValidator will refuse to sign anything.
|
|
|
|
cs.wal.FlushAndSync() |
|
|
|
if err := cs.wal.FlushAndSync(); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
if cs.privValidatorPubKey == nil { |
|
|
|
return nil, errPubKeyIsNotSet |
|
|
|