|
package consensus
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
abcicli "github.com/tendermint/abci/client"
|
|
abci "github.com/tendermint/abci/types"
|
|
bc "github.com/tendermint/tendermint/blockchain"
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
cstypes "github.com/tendermint/tendermint/consensus/types"
|
|
mempl "github.com/tendermint/tendermint/mempool"
|
|
"github.com/tendermint/tendermint/p2p"
|
|
sm "github.com/tendermint/tendermint/state"
|
|
"github.com/tendermint/tendermint/types"
|
|
pvm "github.com/tendermint/tendermint/types/priv_validator"
|
|
cmn "github.com/tendermint/tmlibs/common"
|
|
dbm "github.com/tendermint/tmlibs/db"
|
|
"github.com/tendermint/tmlibs/log"
|
|
|
|
"github.com/tendermint/abci/example/counter"
|
|
"github.com/tendermint/abci/example/kvstore"
|
|
|
|
"github.com/go-kit/kit/log/term"
|
|
)
|
|
|
|
// testSubscriber is the subscriber name the test helpers use when
// subscribing to the consensus event bus.
const testSubscriber = "test-client"
|
|
|
|
// genesis, chain_id, priv_val
|
|
var config *cfg.Config // NOTE: must be reset for each _test.go file
|
|
var ensureTimeout = time.Second * 1 // must be in seconds because CreateEmptyBlocksInterval is
|
|
|
|
func ensureDir(dir string, mode os.FileMode) {
|
|
if err := cmn.EnsureDir(dir, mode); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func ResetConfig(name string) *cfg.Config {
|
|
return cfg.ResetTestRoot(name)
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
// validator stub (a kvstore consensus peer we control)
|
|
|
|
type validatorStub struct {
|
|
Index int // Validator index. NOTE: we don't assume validator set changes.
|
|
Height int64
|
|
Round int
|
|
types.PrivValidator
|
|
}
|
|
|
|
// testMinPower is the minimum voting power passed to randGenesisDoc by
// randConsensusNetWithPeers.
var testMinPower = int64(10)
|
|
|
|
func NewValidatorStub(privValidator types.PrivValidator, valIndex int) *validatorStub {
|
|
return &validatorStub{
|
|
Index: valIndex,
|
|
PrivValidator: privValidator,
|
|
}
|
|
}
|
|
|
|
func (vs *validatorStub) signVote(voteType byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
|
|
vote := &types.Vote{
|
|
ValidatorIndex: vs.Index,
|
|
ValidatorAddress: vs.PrivValidator.GetAddress(),
|
|
Height: vs.Height,
|
|
Round: vs.Round,
|
|
Timestamp: time.Now().UTC(),
|
|
Type: voteType,
|
|
BlockID: types.BlockID{hash, header},
|
|
}
|
|
err := vs.PrivValidator.SignVote(config.ChainID(), vote)
|
|
return vote, err
|
|
}
|
|
|
|
// Sign vote for type/hash/header
|
|
func signVote(vs *validatorStub, voteType byte, hash []byte, header types.PartSetHeader) *types.Vote {
|
|
v, err := vs.signVote(voteType, hash, header)
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed to sign vote: %v", err))
|
|
}
|
|
return v
|
|
}
|
|
|
|
func signVotes(voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) []*types.Vote {
|
|
votes := make([]*types.Vote, len(vss))
|
|
for i, vs := range vss {
|
|
votes[i] = signVote(vs, voteType, hash, header)
|
|
}
|
|
return votes
|
|
}
|
|
|
|
func incrementHeight(vss ...*validatorStub) {
|
|
for _, vs := range vss {
|
|
vs.Height++
|
|
}
|
|
}
|
|
|
|
func incrementRound(vss ...*validatorStub) {
|
|
for _, vs := range vss {
|
|
vs.Round++
|
|
}
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
// Functions for transitioning the consensus state
|
|
|
|
func startTestRound(cs *ConsensusState, height int64, round int) {
|
|
cs.enterNewRound(height, round)
|
|
cs.startRoutines(0)
|
|
}
|
|
|
|
// Create proposal block from cs1 but sign it with vs
|
|
func decideProposal(cs1 *ConsensusState, vs *validatorStub, height int64, round int) (proposal *types.Proposal, block *types.Block) {
|
|
block, blockParts := cs1.createProposalBlock()
|
|
if block == nil { // on error
|
|
panic("error creating proposal block")
|
|
}
|
|
|
|
// Make proposal
|
|
polRound, polBlockID := cs1.Votes.POLInfo()
|
|
proposal = types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
|
|
if err := vs.SignProposal(cs1.state.ChainID, proposal); err != nil {
|
|
panic(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func addVotes(to *ConsensusState, votes ...*types.Vote) {
|
|
for _, vote := range votes {
|
|
to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{vote}}
|
|
}
|
|
}
|
|
|
|
func signAddVotes(to *ConsensusState, voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) {
|
|
votes := signVotes(voteType, hash, header, vss...)
|
|
addVotes(to, votes...)
|
|
}
|
|
|
|
func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) {
|
|
prevotes := cs.Votes.Prevotes(round)
|
|
var vote *types.Vote
|
|
if vote = prevotes.GetByAddress(privVal.GetAddress()); vote == nil {
|
|
panic("Failed to find prevote from validator")
|
|
}
|
|
if blockHash == nil {
|
|
if vote.BlockID.Hash != nil {
|
|
panic(fmt.Sprintf("Expected prevote to be for nil, got %X", vote.BlockID.Hash))
|
|
}
|
|
} else {
|
|
if !bytes.Equal(vote.BlockID.Hash, blockHash) {
|
|
panic(fmt.Sprintf("Expected prevote to be for %X, got %X", blockHash, vote.BlockID.Hash))
|
|
}
|
|
}
|
|
}
|
|
|
|
func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) {
|
|
votes := cs.LastCommit
|
|
var vote *types.Vote
|
|
if vote = votes.GetByAddress(privVal.GetAddress()); vote == nil {
|
|
panic("Failed to find precommit from validator")
|
|
}
|
|
if !bytes.Equal(vote.BlockID.Hash, blockHash) {
|
|
panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockID.Hash))
|
|
}
|
|
}
|
|
|
|
func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
|
|
precommits := cs.Votes.Precommits(thisRound)
|
|
var vote *types.Vote
|
|
if vote = precommits.GetByAddress(privVal.GetAddress()); vote == nil {
|
|
panic("Failed to find precommit from validator")
|
|
}
|
|
|
|
if votedBlockHash == nil {
|
|
if vote.BlockID.Hash != nil {
|
|
panic("Expected precommit to be for nil")
|
|
}
|
|
} else {
|
|
if !bytes.Equal(vote.BlockID.Hash, votedBlockHash) {
|
|
panic("Expected precommit to be for proposal block")
|
|
}
|
|
}
|
|
|
|
if lockedBlockHash == nil {
|
|
if cs.LockedRound != lockRound || cs.LockedBlock != nil {
|
|
panic(fmt.Sprintf("Expected to be locked on nil at round %d. Got locked at round %d with block %v", lockRound, cs.LockedRound, cs.LockedBlock))
|
|
}
|
|
} else {
|
|
if cs.LockedRound != lockRound || !bytes.Equal(cs.LockedBlock.Hash(), lockedBlockHash) {
|
|
panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %X, expected %X", lockRound, cs.LockedRound, cs.LockedBlock.Hash(), lockedBlockHash))
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
|
|
// verify the prevote
|
|
validatePrevote(t, cs, thisRound, privVal, votedBlockHash)
|
|
// verify precommit
|
|
cs.mtx.Lock()
|
|
validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash)
|
|
cs.mtx.Unlock()
|
|
}
|
|
|
|
// genesis
|
|
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
|
|
voteCh0 := make(chan interface{})
|
|
err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
|
|
}
|
|
voteCh := make(chan interface{})
|
|
go func() {
|
|
for v := range voteCh0 {
|
|
vote := v.(types.EventDataVote)
|
|
// we only fire for our own votes
|
|
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
|
|
voteCh <- v
|
|
}
|
|
}
|
|
}()
|
|
return voteCh
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
// consensus states
|
|
|
|
func newConsensusState(state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
|
|
return newConsensusStateWithConfig(config, state, pv, app)
|
|
}
|
|
|
|
func newConsensusStateWithConfig(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
|
|
blockDB := dbm.NewMemDB()
|
|
return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
|
|
}
|
|
|
|
func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState {
|
|
// Get BlockStore
|
|
blockStore := bc.NewBlockStore(blockDB)
|
|
|
|
// one for mempool, one for consensus
|
|
mtx := new(sync.Mutex)
|
|
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
|
|
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
|
|
|
|
// Make Mempool
|
|
mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
|
|
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
|
|
if thisConfig.Consensus.WaitForTxs() {
|
|
mempool.EnableTxsAvailable()
|
|
}
|
|
|
|
// mock the evidence pool
|
|
evpool := types.MockEvidencePool{}
|
|
|
|
// Make ConsensusReactor
|
|
stateDB := dbm.NewMemDB()
|
|
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
|
|
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
|
|
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
|
|
cs.SetPrivValidator(pv)
|
|
|
|
eventBus := types.NewEventBus()
|
|
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
|
|
eventBus.Start()
|
|
cs.SetEventBus(eventBus)
|
|
return cs
|
|
}
|
|
|
|
func loadPrivValidator(config *cfg.Config) *pvm.FilePV {
|
|
privValidatorFile := config.PrivValidatorFile()
|
|
ensureDir(path.Dir(privValidatorFile), 0700)
|
|
privValidator := pvm.LoadOrGenFilePV(privValidatorFile)
|
|
privValidator.Reset()
|
|
return privValidator
|
|
}
|
|
|
|
func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
|
|
// Get State
|
|
state, privVals := randGenesisState(nValidators, false, 10)
|
|
|
|
vss := make([]*validatorStub, nValidators)
|
|
|
|
cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true))
|
|
|
|
for i := 0; i < nValidators; i++ {
|
|
vss[i] = NewValidatorStub(privVals[i], i)
|
|
}
|
|
// since cs1 starts at 1
|
|
incrementHeight(vss[1:]...)
|
|
|
|
return cs, vss
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
|
|
func ensureNoNewStep(stepCh <-chan interface{}) {
|
|
timer := time.NewTimer(ensureTimeout)
|
|
select {
|
|
case <-timer.C:
|
|
break
|
|
case <-stepCh:
|
|
panic("We should be stuck waiting, not moving to the next step")
|
|
}
|
|
}
|
|
|
|
func ensureNewStep(stepCh <-chan interface{}) {
|
|
timer := time.NewTimer(ensureTimeout)
|
|
select {
|
|
case <-timer.C:
|
|
panic("We shouldnt be stuck waiting")
|
|
case <-stepCh:
|
|
break
|
|
}
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
// consensus nets
|
|
|
|
// consensusLogger is a TestingLogger which uses a different
|
|
// color for each validator ("validator" key must exist).
|
|
func consensusLogger() log.Logger {
|
|
return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
|
|
for i := 0; i < len(keyvals)-1; i += 2 {
|
|
if keyvals[i] == "validator" {
|
|
return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
|
|
}
|
|
}
|
|
return term.FgBgColor{}
|
|
}).With("module", "consensus")
|
|
}
|
|
|
|
func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application, configOpts ...func(*cfg.Config)) []*ConsensusState {
|
|
genDoc, privVals := randGenesisDoc(nValidators, false, 30)
|
|
css := make([]*ConsensusState, nValidators)
|
|
logger := consensusLogger()
|
|
for i := 0; i < nValidators; i++ {
|
|
stateDB := dbm.NewMemDB() // each state needs its own db
|
|
state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
|
|
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
|
|
for _, opt := range configOpts {
|
|
opt(thisConfig)
|
|
}
|
|
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
|
|
app := appFunc()
|
|
vals := types.TM2PB.Validators(state.Validators)
|
|
app.InitChain(abci.RequestInitChain{Validators: vals})
|
|
|
|
css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], app)
|
|
css[i].SetTimeoutTicker(tickerFunc())
|
|
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
|
|
}
|
|
return css
|
|
}
|
|
|
|
// nPeers = nValidators + nNotValidator
|
|
func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState {
|
|
genDoc, privVals := randGenesisDoc(nValidators, false, testMinPower)
|
|
css := make([]*ConsensusState, nPeers)
|
|
logger := consensusLogger()
|
|
for i := 0; i < nPeers; i++ {
|
|
stateDB := dbm.NewMemDB() // each state needs its own db
|
|
state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
|
|
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
|
|
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
|
|
var privVal types.PrivValidator
|
|
if i < nValidators {
|
|
privVal = privVals[i]
|
|
} else {
|
|
_, tempFilePath := cmn.Tempfile("priv_validator_")
|
|
privVal = pvm.GenFilePV(tempFilePath)
|
|
}
|
|
|
|
app := appFunc()
|
|
vals := types.TM2PB.Validators(state.Validators)
|
|
app.InitChain(abci.RequestInitChain{Validators: vals})
|
|
|
|
css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app)
|
|
css[i].SetTimeoutTicker(tickerFunc())
|
|
css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
|
|
}
|
|
return css
|
|
}
|
|
|
|
func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
|
|
for i, s := range switches {
|
|
if bytes.Equal(peer.NodeInfo().PubKey.Address(), s.NodeInfo().PubKey.Address()) {
|
|
return i
|
|
}
|
|
}
|
|
panic("didnt find peer in switches")
|
|
return -1
|
|
}
|
|
|
|
//-------------------------------------------------------------------------------
|
|
// genesis
|
|
|
|
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
|
|
validators := make([]types.GenesisValidator, numValidators)
|
|
privValidators := make([]types.PrivValidator, numValidators)
|
|
for i := 0; i < numValidators; i++ {
|
|
val, privVal := types.RandValidator(randPower, minPower)
|
|
validators[i] = types.GenesisValidator{
|
|
PubKey: val.PubKey,
|
|
Power: val.VotingPower,
|
|
}
|
|
privValidators[i] = privVal
|
|
}
|
|
sort.Sort(types.PrivValidatorsByAddress(privValidators))
|
|
|
|
return &types.GenesisDoc{
|
|
GenesisTime: time.Now(),
|
|
ChainID: config.ChainID(),
|
|
Validators: validators,
|
|
}, privValidators
|
|
}
|
|
|
|
func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.State, []types.PrivValidator) {
|
|
genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
|
|
s0, _ := sm.MakeGenesisState(genDoc)
|
|
db := dbm.NewMemDB()
|
|
sm.SaveState(db, s0)
|
|
return s0, privValidators
|
|
}
|
|
|
|
//------------------------------------
|
|
// mock ticker
|
|
|
|
func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
|
|
return func() TimeoutTicker {
|
|
return &mockTicker{
|
|
c: make(chan timeoutInfo, 10),
|
|
onlyOnce: onlyOnce,
|
|
}
|
|
}
|
|
}
|
|
|
|
// mock ticker only fires on RoundStepNewHeight
|
|
// and only once if onlyOnce=true
|
|
type mockTicker struct {
|
|
c chan timeoutInfo
|
|
|
|
mtx sync.Mutex
|
|
onlyOnce bool
|
|
fired bool
|
|
}
|
|
|
|
func (m *mockTicker) Start() error {
|
|
return nil
|
|
}
|
|
|
|
func (m *mockTicker) Stop() error {
|
|
return nil
|
|
}
|
|
|
|
func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
if m.onlyOnce && m.fired {
|
|
return
|
|
}
|
|
if ti.Step == cstypes.RoundStepNewHeight {
|
|
m.c <- ti
|
|
m.fired = true
|
|
}
|
|
}
|
|
|
|
func (m *mockTicker) Chan() <-chan timeoutInfo {
|
|
return m.c
|
|
}
|
|
|
|
func (mockTicker) SetLogger(log.Logger) {
|
|
}
|
|
|
|
//------------------------------------
|
|
|
|
func newCounter() abci.Application {
|
|
return counter.NewCounterApplication(true)
|
|
}
|
|
|
|
func newPersistentKVStore() abci.Application {
|
|
dir, _ := ioutil.TempDir("/tmp", "persistent-kvstore")
|
|
return kvstore.NewPersistentKVStoreApplication(dir)
|
|
}
|