Browse Source

Make ConsensusReactor use ConsensusState's blockstore; debug functions

pull/314/head
Jae Kwon 8 years ago
committed by Ethan Buchman
parent
commit
3e3b034252
2 changed files with 79 additions and 28 deletions
  1. +72
    -19
      consensus/reactor.go
  2. +7
    -9
      consensus/reactor_test.go

+ 72
- 19
consensus/reactor.go View File

@ -11,7 +11,6 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
bc "github.com/tendermint/tendermint/blockchain"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -23,7 +22,7 @@ const (
VoteSetBitsChannel = byte(0x23) VoteSetBitsChannel = byte(0x23)
peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
peerQueryMaj23SleepDuration = 5 * time.Second // Time to sleep after each VoteSetMaj23Message sent
peerQueryMaj23SleepDuration = 2 * time.Second // Time to sleep after each VoteSetMaj23Message sent
maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
) )
@ -32,17 +31,15 @@ const (
type ConsensusReactor struct { type ConsensusReactor struct {
p2p.BaseReactor // BaseService + p2p.Switch p2p.BaseReactor // BaseService + p2p.Switch
blockStore *bc.BlockStore
conS *ConsensusState
fastSync bool
evsw types.EventSwitch
conS *ConsensusState
fastSync bool
evsw types.EventSwitch
} }
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{ conR := &ConsensusReactor{
blockStore: blockStore,
conS: consensusState,
fastSync: fastSync,
conS: consensusState,
fastSync: fastSync,
} }
conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR) conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
return conR return conR
@ -398,9 +395,9 @@ OUTER_LOOP:
//log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts) //log.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct // Ensure that the peer's PartSetHeader is correct
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil { if blockMeta == nil {
log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height, "blockstore height", conR.blockStore.Height(), "pv", conR.conS.privValidator)
log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height, "blockstore height", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator)
time.Sleep(peerGossipSleepDuration) time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP continue OUTER_LOOP
} else if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { } else if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
@ -410,7 +407,7 @@ OUTER_LOOP:
continue OUTER_LOOP continue OUTER_LOOP
} }
// Load the part // Load the part
part := conR.blockStore.LoadBlockPart(prs.Height, index)
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil { if part == nil {
log.Warn("Could not load part", "index", index, log.Warn("Could not load part", "index", index,
"peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) "peerHeight", prs.Height, "blockPartsHeader", blockMeta.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
@ -548,8 +545,8 @@ OUTER_LOOP:
if prs.Height != 0 && rs.Height >= prs.Height+2 { if prs.Height != 0 && rs.Height >= prs.Height+2 {
// Load the block commit for prs.Height, // Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height. // which contains precommit signatures for prs.Height.
commit := conR.blockStore.LoadBlockCommit(prs.Height)
log.Debug("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
log.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
if ps.PickSendVote(commit) { if ps.PickSendVote(commit) {
log.Debug("Picked Catchup commit to send") log.Debug("Picked Catchup commit to send")
continue OUTER_LOOP continue OUTER_LOOP
@ -596,8 +593,6 @@ OUTER_LOOP:
BlockID: maj23, BlockID: maj23,
}}) }})
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(peerQueryMaj23SleepDuration)
rs = conR.conS.GetRoundState()
prs = ps.GetRoundState()
} }
} }
} }
@ -642,8 +637,9 @@ OUTER_LOOP:
// Maybe send Height/CatchupCommitRound/CatchupCommit. // Maybe send Height/CatchupCommitRound/CatchupCommit.
{ {
prs := ps.GetRoundState() prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && prs.Height <= conR.blockStore.Height() {
commit := conR.blockStore.LoadBlockCommit(prs.Height)
if prs.CatchupCommitRound != -1 && 1 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
log.Warn("uh", "CatchupCommitRound", prs.CatchupCommitRound, "prs.Height", prs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: commit.Round(), Round: commit.Round(),
@ -654,6 +650,8 @@ OUTER_LOOP:
} }
} }
time.Sleep(peerQueryMaj23SleepDuration)
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -696,6 +694,21 @@ OUTER_LOOP:
} }
} }
func (conR *ConsensusReactor) String() string {
return conR.StringIndented("")
}
func (conR *ConsensusReactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
s += indent + " " + ps.StringIndented(indent+" ") + "\n"
}
s += indent + "}"
return s
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Read only when returned by PeerState.GetRoundState(). // Read only when returned by PeerState.GetRoundState().
@ -717,6 +730,30 @@ type PeerRoundState struct {
CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
} }
func (prs PeerRoundState) String() string {
return prs.StringIndented("")
}
func (prs PeerRoundState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerRoundState{
%s %v/%v/%v @%v
%s Proposal %v -> %v
%s POL %v (round %v)
%s Prevotes %v
%s Precommits %v
%s LastCommit %v (round %v)
%s Catchup %v (round %v)
%s}`,
indent, prs.Height, prs.Round, prs.Step, prs.StartTime,
indent, prs.ProposalBlockPartsHeader, prs.ProposalBlockParts,
indent, prs.ProposalPOL, prs.ProposalPOLRound,
indent, prs.Prevotes,
indent, prs.Precommits,
indent, prs.LastCommit, prs.LastCommitRound,
indent, prs.CatchupCommit, prs.CatchupCommitRound,
indent)
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
var ( var (
@ -1072,6 +1109,22 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *
} }
} }
func (ps *PeerState) String() string {
return ps.StringIndented("")
}
func (ps *PeerState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerState{
%s Key %v
%s PRS %v
%s MjQ %v
%s}`,
indent, ps.Peer.Key,
indent, ps.PeerRoundState.StringIndented(indent+" "),
indent, len(ps.Maj23Queue),
indent)
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Messages // Messages


