Browse Source

consensus: p2p refactor (#5969)

pull/6132/head
Aleksandr Bezobchuk 3 years ago
committed by GitHub
parent
commit
16bbe8c862
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 3357 additions and 2597 deletions
  1. +2
    -2
      consensus/README.md
  2. +321
    -332
      consensus/byzantine_test.go
  3. +31
    -20
      consensus/common_test.go
  4. +56
    -39
      consensus/invalid_test.go
  5. +10
    -0
      consensus/mempool_test.go
  6. +319
    -20
      consensus/msgs.go
  7. +310
    -0
      consensus/msgs_test.go
  8. +528
    -0
      consensus/peer_state.go
  9. +1029
    -1369
      consensus/reactor.go
  10. +478
    -753
      consensus/reactor_test.go
  11. +12
    -15
      consensus/replay_test.go
  12. +58
    -0
      consensus/state_test.go
  13. +43
    -16
      node/node.go
  14. +46
    -8
      p2p/p2ptest/network.go
  15. +2
    -2
      p2p/queue.go
  16. +8
    -7
      p2p/router.go
  17. +20
    -10
      p2p/router_test.go
  18. +80
    -0
      proto/tendermint/consensus/message.go
  19. +2
    -2
      proto/tendermint/statesync/message.go
  20. +2
    -2
      statesync/reactor.go

+ 2
- 2
consensus/README.md View File

@ -1,3 +1,3 @@
# Consensus
# Consensus
See the [consensus spec](https://github.com/tendermint/spec/tree/master/spec/consensus) and the [reactor consensus spec](https://github.com/tendermint/spec/tree/master/spec/reactors/consensus) for more information.
See the [consensus spec](https://github.com/tendermint/spec/tree/master/spec/consensus).

+ 321
- 332
consensus/byzantine_test.go View File

@ -1,7 +1,6 @@
package consensus
import (
"context"
"fmt"
"os"
"path"
@ -11,45 +10,44 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
)
//----------------------------------------------
// byzantine failures
// Byzantine node sends two different prevotes (nil and blockID) to the same validator
// Byzantine node sends two different prevotes (nil and blockID) to the same
// validator.
func TestByzantinePrevoteEquivocation(t *testing.T) {
const nValidators = 4
const byzantineNode = 0
const prevoteHeight = int64(2)
configSetup(t)
nValidators := 4
prevoteHeight := int64(2)
testName := "consensus_byzantine_test"
tickerFunc := newMockTickerFunc(true)
appFunc := newCounter
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
states := make([]*State, nValidators)
for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
stateDB := dbm.NewMemDB() // each state needs its own db
stateStore := sm.NewStore(stateDB)
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
defer os.RemoveAll(thisConfig.RootDir)
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := appFunc()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
@ -92,77 +90,67 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
cs.SetTimeoutTicker(tickerFunc())
cs.SetLogger(logger)
css[i] = cs
states[i] = cs
}
// initialize the reactors for each of the validators
reactors := make([]*Reactor, nValidators)
blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, nValidators)
for i := 0; i < nValidators; i++ {
reactors[i] = NewReactor(css[i], true) // so we dont start the consensus states
reactors[i].SetLogger(css[i].Logger)
// eventBus is already started with the cs
eventBuses[i] = css[i].eventBus
reactors[i].SetEventBus(eventBuses[i])
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, 100)
require.NoError(t, err)
blocksSubs = append(blocksSubs, blocksSub)
if css[i].state.LastBlockHeight == 0 { // simulate handle initChain in handshake
err = css[i].blockExec.Store().Save(css[i].state)
require.NoError(t, err)
}
}
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
rts := setup(t, nValidators, states, 100) // buffer must be large enough to not deadlock
// create byzantine validator
bcs := css[byzantineNode]
bzNode := rts.network.RandomNode()
bzReactor := rts.reactors[bzNode.NodeID]
bzState := rts.states[bzNode.NodeID]
// alter prevote so that the byzantine node double votes when height is 2
bcs.doPrevote = func(height int64, round int32) {
bzState.doPrevote = func(height int64, round int32) {
// allow first height to happen normally so that byzantine validator is no longer proposer
if height == prevoteHeight {
bcs.Logger.Info("Sending two votes")
prevote1, err := bcs.signVote(tmproto.PrevoteType, bcs.ProposalBlock.Hash(), bcs.ProposalBlockParts.Header())
prevote1, err := bzState.signVote(
tmproto.PrevoteType,
bzState.ProposalBlock.Hash(),
bzState.ProposalBlockParts.Header(),
)
require.NoError(t, err)
prevote2, err := bcs.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
prevote2, err := bzState.signVote(tmproto.PrevoteType, nil, types.PartSetHeader{})
require.NoError(t, err)
peerList := reactors[byzantineNode].Switch.Peers().List()
bcs.Logger.Info("Getting peer list", "peers", peerList)
// send two votes to all peers (1st to one half, 2nd to another half)
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
i := 0
for _, ps := range bzReactor.peers {
if i < len(bzReactor.peers)/2 {
bzState.Logger.Info("signed and pushed vote", "vote", prevote1, "peer", ps.peerID)
bzReactor.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: prevote1.ToProto(),
},
}
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
bzState.Logger.Info("signed and pushed vote", "vote", prevote2, "peer", ps.peerID)
bzReactor.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: prevote2.ToProto(),
},
}
}
i++
}
} else {
bcs.Logger.Info("Behaving normally")
bcs.defaultDoPrevote(height, round)
bzState.Logger.Info("behaving normally")
bzState.defaultDoPrevote(height, round)
}
}
// introducing a lazy proposer means that the time of the block committed is different to the
// timestamp that the other nodes have. This tests to ensure that the evidence that finally gets
// proposed will have a valid timestamp
lazyProposer := css[1]
// Introducing a lazy proposer means that the time of the block committed is
// different to the timestamp that the other nodes have. This tests to ensure
// that the evidence that finally gets proposed will have a valid timestamp.
lazyProposer := states[1]
lazyProposer.decideProposal = func(height int64, round int32) {
lazyProposer.Logger.Info("Lazy Proposer proposing condensed commit")
if lazyProposer.privValidator == nil {
panic("entered createProposalBlock with privValidator being nil")
}
require.NotNil(t, lazyProposer.privValidator)
var commit *types.Commit
switch {
@ -219,30 +207,33 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
// start the consensus reactors
for i := 0; i < nValidators; i++ {
s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, false)
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(state, false)
}
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// Evidence should be submitted and committed at the third height but
// we will check the first six just in case
evidenceFromEachValidator := make([]types.Evidence, nValidators)
wg := new(sync.WaitGroup)
for i := 0; i < nValidators; i++ {
i := 0
for _, sub := range rts.subs {
wg.Add(1)
go func(i int) {
go func(j int, s types.Subscription) {
defer wg.Done()
for msg := range blocksSubs[i].Out() {
for msg := range s.Out() {
block := msg.Data().(types.EventDataNewBlock).Block
if len(block.Evidence.Evidence) != 0 {
evidenceFromEachValidator[i] = block.Evidence.Evidence[0]
evidenceFromEachValidator[j] = block.Evidence.Evidence[0]
return
}
}
}(i)
}(i, sub)
i++
}
done := make(chan struct{})
@ -251,7 +242,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
close(done)
}()
pubkey, err := bcs.privValidator.GetPubKey()
pubkey, err := bzState.privValidator.GetPubKey()
require.NoError(t, err)
select {
@ -265,10 +256,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
}
case <-time.After(20 * time.Second):
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
}
t.Fatalf("Timed out waiting for validators to commit evidence")
t.Fatalf("timed out waiting for validators to commit evidence")
}
}
@ -278,259 +266,260 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// B sees a commit, A doesn't.
// Heal partition and ensure A sees the commit
func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
N := 4
logger := consensusLogger().With("test", "byzantine")
app := newCounter
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), app)
defer cleanup()
// give the byzantine validator a normal ticker
ticker := NewTimeoutTicker()
ticker.SetLogger(css[0].Logger)
css[0].SetTimeoutTicker(ticker)
p2pLogger := logger.With("module", "p2p")
blocksSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// enable txs so we can create different proposals
assertMempool(css[i].txNotifier).EnableTxsAvailable()
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
var err error
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewReactor(css[i], true) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)
var conRI p2p.Reactor = conR
// make first val byzantine
if i == 0 {
conRI = NewByzantineReactor(conR)
}
reactors[i] = conRI
err = css[i].blockExec.Store().Save(css[i].state) // for save height 1's validators info
require.NoError(t, err)
}
switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(p2pLogger.With("validator", i))
sw.AddReactor("CONSENSUS", reactors[i])
return sw
}, func(sws []*p2p.Switch, i, j int) {
// the network starts partitioned with globally active adversary
if i != 0 {
return
}
p2p.Connect2Switches(sws, i, j)
})
// make first val byzantine
// NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[0].privValidator.(types.MockPV).DisableChecks()
css[0].decideProposal = func(j int32) func(int64, int32) {
return func(height int64, round int32) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
}
}(int32(0))
// We are setting the prevote function to do nothing because the prevoting
// and precommitting are done alongside the proposal.
css[0].doPrevote = func(height int64, round int32) {}
defer func() {
for _, sw := range switches {
err := sw.Stop()
require.NoError(t, err)
}
}()
// start the non-byz state machines.
// note these must be started before the byz
for i := 1; i < N; i++ {
cr := reactors[i].(*Reactor)
cr.SwitchToConsensus(cr.conS.GetState(), false)
}
// start the byzantine state machine
byzR := reactors[0].(*ByzantineReactor)
s := byzR.reactor.conS.GetState()
byzR.reactor.SwitchToConsensus(s, false)
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
// note peers and switches order don't match.
peers := switches[0].Peers().List()
// partition A
ind0 := getSwitchIndex(switches, peers[0])
// partition B
ind1 := getSwitchIndex(switches, peers[1])
ind2 := getSwitchIndex(switches, peers[2])
p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition (B) to make a block
<-blocksSubs[ind2].Out()
t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
p2p.Connect2Switches(switches, ind0, ind2)
// wait till everyone makes the first new block
// (one of them already has)
wg := new(sync.WaitGroup)
for i := 1; i < N-1; i++ {
wg.Add(1)
go func(j int) {
<-blocksSubs[j].Out()
wg.Done()
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
tick := time.NewTicker(time.Second * 10)
select {
case <-done:
case <-tick.C:
for i, reactor := range reactors {
t.Log(fmt.Sprintf("Consensus Reactor %v", i))
t.Log(fmt.Sprintf("%v", reactor))
}
t.Fatalf("Timed out waiting for all validators to commit first block")
}
}
//-------------------------------
// byzantine consensus functions
func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// byzantine user should create two proposals and try to split the vote.
// Avoid sending on internalMsgQueue and running consensus state.
// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
p1 := proposal1.ToProto()
if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
t.Error(err)
}
proposal1.Signature = p1.Signature
// some new transactions come in (this ensures that the proposals are different)
deliverTxsRange(cs, 0, 1)
// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
p2 := proposal2.ToProto()
if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
t.Error(err)
}
proposal2.Signature = p2.Signature
block1Hash := block1.Hash()
block2Hash := block2.Hash()
// broadcast conflicting proposals/block parts to peers
peers := sw.Peers().List()
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
} else {
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
}
}
}
func sendProposalAndParts(
height int64,
round int32,
cs *State,
peer p2p.Peer,
proposal *types.Proposal,
blockHash []byte,
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, MustEncode(msg))
}
// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
// TODO: https://github.com/tendermint/tendermint/issues/6092
t.SkipNow()
// n := 4
// logger := consensusLogger().With("test", "byzantine")
// app := newCounter
// states, cleanup := randConsensusState(n, "consensus_byzantine_test", newMockTickerFunc(false), app)
// t.Cleanup(cleanup)
// // give the byzantine validator a normal ticker
// ticker := NewTimeoutTicker()
// ticker.SetLogger(states[0].Logger)
// states[0].SetTimeoutTicker(ticker)
// p2pLogger := logger.With("module", "p2p")
// blocksSubs := make([]types.Subscription, n)
// reactors := make([]p2p.Reactor, n)
// for i := 0; i < n; i++ {
// // enable txs so we can create different proposals
// assertMempool(states[i].txNotifier).EnableTxsAvailable()
// eventBus := states[i].eventBus
// eventBus.SetLogger(logger.With("module", "events", "validator", i))
// var err error
// blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
// require.NoError(t, err)
// conR := NewReactor(states[i], true) // so we don't start the consensus states
// conR.SetLogger(logger.With("validator", i))
// conR.SetEventBus(eventBus)
// var conRI p2p.Reactor = conR
// // make first val byzantine
// if i == 0 {
// conRI = NewByzantineReactor(conR)
// }
// reactors[i] = conRI
// err = states[i].blockExec.Store().Save(states[i].state) // for save height 1's validators info
// require.NoError(t, err)
// }
// switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
// sw.SetLogger(p2pLogger.With("validator", i))
// sw.AddReactor("CONSENSUS", reactors[i])
// return sw
// }, func(sws []*p2p.Switch, i, j int) {
// // the network starts partitioned with globally active adversary
// if i != 0 {
// return
// }
// p2p.Connect2Switches(sws, i, j)
// })
// // make first val byzantine
// // NOTE: Now, test validators are MockPV, which by default doesn't
// // do any safety checks.
// states[0].privValidator.(types.MockPV).DisableChecks()
// states[0].decideProposal = func(j int32) func(int64, int32) {
// return func(height int64, round int32) {
// byzantineDecideProposalFunc(t, height, round, states[j], switches[j])
// }
// }(int32(0))
// // We are setting the prevote function to do nothing because the prevoting
// // and precommitting are done alongside the proposal.
// states[0].doPrevote = func(height int64, round int32) {}
// defer func() {
// for _, sw := range switches {
// err := sw.Stop()
// require.NoError(t, err)
// }
// }()
// // start the non-byz state machines.
// // note these must be started before the byz
// for i := 1; i < n; i++ {
// cr := reactors[i].(*Reactor)
// cr.SwitchToConsensus(cr.conS.GetState(), false)
// }
// // start the byzantine state machine
// byzR := reactors[0].(*ByzantineReactor)
// s := byzR.reactor.conS.GetState()
// byzR.reactor.SwitchToConsensus(s, false)
// // byz proposer sends one block to peers[0]
// // and the other block to peers[1] and peers[2].
// // note peers and switches order don't match.
// peers := switches[0].Peers().List()
// // partition A
// ind0 := getSwitchIndex(switches, peers[0])
// // partition B
// ind1 := getSwitchIndex(switches, peers[1])
// ind2 := getSwitchIndex(switches, peers[2])
// p2p.Connect2Switches(switches, ind1, ind2)
// // wait for someone in the big partition (B) to make a block
// <-blocksSubs[ind2].Out()
// t.Log("A block has been committed. Healing partition")
// p2p.Connect2Switches(switches, ind0, ind1)
// p2p.Connect2Switches(switches, ind0, ind2)
// // wait till everyone makes the first new block
// // (one of them already has)
// wg := new(sync.WaitGroup)
// for i := 1; i < N-1; i++ {
// wg.Add(1)
// go func(j int) {
// <-blocksSubs[j].Out()
// wg.Done()
// }(i)
// }
// done := make(chan struct{})
// go func() {
// wg.Wait()
// close(done)
// }()
// tick := time.NewTicker(time.Second * 10)
// select {
// case <-done:
// case <-tick.C:
// for i, reactor := range reactors {
// t.Log(fmt.Sprintf("Consensus Reactor %v", i))
// t.Log(fmt.Sprintf("%v", reactor))
// }
// t.Fatalf("Timed out waiting for all validators to commit first block")
// }
}
//----------------------------------------
// byzantine consensus reactor
type ByzantineReactor struct {
service.Service
reactor *Reactor
}
func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
return &ByzantineReactor{
Service: conR,
reactor: conR,
}
}
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
if !br.reactor.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
peer.Set(types.PeerStateKey, peerState)
// Send our state to peer.
// If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.waitSync {
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
// func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch) {
// // byzantine user should create two proposals and try to split the vote.
// // Avoid sending on internalMsgQueue and running consensus state.
// // Create a new proposal block from state/txs from the mempool.
// block1, blockParts1 := cs.createProposalBlock()
// polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
// proposal1 := types.NewProposal(height, round, polRound, propBlockID)
// p1 := proposal1.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p1); err != nil {
// t.Error(err)
// }
// proposal1.Signature = p1.Signature
// // some new transactions come in (this ensures that the proposals are different)
// deliverTxsRange(cs, 0, 1)
// // Create a new proposal block from state/txs from the mempool.
// block2, blockParts2 := cs.createProposalBlock()
// polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
// proposal2 := types.NewProposal(height, round, polRound, propBlockID)
// p2 := proposal2.ToProto()
// if err := cs.privValidator.SignProposal(cs.state.ChainID, p2); err != nil {
// t.Error(err)
// }
// proposal2.Signature = p2.Signature
// block1Hash := block1.Hash()
// block2Hash := block2.Hash()
// // broadcast conflicting proposals/block parts to peers
// peers := sw.Peers().List()
// t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
// for i, peer := range peers {
// if i < len(peers)/2 {
// go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
// } else {
// go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
// }
// }
// }
// func sendProposalAndParts(
// height int64,
// round int32,
// cs *State,
// peer p2p.Peer,
// proposal *types.Proposal,
// blockHash []byte,
// parts *types.PartSet,
// ) {
// // proposal
// msg := &ProposalMessage{Proposal: proposal}
// peer.Send(DataChannel, MustEncode(msg))
// // parts
// for i := 0; i < int(parts.Total()); i++ {
// part := parts.GetPart(i)
// msg := &BlockPartMessage{
// Height: height, // This tells peer that this part applies to us.
// Round: round, // This tells peer that this part applies to us.
// Part: part,
// }
// peer.Send(DataChannel, MustEncode(msg))
// }
// // votes
// cs.mtx.Lock()
// prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
// precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
// cs.mtx.Unlock()
// peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
// peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
// }
// type ByzantineReactor struct {
// service.Service
// reactor *Reactor
// }
// func NewByzantineReactor(conR *Reactor) *ByzantineReactor {
// return &ByzantineReactor{
// Service: conR,
// reactor: conR,
// }
// }
// func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
// func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
// func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
// if !br.reactor.IsRunning() {
// return
// }
// // Create peerState for peer
// peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
// peer.Set(types.PeerStateKey, peerState)
// // Send our state to peer.
// // If we're syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
// if !br.reactor.waitSync {
// br.reactor.sendNewRoundStepMessage(peer)
// }
// }
// func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// br.reactor.RemovePeer(peer, reason)
// }
// func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
// br.reactor.Receive(chID, peer, msgBytes)
// }
// func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

