
Merge pull request #319 from tendermint/valset

Validator Set Changes
Ethan Buchman, 8 years ago (committed by GitHub)
commit 538a50c9e8
19 changed files with 646 additions and 475 deletions

 1. blockchain/reactor.go (+2 -1)
 2. blockchain/store.go (+1 -0)
 3. consensus/byzantine_test.go (+278 -0)
 4. consensus/common.go (+12 -1)
 5. consensus/common_test.go (+42 -0)
 6. consensus/reactor.go (+1 -6)
 7. consensus/reactor_test.go (+95 -261)
 8. consensus/state.go (+10 -6)
 9. glide.lock (+11 -11)
10. node/node.go (+2 -2)
11. rpc/test/client_test.go (+9 -3)
12. state/execution.go (+118 -137)
13. state/execution_test.go (+24 -7)
14. state/state.go (+9 -6)
15. test/app/dummy_test.sh (+4 -4)
16. types/block.go (+9 -5)
17. types/protobuf.go (+1 -1)
18. types/validator.go (+14 -14)
19. types/validator_set_test.go (+4 -10)

blockchain/reactor.go (+2 -1)

@@ -221,7 +221,7 @@ FOR_LOOP:
 // We need both to sync the first block.
 break SYNC_LOOP
 }
-firstParts := first.MakePartSet(bcR.config.GetInt("block_part_size"))
+firstParts := first.MakePartSet(bcR.config.GetInt("block_part_size")) // TODO: put part size in parts header?
 firstPartsHeader := firstParts.Header()
 // Finally, verify the first block using the second's commit
 // NOTE: we can probably make this more efficient, but note that calling
@@ -236,6 +236,7 @@ FOR_LOOP:
 } else {
 bcR.pool.PopRequest()
 // TODO: use ApplyBlock instead of Exec/Commit/SetAppHash/Save
+// TODO: should we be firing events? need to fire NewBlock events manually ...
 err := bcR.state.ExecBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader)
 if err != nil {
 // TODO This is bad, are we zombie?
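A possible shape for the ApplyBlock TODO above, sketched under assumptions: it reuses the ApplyBlock signature that appears in the Handshaker replay path in state/execution.go below (eventCache, appConn, block, partsHeader, mempool); whether the blockchain reactor holds a mempool handle to pass here is not shown in this diff.

// hypothetical consolidation of the Exec/Commit/SetAppHash/Save sequence into one call,
// mirroring h.state.ApplyBlock(...) as used in state/execution.go; mempool is assumed
err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, mempool)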


blockchain/store.go (+1 -0)

@@ -163,6 +163,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
 bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes)
 // Save seen commit (seen +2/3 precommits for block)
+// NOTE: we can delete this at a later height
 seenCommitBytes := wire.BinaryBytes(seenCommit)
 bs.db.Set(calcSeenCommitKey(height), seenCommitBytes)


consensus/byzantine_test.go (+278 -0, new file)

@@ -0,0 +1,278 @@
package consensus
import (
"sync"
"testing"
"time"
"github.com/tendermint/tendermint/config/tendermint_test"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto"
"github.com/tendermint/go-events"
"github.com/tendermint/go-p2p"
"github.com/tendermint/tendermint/types"
)
func init() {
config = tendermint_test.ResetConfig("consensus_byzantine_test")
}
//----------------------------------------------
// byzantine failures
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
// byzantine validator sends conflicting proposals into A and B,
// and prevotes/precommits on both of them.
// B sees a commit, A doesn't.
// Byzantine validator refuses to prevote.
// Heal partition and ensure A sees the commit
func TestByzantine(t *testing.T) {
resetConfigTimeouts()
N := 4
css := randConsensusNet(N)
switches := make([]*p2p.Switch, N)
for i := 0; i < N; i++ {
switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil))
}
reactors := make([]p2p.Reactor, N)
eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ {
if i == 0 {
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator))
// make byzantine
css[i].decideProposal = func(j int) func(int, int) {
return func(height, round int) {
byzantineDecideProposalFunc(height, round, css[j], switches[j])
}
}(i)
css[i].doPrevote = func(height, round int) {}
}
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
t.Fatalf("Failed to start switch: %v", err)
}
eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
conR := NewConsensusReactor(css[i], false)
conR.SetEventSwitch(eventSwitch)
var conRI p2p.Reactor
conRI = conR
if i == 0 {
conRI = NewByzantineReactor(conR)
}
reactors[i] = conRI
}
p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
}, func(sws []*p2p.Switch, i, j int) {
// the network starts partitioned with globally active adversary
if i != 0 {
return
}
p2p.Connect2Switches(sws, i, j)
})
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
// note peers and switches order don't match.
peers := switches[0].Peers().List()
ind0 := getSwitchIndex(switches, peers[0])
ind1 := getSwitchIndex(switches, peers[1])
ind2 := getSwitchIndex(switches, peers[2])
// connect the 2 peers in the larger partition
p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition to make a block
select {
case <-eventChans[ind2]:
}
log.Notice("A block has been committed. Healing partition")
// connect the partitions
p2p.Connect2Switches(switches, ind0, ind1)
p2p.Connect2Switches(switches, ind0, ind2)
// wait till everyone makes the first new block
// (one of them already has)
wg := new(sync.WaitGroup)
wg.Add(2)
for i := 1; i < N-1; i++ {
go func(j int) {
<-eventChans[j]
wg.Done()
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
tick := time.NewTicker(time.Second * 10)
select {
case <-done:
case <-tick.C:
for i, reactor := range reactors {
t.Log(Fmt("Consensus Reactor %v", i))
t.Log(Fmt("%v", reactor))
}
t.Fatalf("Timed out waiting for all validators to commit first block")
}
}
//-------------------------------
// byzantine consensus functions
func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) {
// byzantine user should create two proposals and try to split the vote.
// Avoid sending on internalMsgQueue and running consensus state.
// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
polRound, polBlockID := cs.Votes.POLInfo()
proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
polRound, polBlockID = cs.Votes.POLInfo()
proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
block1Hash := block1.Hash()
block2Hash := block2.Hash()
// broadcast conflicting proposals/block parts to peers
peers := sw.Peers().List()
log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
} else {
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
}
}
}
func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
// parts
for i := 0; i < parts.Total(); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
}
// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
}
//----------------------------------------
// byzantine consensus reactor
type ByzantineReactor struct {
Service
reactor *ConsensusReactor
}
func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
return &ByzantineReactor{
Service: conR,
reactor: conR,
}
}
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
if !br.reactor.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer)
peer.Data.Set(types.PeerStateKey, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
//----------------------------------------
// byzantine privValidator
type ByzantinePrivValidator struct {
Address []byte `json:"address"`
types.Signer `json:"-"`
mtx sync.Mutex
}
// Return a priv validator that will sign anything
func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
return &ByzantinePrivValidator{
Address: pv.Address,
Signer: pv.Signer,
}
}
func (privVal *ByzantinePrivValidator) GetAddress() []byte {
return privVal.Address
}
func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
// Sign
vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519)
return nil
}
func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
privVal.mtx.Lock()
defer privVal.mtx.Unlock()
// Sign
proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
return nil
}
func (privVal *ByzantinePrivValidator) String() string {
return Fmt("PrivValidator{%X}", privVal.Address)
}
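What makes this signer byzantine is what it omits: a hedged sketch of the behaviour, assuming the regular types.PrivValidator tracks its last signed height/round/step and refuses to double-sign (that bookkeeping is not shown in this diff). The Vote field names follow the signVote code in consensus/state.go.

// the byzantine signer happily signs two prevotes at the same height and round;
// in practice they would carry different block hashes (those fields elided here)
privVal := NewByzantinePrivValidator(pv)
voteA := &types.Vote{ValidatorAddress: privVal.GetAddress(), Height: 10, Round: 0, Type: types.VoteTypePrevote}
voteB := &types.Vote{ValidatorAddress: privVal.GetAddress(), Height: 10, Round: 0, Type: types.VoteTypePrevote}
privVal.SignVote(chainID, voteA) // signs
privVal.SignVote(chainID, voteB) // signs again: no last-sign check to stop it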

consensus/common.go (+12 -1)

@@ -4,7 +4,7 @@ import (
 "github.com/tendermint/tendermint/types"
 )

-// NOTE: this is blocking
+// NOTE: if chanCap=0, this blocks on the event being consumed
 func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
 // listen for event
 ch := make(chan interface{}, chanCap)
@@ -13,3 +13,14 @@ func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap
 })
 return ch
 }
+
+// NOTE: this blocks on receiving a response after the event is consumed
+func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) chan interface{} {
+// listen for event
+ch := make(chan interface{})
+types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) {
+ch <- data
+<-ch
+})
+return ch
+}
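The respond variant gives tests lock-step control over event delivery: the event switch's firing goroutine stays blocked inside the listener until the consumer writes back on the same channel. A minimal usage sketch (receiver name arbitrary):

ch := subscribeToEventRespond(evsw, "tester", types.EventStringNewBlock())
go func() {
data := <-ch // take the event; the firing goroutine is still blocked
_ = data.(types.EventDataNewBlock).Block // inspect the block here
ch <- struct{}{} // respond, releasing the event switch to continue
}()

This is the pattern waitForAndValidateBlock in consensus/reactor_test.go relies on: read from eventChans[j], validate, then send struct{}{} back.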

consensus/common_test.go (+42 -0)

@@ -3,6 +3,7 @@ package consensus
 import (
 "bytes"
 "fmt"
+"io/ioutil"
 "sort"
 "sync"
 "testing"
@@ -11,6 +12,7 @@ import (
 . "github.com/tendermint/go-common"
 cfg "github.com/tendermint/go-config"
 dbm "github.com/tendermint/go-db"
+"github.com/tendermint/go-p2p"
 bc "github.com/tendermint/tendermint/blockchain"
 "github.com/tendermint/tendermint/config/tendermint_test"
 mempl "github.com/tendermint/tendermint/mempool"
@@ -33,6 +35,8 @@ type validatorStub struct {
 *types.PrivValidator
 }

+var testMinPower = 10
+
 func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub {
 return &validatorStub{
 Index: valIndex,
@@ -266,6 +270,31 @@ func randConsensusNet(nValidators int) []*ConsensusState {
 return css
 }

+// nPeers = nValidators + nNotValidator
+func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState {
+genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
+css := make([]*ConsensusState, nPeers)
+for i := 0; i < nPeers; i++ {
+db := dbm.NewMemDB() // each state needs its own db
+state := sm.MakeGenesisState(db, genDoc)
+state.Save()
+thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
+EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
+var privVal *types.PrivValidator
+if i < nValidators {
+privVal = privVals[i]
+} else {
+privVal = types.GenPrivValidator()
+_, tempFilePath := Tempfile("priv_validator_")
+privVal.SetFile(tempFilePath)
+}
+dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
+css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir))
+}
+return css
+}
+
 func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
 voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
 voteCh := make(chan interface{})
@@ -325,3 +354,16 @@ func startTestRound(cs *ConsensusState, height, round int) {
 cs.enterNewRound(height, round)
 cs.startRoutines(0)
 }
+
+//--------------------------------
+// reactor stuff
+
+func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
+for i, s := range switches {
+if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
+return i
+}
+}
+panic("didnt find peer in switches")
+return -1
+}

consensus/reactor.go (+1 -6)

@@ -287,11 +287,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
 }
 }

-// Sets our private validator account for signing votes.
-func (conR *ConsensusReactor) SetPrivValidator(priv PrivValidator) {
-conR.conS.SetPrivValidator(priv)
-}
-
 // implements events.Eventable
 func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
 conR.evsw = evsw
@@ -954,7 +949,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
 func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
 log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
-log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
+log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
 // NOTE: some may be nil BitArrays -> no side effects.
 ps.getVoteBitArray(height, round, type_).SetIndex(index, true)

consensus/reactor_test.go (+95 -261)

@@ -1,20 +1,18 @@
 package consensus

 import (
-"bytes"
 "fmt"
 "sync"
 "testing"
 "time"

 "github.com/tendermint/tendermint/config/tendermint_test"

 . "github.com/tendermint/go-common"
-cfg "github.com/tendermint/go-config"
-"github.com/tendermint/go-crypto"
 "github.com/tendermint/go-events"
 "github.com/tendermint/go-logger"
 "github.com/tendermint/go-p2p"
 "github.com/tendermint/tendermint/types"
+"github.com/tendermint/tmsp/example/dummy"
 )

 func init() {
@@ -22,7 +20,7 @@ func init() {
 }

 func resetConfigTimeouts() {
-logger.SetLogLevel("notice")
+logger.SetLogLevel("info")
 //config.Set("log_level", "notice")
 config.Set("timeout_propose", 2000)
 // config.Set("timeout_propose_delta", 500)
@@ -30,9 +28,13 @@ func resetConfigTimeouts() {
 // config.Set("timeout_prevote_delta", 500)
 // config.Set("timeout_precommit", 1000)
 // config.Set("timeout_precommit_delta", 500)
-// config.Set("timeout_commit", 1000)
+config.Set("timeout_commit", 1000)
 }

+//----------------------------------------------
+// in-process testnets
+
+// Ensure a testnet makes blocks
 func TestReactor(t *testing.T) {
 resetConfigTimeouts()
 N := 4
@@ -41,7 +43,6 @@ func TestReactor(t *testing.T) {
 eventChans := make([]chan interface{}, N)
 for i := 0; i < N; i++ {
 reactors[i] = NewConsensusReactor(css[i], false)
-reactors[i].SetPrivValidator(css[i].privValidator)

 eventSwitch := events.NewEventSwitch()
 _, err := eventSwitch.Start()
@@ -52,300 +53,133 @@ func TestReactor(t *testing.T) {
 reactors[i].SetEventSwitch(eventSwitch)
 eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
 }

 // make connected switches and start all reactors
 p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
 s.AddReactor("CONSENSUS", reactors[i])
 return s
 }, p2p.Connect2Switches)

 // wait till everyone makes the first new block
-wg := new(sync.WaitGroup)
-wg.Add(N)
-for i := 0; i < N; i++ {
-go func(j int) {
-<-eventChans[j]
-wg.Done()
-}(i)
-}
-
-// Make wait into a channel
-done := make(chan struct{})
-go func() {
-wg.Wait()
-close(done)
-}()
-
-tick := time.NewTicker(time.Second * 3)
-select {
-case <-done:
-case <-tick.C:
-t.Fatalf("Timed out waiting for all validators to commit first block")
-}
+timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
+<-eventChans[j]
+wg.Done()
+})
 }

-// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
-// byzantine validator sends conflicting proposals into A and B,
-// and prevotes/precommits on both of them.
-// B sees a commit, A doesn't.
-// Byzantine validator refuses to prevote.
-// Heal partition and ensure A sees the commit
-func TestByzantine(t *testing.T) {
-resetConfigTimeouts()
-N := 4
-css := randConsensusNet(N)
-
-switches := make([]*p2p.Switch, N)
-for i := 0; i < N; i++ {
-switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil))
-}
-
-reactors := make([]p2p.Reactor, N)
-eventChans := make([]chan interface{}, N)
-for i := 0; i < N; i++ {
-var privVal PrivValidator
-privVal = css[i].privValidator
-if i == 0 {
-privVal = NewByzantinePrivValidator(privVal.(*types.PrivValidator))
-// make byzantine
-css[i].decideProposal = func(j int) func(int, int) {
-return func(height, round int) {
-byzantineDecideProposalFunc(height, round, css[j], switches[j])
-}
-}(i)
-css[i].doPrevote = func(height, round int) {}
-}
+//-------------------------------------------------------------
+// ensure we can make blocks despite cycling a validator set

+func TestValidatorSetChanges(t *testing.T) {
+resetConfigTimeouts()
+nPeers := 8
+nVals := 4
+css := randConsensusNetWithPeers(nVals, nPeers)
+reactors := make([]*ConsensusReactor, nPeers)
+eventChans := make([]chan interface{}, nPeers)
+for i := 0; i < nPeers; i++ {
+reactors[i] = NewConsensusReactor(css[i], false)

 eventSwitch := events.NewEventSwitch()
 _, err := eventSwitch.Start()
 if err != nil {
 t.Fatalf("Failed to start switch: %v", err)
 }

-eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
-conR := NewConsensusReactor(css[i], false)
-conR.SetPrivValidator(privVal)
-conR.SetEventSwitch(eventSwitch)
+reactors[i].SetEventSwitch(eventSwitch)
+eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
 }
+p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch {
+s.AddReactor("CONSENSUS", reactors[i])
+return s
+}, p2p.Connect2Switches)

-var conRI p2p.Reactor
-conRI = conR
-if i == 0 {
-conRI = NewByzantineReactor(conR)
-}
-reactors[i] = conRI
-}
+// map of active validators
+activeVals := make(map[string]struct{})
+for i := 0; i < nVals; i++ {
+activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
+}

-p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
-// ignore new switch s, we already made ours
-switches[i].AddReactor("CONSENSUS", reactors[i])
-return switches[i]
-}, func(sws []*p2p.Switch, i, j int) {
-// the network starts partitioned with globally active adversary
-if i != 0 {
-return
-}
-p2p.Connect2Switches(sws, i, j)
-})
+// wait till everyone makes block 1
+timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) {
+<-eventChans[j]
+eventChans[j] <- struct{}{}
+wg.Done()
+})

-// byz proposer sends one block to peers[0]
-// and the other block to peers[1] and peers[2].
-// note peers and switches order don't match.
-peers := switches[0].Peers().List()
-ind0 := getSwitchIndex(switches, peers[0])
-ind1 := getSwitchIndex(switches, peers[1])
-ind2 := getSwitchIndex(switches, peers[2])
+newValidatorPubKey := css[nVals].privValidator.(*types.PrivValidator).PubKey
+newValidatorTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), uint64(testMinPower))

-// connect the 2 peers in the larger partition
-p2p.Connect2Switches(switches, ind1, ind2)
+// wait till everyone makes block 2
+// ensure the commit includes all validators
+// send newValTx to change vals in block 3
+waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx)

-// wait for someone in the big partition to make a block
-select {
-case <-eventChans[ind2]:
-}
+// wait till everyone makes block 3.
+// it includes the commit for block 2, which is by the original validator set
+waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)

-log.Notice("A block has been committed. Healing partition")
+// wait till everyone makes block 4.
+// it includes the commit for block 3, which is by the original validator set
+waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)

-// connect the partitions
-p2p.Connect2Switches(switches, ind0, ind1)
-p2p.Connect2Switches(switches, ind0, ind2)
+// the commits for block 4 should be with the updated validator set
+activeVals[string(newValidatorPubKey.Address())] = struct{}{}

-// wait till everyone makes the first new block
-// (one of them already has)
-wg := new(sync.WaitGroup)
-wg.Add(2)
-for i := 1; i < N-1; i++ {
-go func(j int) {
-<-eventChans[j]
-wg.Done()
-}(i)
-}
-
-done := make(chan struct{})
-go func() {
-wg.Wait()
-close(done)
-}()
-
-tick := time.NewTicker(time.Second * 10)
-select {
-case <-done:
-case <-tick.C:
-for i, reactor := range reactors {
-t.Log(Fmt("Consensus Reactor %v", i))
-t.Log(Fmt("%v", reactor))
-}
-t.Fatalf("Timed out waiting for all validators to commit first block")
-}
+// wait till everyone makes block 5
+// it includes the commit for block 4, which should have the updated validator set
+waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
+
+// TODO: test more changes!
 }

-func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
-for i, s := range switches {
-if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
-return i
-}
-}
-panic("didnt find peer in switches")
-return -1
-}
-
-//-------------------------------
-// byzantine consensus functions
-
-func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) {
-// byzantine user should create two proposals and try to split the vote.
-// Avoid sending on internalMsgQueue and running consensus state.
-
-// Create a new proposal block from state/txs from the mempool.
-block1, blockParts1 := cs.createProposalBlock()
-polRound, polBlockID := cs.Votes.POLInfo()
-proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
-cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
-
-// Create a new proposal block from state/txs from the mempool.
-block2, blockParts2 := cs.createProposalBlock()
-polRound, polBlockID = cs.Votes.POLInfo()
-proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
-cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
-
-block1Hash := block1.Hash()
-block2Hash := block2.Hash()
-
-// broadcast conflicting proposals/block parts to peers
-peers := sw.Peers().List()
-log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers))
-for i, peer := range peers {
-if i < len(peers)/2 {
-go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
-} else {
-go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
-}
-}
-}
-
-func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
-// proposal
-msg := &ProposalMessage{Proposal: proposal}
-peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
-
-// parts
-for i := 0; i < parts.Total(); i++ {
-part := parts.GetPart(i)
-msg := &BlockPartMessage{
-Height: height, // This tells peer that this part applies to us.
-Round: round, // This tells peer that this part applies to us.
-Part: part,
-}
-peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
-}
-
-// votes
-cs.mtx.Lock()
-prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
-precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
-cs.mtx.Unlock()
-
-peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
-peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
-}
-
-//----------------------------------------
-// byzantine consensus reactor
-
-type ByzantineReactor struct {
-Service
-reactor *ConsensusReactor
-}
-
-func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
-return &ByzantineReactor{
-Service: conR,
-reactor: conR,
-}
-}
-
-func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
-func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
-func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
-if !br.reactor.IsRunning() {
-return
-}
-
-// Create peerState for peer
-peerState := NewPeerState(peer)
-peer.Data.Set(types.PeerStateKey, peerState)
-
-// Send our state to peer.
-// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
-if !br.reactor.fastSync {
-br.reactor.sendNewRoundStepMessage(peer)
-}
-}
-func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-br.reactor.RemovePeer(peer, reason)
-}
-func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
-br.reactor.Receive(chID, peer, msgBytes)
-}
-
-//----------------------------------------
-// byzantine privValidator
-
-type ByzantinePrivValidator struct {
-Address []byte `json:"address"`
-types.Signer `json:"-"`
-
-mtx sync.Mutex
-}
-
-// Return a priv validator that will sign anything
-func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
-return &ByzantinePrivValidator{
-Address: pv.Address,
-Signer: pv.Signer,
-}
-}
-
-func (privVal *ByzantinePrivValidator) GetAddress() []byte {
-return privVal.Address
-}
-
-func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
-privVal.mtx.Lock()
-defer privVal.mtx.Unlock()
-
-// Sign
-vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519)
-return nil
-}
-
-func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
-privVal.mtx.Lock()
-defer privVal.mtx.Unlock()
-
-// Sign
-proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
-return nil
-}
-
-func (privVal *ByzantinePrivValidator) String() string {
-return Fmt("PrivValidator{%X}", privVal.Address)
-}
+func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
+timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
+newBlock := <-eventChans[j]
+err := validateBlock(newBlock.(types.EventDataNewBlock).Block, activeVals)
+if err != nil {
+t.Fatal(err)
+}
+for _, tx := range txs {
+css[j].mempool.CheckTx(tx, nil)
+}
+eventChans[j] <- struct{}{}
+wg.Done()
+})
+}
+
+// expects high synchrony!
+func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
+if block.LastCommit.Size() != len(activeVals) {
+return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals))
+}
+
+for _, vote := range block.LastCommit.Precommits {
+if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok {
+return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress)
+}
+}
+return nil
+}
+
+func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int)) {
+wg := new(sync.WaitGroup)
+wg.Add(n)
+for i := 0; i < n; i++ {
+go f(wg, i)
+}
+
+// Make wait into a channel
+done := make(chan struct{})
+go func() {
+wg.Wait()
+close(done)
+}()
+
+tick := time.NewTicker(time.Second * 3)
+select {
+case <-done:
+case <-tick.C:
+t.Fatalf("Timed out waiting for all validators to commit a block")
+}
+}

consensus/state.go (+10 -6)

@@ -221,11 +221,12 @@ type PrivValidator interface {
 type ConsensusState struct {
 BaseService

-config cfg.Config
-proxyAppConn proxy.AppConnConsensus
-blockStore *bc.BlockStore
-mempool *mempl.Mempool
-privValidator PrivValidator
+config cfg.Config
+proxyAppConn proxy.AppConnConsensus
+blockStore *bc.BlockStore
+mempool *mempl.Mempool
+privValidator PrivValidator // for signing votes

 mtx sync.Mutex
 RoundState
@@ -313,6 +314,7 @@ func (cs *ConsensusState) GetValidators() (int, []*types.Validator) {
 return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators
 }

+// Sets our private validator account for signing votes.
 func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) {
 cs.mtx.Lock()
 defer cs.mtx.Unlock()
@@ -1253,6 +1255,8 @@ func (cs *ConsensusState) finalizeCommit(height int) {
 // Save to blockStore.
 if cs.blockStore.Height() < block.Height {
+// NOTE: the seenCommit is local justification to commit this block,
+// but may differ from the LastCommit included in the next block
 precommits := cs.Votes.Precommits(cs.CommitRound)
 seenCommit := precommits.MakeCommit()
 cs.blockStore.SaveBlock(block, blockParts, seenCommit)
@@ -1498,7 +1502,6 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
 }

 func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
-// TODO: store our index in the cs so we don't have to do this every time
 addr := cs.privValidator.GetAddress()
 valIndex, _ := cs.Validators.GetByAddress(addr)
 vote := &types.Vote{
@@ -1515,6 +1518,7 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet
 // sign the vote and publish on internalMsgQueue
 func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
+// if we don't have a key or we're not in the validator set, do nothing
 if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.GetAddress()) {
 return nil
 }

glide.lock (+11 -11)

@@ -1,8 +1,8 @@
 hash: 20cb38481a78b73ba3a42af08e34cd825ddb7c826833d67cc61e45c1b3a4c484
-updated: 2016-11-16T16:25:10.693961906-05:00
+updated: 2016-11-23T18:28:52.925584919-05:00
 imports:
 - name: github.com/btcsuite/btcd
-  version: b134beb3b7809de6370a93cc5f6a684d6942e2e8
+  version: afec1bd1245a4a19e6dfe1306974b733e7cbb9b8
   subpackages:
   - btcec
 - name: github.com/btcsuite/fastsha256
@@ -18,7 +18,7 @@ imports:
   subpackages:
   - proto
 - name: github.com/golang/protobuf
-  version: 224aaba33b1ac32a92a165f27489409fb8133d08
+  version: 8ee79997227bf9b34611aee7946ae64735e6fd93
   subpackages:
   - proto
 - name: github.com/golang/snappy
@@ -28,7 +28,7 @@ imports:
 - name: github.com/mattn/go-colorable
   version: d228849504861217f796da67fae4f6e347643f15
 - name: github.com/mattn/go-isatty
-  version: 66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8
+  version: 30a891c33c7cde7b02a981314b4228ec99380cca
 - name: github.com/spf13/pflag
   version: 5ccb023bc27df288a957c5e994cd44fd19619465
 - name: github.com/syndtr/goleveldb
@@ -58,7 +58,7 @@ imports:
 - name: github.com/tendermint/go-clist
   version: 3baa390bbaf7634251c42ad69a8682e7e3990552
 - name: github.com/tendermint/go-common
-  version: fa3daa7abc253264c916c12fecce3effa01a1287
+  version: 6b4160f2a57487f277c42bf06fd280195dfdb278
   subpackages:
   - test
 - name: github.com/tendermint/go-config
@@ -76,9 +76,9 @@ imports:
 - name: github.com/tendermint/go-logger
   version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2
 - name: github.com/tendermint/go-merkle
-  version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8
+  version: bfc4afe28c7a50045d4d1eb043e67460f8a51a4f
 - name: github.com/tendermint/go-p2p
-  version: 62b37014a89b5eddff74844846979d30911cffda
+  version: f173a17ed3e9b341d480b36e5041819c8a5b8350
   subpackages:
   - upnp
 - name: github.com/tendermint/go-rpc
@@ -94,7 +94,7 @@ imports:
   subpackages:
   - term
 - name: github.com/tendermint/tmsp
-  version: 0bdb3b887e70b1ef16d32eece0248ec071fd8490
+  version: 3742e35e6db5dcd7596aac9f661b46f5699ebec8
   subpackages:
   - client
   - example/counter
@@ -103,7 +103,7 @@ imports:
   - server
   - types
 - name: golang.org/x/crypto
-  version: 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
+  version: ede567c8e044a5913dad1d1af3696d9da953104c
   subpackages:
   - curve25519
   - nacl/box
@@ -124,11 +124,11 @@ imports:
   - lex/httplex
   - trace
 - name: golang.org/x/sys
-  version: b699b7032584f0953262cb2788a0ca19bb494703
+  version: 30237cf4eefd639b184d1f2cb77a581ea0be8947
   subpackages:
   - unix
 - name: google.golang.org/grpc
-  version: 941cc894cea3c87a12943fd12b594964541b6d28
+  version: eca2ad68af4d7bf894ada6bd263133f069a441d5
   subpackages:
   - codes
   - credentials


node/node.go (+2 -2)

@@ -101,10 +101,10 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
 // Make ConsensusReactor
 consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
-consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)
 if privValidator != nil {
-consensusReactor.SetPrivValidator(privValidator)
+consensusState.SetPrivValidator(privValidator)
 }
+consensusReactor := consensus.NewConsensusReactor(consensusState, fastSync)

 // Make p2p network switch
 sw := p2p.NewSwitch(config.GetConfig("p2p"))


rpc/test/client_test.go (+9 -3)

@@ -5,13 +5,14 @@ import (
 crand "crypto/rand"
 "fmt"
 "math/rand"
-"strings"
 "testing"
 "time"

 . "github.com/tendermint/go-common"
+"github.com/tendermint/go-wire"
 ctypes "github.com/tendermint/tendermint/rpc/core/types"
 "github.com/tendermint/tendermint/types"
+"github.com/tendermint/tmsp/example/dummy"
 tmsp "github.com/tendermint/tmsp/types"
 )
@@ -156,9 +157,14 @@ func testTMSPQuery(t *testing.T, statusI interface{}, value []byte) {
 if query.Result.IsErr() {
 panic(Fmt("Query returned an err: %v", query))
 }
+
+qResult := new(dummy.QueryResult)
+if err := wire.ReadJSONBytes(query.Result.Data, qResult); err != nil {
+t.Fatal(err)
+}
+
 // XXX: specific to value returned by the dummy
-if !strings.Contains(string(query.Result.Data), "exists=true") {
-panic(Fmt("Query error. Expected to find 'exists=true'. Got: %s", query.Result.Data))
+if qResult.Exists != true {
+panic(Fmt("Query error. Expected to find 'exists=true'. Got: %v", qResult))
 }
 }


state/execution.go (+118 -137)

@@ -8,6 +8,7 @@ import (
 . "github.com/tendermint/go-common"
 cfg "github.com/tendermint/go-config"
+"github.com/tendermint/go-crypto"
 "github.com/tendermint/tendermint/proxy"
 "github.com/tendermint/tendermint/types"
 tmsp "github.com/tendermint/tmsp/types"
@@ -21,26 +22,33 @@
 func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
 // Validate the block.
-err := s.validateBlock(block)
-if err != nil {
+if err := s.validateBlock(block); err != nil {
 return ErrInvalidBlock(err)
 }

-// Update the validator set
+// compute bitarray of validators that signed
+signed := commitBitArrayFromBlock(block)
+_ = signed // TODO send on begin block
+
+// copy the valset
 valSet := s.Validators.Copy()
-// Update valSet with signatures from block.
-updateValidatorsWithBlock(s.LastValidators, valSet, block)
-// TODO: Update the validator set (e.g. block.Data.ValidatorUpdates?)
 nextValSet := valSet.Copy()

 // Execute the block txs
-err = s.execBlockOnProxyApp(eventCache, proxyAppConn, block)
+changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
 if err != nil {
 // There was some error in proxyApp
 // TODO Report error and wait for proxyApp to be available.
 return ErrProxyAppConn(err)
 }

+// update the validator set
+err = updateValidators(nextValSet, changedValidators)
+if err != nil {
+log.Warn("Error changing validator set", "error", err)
+// TODO: err or carry on?
+}
+
 // All good!
 // Update validator accums and set state variables
 nextValSet.IncrementAccum(1)
@@ -54,8 +62,9 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
 }

 // Executes block's transactions on proxyAppConn.
+// Returns a list of updates to the validator set
 // TODO: Generate a bitmap or otherwise store tx validity in state.
-func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
+func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*tmsp.Validator, error) {

 var validTxs, invalidTxs = 0, 0
@@ -94,7 +103,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
 err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
 if err != nil {
 log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
-return err
+return nil, err
 }

 fail.Fail() // XXX
@@ -104,7 +113,7 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
 fail.FailRand(len(block.Txs)) // XXX
 proxyAppConn.AppendTxAsync(tx)
 if err := proxyAppConn.Error(); err != nil {
-return err
+return nil, err
 }
 }
@@ -114,44 +123,69 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
 changedValidators, err := proxyAppConn.EndBlockSync(uint64(block.Height))
 if err != nil {
 log.Warn("Error in proxyAppConn.EndBlock", "error", err)
-return err
+return nil, err
 }

 fail.Fail() // XXX

-// TODO: Do something with changedValidators
-log.Debug("TODO: Do something with changedValidators", "changedValidators", changedValidators)
-
-log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs))
-return nil
+log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
+if len(changedValidators) > 0 {
+log.Info("Update to validator set", "updates", tmsp.ValidatorsString(changedValidators))
+}
+return changedValidators, nil
 }

-// Updates the LastCommitHeight of the validators in valSet, in place.
-// Assumes that lastValSet matches the valset of block.LastCommit
-// CONTRACT: lastValSet is not mutated.
-func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.ValidatorSet, block *types.Block) {
-for i, precommit := range block.LastCommit.Precommits {
-if precommit == nil {
-continue
-}
-_, val := lastValSet.GetByIndex(i)
-if val == nil {
-PanicCrisis(Fmt("Failed to fetch validator at index %v", i))
-}
-if _, val_ := valSet.GetByAddress(val.Address); val_ != nil {
-val_.LastCommitHeight = block.Height - 1
-updated := valSet.Update(val_)
-if !updated {
-PanicCrisis("Failed to update validator LastCommitHeight")
-}
-} else {
-// XXX This is not an error if validator was removed.
-// But, we don't mutate validators yet so go ahead and panic.
-PanicCrisis("Could not find validator")
-}
-}
-}
+func updateValidators(validators *types.ValidatorSet, changedValidators []*tmsp.Validator) error {
+// TODO: prevent change of 1/3+ at once
+for _, v := range changedValidators {
+pubkey, err := crypto.PubKeyFromBytes(v.PubKey) // NOTE: expects go-wire encoded pubkey
+if err != nil {
+return err
+}
+
+address := pubkey.Address()
+power := int64(v.Power)
+// mind the overflow from uint64
+if power < 0 {
+return errors.New(Fmt("Power (%d) overflows int64", v.Power))
+}
+
+_, val := validators.GetByAddress(address)
+if val == nil {
+// add val
+added := validators.Add(types.NewValidator(pubkey, power))
+if !added {
+return errors.New(Fmt("Failed to add new validator %X with voting power %d", address, power))
+}
+} else if v.Power == 0 {
+// remove val
+_, removed := validators.Remove(address)
+if !removed {
+return errors.New(Fmt("Failed to remove validator %X", address))
+}
+} else {
+// update val
+val.VotingPower = power
+updated := validators.Update(val)
+if !updated {
+return errors.New(Fmt("Failed to update validator %X with voting power %d", address, power))
+}
+}
+}
+return nil
+}
+
+// return a bit array of validators that signed the last commit
+// NOTE: assumes commits have already been authenticated
+func commitBitArrayFromBlock(block *types.Block) *BitArray {
+signed := NewBitArray(len(block.LastCommit.Precommits))
+for i, precommit := range block.LastCommit.Precommits {
+if precommit != nil {
+signed.SetIndex(i, true) // val_.LastCommitHeight = block.Height - 1
+}
+}
+return signed
+}

//-----------------------------------------------------
@@ -259,6 +293,7 @@ func (m mockMempool) Update(height int, txs []types.Tx) {}

 type BlockStore interface {
 Height() int
 LoadBlock(height int) *types.Block
+LoadBlockMeta(height int) *types.BlockMeta
 }

 type Handshaker struct {
@@ -273,8 +308,7 @@ func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshake
 return &Handshaker{config, state, store, 0}
 }

-// TODO: retry the handshake once if it fails the first time
-// ... let Info take an argument determining its behaviour
+// TODO: retry the handshake/replay if it fails ?
 func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
 // handshake is done via info request on the query conn
 res, tmspInfo, blockInfo, configInfo := proxyApp.Query().InfoSync()
@@ -287,10 +321,9 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
 return nil
 }

-log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash)
+log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "app_hash", blockInfo.AppHash)

-blockHeight := int(blockInfo.BlockHeight) // safe, should be an int32
-blockHash := blockInfo.BlockHash
+blockHeight := int(blockInfo.BlockHeight) // XXX: beware overflow
 appHash := blockInfo.AppHash

 if tmspInfo != nil {
@@ -298,40 +331,13 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
 _ = tmspInfo
 }

-// last block (nil if we starting from 0)
-var header *types.Header
-var partsHeader types.PartSetHeader
+// replay all blocks after blockHeight
+// if blockHeight == 0, we will replay everything
-if blockHeight != 0 {
-block := h.store.LoadBlock(blockHeight)
-if block == nil {
-return ErrUnknownBlock{blockHeight}
-}
-
-// check block hash
-if !bytes.Equal(block.Hash(), blockHash) {
-return ErrBlockHashMismatch{block.Hash(), blockHash, blockHeight}
-}
-
-// NOTE: app hash should be in the next block ...
-// check app hash
-/*if !bytes.Equal(block.Header.AppHash, appHash) {
-return fmt.Errorf("Handshake error. App hash at height %d does not match. Got %X, expected %X", blockHeight, appHash, block.Header.AppHash)
-}*/
-
-header = block.Header
-partsHeader = block.MakePartSet(h.config.GetInt("block_part_size")).Header()
-}
-
 if configInfo != nil {
 // TODO: set config info
 _ = configInfo
 }

-// replay blocks up to the latest in the blockstore
-err := h.ReplayBlocks(appHash, header, partsHeader, proxyApp.Consensus())
+err := h.ReplayBlocks(appHash, blockHeight, proxyApp.Consensus())
 if err != nil {
 return errors.New(Fmt("Error on replay: %v", err))
 }
@@ -342,97 +348,72 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
 }

 // Replay all blocks after blockHeight and ensure the result matches the current state.
-func (h *Handshaker) ReplayBlocks(appHash []byte, header *types.Header, partsHeader types.PartSetHeader,
-appConnConsensus proxy.AppConnConsensus) error {
-
-// NOTE/TODO: tendermint may crash after the app commits
-// but before it can save the new state root.
-// it should save all eg. valset changes before calling Commit.
-// then, if tm state is behind app state, the only thing missing can be app hash
-
-// get a fresh state and reset to the apps latest
-stateCopy := h.state.Copy()
-
-// TODO: put validators in iavl tree so we can set the state with an older validator set
-lastVals, nextVals := stateCopy.GetValidators()
-if header == nil {
-stateCopy.LastBlockHeight = 0
-stateCopy.LastBlockID = types.BlockID{}
-// stateCopy.LastBlockTime = ... doesnt matter
-stateCopy.Validators = nextVals
-stateCopy.LastValidators = lastVals
-} else {
-stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
-}
-stateCopy.Stale = false
-stateCopy.AppHash = appHash
+func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnConsensus proxy.AppConnConsensus) error {

-appBlockHeight := stateCopy.LastBlockHeight
-coreBlockHeight := h.store.Height()
-if coreBlockHeight < appBlockHeight {
+storeBlockHeight := h.store.Height()
+if storeBlockHeight < appBlockHeight {
 // if the app is ahead, there's nothing we can do
-return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight}
+return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}

-} else if coreBlockHeight == appBlockHeight {
+} else if storeBlockHeight == appBlockHeight {
 // if we crashed between Commit and SaveState,
-// the state's app hash is stale.
+// the state's app hash is stale
 // otherwise we're synced
-if h.state.Stale {
-h.state.Stale = false
+if h.state.AppHashIsStale {
+h.state.AppHashIsStale = false
 h.state.AppHash = appHash
 }
-return checkState(h.state, stateCopy)
+return nil

 } else if h.state.LastBlockHeight == appBlockHeight {
-// core is ahead of app but core's state height is at apps height
+// store is ahead of app but core's state height is at apps height
 // this happens if we crashed after saving the block,
 // but before committing it. We should be 1 ahead
-if coreBlockHeight != appBlockHeight+1 {
-PanicSanity(Fmt("core.state.height == app.height but core.height (%d) > app.height+1 (%d)", coreBlockHeight, appBlockHeight+1))
+if storeBlockHeight != appBlockHeight+1 {
+PanicSanity(Fmt("core.state.height == app.height but store.height (%d) > app.height+1 (%d)", storeBlockHeight, appBlockHeight+1))
 }

 // check that the blocks last apphash is the states apphash
-block := h.store.LoadBlock(coreBlockHeight)
+block := h.store.LoadBlock(storeBlockHeight)
 if !bytes.Equal(block.Header.AppHash, appHash) {
-return ErrLastStateMismatch{coreBlockHeight, block.Header.AppHash, appHash}
+return ErrLastStateMismatch{storeBlockHeight, block.Header.AppHash, appHash}
 }

-// replay the block against the actual tendermint state (not the copy)
-return h.loadApplyBlock(coreBlockHeight, h.state, appConnConsensus)
+blockMeta := h.store.LoadBlockMeta(storeBlockHeight)
+h.nBlocks += 1
+var eventCache types.Fireable // nil
+
+// replay the block against the actual tendermint state
+return h.state.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{})

 } else {
 // either we're caught up or there's blocks to replay
 // replay all blocks starting with appBlockHeight+1
-for i := appBlockHeight + 1; i <= coreBlockHeight; i++ {
-h.loadApplyBlock(i, stateCopy, appConnConsensus)
-}
-return checkState(h.state, stateCopy)
-}
-}
-
-func checkState(s, stateCopy *State) error {
-// The computed state and the previously set state should be identical
-if !s.Equals(stateCopy) {
-return ErrStateMismatch{stateCopy, s}
-}
-return nil
-}
-
-func (h *Handshaker) loadApplyBlock(blockIndex int, state *State, appConnConsensus proxy.AppConnConsensus) error {
-h.nBlocks += 1
-block := h.store.LoadBlock(blockIndex)
-panicOnNilBlock(blockIndex, h.store.Height(), block) // XXX
-var eventCache types.Fireable // nil
-return state.ApplyBlock(eventCache, appConnConsensus, block, block.MakePartSet(h.config.GetInt("block_part_size")).Header(), mockMempool{})
-}
-
-func panicOnNilBlock(height, bsHeight int, block *types.Block) {
-if block == nil {
-// Sanity?
-PanicCrisis(Fmt(`
-block is nil for height <= blockStore.Height() (%d <= %d).
-Block: %v,
-`, height, bsHeight, block))
-}
-}
+var eventCache types.Fireable // nil
+var appHash []byte
+for i := appBlockHeight + 1; i <= storeBlockHeight; i++ {
+h.nBlocks += 1
+block := h.store.LoadBlock(i)
+_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
+if err != nil {
+log.Warn("Error executing block on proxy app", "height", i, "err", err)
+return err
+}
+// Commit block, get hash back
+res := appConnConsensus.CommitSync()
+if res.IsErr() {
+log.Warn("Error in proxyAppConn.CommitSync", "error", res)
+return res
+}
+if res.Log != "" {
+log.Info("Commit.Log: " + res.Log)
+}
+appHash = res.Data
+}
+if !bytes.Equal(h.state.AppHash, appHash) {
+return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Expected %X, got %X", h.state.AppHash, appHash))
+}
+return nil
+}
+}
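End to end, a validator change rides in as an ordinary transaction, per TestValidatorSetChanges in consensus/reactor_test.go above. A hedged sketch using the tmsp dummy app's MakeValSetChangeTx helper from that test; the app is then expected to report the change back through EndBlock, which execBlockOnProxyApp returns as changedValidators:

// a go-wire encoded pubkey plus a voting power, wrapped as a tx
addTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), uint64(testMinPower))
// power 0 takes the v.Power == 0 branch in updateValidators and removes the validator
rmTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), 0)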

state/execution_test.go (+24 -7)

@@ -8,6 +8,7 @@ import (
 "github.com/tendermint/tendermint/config/tendermint_test"
 // . "github.com/tendermint/go-common"
+cfg "github.com/tendermint/go-config"
 "github.com/tendermint/go-crypto"
 dbm "github.com/tendermint/go-db"
 "github.com/tendermint/tendermint/proxy"
@@ -23,10 +24,16 @@ var (
 testPartSize = 65536
 )

+//---------------------------------------
+// Test block execution
+
+func TestExecBlock(t *testing.T) {
+// TODO
+}
+
+//---------------------------------------
+// Test handshake/replay

 // Sync from scratch
 func TestHandshakeReplayAll(t *testing.T) {
 testHandshakeReplay(t, 0)
@@ -51,7 +58,7 @@ func TestHandshakeReplayNone(t *testing.T) {
 func testHandshakeReplay(t *testing.T, n int) {
 config := tendermint_test.ResetConfig("proxy_test_")
-state, store := stateAndStore()
+state, store := stateAndStore(config)
 clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
 clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2")))
 proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store))
@@ -69,7 +76,7 @@ func testHandshakeReplay(t *testing.T, n int) {
 if _, err := proxyApp.Start(); err != nil {
 t.Fatalf("Error starting proxy app connections: %v", err)
 }
-state2, _ := stateAndStore()
+state2, _ := stateAndStore(config)
 for i := 0; i < n; i++ {
 block := chain[i]
 err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
@@ -105,6 +112,7 @@ func testHandshakeReplay(t *testing.T, n int) {
 }

 //--------------------------
 // utils for making blocks
+
 // make some bogus txs
 func txsFunc(blockNum int) (txs []types.Tx) {
@@ -167,7 +175,7 @@ func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockc
 }

 // fresh state and mock store
-func stateAndStore() (*State, *mockBlockStore) {
+func stateAndStore(config cfg.Config) (*State, *mockBlockStore) {
 stateDB := dbm.NewMemDB()
 return MakeGenesisState(stateDB, &types.GenesisDoc{
 ChainID: chainID,
@@ -175,19 +183,28 @@ func stateAndStore() (*State, *mockBlockStore) {
 types.GenesisValidator{privKey.PubKey(), 10000, "test"},
 },
 AppHash: nil,
-}), NewMockBlockStore(nil)
+}), NewMockBlockStore(config, nil)
 }

 //----------------------------------
 // mock block store

 type mockBlockStore struct {
-chain []*types.Block
+config cfg.Config
+chain []*types.Block
 }

-func NewMockBlockStore(chain []*types.Block) *mockBlockStore {
-return &mockBlockStore{chain}
+func NewMockBlockStore(config cfg.Config, chain []*types.Block) *mockBlockStore {
+return &mockBlockStore{config, chain}
 }

 func (bs *mockBlockStore) Height() int { return len(bs.chain) }
 func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] }
+func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta {
+block := bs.chain[height-1]
+return &types.BlockMeta{
+Hash: block.Hash(),
+Header: block.Header,
+PartsHeader: block.MakePartSet(bs.config.GetInt("block_part_size")).Header(),
+}
+}
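Since the Handshaker consumes the BlockStore interface extended in state/execution.go (Height, LoadBlock, and now LoadBlockMeta), a compile-time assertion would make the mock's conformance explicit; a one-line sketch that could sit beside the mock:

var _ BlockStore = (*mockBlockStore)(nil) // compilation fails if LoadBlockMeta is missing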

state/state.go (+9 -6)

@@ -34,12 +34,12 @@ type State struct {
 LastBlockID types.BlockID
 LastBlockTime time.Time
 Validators *types.ValidatorSet
-LastValidators *types.ValidatorSet
+LastValidators *types.ValidatorSet // block.LastCommit validated against this

 // AppHash is updated after Commit;
 // it's stale after ExecBlock and before Commit
-Stale bool
-AppHash []byte
+AppHashIsStale bool
+AppHash []byte
 }

 func LoadState(db dbm.DB) *State {
@@ -60,6 +60,9 @@ func LoadState(db dbm.DB) *State {
 }

 func (s *State) Copy() *State {
+if s.AppHashIsStale {
+PanicSanity(Fmt("App hash is stale: %v", s))
+}
 return &State{
 db: s.db,
 GenesisDoc: s.GenesisDoc,
@@ -69,7 +72,7 @@ func (s *State) Copy() *State {
 LastBlockTime: s.LastBlockTime,
 Validators: s.Validators.Copy(),
 LastValidators: s.LastValidators.Copy(),
-Stale: s.Stale, // XXX: but really state shouldnt be copied while its stale
+AppHashIsStale: false,
 AppHash: s.AppHash,
 }
 }
@@ -94,7 +97,7 @@ func (s *State) Bytes() []byte {
 }

 // Mutate state variables to match block and validators
-// Since we don't have the new AppHash yet, we set s.Stale=true
+// Since we don't have the new AppHash yet, we set s.AppHashIsStale=true
 func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
 s.LastBlockHeight = header.Height
 s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader}
@@ -102,7 +105,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ
 s.Validators = nextValSet
 s.LastValidators = prevValSet
-s.Stale = true
+s.AppHashIsStale = true
 }

 func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {

test/app/dummy_test.sh (+4 -4)

@@ -26,7 +26,7 @@ echo ""
 RESPONSE=`tmsp-cli query $KEY`
 set +e
-A=`echo $RESPONSE | grep exists=true`
+A=`echo $RESPONSE | grep '"exists":true'`
 if [[ $? != 0 ]]; then
 echo "Failed to find 'exists=true' for $KEY. Response:"
 echo "$RESPONSE"
@@ -37,7 +37,7 @@ set -e
 # we should not be able to look up the value
 RESPONSE=`tmsp-cli query $VALUE`
 set +e
-A=`echo $RESPONSE | grep exists=true`
+A=`echo $RESPONSE | grep '"exists":true'`
 if [[ $? == 0 ]]; then
 echo "Found 'exists=true' for $VALUE when we should not have. Response:"
 echo "$RESPONSE"
@@ -54,7 +54,7 @@ RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $KEY)\"`
 RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
 set +e
-A=`echo $RESPONSE | grep exists=true`
+A=`echo $RESPONSE | grep '"exists":true'`
 if [[ $? != 0 ]]; then
 echo "Failed to find 'exists=true' for $KEY. Response:"
 echo "$RESPONSE"
@@ -66,7 +66,7 @@ set -e
 RESPONSE=`curl -s 127.0.0.1:46657/tmsp_query?query=\"$(toHex $VALUE)\"`
 RESPONSE=`echo $RESPONSE | jq .result[1].result.Data | xxd -r -p`
 set +e
-A=`echo $RESPONSE | grep exists=true`
+A=`echo $RESPONSE | grep '"exists":true'`
 if [[ $? == 0 ]]; then
 echo "Found 'exists=true' for $VALUE when we should not have. Response:"
 echo "$RESPONSE"


types/block.go (+9 -5)

@@ -21,6 +21,7 @@ type Block struct {
 LastCommit *Commit `json:"last_commit"`
 }

+// TODO: version
 func MakeBlock(height int, chainID string, txs []Tx, commit *Commit,
 prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) {
 block := &Block{
@@ -150,14 +151,15 @@ func (b *Block) StringShort() string {
 type Header struct {
 ChainID string `json:"chain_id"`
+Version string `json:"version"` // TODO:
 Height int `json:"height"`
 Time time.Time `json:"time"`
-NumTxs int `json:"num_txs"`
+NumTxs int `json:"num_txs"` // XXX: Can we get rid of this?
 LastBlockID BlockID `json:"last_block_id"`
-LastCommitHash []byte `json:"last_commit_hash"`
-DataHash []byte `json:"data_hash"`
-ValidatorsHash []byte `json:"validators_hash"`
-AppHash []byte `json:"app_hash"` // state merkle root of txs from the previous block
+LastCommitHash []byte `json:"last_commit_hash"` // commit from validators from the last block
+DataHash []byte `json:"data_hash"` // transactions
+ValidatorsHash []byte `json:"validators_hash"` // validators for the current block
+AppHash []byte `json:"app_hash"` // state after txs from the previous block
 }

 // NOTE: hash is nil if required fields are missing.
@@ -291,6 +293,8 @@ func (commit *Commit) ValidateBasic() error {
 return errors.New("No precommits in commit")
 }
 height, round := commit.Height(), commit.Round()
+
+// validate the precommits
 for _, precommit := range commit.Precommits {
 // It's OK for precommits to be missing.
 if precommit == nil {


types/protobuf.go (+1 -1)

@@ -12,7 +12,7 @@ type tm2pb struct{}
 func (tm2pb) Header(header *Header) *types.Header {
 return &types.Header{
 ChainId: header.ChainID,
-Height: int32(header.Height),
+Height: uint64(header.Height),
 Time: uint64(header.Time.Unix()),
 NumTxs: uint64(header.NumTxs),
 LastBlockId: TM2PB.BlockID(header.LastBlockID),


types/validator.go (+14 -14)

@@ -12,14 +12,21 @@ import (

 // Volatile state for each Validator
 // TODO: make non-volatile identity
+// - Remove LastCommitHeight, send bitarray of vals that signed in BeginBlock
+// - Remove Accum - it can be computed, and now valset becomes identifying
 type Validator struct {
-Address []byte `json:"address"`
-PubKey crypto.PubKey `json:"pub_key"`
-LastCommitHeight int `json:"last_commit_height"`
-VotingPower int64 `json:"voting_power"`
-Accum int64 `json:"accum"`
+Address []byte `json:"address"`
+PubKey crypto.PubKey `json:"pub_key"`
+VotingPower int64 `json:"voting_power"`
+Accum int64 `json:"accum"`
 }
+
+func NewValidator(pubKey crypto.PubKey, votingPower int64) *Validator {
+return &Validator{
+Address: pubKey.Address(),
+PubKey: pubKey,
+VotingPower: votingPower,
+Accum: 0,
+}
+}

 // Creates a new copy of the validator so we can mutate accum.
@@ -57,7 +64,6 @@ func (v *Validator) String() string {
 return fmt.Sprintf("Validator{%X %v %v VP:%v A:%v}",
 v.Address,
 v.PubKey,
-v.LastCommitHeight,
 v.VotingPower,
 v.Accum)
 }
@@ -96,12 +102,6 @@ func RandValidator(randPower bool, minPower int64) (*Validator, *PrivValidator)
 if randPower {
 votePower += int64(RandUint32())
 }
-val := &Validator{
-Address: privVal.Address,
-PubKey: privVal.PubKey,
-LastCommitHeight: 0,
-VotingPower: votePower,
-Accum: 0,
-}
+val := NewValidator(privVal.PubKey, votePower)
 return val, privVal
 }

types/validator_set_test.go (+4 -10)

@@ -16,12 +16,9 @@ func randPubKey() crypto.PubKeyEd25519 {
 }

 func randValidator_() *Validator {
-return &Validator{
-Address: RandBytes(20),
-PubKey: randPubKey(),
-VotingPower: RandInt64(),
-Accum: RandInt64(),
-}
+val := NewValidator(randPubKey(), RandInt64())
+val.Accum = RandInt64()
+return val
 }

 func randValidatorSet(numValidators int) *ValidatorSet {
@@ -147,10 +144,7 @@ func BenchmarkValidatorSetCopy(b *testing.B) {
 for i := 0; i < 1000; i++ {
 privKey := crypto.GenPrivKeyEd25519()
 pubKey := privKey.PubKey().(crypto.PubKeyEd25519)
-val := &Validator{
-Address: pubKey.Address(),
-PubKey: pubKey,
-}
+val := NewValidator(pubKey, 0)
 if !vset.Add(val) {
 panic("Failed to add validator")
 }

