You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

369 lines
11 KiB

9 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io/ioutil"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. cfg "github.com/tendermint/go-config"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-p2p"
  14. bc "github.com/tendermint/tendermint/blockchain"
  15. "github.com/tendermint/tendermint/config/tendermint_test"
  16. mempl "github.com/tendermint/tendermint/mempool"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/types"
  19. tmspcli "github.com/tendermint/tmsp/client"
  20. tmsp "github.com/tendermint/tmsp/types"
  21. "github.com/tendermint/tmsp/example/counter"
  22. "github.com/tendermint/tmsp/example/dummy"
  23. )
  24. var config cfg.Config // NOTE: must be reset for each _test.go file
  25. var ensureTimeout = time.Duration(2)
  26. type validatorStub struct {
  27. Index int // Validator index. NOTE: we don't assume validator set changes.
  28. Height int
  29. Round int
  30. *types.PrivValidator
  31. }
  32. var testMinPower = 10
  33. func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub {
  34. return &validatorStub{
  35. Index: valIndex,
  36. PrivValidator: privValidator,
  37. }
  38. }
  39. func (vs *validatorStub) signVote(voteType byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
  40. vote := &types.Vote{
  41. ValidatorIndex: vs.Index,
  42. ValidatorAddress: vs.PrivValidator.Address,
  43. Height: vs.Height,
  44. Round: vs.Round,
  45. Type: voteType,
  46. BlockID: types.BlockID{hash, header},
  47. }
  48. err := vs.PrivValidator.SignVote(config.GetString("chain_id"), vote)
  49. return vote, err
  50. }
  51. //-------------------------------------------------------------------------------
  52. // Convenience functions
  53. // Sign vote for type/hash/header
  54. func signVote(vs *validatorStub, voteType byte, hash []byte, header types.PartSetHeader) *types.Vote {
  55. v, err := vs.signVote(voteType, hash, header)
  56. if err != nil {
  57. panic(fmt.Errorf("failed to sign vote: %v", err))
  58. }
  59. return v
  60. }
  61. // Create proposal block from cs1 but sign it with vs
  62. func decideProposal(cs1 *ConsensusState, vs *validatorStub, height, round int) (proposal *types.Proposal, block *types.Block) {
  63. block, blockParts := cs1.createProposalBlock()
  64. if block == nil { // on error
  65. panic("error creating proposal block")
  66. }
  67. // Make proposal
  68. polRound, polBlockID := cs1.Votes.POLInfo()
  69. proposal = types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
  70. if err := vs.SignProposal(config.GetString("chain_id"), proposal); err != nil {
  71. panic(err)
  72. }
  73. return
  74. }
  75. func addVotes(to *ConsensusState, votes ...*types.Vote) {
  76. for _, vote := range votes {
  77. to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{vote}}
  78. }
  79. }
  80. func signVotes(voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) []*types.Vote {
  81. votes := make([]*types.Vote, len(vss))
  82. for i, vs := range vss {
  83. votes[i] = signVote(vs, voteType, hash, header)
  84. }
  85. return votes
  86. }
  87. func signAddVotes(to *ConsensusState, voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) {
  88. votes := signVotes(voteType, hash, header, vss...)
  89. addVotes(to, votes...)
  90. }
  91. func ensureNoNewStep(stepCh chan interface{}) {
  92. timeout := time.NewTicker(ensureTimeout * time.Second)
  93. select {
  94. case <-timeout.C:
  95. break
  96. case <-stepCh:
  97. panic("We should be stuck waiting for more votes, not moving to the next step")
  98. }
  99. }
  100. func incrementHeight(vss ...*validatorStub) {
  101. for _, vs := range vss {
  102. vs.Height += 1
  103. }
  104. }
  105. func incrementRound(vss ...*validatorStub) {
  106. for _, vs := range vss {
  107. vs.Round += 1
  108. }
  109. }
  110. func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) {
  111. prevotes := cs.Votes.Prevotes(round)
  112. var vote *types.Vote
  113. if vote = prevotes.GetByAddress(privVal.Address); vote == nil {
  114. panic("Failed to find prevote from validator")
  115. }
  116. if blockHash == nil {
  117. if vote.BlockID.Hash != nil {
  118. panic(fmt.Sprintf("Expected prevote to be for nil, got %X", vote.BlockID.Hash))
  119. }
  120. } else {
  121. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  122. panic(fmt.Sprintf("Expected prevote to be for %X, got %X", blockHash, vote.BlockID.Hash))
  123. }
  124. }
  125. }
  126. func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) {
  127. votes := cs.LastCommit
  128. var vote *types.Vote
  129. if vote = votes.GetByAddress(privVal.Address); vote == nil {
  130. panic("Failed to find precommit from validator")
  131. }
  132. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  133. panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockID.Hash))
  134. }
  135. }
  136. func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  137. precommits := cs.Votes.Precommits(thisRound)
  138. var vote *types.Vote
  139. if vote = precommits.GetByAddress(privVal.Address); vote == nil {
  140. panic("Failed to find precommit from validator")
  141. }
  142. if votedBlockHash == nil {
  143. if vote.BlockID.Hash != nil {
  144. panic("Expected precommit to be for nil")
  145. }
  146. } else {
  147. if !bytes.Equal(vote.BlockID.Hash, votedBlockHash) {
  148. panic("Expected precommit to be for proposal block")
  149. }
  150. }
  151. if lockedBlockHash == nil {
  152. if cs.LockedRound != lockRound || cs.LockedBlock != nil {
  153. panic(fmt.Sprintf("Expected to be locked on nil at round %d. Got locked at round %d with block %v", lockRound, cs.LockedRound, cs.LockedBlock))
  154. }
  155. } else {
  156. if cs.LockedRound != lockRound || !bytes.Equal(cs.LockedBlock.Hash(), lockedBlockHash) {
  157. panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %X, expected %X", lockRound, cs.LockedRound, cs.LockedBlock.Hash(), lockedBlockHash))
  158. }
  159. }
  160. }
  161. func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  162. // verify the prevote
  163. validatePrevote(t, cs, thisRound, privVal, votedBlockHash)
  164. // verify precommit
  165. cs.mtx.Lock()
  166. validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash)
  167. cs.mtx.Unlock()
  168. }
  169. func fixedConsensusState() *ConsensusState {
  170. stateDB := dbm.NewMemDB()
  171. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  172. privValidatorFile := config.GetString("priv_validator_file")
  173. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  174. privValidator.Reset()
  175. cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true))
  176. return cs
  177. }
  178. func fixedConsensusStateDummy() *ConsensusState {
  179. stateDB := dbm.NewMemDB()
  180. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  181. privValidatorFile := config.GetString("priv_validator_file")
  182. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  183. privValidator.Reset()
  184. cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
  185. return cs
  186. }
  187. func newConsensusStateWithConfig(thisConfig cfg.Config, state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {
  188. // Get BlockStore
  189. blockDB := dbm.NewMemDB()
  190. blockStore := bc.NewBlockStore(blockDB)
  191. // one for mempool, one for consensus
  192. mtx := new(sync.Mutex)
  193. proxyAppConnMem := tmspcli.NewLocalClient(mtx, app)
  194. proxyAppConnCon := tmspcli.NewLocalClient(mtx, app)
  195. // Make Mempool
  196. mempool := mempl.NewMempool(thisConfig, proxyAppConnMem)
  197. // Make ConsensusReactor
  198. cs := NewConsensusState(thisConfig, state, proxyAppConnCon, blockStore, mempool)
  199. cs.SetPrivValidator(pv)
  200. evsw := types.NewEventSwitch()
  201. cs.SetEventSwitch(evsw)
  202. evsw.Start()
  203. return cs
  204. }
  205. func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {
  206. return newConsensusStateWithConfig(config, state, pv, app)
  207. }
  208. func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
  209. // Get State
  210. state, privVals := randGenesisState(nValidators, false, 10)
  211. vss := make([]*validatorStub, nValidators)
  212. cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true))
  213. for i := 0; i < nValidators; i++ {
  214. vss[i] = NewValidatorStub(privVals[i], i)
  215. }
  216. // since cs1 starts at 1
  217. incrementHeight(vss[1:]...)
  218. return cs, vss
  219. }
  220. func randConsensusNet(nValidators int) []*ConsensusState {
  221. genDoc, privVals := randGenesisDoc(nValidators, false, 10)
  222. css := make([]*ConsensusState, nValidators)
  223. for i := 0; i < nValidators; i++ {
  224. db := dbm.NewMemDB() // each state needs its own db
  225. state := sm.MakeGenesisState(db, genDoc)
  226. state.Save()
  227. thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
  228. EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
  229. css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true))
  230. }
  231. return css
  232. }
  233. // nPeers = nValidators + nNotValidator
  234. func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState {
  235. genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
  236. css := make([]*ConsensusState, nPeers)
  237. for i := 0; i < nPeers; i++ {
  238. db := dbm.NewMemDB() // each state needs its own db
  239. state := sm.MakeGenesisState(db, genDoc)
  240. state.Save()
  241. thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
  242. EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
  243. var privVal *types.PrivValidator
  244. if i < nValidators {
  245. privVal = privVals[i]
  246. } else {
  247. privVal = types.GenPrivValidator()
  248. _, tempFilePath := Tempfile("priv_validator_")
  249. privVal.SetFile(tempFilePath)
  250. }
  251. dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
  252. css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir))
  253. }
  254. return css
  255. }
  256. func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
  257. voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
  258. voteCh := make(chan interface{})
  259. go func() {
  260. for {
  261. v := <-voteCh0
  262. vote := v.(types.EventDataVote)
  263. // we only fire for our own votes
  264. if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
  265. voteCh <- v
  266. }
  267. }
  268. }()
  269. return voteCh
  270. }
  // readVotes starts a goroutine that receives and discards `reads` values from
  // ch, and returns a channel that is closed once they have all been read.
  func readVotes(ch chan interface{}, reads int) chan struct{} {
  	done := make(chan struct{})
  	go func() {
  		defer close(done)
  		for remaining := reads; remaining > 0; remaining-- {
  			<-ch // read the precommit event
  		}
  	}()
  	return done
  }
  281. func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) {
  282. genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
  283. db := dbm.NewMemDB()
  284. s0 := sm.MakeGenesisState(db, genDoc)
  285. s0.Save()
  286. return s0, privValidators
  287. }
  288. func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []*types.PrivValidator) {
  289. validators := make([]types.GenesisValidator, numValidators)
  290. privValidators := make([]*types.PrivValidator, numValidators)
  291. for i := 0; i < numValidators; i++ {
  292. val, privVal := types.RandValidator(randPower, minPower)
  293. validators[i] = types.GenesisValidator{
  294. PubKey: val.PubKey,
  295. Amount: val.VotingPower,
  296. }
  297. privValidators[i] = privVal
  298. }
  299. sort.Sort(types.PrivValidatorsByAddress(privValidators))
  300. return &types.GenesisDoc{
  301. GenesisTime: time.Now(),
  302. ChainID: config.GetString("chain_id"),
  303. Validators: validators,
  304. }, privValidators
  305. }
  306. func startTestRound(cs *ConsensusState, height, round int) {
  307. cs.enterNewRound(height, round)
  308. cs.startRoutines(0)
  309. }
  310. //--------------------------------
  311. // reactor stuff
  312. func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
  313. for i, s := range switches {
  314. if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
  315. return i
  316. }
  317. }
  318. panic("didnt find peer in switches")
  319. return -1
  320. }