+ 31
- 20
consensus/common_test.go View File

@ -31,7 +31,6 @@ import (
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
@ -55,6 +54,24 @@ var (
ensureTimeout = time.Millisecond * 200
)
func configSetup(t *testing.T) {
t.Helper()
config = ResetConfig("consensus_reactor_test")
consensusReplayConfig = ResetConfig("consensus_replay_test")
configStateTest := ResetConfig("consensus_state_test")
configMempoolTest := ResetConfig("consensus_mempool_test")
configByzantineTest := ResetConfig("consensus_byzantine_test")
t.Cleanup(func() {
os.RemoveAll(config.RootDir)
os.RemoveAll(consensusReplayConfig.RootDir)
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
})
}
func ensureDir(dir string, mode os.FileMode) {
if err := tmos.EnsureDir(dir, mode); err != nil {
panic(err)
@ -675,11 +692,18 @@ func consensusLogger() log.Logger {
}).With("module", "consensus")
}
func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker,
appFunc func() abci.Application, configOpts ...func(*cfg.Config)) ([]*State, cleanupFunc) {
func randConsensusState(
nValidators int,
testName string,
tickerFunc func() TimeoutTicker,
appFunc func() abci.Application,
configOpts ...func(*cfg.Config),
) ([]*State, cleanupFunc) {
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
logger := consensusLogger()
configRootDirs := make([]string, 0, nValidators)
for i := 0; i < nValidators; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db
@ -687,10 +711,13 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
state, _ := stateStore.LoadFromDBOrGenesisDoc(genDoc)
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
configRootDirs = append(configRootDirs, thisConfig.RootDir)
for _, opt := range configOpts {
opt(thisConfig)
}
ensureDir(filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
app := appFunc()
vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals})
@ -699,6 +726,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
}
return css, func() {
for _, dir := range configRootDirs {
os.RemoveAll(dir)
@ -768,18 +796,6 @@ func randConsensusNetWithPeers(
}
}
func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
for i, s := range switches {
if peer.NodeInfo().ID() == s.NodeInfo().ID() {
return i
}
}
panic("didnt find peer in switches")
}
//-------------------------------------------------------------------------------
// genesis
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
@ -807,9 +823,6 @@ func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.Sta
return s0, privValidators
}
//------------------------------------
// mock ticker
func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
return func() TimeoutTicker {
return &mockTicker{
@ -855,8 +868,6 @@ func (m *mockTicker) Chan() <-chan timeoutInfo {
func (*mockTicker) SetLogger(log.Logger) {}
//------------------------------------
func newCounter() abci.Application {
return counter.NewApplication(true)
}


+ 56
- 39
consensus/invalid_test.go View File

@ -1,60 +1,72 @@
package consensus
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
//----------------------------------------------
// byzantine failures
// one byz val sends a precommit for a random block at each height
// Ensure a testnet makes blocks
func TestReactorInvalidPrecommit(t *testing.T) {
N := 4
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
configSetup(t)
n := 4
states, cleanup := randConsensusState(n, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
t.Cleanup(cleanup)
for i := 0; i < 4; i++ {
ticker := NewTimeoutTicker()
ticker.SetLogger(css[i].Logger)
css[i].SetTimeoutTicker(ticker)
ticker.SetLogger(states[i].Logger)
states[i].SetTimeoutTicker(ticker)
}
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
rts := setup(t, n, states, 100) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(state, false)
}
// this val sends a random precommit at each height
byzValIdx := 0
byzVal := css[byzValIdx]
byzR := reactors[byzValIdx]
// update the doPrevote function to just send a valid precommit for a random block
// and otherwise disable the priv validator
byzVal.mtx.Lock()
pv := byzVal.privValidator
byzVal.doPrevote = func(height int64, round int32) {
invalidDoPrevoteFunc(t, height, round, byzVal, byzR.Switch, pv)
node := rts.network.RandomNode()
byzState := rts.states[node.NodeID]
byzReactor := rts.reactors[node.NodeID]
// Update the doPrevote function to just send a valid precommit for a random
// block and otherwise disable the priv validator.
byzState.mtx.Lock()
privVal := byzState.privValidator
byzState.doPrevote = func(height int64, round int32) {
invalidDoPrevoteFunc(t, height, round, byzState, byzReactor, privVal)
}
byzVal.mtx.Unlock()
t.Cleanup(func() { stopConsensusNet(log.TestingLogger(), reactors, eventBuses) })
byzState.mtx.Unlock()
// wait for a bunch of blocks
// TODO: make this tighter by ensuring the halt happens by block 2
//
// TODO: Make this tighter by ensuring the halt happens by block 2.
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
timeoutWaitGroup(t, N, func(j int) {
<-blocksSubs[j].Out()
}, css)
for _, sub := range rts.subs {
wg.Add(1)
go func(s types.Subscription) {
<-s.Out()
wg.Done()
}(sub)
}
}
wg.Wait()
}
func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw *p2p.Switch, pv types.PrivValidator) {
func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, r *Reactor, pv types.PrivValidator) {
// routine to:
// - precommit for a random block
// - send precommit to all peers
@ -62,10 +74,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
go func() {
cs.mtx.Lock()
cs.privValidator = pv
pubKey, err := cs.privValidator.GetPubKey()
if err != nil {
panic(err)
}
require.NoError(t, err)
addr := pubKey.Address()
valIndex, _ := cs.Validators.GetByAddress(addr)
@ -82,19 +94,24 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
Hash: blockHash,
PartSetHeader: types.PartSetHeader{Total: 1, Hash: tmrand.Bytes(32)}},
}
p := precommit.ToProto()
err = cs.privValidator.SignVote(cs.state.ChainID, p)
if err != nil {
t.Error(err)
}
require.NoError(t, err)
precommit.Signature = p.Signature
cs.privValidator = nil // disable priv val so we don't do normal votes
cs.mtx.Unlock()
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
for _, ps := range r.peers {
cs.Logger.Info("sending bad vote", "block", blockHash, "peer", ps.peerID)
r.voteCh.Out <- p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: precommit.ToProto(),
},
}
}
}()
}

