Browse Source

fix tests

pull/9/head
Jae Kwon 10 years ago
parent
commit
e42771e36f
20 changed files with 1045 additions and 794 deletions
  1. +2
    -1
      README.md
  2. +34
    -36
      blocks/block.go
  3. +8
    -3
      blocks/block_test.go
  4. +75
    -31
      blocks/part_set.go
  5. +7
    -7
      blocks/part_set_test.go
  6. +44
    -0
      blocks/signature.go
  7. +19
    -16
      blocks/vote.go
  8. +7
    -1
      common/bit_array.go
  9. +7
    -0
      common/byteslice.go
  10. +29
    -25
      consensus/pol.go
  11. +4
    -8
      consensus/pol_test.go
  12. +19
    -31
      consensus/proposal.go
  13. +78
    -348
      consensus/reactor.go
  14. +519
    -168
      consensus/state.go
  15. +43
    -45
      consensus/state_test.go
  16. +75
    -48
      consensus/vote_set.go
  17. +18
    -18
      consensus/vote_set_test.go
  18. +50
    -5
      state/state.go
  19. +1
    -0
      state/state_test.go
  20. +6
    -3
      state/validator_set.go

+ 2
- 1
README.md View File

@ -11,7 +11,8 @@ TenderMint - proof of concept
### Development Status
* Testnet *pending*
* Bootstrapping *now*
* Blockchain catchup *now*
* Bootstrapping *complete*
* Mempool *complete*
* Consensus *complete*
* Block propagation *sidelined*


+ 34
- 36
blocks/block.go View File

@ -55,6 +55,7 @@ func (b *Block) ValidateBasic(lastBlockHeight uint32, lastBlockHash []byte) erro
if !bytes.Equal(b.Header.LastBlockHash, lastBlockHash) {
return ErrBlockInvalidLastBlockHash
}
// XXX We need to validate LastBlockParts too.
// XXX more validation
return nil
}
@ -112,12 +113,13 @@ func (b *Block) Description() string {
//-----------------------------------------------------------------------------
type Header struct {
Network string
Height uint32
Time time.Time
Fees uint64
LastBlockHash []byte
StateHash []byte
Network string
Height uint32
Time time.Time
Fees uint64
LastBlockHash []byte
LastBlockParts PartSetHeader
StateHash []byte
// Volatile
hash []byte
@ -128,12 +130,13 @@ func ReadHeader(r io.Reader, n *int64, err *error) (h Header) {
return Header{}
}
return Header{
Network: ReadString(r, n, err),
Height: ReadUInt32(r, n, err),
Time: ReadTime(r, n, err),
Fees: ReadUInt64(r, n, err),
LastBlockHash: ReadByteSlice(r, n, err),
StateHash: ReadByteSlice(r, n, err),
Network: ReadString(r, n, err),
Height: ReadUInt32(r, n, err),
Time: ReadTime(r, n, err),
Fees: ReadUInt64(r, n, err),
LastBlockHash: ReadByteSlice(r, n, err),
LastBlockParts: ReadPartSetHeader(r, n, err),
StateHash: ReadByteSlice(r, n, err),
}
}
@ -143,6 +146,7 @@ func (h *Header) WriteTo(w io.Writer) (n int64, err error) {
WriteTime(w, h.Time, &n, &err)
WriteUInt64(w, h.Fees, &n, &err)
WriteByteSlice(w, h.LastBlockHash, &n, &err)
WriteBinary(w, h.LastBlockParts, &n, &err)
WriteByteSlice(w, h.StateHash, &n, &err)
return
}
@ -161,18 +165,20 @@ func (h *Header) Hash() []byte {
func (h *Header) StringWithIndent(indent string) string {
return fmt.Sprintf(`Header{
%s Network: %v
%s Height: %v
%s Time: %v
%s Fees: %v
%s LastBlockHash: %X
%s StateHash: %X
%s Network: %v
%s Height: %v
%s Time: %v
%s Fees: %v
%s LastBlockHash: %X
%s LastBlockParts: %v
%s StateHash: %X
%s}#%X`,
indent, h.Network,
indent, h.Height,
indent, h.Time,
indent, h.Fees,
indent, h.LastBlockHash,
indent, h.LastBlockParts,
indent, h.StateHash,
indent, h.hash)
}
@ -180,36 +186,28 @@ func (h *Header) StringWithIndent(indent string) string {
//-----------------------------------------------------------------------------
type Validation struct {
Signatures []Signature
Commits []RoundSignature
// Volatile
hash []byte
}
func ReadValidation(r io.Reader, n *int64, err *error) Validation {
numSigs := ReadUInt32(r, n, err)
sigs := make([]Signature, 0, numSigs)
for i := uint32(0); i < numSigs; i++ {
sigs = append(sigs, ReadSignature(r, n, err))
}
return Validation{
Signatures: sigs,
Commits: ReadRoundSignatures(r, n, err),
}
}
func (v *Validation) WriteTo(w io.Writer) (n int64, err error) {
WriteUInt32(w, uint32(len(v.Signatures)), &n, &err)
for _, sig := range v.Signatures {
WriteBinary(w, sig, &n, &err)
}
WriteRoundSignatures(w, v.Commits, &n, &err)
return
}
func (v *Validation) Hash() []byte {
if v.hash == nil {
bs := make([]Binary, len(v.Signatures))
for i, sig := range v.Signatures {
bs[i] = Binary(sig)
bs := make([]Binary, len(v.Commits))
for i, commit := range v.Commits {
bs[i] = Binary(commit)
}
v.hash = merkle.HashFromBinaries(bs)
}
@ -217,14 +215,14 @@ func (v *Validation) Hash() []byte {
}
func (v *Validation) StringWithIndent(indent string) string {
sigStrings := make([]string, len(v.Signatures))
for i, sig := range v.Signatures {
sigStrings[i] = sig.String()
commitStrings := make([]string, len(v.Commits))
for i, commit := range v.Commits {
commitStrings[i] = commit.String()
}
return fmt.Sprintf(`Validation{
%s %v
%s}#%X`,
indent, strings.Join(sigStrings, "\n"+indent+" "),
indent, strings.Join(commitStrings, "\n"+indent+" "),
indent, v.hash)
}


+ 8
- 3
blocks/block_test.go View File

@ -11,6 +11,10 @@ func randSig() Signature {
return Signature{RandUInt64Exp(), RandBytes(32)}
}
func randRoundSig() RoundSignature {
return RoundSignature{RandUInt16(), randSig()}
}
func randBaseTx() BaseTx {
return BaseTx{0, RandUInt64Exp(), randSig()}
}
@ -64,7 +68,7 @@ func randBlock() *Block {
StateHash: RandBytes(32),
},
Validation: Validation{
Signatures: []Signature{randSig(), randSig()},
Commits: []RoundSignature{randRoundSig(), randRoundSig()},
},
Data: Data{
Txs: []Tx{sendTx, nameTx, bondTx, unbondTx, dupeoutTx},
@ -99,8 +103,9 @@ func TestBlock(t *testing.T) {
expectChange(func(b *Block) { b.Header.Time = RandTime() }, "Expected hash to depend on Time")
expectChange(func(b *Block) { b.Header.LastBlockHash = RandBytes(32) }, "Expected hash to depend on LastBlockHash")
expectChange(func(b *Block) { b.Header.StateHash = RandBytes(32) }, "Expected hash to depend on StateHash")
expectChange(func(b *Block) { b.Validation.Signatures[0].SignerId += 1 }, "Expected hash to depend on Validation Signature")
expectChange(func(b *Block) { b.Validation.Signatures[0].Bytes = RandBytes(32) }, "Expected hash to depend on Validation Signature")
expectChange(func(b *Block) { b.Validation.Commits[0].Round += 1 }, "Expected hash to depend on Validation Commit")
expectChange(func(b *Block) { b.Validation.Commits[0].SignerId += 1 }, "Expected hash to depend on Validation Commit")
expectChange(func(b *Block) { b.Validation.Commits[0].Bytes = RandBytes(32) }, "Expected hash to depend on Validation Commit")
expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).Signature.SignerId += 1 }, "Expected hash to depend on tx Signature")
expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).Amount += 1 }, "Expected hash to depend on send tx Amount")


consensus/part_set.go → blocks/part_set.go View File


consensus/part_set_test.go → blocks/part_set_test.go View File


+ 44
- 0
blocks/signature.go View File

@ -60,3 +60,47 @@ func WriteSignatures(w io.Writer, sigs []Signature, n *int64, err *error) {
}
}
}
//-----------------------------------------------------------------------------
type RoundSignature struct {
Round uint16
Signature
}
func ReadRoundSignature(r io.Reader, n *int64, err *error) RoundSignature {
return RoundSignature{
ReadUInt16(r, n, err),
ReadSignature(r, n, err),
}
}
func (rsig RoundSignature) WriteTo(w io.Writer) (n int64, err error) {
WriteUInt16(w, rsig.Round, &n, &err)
WriteBinary(w, rsig.Signature, &n, &err)
return
}
func (rsig RoundSignature) IsZero() bool {
return rsig.Round == 0 && rsig.SignerId == 0 && len(rsig.Bytes) == 0
}
//-------------------------------------
func ReadRoundSignatures(r io.Reader, n *int64, err *error) (rsigs []RoundSignature) {
length := ReadUInt32(r, n, err)
for i := uint32(0); i < length; i++ {
rsigs = append(rsigs, ReadRoundSignature(r, n, err))
}
return
}
func WriteRoundSignatures(w io.Writer, rsigs []RoundSignature, n *int64, err *error) {
WriteUInt32(w, uint32(len(rsigs)), n, err)
for _, rsig := range rsigs {
WriteBinary(w, rsig, n, err)
if *err != nil {
return
}
}
}

+ 19
- 16
blocks/vote.go View File

