From c9698e4848749503673d0ced1c503947de47e2ff Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 22 Dec 2016 21:51:58 -0500 Subject: [PATCH] fixes from review --- blockchain/store.go | 19 +++++++++---------- consensus/common.go | 3 +++ consensus/common_test.go | 2 +- consensus/reactor_test.go | 4 ++-- consensus/replay_test.go | 19 +++++++++++-------- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/blockchain/store.go b/blockchain/store.go index 82043900c..db8974651 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -30,7 +30,7 @@ Panics indicate probable corruption in the data type BlockStore struct { db dbm.DB - mtx sync.Mutex + mtx sync.RWMutex height int } @@ -44,8 +44,8 @@ func NewBlockStore(db dbm.DB) *BlockStore { // Height() returns the last known contiguous block height. func (bs *BlockStore) Height() int { - bs.mtx.Lock() - defer bs.mtx.Unlock() + bs.mtx.RLock() + defer bs.mtx.RUnlock() return bs.height } @@ -146,9 +146,8 @@ func (bs *BlockStore) LoadSeenCommit(height int) *types.Commit { // most recent height. Otherwise they'd stall at H-1. func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { height := block.Height - bsHeight := bs.Height() - if height != bsHeight+1 { - PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bsHeight+1, height)) + if height != bs.Height()+1 { + PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) } if !blockParts.IsComplete() { PanicSanity(Fmt("BlockStore can only save complete block part sets")) @@ -161,7 +160,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s // Save block parts for i := 0; i < blockParts.Total(); i++ { - bs.saveBlockPart(height, bsHeight, i, blockParts.GetPart(i)) + bs.saveBlockPart(height, i, blockParts.GetPart(i)) } // Save block commit (duplicate and separate from the Block) @@ -185,9 +184,9 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s bs.db.SetSync(nil, nil) } -func (bs *BlockStore) saveBlockPart(height, bsHeight int, index int, part *types.Part) { - if height != bsHeight+1 { - PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bsHeight+1, height)) +func (bs *BlockStore) saveBlockPart(height int, index int, part *types.Part) { + if height != bs.Height()+1 { + PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) } partBytes := wire.BinaryBytes(part) bs.db.Set(calcBlockPartKey(height, index), partBytes) diff --git a/consensus/common.go b/consensus/common.go index 1f78c585a..6f76d1887 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -4,6 +4,9 @@ import ( "github.com/tendermint/tendermint/types" ) +// XXX: WARNING: these functions can halt the consensus as firing events is synchronous. +// Make sure to read off the channels, and in the case of subscribeToEventRespond, to write back on it + // NOTE: if chanCap=0, this blocks on the event being consumed func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { // listen for event diff --git a/consensus/common_test.go b/consensus/common_test.go index ae9b97af6..b3ffaa2b6 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -393,7 +393,7 @@ func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker { } } -// mock ticker only fires once +// mock ticker only fires on RoundStepNewHeight // and only once if onlyOnce=true type mockTicker struct { c chan timeoutInfo diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 6888fdabb..037118e0d 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -79,8 +79,8 @@ func TestValidatorSetChanges(t *testing.T) { }, p2p.Connect2Switches) // now that everyone is connected, start the state machines - // (otherwise, we could block forever in firing new block while a peer is trying to - // access state info for AddPeer) + // If we started the state machines before everyone was connected, + // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors for i := 0; i < nPeers; i++ { s := reactors[i].conS.GetState() reactors[i].SwitchToConsensus(s) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 149ffb2f0..2d3d131dd 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -15,9 +15,10 @@ import ( ) // TODO: these tests ensure we can always recover from any state of the wal, -// assuming a related state of the priv val -// it would be better to verify explicitly which states we can recover from without the wal -// and which ones we need the wal for +// assuming it comes with a correct related state for the priv_validator.json. +// It would be better to verify explicitly which states we can recover from without the wal +// and which ones we need the wal for - then we'd also be able to only flush the +// wal writer when we need to, instead of with every message. var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") @@ -147,7 +148,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu return cs, newBlockCh, lastMsg, walDir } -func readJSON(t *testing.T, walMsg string) TimedWALMessage { +func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage { var err error var msg TimedWALMessage wire.ReadJSON(&msg, []byte(walMsg), &err) @@ -178,8 +179,9 @@ func TestReplayCrashAfterWrite(t *testing.T) { func TestReplayCrashBeforeWritePropose(t *testing.T) { for _, thisCase := range testCases { lineNum := thisCase.proposeLine - cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose - msg := readJSON(t, proposalMsg) + // setup replay test where last message is a proposal + cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) + msg := readTimedWALMessage(t, proposalMsg) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) // Set LastSig toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) @@ -201,9 +203,10 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) { } func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) { - cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote + // setup replay test where last message is a vote + cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) { - msg := readJSON(t, voteMsg) + msg := readTimedWALMessage(t, voteMsg) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) // Set LastSig toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)