+ 10
- 0
consensus/mempool_test.go View File

@ -25,6 +25,8 @@ func assertMempool(txn txNotifier) mempl.Mempool {
}
func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@ -45,6 +47,8 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
}
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@ -63,6 +67,8 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
}
func TestMempoolProgressInHigherRound(t *testing.T) {
configSetup(t)
config := ResetConfig("consensus_mempool_txs_available_test")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
@ -113,6 +119,8 @@ func deliverTxsRange(cs *State, start, end int) {
}
func TestMempoolTxConcurrentWithCommit(t *testing.T) {
configSetup(t)
state, privVals := randGenesisState(1, false, 10)
blockDB := dbm.NewMemDB()
stateStore := sm.NewStore(blockDB)
@ -137,6 +145,8 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
}
func TestMempoolRmBadTx(t *testing.T) {
configSetup(t)
state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication()
blockDB := dbm.NewMemDB()


+ 319
- 20
consensus/msgs.go View File

@ -4,10 +4,9 @@ import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmjson "github.com/tendermint/tendermint/libs/json"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
@ -15,7 +14,309 @@ import (
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message
// Message defines an interface that the consensus domain types implement. When
// a proto message is received on a consensus p2p Channel, it is wrapped and then
// converted to a Message via MsgFromProto.
type Message interface {
ValidateBasic() error
}
func init() {
tmjson.RegisterType(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage")
tmjson.RegisterType(&NewValidBlockMessage{}, "tendermint/NewValidBlockMessage")
tmjson.RegisterType(&ProposalMessage{}, "tendermint/Proposal")
tmjson.RegisterType(&ProposalPOLMessage{}, "tendermint/ProposalPOL")
tmjson.RegisterType(&BlockPartMessage{}, "tendermint/BlockPart")
tmjson.RegisterType(&VoteMessage{}, "tendermint/Vote")
tmjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote")
tmjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23")
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
Height int64
Round int32
Step cstypes.RoundStepType
SecondsSinceStartTime int64
LastCommitRound int32
}
// ValidateBasic performs basic validation.
func (m *NewRoundStepMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !m.Step.IsValid() {
return errors.New("invalid Step")
}
// NOTE: SecondsSinceStartTime may be negative
// LastCommitRound will be -1 for the initial height, but we don't know what height this is
// since it can be specified in genesis. The reactor will have to validate this via
// ValidateHeight().
if m.LastCommitRound < -1 {
return errors.New("invalid LastCommitRound (cannot be < -1)")
}
return nil
}
// ValidateHeight validates the height given the chain's initial height.
func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error {
if m.Height < initialHeight {
return fmt.Errorf("invalid Height %v (lower than initial height %v)",
m.Height, initialHeight)
}
if m.Height == initialHeight && m.LastCommitRound != -1 {
return fmt.Errorf("invalid LastCommitRound %v (must be -1 for initial height %v)",
m.LastCommitRound, initialHeight)
}
if m.Height > initialHeight && m.LastCommitRound < 0 {
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
initialHeight)
}
return nil
}
// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
}
// NewValidBlockMessage is sent when a validator observes a valid block B in some round r,
// i.e., there is a Proposal for block B and 2/3+ prevotes for the block B in the round r.
// In case the block is also committed, then IsCommit flag is set to true.
type NewValidBlockMessage struct {
Height int64
Round int32
BlockPartSetHeader types.PartSetHeader
BlockParts *bits.BitArray
IsCommit bool
}
// ValidateBasic performs basic validation.
func (m *NewValidBlockMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.BlockPartSetHeader.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockPartSetHeader: %v", err)
}
if m.BlockParts.Size() == 0 {
return errors.New("empty blockParts")
}
if m.BlockParts.Size() != int(m.BlockPartSetHeader.Total) {
return fmt.Errorf("blockParts bit array size %d not equal to BlockPartSetHeader.Total %d",
m.BlockParts.Size(),
m.BlockPartSetHeader.Total)
}
if m.BlockParts.Size() > int(types.MaxBlockPartsCount) {
return fmt.Errorf("blockParts bit array is too big: %d, max: %d", m.BlockParts.Size(), types.MaxBlockPartsCount)
}
return nil
}
// String returns a string representation.
func (m *NewValidBlockMessage) String() string {
return fmt.Sprintf("[ValidBlockMessage H:%v R:%v BP:%v BA:%v IsCommit:%v]",
m.Height, m.Round, m.BlockPartSetHeader, m.BlockParts, m.IsCommit)
}
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
Proposal *types.Proposal
}
// ValidateBasic performs basic validation.
func (m *ProposalMessage) ValidateBasic() error {
return m.Proposal.ValidateBasic()
}
// String returns a string representation.
func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal)
}
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
Height int64
ProposalPOLRound int32
ProposalPOL *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *ProposalPOLMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.ProposalPOLRound < 0 {
return errors.New("negative ProposalPOLRound")
}
if m.ProposalPOL.Size() == 0 {
return errors.New("empty ProposalPOL bit array")
}
if m.ProposalPOL.Size() > types.MaxVotesCount {
return fmt.Errorf("proposalPOL bit array is too big: %d, max: %d", m.ProposalPOL.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
Height int64
Round int32
Part *types.Part
}
// ValidateBasic performs basic validation.
func (m *BlockPartMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if err := m.Part.ValidateBasic(); err != nil {
return fmt.Errorf("wrong Part: %v", err)
}
return nil
}
// String returns a string representation.
func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
Vote *types.Vote
}
// ValidateBasic performs basic validation.
func (m *VoteMessage) ValidateBasic() error {
return m.Vote.ValidateBasic()
}
// String returns a string representation.
func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote)
}
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
Index int32
}
// ValidateBasic performs basic validation.
func (m *HasVoteMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if m.Index < 0 {
return errors.New("negative Index")
}
return nil
}
// String returns a string representation.
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
}
// ValidateBasic performs basic validation.
func (m *VoteSetMaj23Message) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if m.Round < 0 {
return errors.New("negative Round")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
return nil
}
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the
// BlockID.
type VoteSetBitsMessage struct {
Height int64
Round int32
Type tmproto.SignedMsgType
BlockID types.BlockID
Votes *bits.BitArray
}
// ValidateBasic performs basic validation.
func (m *VoteSetBitsMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
if !types.IsVoteTypeValid(m.Type) {
return errors.New("invalid Type")
}
if err := m.BlockID.ValidateBasic(); err != nil {
return fmt.Errorf("wrong BlockID: %v", err)
}
// NOTE: Votes.Size() can be zero if the node does not have any
if m.Votes.Size() > types.MaxVotesCount {
return fmt.Errorf("votes bit array is too big: %d, max: %d", m.Votes.Size(), types.MaxVotesCount)
}
return nil
}
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}
// MsgToProto takes a consensus message type and returns the proto defined
// consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@ -143,7 +444,7 @@ func MsgToProto(msg Message) (*tmcons.Message, error) {
return &pb, nil
}
// MsgFromProto takes a consensus proto message and returns the native go type
// MsgFromProto takes a consensus proto message and returns the native go type.
func MsgFromProto(msg *tmcons.Message) (Message, error) {
if msg == nil {
return nil, errors.New("consensus: nil message")
@ -269,21 +570,7 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
// WALToProto takes a WAL message and return a proto walMessage and error.
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage
@ -311,6 +598,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
case timeoutInfo:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_TimeoutInfo{
@ -322,6 +610,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
case EndHeightMessage:
pb = tmcons.WALMessage{
Sum: &tmcons.WALMessage_EndHeight{
@ -330,6 +619,7 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
},
},
}
default:
return nil, fmt.Errorf("to proto: wal message not recognized: %T", msg)
}
@ -337,11 +627,13 @@ func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
return &pb, nil
}
// WALFromProto takes a proto wal message and return a consensus walMessage and error
// WALFromProto takes a proto wal message and return a consensus walMessage and
// error.
func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
if msg == nil {
return nil, errors.New("nil WAL message")
}
var pb WALMessage
switch msg := msg.Sum.(type) {
@ -351,6 +643,7 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
Round: msg.EventDataRoundState.Round,
Step: msg.EventDataRoundState.Step,
}
case *tmcons.WALMessage_MsgInfo:
walMsg, err := MsgFromProto(&msg.MsgInfo.Msg)
if err != nil {
@ -367,20 +660,26 @@ func WALFromProto(msg *tmcons.WALMessage) (WALMessage, error) {
if err != nil {
return nil, fmt.Errorf("denying message due to possible overflow: %w", err)
}
pb = timeoutInfo{
Duration: msg.TimeoutInfo.Duration,
Height: msg.TimeoutInfo.Height,
Round: msg.TimeoutInfo.Round,
Step: cstypes.RoundStepType(tis),
}
return pb, nil
case *tmcons.WALMessage_EndHeight:
pb := EndHeightMessage{
Height: msg.EndHeight.Height,
}
return pb, nil
default:
return nil, fmt.Errorf("from proto: wal message not recognized: %T", msg)
}
return pb, nil
}

