Browse Source

commitTime is used to derive next startTime. :)

pull/9/head
Jae Kwon 10 years ago
parent
commit
7523f501fd
4 changed files with 97 additions and 76 deletions
  1. +53
    -33
      consensus/consensus.go
  2. +4
    -2
      consensus/state.go
  3. +31
    -38
      consensus/vote.go
  4. +9
    -3
      state/state.go

+ 53
- 33
consensus/consensus.go View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
@ -29,6 +30,10 @@ const (
roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer.
roundDeadlineBare = float64(1.0 / 3.0) // When the bare vote is due.
roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due.
newBlockWaitDuration = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds.
voteRankCutoff = 2 // Higher ranks --> do not send votes.
unsolicitedVoteRate = 0.01 // Probability of sending a high ranked vote.
)
//-----------------------------------------------------------------------------
@ -45,10 +50,11 @@ func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
}
// calcs the current round given startTime of round zero.
// NOTE: round is zero if startTime is in the future.
func calcRound(startTime time.Time) uint16 {
now := time.Now()
if now.Before(startTime) {
Panicf("Cannot calc round when startTime is in the future: %v", startTime)
return 0
}
// Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R.
// D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0.
@ -71,6 +77,7 @@ func calcRound(startTime time.Time) uint16 {
}
// convenience
// NOTE: elapsedRatio can be negative if startTime is in the future.
func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
roundElapsed time.Duration, elapsedRatio float64) {
round = calcRound(startTime)
@ -346,7 +353,7 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
cm.mtx.Unlock()
// Commit block onto the copied state.
err = stateCopy.CommitBlock(block, block.Header.Time) // NOTE: fake commit time.
err = stateCopy.CommitBlock(block)
if err != nil {
return err
}
@ -409,7 +416,8 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
// If we see a 2/3 majority for votes for a block, precommit.
if hash, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok {
// TODO: maybe could use commitTime here and avg it with later commitTime?
if hash, _, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok {
if len(hash) == 0 {
// 2/3 majority voted for nil.
return nil
@ -442,18 +450,18 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
}
// Commit or unlock.
// Call after RoundStepPrecommit, after round has expired.
func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
// Call after RoundStepPrecommit, after round has completely expired.
func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) {
// If there exists a 2/3 majority of precommits.
// Validate the block and commit.
if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
if hash, commitTime, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
// If the proposal is invalid or we don't have it,
// do not commit.
// TODO If we were just late to receive the block, when
// do we actually get it? Document it.
if cm.stageProposal(rs.Proposal) != nil {
return nil
return time.Time{}, nil
}
// TODO: Remove?
cm.cs.LockProposal(rs.Proposal)
@ -465,16 +473,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
Hash: hash,
})
if err != nil {
return err
return time.Time{}, err
}
// Commit block.
// XXX use adjusted commit time.
// If we just use time.Now() we're not converging
// time differences between nodes, so nodes end up drifting
// in time.
commitTime := time.Now()
cm.commitProposal(rs.Proposal, commitTime)
return nil
return commitTime, nil
} else {
// Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock.
locked := cm.cs.LockedProposal()
@ -483,14 +486,13 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
if hashOrNil == nil {
continue
}
hash := hashOrNil.([]byte)
if !bytes.Equal(hash, locked.Block().Hash()) {
if !bytes.Equal(hashOrNil, locked.Block().Hash()) {
// Unlock our lock.
cm.cs.LockProposal(nil)
}
}
}
return nil
return time.Time{}, nil
}
}
@ -511,6 +513,7 @@ func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime ti
// What was staged becomes committed.
cm.state = cm.stagedState
cm.state.Save(commitTime)
cm.cs.Update(cm.state)
cm.stagedProposal = nil
cm.stagedState = nil
@ -650,7 +653,11 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
_, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
switch rs.Step() {
case RoundStepStart:
// It's a new RoundState, immediately wake up and xn to RoundStepProposal.
// It's a new RoundState.
if elapsedRatio < 0 {
// startTime is in the future.
time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration)
}
cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal}
case RoundStepProposal:
// Wake up when it's time to vote.
@ -722,14 +729,21 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
log.Info("Error attempting to precommit for proposal: %v", err)
}
} else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits {
err := cm.commitOrUnlockProposal(rs)
commitTime, err := cm.commitOrUnlockProposal(rs)
if err != nil {
log.Info("Error attempting to commit or update for proposal: %v", err)
}
// Round is over. This is a special case.
// Prepare a new RoundState for the next state.
cm.cs.SetupRound(rs.Round + 1)
return // setAlarm() takes care of the rest.
if !commitTime.IsZero() {
// We already set up ConsensusState for the next height
// (it happens in the call to cm.commitProposal).
// XXX: call cm.cs.SetupHeight()
} else {
// Round is over. This is a special case.
// Prepare a new RoundState for the next state.
cm.cs.SetupRound(rs.Round + 1)
return // setAlarm() takes care of the rest.
}
} else {
return // Action is not relevant.
}
@ -747,6 +761,7 @@ var (
ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)
// TODO: voteRanks should purge bygone validators.
type PeerState struct {
mtx sync.Mutex
connected bool
@ -754,7 +769,7 @@ type PeerState struct {
height uint32
startTime time.Time // Derived from offset seconds.
blockPartsBitArray []byte
votesWanted map[uint64]float32
voteRanks map[uint64]uint8
cbHeight uint32
cbRound uint16
cbFunc func()
@ -762,10 +777,10 @@ type PeerState struct {
func NewPeerState(peer *p2p.Peer) *PeerState {
return &PeerState{
connected: true,
peer: peer,
height: 0,
votesWanted: make(map[uint64]float32),
connected: true,
peer: peer,
height: 0,
voteRanks: make(map[uint64]uint8),
}
}
@ -818,10 +833,15 @@ func (ps *PeerState) WantsVote(vote *Vote) bool {
if !ps.connected {
return false
}
// Only wants the vote if votesWanted says so
if ps.votesWanted[vote.SignerId] <= 0 {
// TODO: sometimes, send unsolicited votes to see if peer wants it.
return false
// Only wants the vote if voteRank is low.
if ps.voteRanks[vote.SignerId] > voteRankCutoff {
// Sometimes, send unsolicited votes to see if peer wants it.
if rand.Float32() < unsolicitedVoteRate {
// Continue on...
} else {
// Rank too high. Do not send vote.
return false
}
}
// Only wants the vote if peer's current height and round matches.
if ps.height == vote.Height {
@ -890,7 +910,7 @@ func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) er
func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
// XXX IMPLEMENT
ps.voteRanks[msg.ValidatorId] = msg.Rank
return nil
}


+ 4
- 2
consensus/state.go View File

@ -22,6 +22,7 @@ type ConsensusState struct {
startTime time.Time // Start of round 0 for this height.
commits *VoteSet // Commits for this height.
roundState *RoundState // The RoundState object for the current round.
commitTime time.Time // Time at which a block was found to be committed by +2/3.
}
func NewConsensusState(state *State) *ConsensusState {
@ -54,6 +55,7 @@ func (cs *ConsensusState) RoundState() *RoundState {
return cs.roundState
}
// Primarily gets called upon block commit by ConsensusManager.
func (cs *ConsensusState) Update(state *State) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -68,7 +70,7 @@ func (cs *ConsensusState) Update(state *State) {
cs.height = stateHeight
cs.validatorsR0 = state.Validators().Copy() // NOTE: immutable.
cs.lockedProposal = nil
cs.startTime = state.CommitTime() // XXX is this what we want?
cs.startTime = state.CommitTime().Add(newBlockWaitDuration) // NOTE: likely future time.
cs.commits = NewVoteSet(stateHeight, 0, VoteTypeCommit, cs.validatorsR0)
// Setup the roundState
@ -77,7 +79,7 @@ func (cs *ConsensusState) Update(state *State) {
}
// If cs.roundSTate isn't at round, set up new roundState at round.
// If cs.roundState isn't at round, set up new roundState at round.
func (cs *ConsensusState) SetupRound(round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()


+ 31
- 38
consensus/vote.go View File

@ -5,6 +5,7 @@ import (
"errors"
"io"
"sync"
"time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/blocks"
@ -61,16 +62,19 @@ func (v *Vote) GetDocument() string {
// VoteSet helps collect signatures from validators at each height+round
// for a predefined vote type.
// TODO: test majority calculations etc.
type VoteSet struct {
mtx sync.Mutex
height uint32
round uint16
type_ byte
validators *ValidatorSet
votes map[uint64]*Vote
votesByHash map[string]uint64
totalVotes uint64
totalVotingPower uint64
mtx sync.Mutex
height uint32
round uint16
type_ byte
validators *ValidatorSet
votes map[uint64]*Vote
votesByHash map[string]uint64
totalVotes uint64
totalVotingPower uint64
oneThirdMajority [][]byte
twoThirdsCommitTime time.Time
}
// Constructs a new VoteSet struct used to accumulate votes for each round.
@ -126,49 +130,38 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
}
}
vs.votes[vote.SignerId] = vote
vs.votesByHash[string(vote.Hash)] += val.VotingPower
totalHashVotes := vs.votesByHash[string(vote.Hash)] + val.VotingPower
vs.votesByHash[string(vote.Hash)] = totalHashVotes
vs.totalVotes += val.VotingPower
// If we just nudged it up to one thirds majority, add it.
if totalHashVotes > vs.totalVotingPower/3 &&
(totalHashVotes-val.VotingPower) <= vs.totalVotingPower/3 {
vs.oneThirdMajority = append(vs.oneThirdMajority, vote.Hash)
} else if totalHashVotes > vs.totalVotingPower*2/3 &&
(totalHashVotes-val.VotingPower) <= vs.totalVotingPower*2/3 {
vs.twoThirdsCommitTime = time.Now()
}
return true, nil
}
// Returns either a blockhash (or nil) that received +2/3 majority.
// If there exists no such majority, returns (nil, false).
func (vs *VoteSet) TwoThirdsMajority() (hash []byte, ok bool) {
func (vs *VoteSet) TwoThirdsMajority() (hash []byte, commitTime time.Time, ok bool) {
vs.mtx.Lock()
defer vs.mtx.Unlock()
twoThirdsMajority := (vs.totalVotingPower*2 + 2) / 3
if vs.totalVotes < twoThirdsMajority {
return nil, false
}
for hash, votes := range vs.votesByHash {
if votes >= twoThirdsMajority {
if hash == "" {
return nil, true
} else {
return []byte(hash), true
}
// There's only one or two in the array.
for _, hash := range vs.oneThirdMajority {
if vs.votesByHash[string(hash)] > vs.totalVotingPower*2/3 {
return hash, vs.twoThirdsCommitTime, true
}
}
return nil, false
return nil, time.Time{}, false
}
// Returns blockhashes (or nil) that received a +1/3 majority.
// If there exists no such majority, returns nil.
func (vs *VoteSet) OneThirdMajority() (hashes []interface{}) {
func (vs *VoteSet) OneThirdMajority() (hashes [][]byte) {
vs.mtx.Lock()
defer vs.mtx.Unlock()
oneThirdMajority := (vs.totalVotingPower + 2) / 3
if vs.totalVotes < oneThirdMajority {
return nil
}
for hash, votes := range vs.votesByHash {
if votes >= oneThirdMajority {
if hash == "" {
hashes = append(hashes, nil)
} else {
hashes = append(hashes, []byte(hash))
}
}
}
return hashes
return vs.oneThirdMajority
}

+ 9
- 3
state/state.go View File

@ -52,15 +52,19 @@ func LoadState(db db_.Db) *State {
return s
}
func (s *State) Save() {
// Save this state into the db.
// For convenience, the commitTime (required by ConsensusManager)
// is saved here.
func (s *State) Save(commitTime time.Time) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.commitTime = commitTime
s.accounts.Save()
var buf bytes.Buffer
var n int64
var err error
WriteUInt32(&buf, s.height, &n, &err)
WriteTime(&buf, s.commitTime, &n, &err)
WriteTime(&buf, commitTime, &n, &err)
WriteByteSlice(&buf, s.accounts.Hash(), &n, &err)
for _, validator := range s.validators.Map() {
WriteBinary(&buf, validator, &n, &err)
@ -91,7 +95,9 @@ func (s *State) CommitTx(tx *Tx) error {
return nil
}
func (s *State) CommitBlock(b *Block, commitTime time.Time) error {
// This is called during staging.
// The resulting state is cached until it is actually committed.
func (s *State) CommitBlock(b *Block) error {
s.mtx.Lock()
defer s.mtx.Unlock()
// TODO commit the txs


Loading…
Cancel
Save