You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

487 lines
14 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "sync"
  10. "testing"
  11. "time"
  12. abcicli "github.com/tendermint/abci/client"
  13. abci "github.com/tendermint/abci/types"
  14. bc "github.com/tendermint/tendermint/blockchain"
  15. cfg "github.com/tendermint/tendermint/config"
  16. mempl "github.com/tendermint/tendermint/mempool"
  17. "github.com/tendermint/tendermint/p2p"
  18. sm "github.com/tendermint/tendermint/state"
  19. "github.com/tendermint/tendermint/types"
  20. . "github.com/tendermint/tmlibs/common"
  21. dbm "github.com/tendermint/tmlibs/db"
  22. "github.com/tendermint/tmlibs/log"
  23. "github.com/tendermint/abci/example/counter"
  24. "github.com/tendermint/abci/example/dummy"
  25. "github.com/go-kit/kit/log/term"
  26. )
  27. // genesis, chain_id, priv_val
  28. var config *cfg.Config // NOTE: must be reset for each _test.go file
  29. var ensureTimeout = time.Duration(2)
  30. func ensureDir(dir string, mode os.FileMode) {
  31. if err := EnsureDir(dir, mode); err != nil {
  32. panic(err)
  33. }
  34. }
  35. func ResetConfig(name string) *cfg.Config {
  36. return cfg.ResetTestRoot(name)
  37. }
  38. //-------------------------------------------------------------------------------
  39. // validator stub (a dummy consensus peer we control)
  40. type validatorStub struct {
  41. Index int // Validator index. NOTE: we don't assume validator set changes.
  42. Height int
  43. Round int
  44. *types.PrivValidator
  45. }
  46. var testMinPower = 10
  47. func NewValidatorStub(privValidator *types.PrivValidator, valIndex int) *validatorStub {
  48. return &validatorStub{
  49. Index: valIndex,
  50. PrivValidator: privValidator,
  51. }
  52. }
  53. func (vs *validatorStub) signVote(voteType byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
  54. vote := &types.Vote{
  55. ValidatorIndex: vs.Index,
  56. ValidatorAddress: vs.PrivValidator.Address,
  57. Height: vs.Height,
  58. Round: vs.Round,
  59. Type: voteType,
  60. BlockID: types.BlockID{hash, header},
  61. }
  62. err := vs.PrivValidator.SignVote(config.ChainID, vote)
  63. return vote, err
  64. }
  65. // Sign vote for type/hash/header
  66. func signVote(vs *validatorStub, voteType byte, hash []byte, header types.PartSetHeader) *types.Vote {
  67. v, err := vs.signVote(voteType, hash, header)
  68. if err != nil {
  69. panic(fmt.Errorf("failed to sign vote: %v", err))
  70. }
  71. return v
  72. }
  73. func signVotes(voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) []*types.Vote {
  74. votes := make([]*types.Vote, len(vss))
  75. for i, vs := range vss {
  76. votes[i] = signVote(vs, voteType, hash, header)
  77. }
  78. return votes
  79. }
  80. func incrementHeight(vss ...*validatorStub) {
  81. for _, vs := range vss {
  82. vs.Height += 1
  83. }
  84. }
  85. func incrementRound(vss ...*validatorStub) {
  86. for _, vs := range vss {
  87. vs.Round += 1
  88. }
  89. }
  90. //-------------------------------------------------------------------------------
  91. // Functions for transitioning the consensus state
  92. func startTestRound(cs *ConsensusState, height, round int) {
  93. cs.enterNewRound(height, round)
  94. cs.startRoutines(0)
  95. }
  96. // Create proposal block from cs1 but sign it with vs
  97. func decideProposal(cs1 *ConsensusState, vs *validatorStub, height, round int) (proposal *types.Proposal, block *types.Block) {
  98. block, blockParts := cs1.createProposalBlock()
  99. if block == nil { // on error
  100. panic("error creating proposal block")
  101. }
  102. // Make proposal
  103. polRound, polBlockID := cs1.Votes.POLInfo()
  104. proposal = types.NewProposal(height, round, blockParts.Header(), polRound, polBlockID)
  105. if err := vs.SignProposal(config.ChainID, proposal); err != nil {
  106. panic(err)
  107. }
  108. return
  109. }
  110. func addVotes(to *ConsensusState, votes ...*types.Vote) {
  111. for _, vote := range votes {
  112. to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{vote}}
  113. }
  114. }
  115. func signAddVotes(to *ConsensusState, voteType byte, hash []byte, header types.PartSetHeader, vss ...*validatorStub) {
  116. votes := signVotes(voteType, hash, header, vss...)
  117. addVotes(to, votes...)
  118. }
  119. func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) {
  120. prevotes := cs.Votes.Prevotes(round)
  121. var vote *types.Vote
  122. if vote = prevotes.GetByAddress(privVal.Address); vote == nil {
  123. panic("Failed to find prevote from validator")
  124. }
  125. if blockHash == nil {
  126. if vote.BlockID.Hash != nil {
  127. panic(fmt.Sprintf("Expected prevote to be for nil, got %X", vote.BlockID.Hash))
  128. }
  129. } else {
  130. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  131. panic(fmt.Sprintf("Expected prevote to be for %X, got %X", blockHash, vote.BlockID.Hash))
  132. }
  133. }
  134. }
  135. func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) {
  136. votes := cs.LastCommit
  137. var vote *types.Vote
  138. if vote = votes.GetByAddress(privVal.Address); vote == nil {
  139. panic("Failed to find precommit from validator")
  140. }
  141. if !bytes.Equal(vote.BlockID.Hash, blockHash) {
  142. panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockID.Hash))
  143. }
  144. }
  145. func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  146. precommits := cs.Votes.Precommits(thisRound)
  147. var vote *types.Vote
  148. if vote = precommits.GetByAddress(privVal.Address); vote == nil {
  149. panic("Failed to find precommit from validator")
  150. }
  151. if votedBlockHash == nil {
  152. if vote.BlockID.Hash != nil {
  153. panic("Expected precommit to be for nil")
  154. }
  155. } else {
  156. if !bytes.Equal(vote.BlockID.Hash, votedBlockHash) {
  157. panic("Expected precommit to be for proposal block")
  158. }
  159. }
  160. if lockedBlockHash == nil {
  161. if cs.LockedRound != lockRound || cs.LockedBlock != nil {
  162. panic(fmt.Sprintf("Expected to be locked on nil at round %d. Got locked at round %d with block %v", lockRound, cs.LockedRound, cs.LockedBlock))
  163. }
  164. } else {
  165. if cs.LockedRound != lockRound || !bytes.Equal(cs.LockedBlock.Hash(), lockedBlockHash) {
  166. panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %X, expected %X", lockRound, cs.LockedRound, cs.LockedBlock.Hash(), lockedBlockHash))
  167. }
  168. }
  169. }
  170. func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *validatorStub, votedBlockHash, lockedBlockHash []byte) {
  171. // verify the prevote
  172. validatePrevote(t, cs, thisRound, privVal, votedBlockHash)
  173. // verify precommit
  174. cs.mtx.Lock()
  175. validatePrecommit(t, cs, thisRound, lockRound, privVal, votedBlockHash, lockedBlockHash)
  176. cs.mtx.Unlock()
  177. }
  178. // genesis
  179. func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
  180. voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
  181. voteCh := make(chan interface{})
  182. go func() {
  183. for {
  184. v := <-voteCh0
  185. vote := v.(types.TMEventData).Unwrap().(types.EventDataVote)
  186. // we only fire for our own votes
  187. if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
  188. voteCh <- v
  189. }
  190. }
  191. }()
  192. return voteCh
  193. }
  194. func readVotes(ch chan interface{}, reads int) chan struct{} {
  195. wg := make(chan struct{})
  196. go func() {
  197. for i := 0; i < reads; i++ {
  198. <-ch // read the precommit event
  199. }
  200. close(wg)
  201. }()
  202. return wg
  203. }
  204. //-------------------------------------------------------------------------------
  205. // consensus states
  206. func newConsensusState(state *sm.State, pv *types.PrivValidator, app abci.Application) *ConsensusState {
  207. return newConsensusStateWithConfig(config, state, pv, app)
  208. }
  209. func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv *types.PrivValidator, app abci.Application) *ConsensusState {
  210. // Get BlockStore
  211. blockDB := dbm.NewMemDB()
  212. blockStore := bc.NewBlockStore(blockDB)
  213. // one for mempool, one for consensus
  214. mtx := new(sync.Mutex)
  215. proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
  216. proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
  217. // Make Mempool
  218. mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem)
  219. mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
  220. // Make ConsensusReactor
  221. cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool)
  222. cs.SetLogger(log.TestingLogger())
  223. cs.SetPrivValidator(pv)
  224. evsw := types.NewEventSwitch()
  225. evsw.SetLogger(log.TestingLogger().With("module", "events"))
  226. cs.SetEventSwitch(evsw)
  227. evsw.Start()
  228. return cs
  229. }
  230. func loadPrivValidator(config *cfg.Config) *types.PrivValidator {
  231. privValidatorFile := config.PrivValidatorFile()
  232. ensureDir(path.Dir(privValidatorFile), 0700)
  233. privValidator := types.LoadOrGenPrivValidator(privValidatorFile, log.TestingLogger())
  234. privValidator.Reset()
  235. return privValidator
  236. }
  237. func fixedConsensusState() *ConsensusState {
  238. stateDB := dbm.NewMemDB()
  239. state := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
  240. state.SetLogger(log.TestingLogger().With("module", "state"))
  241. privValidator := loadPrivValidator(config)
  242. cs := newConsensusState(state, privValidator, counter.NewCounterApplication(true))
  243. cs.SetLogger(log.TestingLogger())
  244. return cs
  245. }
  246. func fixedConsensusStateDummy() *ConsensusState {
  247. stateDB := dbm.NewMemDB()
  248. state := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
  249. state.SetLogger(log.TestingLogger().With("module", "state"))
  250. privValidator := loadPrivValidator(config)
  251. cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
  252. cs.SetLogger(log.TestingLogger())
  253. return cs
  254. }
  255. func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
  256. // Get State
  257. state, privVals := randGenesisState(nValidators, false, 10)
  258. vss := make([]*validatorStub, nValidators)
  259. cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true))
  260. cs.SetLogger(log.TestingLogger())
  261. for i := 0; i < nValidators; i++ {
  262. vss[i] = NewValidatorStub(privVals[i], i)
  263. }
  264. // since cs1 starts at 1
  265. incrementHeight(vss[1:]...)
  266. return cs, vss
  267. }
  268. //-------------------------------------------------------------------------------
  269. func ensureNoNewStep(stepCh chan interface{}) {
  270. timeout := time.NewTicker(ensureTimeout * time.Second)
  271. select {
  272. case <-timeout.C:
  273. break
  274. case <-stepCh:
  275. panic("We should be stuck waiting for more votes, not moving to the next step")
  276. }
  277. }
  278. //-------------------------------------------------------------------------------
  279. // consensus nets
  280. // consensusLogger is a TestingLogger which uses a different
  281. // color for each validator ("validator" key must exist).
  282. func consensusLogger() log.Logger {
  283. return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
  284. for i := 0; i < len(keyvals)-1; i += 2 {
  285. if keyvals[i] == "validator" {
  286. return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
  287. }
  288. }
  289. return term.FgBgColor{}
  290. })
  291. }
  292. func randConsensusNet(nValidators int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState {
  293. genDoc, privVals := randGenesisDoc(nValidators, false, 10)
  294. css := make([]*ConsensusState, nValidators)
  295. logger := consensusLogger()
  296. for i := 0; i < nValidators; i++ {
  297. db := dbm.NewMemDB() // each state needs its own db
  298. state := sm.MakeGenesisState(db, genDoc)
  299. state.SetLogger(logger.With("module", "state", "validator", i))
  300. state.Save()
  301. thisConfig := ResetConfig(Fmt("%s_%d", testName, i))
  302. ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
  303. css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], appFunc())
  304. css[i].SetLogger(logger.With("validator", i))
  305. css[i].SetTimeoutTicker(tickerFunc())
  306. }
  307. return css
  308. }
  309. // nPeers = nValidators + nNotValidator
  310. func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func() abci.Application) []*ConsensusState {
  311. genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
  312. css := make([]*ConsensusState, nPeers)
  313. for i := 0; i < nPeers; i++ {
  314. db := dbm.NewMemDB() // each state needs its own db
  315. state := sm.MakeGenesisState(db, genDoc)
  316. state.SetLogger(log.TestingLogger().With("module", "state"))
  317. state.Save()
  318. thisConfig := ResetConfig(Fmt("%s_%d", testName, i))
  319. ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
  320. var privVal *types.PrivValidator
  321. if i < nValidators {
  322. privVal = privVals[i]
  323. } else {
  324. privVal = types.GenPrivValidator()
  325. _, tempFilePath := Tempfile("priv_validator_")
  326. privVal.SetFile(tempFilePath)
  327. }
  328. css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, appFunc())
  329. css[i].SetLogger(log.TestingLogger())
  330. css[i].SetTimeoutTicker(tickerFunc())
  331. }
  332. return css
  333. }
  334. func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
  335. for i, s := range switches {
  336. if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
  337. return i
  338. }
  339. }
  340. panic("didnt find peer in switches")
  341. return -1
  342. }
  343. //-------------------------------------------------------------------------------
  344. // genesis
  345. func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []*types.PrivValidator) {
  346. validators := make([]types.GenesisValidator, numValidators)
  347. privValidators := make([]*types.PrivValidator, numValidators)
  348. for i := 0; i < numValidators; i++ {
  349. val, privVal := types.RandValidator(randPower, minPower)
  350. validators[i] = types.GenesisValidator{
  351. PubKey: val.PubKey,
  352. Amount: val.VotingPower,
  353. }
  354. privValidators[i] = privVal
  355. }
  356. sort.Sort(types.PrivValidatorsByAddress(privValidators))
  357. return &types.GenesisDoc{
  358. GenesisTime: time.Now(),
  359. ChainID: config.ChainID,
  360. Validators: validators,
  361. }, privValidators
  362. }
  363. func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) {
  364. genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
  365. db := dbm.NewMemDB()
  366. s0 := sm.MakeGenesisState(db, genDoc)
  367. s0.SetLogger(log.TestingLogger().With("module", "state"))
  368. s0.Save()
  369. return s0, privValidators
  370. }
  371. //------------------------------------
  372. // mock ticker
  373. func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
  374. return func() TimeoutTicker {
  375. return &mockTicker{
  376. c: make(chan timeoutInfo, 10),
  377. onlyOnce: onlyOnce,
  378. }
  379. }
  380. }
  381. // mock ticker only fires on RoundStepNewHeight
  382. // and only once if onlyOnce=true
  383. type mockTicker struct {
  384. c chan timeoutInfo
  385. mtx sync.Mutex
  386. onlyOnce bool
  387. fired bool
  388. }
  389. func (m *mockTicker) Start() (bool, error) {
  390. return true, nil
  391. }
  392. func (m *mockTicker) Stop() bool {
  393. return true
  394. }
  395. func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {
  396. m.mtx.Lock()
  397. defer m.mtx.Unlock()
  398. if m.onlyOnce && m.fired {
  399. return
  400. }
  401. if ti.Step == RoundStepNewHeight {
  402. m.c <- ti
  403. m.fired = true
  404. }
  405. }
  406. func (m *mockTicker) Chan() <-chan timeoutInfo {
  407. return m.c
  408. }
  409. func (mockTicker) SetLogger(log.Logger) {
  410. }
  411. //------------------------------------
  412. func newCounter() abci.Application {
  413. return counter.NewCounterApplication(true)
  414. }
  415. func newPersistentDummy() abci.Application {
  416. dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
  417. return dummy.NewPersistentDummyApplication(dir)
  418. }