+ 310
- 0
consensus/msgs_test.go View File

@ -2,6 +2,7 @@ package consensus
import (
"encoding/hex"
"fmt"
"math"
"testing"
"time"
@ -10,8 +11,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/bits"
"github.com/tendermint/tendermint/libs/bytes"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
@ -425,3 +429,309 @@ func TestConsMsgsVectors(t *testing.T) {
})
}
}
func TestVoteSetMaj23MessageValidateBasic(t *testing.T) {
const (
validSignedMsgType tmproto.SignedMsgType = 0x01
invalidSignedMsgType tmproto.SignedMsgType = 0x03
)
validBlockID := types.BlockID{}
invalidBlockID := types.BlockID{
Hash: bytes.HexBytes{},
PartSetHeader: types.PartSetHeader{
Total: 1,
Hash: []byte{0},
},
}
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageHeight int64
testName string
messageType tmproto.SignedMsgType
messageBlockID types.BlockID
}{
{false, 0, 0, "Valid Message", validSignedMsgType, validBlockID},
{true, -1, 0, "Invalid Message", validSignedMsgType, validBlockID},
{true, 0, -1, "Invalid Message", validSignedMsgType, validBlockID},
{true, 0, 0, "Invalid Message", invalidSignedMsgType, validBlockID},
{true, 0, 0, "Invalid Message", validSignedMsgType, invalidBlockID},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := VoteSetMaj23Message{
Height: tc.messageHeight,
Round: tc.messageRound,
Type: tc.messageType,
BlockID: tc.messageBlockID,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}
func TestVoteSetBitsMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*VoteSetBitsMessage)
expErr string
}{
{func(msg *VoteSetBitsMessage) {}, ""},
{func(msg *VoteSetBitsMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *VoteSetBitsMessage) { msg.Type = 0x03 }, "invalid Type"},
{func(msg *VoteSetBitsMessage) {
msg.BlockID = types.BlockID{
Hash: bytes.HexBytes{},
PartSetHeader: types.PartSetHeader{
Total: 1,
Hash: []byte{0},
},
}
}, "wrong BlockID: wrong PartSetHeader: wrong Hash:"},
{func(msg *VoteSetBitsMessage) { msg.Votes = bits.NewBitArray(types.MaxVotesCount + 1) },
"votes bit array is too big: 10001, max: 10000"},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &VoteSetBitsMessage{
Height: 1,
Round: 0,
Type: 0x01,
Votes: bits.NewBitArray(1),
BlockID: types.BlockID{},
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestNewRoundStepMessageValidateBasic(t *testing.T) {
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageLastCommitRound int32
messageHeight int64
testName string
messageStep cstypes.RoundStepType
}{
{false, 0, 0, 0, "Valid Message", cstypes.RoundStepNewHeight},
{true, -1, 0, 0, "Negative round", cstypes.RoundStepNewHeight},
{true, 0, 0, -1, "Negative height", cstypes.RoundStepNewHeight},
{true, 0, 0, 0, "Invalid Step", cstypes.RoundStepCommit + 1},
// The following cases will be handled by ValidateHeight
{false, 0, 0, 1, "H == 1 but LCR != -1 ", cstypes.RoundStepNewHeight},
{false, 0, -1, 2, "H > 1 but LCR < 0", cstypes.RoundStepNewHeight},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := NewRoundStepMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Step: tc.messageStep,
LastCommitRound: tc.messageLastCommitRound,
}
err := message.ValidateBasic()
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestNewRoundStepMessageValidateHeight(t *testing.T) {
initialHeight := int64(10)
testCases := []struct { // nolint: maligned
expectErr bool
messageLastCommitRound int32
messageHeight int64
testName string
}{
{false, 0, 11, "Valid Message"},
{true, 0, -1, "Negative height"},
{true, 0, 0, "Zero height"},
{true, 0, 10, "Initial height but LCR != -1 "},
{true, -1, 11, "Normal height but LCR < 0"},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := NewRoundStepMessage{
Height: tc.messageHeight,
Round: 0,
Step: cstypes.RoundStepNewHeight,
LastCommitRound: tc.messageLastCommitRound,
}
err := message.ValidateHeight(initialHeight)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
func TestNewValidBlockMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*NewValidBlockMessage)
expErr string
}{
{func(msg *NewValidBlockMessage) {}, ""},
{func(msg *NewValidBlockMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *NewValidBlockMessage) { msg.Round = -1 }, "negative Round"},
{
func(msg *NewValidBlockMessage) { msg.BlockPartSetHeader.Total = 2 },
"blockParts bit array size 1 not equal to BlockPartSetHeader.Total 2",
},
{
func(msg *NewValidBlockMessage) {
msg.BlockPartSetHeader.Total = 0
msg.BlockParts = bits.NewBitArray(0)
},
"empty blockParts",
},
{
func(msg *NewValidBlockMessage) { msg.BlockParts = bits.NewBitArray(int(types.MaxBlockPartsCount) + 1) },
"blockParts bit array size 1602 not equal to BlockPartSetHeader.Total 1",
},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &NewValidBlockMessage{
Height: 1,
Round: 0,
BlockPartSetHeader: types.PartSetHeader{
Total: 1,
},
BlockParts: bits.NewBitArray(1),
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestProposalPOLMessageValidateBasic(t *testing.T) {
testCases := []struct {
malleateFn func(*ProposalPOLMessage)
expErr string
}{
{func(msg *ProposalPOLMessage) {}, ""},
{func(msg *ProposalPOLMessage) { msg.Height = -1 }, "negative Height"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOLRound = -1 }, "negative ProposalPOLRound"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = bits.NewBitArray(0) }, "empty ProposalPOL bit array"},
{func(msg *ProposalPOLMessage) { msg.ProposalPOL = bits.NewBitArray(types.MaxVotesCount + 1) },
"proposalPOL bit array is too big: 10001, max: 10000"},
}
for i, tc := range testCases {
tc := tc
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
msg := &ProposalPOLMessage{
Height: 1,
ProposalPOLRound: 1,
ProposalPOL: bits.NewBitArray(1),
}
tc.malleateFn(msg)
err := msg.ValidateBasic()
if tc.expErr != "" && assert.Error(t, err) {
assert.Contains(t, err.Error(), tc.expErr)
}
})
}
}
func TestBlockPartMessageValidateBasic(t *testing.T) {
testPart := new(types.Part)
testPart.Proof.LeafHash = tmhash.Sum([]byte("leaf"))
testCases := []struct {
testName string
messageHeight int64
messageRound int32
messagePart *types.Part
expectErr bool
}{
{"Valid Message", 0, 0, testPart, false},
{"Invalid Message", -1, 0, testPart, true},
{"Invalid Message", 0, -1, testPart, true},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := BlockPartMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Part: tc.messagePart,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
message := BlockPartMessage{Height: 0, Round: 0, Part: new(types.Part)}
message.Part.Index = 1
assert.Equal(t, true, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
}
func TestHasVoteMessageValidateBasic(t *testing.T) {
const (
validSignedMsgType tmproto.SignedMsgType = 0x01
invalidSignedMsgType tmproto.SignedMsgType = 0x03
)
testCases := []struct { // nolint: maligned
expectErr bool
messageRound int32
messageIndex int32
messageHeight int64
testName string
messageType tmproto.SignedMsgType
}{
{false, 0, 0, 0, "Valid Message", validSignedMsgType},
{true, -1, 0, 0, "Invalid Message", validSignedMsgType},
{true, 0, -1, 0, "Invalid Message", validSignedMsgType},
{true, 0, 0, 0, "Invalid Message", invalidSignedMsgType},
{true, 0, 0, -1, "Invalid Message", validSignedMsgType},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
message := HasVoteMessage{
Height: tc.messageHeight,
Round: tc.messageRound,
Type: tc.messageType,
Index: tc.messageIndex,
}
assert.Equal(t, tc.expectErr, message.ValidateBasic() != nil, "Validate Basic had an unexpected result")
})
}
}

