From 3e3b034252affc9936d500a466872bb8eeb92e9c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 15 Sep 2016 16:01:01 -0700 Subject: [PATCH] Make ConsensusReactor use ConsensusState's blockstore; debug functions --- consensus/reactor.go | 91 +++++++++++++++++++++++++++++++-------- consensus/reactor_test.go | 16 +++---- 2 files changed, 79 insertions(+), 28 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 60c35fea1..4171c7a84 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -11,7 +11,6 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - bc "github.com/tendermint/tendermint/blockchain" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -23,7 +22,7 @@ const ( VoteSetBitsChannel = byte(0x23) peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. - peerQueryMaj23SleepDuration = 5 * time.Second // Time to sleep after each VoteSetMaj23Message sent + peerQueryMaj23SleepDuration = 2 * time.Second // Time to sleep after each VoteSetMaj23Message sent maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. ) @@ -32,17 +31,15 @@ const ( type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch - blockStore *bc.BlockStore - conS *ConsensusState - fastSync bool - evsw types.EventSwitch + conS *ConsensusState + fastSync bool + evsw types.EventSwitch } -func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ - blockStore: blockStore, - conS: consensusState, - fastSync: fastSync, + conS: consensusState, + fastSync: fastSync, } conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR) return conR @@ -398,9 +395,9 @@ OUTER_LOOP: //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts) if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { // Ensure that the peer's PartSetHeader is correct - blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) + blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) if blockMeta == nil { - log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height, "blockstore height", conR.blockStore.Height(), "pv", conR.conS.privValidator) + log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height, "blockstore height", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator) time.Sleep(peerGossipSleepDuration) continue OUTER_LOOP } else if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { @@ -410,7 +407,7 @@ OUTER_LOOP: continue OUTER_LOOP } // Load the part - part := conR.blockStore.LoadBlockPart(prs.Height, index) + part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) if part == nil { log.Warn("Could not load part", "index", index, "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) @@ -548,8 +545,8 @@ OUTER_LOOP: if prs.Height != 0 && rs.Height >= prs.Height+2 { // Load the block commit for prs.Height, // which contains precommit signatures for prs.Height. - commit := conR.blockStore.LoadBlockCommit(prs.Height) - log.Debug("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) + commit := conR.conS.blockStore.LoadBlockCommit(prs.Height) + log.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) if ps.PickSendVote(commit) { log.Debug("Picked Catchup commit to send") continue OUTER_LOOP @@ -596,8 +593,6 @@ OUTER_LOOP: BlockID: maj23, }}) time.Sleep(peerQueryMaj23SleepDuration) - rs = conR.conS.GetRoundState() - prs = ps.GetRoundState() } } } @@ -642,8 +637,9 @@ OUTER_LOOP: // Maybe send Height/CatchupCommitRound/CatchupCommit. { prs := ps.GetRoundState() - if prs.CatchupCommitRound != -1 && prs.Height <= conR.blockStore.Height() { - commit := conR.blockStore.LoadBlockCommit(prs.Height) + if prs.CatchupCommitRound != -1 && 1 < prs.Height && prs.Height <= conR.conS.blockStore.Height() { + log.Warn("uh", "CatchupCommitRound", prs.CatchupCommitRound, "prs.Height", prs.Height, "blockstoreHeight", conR.conS.blockStore.Height()) + commit := conR.conS.blockStore.LoadBlockCommit(prs.Height) peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ Height: prs.Height, Round: commit.Round(), @@ -654,6 +650,8 @@ OUTER_LOOP: } } + time.Sleep(peerQueryMaj23SleepDuration) + continue OUTER_LOOP } } @@ -696,6 +694,21 @@ OUTER_LOOP: } } +func (conR *ConsensusReactor) String() string { + return conR.StringIndented("") +} + +func (conR *ConsensusReactor) StringIndented(indent string) string { + s := "ConsensusReactor{\n" + s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" + for _, peer := range conR.Switch.Peers().List() { + ps := peer.Data.Get(types.PeerStateKey).(*PeerState) + s += indent + " " + ps.StringIndented(indent+" ") + "\n" + } + s += indent + "}" + return s +} + //----------------------------------------------------------------------------- // Read only when returned by PeerState.GetRoundState(). @@ -717,6 +730,30 @@ type PeerRoundState struct { CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound } +func (prs PeerRoundState) String() string { + return prs.StringIndented("") +} + +func (prs PeerRoundState) StringIndented(indent string) string { + return fmt.Sprintf(`PeerRoundState{ +%s %v/%v/%v @%v +%s Proposal %v -> %v +%s POL %v (round %v) +%s Prevotes %v +%s Precommits %v +%s LastCommit %v (round %v) +%s Catchup %v (round %v) +%s}`, + indent, prs.Height, prs.Round, prs.Step, prs.StartTime, + indent, prs.ProposalBlockPartsHeader, prs.ProposalBlockParts, + indent, prs.ProposalPOL, prs.ProposalPOLRound, + indent, prs.Prevotes, + indent, prs.Precommits, + indent, prs.LastCommit, prs.LastCommitRound, + indent, prs.CatchupCommit, prs.CatchupCommitRound, + indent) +} + //----------------------------------------------------------------------------- var ( @@ -1072,6 +1109,22 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes * } } +func (ps *PeerState) String() string { + return ps.StringIndented("") +} + +func (ps *PeerState) StringIndented(indent string) string { + return fmt.Sprintf(`PeerState{ +%s Key %v +%s PRS %v +%s MjQ %v +%s}`, + indent, ps.Peer.Key, + indent, ps.PeerRoundState.StringIndented(indent+" "), + indent, len(ps.Maj23Queue), + indent) +} + //----------------------------------------------------------------------------- // Messages diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index becb73a80..6b76d2aaa 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -11,11 +11,9 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" - dbm "github.com/tendermint/go-db" "github.com/tendermint/go-events" "github.com/tendermint/go-logger" "github.com/tendermint/go-p2p" - bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/types" ) @@ -42,9 +40,7 @@ func TestReactor(t *testing.T) { reactors := make([]*ConsensusReactor, N) eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { - blockStoreDB := dbm.NewDB(Fmt("blockstore%d", i), config.GetString("db_backend"), config.GetString("db_dir")) - blockStore := bc.NewBlockStore(blockStoreDB) - reactors[i] = NewConsensusReactor(css[i], blockStore, false) + reactors[i] = NewConsensusReactor(css[i], false) reactors[i].SetPrivValidator(css[i].privValidator) eventSwitch := events.NewEventSwitch() @@ -71,6 +67,7 @@ func TestReactor(t *testing.T) { }(i) } + // Make wait into a channel done := make(chan struct{}) go func() { wg.Wait() @@ -104,9 +101,6 @@ func TestByzantine(t *testing.T) { reactors := make([]p2p.Reactor, N) eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { - blockStoreDB := dbm.NewDB(Fmt("blockstore%d", i), config.GetString("db_backend"), config.GetString("db_dir")) - blockStore := bc.NewBlockStore(blockStoreDB) - var privVal PrivValidator privVal = css[i].privValidator if i == 0 { @@ -127,7 +121,7 @@ func TestByzantine(t *testing.T) { } eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) - conR := NewConsensusReactor(css[i], blockStore, false) + conR := NewConsensusReactor(css[i], false) conR.SetPrivValidator(privVal) conR.SetEventSwitch(eventSwitch) @@ -195,6 +189,10 @@ func TestByzantine(t *testing.T) { select { case <-done: case <-tick.C: + for i, reactor := range reactors { + t.Log(Fmt("Consensus Reactor %v", i)) + t.Log(Fmt("%v", reactor)) + } t.Fatalf("Timed out waiting for all validators to commit first block") } }