You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

482 lines
14 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "sync"
  10. "testing"
  11. "time"
  12. abcicli "github.com/tendermint/abci/client"
  13. abci "github.com/tendermint/abci/types"
  14. bc "github.com/tendermint/tendermint/blockchain"
  15. cfg "github.com/tendermint/tendermint/config"
  16. cstypes "github.com/tendermint/tendermint/consensus/types"
  17. mempl "github.com/tendermint/tendermint/mempool"
  18. "github.com/tendermint/tendermint/p2p"
  19. sm "github.com/tendermint/tendermint/state"
  20. "github.com/tendermint/tendermint/types"
  21. cmn "github.com/tendermint/tmlibs/common"
  22. dbm "github.com/tendermint/tmlibs/db"
  23. "github.com/tendermint/tmlibs/log"
  24. "github.com/tendermint/abci/example/counter"
  25. "github.com/tendermint/abci/example/dummy"
  26. "github.com/go-kit/kit/log/term"
  27. )
  28. // genesis, chain_id, priv_val
  29. var config *cfg.Config // NOTE: must be reset for each _test.go file
  30. var ensureTimeout = time.Second * 2
  31. func ensureDir(dir string, mode os.FileMode) {
  32. if err := cmn.EnsureDir(dir, mode); err != nil {
  33. panic(err)
  34. }
  35. }
  36. func ResetConfig(name string) *cfg.Config {
  37. return cfg.ResetTestRoot(name)
  38. }
  39. //-------------------------------------------------------------------------------
  40. // validator stub (a dummy consensus peer we control)
  41. type validatorStub struct {
  42. Index int // Validator index. NOTE: we don't assume validator set changes.
  43. Height int
  44. Round int
  45. types.PrivValidator
  46. }
  47. var testMinPower = 10
  48. func NewValidatorStub(privValidator types.PrivValidator, valIndex int) *validatorStub {
  49. return &validatorStub{
  50. Index: valIndex,
  51. PrivValidator: privValidator,
  52. }
  53. }
  54. func (vs *validatorStub) signVote(voteType byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
  55. vote := &types.Vote{
  56. ValidatorIndex: vs.Index,
  57. ValidatorAddress: vs.PrivValidator.GetAddress(),
  58. Height: vs.Height,
  59. Round: vs.Round,
  60. Type: voteType,
  61. BlockID: types.BlockID{hash, header},
  62. }
  63. err := vs.PrivValidator.SignVote(config.ChainID, vote)
  64. return vote, err
  65. }
  66. // Sign vote for type/hash/header
  67. func signVote(vs *validatorStub, voteType byte, hash []byte, header types.PartSetHeader) *types.Vote {
  68. v, err := vs.signVote(voteType, hash, header)
  69. if err != nil {
  70. panic(fmt.Errorf("failed to sign vote: %v", err))
  71. }
  72. return v
  73. }
  74. func signVotes(voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) []*types.Vote {
  75. votes := make([]*types.Vote, len(vss))
  76. for i, vs := range vss {
  77. votes[i] = signVote(vs, voteType, hash, header)
  78. }
  79. return votes
  80. }
  81. func incrementHeight(vss ...*validatorStub) {
  82. for _, vs := range vss {
  83. vs.Height += 1
  84. }
  85. }
  86. func incrementRound(vss ...*validatorStub) {
  87. for _, vs := range vss {
  88. vs.Round += 1
  89. }
  90. }
  91. //-------------------------------------------------------------------------------
  92. // Functions for transitioning the consensus state
  93. func startTestRound(cs *ConsensusState, height, round int) {
  94. cs.enterNewRound(height, round)
  95. cs.startRoutines(0)
  96. }
  97. // Create proposal block from cs1 but sign it with vs
  98. func decideProposal(cs1 *ConsensusState, vs *validatorStub, height, round int) (proposal *types.Proposal, block *types.Block) {
  99. block, blockParts := cs1.createProposalBlock()
  100. if block == nil { // on error
  101. panic("error creating proposal block")
  102. }
  103. // Make proposal
  104. polRound, polBlockID := cs1.Votes.POLInfo()
  105. proposal = types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
  106. if err := vs.SignProposal(config.ChainID, proposal); err != nil {
  107. panic(err)
  108. }
  109. return
  110. }
  111. func addVotes(to *ConsensusState, votes ...*types.Vote) {
  112. for _, vote := range votes {
  113. to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{vote}}
  114. }
  115. }
  116. func signAddVotes(to *ConsensusState, voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) {
  117. votes := signVotes(voteType, hash, header, vss...)
  118. addVotes(to, votes...)
  119. }
  120. func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) {
  121. prevotes := cs.Votes.Prevotes(round)
  122. var vote *types.Vote
  123. if vote = prevotes.GetByAddress(privVal.GetAddress()); vote == nil {
  124. panic("Failed to find prevote from validator")
  125. }
  126. if blockHash == nil {
  127. if vote.BlockID.Hash != nil {
  128. panic(fmt.Sprintf("Expected prevote to be for nil, got %X", vote.BlockID.Hash))
  129. }
  130. } else {
  131. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  132. panic(fmt.Sprintf("Expected prevote to be for %X, got %X", blockHash, vote.BlockID.Hash))
  133. }
  134. }
  135. }
  136. func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) {
  137. votes := cs.LastCommit
  138. var vote *types.Vote
  139. if vote = votes.GetByAddress(privVal.GetAddress()); vote == nil {
  140. panic("Failed to find precommit from validator")
  141. }
  142. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  143. panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockID.Hash))
  144. }
  145. }
  146. func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  147. precommits := cs.Votes.Precommits(thisRound)
  148. var vote *types.Vote
  149. if vote = precommits.GetByAddress(privVal.GetAddress()); vote == nil {
  150. panic("Failed to find precommit from validator")
  151. }
  152. if votedBlockHash == nil {
  153. if vote.BlockID.Hash != nil {
  154. panic("Expected precommit to be for nil")
  155. }
  156. } else {
  157. if !bytes.Equal(vote.BlockID.Hash, votedBlockHash) {
  158. panic("Expected precommit to be for proposal block")
  159. }
  160. }
  161. if lockedBlockHash == nil {
  162. if cs.LockedRound != lockRound || cs.LockedBlock != nil {
  163. panic(fmt.Sprintf("Expected to be locked on nil at round %d. Got locked at round %d with block %v", lockRound, cs.LockedRound, cs.LockedBlock))
  164. }
  165. } else {
  166. if cs.LockedRound != lockRound || !bytes.Equal(cs.LockedBlock.Hash(), lockedBlockHash) {
  167. panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %X, expected %X", lockRound, cs.LockedRound, cs.LockedBlock.Hash(), lockedBlockHash))
  168. }
  169. }
  170. }
  171. func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  172. // verify the prevote
  173. validatePrevote(t, cs, thisRound, privVal, votedBlockHash)
  174. // verify precommit
  175. cs.mtx.Lock()
  176. validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash)
  177. cs.mtx.Unlock()
  178. }
  179. // genesis
  180. func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
  181. voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
  182. voteCh := make(chan interface{})
  183. go func() {
  184. for {
  185. v := <-voteCh0
  186. vote := v.(types.TMEventData).Unwrap().(types.EventDataVote)
  187. // we only fire for our own votes
  188. if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
  189. voteCh <- v
  190. }
  191. }
  192. }()
  193. return voteCh
  194. }
  195. //-------------------------------------------------------------------------------
  196. // consensus states
  197. func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
  198. return newConsensusStateWithConfig(config, state, pv, app)
  199. }
  200. func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
  201. // Get BlockStore
  202. blockDB := dbm.NewMemDB()
  203. blockStore := bc.NewBlockStore(blockDB)
  204. // one for mempool, one for consensus
  205. mtx := new(sync.Mutex)
  206. proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
  207. proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
  208. // Make Mempool
  209. mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0)
  210. mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
  211. if thisConfig.Consensus.WaitForTxs() {
  212. mempool.EnableTxsAvailable()
  213. }
  214. // Make ConsensusReactor
  215. cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool)
  216. cs.SetLogger(log.TestingLogger())
  217. cs.SetPrivValidator(pv)
  218. evsw := types.NewEventSwitch()
  219. evsw.SetLogger(log.TestingLogger().With("module", "events"))
  220. cs.SetEventSwitch(evsw)
  221. evsw.Start()
  222. return cs
  223. }
  224. func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS {
  225. privValidatorFile := config.PrivValidatorFile()
  226. ensureDir(path.Dir(privValidatorFile), 0700)
  227. privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile)
  228. privValidator.Reset()
  229. return privValidator
  230. }
  231. func fixedConsensusStateDummy() *ConsensusState {
  232. stateDB := dbm.NewMemDB()
  233. state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
  234. state.SetLogger(log.TestingLogger().With("module", "state"))
  235. privValidator := loadPrivValidator(config)
  236. cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
  237. cs.SetLogger(log.TestingLogger())
  238. return cs
  239. }
  240. func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
  241. // Get State
  242. state, privVals := randGenesisState(nValidators, false, 10)
  243. vss := make([]*validatorStub, nValidators)
  244. cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true))
  245. cs.SetLogger(log.TestingLogger())
  246. for i := 0; i < nValidators; i++ {
  247. vss[i] = NewValidatorStub(privVals[i], i)
  248. }
  249. // since cs1 starts at 1
  250. incrementHeight(vss[1:]...)
  251. return cs, vss
  252. }
  253. //-------------------------------------------------------------------------------
  254. func ensureNoNewStep(stepCh chan interface{}) {
  255. timer := time.NewTimer(ensureTimeout)
  256. select {
  257. case <-timer.C:
  258. break
  259. case <-stepCh:
  260. panic("We should be stuck waiting, not moving to the next step")
  261. }
  262. }
  263. func ensureNewStep(stepCh chan interface{}) {
  264. timer := time.NewTimer(ensureTimeout)
  265. select {
  266. case <-timer.C:
  267. panic("We shouldnt be stuck waiting")
  268. case <-stepCh:
  269. break
  270. }
  271. }
  272. //-------------------------------------------------------------------------------
  273. // consensus nets
  274. // consensusLogger is a TestingLogger which uses a different
  275. // color for each validator ("validator" key must exist).
  276. func consensusLogger() log.Logger {
  277. return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
  278. for i := 0; i < len(keyvals)-1; i += 2 {
  279. if keyvals[i] == "validator" {
  280. return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
  281. }
  282. }
  283. return term.FgBgColor{}
  284. })
  285. }
  286. func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application, configOpts ...func(*cfg.Config)) []*ConsensusState {
  287. genDoc, privVals := randGenesisDoc(nValidators, false, 10)
  288. css := make([]*ConsensusState, nValidators)
  289. logger := consensusLogger()
  290. for i := 0; i < nValidators; i++ {
  291. db := dbm.NewMemDB() // each state needs its own db
  292. state, _ := sm.MakeGenesisState(db, genDoc)
  293. state.SetLogger(logger.With("module", "state", "validator", i))
  294. state.Save()
  295. thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
  296. for _, opt := range configOpts {
  297. opt(thisConfig)
  298. }
  299. ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
  300. css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], appFunc())
  301. css[i].SetLogger(logger.With("validator", i))
  302. css[i].SetTimeoutTicker(tickerFunc())
  303. }
  304. return css
  305. }
  306. // nPeers = nValidators + nNotValidator
  307. func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState {
  308. genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
  309. css := make([]*ConsensusState, nPeers)
  310. for i := 0; i < nPeers; i++ {
  311. db := dbm.NewMemDB() // each state needs its own db
  312. state, _ := sm.MakeGenesisState(db, genDoc)
  313. state.SetLogger(log.TestingLogger().With("module", "state"))
  314. state.Save()
  315. thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
  316. ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
  317. var privVal types.PrivValidator
  318. if i < nValidators {
  319. privVal = privVals[i]
  320. } else {
  321. _, tempFilePath := cmn.Tempfile("priv_validator_")
  322. privVal = types.GenPrivValidatorFS(tempFilePath)
  323. }
  324. css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, appFunc())
  325. css[i].SetLogger(log.TestingLogger())
  326. css[i].SetTimeoutTicker(tickerFunc())
  327. }
  328. return css
  329. }
  330. func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int {
  331. for i, s := range switches {
  332. if bytes.Equal(peer.NodeInfo().PubKey.Address(), s.NodeInfo().PubKey.Address()) {
  333. return i
  334. }
  335. }
  336. panic("didnt find peer in switches")
  337. return -1
  338. }
  339. //-------------------------------------------------------------------------------
  340. // genesis
  341. func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []*types.PrivValidatorFS) {
  342. validators := make([]types.GenesisValidator, numValidators)
  343. privValidators := make([]*types.PrivValidatorFS, numValidators)
  344. for i := 0; i < numValidators; i++ {
  345. val, privVal := types.RandValidator(randPower, minPower)
  346. validators[i] = types.GenesisValidator{
  347. PubKey: val.PubKey,
  348. Power: val.VotingPower,
  349. }
  350. privValidators[i] = privVal
  351. }
  352. sort.Sort(types.PrivValidatorsByAddress(privValidators))
  353. return &types.GenesisDoc{
  354. GenesisTime: time.Now(),
  355. ChainID: config.ChainID,
  356. Validators: validators,
  357. }, privValidators
  358. }
  359. func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidatorFS) {
  360. genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
  361. db := dbm.NewMemDB()
  362. s0, _ := sm.MakeGenesisState(db, genDoc)
  363. s0.SetLogger(log.TestingLogger().With("module", "state"))
  364. s0.Save()
  365. return s0, privValidators
  366. }
  367. //------------------------------------
  368. // mock ticker
  369. func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
  370. return func() TimeoutTicker {
  371. return &mockTicker{
  372. c: make(chan timeoutInfo, 10),
  373. onlyOnce: onlyOnce,
  374. }
  375. }
  376. }
  377. // mock ticker only fires on RoundStepNewHeight
  378. // and only once if onlyOnce=true
  379. type mockTicker struct {
  380. c chan timeoutInfo
  381. mtx sync.Mutex
  382. onlyOnce bool
  383. fired bool
  384. }
  385. func (m *mockTicker) Start() (bool, error) {
  386. return true, nil
  387. }
  388. func (m *mockTicker) Stop() bool {
  389. return true
  390. }
  391. func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {
  392. m.mtx.Lock()
  393. defer m.mtx.Unlock()
  394. if m.onlyOnce && m.fired {
  395. return
  396. }
  397. if ti.Step == cstypes.RoundStepNewHeight {
  398. m.c <- ti
  399. m.fired = true
  400. }
  401. }
  402. func (m *mockTicker) Chan() <-chan timeoutInfo {
  403. return m.c
  404. }
  405. func (mockTicker) SetLogger(log.Logger) {
  406. }
  407. //------------------------------------
  408. func newCounter() abci.Application {
  409. return counter.NewCounterApplication(true)
  410. }
  411. func newPersistentDummy() abci.Application {
  412. dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
  413. return dummy.NewPersistentDummyApplication(dir)
  414. }