@ -6,6 +6,7 @@ import (
"io"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
)
const (
@ -24,20 +25,22 @@ var (
// Represents a prevote, precommit, or commit vote for proposals.
type Vote struct {
Height uint32
Round uint16
Type byte
BlockHash []byte // empty if vote is nil.
Height uint32
Round uint16
Type byte
BlockHash []byte // empty if vote is nil.
BlockParts PartSetHeader // zero if vote is nil.
Signature
}
func ReadVote(r io.Reader, n *int64, err *error) *Vote {
return &Vote{
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
Type: ReadByte(r, n, err),
BlockHash: ReadByteSlice(r, n, err),
Signature: ReadSignature(r, n, err),
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
Type: ReadByte(r, n, err),
BlockHash: ReadByteSlice(r, n, err),
BlockParts: ReadPartSetHeader(r, n, err),
Signature: ReadSignature(r, n, err),
}
}
@ -46,6 +49,7 @@ func (v *Vote) WriteTo(w io.Writer) (n int64, err error) {
WriteUInt16(w, v.Round, &n, &err)
WriteByte(w, v.Type, &n, &err)
WriteByteSlice(w, v.BlockHash, &n, &err)
WriteBinary(w, v.BlockParts, &n, &err)
WriteBinary(w, v.Signature, &n, &err)
return
}
@ -59,18 +63,17 @@ func (v *Vote) SetSignature(sig Signature) {
}
func (v *Vote) String() string {
blockHash := v.BlockHash
if len(v.BlockHash) == 0 {
blockHash = make([]byte, 6) // for printing
}
var typeString string
switch v.Type {
case VoteTypePrevote:
return fmt.Sprintf("Prevote{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId)
typeString = "Prevote"
case VoteTypePrecommit:
return fmt.Sprintf("Precommit{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId)
typeString = "Precommit"
case VoteTypeCommit:
return fmt.Sprintf("Commit{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId)
typeString = "Commit"
default:
panic("Unknown vote type")
}
return fmt.Sprintf("%v{%v/%v:%X:%v:%v}", typeString, v.Height, v.Round, Fingerprint(v.BlockHash), v.BlockParts, v.SignerId)
}

+ 7
- 1
common/bit_array.go View File

@ -21,6 +21,9 @@ func NewBitArray(bits uint) BitArray {
func ReadBitArray(r io.Reader, n *int64, err *error) BitArray {
bits := ReadUVarInt(r, n, err)
if bits == 0 {
return BitArray{}
}
elemsWritten := ReadUVarInt(r, n, err)
if *err != nil {
return BitArray{}
@ -36,6 +39,10 @@ func ReadBitArray(r io.Reader, n *int64, err *error) BitArray {
}
func (bA BitArray) WriteTo(w io.Writer) (n int64, err error) {
WriteUVarInt(w, bA.bits, &n, &err)
if bA.bits == 0 {
return
}
// Count the last element > 0.
elemsToWrite := 0
for i, elem := range bA.elems {
@ -43,7 +50,6 @@ func (bA BitArray) WriteTo(w io.Writer) (n int64, err error) {
elemsToWrite = i + 1
}
}
WriteUVarInt(w, bA.bits, &n, &err)
WriteUVarInt(w, uint(elemsToWrite), &n, &err)
for i, elem := range bA.elems {
if i >= elemsToWrite {


+ 7
- 0
common/byteslice.go View File

@ -0,0 +1,7 @@
package common
func Fingerprint(bytez []byte) []byte {
fingerprint := make([]byte, 6)
copy(fingerprint, bytez)
return fingerprint
}

+ 29
- 25
consensus/pol.go View File

@ -13,22 +13,22 @@ import (
// Proof of lock.
// +2/3 of validators' prevotes for a given blockhash (or nil)
type POL struct {
Height uint32
Round uint16
BlockHash []byte // Could be nil, which makes this a proof of unlock.
Votes []Signature // Vote signatures for height/round/hash
Commits []Signature // Commit signatures for height/hash
CommitRounds []uint16 // Rounds of the commits, less than POL.Round.
Height uint32
Round uint16
BlockHash []byte // Could be nil, which makes this a proof of unlock.
BlockParts PartSetHeader // When BlockHash is nil, this is zero.
Votes []Signature // Vote signatures for height/round/hash
Commits []RoundSignature // Commit signatures for height/hash
}
func ReadPOL(r io.Reader, n *int64, err *error) *POL {
return &POL{
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
BlockHash: ReadByteSlice(r, n, err),
Votes: ReadSignatures(r, n, err),
Commits: ReadSignatures(r, n, err),
CommitRounds: ReadUInt16s(r, n, err),
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
BlockHash: ReadByteSlice(r, n, err),
BlockParts: ReadPartSetHeader(r, n, err),
Votes: ReadSignatures(r, n, err),
Commits: ReadRoundSignatures(r, n, err),
}
}
@ -36,9 +36,9 @@ func (pol *POL) WriteTo(w io.Writer) (n int64, err error) {
WriteUInt32(w, pol.Height, &n, &err)
WriteUInt16(w, pol.Round, &n, &err)
WriteByteSlice(w, pol.BlockHash, &n, &err)
WriteBinary(w, pol.BlockParts, &n, &err)
WriteSignatures(w, pol.Votes, &n, &err)
WriteSignatures(w, pol.Commits, &n, &err)
WriteUInt16s(w, pol.CommitRounds, &n, &err)
WriteRoundSignatures(w, pol.Commits, &n, &err)
return
}
@ -46,8 +46,11 @@ func (pol *POL) WriteTo(w io.Writer) (n int64, err error) {
func (pol *POL) Verify(vset *state.ValidatorSet) error {
talliedVotingPower := uint64(0)
voteDoc := BinaryBytes(&Vote{Height: pol.Height, Round: pol.Round,
Type: VoteTypePrevote, BlockHash: pol.BlockHash})
voteDoc := BinaryBytes(&Vote{
Height: pol.Height, Round: pol.Round, Type: VoteTypePrevote,
BlockHash: pol.BlockHash,
BlockParts: pol.BlockParts,
})
seenValidators := map[uint64]struct{}{}
for _, sig := range pol.Votes {
@ -69,8 +72,9 @@ func (pol *POL) Verify(vset *state.ValidatorSet) error {
talliedVotingPower += val.VotingPower
}
for i, sig := range pol.Commits {
round := pol.CommitRounds[i]
for _, rsig := range pol.Commits {
round := rsig.Round
sig := rsig.Signature
// Validate
if _, seen := seenValidators[sig.SignerId]; seen {
@ -84,8 +88,11 @@ func (pol *POL) Verify(vset *state.ValidatorSet) error {
return Errorf("Invalid commit round %v for POL %v", round, pol)
}
commitDoc := BinaryBytes(&Vote{Height: pol.Height, Round: round,
Type: VoteTypeCommit, BlockHash: pol.BlockHash}) // TODO cache
commitDoc := BinaryBytes(&Vote{
Height: pol.Height, Round: round, Type: VoteTypeCommit,
BlockHash: pol.BlockHash,
BlockParts: pol.BlockParts,
})
if !val.VerifyBytes(commitDoc, sig) {
return Errorf("Invalid signature for commit %v for POL %v", sig, pol)
}
@ -108,10 +115,7 @@ func (pol *POL) Description() string {
if pol == nil {
return "nil-POL"
} else {
blockHash := pol.BlockHash
if blockHash != nil {
blockHash = blockHash[:6]
}
return fmt.Sprintf("POL{H:%v R:%v BH:%X}", pol.Height, pol.Round, blockHash)
return fmt.Sprintf("POL{H:%v R:%v BH:%X}", pol.Height, pol.Round,
Fingerprint(pol.BlockHash), pol.BlockParts)
}
}

+ 4
- 8
consensus/pol_test.go View File

@ -78,8 +78,7 @@ func TestVerifyCommits(t *testing.T) {
}
for i := 0; i < 7; i++ {
privAccounts[i].Sign(vote)
pol.Commits = append(pol.Commits, vote.Signature)
pol.CommitRounds = append(pol.CommitRounds, 1)
pol.Commits = append(pol.Commits, RoundSignature{1, vote.Signature})
}
// Check that validation succeeds.
@ -103,8 +102,7 @@ func TestVerifyInvalidCommits(t *testing.T) {
privAccounts[i].Sign(vote)
// Mutate the signature.
vote.Signature.Bytes[0] += byte(0x01)
pol.Commits = append(pol.Commits, vote.Signature)
pol.CommitRounds = append(pol.CommitRounds, 1)
pol.Commits = append(pol.Commits, RoundSignature{1, vote.Signature})
}
// Check that validation fails.
@ -126,8 +124,7 @@ func TestVerifyInvalidCommitRounds(t *testing.T) {
}
for i := 0; i < 7; i++ {
privAccounts[i].Sign(vote)
pol.Commits = append(pol.Commits, vote.Signature)
pol.CommitRounds = append(pol.CommitRounds, 2)
pol.Commits = append(pol.Commits, RoundSignature{2, vote.Signature})
}
// Check that validation fails.
@ -149,8 +146,7 @@ func TestVerifyInvalidCommitRounds2(t *testing.T) {
}
for i := 0; i < 7; i++ {
privAccounts[i].Sign(vote)
pol.Commits = append(pol.Commits, vote.Signature)
pol.CommitRounds = append(pol.CommitRounds, 3)
pol.Commits = append(pol.Commits, RoundSignature{3, vote.Signature})
}
// Check that validation fails.


+ 19
- 31
consensus/proposal.go View File

@ -15,48 +15,38 @@ var (
)
type Proposal struct {
Height uint32
Round uint16
BlockPartsTotal uint16
BlockPartsHash []byte
POLPartsTotal uint16
POLPartsHash []byte
Signature Signature
Height uint32
Round uint16
BlockParts PartSetHeader
POLParts PartSetHeader
Signature Signature
}
func NewProposal(height uint32, round uint16,
blockPartsTotal uint16, blockPartsHash []byte,
polPartsTotal uint16, polPartsHash []byte) *Proposal {
func NewProposal(height uint32, round uint16, blockParts, polParts PartSetHeader) *Proposal {
return &Proposal{
Height: height,
Round: round,
BlockPartsTotal: blockPartsTotal,
BlockPartsHash: blockPartsHash,
POLPartsTotal: polPartsTotal,
POLPartsHash: polPartsHash,
Height: height,
Round: round,
BlockParts: blockParts,
POLParts: polParts,
}
}
func ReadProposal(r io.Reader, n *int64, err *error) *Proposal {
return &Proposal{
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
BlockPartsTotal: ReadUInt16(r, n, err),
BlockPartsHash: ReadByteSlice(r, n, err),
POLPartsTotal: ReadUInt16(r, n, err),
POLPartsHash: ReadByteSlice(r, n, err),
Signature: ReadSignature(r, n, err),
Height: ReadUInt32(r, n, err),
Round: ReadUInt16(r, n, err),
BlockParts: ReadPartSetHeader(r, n, err),
POLParts: ReadPartSetHeader(r, n, err),
Signature: ReadSignature(r, n, err),
}
}
func (p *Proposal) WriteTo(w io.Writer) (n int64, err error) {
WriteUInt32(w, p.Height, &n, &err)
WriteUInt16(w, p.Round, &n, &err)
WriteUInt16(w, p.BlockPartsTotal, &n, &err)
WriteByteSlice(w, p.BlockPartsHash, &n, &err)
WriteUInt16(w, p.POLPartsTotal, &n, &err)
WriteByteSlice(w, p.POLPartsHash, &n, &err)
WriteBinary(w, p.BlockParts, &n, &err)
WriteBinary(w, p.POLParts, &n, &err)
WriteBinary(w, p.Signature, &n, &err)
return
}
@ -70,8 +60,6 @@ func (p *Proposal) SetSignature(sig Signature) {
}
func (p *Proposal) String() string {
return fmt.Sprintf("Proposal{%v/%v %X/%v %X/%v %v}", p.Height, p.Round,
p.BlockPartsHash, p.BlockPartsTotal,
p.POLPartsHash, p.POLPartsTotal,
p.Signature)
return fmt.Sprintf("Proposal{%v/%v %v %v %v}", p.Height, p.Round,
p.BlockParts, p.POLParts, p.Signature)
}

+ 78
- 348
consensus/reactor.go View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"time"
@ -25,98 +24,26 @@ const (
peerStateKey = "ConsensusReactor.peerState"
voteTypeNil = byte(0x00)
voteTypeBlock = byte(0x01)
roundDuration0 = 60 * time.Second // The first round is 60 seconds long.
roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer.
roundDeadlinePrevote = float64(1.0 / 3.0) // When the prevote is due.
roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
finalizeDuration = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
peerGossipSleepDuration = 50 * time.Millisecond // Time to sleep if there's nothing to send.
hasVotesThreshold = 50 // After this many new votes we'll send a HasVotesMessage.
)
//-----------------------------------------------------------------------------
// total duration of given round
func calcRoundDuration(round uint16) time.Duration {
return roundDuration0 + roundDurationDelta*time.Duration(round)
}
// startTime is when round zero started.
func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
return startTime.Add(roundDuration0*time.Duration(round) +
roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
}
// calculates the current round given startTime of round zero.
// NOTE: round is zero if startTime is in the future.
func calcRound(startTime time.Time) uint16 {
now := time.Now()
if now.Before(startTime) {
return 0
}
// Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
// D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
// AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
// R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
A := float64(roundDurationDelta)
B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
C := 2.0 * float64(startTime.Sub(now))
R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)) / (2 * A))
if math.IsNaN(R) {
panic("Could not calc round, should not happen")
}
if R > math.MaxInt16 {
Panicf("Could not calc round, round overflow: %v", R)
}
if R < 0 {
return 0
}
return uint16(R)
}
// convenience
// NOTE: elapsedRatio can be negative if startTime is in the future.
func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
roundElapsed time.Duration, elapsedRatio float64) {
round = calcRound(startTime)
roundStartTime = calcRoundStartTime(round, startTime)
roundDuration = calcRoundDuration(round)
roundElapsed = time.Now().Sub(roundStartTime)
elapsedRatio = float64(roundElapsed) / float64(roundDuration)
return
}
//-----------------------------------------------------------------------------
type RoundAction struct {
Height uint32 // The block height for which consensus is reaching for.
Round uint16 // The round number at given height.
Action RoundActionType // Action to perform.
}
//-----------------------------------------------------------------------------
type ConsensusReactor struct {
sw *p2p.Switch
quit chan struct{}
started uint32
stopped uint32
quit chan struct{}
conS *ConsensusState
doActionCh chan RoundAction
conS *ConsensusState
}
func NewConsensusReactor(blockStore *BlockStore, mempool *mempool.Mempool, state *state.State) *ConsensusReactor {
conS := NewConsensusState(state, blockStore, mempool)
conR := &ConsensusReactor{
quit: make(chan struct{}),
conS: conS,
doActionCh: make(chan RoundAction, 1),
conS: conS,
}
return conR
}
@ -126,7 +53,8 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
log.Info("Starting ConsensusReactor")
conR.sw = sw
go conR.stepTransitionRoutine()
conR.conS.Start()
go conR.broadcastNewRoundStepRoutine()
}
}
@ -134,10 +62,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
func (conR *ConsensusReactor) Stop() {
if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
log.Info("Stopping ConsensusReactor")
conR.conS.Stop()
close(conR.quit)
}
}
func (conR *ConsensusReactor) IsStopped() bool {
return atomic.LoadUint32(&conR.stopped) == 1
}
// Implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// TODO optimize
@ -238,7 +171,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
return
}
ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size())
ps.SetHasVote(rs.Height, rs.Round, vote.Type, index)
ps.SetHasVote(rs.Height, rs.Round, index, vote)
added, err := conR.conS.AddVote(vote)
if err != nil {
log.Warning("Error attempting to add vote: %v", err)
@ -257,13 +190,6 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
}
conR.sw.Broadcast(StateCh, msg)
}
// Maybe run RoundActionCommitWait.
if vote.Type == VoteTypeCommit &&
rs.Commits.HasTwoThirdsMajority() &&
rs.Step < RoundStepCommitWait {
// NOTE: Do not call RunAction*() methods here directly.
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionCommitWait}
}
}
default:
@ -283,211 +209,36 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *PrivValidator) {
conR.conS.SetPrivValidator(priv)
}
func (conR *ConsensusReactor) IsStopped() bool {
return atomic.LoadUint32(&conR.stopped) == 1
}
//--------------------------------------
// Source of all round state transitions (and votes).
func (conR *ConsensusReactor) stepTransitionRoutine() {
// Schedule the next action by pushing a RoundAction{} to conR.doActionCh
// when it is due.
scheduleNextAction := func() {
rs := conR.conS.GetRoundState()
round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
log.Debug("Called scheduleNextAction. round:%v roundStartTime:%v elapsedRatio:%v",
round, roundStartTime, elapsedRatio)
go func() {
switch rs.Step {
case RoundStepStart:
// It's a new RoundState.
if elapsedRatio < 0 {
// startTime is in the future.
time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(roundDuration)))
}
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose}
case RoundStepPropose:
// Wake up when it's time to vote.
time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(roundDuration)))
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote}
case RoundStepPrevote:
// Wake up when it's time to precommit.
time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(roundDuration)))
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit}
case RoundStepPrecommit:
// Wake up when the round is over.
time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(roundDuration)))
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryCommit}
case RoundStepCommit:
panic("Should not happen: RoundStepCommit waits until +2/3 commits.")
case RoundStepCommitWait:
// Wake up when it's time to finalize commit.
if rs.CommitTime.IsZero() {
panic("RoundStepCommitWait requires rs.CommitTime")
}
time.Sleep(rs.CommitTime.Sub(time.Now()) + finalizeDuration)
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionFinalize}
default:
panic("Should not happen")
}
}()
}
scheduleNextAction()
// NOTE: All ConsensusState.RunAction*() calls must come from here.
// Since only one routine calls them, it is safe to assume that
// the RoundState Height/Round/Step won't change concurrently.
// However, other fields like Proposal could change, due to gossip.
ACTION_LOOP:
// XXX We need to ensure that Proposal* etc are also set appropriately.
// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
for {
roundAction := <-conR.doActionCh
height := roundAction.Height
round := roundAction.Round
action := roundAction.Action
rs := conR.conS.GetRoundState()
log.Info("Running round action A:%X %v", action, rs.Description())
// NOTE: This function should only be called
// when the cs.Height is still rs.Height.
broadcastNewRoundStep := func(step RoundStep) {
// Get seconds since beginning of height.
// Due to the condition documented, this is safe.
timeElapsed := rs.StartTime.Sub(time.Now())
// Broadcast NewRoundStepMessage
msg := &NewRoundStepMessage{
Height: height,
Round: round,
Step: step,
SecondsSinceStartTime: uint32(timeElapsed.Seconds()),
}
conR.sw.Broadcast(StateCh, msg)
// Get RoundState with new Step or quit.
var rs *RoundState
select {
case rs = <-conR.conS.NewStepCh():
case <-conR.quit:
return
}
// Continue if action is not relevant
if height != rs.Height {
continue
}
// If action >= RoundActionCommitWait, the round doesn't matter.
if action < RoundActionCommitWait && round != rs.Round {
continue
}
// Get seconds since beginning of height.
// Due to the condition documented, this is safe.
timeElapsed := rs.StartTime.Sub(time.Now())
// Run action
switch action {
case RoundActionPropose:
if rs.Step != RoundStepStart {
continue ACTION_LOOP
}
conR.conS.RunActionPropose(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPropose)
scheduleNextAction()
continue ACTION_LOOP
case RoundActionPrevote:
if rs.Step >= RoundStepPrevote {
continue ACTION_LOOP
}
vote := conR.conS.RunActionPrevote(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrevote)
if vote != nil {
conR.broadcastVote(rs, vote)
}
scheduleNextAction()
continue ACTION_LOOP
case RoundActionPrecommit:
if rs.Step >= RoundStepPrecommit {
continue ACTION_LOOP
}
vote := conR.conS.RunActionPrecommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrecommit)
if vote != nil {
conR.broadcastVote(rs, vote)
}
scheduleNextAction()
continue ACTION_LOOP
case RoundActionTryCommit:
if rs.Step >= RoundStepCommit {
continue ACTION_LOOP
}
if rs.Precommits.HasTwoThirdsMajority() {
// NOTE: Duplicated in RoundActionCommitWait.
vote := conR.conS.RunActionCommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepCommit)
if vote != nil {
conR.broadcastVote(rs, vote)
// If we have +2/3 commits, queue an action to RoundActionCommitWait.
// Likely this is a duplicate action being pushed.
// See also Receive() where RoundActionCommitWait can be pushed in
// response to a vote from the network.
if rs.Commits.HasTwoThirdsMajority() {
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionCommitWait}
}
}
// do not schedule next action.
continue ACTION_LOOP
} else {
// Could not commit, move onto next round.
conR.conS.SetupRound(rs.Round + 1)
scheduleNextAction()
continue ACTION_LOOP
}
case RoundActionCommitWait:
if rs.Step >= RoundStepCommitWait {
continue ACTION_LOOP
}
// Commit first we haven't already.
if rs.Step < RoundStepCommit {
// NOTE: Duplicated in RoundActionCommit.
vote := conR.conS.RunActionCommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepCommit)
if vote != nil {
conR.broadcastVote(rs, vote)
}
}
// Wait for more commit votes.
conR.conS.RunActionCommitWait(rs.Height, rs.Round)
scheduleNextAction()
continue ACTION_LOOP
case RoundActionFinalize:
if rs.Step != RoundStepCommitWait {
panic("This shouldn't happen")
}
conR.conS.RunActionFinalize(rs.Height, rs.Round)
// Height has been incremented, step is now RoundStepStart.
scheduleNextAction()
continue ACTION_LOOP
default:
panic("Unknown action")
// Broadcast NewRoundStepMessage
msg := &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
SecondsSinceStartTime: uint32(timeElapsed.Seconds()),
}
// For clarity, ensure that all switch cases call "continue"
panic("Should not happen.")
conR.sw.Broadcast(StateCh, msg)
}
}
func (conR *ConsensusReactor) broadcastVote(rs *RoundState, vote *Vote) {
// Get our validator index
index, _ := rs.Validators.GetById(vote.SignerId)
msg := p2p.TypedMessage{msgTypeVote, vote}
for _, peer := range conR.sw.Peers().List() {
peer.Send(VoteCh, msg)
ps := peer.Data.Get(peerStateKey).(*PeerState)
ps.SetHasVote(rs.Height, rs.Round, vote.Type, index)
}
}
//--------------------------------------
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
OUTER_LOOP:
@ -503,7 +254,7 @@ OUTER_LOOP:
// Send proposal Block parts?
// NOTE: if we or peer is at RoundStepCommit*, the round
// won't necessarily match, but that's OK.
if rs.ProposalBlockParts.HashesTo(prs.ProposalBlockPartsHash) {
if rs.ProposalBlockParts.Header().Equals(prs.ProposalBlockParts) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(
prs.ProposalBlockBitArray).PickRandom(); ok {
msg := &PartMessage{
@ -533,7 +284,7 @@ OUTER_LOOP:
}
// Send proposal POL parts?
if rs.ProposalPOLParts.HashesTo(prs.ProposalPOLPartsHash) {
if rs.ProposalPOLParts.Header().Equals(prs.ProposalPOLParts) {
if index, ok := rs.ProposalPOLParts.BitArray().Sub(
prs.ProposalPOLBitArray).PickRandom(); ok {
msg := &PartMessage{
@ -574,62 +325,37 @@ OUTER_LOOP:
// Ensure that peer's prevote/precommit/commit bitarrays of of sufficient capacity
ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size())
trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) {
// TODO: give priority to our vote.
index, ok := voteSet.BitArray().Sub(peerVoteSet).PickRandom()
if ok {
vote := voteSet.GetByIndex(index)
// NOTE: vote may be a commit.
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, index, vote)
return true
}
return false
}
// If there are prevotes to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
index, ok := rs.Prevotes.BitArray().Sub(prs.Prevotes).PickRandom()
if ok {
valId, val := rs.Validators.GetByIndex(index)
if val != nil {
vote := rs.Prevotes.Get(valId)
// NOTE: vote may be a commit
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrevote, index)
if vote.Type == VoteTypeCommit {
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, index)
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
}
continue OUTER_LOOP
} else {
log.Error("index is not a valid validator index")
}
if trySendVote(rs.Prevotes, prs.Prevotes) {
continue OUTER_LOOP
}
}
// If there are precommits to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
index, ok := rs.Precommits.BitArray().Sub(prs.Precommits).PickRandom()
if ok {
valId, val := rs.Validators.GetByIndex(index)
if val != nil {
vote := rs.Precommits.Get(valId)
// NOTE: vote may be a commit
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, index)
if vote.Type == VoteTypeCommit {
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
}
continue OUTER_LOOP
} else {
log.Error("index is not a valid validator index")
}
if trySendVote(rs.Precommits, prs.Precommits) {
continue OUTER_LOOP
}
}
// If there are any commits to send...
index, ok := rs.Commits.BitArray().Sub(prs.Commits).PickRandom()
if ok {
valId, val := rs.Validators.GetByIndex(index)
if val != nil {
vote := rs.Commits.Get(valId)
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
continue OUTER_LOOP
} else {
log.Error("index is not a valid validator index")
}
if trySendVote(rs.Commits, prs.Commits) {
continue OUTER_LOOP
}
// We sent nothing. Sleep...
@ -642,18 +368,18 @@ OUTER_LOOP:
// Read only when returned by PeerState.GetRoundState().
type PeerRoundState struct {
Height uint32 // Height peer is at
Round uint16 // Round peer is at
Step RoundStep // Step peer is at
StartTime time.Time // Estimated start of round 0 at this height
Proposal bool // True if peer has proposal for this round
ProposalBlockPartsHash []byte // Block parts merkle root
ProposalBlockBitArray BitArray // Block parts bitarray
ProposalPOLPartsHash []byte // POL parts merkle root
ProposalPOLBitArray BitArray // POL parts bitarray
Prevotes BitArray // All votes peer has for this round
Precommits BitArray // All precommits peer has for this round
Commits BitArray // All commits peer has for this height
Height uint32 // Height peer is at
Round uint16 // Round peer is at
Step RoundStep // Step peer is at
StartTime time.Time // Estimated start of round 0 at this height
Proposal bool // True if peer has proposal for this round
ProposalBlockParts PartSetHeader //
ProposalBlockBitArray BitArray // True bit -> has part
ProposalPOLParts PartSetHeader //
ProposalPOLBitArray BitArray // True bit -> has part
Prevotes BitArray // All votes peer has for this round
Precommits BitArray // All precommits peer has for this round
Commits BitArray // All commits peer has for this height
}
//-----------------------------------------------------------------------------
@ -693,10 +419,10 @@ func (ps *PeerState) SetHasProposal(proposal *Proposal) {
}
ps.Proposal = true
ps.ProposalBlockPartsHash = proposal.BlockPartsHash
ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockPartsTotal))
ps.ProposalPOLPartsHash = proposal.POLPartsHash
ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLPartsTotal))
ps.ProposalBlockParts = proposal.BlockParts
ps.ProposalBlockBitArray = NewBitArray(uint(proposal.BlockParts.Total))
ps.ProposalPOLParts = proposal.POLParts
ps.ProposalPOLBitArray = NewBitArray(uint(proposal.POLParts.Total))
}
func (ps *PeerState) SetHasProposalBlockPart(height uint32, round uint16, index uint16) {
@ -740,20 +466,24 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint32, round uint16, numValidat
}
}
func (ps *PeerState) SetHasVote(height uint32, round uint16, type_ uint8, index uint) {
func (ps *PeerState) SetHasVote(height uint32, round uint16, index uint, vote *Vote) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != height || (ps.Round != round && type_ != VoteTypeCommit) {
if ps.Height != height {
return
}
switch type_ {
switch vote.Type {
case VoteTypePrevote:
ps.Prevotes.SetIndex(index, true)
case VoteTypePrecommit:
ps.Precommits.SetIndex(index, true)
case VoteTypeCommit:
if vote.Round < round {
ps.Prevotes.SetIndex(index, true)
ps.Precommits.SetIndex(index, true)
}
ps.Commits.SetIndex(index, true)
default:
panic("Invalid vote type")
@ -776,9 +506,9 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun
ps.StartTime = startTime
if psHeight != msg.Height || psRound != msg.Round {
ps.Proposal = false
ps.ProposalBlockPartsHash = nil
ps.ProposalBlockParts = PartSetHeader{}
ps.ProposalBlockBitArray = BitArray{}
ps.ProposalPOLPartsHash = nil
ps.ProposalPOLParts = PartSetHeader{}
ps.ProposalPOLBitArray = BitArray{}
// We'll update the BitArray capacity later.
ps.Prevotes = BitArray{}


+ 519
- 168
consensus/state.go View File

@ -1,9 +1,62 @@
/*
Consensus State Machine Overview:
* Propose, Prevote, Precommit represent state machine stages. (aka RoundStep, or step).
Each take a predetermined amount of time depending on the round number.
* The Commit step can be entered by two means:
1. After the Precommit step, +2/3 Precommits were found
2. At any time, +2/3 Commits were found
* Once in the Commit stage, two conditions must both be satisfied
before proceeding to the next height NewHeight.
* The Propose step of the next height does not begin until
at least Delta duration *after* +2/3 Commits were found.
The step stays at NewHeight until this timeout occurs before
proceeding to Propose.
+-------------------------------------+
| |
v |(Wait til CommitTime + Delta)
+-----------+ +-----+-----+
+----------> | Propose +--------------+ | NewHeight |
| +-----------+ | +-----------+
| | ^
| | |
| | |
|(Else) v |
+-----+-----+ +-----------+ |
| Precommit | <------------------------+ Prevote | |
+-----+-----+ +-----------+ |
|(If +2/3 Precommits found) |
| |
| + (When +2/3 Commits found) |
| | |
v v |
+------------------------------------------------------------------------------+
| Commit | |
| | |
| +----------------+ * Save Block | |
| |Get Block Parts |---> * Stage Block +--+ + |
| +----------------+ * Broadcast Commit | * Setup New Height |
| | * Move Commits set to |
| +--> LastCommits to continue |
| | collecting commits |
| +-----------------+ | * Broadcast New State |
| |Get +2/3 Commits |--> * Set CommitTime +--+ |
| +-----------------+ |
| |
+------------------------------------------------------------------------------+
*/
package consensus
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/binary"
@ -18,35 +71,38 @@ type RoundStep uint8
type RoundActionType uint8
const (
RoundStepStart = RoundStep(0x00) // Round started.
RoundStepPropose = RoundStep(0x01) // Did propose, gossip proposal.
RoundStepPrevote = RoundStep(0x02) // Did prevote, gossip prevotes.
RoundStepPrecommit = RoundStep(0x03) // Did precommit, gossip precommits.
RoundStepCommit = RoundStep(0x10) // Did commit, gossip commits.
RoundStepCommitWait = RoundStep(0x11) // Found +2/3 commits, wait more.
// If a block could not be committed at a given round,
// we progress to the next round, skipping RoundStepCommit.
//
// If a block was committed, we goto RoundStepCommit,
// then wait "finalizeDuration" to gather more commits,
// then we progress to the next height at round 0.
// TODO: document how RoundStepCommit transcends all rounds.
RoundActionPropose = RoundActionType(0x00) // Goto RoundStepPropose
RoundActionPrevote = RoundActionType(0x01) // Goto RoundStepPrevote
RoundActionPrecommit = RoundActionType(0x02) // Goto RoundStepPrecommit
RoundActionTryCommit = RoundActionType(0x10) // Goto RoundStepCommit or RoundStepStart next round
RoundActionCommitWait = RoundActionType(0x11) // Goto RoundStepCommitWait
RoundActionFinalize = RoundActionType(0x12) // Goto RoundStepStart next height
RoundStepNewHeight = RoundStep(0x00) // Round0 for new height started, wait til CommitTime + Delta
RoundStepNewRound = RoundStep(0x01) // Pseudostep, immediately goes to RoundStepPropose
RoundStepPropose = RoundStep(0x10) // Did propose, gossip proposal
RoundStepPrevote = RoundStep(0x11) // Did prevote, gossip prevotes
RoundStepPrecommit = RoundStep(0x12) // Did precommit, gossip precommits
RoundStepCommit = RoundStep(0x20) // Entered commit state machine
RoundActionPropose = RoundActionType(0xA0) // Propose and goto RoundStepPropose
RoundActionPrevote = RoundActionType(0xA1) // Prevote and goto RoundStepPrevote
RoundActionPrecommit = RoundActionType(0xA2) // Precommit and goto RoundStepPrecommit
RoundActionTryCommit = RoundActionType(0xC0) // Goto RoundStepCommit, or RoundStepPropose for next round.
RoundActionTryFinalize = RoundActionType(0xC1) // Maybe goto RoundStepPropose for next round.
roundDuration0 = 60 * time.Second // The first round is 60 seconds long.
roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer.
roundDeadlinePrevote = float64(1.0 / 3.0) // When the prevote is due.
roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
newHeightDelta = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
)
var (
ErrInvalidProposalSignature = errors.New("Error invalid proposal signature")
consensusStateKey = []byte("consensusState")
)
type RoundAction struct {
Height uint32 // The block height for which consensus is reaching for.
Round uint16 // The round number at given height.
Action RoundActionType // Action to perform.
}
//-----------------------------------------------------------------------------
// Immutable when returned from ConsensusState.GetRoundState()
type RoundState struct {
Height uint32 // Height we are working on
@ -111,12 +167,18 @@ func (rs *RoundState) Description() string {
rs.Height, rs.Round, rs.Step, rs.StartTime)
}
//-------------------------------------
//-----------------------------------------------------------------------------
// Tracks consensus state across block heights and rounds.
type ConsensusState struct {
blockStore *BlockStore
mempool *mempool.Mempool
started uint32
stopped uint32
quit chan struct{}
blockStore *BlockStore
mempool *mempool.Mempool
runActionCh chan RoundAction
newStepCh chan *RoundState
mtx sync.Mutex
RoundState
@ -127,8 +189,11 @@ type ConsensusState struct {
func NewConsensusState(state *state.State, blockStore *BlockStore, mempool *mempool.Mempool) *ConsensusState {
cs := &ConsensusState{
blockStore: blockStore,
mempool: mempool,
quit: make(chan struct{}),
blockStore: blockStore,
mempool: mempool,
runActionCh: make(chan RoundAction, 1),
newStepCh: make(chan *RoundState, 1),
}
cs.updateToState(state)
return cs
@ -137,10 +202,176 @@ func NewConsensusState(state *state.State, blockStore *BlockStore, mempool *memp
func (cs *ConsensusState) GetRoundState() *RoundState {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.getRoundState()
}
func (cs *ConsensusState) getRoundState() *RoundState {
rs := cs.RoundState // copy
return &rs
}
func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh
}
func (cs *ConsensusState) Start() {
if atomic.CompareAndSwapUint32(&cs.started, 0, 1) {
log.Info("Starting ConsensusState")
go cs.stepTransitionRoutine()
}
}
func (cs *ConsensusState) Stop() {
if atomic.CompareAndSwapUint32(&cs.stopped, 0, 1) {
log.Info("Stopping ConsensusState")
close(cs.quit)
}
}
func (cs *ConsensusState) IsStopped() bool {
return atomic.LoadUint32(&cs.stopped) == 1
}
// Source of all round state transitions (and votes).
func (cs *ConsensusState) stepTransitionRoutine() {
// For clarity, all state transitions that happen after some timeout are here.
// Schedule the next action by pushing a RoundAction{} to cs.runActionCh.
scheduleNextAction := func() {
go func() {
rs := cs.getRoundState()
round, roundStartTime, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
log.Debug("Called scheduleNextAction. round:%v roundStartTime:%v elapsedRatio:%v", round, roundStartTime, elapsedRatio)
switch rs.Step {
case RoundStepNewHeight:
// We should run RoundActionPropose when rs.StartTime passes.
if elapsedRatio < 0 {
// startTime is in the future.
time.Sleep(time.Duration((-1.0 * elapsedRatio) * float64(roundDuration)))
}
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose}
case RoundStepNewRound:
// Pseudostep: Immediately goto propose.
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose}
case RoundStepPropose:
// Wake up when it's time to vote.
time.Sleep(time.Duration((roundDeadlinePrevote - elapsedRatio) * float64(roundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote}
case RoundStepPrevote:
// Wake up when it's time to precommit.
time.Sleep(time.Duration((roundDeadlinePrecommit - elapsedRatio) * float64(roundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit}
case RoundStepPrecommit:
// Wake up when the round is over.
time.Sleep(time.Duration((1.0 - elapsedRatio) * float64(roundDuration)))
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryCommit}
case RoundStepCommit:
// There's nothing to scheudle, we're waiting for
// ProposalBlockParts.IsComplete() &&
// Commits.HasTwoThirdsMajority()
panic("The next action from RoundStepCommit is not scheduled by time")
default:
panic("Should not happen")
}
}()
}
scheduleNextAction()
// NOTE: All ConsensusState.RunAction*() calls come from here.
// Since only one routine calls them, it is safe to assume that
// the RoundState Height/Round/Step won't change concurrently.
// However, other fields like Proposal could change, due to gossip.
ACTION_LOOP:
for {
var roundAction RoundAction
select {
case roundAction = <-cs.runActionCh:
case <-cs.quit:
return
}
height, round, action := roundAction.Height, roundAction.Round, roundAction.Action
rs := cs.GetRoundState()
log.Info("Running round action A:%X %v", action, rs.Description())
// Continue if action is not relevant
if height != rs.Height {
continue
}
// If action <= RoundActionPrecommit, the round must match too.
if action <= RoundActionPrecommit && round != rs.Round {
continue
}
// Run action
switch action {
case RoundActionPropose:
if rs.Step != RoundStepNewHeight && rs.Step != RoundStepNewRound {
continue ACTION_LOOP
}
cs.RunActionPropose(rs.Height, rs.Round)
scheduleNextAction()
continue ACTION_LOOP
case RoundActionPrevote:
if rs.Step >= RoundStepPrevote {
continue ACTION_LOOP
}
cs.RunActionPrevote(rs.Height, rs.Round)
scheduleNextAction()
continue ACTION_LOOP
case RoundActionPrecommit:
if rs.Step >= RoundStepPrecommit {
continue ACTION_LOOP
}
cs.RunActionPrecommit(rs.Height, rs.Round)
scheduleNextAction()
continue ACTION_LOOP
case RoundActionTryCommit:
if rs.Step >= RoundStepCommit {
continue ACTION_LOOP
}
if rs.Precommits.HasTwoThirdsMajority() {
// Enter RoundStepCommit and commit.
cs.RunActionCommit(rs.Height)
// Maybe finalize already
cs.runActionCh <- RoundAction{rs.Height, rs.Round, RoundActionTryFinalize}
continue ACTION_LOOP
} else {
// Could not commit, move onto next round.
cs.SetupNewRound(rs.Height, rs.Round+1)
// cs.Step is now at RoundStepNewRound
scheduleNextAction()
continue ACTION_LOOP
}
case RoundActionTryFinalize:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
scheduleNextAction()
continue ACTION_LOOP
} else {
// do not schedule next action.
continue ACTION_LOOP
}
default:
panic("Unknown action")
}
// For clarity, ensure that all switch cases call "continue"
panic("Should not happen.")
}
}
// Updates ConsensusState and increments height to match that of state.
// If calculated round is greater than 0 (based on BlockTime or calculated StartTime)
// then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound.
// Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *state.State) {
// Sanity check state.
if cs.Height > 0 && cs.Height != state.Height {
@ -153,11 +384,11 @@ func (cs *ConsensusState) updateToState(state *state.State) {
height := state.Height + 1 // next desired block height
cs.Height = height
cs.Round = 0
cs.Step = RoundStepStart
cs.Step = RoundStepNewHeight
if cs.CommitTime.IsZero() {
cs.StartTime = state.BlockTime.Add(finalizeDuration)
cs.StartTime = state.BlockTime.Add(newHeightDelta)
} else {
cs.StartTime = cs.CommitTime.Add(finalizeDuration)
cs.StartTime = cs.CommitTime.Add(newHeightDelta)
}
cs.CommitTime = time.Time{}
cs.Validators = validators
@ -181,20 +412,15 @@ func (cs *ConsensusState) updateToState(state *state.State) {
// Update the round if we need to.
round := calcRound(cs.StartTime)
if round > 0 {
cs.setupRound(round)
cs.setupNewRound(round)
}
}
func (cs *ConsensusState) SetupRound(round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Round >= round {
Panicf("ConsensusState round %v not lower than desired round %v", cs.Round, round)
func (cs *ConsensusState) setupNewRound(round uint16) {
// Sanity check
if round == 0 {
panic("setupNewRound() should never be called for round 0")
}
cs.setupRound(round)
}
func (cs *ConsensusState) setupRound(round uint16) {
// Increment all the way to round.
validators := cs.Validators.Copy()
@ -203,7 +429,7 @@ func (cs *ConsensusState) setupRound(round uint16) {
}
cs.Round = round
cs.Step = RoundStepStart
cs.Step = RoundStepNewRound
cs.Validators = validators
cs.Proposal = nil
cs.ProposalBlock = nil
@ -224,6 +450,21 @@ func (cs *ConsensusState) SetPrivValidator(priv *PrivValidator) {
//-----------------------------------------------------------------------------
// Set up the round to desired round and set step to RoundStepNewRound
func (cs *ConsensusState) SetupNewRound(height uint32, desiredRound uint16) bool {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height {
return false
}
if desiredRound <= cs.Round {
return false
}
cs.setupNewRound(desiredRound)
cs.newStepCh <- cs.getRoundState()
return true
}
func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -231,7 +472,9 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
return
}
cs.Step = RoundStepPropose
cs.newStepCh <- cs.getRoundState()
// Nothing to do if it's not our turn.
if cs.PrivValidator == nil || cs.Validators.Proposer().Id != cs.PrivValidator.Id {
return
}
@ -248,6 +491,7 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
blockParts = cs.LockedBlockParts
pol = cs.LockedPOL
} else {
// Otherwise we should create a new proposal.
var validation Validation
if cs.Height == 1 {
// We're creating a proposal for the first block.
@ -285,9 +529,7 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
}
// Make proposal
proposal := NewProposal(cs.Height, cs.Round,
blockParts.Total(), blockParts.RootHash(),
polParts.Total(), polParts.RootHash())
proposal := NewProposal(cs.Height, cs.Round, blockParts.Header(), polParts.Header())
cs.PrivValidator.Sign(proposal)
// Set fields
@ -298,167 +540,186 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
cs.ProposalPOLParts = polParts
}
func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) *Vote {
// Prevote for LockedBlock if we're locked, or ProposealBlock if valid.
// Otherwise vote nil.
func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
Panicf("RunActionPrevote(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
}
cs.Step = RoundStepPrevote
cs.newStepCh <- cs.getRoundState()
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
return cs.signAddVote(VoteTypePrevote, cs.LockedBlock.Hash())
cs.signAddVote(VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
return
}
// If ProposalBlock is nil, prevote nil.
if cs.ProposalBlock == nil {
return nil
cs.signAddVote(VoteTypePrevote, nil, PartSetHeader{})
return
}
// Try staging proposed block.
// Try staging cs.ProposalBlock
err := cs.stageBlock(cs.ProposalBlock)
if err != nil {
// Prevote nil.
return nil
} else {
// Prevote block.
return cs.signAddVote(VoteTypePrevote, cs.ProposalBlock.Hash())
// ProposalBlock is invalid, prevote nil.
cs.signAddVote(VoteTypePrevote, nil, PartSetHeader{})
return
}
// Prevote cs.ProposalBlock
cs.signAddVote(VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
return
}
// Lock the ProposalBlock if we have enough prevotes for it,
// Lock & Precommit the ProposalBlock if we have enough prevotes for it,
// or unlock an existing lock if +2/3 of prevotes were nil.
// Returns a blockhash if a block was locked.
func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) *Vote {
func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
Panicf("RunActionPrecommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
}
cs.Step = RoundStepPrecommit
cs.newStepCh <- cs.getRoundState()
if hash, ok := cs.Prevotes.TwoThirdsMajority(); ok {
// Remember this POL. (hash may be nil)
cs.LockedPOL = cs.Prevotes.MakePOL()
if len(hash) == 0 {
// +2/3 prevoted nil. Just unlock.
cs.LockedBlock = nil
cs.LockedBlockParts = nil
return nil
} else if cs.ProposalBlock.HashesTo(hash) {
// +2/3 prevoted for proposal block
// Validate the block.
// See note on ZombieValidators to see why.
if err := cs.stageBlock(cs.ProposalBlock); err != nil {
log.Warning("+2/3 prevoted for an invalid block: %v", err)
return nil
}
cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts
return cs.signAddVote(VoteTypePrecommit, hash)
} else if cs.LockedBlock.HashesTo(hash) {
// +2/3 prevoted for already locked block
return cs.signAddVote(VoteTypePrecommit, hash)
} else {
// We don't have the block that hashes to hash.
// Unlock if we're locked.
cs.LockedBlock = nil
cs.LockedBlockParts = nil
return nil
}
} else {
return nil
hash, partsHeader, ok := cs.Prevotes.TwoThirdsMajority()
if !ok {
// If we don't have two thirds of prevotes,
// don't do anything at all.
return
}
}
// Commits a block if we have enough precommits (and we have the block).
// If successful, saves the block and state and resets mempool,
// and returns the committed block.
// Commit is not finalized until FinalizeCommit() is called.
// This allows us to stay at this height and gather more commits.
func (cs *ConsensusState) RunActionCommit(height uint32, round uint16) *Vote {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
Panicf("RunActionCommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
}
cs.Step = RoundStepCommit
// Remember this POL. (hash may be nil)
cs.LockedPOL = cs.Prevotes.MakePOL()
if hash, ok := cs.Precommits.TwoThirdsMajority(); ok {
// There are some strange cases that shouldn't happen
// (unless voters are duplicitous).
// For example, the hash may not be the one that was
// proposed this round. These cases should be identified
// and warn the administrator. We should err on the side of
// caution and not, for example, sign a block.
// TODO: Identify these strange cases.
var block *Block
var blockParts *PartSet
if cs.LockedBlock.HashesTo(hash) {
block = cs.LockedBlock
blockParts = cs.LockedBlockParts
} else if cs.ProposalBlock.HashesTo(hash) {
block = cs.ProposalBlock
blockParts = cs.ProposalBlockParts
} else {
return nil
}
// If +2/3 prevoted nil. Just unlock.
if len(hash) == 0 {
cs.LockedBlock = nil
cs.LockedBlockParts = nil
return
}
// The proposal must be valid.
if err := cs.stageBlock(block); err != nil {
log.Warning("Network is commiting an invalid proposal? %v", err)
return nil
}
// If +2/3 prevoted for already locked block, precommit it.
if cs.LockedBlock.HashesTo(hash) {
cs.signAddVote(VoteTypePrecommit, hash, partsHeader)
return
}
// Keep block in cs.Proposal*
if !cs.ProposalBlock.HashesTo(hash) {
cs.ProposalBlock = block
cs.ProposalBlockParts = blockParts
// If +2/3 prevoted for cs.ProposalBlock, lock it and precommit it.
if cs.ProposalBlock.HashesTo(hash) {
// Validate the block.
if err := cs.stageBlock(cs.ProposalBlock); err != nil {
// Prevent zombies.
log.Warning("+2/3 prevoted for an invalid block: %v", err)
return
}
// Save to blockStore
cs.blockStore.SaveBlock(block)
// Save the state
cs.stagedState.Save()
// Update mempool.
cs.mempool.ResetForBlockAndState(block, cs.stagedState)
return cs.signAddVote(VoteTypeCommit, block.Hash())
cs.LockedBlock = cs.ProposalBlock
cs.LockedBlockParts = cs.ProposalBlockParts
cs.signAddVote(VoteTypePrecommit, hash, partsHeader)
return
}
return nil
// We don't have the block that validators prevoted.
// Unlock if we're locked.
cs.LockedBlock = nil
cs.LockedBlockParts = nil
return
}
func (cs *ConsensusState) RunActionCommitWait(height uint32, round uint16) {
// Enter commit step. See the diagram for details.
func (cs *ConsensusState) RunActionCommit(height uint32) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
Panicf("RunActionCommitWait(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
if cs.Height != height {
Panicf("RunActionCommit(%v), expected %v", height, cs.Height)
}
// There are two ways to enter:
// 1. +2/3 precommits at the end of RoundStepPrecommit
// 2. +2/3 commits at any time
hash, partsHeader, ok := cs.Precommits.TwoThirdsMajority()
if !ok {
hash, partsHeader, ok = cs.Commits.TwoThirdsMajority()
if !ok {
panic("RunActionCommit() expects +2/3 precommits or commits")
}
}
cs.Step = RoundStepCommit
cs.newStepCh <- cs.getRoundState()
// Clear the Locked* fields and use cs.Proposed*
if cs.LockedBlock.HashesTo(hash) {
cs.ProposalBlock = cs.LockedBlock
cs.ProposalBlockParts = cs.LockedBlockParts
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedPOL = nil
}
// If we don't have the block being committed, set up to get it.
if !cs.ProposalBlock.HashesTo(hash) {
if !cs.ProposalBlockParts.Header().Equals(partsHeader) {
// We're getting the wrong block.
// Set up ProposalBlockParts and keep waiting.
cs.ProposalBlock = nil
cs.ProposalBlockParts = NewPartSetFromHeader(partsHeader)
} else {
// We just need to keep waiting.
}
} else {
// We have the block, so save/stage/sign-commit-vote.
cs.processBlockForCommit(cs.ProposalBlock, cs.ProposalBlockParts)
}
cs.Step = RoundStepCommitWait
// If we have +2/3 commits, set the CommitTime
if cs.Commits.HasTwoThirdsMajority() {
cs.CommitTime = time.Now()
} else {
panic("RunActionCommitWait() expects +2/3 commits")
}
}
func (cs *ConsensusState) RunActionFinalize(height uint32, round uint16) {
// Returns true if Finalize happened, which increments height && sets
// the step to RoundStepNewHeight (or RoundStepNewRound, but probably not).
func (cs *ConsensusState) TryFinalizeCommit(height uint32) bool {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
Panicf("RunActionFinalize(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
if cs.Height != height {
Panicf("TryFinalizeCommit(%v), expected %v", height, cs.Height)
}
// What was staged becomes committed.
// XXX it's possible that this node never received the block to stage!!!
cs.updateToState(cs.stagedState)
if cs.Step == RoundStepCommit &&
cs.Commits.HasTwoThirdsMajority() &&
cs.ProposalBlockParts.IsComplete() {
// Sanity check
if cs.ProposalBlock == nil {
Panicf("Expected ProposalBlock to exist")
}
hash, header, _ := cs.Commits.TwoThirdsMajority()
if !cs.ProposalBlock.HashesTo(hash) {
Panicf("Expected ProposalBlock to hash to commit hash")
}
if !cs.ProposalBlockParts.Header().Equals(header) {
Panicf("Expected ProposalBlockParts header to be commit header")
}
err := cs.stageBlock(cs.ProposalBlock)
if err == nil {
// Increment height.
cs.updateToState(cs.stagedState)
// cs.Step is now RoundStepNewHeight or RoundStepNewRound
cs.newStepCh <- cs.getRoundState()
return true
} else {
// Prevent zombies.
Panicf("+2/3 committed an invalid block: %v", err)
}
}
return false
}
//-----------------------------------------------------------------------------
@ -472,23 +733,29 @@ func (cs *ConsensusState) SetProposal(proposal *Proposal) error {
return nil
}
// Invalid.
// Does not apply
if proposal.Height != cs.Height || proposal.Round != cs.Round {
return nil
}
// We don't care about the proposal if we're already in RoundStepCommit.
if cs.Step == RoundStepCommit {
return nil
}
// Verify signature
if !cs.Validators.Proposer().Verify(proposal) {
return ErrInvalidProposalSignature
}
cs.Proposal = proposal
cs.ProposalBlockParts = NewPartSetFromMetadata(proposal.BlockPartsTotal, proposal.BlockPartsHash)
cs.ProposalPOLParts = NewPartSetFromMetadata(proposal.POLPartsTotal, proposal.POLPartsHash)
cs.ProposalBlockParts = NewPartSetFromHeader(proposal.BlockParts)
cs.ProposalPOLParts = NewPartSetFromHeader(proposal.POLParts)
return nil
}
// NOTE: block is not necessarily valid.
// NOTE: This function may increment the height.
func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part *Part) (added bool, err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -499,7 +766,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part
}
// We're not expecting a block part.
if cs.ProposalBlockParts != nil {
if cs.ProposalBlockParts == nil {
return false, nil // TODO: bad peer? Return error?
}
@ -511,6 +778,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part
var n int64
var err error
cs.ProposalBlock = ReadBlock(cs.ProposalBlockParts.GetReader(), &n, &err)
cs.runActionCh <- RoundAction{cs.Height, cs.Round, RoundActionTryFinalize}
return true, err
}
return true, nil
@ -526,7 +794,7 @@ func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *
}
// We're not expecting a POL part.
if cs.ProposalPOLParts != nil {
if cs.ProposalPOLParts == nil {
return false, nil // TODO: bad peer? Return error?
}
@ -543,6 +811,7 @@ func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *
return true, nil
}
// NOTE: This function may increment the height.
func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) {
switch vote.Type {
case VoteTypePrevote:
@ -553,14 +822,22 @@ func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) {
return cs.Precommits.Add(vote)
case VoteTypeCommit:
// Commits checks for height match.
// No need to check if vote.Round < cs.Round ...
// Prevotes && Precommits already checks that.
cs.Prevotes.Add(vote)
cs.Precommits.Add(vote)
return cs.Commits.Add(vote)
added, err = cs.Commits.Add(vote)
if added && cs.Commits.HasTwoThirdsMajority() {
cs.runActionCh <- RoundAction{cs.Height, cs.Round, RoundActionTryFinalize}
}
return added, err
default:
panic("Unknown vote type")
}
}
//-----------------------------------------------------------------------------
func (cs *ConsensusState) stageBlock(block *Block) error {
if block == nil {
panic("Cannot stage nil block")
@ -586,17 +863,91 @@ func (cs *ConsensusState) stageBlock(block *Block) error {
}
}
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte) *Vote {
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header PartSetHeader) *Vote {
if cs.PrivValidator == nil || !cs.Validators.HasId(cs.PrivValidator.Id) {
return nil
}
vote := &Vote{
Height: cs.Height,
Round: cs.Round,
Type: type_,
BlockHash: hash,
Height: cs.Height,
Round: cs.Round,
Type: type_,
BlockHash: hash,
BlockParts: header,
}
cs.PrivValidator.Sign(vote)
cs.AddVote(vote)
return vote
}
func (cs *ConsensusState) processBlockForCommit(block *Block, blockParts *PartSet) {
// The proposal must be valid.
if err := cs.stageBlock(block); err != nil {
// Prevent zombies.
log.Warning("+2/3 precommitted an invalid block: %v", err)
return
}
// Save to blockStore
cs.blockStore.SaveBlock(block)
// Save the state
cs.stagedState.Save()
// Update mempool.
cs.mempool.ResetForBlockAndState(block, cs.stagedState)
cs.signAddVote(VoteTypeCommit, block.Hash(), blockParts.Header())
}
//-----------------------------------------------------------------------------
// total duration of given round
func calcRoundDuration(round uint16) time.Duration {
return roundDuration0 + roundDurationDelta*time.Duration(round)
}
// startTime is when round zero started.
func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
return startTime.Add(roundDuration0*time.Duration(round) +
roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
}
// calculates the current round given startTime of round zero.
// NOTE: round is zero if startTime is in the future.
func calcRound(startTime time.Time) uint16 {
now := time.Now()
if now.Before(startTime) {
return 0
}
// Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
// D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
// AR^2 + BR + C <= 0; A = D_delta, B = (2_D0 - D_delta), C = 2(Start - Now).
// R = Floor((-B + Sqrt(B^2 - 4AC))/2A)
A := float64(roundDurationDelta)
B := 2.0*float64(roundDuration0) - float64(roundDurationDelta)
C := 2.0 * float64(startTime.Sub(now))
R := math.Floor((-B + math.Sqrt(B*B-4.0*A*C)) / (2 * A))
if math.IsNaN(R) {
panic("Could not calc round, should not happen")
}
if R > math.MaxInt16 {
Panicf("Could not calc round, round overflow: %v", R)
}
if R < 0 {
return 0
}
return uint16(R)
}
// convenience
// NOTE: elapsedRatio can be negative if startTime is in the future.
func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
roundElapsed time.Duration, elapsedRatio float64) {
round = calcRound(startTime)
roundStartTime = calcRoundStartTime(round, startTime)
roundDuration = calcRoundDuration(round)
roundElapsed = time.Now().Sub(roundStartTime)
elapsedRatio = float64(roundElapsed) / float64(roundDuration)
return
}

+ 43
- 45
consensus/state_test.go View File

@ -53,7 +53,7 @@ func makeConsensusState() (*ConsensusState, []*state.PrivAccount) {
func assertPanics(t *testing.T, msg string, f func()) {
defer func() {
if err := recover(); err == nil {
t.Error("Should have panic'd, but didn't. %v", msg)
t.Errorf("Should have panic'd, but didn't: %v", msg)
}
}()
f()
@ -74,36 +74,32 @@ func TestSetupRound(t *testing.T) {
// Ensure that vote appears in RoundState.
rs0 := cs.GetRoundState()
if vote := rs0.Prevotes.Get(0); vote == nil || vote.Type != VoteTypePrevote {
if vote := rs0.Prevotes.GetById(0); vote == nil || vote.Type != VoteTypePrevote {
t.Errorf("Expected to find prevote but got %v", vote)
}
if vote := rs0.Precommits.Get(0); vote == nil || vote.Type != VoteTypePrecommit {
if vote := rs0.Precommits.GetById(0); vote == nil || vote.Type != VoteTypePrecommit {
t.Errorf("Expected to find precommit but got %v", vote)
}
if vote := rs0.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
if vote := rs0.Commits.GetById(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit but got %v", vote)
}
// Setup round 1 (next round)
cs.SetupRound(1)
cs.SetupNewRound(1, 1)
<-cs.NewStepCh() // TODO: test this value too.
// Now the commit should be copied over to prevotes and precommits.
rs1 := cs.GetRoundState()
if vote := rs1.Prevotes.Get(0); vote == nil || vote.Type != VoteTypeCommit {
if vote := rs1.Prevotes.GetById(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit but got %v", vote)
}
if vote := rs1.Precommits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
if vote := rs1.Precommits.GetById(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit but got %v", vote)
}
if vote := rs1.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
if vote := rs1.Commits.GetById(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit but got %v", vote)
}
// Setup round 1 (should fail)
assertPanics(t, "Round did not increment", func() {
cs.SetupRound(1)
})
}
func TestRunActionProposeNoPrivValidator(t *testing.T) {
@ -154,59 +150,66 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) {
priv := NewPrivValidator(db_.NewMemDB(), privAccounts[0])
cs.SetPrivValidator(priv)
vote := cs.RunActionPrecommit(1, 0)
if vote != nil {
cs.RunActionPrecommit(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
if cs.Precommits.GetById(0) != nil {
t.Errorf("RunActionPrecommit should return nil without a proposal")
}
cs.RunActionPropose(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
// Test RunActionPrecommit failures:
assertPanics(t, "Wrong height ", func() { cs.RunActionPrecommit(2, 0) })
assertPanics(t, "Wrong round", func() { cs.RunActionPrecommit(1, 1) })
vote = cs.RunActionPrecommit(1, 0)
if vote != nil {
cs.RunActionPrecommit(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
if cs.Precommits.GetById(0) != nil {
t.Errorf("RunActionPrecommit should return nil, not enough prevotes")
}
// Add at least +2/3 prevotes.
for i := 0; i < 7; i++ {
vote := &Vote{
Height: 1,
Round: 0,
Type: VoteTypePrevote,
BlockHash: cs.ProposalBlock.Hash(),
Height: 1,
Round: 0,
Type: VoteTypePrevote,
BlockHash: cs.ProposalBlock.Hash(),
BlockParts: cs.ProposalBlockParts.Header(),
}
privAccounts[i].Sign(vote)
cs.AddVote(vote)
}
// Test RunActionPrecommit success:
vote = cs.RunActionPrecommit(1, 0)
if vote == nil {
cs.RunActionPrecommit(1, 0)
<-cs.NewStepCh() // TODO: test this value too.
if cs.Precommits.GetById(0) == nil {
t.Errorf("RunActionPrecommit should have succeeded")
}
checkRoundState(t, cs, 1, 0, RoundStepPrecommit)
// Test RunActionCommit failures:
assertPanics(t, "Wrong height ", func() { cs.RunActionCommit(2, 0) })
assertPanics(t, "Wrong round", func() { cs.RunActionCommit(1, 1) })
assertPanics(t, "Wrong height ", func() { cs.RunActionCommit(2) })
assertPanics(t, "Wrong round", func() { cs.RunActionCommit(1) })
// Add at least +2/3 precommits.
for i := 0; i < 7; i++ {
vote := &Vote{
Height: 1,
Round: 0,
Type: VoteTypePrecommit,
BlockHash: cs.ProposalBlock.Hash(),
Height: 1,
Round: 0,
Type: VoteTypePrecommit,
BlockHash: cs.ProposalBlock.Hash(),
BlockParts: cs.ProposalBlockParts.Header(),
}
privAccounts[i].Sign(vote)
cs.AddVote(vote)
}
// Test RunActionCommit success:
vote = cs.RunActionCommit(1, 0)
if vote == nil {
cs.RunActionCommit(1)
<-cs.NewStepCh() // TODO: test this value too.
if cs.Commits.GetById(0) == nil {
t.Errorf("RunActionCommit should have succeeded")
}
checkRoundState(t, cs, 1, 0, RoundStepCommit)
@ -219,23 +222,18 @@ func TestRunActionPrecommitCommitFinalize(t *testing.T) {
// Add at least +2/3 commits.
for i := 0; i < 7; i++ {
vote := &Vote{
Height: 1,
Round: uint16(i), // Doesn't matter what round
Type: VoteTypeCommit,
BlockHash: cs.ProposalBlock.Hash(),
Height: 1,
Round: uint16(i), // Doesn't matter what round
Type: VoteTypeCommit,
BlockHash: cs.ProposalBlock.Hash(),
BlockParts: cs.ProposalBlockParts.Header(),
}
privAccounts[i].Sign(vote)
cs.AddVote(vote)
}
// Test RunActionCommitWait:
cs.RunActionCommitWait(1, 0)
if cs.CommitTime.IsZero() {
t.Errorf("Expected CommitTime to have been set")
}
checkRoundState(t, cs, 1, 0, RoundStepCommitWait)
// Test RunActionFinalize:
cs.RunActionFinalize(1, 0)
checkRoundState(t, cs, 2, 0, RoundStepStart)
// Test TryFinalizeCommit:
cs.TryFinalizeCommit(1)
<-cs.NewStepCh() // TODO: test this value too.
checkRoundState(t, cs, 2, 0, RoundStepNewHeight)
}

+ 75
- 48
consensus/vote_set.go View File

@ -6,6 +6,7 @@ import (
"strings"
"sync"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/blocks"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/state"
@ -22,14 +23,15 @@ type VoteSet struct {
round uint16
type_ byte
mtx sync.Mutex
vset *state.ValidatorSet
votes map[uint64]*Vote
votesBitArray BitArray
votesByBlockHash map[string]uint64
totalVotes uint64
twoThirdsMajority []byte
twoThirdsExists bool
mtx sync.Mutex
vset *state.ValidatorSet
votes map[uint64]*Vote
votesBitArray BitArray
votesByBlock map[string]uint64 // string(blockHash)+string(blockParts) -> vote sum.
totalVotes uint64
maj23Hash []byte
maj23Parts PartSetHeader
maj23Exists bool
}
// Constructs a new VoteSet struct used to accumulate votes for each round.
@ -38,14 +40,14 @@ func NewVoteSet(height uint32, round uint16, type_ byte, vset *state.ValidatorSe
panic("Expected round 0 for commit vote set")
}
return &VoteSet{
height: height,
round: round,
type_: type_,
vset: vset,
votes: make(map[uint64]*Vote, vset.Size()),
votesBitArray: NewBitArray(vset.Size()),
votesByBlockHash: make(map[string]uint64),
totalVotes: 0,
height: height,
round: round,
type_: type_,
vset: vset,
votes: make(map[uint64]*Vote, vset.Size()),
votesBitArray: NewBitArray(vset.Size()),
votesByBlock: make(map[string]uint64),
totalVotes: 0,
}
}
@ -96,15 +98,17 @@ func (vs *VoteSet) addVote(vote *Vote) (bool, error) {
return false, ErrVoteInvalidAccount
}
vs.votesBitArray.SetIndex(uint(voterIndex), true)
totalBlockHashVotes := vs.votesByBlockHash[string(vote.BlockHash)] + val.VotingPower
vs.votesByBlockHash[string(vote.BlockHash)] = totalBlockHashVotes
blockKey := string(vote.BlockHash) + string(BinaryBytes(vote.BlockParts))
totalBlockHashVotes := vs.votesByBlock[blockKey] + val.VotingPower
vs.votesByBlock[blockKey] = totalBlockHashVotes
vs.totalVotes += val.VotingPower
// If we just nudged it up to two thirds majority, add it.
if totalBlockHashVotes > vs.vset.TotalVotingPower()*2/3 &&
(totalBlockHashVotes-val.VotingPower) <= vs.vset.TotalVotingPower()*2/3 {
vs.twoThirdsMajority = vote.BlockHash
vs.twoThirdsExists = true
vs.maj23Hash = vote.BlockHash
vs.maj23Parts = vote.BlockParts
vs.maj23Exists = true
}
return true, nil
@ -126,7 +130,19 @@ func (vs *VoteSet) BitArray() BitArray {
return vs.votesBitArray.Copy()
}
func (vs *VoteSet) Get(id uint64) *Vote {
func (vs *VoteSet) GetByIndex(index uint) *Vote {
vs.mtx.Lock()
defer vs.mtx.Unlock()
id, val := vs.vset.GetByIndex(index)
if val == nil {
panic("GetByIndex(index) returned nil")
}
return vs.votes[id]
}
func (vs *VoteSet) GetById(id uint64) *Vote {
vs.mtx.Lock()
defer vs.mtx.Unlock()
return vs.votes[id]
@ -148,18 +164,18 @@ func (vs *VoteSet) HasTwoThirdsMajority() bool {
}
vs.mtx.Lock()
defer vs.mtx.Unlock()
return vs.twoThirdsExists
return vs.maj23Exists
}
// Returns either a blockhash (or nil) that received +2/3 majority.
// If there exists no such majority, returns (nil, false).
func (vs *VoteSet) TwoThirdsMajority() (hash []byte, ok bool) {
func (vs *VoteSet) TwoThirdsMajority() (hash []byte, parts PartSetHeader, ok bool) {
vs.mtx.Lock()
defer vs.mtx.Unlock()
if vs.twoThirdsExists {
return vs.twoThirdsMajority, true
if vs.maj23Exists {
return vs.maj23Hash, vs.maj23Parts, true
} else {
return nil, false
return nil, PartSetHeader{}, false
}
}
@ -169,25 +185,28 @@ func (vs *VoteSet) MakePOL() *POL {
}
vs.mtx.Lock()
defer vs.mtx.Unlock()
if !vs.twoThirdsExists {
if !vs.maj23Exists {
return nil
}
majHash := vs.twoThirdsMajority // hash may be nil.
pol := &POL{
Height: vs.height,
Round: vs.round,
BlockHash: majHash,
Height: vs.height,
Round: vs.round,
BlockHash: vs.maj23Hash,
BlockParts: vs.maj23Parts,
}
for _, vote := range vs.votes {
if bytes.Equal(vote.BlockHash, majHash) {
if vote.Type == VoteTypePrevote {
pol.Votes = append(pol.Votes, vote.Signature)
} else if vote.Type == VoteTypeCommit {
pol.Commits = append(pol.Votes, vote.Signature)
pol.CommitRounds = append(pol.CommitRounds, vote.Round)
} else {
Panicf("Unexpected vote type %X", vote.Type)
}
if !bytes.Equal(vote.BlockHash, vs.maj23Hash) {
continue
}
if !vote.BlockParts.Equals(vs.maj23Parts) {
continue
}
if vote.Type == VoteTypePrevote {
pol.Votes = append(pol.Votes, vote.Signature)
} else if vote.Type == VoteTypeCommit {
pol.Commits = append(pol.Commits, RoundSignature{vote.Round, vote.Signature})
} else {
Panicf("Unexpected vote type %X", vote.Type)
}
}
return pol
@ -199,18 +218,26 @@ func (vs *VoteSet) MakeValidation() Validation {
}
vs.mtx.Lock()
defer vs.mtx.Unlock()
if len(vs.twoThirdsMajority) == 0 {
if len(vs.maj23Hash) == 0 {
panic("Cannot MakeValidation() unless a blockhash has +2/3")
}
sigs := []Signature{}
for _, vote := range vs.votes {
if !bytes.Equal(vote.BlockHash, vs.twoThirdsMajority) {
continue
rsigs := make([]RoundSignature, vs.vset.Size())
vs.vset.Iterate(func(index uint, val *state.Validator) bool {
vote := vs.votes[val.Id]
if vote == nil {
return false
}
sigs = append(sigs, vote.Signature)
}
if !bytes.Equal(vote.BlockHash, vs.maj23Hash) {
return false
}
if !vote.BlockParts.Equals(vs.maj23Parts) {
return false
}
rsigs[index] = RoundSignature{vote.Round, vote.Signature}
return false
})
return Validation{
Signatures: sigs,
Commits: rsigs,
}
}


+ 18
- 18
consensus/vote_set_test.go View File

@ -14,14 +14,14 @@ func TestAddVote(t *testing.T) {
// t.Logf(">> %v", voteSet)
if voteSet.Get(0) != nil {
t.Errorf("Expected Get(0) to be nil")
if voteSet.GetById(0) != nil {
t.Errorf("Expected GetById(0) to be nil")
}
if voteSet.BitArray().GetIndex(0) {
t.Errorf("Expected BitArray.GetIndex(0) to be false")
}
hash, ok := voteSet.TwoThirdsMajority()
if hash != nil || ok {
hash, header, ok := voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || ok {
t.Errorf("There should be no 2/3 majority")
}
@ -29,14 +29,14 @@ func TestAddVote(t *testing.T) {
privAccounts[0].Sign(vote)
voteSet.Add(vote)
if voteSet.Get(0) == nil {
t.Errorf("Expected Get(0) to be present")
if voteSet.GetById(0) == nil {
t.Errorf("Expected GetById(0) to be present")
}
if !voteSet.BitArray().GetIndex(0) {
t.Errorf("Expected BitArray.GetIndex(0) to be true")
}
hash, ok = voteSet.TwoThirdsMajority()
if hash != nil || ok {
hash, header, ok = voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || ok {
t.Errorf("There should be no 2/3 majority")
}
}
@ -50,8 +50,8 @@ func Test2_3Majority(t *testing.T) {
privAccounts[i].Sign(vote)
voteSet.Add(vote)
}
hash, ok := voteSet.TwoThirdsMajority()
if hash != nil || ok {
hash, header, ok := voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || ok {
t.Errorf("There should be no 2/3 majority")
}
@ -59,8 +59,8 @@ func Test2_3Majority(t *testing.T) {
vote.BlockHash = CRandBytes(32)
privAccounts[6].Sign(vote)
voteSet.Add(vote)
hash, ok = voteSet.TwoThirdsMajority()
if hash != nil || ok {
hash, header, ok = voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || ok {
t.Errorf("There should be no 2/3 majority")
}
@ -68,8 +68,8 @@ func Test2_3Majority(t *testing.T) {
vote.BlockHash = nil
privAccounts[7].Sign(vote)
voteSet.Add(vote)
hash, ok = voteSet.TwoThirdsMajority()
if hash != nil || !ok {
hash, header, ok = voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || !ok {
t.Errorf("There should be 2/3 majority for nil")
}
@ -128,8 +128,8 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) {
privAccounts[i].Sign(vote)
voteSet.Add(vote)
}
hash, ok := voteSet.TwoThirdsMajority()
if hash != nil || ok {
hash, header, ok := voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || ok {
t.Errorf("There should be no 2/3 majority")
}
@ -174,8 +174,8 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) {
}
// We should have 2/3 majority
hash, ok = voteSet.TwoThirdsMajority()
if hash != nil || !ok {
hash, header, ok = voteSet.TwoThirdsMajority()
if hash != nil || !header.IsZero() || !ok {
t.Errorf("There should be 2/3 majority for nil")
}


+ 50
- 5
state/state.go View File

@ -288,11 +288,56 @@ func (s *State) releaseValidator(accountId uint64) {
// at an invalid state. Copy the state before calling AppendBlock!
func (s *State) AppendBlock(b *Block, checkStateHash bool) error {
// Basic block validation.
// XXX We need to validate LastBlockParts too.
err := b.ValidateBasic(s.Height, s.BlockHash)
if err != nil {
return err
}
// Validate block Validation.
if b.Height == 1 {
if len(b.Validation.Commits) != 0 {
return errors.New("Block at height 1 (first block) should have no Validation commits")
}
} else {
if uint(len(b.Validation.Commits)) != s.BondedValidators.Size() {
return errors.New("Invalid block validation size")
}
var sumVotingPower uint64
s.BondedValidators.Iterate(func(index uint, val *Validator) bool {
rsig := b.Validation.Commits[index]
if rsig.IsZero() {
return false
} else {
if rsig.SignerId != val.Id {
err = errors.New("Invalid validation order")
return true
}
vote := &Vote{
Height: b.Height,
Round: rsig.Round,
Type: VoteTypeCommit,
BlockHash: b.LastBlockHash,
BlockParts: b.LastBlockParts,
Signature: rsig.Signature,
}
if val.Verify(vote) {
sumVotingPower += val.VotingPower
return false
} else {
err = errors.New("Invalid validation signature")
return true
}
}
})
if err != nil {
return err
}
if sumVotingPower <= s.BondedValidators.TotalVotingPower()*2/3 {
return errors.New("Insufficient validation voting power")
}
}
// Commit each tx
for _, tx := range b.Data.Txs {
err := s.ExecTx(tx)
@ -301,9 +346,9 @@ func (s *State) AppendBlock(b *Block, checkStateHash bool) error {
}
}
// Update LastCommitHeight as necessary.
for _, sig := range b.Validation.Signatures {
_, val := s.BondedValidators.GetById(sig.SignerId)
// Update Validator.LastCommitHeight as necessary.
for _, rsig := range b.Validation.Commits {
_, val := s.BondedValidators.GetById(rsig.SignerId)
if val == nil {
return ErrStateInvalidSignature
}
@ -317,7 +362,7 @@ func (s *State) AppendBlock(b *Block, checkStateHash bool) error {
// If any unbonding periods are over,
// reward account with bonded coins.
toRelease := []*Validator{}
s.UnbondingValidators.Iterate(func(val *Validator) bool {
s.UnbondingValidators.Iterate(func(index uint, val *Validator) bool {
if val.UnbondHeight+unbondingPeriodBlocks < b.Height {
toRelease = append(toRelease, val)
}
@ -330,7 +375,7 @@ func (s *State) AppendBlock(b *Block, checkStateHash bool) error {
// If any validators haven't signed in a while,
// unbond them, they have timed out.
toTimeout := []*Validator{}
s.BondedValidators.Iterate(func(val *Validator) bool {
s.BondedValidators.Iterate(func(index uint, val *Validator) bool {
if val.LastCommitHeight+validatorTimeoutBlocks < b.Height {
toTimeout = append(toTimeout, val)
}


+ 1
- 0
state/state_test.go View File

@ -89,6 +89,7 @@ func TestGenesisSaveLoad(t *testing.T) {
Height: 1,
StateHash: nil,
},
Validation: Validation{},
Data: Data{
Txs: []Tx{},
},


+ 6
- 3
state/validator_set.go View File

@ -127,9 +127,12 @@ func (vset *ValidatorSet) Remove(validatorId uint64) (val *Validator, removed bo
return val_.(*Validator), removed
}
func (vset *ValidatorSet) Iterate(fn func(val *Validator) bool) {
func (vset *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) {
index := uint(0)
vset.validators.Iterate(func(key_ interface{}, val_ interface{}) bool {
return fn(val_.(*Validator))
stop := fn(index, val_.(*Validator))
index++
return stop
})
}
@ -139,7 +142,7 @@ func (vset *ValidatorSet) String() string {
func (vset *ValidatorSet) StringWithIndent(indent string) string {
valStrings := []string{}
vset.Iterate(func(val *Validator) bool {
vset.Iterate(func(index uint, val *Validator) bool {
valStrings = append(valStrings, val.String())
return false
})


Loading…
Cancel
Save