You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
12 KiB

9 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io/ioutil"
  6. "sort"
  7. "sync"
  8. "testing"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. cfg "github.com/tendermint/go-config"
  12. dbm "github.com/tendermint/go-db"
  13. "github.com/tendermint/go-logger"
  14. "github.com/tendermint/go-p2p"
  15. bc "github.com/tendermint/tendermint/blockchain"
  16. "github.com/tendermint/tendermint/config/tendermint_test"
  17. mempl "github.com/tendermint/tendermint/mempool"
  18. sm "github.com/tendermint/tendermint/state"
  19. "github.com/tendermint/tendermint/types"
  20. tmspcli "github.com/tendermint/tmsp/client"
  21. tmsp "github.com/tendermint/tmsp/types"
  22. "github.com/tendermint/tmsp/example/counter"
  23. "github.com/tendermint/tmsp/example/dummy"
  24. )
  25. var config cfg.Config // NOTE: must be reset for each _test.go file
  26. var ensureTimeout = time.Duration(2)
  27. type validatorStub struct {
  28. Index int // Validator index. NOTE: we don't assume validator set changes.
  29. Height int
  30. Round int
  31. *types.PrivValidator
  32. }
  // testMinPower is the minPower handed to randGenesisDoc (as an int64) by
  // randConsensusNetWithPeers.
  var testMinPower = 10
  34. func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub {
  35. return &validatorStub{
  36. Index: valIndex,
  37. PrivValidator: privValidator,
  38. }
  39. }
  40. func (vs *validatorStub) signVote(voteType byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
  41. vote := &types.Vote{
  42. ValidatorIndex: vs.Index,
  43. ValidatorAddress: vs.PrivValidator.Address,
  44. Height: vs.Height,
  45. Round: vs.Round,
  46. Type: voteType,
  47. BlockID: types.BlockID{hash, header},
  48. }
  49. err := vs.PrivValidator.SignVote(config.GetString("chain_id"), vote)
  50. return vote, err
  51. }
  52. //-------------------------------------------------------------------------------
  53. // Convenience functions
  54. // Sign vote for type/hash/header
  55. func signVote(vs *validatorStub, voteType byte, hash []byte, header types.PartSetHeader) *types.Vote {
  56. v, err := vs.signVote(voteType, hash, header)
  57. if err != nil {
  58. panic(fmt.Errorf("failed to sign vote: %v", err))
  59. }
  60. return v
  61. }
  62. // Create proposal block from cs1 but sign it with vs
  63. func decideProposal(cs1 *ConsensusState, vs *validatorStub, height, round int) (proposal *types.Proposal, block *types.Block) {
  64. block, blockParts := cs1.createProposalBlock()
  65. if block == nil { // on error
  66. panic("error creating proposal block")
  67. }
  68. // Make proposal
  69. polRound, polBlockID := cs1.Votes.POLInfo()
  70. proposal = types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
  71. if err := vs.SignProposal(config.GetString("chain_id"), proposal); err != nil {
  72. panic(err)
  73. }
  74. return
  75. }
  76. func addVotes(to *ConsensusState, votes ...*types.Vote) {
  77. for _, vote := range votes {
  78. to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{vote}}
  79. }
  80. }
  81. func signVotes(voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) []*types.Vote {
  82. votes := make([]*types.Vote, len(vss))
  83. for i, vs := range vss {
  84. votes[i] = signVote(vs, voteType, hash, header)
  85. }
  86. return votes
  87. }
  88. func signAddVotes(to *ConsensusState, voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) {
  89. votes := signVotes(voteType, hash, header, vss...)
  90. addVotes(to, votes...)
  91. }
  92. func ensureNoNewStep(stepCh chan interface{}) {
  93. timeout := time.NewTicker(ensureTimeout * time.Second)
  94. select {
  95. case <-timeout.C:
  96. break
  97. case <-stepCh:
  98. panic("We should be stuck waiting for more votes, not moving to the next step")
  99. }
  100. }
  101. func incrementHeight(vss ...*validatorStub) {
  102. for _, vs := range vss {
  103. vs.Height += 1
  104. }
  105. }
  106. func incrementRound(vss ...*validatorStub) {
  107. for _, vs := range vss {
  108. vs.Round += 1
  109. }
  110. }
  111. func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) {
  112. prevotes := cs.Votes.Prevotes(round)
  113. var vote *types.Vote
  114. if vote = prevotes.GetByAddress(privVal.Address); vote == nil {
  115. panic("Failed to find prevote from validator")
  116. }
  117. if blockHash == nil {
  118. if vote.BlockID.Hash != nil {
  119. panic(fmt.Sprintf("Expected prevote to be for nil, got %X", vote.BlockID.Hash))
  120. }
  121. } else {
  122. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  123. panic(fmt.Sprintf("Expected prevote to be for %X, got %X", blockHash, vote.BlockID.Hash))
  124. }
  125. }
  126. }
  127. func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) {
  128. votes := cs.LastCommit
  129. var vote *types.Vote
  130. if vote = votes.GetByAddress(privVal.Address); vote == nil {
  131. panic("Failed to find precommit from validator")
  132. }
  133. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  134. panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockID.Hash))
  135. }
  136. }
  137. func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  138. precommits := cs.Votes.Precommits(thisRound)
  139. var vote *types.Vote
  140. if vote = precommits.GetByAddress(privVal.Address); vote == nil {
  141. panic("Failed to find precommit from validator")
  142. }
  143. if votedBlockHash == nil {
  144. if vote.BlockID.Hash != nil {
  145. panic("Expected precommit to be for nil")
  146. }
  147. } else {
  148. if !bytes.Equal(vote.BlockID.Hash, votedBlockHash) {
  149. panic("Expected precommit to be for proposal block")
  150. }
  151. }
  152. if lockedBlockHash == nil {
  153. if cs.LockedRound != lockRound || cs.LockedBlock != nil {
  154. panic(fmt.Sprintf("Expected to be locked on nil at round %d. Got locked at round %d with block %v", lockRound, cs.LockedRound, cs.LockedBlock))
  155. }
  156. } else {
  157. if cs.LockedRound != lockRound || !bytes.Equal(cs.LockedBlock.Hash(), lockedBlockHash) {
  158. panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %X, expected %X", lockRound, cs.LockedRound, cs.LockedBlock.Hash(), lockedBlockHash))
  159. }
  160. }
  161. }
  162. func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  163. // verify the prevote
  164. validatePrevote(t, cs, thisRound, privVal, votedBlockHash)
  165. // verify precommit
  166. cs.mtx.Lock()
  167. validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash)
  168. cs.mtx.Unlock()
  169. }
  170. func fixedConsensusState() *ConsensusState {
  171. stateDB := dbm.NewMemDB()
  172. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  173. privValidatorFile := config.GetString("priv_validator_file")
  174. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  175. privValidator.Reset()
  176. cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true))
  177. return cs
  178. }
  179. func fixedConsensusStateDummy() *ConsensusState {
  180. stateDB := dbm.NewMemDB()
  181. state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
  182. privValidatorFile := config.GetString("priv_validator_file")
  183. privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
  184. privValidator.Reset()
  185. cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
  186. return cs
  187. }
  188. func newConsensusStateWithConfig(thisConfig cfg.Config, state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {
  189. // Get BlockStore
  190. blockDB := dbm.NewMemDB()
  191. blockStore := bc.NewBlockStore(blockDB)
  192. // one for mempool, one for consensus
  193. mtx := new(sync.Mutex)
  194. proxyAppConnMem := tmspcli.NewLocalClient(mtx, app)
  195. proxyAppConnCon := tmspcli.NewLocalClient(mtx, app)
  196. // Make Mempool
  197. mempool := mempl.NewMempool(thisConfig, proxyAppConnMem)
  198. // Make ConsensusReactor
  199. cs := NewConsensusState(thisConfig, state, proxyAppConnCon, blockStore, mempool)
  200. cs.SetPrivValidator(pv)
  201. evsw := types.NewEventSwitch()
  202. cs.SetEventSwitch(evsw)
  203. evsw.Start()
  204. return cs
  205. }
  206. func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState {
  207. return newConsensusStateWithConfig(config, state, pv, app)
  208. }
  209. func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
  210. // Get State
  211. state, privVals := randGenesisState(nValidators, false, 10)
  212. vss := make([]*validatorStub, nValidators)
  213. cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true))
  214. for i := 0; i < nValidators; i++ {
  215. vss[i] = NewValidatorStub(privVals[i], i)
  216. }
  217. // since cs1 starts at 1
  218. incrementHeight(vss[1:]...)
  219. return cs, vss
  220. }
  221. func randConsensusNet(nValidators int) []*ConsensusState {
  222. genDoc, privVals := randGenesisDoc(nValidators, false, 10)
  223. css := make([]*ConsensusState, nValidators)
  224. for i := 0; i < nValidators; i++ {
  225. db := dbm.NewMemDB() // each state needs its own db
  226. state := sm.MakeGenesisState(db, genDoc)
  227. state.Save()
  228. thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
  229. resetConfigTimeouts(thisConfig)
  230. EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
  231. css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true))
  232. }
  233. return css
  234. }
  235. // nPeers = nValidators + nNotValidator
  236. func randConsensusNetWithPeers(nValidators int, nPeers int) []*ConsensusState {
  237. genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
  238. css := make([]*ConsensusState, nPeers)
  239. for i := 0; i < nPeers; i++ {
  240. db := dbm.NewMemDB() // each state needs its own db
  241. state := sm.MakeGenesisState(db, genDoc)
  242. state.Save()
  243. thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i))
  244. resetConfigTimeouts(thisConfig)
  245. EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
  246. var privVal *types.PrivValidator
  247. if i < nValidators {
  248. privVal = privVals[i]
  249. } else {
  250. privVal = types.GenPrivValidator()
  251. _, tempFilePath := Tempfile("priv_validator_")
  252. privVal.SetFile(tempFilePath)
  253. }
  254. dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
  255. css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir))
  256. }
  257. return css
  258. }
  259. func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
  260. voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
  261. voteCh := make(chan interface{})
  262. go func() {
  263. for {
  264. v := <-voteCh0
  265. vote := v.(types.EventDataVote)
  266. // we only fire for our own votes
  267. if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
  268. voteCh <- v
  269. }
  270. }
  271. }()
  272. return voteCh
  273. }
  // readVotes starts a goroutine that receives and discards `reads` values from
  // ch. The returned channel is closed once all of them have been read.
  func readVotes(ch chan interface{}, reads int) chan struct{} {
  	done := make(chan struct{})
  	go func() {
  		defer close(done)
  		for remaining := reads; remaining > 0; remaining-- {
  			<-ch // read the precommit event
  		}
  	}()
  	return done
  }
  284. func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) {
  285. genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
  286. db := dbm.NewMemDB()
  287. s0 := sm.MakeGenesisState(db, genDoc)
  288. s0.Save()
  289. return s0, privValidators
  290. }
  291. func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []*types.PrivValidator) {
  292. validators := make([]types.GenesisValidator, numValidators)
  293. privValidators := make([]*types.PrivValidator, numValidators)
  294. for i := 0; i < numValidators; i++ {
  295. val, privVal := types.RandValidator(randPower, minPower)
  296. validators[i] = types.GenesisValidator{
  297. PubKey: val.PubKey,
  298. Amount: val.VotingPower,
  299. }
  300. privValidators[i] = privVal
  301. }
  302. sort.Sort(types.PrivValidatorsByAddress(privValidators))
  303. return &types.GenesisDoc{
  304. GenesisTime: time.Now(),
  305. ChainID: config.GetString("chain_id"),
  306. Validators: validators,
  307. }, privValidators
  308. }
  309. func startTestRound(cs *ConsensusState, height, round int) {
  310. cs.enterNewRound(height, round)
  311. cs.startRoutines(0)
  312. }
  313. //--------------------------------
  314. // reactor stuff
  315. func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
  316. for i, s := range switches {
  317. if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
  318. return i
  319. }
  320. }
  321. panic("didnt find peer in switches")
  322. return -1
  323. }
  324. // so we dont violate synchrony assumptions
  325. // TODO: make tests more robust to this instead (handle round changes)
  326. // XXX: especially a problem when running the race detector on circle
  327. func resetConfigTimeouts(config cfg.Config) {
  328. logger.SetLogLevel("info")
  329. //config.Set("log_level", "notice")
  330. config.Set("timeout_propose", 110000) // TODO: crank it to eleventy
  331. // config.Set("timeout_propose_delta", 500)
  332. // config.Set("timeout_prevote", 1000)
  333. // config.Set("timeout_prevote_delta", 500)
  334. // config.Set("timeout_precommit", 1000)
  335. // config.Set("timeout_precommit_delta", 500)
  336. config.Set("timeout_commit", 1000)
  337. }