From c79062ef6ad8415650efec7868acaab9ee7130f7 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 9 Jul 2015 21:46:15 -0700 Subject: [PATCH] Fix state/State race condition bug. --- consensus/reactor.go | 63 ++++++++++++++++++-------------------------- consensus/state.go | 13 ++++++--- node/node.go | 4 +-- types/block.go | 10 +++---- 4 files changed, 41 insertions(+), 49 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 3b00fe3cf..2df09e5af 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -26,14 +26,11 @@ const ( PeerStateKey = "ConsensusReactor.peerState" - peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. - rebroadcastRoundStepDuration = 1000 * time.Millisecond // Time to sleep if there's nothing to send. + peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. ) //----------------------------------------------------------------------------- -// The reactor's underlying ConsensusState may change state at any time. -// We atomically copy the RoundState struct before using it. type ConsensusReactor struct { sw *p2p.Switch running uint32 @@ -41,19 +38,17 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState - - // if fast sync is running we don't really do anything - sync bool + fastSync bool evsw events.Fireable } -func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ quit: make(chan struct{}), blockStore: blockStore, conS: consensusState, - sync: sync, + fastSync: fastSync, } return conR } @@ -61,13 +56,12 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto // Implements Reactor func (conR *ConsensusReactor) Start(sw *p2p.Switch) { if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { - log.Info("Starting ConsensusReactor") + log.Info("Starting ConsensusReactor", "fastSync", conR.fastSync) conR.sw = sw - if !conR.sync { + if !conR.fastSync { conR.conS.Start() } go conR.broadcastNewRoundStepRoutine() - // go conR.rebroadcastRoundStepRoutine() } } @@ -121,7 +115,10 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { go conR.gossipVotesRoutine(peer, peerState) // Send our state to peer. - conR.sendNewRoundStepMessage(peer) + // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). + if !conR.fastSync { + conR.sendNewRoundStepMessage(peer) + } } // Implements Reactor @@ -148,7 +145,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes) return } - log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_) //, "bytes", msgBytes) + log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "rsHeight", rs.Height) //, "bytes", msgBytes) switch chId { case StateChannel: @@ -164,6 +161,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } case DataChannel: + if conR.fastSync { + log.Warn("Ignoring message received during fastSync", "msg", msg_) + return + } switch msg := msg_.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) @@ -178,6 +179,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } case VoteChannel: + if conR.fastSync { + log.Warn("Ignoring message received during fastSync", "msg", msg_) + return + } switch msg := msg_.(type) { case *VoteMessage: vote := msg.Vote @@ -266,11 +271,14 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } -// Switch from the fast sync to the consensus: -// reset the state, turn off fast sync, start the consensus-state-machine +// Switch from the fast_sync to the consensus: +// reset the state, turn off fast_sync, start the consensus-state-machine func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { + log.Info("SwitchToConsensus") + // NOTE: The line below causes broadcastNewRoundStepRoutine() to + // broadcast a NewRoundStepMessage. conR.conS.updateToState(state, false) - conR.sync = false + conR.fastSync = false conR.conS.Start() } @@ -322,26 +330,6 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { } } -/* TODO delete -// Periodically broadcast NewRoundStepMessage. -// This is a hack. TODO remove the need for it? -// The issue is with Start() happening after a NewRoundStep message -// was received from a peer, for the bootstrapping set. -func (conR *ConsensusReactor) rebroadcastRoundStepRoutine() { - for { - time.Sleep(rebroadcastRoundStepDuration) - rs := conR.conS.GetRoundState() - nrsMsg, csMsg := makeRoundStepMessages(rs) - if nrsMsg != nil { - conR.sw.Broadcast(StateChannel, nrsMsg) - } - if csMsg != nil { - conR.sw.Broadcast(StateChannel, csMsg) - } - } -} -*/ - func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) @@ -833,7 +821,6 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun defer ps.mtx.Unlock() // Ignore duplicate messages. - // TODO: This is only necessary because rebroadcastRoundStepRoutine. if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step { return } diff --git a/consensus/state.go b/consensus/state.go index 807c84378..a99eefbaa 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -328,6 +328,9 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { lastPrecommits := NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastBondedValidators) seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) for idx, precommit := range seenValidation.Precommits { + if precommit == nil { + continue + } added, _, err := lastPrecommits.AddByIndex(idx, precommit) if !added || err != nil { panic(Fmt("Failed to reconstruct LastCommit: %v", err)) @@ -386,10 +389,6 @@ func (cs *ConsensusState) Stop() { } } -func (cs *ConsensusState) IsStopped() bool { - return atomic.LoadUint32(&cs.stopped) == 1 -} - // Updates ConsensusState and increments height to match that of state. // The round becomes 0 and cs.Step becomes RoundStepNewHeight. func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { @@ -398,6 +397,12 @@ func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { panic(Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } + if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { + // This might happen when someone else is mutating cs.state. + // Someone forgot to pass in state.Copy() somewhere! + panic(Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", + cs.state.LastBlockHeight+1, cs.Height)) + } // END SANITY CHECK // If state isn't further out than cs.state, just ignore. diff --git a/node/node.go b/node/node.go index 47e3eee41..1075c1d7f 100644 --- a/node/node.go +++ b/node/node.go @@ -99,14 +99,14 @@ func NewNode() *Node { pexReactor := p2p.NewPEXReactor(book) // Get BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state, blockStore, config.GetBool("fast_sync")) + bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync")) // Get MempoolReactor mempool := mempl.NewMempool(state.Copy()) mempoolReactor := mempl.NewMempoolReactor(mempool) // Get ConsensusReactor - consensusState := consensus.NewConsensusState(state, blockStore, mempoolReactor) + consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync")) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) diff --git a/types/block.go b/types/block.go index d607c2c17..c34387a86 100644 --- a/types/block.go +++ b/types/block.go @@ -24,19 +24,19 @@ type Block struct { func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockHash []byte, lastBlockParts PartSetHeader, lastBlockTime time.Time) error { if b.ChainID != chainID { - return errors.New("Wrong Block.Header.ChainID") + return errors.New(Fmt("Wrong Block.Header.ChainID. Expected %v, got %v", chainID, b.ChainID)) } if b.Height != lastBlockHeight+1 { - return errors.New("Wrong Block.Header.Height") + return errors.New(Fmt("Wrong Block.Header.Height. Expected %v, got %v", lastBlockHeight+1, b.Height)) } if b.NumTxs != len(b.Data.Txs) { - return errors.New("Wrong Block.Header.NumTxs") + return errors.New(Fmt("Wrong Block.Header.NumTxs. Expected %v, got %v", len(b.Data.Txs), b.NumTxs)) } if !bytes.Equal(b.LastBlockHash, lastBlockHash) { - return errors.New("Wrong Block.Header.LastBlockHash") + return errors.New(Fmt("Wrong Block.Header.LastBlockHash. Expected %X, got %X", lastBlockHash, b.LastBlockHash)) } if !b.LastBlockParts.Equals(lastBlockParts) { - return errors.New("Wrong Block.Header.LastBlockParts") + return errors.New(Fmt("Wrong Block.Header.LastBlockParts. Expected %v, got %v", lastBlockParts, b.LastBlockParts)) } /* TODO: Determine bounds See blockchain/reactor "stopSyncingDurationMinutes"