+ 528
- 0
consensus/peer_state.go View File

@ -0,0 +1,528 @@
package consensus
import (
"errors"
"fmt"
"sync"
"time"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var (
ErrPeerStateHeightRegression = errors.New("peer state height regression")
ErrPeerStateInvalidStartTime = errors.New("peer state invalid startTime")
)
// peerStateStats holds internal statistics for a peer.
type peerStateStats struct {
Votes int `json:"votes"`
BlockParts int `json:"block_parts"`
}
func (pss peerStateStats) String() string {
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.Votes, pss.BlockParts)
}
// PeerState contains the known state of a peer, including its connection and
// threadsafe access to its PeerRoundState.
// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
// Be mindful of what you Expose.
type PeerState struct {
peerID p2p.NodeID
logger log.Logger
// NOTE: Modify below using setters, never directly.
mtx tmsync.RWMutex
running bool
PRS cstypes.PeerRoundState `json:"round_state"`
Stats *peerStateStats `json:"stats"`
broadcastWG sync.WaitGroup
closer *tmsync.Closer
}
// NewPeerState returns a new PeerState for the given node ID.
func NewPeerState(logger log.Logger, peerID p2p.NodeID) *PeerState {
return &PeerState{
peerID: peerID,
logger: logger,
closer: tmsync.NewCloser(),
PRS: cstypes.PeerRoundState{
Round: -1,
ProposalPOLRound: -1,
LastCommitRound: -1,
CatchupCommitRound: -1,
},
Stats: &peerStateStats{},
}
}
// SetRunning sets the running state of the peer.
func (ps *PeerState) SetRunning(v bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.running = v
}
// IsRunning returns true if a PeerState is considered running where multiple
// broadcasting goroutines exist for the peer.
func (ps *PeerState) IsRunning() bool {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
return ps.running
}
// GetRoundState returns a shallow copy of the PeerRoundState. There's no point
// in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
ps.mtx.Lock()
defer ps.mtx.Unlock()
prs := ps.PRS // copy
return &prs
}
// ToJSON returns a json of PeerState.
func (ps *PeerState) ToJSON() ([]byte, error) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return tmjson.Marshal(ps)
}
// GetHeight returns an atomic snapshot of the PeerRoundState's height used by
// the mempool to ensure peers are caught up before broadcasting new txs.
func (ps *PeerState) GetHeight() int64 {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.PRS.Height
}
// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
return
}
if ps.PRS.Proposal {
return
}
ps.PRS.Proposal = true
// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
if ps.PRS.ProposalBlockParts != nil {
return
}
ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
ps.PRS.ProposalPOLRound = proposal.POLRound
ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}
// InitProposalBlockParts initializes the peer's proposal block parts header
// and bit array.
func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.ProposalBlockParts != nil {
return
}
ps.PRS.ProposalBlockPartSetHeader = partSetHeader
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
}
// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != height || ps.PRS.Round != round {
return
}
ps.PRS.ProposalBlockParts.SetIndex(index, true)
}
// PickVoteToSend picks a vote to send to the peer. It will return true if a
// vote was picked.
//
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (*types.Vote, bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if votes.Size() == 0 {
return nil, false
}
var (
height = votes.GetHeight()
round = votes.GetRound()
votesType = tmproto.SignedMsgType(votes.Type())
size = votes.Size()
)
// lazily set data using 'votes'
if votes.IsCommit() {
ps.ensureCatchupCommitRound(height, round, size)
}
ps.ensureVoteBitArrays(height, size)
psVotes := ps.getVoteBitArray(height, round, votesType)
if psVotes == nil {
return nil, false // not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
return votes.GetByIndex(int32(index)), true
}
return nil, false
}
func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
if !types.IsVoteTypeValid(votesType) {
return nil
}
if ps.PRS.Height == height {
if ps.PRS.Round == round {
switch votesType {
case tmproto.PrevoteType:
return ps.PRS.Prevotes
case tmproto.PrecommitType:
return ps.PRS.Precommits
}
}
if ps.PRS.CatchupCommitRound == round {
switch votesType {
case tmproto.PrevoteType:
return nil
case tmproto.PrecommitType:
return ps.PRS.CatchupCommit
}
}
if ps.PRS.ProposalPOLRound == round {
switch votesType {
case tmproto.PrevoteType:
return ps.PRS.ProposalPOL
case tmproto.PrecommitType:
return nil
}
}
return nil
}
if ps.PRS.Height == height+1 {
if ps.PRS.LastCommitRound == round {
switch votesType {
case tmproto.PrevoteType:
return nil
case tmproto.PrecommitType:
return ps.PRS.LastCommit
}
}
return nil
}
return nil
}
// 'round': A round for which we have a +2/3 commit.
func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
if ps.PRS.Height != height {
return
}
/*
NOTE: This is wrong, 'round' could change.
e.g. if orig round is not the same as block LastCommit round.
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
panic(fmt.Sprintf(
"Conflicting CatchupCommitRound. Height: %v,
Orig: %v,
New: %v",
height,
ps.CatchupCommitRound,
round))
}
*/
if ps.PRS.CatchupCommitRound == round {
return // Nothing to do!
}
ps.PRS.CatchupCommitRound = round
if round == ps.PRS.Round {
ps.PRS.CatchupCommit = ps.PRS.Precommits
} else {
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
}
}
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.ensureVoteBitArrays(height, numValidators)
}
func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
if ps.PRS.Height == height {
if ps.PRS.Prevotes == nil {
ps.PRS.Prevotes = bits.NewBitArray(numValidators)
}
if ps.PRS.Precommits == nil {
ps.PRS.Precommits = bits.NewBitArray(numValidators)
}
if ps.PRS.CatchupCommit == nil {
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
}
if ps.PRS.ProposalPOL == nil {
ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
}
} else if ps.PRS.Height == height+1 {
if ps.PRS.LastCommit == nil {
ps.PRS.LastCommit = bits.NewBitArray(numValidators)
}
}
}
// RecordVote increments internal votes related statistics for this peer.
// It returns the total number of added votes.
func (ps *PeerState) RecordVote() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.Stats.Votes++
return ps.Stats.Votes
}
// VotesSent returns the number of blocks for which peer has been sending us
// votes.
func (ps *PeerState) VotesSent() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.Stats.Votes
}
// RecordBlockPart increments internal block part related statistics for this peer.
// It returns the total number of added block parts.
func (ps *PeerState) RecordBlockPart() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.Stats.BlockParts++
return ps.Stats.BlockParts
}
// BlockPartsSent returns the number of useful block parts the peer has sent us.
func (ps *PeerState) BlockPartsSent() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.Stats.BlockParts
}
// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}
func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
logger := ps.logger.With(
"peerH/R", fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
"H/R", fmt.Sprintf("%d/%d", height, round),
)
logger.Debug("setHasVote", "type", voteType, "index", index)
// NOTE: some may be nil BitArrays -> no side effects
psVotes := ps.getVoteBitArray(height, round, voteType)
if psVotes != nil {
psVotes.SetIndex(int(index), true)
}
}
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
// ignore duplicates or decreases
if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
return
}
var (
psHeight = ps.PRS.Height
psRound = ps.PRS.Round
psCatchupCommitRound = ps.PRS.CatchupCommitRound
psCatchupCommit = ps.PRS.CatchupCommit
startTime = tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
)
ps.PRS.Height = msg.Height
ps.PRS.Round = msg.Round
ps.PRS.Step = msg.Step
ps.PRS.StartTime = startTime
if psHeight != msg.Height || psRound != msg.Round {
ps.PRS.Proposal = false
ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
ps.PRS.ProposalBlockParts = nil
ps.PRS.ProposalPOLRound = -1
ps.PRS.ProposalPOL = nil
// we'll update the BitArray capacity later
ps.PRS.Prevotes = nil
ps.PRS.Precommits = nil
}
if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
// Peer caught up to CatchupCommitRound.
// Preserve psCatchupCommit!
// NOTE: We prefer to use prs.Precommits if
// pr.Round matches pr.CatchupCommitRound.
ps.PRS.Precommits = psCatchupCommit
}
if psHeight != msg.Height {
// shift Precommits to LastCommit
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
ps.PRS.LastCommitRound = msg.LastCommitRound
ps.PRS.LastCommit = ps.PRS.Precommits
} else {
ps.PRS.LastCommitRound = msg.LastCommitRound
ps.PRS.LastCommit = nil
}
// we'll update the BitArray capacity later
ps.PRS.CatchupCommitRound = -1
ps.PRS.CatchupCommit = nil
}
}
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
if ps.PRS.Round != msg.Round && !msg.IsCommit {
return
}
ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
ps.PRS.ProposalBlockParts = msg.BlockParts
}
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
return
}
// TODO: Merge onto existing ps.PRS.ProposalPOL?
// We might have sent some prevotes in the meantime.
ps.PRS.ProposalPOL = msg.ProposalPOL
}
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.PRS.Height != msg.Height {
return
}
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}
// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
if votes != nil {
if ourVotes == nil {
votes.Update(msg.Votes)
} else {
otherVotes := votes.Sub(ourVotes)
hasVotes := otherVotes.Or(msg.Votes)
votes.Update(hasVotes)
}
}
}
// String returns a string representation of the PeerState
func (ps *PeerState) String() string {
return ps.StringIndented("")
}
// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return fmt.Sprintf(`PeerState{
%s Key %v
%s RoundState %v
%s Stats %v
%s}`,
indent, ps.peerID,
indent, ps.PRS.StringIndented(indent+" "),
indent, ps.Stats,
indent,
)
}

+ 1029
- 1369
consensus/reactor.go
File diff suppressed because it is too large
View File


+ 478
- 753
consensus/reactor_test.go
File diff suppressed because it is too large
View File


+ 12
- 15
consensus/replay_test.go View File

@ -34,21 +34,6 @@ import (
"github.com/tendermint/tendermint/types"
)
func TestMain(m *testing.M) {
config = ResetConfig("consensus_reactor_test")
consensusReplayConfig = ResetConfig("consensus_replay_test")
configStateTest := ResetConfig("consensus_state_test")
configMempoolTest := ResetConfig("consensus_mempool_test")
configByzantineTest := ResetConfig("consensus_byzantine_test")
code := m.Run()
os.RemoveAll(config.RootDir)
os.RemoveAll(consensusReplayConfig.RootDir)
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
os.Exit(code)
}
// These tests ensure we can always recover from failure at any part of the consensus process.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// Only the latter interacts with the app and store,
@ -321,6 +306,8 @@ var modes = []uint{0, 1, 2, 3}
// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay
func TestSimulateValidatorsChange(t *testing.T) {
configSetup(t)
nPeers := 7
nVals := 4
css, genDoc, config, cleanup := randConsensusNetWithPeers(
@ -544,6 +531,8 @@ func TestSimulateValidatorsChange(t *testing.T) {
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, false)
}
@ -554,6 +543,8 @@ func TestHandshakeReplayAll(t *testing.T) {
// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, 2, m, false)
}
@ -564,6 +555,8 @@ func TestHandshakeReplaySome(t *testing.T) {
// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
}
@ -574,6 +567,8 @@ func TestHandshakeReplayOne(t *testing.T) {
// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
configSetup(t)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
}
@ -584,6 +579,8 @@ func TestHandshakeReplayNone(t *testing.T) {
// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx
func TestMockProxyApp(t *testing.T) {
configSetup(t)
sim.CleanupFunc() // clean the test env created in TestSimulateValidatorsChange
logger := log.TestingLogger()
var validTxs, invalidTxs = 0, 0


+ 58
- 0
consensus/state_test.go View File

@ -56,6 +56,8 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh
// ProposeSuite
func TestStateProposerSelection0(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
height, round := cs1.Height, cs1.Round
@ -96,6 +98,8 @@ func TestStateProposerSelection0(t *testing.T) {
// Now let's do it all again, but starting from round 2 instead of 0
func TestStateProposerSelection2(t *testing.T) {
configSetup(t)
cs1, vss := randState(4) // test needs more work for more than 3 validators
height := cs1.Height
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
@ -133,6 +137,8 @@ func TestStateProposerSelection2(t *testing.T) {
// a non-validator should timeout into the prevote round
func TestStateEnterProposeNoPrivValidator(t *testing.T) {
configSetup(t)
cs, _ := randState(1)
cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round
@ -152,6 +158,8 @@ func TestStateEnterProposeNoPrivValidator(t *testing.T) {
// a validator should not timeout of the prevote round (TODO: unless the block is really big!)
func TestStateEnterProposeYesPrivValidator(t *testing.T) {
configSetup(t)
cs, _ := randState(1)
height, round := cs.Height, cs.Round
@ -182,6 +190,8 @@ func TestStateEnterProposeYesPrivValidator(t *testing.T) {
}
func TestStateBadProposal(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
height, round := cs1.Height, cs1.Round
vs2 := vss[1]
@ -240,6 +250,8 @@ func TestStateBadProposal(t *testing.T) {
}
func TestStateOversizedBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
cs1.state.ConsensusParams.Block.MaxBytes = 2000
height, round := cs1.Height, cs1.Round
@ -302,6 +314,8 @@ func TestStateOversizedBlock(t *testing.T) {
// propose, prevote, and precommit a block
func TestStateFullRound1(t *testing.T) {
configSetup(t)
cs, vss := randState(1)
height, round := cs.Height, cs.Round
@ -342,6 +356,8 @@ func TestStateFullRound1(t *testing.T) {
// nil is proposed, so prevote and precommit nil
func TestStateFullRoundNil(t *testing.T) {
configSetup(t)
cs, vss := randState(1)
height, round := cs.Height, cs.Round
@ -360,6 +376,8 @@ func TestStateFullRoundNil(t *testing.T) {
// run through propose, prevote, precommit commit with two validators
// where the first validator has to wait for votes from the second
func TestStateFullRound2(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@ -400,6 +418,8 @@ func TestStateFullRound2(t *testing.T) {
// two validators, 4 rounds.
// two vals take turns proposing. val1 locks on first one, precommits nil on everything else
func TestStateLockNoPOL(t *testing.T) {
configSetup(t)
cs1, vss := randState(2)
vs2 := vss[1]
height, round := cs1.Height, cs1.Round
@ -586,6 +606,8 @@ func TestStateLockNoPOL(t *testing.T) {
// in round two: v1 prevotes the same block that the node is locked on
// the others prevote a new block hence v1 changes lock and precommits the new block with the others
func TestStateLockPOLRelock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -683,6 +705,8 @@ func TestStateLockPOLRelock(t *testing.T) {
// 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka
func TestStateLockPOLUnlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -775,6 +799,8 @@ func TestStateLockPOLUnlock(t *testing.T) {
// v1 should unlock and precommit nil. In the third round another block is proposed, all vals
// prevote and now v1 can lock onto the third block and precommit that
func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -901,6 +927,8 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
// then a polka at round 2 that we lock on
// then we see the polka from round 1 but shouldn't unlock
func TestStateLockPOLSafety1(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1020,6 +1048,8 @@ func TestStateLockPOLSafety1(t *testing.T) {
// What we want:
// dont see P0, lock on P1 at R1, dont unlock using P0 at R2
func TestStateLockPOLSafety2(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1117,6 +1147,8 @@ func TestStateLockPOLSafety2(t *testing.T) {
// What we want:
// P0 proposes B0 at R3.
func TestProposeValidBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1207,6 +1239,8 @@ func TestProposeValidBlock(t *testing.T) {
// What we want:
// P0 miss to lock B but set valid block to B after receiving delayed prevote.
func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1269,6 +1303,8 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
// P0 miss to lock B as Proposal Block is missing, but set valid block to B after
// receiving delayed Block Proposal.
func TestSetValidBlockOnDelayedProposal(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1325,6 +1361,8 @@ func TestSetValidBlockOnDelayedProposal(t *testing.T) {
// What we want:
// P0 waits for timeoutPrecommit before starting next round
func TestWaitingTimeoutOnNilPolka(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1346,6 +1384,8 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) {
// What we want:
// P0 waits for timeoutPropose in the next round before entering prevote
func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1382,6 +1422,8 @@ func TestWaitingTimeoutProposeOnNewRound(t *testing.T) {
// What we want:
// P0 jump to higher round, precommit and start precommit wait
func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1418,6 +1460,8 @@ func TestRoundSkipOnNilPolkaFromHigherRound(t *testing.T) {
// What we want:
// P0 wait for timeoutPropose to expire before sending prevote.
func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1445,6 +1489,8 @@ func TestWaitTimeoutProposeOnNilPolkaForTheCurrentRound(t *testing.T) {
// What we want:
// P0 emit NewValidBlock event upon receiving 2/3+ Precommit for B but hasn't received block B yet
func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1479,6 +1525,8 @@ func TestEmitNewValidBlockEventOnCommitWithoutBlock(t *testing.T) {
// P0 receives 2/3+ Precommit for B for round 0, while being in round 1. It emits NewValidBlock event.
// After receiving block, it executes block and moves to the next height.
func TestCommitFromPreviousRound(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, int32(1)
@ -1532,6 +1580,8 @@ func (n *fakeTxNotifier) Notify() {
// and third precommit arrives which leads to the commit of that header and the correct
// start of the next round
func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss := randState(4)
cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})}
@ -1593,6 +1643,8 @@ func TestStartNextHeightCorrectlyAfterTimeout(t *testing.T) {
}
func TestResetTimeoutPrecommitUponNewHeight(t *testing.T) {
configSetup(t)
config.Consensus.SkipTimeoutCommit = false
cs1, vss := randState(4)
@ -1735,6 +1787,8 @@ func TestStateSlashingPrecommits(t *testing.T) {
// 4 vals.
// we receive a final precommit after going into next round, but others might have gone to commit already!
func TestStateHalt1(t *testing.T) {
configSetup(t)
cs1, vss := randState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3]
height, round := cs1.Height, cs1.Round
@ -1802,6 +1856,8 @@ func TestStateHalt1(t *testing.T) {
}
func TestStateOutputsBlockPartsStats(t *testing.T) {
configSetup(t)
// create dummy peer
cs, _ := randState(1)
peer := p2pmock.NewPeer(nil)
@ -1845,6 +1901,8 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
}
func TestStateOutputVoteStats(t *testing.T) {
configSetup(t)
cs, vss := randState(2)
// create dummy peer
peer := p2pmock.NewPeer(nil)


+ 43
- 16
node/node.go View File

@ -424,7 +424,8 @@ func createBlockchainReactor(
}
}
func createConsensusReactor(config *cfg.Config,
func createConsensusReactor(
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
@ -434,7 +435,8 @@ func createConsensusReactor(config *cfg.Config,
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger) (*cs.Reactor, *cs.State) {
logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
consensusState := cs.NewState(
config.Consensus,
@ -445,16 +447,31 @@ func createConsensusReactor(config *cfg.Config,
evidencePool,
cs.StateMetrics(csMetrics),
)
consensusState.SetLogger(consensusLogger)
consensusState.SetLogger(logger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus)
return consensusReactor, consensusState
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
reactor := cs.NewReactor(
logger,
consensusState,
reactorShim.GetChannel(cs.StateChannel),
reactorShim.GetChannel(cs.DataChannel),
reactorShim.GetChannel(cs.VoteChannel),
reactorShim.GetChannel(cs.VoteSetBitsChannel),
reactorShim.PeerUpdates,
waitSync,
cs.ReactorMetrics(csMetrics),
)
// Services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor.
reactor.SetEventBus(eventBus)
return reactorShim, reactor, consensusState
}
func createTransport(
@ -477,7 +494,7 @@ func createSwitch(config *cfg.Config,
mempoolReactor *p2p.ReactorShim,
bcReactor p2p.Reactor,
stateSyncReactor *p2p.ReactorShim,
consensusReactor *cs.Reactor,
consensusReactor *p2p.ReactorShim,
evidenceReactor *p2p.ReactorShim,
proxyApp proxy.AppConns,
nodeInfo p2p.NodeInfo,
@ -781,7 +798,7 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
csReactor, csState := createConsensusReactor(
csReactorShim, csReactor, csState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
@ -837,7 +854,7 @@ func NewNode(config *cfg.Config,
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactor, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@ -976,6 +993,11 @@ func (n *Node) OnStart() error {
}
}
// Start the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Start(); err != nil {
return err
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
@ -1039,6 +1061,11 @@ func (n *Node) OnStop() {
}
}
// Stop the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the consensus reactor", "err", err)
}
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the state sync reactor", "err", err)
@ -1375,10 +1402,10 @@ func makeNodeInfo(
Version: version.TMCoreSemVer,
Channels: []byte{
bcChannel,
cs.StateChannel,
cs.DataChannel,
cs.VoteChannel,
cs.VoteSetBitsChannel,
byte(cs.StateChannel),
byte(cs.DataChannel),
byte(cs.VoteChannel),
byte(cs.VoteSetBitsChannel),
byte(mempl.MempoolChannel),
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),


+ 46
- 8
p2p/p2ptest/network.go View File

@ -34,16 +34,24 @@ func MakeNetwork(t *testing.T, nodes int) *Network {
logger: logger,
memoryNetwork: p2p.NewMemoryNetwork(logger),
}
for i := 0; i < nodes; i++ {
node := MakeNode(t, network)
network.Nodes[node.NodeID] = node
}
return network
}
// Start starts the network by setting up a list of node addresses to dial in
// addition to creating a peer update subscription for each node. Finally, all
// nodes are connected to each other.
func (n *Network) Start(t *testing.T) {
// Set up a list of node addresses to dial, and a peer update subscription
// for each node.
dialQueue := []p2p.NodeAddress{}
subs := map[p2p.NodeID]*p2p.PeerUpdates{}
for _, node := range network.Nodes {
for _, node := range n.Nodes {
dialQueue = append(dialQueue, node.NodeAddress)
subs[node.NodeID] = node.PeerManager.Subscribe()
defer subs[node.NodeID].Close()
@ -53,10 +61,11 @@ func MakeNetwork(t *testing.T, nodes int) *Network {
// (either inbound or outbound), and wait for both sides to confirm the
// connection via the subscriptions.
for i, sourceAddress := range dialQueue {
sourceNode := network.Nodes[sourceAddress.NodeID]
sourceNode := n.Nodes[sourceAddress.NodeID]
sourceSub := subs[sourceAddress.NodeID]
for _, targetAddress := range dialQueue[i+1:] { // nodes <i already connected
targetNode := network.Nodes[targetAddress.NodeID]
targetNode := n.Nodes[targetAddress.NodeID]
targetSub := subs[targetAddress.NodeID]
require.NoError(t, sourceNode.PeerManager.Add(targetAddress))
@ -87,8 +96,6 @@ func MakeNetwork(t *testing.T, nodes int) *Network {
require.NoError(t, targetNode.PeerManager.Add(sourceAddress))
}
}
return network
}
// NodeIDs returns the network's node IDs.
@ -106,10 +113,27 @@ func (n *Network) MakeChannels(
t *testing.T,
chID p2p.ChannelID,
messageType proto.Message,
size int,
) map[p2p.NodeID]*p2p.Channel {
channels := map[p2p.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannel(t, chID, messageType, size)
}
return channels
}
// MakeChannelsNoCleanup makes a channel on all nodes and returns them,
// automatically doing error checks. The caller must ensure proper cleanup of
// all the channels.
func (n *Network) MakeChannelsNoCleanup(
t *testing.T,
chID p2p.ChannelID,
messageType proto.Message,
size int,
) map[p2p.NodeID]*p2p.Channel {
channels := map[p2p.NodeID]*p2p.Channel{}
for _, node := range n.Nodes {
channels[node.NodeID] = node.MakeChannel(t, chID, messageType)
channels[node.NodeID] = node.MakeChannelNoCleanup(t, chID, messageType, size)
}
return channels
}
@ -218,8 +242,8 @@ func MakeNode(t *testing.T, network *Network) *Node {
// MakeChannel opens a channel, with automatic error handling and cleanup. On
// test cleanup, it also checks that the channel is empty, to make sure
// all expected messages have been asserted.
func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.Message) *p2p.Channel {
channel, err := n.Router.OpenChannel(chID, messageType)
func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.Message, size int) *p2p.Channel {
channel, err := n.Router.OpenChannel(chID, messageType, size)
require.NoError(t, err)
t.Cleanup(func() {
RequireEmpty(t, channel)
@ -228,6 +252,20 @@ func (n *Node) MakeChannel(t *testing.T, chID p2p.ChannelID, messageType proto.M
return channel
}
// MakeChannelNoCleanup opens a channel, with automatic error handling. The
// caller must ensure proper cleanup of the channel.
func (n *Node) MakeChannelNoCleanup(
t *testing.T,
chID p2p.ChannelID,
messageType proto.Message,
size int,
) *p2p.Channel {
channel, err := n.Router.OpenChannel(chID, messageType, size)
require.NoError(t, err)
return channel
}
// MakePeerUpdates opens a peer update subscription, with automatic cleanup.
// It checks that all updates have been consumed during cleanup.
func (n *Node) MakePeerUpdates(t *testing.T) *p2p.PeerUpdates {


+ 2
- 2
p2p/queue.go View File

@ -33,9 +33,9 @@ type fifoQueue struct {
var _ queue = (*fifoQueue)(nil)
func newFIFOQueue() *fifoQueue {
func newFIFOQueue(size int) *fifoQueue {
return &fifoQueue{
queueCh: make(chan Envelope),
queueCh: make(chan Envelope, size),
closeCh: make(chan struct{}),
}
}


+ 8
- 7
p2p/router.go View File

@ -238,11 +238,12 @@ func NewRouter(
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel (used for unmarshaling), which can
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
queue := newFIFOQueue()
outCh := make(chan Envelope)
errCh := make(chan PeerError)
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error) {
queue := newFIFOQueue(size)
outCh := make(chan Envelope, size)
errCh := make(chan PeerError, size)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
var wrapper Wrapper
@ -421,7 +422,7 @@ func (r *Router) acceptPeers(transport Transport) {
return
}
queue := newFIFOQueue()
queue := newFIFOQueue(0)
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
@ -495,7 +496,7 @@ func (r *Router) dialPeers() {
return
}
queue := newFIFOQueue()
queue := newFIFOQueue(0)
r.peerMtx.Lock()
r.peerQueues[peerID] = queue
r.peerMtx.Unlock()


+ 20
- 10
p2p/router_test.go View File

@ -45,9 +45,11 @@ func TestRouter_Network(t *testing.T) {
// Create a test network and open a channel where all peers run echoReactor.
network := p2ptest.MakeNetwork(t, 8)
network.Start(t)
local := network.RandomNode()
peers := network.Peers(local.NodeID)
channels := network.MakeChannels(t, 1, &p2ptest.Message{})
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
channel := channels[local.NodeID]
for _, peer := range peers {
@ -104,22 +106,22 @@ func TestRouter_Channel(t *testing.T) {
})
// Opening a channel should work.
channel, err := router.OpenChannel(chID, &p2ptest.Message{})
channel, err := router.OpenChannel(chID, &p2ptest.Message{}, 0)
require.NoError(t, err)
// Opening the same channel again should fail.
_, err = router.OpenChannel(chID, &p2ptest.Message{})
_, err = router.OpenChannel(chID, &p2ptest.Message{}, 0)
require.Error(t, err)
// Opening a different channel should work.
_, err = router.OpenChannel(2, &p2ptest.Message{})
_, err = router.OpenChannel(2, &p2ptest.Message{}, 0)
require.NoError(t, err)
// Closing the channel, then opening it again should be fine.
channel.Close()
time.Sleep(100 * time.Millisecond) // yes yes, but Close() is async...
channel, err = router.OpenChannel(chID, &p2ptest.Message{})
channel, err = router.OpenChannel(chID, &p2ptest.Message{}, 0)
require.NoError(t, err)
// We should be able to send on the channel, even though there are no peers.
@ -142,11 +144,13 @@ func TestRouter_Channel_SendReceive(t *testing.T) {
// Create a test network and open a channel on all nodes.
network := p2ptest.MakeNetwork(t, 3)
network.Start(t)
ids := network.NodeIDs()
aID, bID, cID := ids[0], ids[1], ids[2]
channels := network.MakeChannels(t, chID, &p2ptest.Message{})
channels := network.MakeChannels(t, chID, &p2ptest.Message{}, 0)
a, b, c := channels[aID], channels[bID], channels[cID]
otherChannels := network.MakeChannels(t, 9, &p2ptest.Message{})
otherChannels := network.MakeChannels(t, 9, &p2ptest.Message{}, 0)
// Sending a message a->b should work, and not send anything
// further to a, b, or c.
@ -198,9 +202,11 @@ func TestRouter_Channel_Broadcast(t *testing.T) {
// Create a test network and open a channel on all nodes.
network := p2ptest.MakeNetwork(t, 4)
network.Start(t)
ids := network.NodeIDs()
aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3]
channels := network.MakeChannels(t, 1, &p2ptest.Message{})
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
a, b, c, d := channels[aID], channels[bID], channels[cID], channels[dID]
// Sending a broadcast from b should work.
@ -223,9 +229,11 @@ func TestRouter_Channel_Wrapper(t *testing.T) {
// Create a test network and open a channel on all nodes.
network := p2ptest.MakeNetwork(t, 2)
network.Start(t)
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, 1, &wrapperMessage{})
channels := network.MakeChannels(t, 1, &wrapperMessage{}, 0)
a, b := channels[aID], channels[bID]
// Since wrapperMessage implements p2p.Wrapper and handles Message, it
@ -279,9 +287,11 @@ func TestRouter_Channel_Error(t *testing.T) {
// Create a test network and open a channel on all nodes.
network := p2ptest.MakeNetwork(t, 3)
network.Start(t)
ids := network.NodeIDs()
aID, bID := ids[0], ids[1]
channels := network.MakeChannels(t, 1, &p2ptest.Message{})
channels := network.MakeChannels(t, 1, &p2ptest.Message{}, 0)
a := channels[aID]
// Erroring b should cause it to be disconnected. It will reconnect shortly after.


+ 80
- 0
proto/tendermint/consensus/message.go View File

@ -0,0 +1,80 @@
package consensus
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
)
// Wrap implements the p2p Wrapper interface and wraps a consensus proto message.
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *NewRoundStep:
m.Sum = &Message_NewRoundStep{NewRoundStep: msg}
case *NewValidBlock:
m.Sum = &Message_NewValidBlock{NewValidBlock: msg}
case *Proposal:
m.Sum = &Message_Proposal{Proposal: msg}
case *ProposalPOL:
m.Sum = &Message_ProposalPol{ProposalPol: msg}
case *BlockPart:
m.Sum = &Message_BlockPart{BlockPart: msg}
case *Vote:
m.Sum = &Message_Vote{Vote: msg}
case *HasVote:
m.Sum = &Message_HasVote{HasVote: msg}
case *VoteSetMaj23:
m.Sum = &Message_VoteSetMaj23{VoteSetMaj23: msg}
case *VoteSetBits:
m.Sum = &Message_VoteSetBits{VoteSetBits: msg}
default:
return fmt.Errorf("unknown message: %T", msg)
}
return nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

+ 2
- 2
proto/tendermint/statesync/message.go View File

@ -7,7 +7,7 @@ import (
proto "github.com/gogo/protobuf/proto"
)
// Wrap implements the p2p Wrapper interface and wraps a state sync messages.
// Wrap implements the p2p Wrapper interface and wraps a state sync proto message.
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *ChunkRequest:
@ -30,7 +30,7 @@ func (m *Message) Wrap(pb proto.Message) error {
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// message.
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:


+ 2
- 2
statesync/reactor.go View File

@ -68,8 +68,8 @@ const (
chunkMsgSize = int(16e6)
)
// Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
// for other nodes.
// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
service.BaseService


Loading…
Cancel
Save