+ 7
- 9
consensus/reactor_test.go View File

@ -11,11 +11,9 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-events" "github.com/tendermint/go-events"
"github.com/tendermint/go-logger" "github.com/tendermint/go-logger"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -42,9 +40,7 @@ func TestReactor(t *testing.T) {
reactors := make([]*ConsensusReactor, N) reactors := make([]*ConsensusReactor, N)
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
blockStoreDB := dbm.NewDB(Fmt("blockstore%d", i), config.GetString("db_backend"), config.GetString("db_dir"))
blockStore := bc.NewBlockStore(blockStoreDB)
reactors[i] = NewConsensusReactor(css[i], blockStore, false)
reactors[i] = NewConsensusReactor(css[i], false)
reactors[i].SetPrivValidator(css[i].privValidator) reactors[i].SetPrivValidator(css[i].privValidator)
eventSwitch := events.NewEventSwitch() eventSwitch := events.NewEventSwitch()
@ -71,6 +67,7 @@ func TestReactor(t *testing.T) {
}(i) }(i)
} }
// Make wait into a channel
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
wg.Wait() wg.Wait()
@ -104,9 +101,6 @@ func TestByzantine(t *testing.T) {
reactors := make([]p2p.Reactor, N) reactors := make([]p2p.Reactor, N)
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
blockStoreDB := dbm.NewDB(Fmt("blockstore%d", i), config.GetString("db_backend"), config.GetString("db_dir"))
blockStore := bc.NewBlockStore(blockStoreDB)
var privVal PrivValidator var privVal PrivValidator
privVal = css[i].privValidator privVal = css[i].privValidator
if i == 0 { if i == 0 {
@ -127,7 +121,7 @@ func TestByzantine(t *testing.T) {
} }
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
conR := NewConsensusReactor(css[i], blockStore, false)
conR := NewConsensusReactor(css[i], false)
conR.SetPrivValidator(privVal) conR.SetPrivValidator(privVal)
conR.SetEventSwitch(eventSwitch) conR.SetEventSwitch(eventSwitch)
@ -195,6 +189,10 @@ func TestByzantine(t *testing.T) {
select { select {
case <-done: case <-done:
case <-tick.C: case <-tick.C:
for i, reactor := range reactors {
t.Log(Fmt("Consensus Reactor %v", i))
t.Log(Fmt("%v", reactor))
}
t.Fatalf("Timed out waiting for all validators to commit first block") t.Fatalf("Timed out waiting for all validators to commit first block")
} }
} }


Loading…
Cancel
Save