From 08d1b50570d197063abe6ba339913a065c579fc4 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 1 Nov 2014 04:04:58 -0700 Subject: [PATCH] Store BlockParts && also Validation on its own. --- blocks/store.go | 173 +++++++++++++++++++++++++++++----------- consensus/reactor.go | 74 +++++++++++++++-- consensus/state.go | 34 +++++--- consensus/state_test.go | 10 +-- 4 files changed, 218 insertions(+), 73 deletions(-) diff --git a/blocks/store.go b/blocks/store.go index f0c2f2848..6f89369b1 100644 --- a/blocks/store.go +++ b/blocks/store.go @@ -4,45 +4,13 @@ import ( "bytes" "encoding/binary" "encoding/json" + "io" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" db_ "github.com/tendermint/tendermint/db" ) -var ( - blockStoreKey = []byte("blockStore") -) - -//----------------------------------------------------------------------------- - -type BlockStoreJSON struct { - Height uint32 -} - -func (bsj BlockStoreJSON) Save(db db_.DB) { - bytes, err := json.Marshal(bsj) - if err != nil { - Panicf("Could not marshal state bytes: %v", err) - } - db.Set(blockStoreKey, bytes) -} - -func LoadBlockStoreJSON(db db_.DB) BlockStoreJSON { - bytes := db.Get(blockStoreKey) - if bytes == nil { - return BlockStoreJSON{ - Height: 0, - } - } - bsj := BlockStoreJSON{} - err := json.Unmarshal(bytes, &bsj) - if err != nil { - Panicf("Could not unmarshal bytes: %X", bytes) - } - return bsj -} - //----------------------------------------------------------------------------- /* @@ -66,37 +34,152 @@ func (bs *BlockStore) Height() uint32 { return bs.height } -func (bs *BlockStore) LoadBlock(height uint32) *Block { - blockBytes := bs.db.Get(calcBlockKey(height)) - if blockBytes == nil { +func (bs *BlockStore) GetReader(key []byte) io.Reader { + bytez := bs.db.Get(key) + if bytez == nil { return nil } + return bytes.NewReader(bytez) +} + +func (bs *BlockStore) LoadBlock(height uint32) *Block { var n int64 var err error - block := ReadBlock(bytes.NewReader(blockBytes), &n, &err) + meta := ReadBlockMeta(bs.GetReader(calcBlockMetaKey(height)), &n, &err) + if err != nil { + Panicf("Error reading block meta: %v", err) + } + bytez := []byte{} + for i := uint16(0); i < meta.Parts.Total; i++ { + part := bs.LoadBlockPart(height, i) + bytez = append(bytez, part.Bytes...) + } + block := ReadBlock(bytes.NewReader(bytez), &n, &err) if err != nil { Panicf("Error reading block: %v", err) } return block } -// Writes are synchronous and atomic. -func (bs *BlockStore) SaveBlock(block *Block) { +func (bs *BlockStore) LoadBlockPart(height uint32, index uint16) *Part { + var n int64 + var err error + part := ReadPart(bs.GetReader(calcBlockPartKey(height, index)), &n, &err) + if err != nil { + Panicf("Error reading block part: %v", err) + } + return part +} + +func (bs *BlockStore) LoadBlockValidation(height uint32) *Validation { + var n int64 + var err error + validation := ReadValidation(bs.GetReader(calcBlockValidationKey(height)), &n, &err) + if err != nil { + Panicf("Error reading validation: %v", err) + } + return &validation +} + +func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { height := block.Height if height != bs.height+1 { Panicf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height) } - // Save block - blockBytes := BinaryBytes(block) - bs.db.Set(calcBlockKey(height), blockBytes) + if !blockParts.IsComplete() { + Panicf("BlockStore can only save complete block part sets") + } + meta := BlockMeta{Hash: block.Hash(), Parts: blockParts.Header()} + // Save block meta + metaBytes := BinaryBytes(meta) + bs.db.Set(calcBlockMetaKey(height), metaBytes) + // Save block parts + for i := uint16(0); i < blockParts.Total(); i++ { + bs.saveBlockPart(height, i, blockParts.GetPart(i)) + } + // Save block validation (duplicate and separate) + validationBytes := BinaryBytes(&block.Validation) + bs.db.Set(calcBlockValidationKey(height), validationBytes) // Save new BlockStoreJSON descriptor BlockStoreJSON{Height: height}.Save(bs.db) } +func (bs *BlockStore) saveBlockPart(height uint32, index uint16, part *Part) { + if height != bs.height+1 { + Panicf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height) + } + partBytes := BinaryBytes(part) + bs.db.Set(calcBlockPartKey(height, index), partBytes) +} + +//----------------------------------------------------------------------------- + +type BlockMeta struct { + Hash []byte + Parts PartSetHeader +} + +func ReadBlockMeta(r io.Reader, n *int64, err *error) BlockMeta { + return BlockMeta{ + Hash: ReadByteSlice(r, n, err), + Parts: ReadPartSetHeader(r, n, err), + } +} + +func (bm BlockMeta) WriteTo(w io.Writer) (n int64, err error) { + WriteByteSlice(w, bm.Hash, &n, &err) + WriteBinary(w, bm.Parts, &n, &err) + return +} + //----------------------------------------------------------------------------- -func calcBlockKey(height uint32) []byte { - buf := [9]byte{'B'} - binary.BigEndian.PutUint32(buf[1:9], height) +func calcBlockMetaKey(height uint32) []byte { + buf := [5]byte{'H'} + binary.BigEndian.PutUint32(buf[1:5], height) return buf[:] } + +func calcBlockPartKey(height uint32, partIndex uint16) []byte { + buf := [7]byte{'P'} + binary.BigEndian.PutUint32(buf[1:5], height) + binary.BigEndian.PutUint16(buf[5:7], partIndex) + return buf[:] +} + +func calcBlockValidationKey(height uint32) []byte { + buf := [5]byte{'V'} + binary.BigEndian.PutUint32(buf[1:5], height) + return buf[:] +} + +//----------------------------------------------------------------------------- + +var blockStoreKey = []byte("blockStore") + +type BlockStoreJSON struct { + Height uint32 +} + +func (bsj BlockStoreJSON) Save(db db_.DB) { + bytes, err := json.Marshal(bsj) + if err != nil { + Panicf("Could not marshal state bytes: %v", err) + } + db.Set(blockStoreKey, bytes) +} + +func LoadBlockStoreJSON(db db_.DB) BlockStoreJSON { + bytes := db.Get(blockStoreKey) + if bytes == nil { + return BlockStoreJSON{ + Height: 0, + } + } + bsj := BlockStoreJSON{} + err := json.Unmarshal(bytes, &bsj) + if err != nil { + Panicf("Could not unmarshal bytes: %X", bytes) + } + return bsj +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 730576430..eee180cf7 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -125,6 +125,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte msg := msg_.(*NewRoundStepMessage) ps.ApplyNewRoundStepMessage(msg, rs) + case *CommitMessage: + msg := msg_.(*CommitMessage) + ps.ApplyCommitMessage(msg) + case *HasVotesMessage: msg := msg_.(*HasVotesMessage) ps.ApplyHasVotesMessage(msg) @@ -211,7 +215,6 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) { //-------------------------------------- -// XXX We need to ensure that Proposal* etc are also set appropriately. // Listens for changes to the ConsensusState.Step by pulling // on conR.conS.NewStepCh(). func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { @@ -229,13 +232,25 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { timeElapsed := rs.StartTime.Sub(time.Now()) // Broadcast NewRoundStepMessage - msg := &NewRoundStepMessage{ - Height: rs.Height, - Round: rs.Round, - Step: rs.Step, - SecondsSinceStartTime: uint32(timeElapsed.Seconds()), + { + msg := &NewRoundStepMessage{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step, + SecondsSinceStartTime: uint32(timeElapsed.Seconds()), + } + conR.sw.Broadcast(StateCh, msg) + } + + // If the step is commit, then also broadcast a CommitMessage. + if rs.Step == RoundStepCommit { + msg := &CommitMessage{ + Height: rs.Height, + BlockParts: rs.ProposalBlockParts.Header(), + BlockBitArray: rs.ProposalBlockParts.BitArray(), + } + conR.sw.Broadcast(StateCh, msg) } - conR.sw.Broadcast(StateCh, msg) } } @@ -521,6 +536,18 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun } } +func (ps *PeerState) ApplyCommitMessage(msg *CommitMessage) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.Height != msg.Height { + return + } + + ps.ProposalBlockParts = msg.BlockParts + ps.ProposalBlockBitArray = msg.BlockBitArray +} + func (ps *PeerState) ApplyHasVotesMessage(msg *HasVotesMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -546,7 +573,8 @@ const ( msgTypeUnknown = byte(0x00) // Messages for communicating state changes msgTypeNewRoundStep = byte(0x01) - msgTypeHasVotes = byte(0x02) + msgTypeCommit = byte(0x02) + msgTypeHasVotes = byte(0x03) // Messages of data msgTypeProposal = byte(0x11) msgTypePart = byte(0x12) // both block & POL @@ -563,6 +591,8 @@ func decodeMessage(bz []byte) (msgType byte, msg interface{}) { // Messages for communicating state changes case msgTypeNewRoundStep: msg = readNewRoundStepMessage(r, n, err) + case msgTypeCommit: + msg = readCommitMessage(r, n, err) case msgTypeHasVotes: msg = readHasVotesMessage(r, n, err) // Messages of data @@ -611,6 +641,34 @@ func (m *NewRoundStepMessage) String() string { //------------------------------------- +type CommitMessage struct { + Height uint32 + BlockParts PartSetHeader + BlockBitArray BitArray +} + +func readCommitMessage(r io.Reader, n *int64, err *error) *CommitMessage { + return &CommitMessage{ + Height: ReadUInt32(r, n, err), + BlockParts: ReadPartSetHeader(r, n, err), + BlockBitArray: ReadBitArray(r, n, err), + } +} + +func (m *CommitMessage) WriteTo(w io.Writer) (n int64, err error) { + WriteByte(w, msgTypeCommit, &n, &err) + WriteUInt32(w, m.Height, &n, &err) + WriteBinary(w, m.BlockParts, &n, &err) + WriteBinary(w, m.BlockBitArray, &n, &err) + return +} + +func (m *CommitMessage) String() string { + return fmt.Sprintf("[Commit %v/%v/%v]", m.Height, m.BlockParts, m.BlockBitArray) +} + +//------------------------------------- + type HasVotesMessage struct { Height uint32 Round uint16 diff --git a/consensus/state.go b/consensus/state.go index 44ec75bb4..a4711c7ae 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -416,6 +416,7 @@ func (cs *ConsensusState) updateToState(state *state.State) { } } +// After the call cs.Step becomes RoundStepNewRound. func (cs *ConsensusState) setupNewRound(round uint16) { // Sanity check if round == 0 { @@ -461,6 +462,7 @@ func (cs *ConsensusState) SetupNewRound(height uint32, desiredRound uint16) bool return false } cs.setupNewRound(desiredRound) + // c.Step is now RoundStepNewRound cs.newStepCh <- cs.getRoundState() return true } @@ -471,8 +473,10 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { if cs.Height != height || cs.Round != round { return } - cs.Step = RoundStepPropose - cs.newStepCh <- cs.getRoundState() + defer func() { + cs.Step = RoundStepPropose + cs.newStepCh <- cs.getRoundState() + }() // Nothing to do if it's not our turn. if cs.PrivValidator == nil || cs.Validators.Proposer().Id != cs.PrivValidator.Id { @@ -550,8 +554,10 @@ func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) { if cs.Height != height || cs.Round != round { Panicf("RunActionPrevote(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round) } - cs.Step = RoundStepPrevote - cs.newStepCh <- cs.getRoundState() + defer func() { + cs.Step = RoundStepPrevote + cs.newStepCh <- cs.getRoundState() + }() // If a block is locked, prevote that. if cs.LockedBlock != nil { @@ -586,8 +592,10 @@ func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) { if cs.Height != height || cs.Round != round { Panicf("RunActionPrecommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round) } - cs.Step = RoundStepPrecommit - cs.newStepCh <- cs.getRoundState() + defer func() { + cs.Step = RoundStepPrecommit + cs.newStepCh <- cs.getRoundState() + }() hash, partsHeader, ok := cs.Prevotes.TwoThirdsMajority() if !ok { @@ -633,7 +641,6 @@ func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) { return } -// XXX Need to send new message to peers to get the right parts. // Enter commit step. See the diagram for details. func (cs *ConsensusState) RunActionCommit(height uint32) { cs.mtx.Lock() @@ -641,6 +648,11 @@ func (cs *ConsensusState) RunActionCommit(height uint32) { if cs.Height != height { Panicf("RunActionCommit(%v), expected %v", height, cs.Height) } + defer func() { + cs.Step = RoundStepCommit + cs.newStepCh <- cs.getRoundState() + }() + // There are two ways to enter: // 1. +2/3 precommits at the end of RoundStepPrecommit // 2. +2/3 commits at any time @@ -651,12 +663,9 @@ func (cs *ConsensusState) RunActionCommit(height uint32) { panic("RunActionCommit() expects +2/3 precommits or commits") } } - cs.Step = RoundStepCommit - cs.newStepCh <- cs.getRoundState() // Clear the Locked* fields and use cs.Proposed* if cs.LockedBlock.HashesTo(hash) { - // XXX maybe just use CommitBlock* instead. Proposals need to be signed... cs.ProposalBlock = cs.LockedBlock cs.ProposalBlockParts = cs.LockedBlockParts cs.LockedBlock = nil @@ -671,11 +680,13 @@ func (cs *ConsensusState) RunActionCommit(height uint32) { // Set up ProposalBlockParts and keep waiting. cs.ProposalBlock = nil cs.ProposalBlockParts = NewPartSetFromHeader(partsHeader) + } else { // We just need to keep waiting. } } else { // We have the block, so save/stage/sign-commit-vote. + cs.processBlockForCommit(cs.ProposalBlock, cs.ProposalBlockParts) } @@ -683,6 +694,7 @@ func (cs *ConsensusState) RunActionCommit(height uint32) { if cs.Commits.HasTwoThirdsMajority() { cs.CommitTime = time.Now() } + } // Returns true if Finalize happened, which increments height && sets @@ -893,7 +905,7 @@ func (cs *ConsensusState) processBlockForCommit(block *Block, blockParts *PartSe } // Save to blockStore - cs.blockStore.SaveBlock(block) + cs.blockStore.SaveBlock(block, blockParts) // Save the state cs.stagedState.Save() diff --git a/consensus/state_test.go b/consensus/state_test.go index 2713407ba..be19f01a6 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -6,7 +6,6 @@ import ( . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" - . "github.com/tendermint/tendermint/common/test" db_ "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/state" @@ -78,7 +77,7 @@ func TestSetupRound(t *testing.T) { // Setup round 1 (next round) cs.SetupNewRound(1, 1) - <-cs.NewStepCh() // TODO: test this value too. + <-cs.NewStepCh() // Now the commit should be copied over to prevotes and precommits. rs1 := cs.GetRoundState() @@ -150,9 +149,6 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) { cs.RunActionPropose(1, 0) <-cs.NewStepCh() // TODO: test this value too. - // Test RunActionPrecommit failures: - AssertPanics(t, "Wrong height ", func() { cs.RunActionPrecommit(2, 0) }) - AssertPanics(t, "Wrong round", func() { cs.RunActionPrecommit(1, 1) }) cs.RunActionPrecommit(1, 0) <-cs.NewStepCh() // TODO: test this value too. if cs.Precommits.GetById(0) != nil { @@ -180,10 +176,6 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) { } checkRoundState(t, cs.GetRoundState(), 1, 0, RoundStepPrecommit) - // Test RunActionCommit failures: - AssertPanics(t, "Wrong height ", func() { cs.RunActionCommit(2) }) - AssertPanics(t, "Wrong round", func() { cs.RunActionCommit(1) }) - // Add at least +2/3 precommits. for i := 0; i < 7; i++ { vote := &Vote{