diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go new file mode 100644 index 000000000..f049fa3ef --- /dev/null +++ b/consensus/byzantine_test.go @@ -0,0 +1,278 @@ +package consensus + +import ( + "sync" + "testing" + "time" + + "github.com/tendermint/tendermint/config/tendermint_test" + + . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" + "github.com/tendermint/go-crypto" + "github.com/tendermint/go-events" + "github.com/tendermint/go-p2p" + "github.com/tendermint/tendermint/types" +) + +func init() { + config = tendermint_test.ResetConfig("consensus_byzantine_test") +} + +//---------------------------------------------- +// byzantine failures + +// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals). +// byzantine validator sends conflicting proposals into A and B, +// and prevotes/precommits on both of them. +// B sees a commit, A doesn't. +// Byzantine validator refuses to prevote. +// Heal partition and ensure A sees the commit +func TestByzantine(t *testing.T) { + resetConfigTimeouts() + N := 4 + css := randConsensusNet(N) + + switches := make([]*p2p.Switch, N) + for i := 0; i < N; i++ { + switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil)) + } + + reactors := make([]p2p.Reactor, N) + eventChans := make([]chan interface{}, N) + for i := 0; i < N; i++ { + if i == 0 { + css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator)) + // make byzantine + css[i].decideProposal = func(j int) func(int, int) { + return func(height, round int) { + byzantineDecideProposalFunc(height, round, css[j], switches[j]) + } + }(i) + css[i].doPrevote = func(height, round int) {} + } + + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() + if err != nil { + t.Fatalf("Failed to start switch: %v", err) + } + eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + + conR := NewConsensusReactor(css[i], false) + conR.SetEventSwitch(eventSwitch) + + var conRI p2p.Reactor + conRI = conR + if i == 0 { + conRI = NewByzantineReactor(conR) + } + reactors[i] = conRI + } + + p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { + // ignore new switch s, we already made ours + switches[i].AddReactor("CONSENSUS", reactors[i]) + return switches[i] + }, func(sws []*p2p.Switch, i, j int) { + // the network starts partitioned with globally active adversary + if i != 0 { + return + } + p2p.Connect2Switches(sws, i, j) + }) + + // byz proposer sends one block to peers[0] + // and the other block to peers[1] and peers[2]. + // note peers and switches order don't match. + peers := switches[0].Peers().List() + ind0 := getSwitchIndex(switches, peers[0]) + ind1 := getSwitchIndex(switches, peers[1]) + ind2 := getSwitchIndex(switches, peers[2]) + + // connect the 2 peers in the larger partition + p2p.Connect2Switches(switches, ind1, ind2) + + // wait for someone in the big partition to make a block + + select { + case <-eventChans[ind2]: + } + + log.Notice("A block has been committed. Healing partition") + + // connect the partitions + p2p.Connect2Switches(switches, ind0, ind1) + p2p.Connect2Switches(switches, ind0, ind2) + + // wait till everyone makes the first new block + // (one of them already has) + wg := new(sync.WaitGroup) + wg.Add(2) + for i := 1; i < N-1; i++ { + go func(j int) { + <-eventChans[j] + wg.Done() + }(i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + tick := time.NewTicker(time.Second * 10) + select { + case <-done: + case <-tick.C: + for i, reactor := range reactors { + t.Log(Fmt("Consensus Reactor %v", i)) + t.Log(Fmt("%v", reactor)) + } + t.Fatalf("Timed out waiting for all validators to commit first block") + } +} + +//------------------------------- +// byzantine consensus functions + +func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) { + // byzantine user should create two proposals and try to split the vote. + // Avoid sending on internalMsgQueue and running consensus state. + + // Create a new proposal block from state/txs from the mempool. + block1, blockParts1 := cs.createProposalBlock() + polRound, polBlockID := cs.Votes.POLInfo() + proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID) + cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err + + // Create a new proposal block from state/txs from the mempool. + block2, blockParts2 := cs.createProposalBlock() + polRound, polBlockID = cs.Votes.POLInfo() + proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID) + cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err + + block1Hash := block1.Hash() + block2Hash := block2.Hash() + + // broadcast conflicting proposals/block parts to peers + peers := sw.Peers().List() + log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers)) + for i, peer := range peers { + if i < len(peers)/2 { + go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) + } else { + go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) + } + } +} + +func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { + // proposal + msg := &ProposalMessage{Proposal: proposal} + peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) + + // parts + for i := 0; i < parts.Total(); i++ { + part := parts.GetPart(i) + msg := &BlockPartMessage{ + Height: height, // This tells peer that this part applies to us. + Round: round, // This tells peer that this part applies to us. + Part: part, + } + peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) + } + + // votes + cs.mtx.Lock() + prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header()) + precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) + cs.mtx.Unlock() + + peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) + peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) +} + +//---------------------------------------- +// byzantine consensus reactor + +type ByzantineReactor struct { + Service + reactor *ConsensusReactor +} + +func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { + return &ByzantineReactor{ + Service: conR, + reactor: conR, + } +} + +func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } +func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } +func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { + if !br.reactor.IsRunning() { + return + } + + // Create peerState for peer + peerState := NewPeerState(peer) + peer.Data.Set(types.PeerStateKey, peerState) + + // Send our state to peer. + // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). + if !br.reactor.fastSync { + br.reactor.sendNewRoundStepMessage(peer) + } +} +func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + br.reactor.RemovePeer(peer, reason) +} +func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { + br.reactor.Receive(chID, peer, msgBytes) +} + +//---------------------------------------- +// byzantine privValidator + +type ByzantinePrivValidator struct { + Address []byte `json:"address"` + types.Signer `json:"-"` + + mtx sync.Mutex +} + +// Return a priv validator that will sign anything +func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator { + return &ByzantinePrivValidator{ + Address: pv.Address, + Signer: pv.Signer, + } +} + +func (privVal *ByzantinePrivValidator) GetAddress() []byte { + return privVal.Address +} + +func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519) + return nil +} + +func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + + // Sign + proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519) + return nil +} + +func (privVal *ByzantinePrivValidator) String() string { + return Fmt("PrivValidator{%X}", privVal.Address) +} diff --git a/consensus/common.go b/consensus/common.go index 02b2c4a41..1f78c585a 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -4,7 +4,7 @@ import ( "github.com/tendermint/tendermint/types" ) -// NOTE: this is blocking +// NOTE: if chanCap=0, this blocks on the event being consumed func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { // listen for event ch := make(chan interface{}, chanCap) @@ -13,3 +13,14 @@ func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap }) return ch } + +// NOTE: this blocks on receiving a response after the event is consumed +func subscribeToEventRespond(evsw types.EventSwitch, receiver, eventID string) chan interface{} { + // listen for event + ch := make(chan interface{}) + types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { + ch <- data + <-ch + }) + return ch +} diff --git a/consensus/common_test.go b/consensus/common_test.go index 297b842e9..7f15ab5fb 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -3,6 +3,7 @@ package consensus import ( "bytes" "fmt" + "io/ioutil" "sort" "sync" "testing" @@ -11,6 +12,7 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-p2p" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/config/tendermint_test" mempl "github.com/tendermint/tendermint/mempool" @@ -33,6 +35,8 @@ type validatorStub struct { *types.PrivValidator } +var testMinPower = 10 + func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub { return &validatorStub{ Index: valIndex, @@ -266,6 +270,31 @@ func randConsensusNet(nValidators int) []*ConsensusState { return css } +// nPeers = nValidators + nNotValidator +func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState { + genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower)) + css := make([]*ConsensusState, nPeers) + for i := 0; i < nPeers; i++ { + db := dbm.NewMemDB() // each state needs its own db + state := sm.MakeGenesisState(db, genDoc) + state.Save() + thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i)) + EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal + var privVal *types.PrivValidator + if i < nValidators { + privVal = privVals[i] + } else { + privVal = types.GenPrivValidator() + _, tempFilePath := Tempfile("priv_validator_") + privVal.SetFile(tempFilePath) + } + + dir, _ := ioutil.TempDir("/tmp", "persistent-dummy") + css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir)) + } + return css +} + func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) voteCh := make(chan interface{}) @@ -325,3 +354,16 @@ func startTestRound(cs *ConsensusState, height, round int) { cs.enterNewRound(height, round) cs.startRoutines(0) } + +//-------------------------------- +// reactor stuff + +func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { + for i, s := range switches { + if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { + return i + } + } + panic("didnt find peer in switches") + return -1 +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 418bdc62d..818f8a6e5 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -111,6 +111,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { + log.Info("ADDING PEER") if !conR.IsRunning() { return } @@ -949,7 +950,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round) - log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) + log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) // NOTE: some may be nil BitArrays -> no side effects. ps.getVoteBitArray(height, round, type_).SetIndex(index, true) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index ce1b63c9b..504bd97f3 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -1,20 +1,18 @@ package consensus import ( - "bytes" + "fmt" "sync" "testing" "time" "github.com/tendermint/tendermint/config/tendermint_test" - . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-crypto" "github.com/tendermint/go-events" "github.com/tendermint/go-logger" "github.com/tendermint/go-p2p" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/dummy" ) func init() { @@ -22,7 +20,7 @@ func init() { } func resetConfigTimeouts() { - logger.SetLogLevel("notice") + logger.SetLogLevel("info") //config.Set("log_level", "notice") config.Set("timeout_propose", 2000) // config.Set("timeout_propose_delta", 500) @@ -30,9 +28,13 @@ func resetConfigTimeouts() { // config.Set("timeout_prevote_delta", 500) // config.Set("timeout_precommit", 1000) // config.Set("timeout_precommit_delta", 500) - // config.Set("timeout_commit", 1000) + config.Set("timeout_commit", 1000) } +//---------------------------------------------- +// in-process testnets + +// Ensure a testnet makes blocks func TestReactor(t *testing.T) { resetConfigTimeouts() N := 4 @@ -51,297 +53,133 @@ func TestReactor(t *testing.T) { reactors[i].SetEventSwitch(eventSwitch) eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) } + // make connected switches and start all reactors p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) return s }, p2p.Connect2Switches) // wait till everyone makes the first new block - wg := new(sync.WaitGroup) - wg.Add(N) - for i := 0; i < N; i++ { - go func(j int) { - <-eventChans[j] - wg.Done() - }(i) - } - - // Make wait into a channel - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - tick := time.NewTicker(time.Second * 3) - select { - case <-done: - case <-tick.C: - t.Fatalf("Timed out waiting for all validators to commit first block") - } + timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { + <-eventChans[j] + wg.Done() + }) } -// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals). -// byzantine validator sends conflicting proposals into A and B, -// and prevotes/precommits on both of them. -// B sees a commit, A doesn't. -// Byzantine validator refuses to prevote. -// Heal partition and ensure A sees the commit -func TestByzantine(t *testing.T) { - resetConfigTimeouts() - N := 4 - css := randConsensusNet(N) - - switches := make([]*p2p.Switch, N) - for i := 0; i < N; i++ { - switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil)) - } +//------------------------------------------------------------- +// ensure we can make blocks despite cycling a validator set - reactors := make([]p2p.Reactor, N) - eventChans := make([]chan interface{}, N) - for i := 0; i < N; i++ { - if i == 0 { - css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator)) - // make byzantine - css[i].decideProposal = func(j int) func(int, int) { - return func(height, round int) { - byzantineDecideProposalFunc(height, round, css[j], switches[j]) - } - }(i) - css[i].doPrevote = func(height, round int) {} - } +func TestValidatorSetChanges(t *testing.T) { + resetConfigTimeouts() + nPeers := 8 + nVals := 4 + css := randConsensusNetWithPeers(nVals, nPeers) + reactors := make([]*ConsensusReactor, nPeers) + eventChans := make([]chan interface{}, nPeers) + for i := 0; i < nPeers; i++ { + reactors[i] = NewConsensusReactor(css[i], false) eventSwitch := events.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { t.Fatalf("Failed to start switch: %v", err) } - eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) - conR := NewConsensusReactor(css[i], false) - conR.SetEventSwitch(eventSwitch) + reactors[i].SetEventSwitch(eventSwitch) + eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock()) + } + p2p.MakeConnectedSwitches(nPeers, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("CONSENSUS", reactors[i]) + return s + }, p2p.Connect2Switches) - var conRI p2p.Reactor - conRI = conR - if i == 0 { - conRI = NewByzantineReactor(conR) - } - reactors[i] = conRI + // map of active validators + activeVals := make(map[string]struct{}) + for i := 0; i < nVals; i++ { + activeVals[string(css[i].privValidator.GetAddress())] = struct{}{} } - p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { - // ignore new switch s, we already made ours - switches[i].AddReactor("CONSENSUS", reactors[i]) - return switches[i] - }, func(sws []*p2p.Switch, i, j int) { - // the network starts partitioned with globally active adversary - if i != 0 { - return - } - p2p.Connect2Switches(sws, i, j) + // wait till everyone makes block 1 + timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) { + <-eventChans[j] + eventChans[j] <- struct{}{} + wg.Done() }) - // byz proposer sends one block to peers[0] - // and the other block to peers[1] and peers[2]. - // note peers and switches order don't match. - peers := switches[0].Peers().List() - ind0 := getSwitchIndex(switches, peers[0]) - ind1 := getSwitchIndex(switches, peers[1]) - ind2 := getSwitchIndex(switches, peers[2]) + newValidatorPubKey := css[nVals].privValidator.(*types.PrivValidator).PubKey + newValidatorTx := dummy.MakeValSetChangeTx(newValidatorPubKey.Bytes(), uint64(testMinPower)) - // connect the 2 peers in the larger partition - p2p.Connect2Switches(switches, ind1, ind2) + // wait till everyone makes block 2 + // ensure the commit includes all validators + // send newValTx to change vals in block 3 + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx) - // wait for someone in the big partition to make a block + // wait till everyone makes block 3. + // it includes the commit for block 2, which is by the original validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) - select { - case <-eventChans[ind2]: - } + // wait till everyone makes block 4. + // it includes the commit for block 3, which is by the original validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) - log.Notice("A block has been committed. Healing partition") + // the commits for block 4 should be with the updated validator set + activeVals[string(newValidatorPubKey.Address())] = struct{}{} - // connect the partitions - p2p.Connect2Switches(switches, ind0, ind1) - p2p.Connect2Switches(switches, ind0, ind2) + // wait till everyone makes block 5 + // it includes the commit for block 4, which should have the updated validator set + waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) - // wait till everyone makes the first new block - // (one of them already has) - wg := new(sync.WaitGroup) - wg.Add(2) - for i := 1; i < N-1; i++ { - go func(j int) { - <-eventChans[j] - wg.Done() - }(i) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - tick := time.NewTicker(time.Second * 10) - select { - case <-done: - case <-tick.C: - for i, reactor := range reactors { - t.Log(Fmt("Consensus Reactor %v", i)) - t.Log(Fmt("%v", reactor)) - } - t.Fatalf("Timed out waiting for all validators to commit first block") - } + // TODO: test more changes! } -func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { - for i, s := range switches { - if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { - return i +func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { + timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { + newBlock := <-eventChans[j] + err := validateBlock(newBlock.(types.EventDataNewBlock).Block, activeVals) + if err != nil { + t.Fatal(err) } - } - panic("didnt find peer in switches") - return -1 -} - -//------------------------------- -// byzantine consensus functions - -func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) { - // byzantine user should create two proposals and try to split the vote. - // Avoid sending on internalMsgQueue and running consensus state. - - // Create a new proposal block from state/txs from the mempool. - block1, blockParts1 := cs.createProposalBlock() - polRound, polBlockID := cs.Votes.POLInfo() - proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID) - cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err - - // Create a new proposal block from state/txs from the mempool. - block2, blockParts2 := cs.createProposalBlock() - polRound, polBlockID = cs.Votes.POLInfo() - proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID) - cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err - - block1Hash := block1.Hash() - block2Hash := block2.Hash() - - // broadcast conflicting proposals/block parts to peers - peers := sw.Peers().List() - log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers)) - for i, peer := range peers { - if i < len(peers)/2 { - go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) - } else { - go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) + for _, tx := range txs { + css[j].mempool.CheckTx(tx, nil) } - } -} -func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { - // proposal - msg := &ProposalMessage{Proposal: proposal} - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - - // parts - for i := 0; i < parts.Total(); i++ { - part := parts.GetPart(i) - msg := &BlockPartMessage{ - Height: height, // This tells peer that this part applies to us. - Round: round, // This tells peer that this part applies to us. - Part: part, - } - peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) - } - - // votes - cs.mtx.Lock() - prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header()) - precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) - cs.mtx.Unlock() - - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) -} - -//---------------------------------------- -// byzantine consensus reactor - -type ByzantineReactor struct { - Service - reactor *ConsensusReactor -} - -func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { - return &ByzantineReactor{ - Service: conR, - reactor: conR, - } + eventChans[j] <- struct{}{} + wg.Done() + }) } -func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } -func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } -func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { - if !br.reactor.IsRunning() { - return - } - - // Create peerState for peer - peerState := NewPeerState(peer) - peer.Data.Set(types.PeerStateKey, peerState) - - // Send our state to peer. - // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !br.reactor.fastSync { - br.reactor.sendNewRoundStepMessage(peer) +// expects high synchrony! +func validateBlock(block *types.Block, activeVals map[string]struct{}) error { + if block.LastCommit.Size() != len(activeVals) { + return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals)) } -} -func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { - br.reactor.RemovePeer(peer, reason) -} -func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { - br.reactor.Receive(chID, peer, msgBytes) -} - -//---------------------------------------- -// byzantine privValidator - -type ByzantinePrivValidator struct { - Address []byte `json:"address"` - types.Signer `json:"-"` - mtx sync.Mutex -} - -// Return a priv validator that will sign anything -func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator { - return &ByzantinePrivValidator{ - Address: pv.Address, - Signer: pv.Signer, + for _, vote := range block.LastCommit.Precommits { + if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok { + return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress) + } } -} - -func (privVal *ByzantinePrivValidator) GetAddress() []byte { - return privVal.Address -} - -func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() - - // Sign - vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519) return nil } -func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error { - privVal.mtx.Lock() - defer privVal.mtx.Unlock() +func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int)) { + wg := new(sync.WaitGroup) + wg.Add(n) + for i := 0; i < n; i++ { + go f(wg, i) + } - // Sign - proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519) - return nil -} + // Make wait into a channel + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() -func (privVal *ByzantinePrivValidator) String() string { - return Fmt("PrivValidator{%X}", privVal.Address) + tick := time.NewTicker(time.Second * 3) + select { + case <-done: + case <-tick.C: + t.Fatalf("Timed out waiting for all validators to commit a block") + } } diff --git a/state/execution.go b/state/execution.go index 47cb1c0a2..268a87d4c 100644 --- a/state/execution.go +++ b/state/execution.go @@ -130,7 +130,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs) if len(changedValidators) > 0 { - log.Info("Update to validator set", "updates", changedValidators) + log.Info("Update to validator set", "updates", tmsp.ValidatorsString(changedValidators)) } return changedValidators, nil } @@ -325,7 +325,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { return nil } - log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "block_hash", blockInfo.BlockHash, "app_hash", blockInfo.AppHash) + log.Notice("TMSP Handshake", "height", blockInfo.BlockHeight, "app_hash", blockInfo.AppHash) blockHeight := int(blockInfo.BlockHeight) // XXX: beware overflow appHash := blockInfo.AppHash diff --git a/state/execution_test.go b/state/execution_test.go index b518c6031..e0527a42e 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -24,10 +24,16 @@ var ( testPartSize = 65536 ) +//--------------------------------------- +// Test block execution + func TestExecBlock(t *testing.T) { // TODO } +//--------------------------------------- +// Test handshake/replay + // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { testHandshakeReplay(t, 0) @@ -106,6 +112,7 @@ func testHandshakeReplay(t *testing.T, n int) { } //-------------------------- +// utils for making blocks // make some bogus txs func txsFunc(blockNum int) (txs []types.Tx) { diff --git a/types/block.go b/types/block.go index 0a46b2f61..e9574adf8 100644 --- a/types/block.go +++ b/types/block.go @@ -21,6 +21,7 @@ type Block struct { LastCommit *Commit `json:"last_commit"` } +// TODO: version func MakeBlock(height int, chainID string, txs []Tx, commit *Commit, prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) { block := &Block{ @@ -150,9 +151,10 @@ func (b *Block) StringShort() string { type Header struct { ChainID string `json:"chain_id"` + Version string `json:"version"` // TODO: Height int `json:"height"` Time time.Time `json:"time"` - NumTxs int `json:"num_txs"` + NumTxs int `json:"num_txs"` // XXX: Can we get rid of this? LastBlockID BlockID `json:"last_block_id"` LastCommitHash []byte `json:"last_commit_hash"` // commit from validators from the last block DataHash []byte `json:"data_hash"` // transactions @@ -291,6 +293,8 @@ func (commit *Commit) ValidateBasic() error { return errors.New("No precommits in commit") } height, round := commit.Height(), commit.Round() + + // validate the precommits for _, precommit := range commit.Precommits { // It's OK for precommits to be missing. if precommit == nil {