Browse Source

Store BlockParts && also Validation on its own.

pull/9/head
Jae Kwon 10 years ago
parent
commit
08d1b50570
4 changed files with 218 additions and 73 deletions
  1. +128
    -45
      blocks/store.go
  2. +66
    -8
      consensus/reactor.go
  3. +23
    -11
      consensus/state.go
  4. +1
    -9
      consensus/state_test.go

+ 128
- 45
blocks/store.go View File

@ -4,45 +4,13 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"io"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
db_ "github.com/tendermint/tendermint/db"
)
var (
blockStoreKey = []byte("blockStore")
)
//-----------------------------------------------------------------------------
type BlockStoreJSON struct {
Height uint32
}
func (bsj BlockStoreJSON) Save(db db_.DB) {
bytes, err := json.Marshal(bsj)
if err != nil {
Panicf("Could not marshal state bytes: %v", err)
}
db.Set(blockStoreKey, bytes)
}
func LoadBlockStoreJSON(db db_.DB) BlockStoreJSON {
bytes := db.Get(blockStoreKey)
if bytes == nil {
return BlockStoreJSON{
Height: 0,
}
}
bsj := BlockStoreJSON{}
err := json.Unmarshal(bytes, &bsj)
if err != nil {
Panicf("Could not unmarshal bytes: %X", bytes)
}
return bsj
}
//-----------------------------------------------------------------------------
/*
@ -66,37 +34,152 @@ func (bs *BlockStore) Height() uint32 {
return bs.height
}
func (bs *BlockStore) LoadBlock(height uint32) *Block {
blockBytes := bs.db.Get(calcBlockKey(height))
if blockBytes == nil {
func (bs *BlockStore) GetReader(key []byte) io.Reader {
bytez := bs.db.Get(key)
if bytez == nil {
return nil
}
return bytes.NewReader(bytez)
}
func (bs *BlockStore) LoadBlock(height uint32) *Block {
var n int64
var err error
block := ReadBlock(bytes.NewReader(blockBytes), &n, &err)
meta := ReadBlockMeta(bs.GetReader(calcBlockMetaKey(height)), &n, &err)
if err != nil {
Panicf("Error reading block meta: %v", err)
}
bytez := []byte{}
for i := uint16(0); i < meta.Parts.Total; i++ {
part := bs.LoadBlockPart(height, i)
bytez = append(bytez, part.Bytes...)
}
block := ReadBlock(bytes.NewReader(bytez), &n, &err)
if err != nil {
Panicf("Error reading block: %v", err)
}
return block
}
// Writes are synchronous and atomic.
func (bs *BlockStore) SaveBlock(block *Block) {
func (bs *BlockStore) LoadBlockPart(height uint32, index uint16) *Part {
var n int64
var err error
part := ReadPart(bs.GetReader(calcBlockPartKey(height, index)), &n, &err)
if err != nil {
Panicf("Error reading block part: %v", err)
}
return part
}
func (bs *BlockStore) LoadBlockValidation(height uint32) *Validation {
var n int64
var err error
validation := ReadValidation(bs.GetReader(calcBlockValidationKey(height)), &n, &err)
if err != nil {
Panicf("Error reading validation: %v", err)
}
return &validation
}
func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) {
height := block.Height
if height != bs.height+1 {
Panicf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)
}
// Save block
blockBytes := BinaryBytes(block)
bs.db.Set(calcBlockKey(height), blockBytes)
if !blockParts.IsComplete() {
Panicf("BlockStore can only save complete block part sets")
}
meta := BlockMeta{Hash: block.Hash(), Parts: blockParts.Header()}
// Save block meta
metaBytes := BinaryBytes(meta)
bs.db.Set(calcBlockMetaKey(height), metaBytes)
// Save block parts
for i := uint16(0); i < blockParts.Total(); i++ {
bs.saveBlockPart(height, i, blockParts.GetPart(i))
}
// Save block validation (duplicate and separate)
validationBytes := BinaryBytes(&block.Validation)
bs.db.Set(calcBlockValidationKey(height), validationBytes)
// Save new BlockStoreJSON descriptor
BlockStoreJSON{Height: height}.Save(bs.db)
}
func (bs *BlockStore) saveBlockPart(height uint32, index uint16, part *Part) {
if height != bs.height+1 {
Panicf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.height+1, height)
}
partBytes := BinaryBytes(part)
bs.db.Set(calcBlockPartKey(height, index), partBytes)
}
//-----------------------------------------------------------------------------
type BlockMeta struct {
Hash []byte
Parts PartSetHeader
}
func ReadBlockMeta(r io.Reader, n *int64, err *error) BlockMeta {
return BlockMeta{
Hash: ReadByteSlice(r, n, err),
Parts: ReadPartSetHeader(r, n, err),
}
}
func (bm BlockMeta) WriteTo(w io.Writer) (n int64, err error) {
WriteByteSlice(w, bm.Hash, &n, &err)
WriteBinary(w, bm.Parts, &n, &err)
return
}
//-----------------------------------------------------------------------------
func calcBlockKey(height uint32) []byte {
buf := [9]byte{'B'}
binary.BigEndian.PutUint32(buf[1:9], height)
func calcBlockMetaKey(height uint32) []byte {
buf := [5]byte{'H'}
binary.BigEndian.PutUint32(buf[1:5], height)
return buf[:]
}
func calcBlockPartKey(height uint32, partIndex uint16) []byte {
buf := [7]byte{'P'}
binary.BigEndian.PutUint32(buf[1:5], height)
binary.BigEndian.PutUint16(buf[5:7], partIndex)
return buf[:]
}
func calcBlockValidationKey(height uint32) []byte {
buf := [5]byte{'V'}
binary.BigEndian.PutUint32(buf[1:5], height)
return buf[:]
}
//-----------------------------------------------------------------------------
var blockStoreKey = []byte("blockStore")
type BlockStoreJSON struct {
Height uint32
}
func (bsj BlockStoreJSON) Save(db db_.DB) {
bytes, err := json.Marshal(bsj)
if err != nil {
Panicf("Could not marshal state bytes: %v", err)
}
db.Set(blockStoreKey, bytes)
}
func LoadBlockStoreJSON(db db_.DB) BlockStoreJSON {
bytes := db.Get(blockStoreKey)
if bytes == nil {
return BlockStoreJSON{
Height: 0,
}
}
bsj := BlockStoreJSON{}
err := json.Unmarshal(bytes, &bsj)
if err != nil {
Panicf("Could not unmarshal bytes: %X", bytes)
}
return bsj
}

+ 66
- 8
consensus/reactor.go View File

@ -125,6 +125,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
msg := msg_.(*NewRoundStepMessage)
ps.ApplyNewRoundStepMessage(msg, rs)
case *CommitMessage:
msg := msg_.(*CommitMessage)
ps.ApplyCommitMessage(msg)
case *HasVotesMessage:
msg := msg_.(*HasVotesMessage)
ps.ApplyHasVotesMessage(msg)
@ -211,7 +215,6 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) {
//--------------------------------------
// XXX We need to ensure that Proposal* etc are also set appropriately.
// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
@ -229,13 +232,25 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
timeElapsed := rs.StartTime.Sub(time.Now())
// Broadcast NewRoundStepMessage
msg := &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: uint32(timeElapsed.Seconds()),
{
msg := &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: uint32(timeElapsed.Seconds()),
}
conR.sw.Broadcast(StateCh, msg)
}
// If the step is commit, then also broadcast a CommitMessage.
if rs.Step == RoundStepCommit {
msg := &CommitMessage{
Height: rs.Height,
BlockParts: rs.ProposalBlockParts.Header(),
BlockBitArray: rs.ProposalBlockParts.BitArray(),
}
conR.sw.Broadcast(StateCh, msg)
}
conR.sw.Broadcast(StateCh, msg)
}
}
@ -521,6 +536,18 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun
}
}
func (ps *PeerState) ApplyCommitMessage(msg *CommitMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
ps.ProposalBlockParts = msg.BlockParts
ps.ProposalBlockBitArray = msg.BlockBitArray
}
func (ps *PeerState) ApplyHasVotesMessage(msg *HasVotesMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@ -546,7 +573,8 @@ const (
msgTypeUnknown = byte(0x00)
// Messages for communicating state changes
msgTypeNewRoundStep = byte(0x01)
msgTypeHasVotes = byte(0x02)
msgTypeCommit = byte(0x02)
msgTypeHasVotes = byte(0x03)
// Messages of data
msgTypeProposal = byte(0x11)
msgTypePart = byte(0x12) // both block & POL
@ -563,6 +591,8 @@ func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
// Messages for communicating state changes
case msgTypeNewRoundStep:
msg = readNewRoundStepMessage(r, n, err)
case msgTypeCommit:
msg = readCommitMessage(r, n, err)
case msgTypeHasVotes:
msg = readHasVotesMessage(r, n, err)
// Messages of data
@ -611,6 +641,34 @@ func (m *NewRoundStepMessage) String() string {
//-------------------------------------
type CommitMessage struct {
Height uint32
BlockParts PartSetHeader
BlockBitArray BitArray
}
func readCommitMessage(r io.Reader, n *int64, err *error) *CommitMessage {
return &CommitMessage{
Height: ReadUInt32(r, n, err),
BlockParts: ReadPartSetHeader(r, n, err),
BlockBitArray: ReadBitArray(r, n, err),
}
}
func (m *CommitMessage) WriteTo(w io.Writer) (n int64, err error) {
WriteByte(w, msgTypeCommit, &n, &err)
WriteUInt32(w, m.Height, &n, &err)
WriteBinary(w, m.BlockParts, &n, &err)
WriteBinary(w, m.BlockBitArray, &n, &err)
return
}
func (m *CommitMessage) String() string {
return fmt.Sprintf("[Commit %v/%v/%v]", m.Height, m.BlockParts, m.BlockBitArray)
}
//-------------------------------------
type HasVotesMessage struct {
Height uint32
Round uint16


+ 23
- 11
consensus/state.go View File

@ -416,6 +416,7 @@ func (cs *ConsensusState) updateToState(state *state.State) {
}
}
// After the call cs.Step becomes RoundStepNewRound.
func (cs *ConsensusState) setupNewRound(round uint16) {
// Sanity check
if round == 0 {
@ -461,6 +462,7 @@ func (cs *ConsensusState) SetupNewRound(height uint32, desiredRound uint16) bool
return false
}
cs.setupNewRound(desiredRound)
// c.Step is now RoundStepNewRound
cs.newStepCh <- cs.getRoundState()
return true
}
@ -471,8 +473,10 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
if cs.Height != height || cs.Round != round {
return
}
cs.Step = RoundStepPropose
cs.newStepCh <- cs.getRoundState()
defer func() {
cs.Step = RoundStepPropose
cs.newStepCh <- cs.getRoundState()
}()
// Nothing to do if it's not our turn.
if cs.PrivValidator == nil || cs.Validators.Proposer().Id != cs.PrivValidator.Id {
@ -550,8 +554,10 @@ func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) {
if cs.Height != height || cs.Round != round {
Panicf("RunActionPrevote(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
}
cs.Step = RoundStepPrevote
cs.newStepCh <- cs.getRoundState()
defer func() {
cs.Step = RoundStepPrevote
cs.newStepCh <- cs.getRoundState()
}()
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
@ -586,8 +592,10 @@ func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) {
if cs.Height != height || cs.Round != round {
Panicf("RunActionPrecommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
}
cs.Step = RoundStepPrecommit
cs.newStepCh <- cs.getRoundState()
defer func() {
cs.Step = RoundStepPrecommit
cs.newStepCh <- cs.getRoundState()
}()
hash, partsHeader, ok := cs.Prevotes.TwoThirdsMajority()
if !ok {
@ -633,7 +641,6 @@ func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) {
return
}
// XXX Need to send new message to peers to get the right parts.
// Enter commit step. See the diagram for details.
func (cs *ConsensusState) RunActionCommit(height uint32) {
cs.mtx.Lock()
@ -641,6 +648,11 @@ func (cs *ConsensusState) RunActionCommit(height uint32) {
if cs.Height != height {
Panicf("RunActionCommit(%v), expected %v", height, cs.Height)
}
defer func() {
cs.Step = RoundStepCommit
cs.newStepCh <- cs.getRoundState()
}()
// There are two ways to enter:
// 1. +2/3 precommits at the end of RoundStepPrecommit
// 2. +2/3 commits at any time
@ -651,12 +663,9 @@ func (cs *ConsensusState) RunActionCommit(height uint32) {
panic("RunActionCommit() expects +2/3 precommits or commits")
}
}
cs.Step = RoundStepCommit
cs.newStepCh <- cs.getRoundState()
// Clear the Locked* fields and use cs.Proposed*
if cs.LockedBlock.HashesTo(hash) {
// XXX maybe just use CommitBlock* instead. Proposals need to be signed...
cs.ProposalBlock = cs.LockedBlock
cs.ProposalBlockParts = cs.LockedBlockParts
cs.LockedBlock = nil
@ -671,11 +680,13 @@ func (cs *ConsensusState) RunActionCommit(height uint32) {
// Set up ProposalBlockParts and keep waiting.
cs.ProposalBlock = nil
cs.ProposalBlockParts = NewPartSetFromHeader(partsHeader)
} else {
// We just need to keep waiting.
}
} else {
// We have the block, so save/stage/sign-commit-vote.
cs.processBlockForCommit(cs.ProposalBlock, cs.ProposalBlockParts)
}
@ -683,6 +694,7 @@ func (cs *ConsensusState) RunActionCommit(height uint32) {
if cs.Commits.HasTwoThirdsMajority() {
cs.CommitTime = time.Now()
}
}
// Returns true if Finalize happened, which increments height && sets
@ -893,7 +905,7 @@ func (cs *ConsensusState) processBlockForCommit(block *Block, blockParts *PartSe
}
// Save to blockStore
cs.blockStore.SaveBlock(block)
cs.blockStore.SaveBlock(block, blockParts)
// Save the state
cs.stagedState.Save()


+ 1
- 9
consensus/state_test.go View File

@ -6,7 +6,6 @@ import (
. "github.com/tendermint/tendermint/blocks"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/common/test"
db_ "github.com/tendermint/tendermint/db"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/state"
@ -78,7 +77,7 @@ func TestSetupRound(t *testing.T) {
// Setup round 1 (next round)
cs.SetupNewRound(1, 1)
<-cs.NewStepCh() // TODO: test this value too.
<-cs.NewStepCh()
// Now the commit should be copied over to prevotes and precommits.
rs1 := cs.GetRoundState()
@ -150,9 +149,6 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) {
cs.RunActionPropose(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
// Test RunActionPrecommit failures:
AssertPanics(t, "Wrong height ", func() { cs.RunActionPrecommit(2, 0) })
AssertPanics(t, "Wrong round", func() { cs.RunActionPrecommit(1, 1) })
cs.RunActionPrecommit(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
if cs.Precommits.GetById(0) != nil {
@ -180,10 +176,6 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) {
}
checkRoundState(t, cs.GetRoundState(), 1, 0, RoundStepPrecommit)
// Test RunActionCommit failures:
AssertPanics(t, "Wrong height ", func() { cs.RunActionCommit(2) })
AssertPanics(t, "Wrong round", func() { cs.RunActionCommit(1) })
// Add at least +2/3 precommits.
for i := 0; i < 7; i++ {
vote := &Vote{


Loading…
Cancel
Save