You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

606 lines
18 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "testing"
  11. "time"
  12. "github.com/tendermint/abci/example/dummy"
  13. abci "github.com/tendermint/abci/types"
  14. crypto "github.com/tendermint/go-crypto"
  15. wire "github.com/tendermint/go-wire"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. dbm "github.com/tendermint/tmlibs/db"
  18. cfg "github.com/tendermint/tendermint/config"
  19. "github.com/tendermint/tendermint/proxy"
  20. sm "github.com/tendermint/tendermint/state"
  21. "github.com/tendermint/tendermint/types"
  22. "github.com/tendermint/tmlibs/log"
  23. )
  24. func init() {
  25. config = ResetConfig("consensus_replay_test")
  26. }
  27. // These tests ensure we can always recover from failure at any part of the consensus process.
  28. // There are two general failure scenarios: failure during consensus, and failure while applying the block.
  29. // Only the latter interacts with the app and store,
  30. // but the former has to deal with restrictions on re-use of priv_validator keys.
  31. // The `WAL Tests` are for failures during the consensus;
  32. // the `Handshake Tests` are for failures in applying the block.
  33. // With the help of the WAL, we can recover from it all!
  34. // NOTE: Files in this dir are generated by running the `build.sh` therein.
  35. // It's a simple way to generate wals for a single block, or multiple blocks, with random transactions,
  36. // and different part sizes. The output is not deterministic, and the stepChanges may need to be adjusted
  37. // after running it (eg. sometimes small_block2 will have 5 block parts, sometimes 6).
  38. // It should only have to be re-run if there is some breaking change to the consensus data structures (eg. blocks, votes)
  39. // or to the behaviour of the app (eg. computes app hash differently)
  40. var data_dir = path.Join(cmn.GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
  41. //------------------------------------------------------------------------------------------
  42. // WAL Tests
  43. // TODO: It would be better to verify explicitly which states we can recover from without the wal
  44. // and which ones we need the wal for - then we'd also be able to only flush the
  45. // wal writer when we need to, instead of with every message.
  46. // the priv validator changes step at these lines for a block with 1 val and 1 part
  47. var baseStepChanges = []int{3, 6, 8}
  48. // test recovery from each line in each testCase
  49. var testCases = []*testCase{
  50. newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
  51. newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
  52. newTestCase("small_block2", []int{3, 12, 14}), // small block with txs across 6 smaller block parts
  53. }
  54. type testCase struct {
  55. name string
  56. log []byte //full cs wal
  57. stepMap map[int]int8 // map lines of log to privval step
  58. proposeLine int
  59. prevoteLine int
  60. precommitLine int
  61. }
  62. func newTestCase(name string, stepChanges []int) *testCase {
  63. if len(stepChanges) != 3 {
  64. panic(cmn.Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
  65. }
  66. return &testCase{
  67. name: name,
  68. log: readWAL(path.Join(data_dir, name+".cswal")),
  69. stepMap: newMapFromChanges(stepChanges),
  70. proposeLine: stepChanges[0],
  71. prevoteLine: stepChanges[1],
  72. precommitLine: stepChanges[2],
  73. }
  74. }
  75. func newMapFromChanges(changes []int) map[int]int8 {
  76. changes = append(changes, changes[2]+1) // so we add the last step change to the map
  77. m := make(map[int]int8)
  78. var count int
  79. for changeNum, nextChange := range changes {
  80. for ; count < nextChange; count++ {
  81. m[count] = int8(changeNum)
  82. }
  83. }
  84. return m
  85. }
  86. func readWAL(p string) []byte {
  87. b, err := ioutil.ReadFile(p)
  88. if err != nil {
  89. panic(err)
  90. }
  91. return b
  92. }
  93. func writeWAL(walMsgs []byte) string {
  94. walFile, err := ioutil.TempFile("", "wal")
  95. if err != nil {
  96. panic(fmt.Errorf("failed to create temp WAL file: %v", err))
  97. }
  98. _, err = walFile.Write(walMsgs)
  99. if err != nil {
  100. panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
  101. }
  102. if err := walFile.Close(); err != nil {
  103. panic(fmt.Errorf("failed to close temp WAL file: %v", err))
  104. }
  105. return walFile.Name()
  106. }
  107. func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
  108. after := time.After(time.Second * 10)
  109. select {
  110. case <-newBlockCh:
  111. case <-after:
  112. panic(cmn.Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
  113. }
  114. }
  115. func runReplayTest(t *testing.T, cs *ConsensusState, walFile string, newBlockCh chan interface{},
  116. thisCase *testCase, i int) {
  117. cs.config.SetWalFile(walFile)
  118. started, err := cs.Start()
  119. if err != nil {
  120. t.Fatalf("Cannot start consensus: %v", err)
  121. }
  122. if !started {
  123. t.Error("Consensus did not start")
  124. }
  125. // Wait to make a new block.
  126. // This is just a signal that we haven't halted; its not something contained in the WAL itself.
  127. // Assuming the consensus state is running, replay of any WAL, including the empty one,
  128. // should eventually be followed by a new block, or else something is wrong
  129. waitForBlock(newBlockCh, thisCase, i)
  130. cs.evsw.Stop()
  131. cs.Stop()
  132. LOOP:
  133. for {
  134. select {
  135. case <-newBlockCh:
  136. default:
  137. break LOOP
  138. }
  139. }
  140. cs.Wait()
  141. }
  142. func toPV(pv types.PrivValidator) *types.PrivValidatorFS {
  143. return pv.(*types.PrivValidatorFS)
  144. }
  145. func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, []byte, string) {
  146. t.Log("-------------------------------------")
  147. t.Logf("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter)
  148. lineStep := nLines
  149. if crashAfter {
  150. lineStep -= 1
  151. }
  152. split := bytes.Split(thisCase.log, walSeparator)
  153. lastMsg := split[nLines]
  154. // we write those lines up to (not including) one with the signature
  155. b := bytes.Join(split[:nLines], walSeparator)
  156. b = append(b, walSeparator...)
  157. walFile := writeWAL(b)
  158. cs := fixedConsensusStateDummy()
  159. // set the last step according to when we crashed vs the wal
  160. toPV(cs.privValidator).LastHeight = 1 // first block
  161. toPV(cs.privValidator).LastStep = thisCase.stepMap[lineStep]
  162. t.Logf("[WARN] setupReplayTest LastStep=%v", toPV(cs.privValidator).LastStep)
  163. newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
  164. return cs, newBlockCh, lastMsg, walFile
  165. }
  166. func readTimedWALMessage(t *testing.T, rawMsg []byte) TimedWALMessage {
  167. b := bytes.NewBuffer(rawMsg)
  168. // because rawMsg does not contain a separator and WALDecoder#Decode expects it
  169. _, err := b.Write(walSeparator)
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. dec := NewWALDecoder(b)
  174. msg, err := dec.Decode()
  175. if err != nil {
  176. t.Fatalf("Error reading json data: %v", err)
  177. }
  178. return *msg
  179. }
  180. //-----------------------------------------------
  181. // Test the log at every iteration, and set the privVal last step
  182. // as if the log was written after signing, before the crash
  183. func TestWALCrashAfterWrite(t *testing.T) {
  184. for _, thisCase := range testCases {
  185. splitSize := bytes.Count(thisCase.log, walSeparator)
  186. for i := 0; i < splitSize-1; i++ {
  187. t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) {
  188. cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
  189. cs.config.TimeoutPropose = 100
  190. runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
  191. // cleanup
  192. os.Remove(walFile)
  193. })
  194. }
  195. }
  196. }
  197. //-----------------------------------------------
  198. // Test the log as if we crashed after signing but before writing.
  199. // This relies on privValidator.LastSignature being set
  200. func TestWALCrashBeforeWritePropose(t *testing.T) {
  201. for _, thisCase := range testCases {
  202. lineNum := thisCase.proposeLine
  203. t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) {
  204. // setup replay test where last message is a proposal
  205. cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
  206. cs.config.TimeoutPropose = 100
  207. msg := readTimedWALMessage(t, proposalMsg)
  208. proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
  209. // Set LastSig
  210. toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
  211. toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
  212. runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
  213. // cleanup
  214. os.Remove(walFile)
  215. })
  216. }
  217. }
  218. func TestWALCrashBeforeWritePrevote(t *testing.T) {
  219. for _, thisCase := range testCases {
  220. testReplayCrashBeforeWriteVote(t, thisCase, thisCase.prevoteLine, types.EventStringCompleteProposal())
  221. }
  222. }
  223. func TestWALCrashBeforeWritePrecommit(t *testing.T) {
  224. for _, thisCase := range testCases {
  225. testReplayCrashBeforeWriteVote(t, thisCase, thisCase.precommitLine, types.EventStringPolka())
  226. }
  227. }
  228. func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) {
  229. // setup replay test where last message is a vote
  230. cs, newBlockCh, voteMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
  231. types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) {
  232. msg := readTimedWALMessage(t, voteMsg)
  233. vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
  234. // Set LastSig
  235. toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
  236. toPV(cs.privValidator).LastSignature = vote.Vote.Signature
  237. })
  238. runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
  239. }
  240. //------------------------------------------------------------------------------------------
  241. // Handshake Tests
  242. var (
  243. NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal
  244. mempool = types.MockMempool{}
  245. )
  246. //---------------------------------------
  247. // Test handshake/replay
  248. // 0 - all synced up
  249. // 1 - saved block but app and state are behind
  250. // 2 - save block and committed but state is behind
  251. var modes = []uint{0, 1, 2}
  252. // Sync from scratch
  253. func TestHandshakeReplayAll(t *testing.T) {
  254. for _, m := range modes {
  255. testHandshakeReplay(t, 0, m)
  256. }
  257. }
  258. // Sync many, not from scratch
  259. func TestHandshakeReplaySome(t *testing.T) {
  260. for _, m := range modes {
  261. testHandshakeReplay(t, 1, m)
  262. }
  263. }
  264. // Sync from lagging by one
  265. func TestHandshakeReplayOne(t *testing.T) {
  266. for _, m := range modes {
  267. testHandshakeReplay(t, NUM_BLOCKS-1, m)
  268. }
  269. }
  270. // Sync from caught up
  271. func TestHandshakeReplayNone(t *testing.T) {
  272. for _, m := range modes {
  273. testHandshakeReplay(t, NUM_BLOCKS, m)
  274. }
  275. }
  276. // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
  277. func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
  278. config := ResetConfig("proxy_test_")
  279. // copy the many_blocks file
  280. walBody, err := cmn.ReadFile(path.Join(data_dir, "many_blocks.cswal"))
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. walFile := writeWAL(walBody)
  285. config.Consensus.SetWalFile(walFile)
  286. privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile())
  287. wal, err := NewWAL(walFile, false)
  288. if err != nil {
  289. t.Fatal(err)
  290. }
  291. wal.SetLogger(log.TestingLogger())
  292. if _, err := wal.Start(); err != nil {
  293. t.Fatal(err)
  294. }
  295. chain, commits, err := makeBlockchainFromWAL(wal)
  296. if err != nil {
  297. t.Fatalf(err.Error())
  298. }
  299. state, store := stateAndStore(config, privVal.GetPubKey())
  300. store.chain = chain
  301. store.commits = commits
  302. // run the chain through state.ApplyBlock to build up the tendermint state
  303. latestAppHash := buildTMStateFromChain(config, state, chain, mode)
  304. // make a new client creator
  305. dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2"))
  306. clientCreator2 := proxy.NewLocalClientCreator(dummyApp)
  307. if nBlocks > 0 {
  308. // run nBlocks against a new client to build up the app state.
  309. // use a throwaway tendermint state
  310. proxyApp := proxy.NewAppConns(clientCreator2, nil)
  311. state, _ := stateAndStore(config, privVal.GetPubKey())
  312. buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode)
  313. }
  314. // now start the app using the handshake - it should sync
  315. handshaker := NewHandshaker(state, store)
  316. proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
  317. if _, err := proxyApp.Start(); err != nil {
  318. t.Fatalf("Error starting proxy app connections: %v", err)
  319. }
  320. // get the latest app hash from the app
  321. res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""})
  322. if err != nil {
  323. t.Fatal(err)
  324. }
  325. // the app hash should be synced up
  326. if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
  327. t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
  328. }
  329. expectedBlocksToSync := NUM_BLOCKS - nBlocks
  330. if nBlocks == NUM_BLOCKS && mode > 0 {
  331. expectedBlocksToSync += 1
  332. } else if nBlocks > 0 && mode == 1 {
  333. expectedBlocksToSync += 1
  334. }
  335. if handshaker.NBlocks() != expectedBlocksToSync {
  336. t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
  337. }
  338. }
  339. func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
  340. testPartSize := st.Params.BlockPartSizeBytes
  341. err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool)
  342. if err != nil {
  343. panic(err)
  344. }
  345. }
  346. func buildAppStateFromChain(proxyApp proxy.AppConns,
  347. state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
  348. // start a new app without handshake, play nBlocks blocks
  349. if _, err := proxyApp.Start(); err != nil {
  350. panic(err)
  351. }
  352. validators := types.TM2PB.Validators(state.Validators)
  353. proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators})
  354. defer proxyApp.Stop()
  355. switch mode {
  356. case 0:
  357. for i := 0; i < nBlocks; i++ {
  358. block := chain[i]
  359. applyBlock(state, block, proxyApp)
  360. }
  361. case 1, 2:
  362. for i := 0; i < nBlocks-1; i++ {
  363. block := chain[i]
  364. applyBlock(state, block, proxyApp)
  365. }
  366. if mode == 2 {
  367. // update the dummy height and apphash
  368. // as if we ran commit but not
  369. applyBlock(state, chain[nBlocks-1], proxyApp)
  370. }
  371. }
  372. }
  373. func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte {
  374. // run the whole chain against this client to build up the tendermint state
  375. clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1")))
  376. proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
  377. if _, err := proxyApp.Start(); err != nil {
  378. panic(err)
  379. }
  380. defer proxyApp.Stop()
  381. validators := types.TM2PB.Validators(state.Validators)
  382. proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators})
  383. var latestAppHash []byte
  384. switch mode {
  385. case 0:
  386. // sync right up
  387. for _, block := range chain {
  388. applyBlock(state, block, proxyApp)
  389. }
  390. latestAppHash = state.AppHash
  391. case 1, 2:
  392. // sync up to the penultimate as if we stored the block.
  393. // whether we commit or not depends on the appHash
  394. for _, block := range chain[:len(chain)-1] {
  395. applyBlock(state, block, proxyApp)
  396. }
  397. // apply the final block to a state copy so we can
  398. // get the right next appHash but keep the state back
  399. stateCopy := state.Copy()
  400. applyBlock(stateCopy, chain[len(chain)-1], proxyApp)
  401. latestAppHash = stateCopy.AppHash
  402. }
  403. return latestAppHash
  404. }
  405. //--------------------------
  406. // utils for making blocks
  407. func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
  408. // Search for height marker
  409. gr, found, err := wal.SearchForEndHeight(0)
  410. if err != nil {
  411. return nil, nil, err
  412. }
  413. if !found {
  414. return nil, nil, errors.New(cmn.Fmt("WAL does not contain height %d.", 1))
  415. }
  416. defer gr.Close()
  417. // log.Notice("Build a blockchain by reading from the WAL")
  418. var blockParts *types.PartSet
  419. var blocks []*types.Block
  420. var commits []*types.Commit
  421. dec := NewWALDecoder(gr)
  422. for {
  423. msg, err := dec.Decode()
  424. if err == io.EOF {
  425. break
  426. } else if err != nil {
  427. return nil, nil, err
  428. }
  429. piece := readPieceFromWAL(msg)
  430. if piece == nil {
  431. continue
  432. }
  433. switch p := piece.(type) {
  434. case *types.PartSetHeader:
  435. // if its not the first one, we have a full block
  436. if blockParts != nil {
  437. var n int
  438. block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
  439. blocks = append(blocks, block)
  440. }
  441. blockParts = types.NewPartSetFromHeader(*p)
  442. case *types.Part:
  443. _, err := blockParts.AddPart(p, false)
  444. if err != nil {
  445. return nil, nil, err
  446. }
  447. case *types.Vote:
  448. if p.Type == types.VoteTypePrecommit {
  449. commit := &types.Commit{
  450. BlockID: p.BlockID,
  451. Precommits: []*types.Vote{p},
  452. }
  453. commits = append(commits, commit)
  454. }
  455. }
  456. }
  457. // grab the last block too
  458. var n int
  459. block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
  460. blocks = append(blocks, block)
  461. return blocks, commits, nil
  462. }
  463. func readPieceFromWAL(msg *TimedWALMessage) interface{} {
  464. // skip meta messages
  465. if _, ok := msg.Msg.(EndHeightMessage); ok {
  466. return nil
  467. }
  468. // for logging
  469. switch m := msg.Msg.(type) {
  470. case msgInfo:
  471. switch msg := m.Msg.(type) {
  472. case *ProposalMessage:
  473. return &msg.Proposal.BlockPartsHeader
  474. case *BlockPartMessage:
  475. return msg.Part
  476. case *VoteMessage:
  477. return msg.Vote
  478. }
  479. }
  480. return nil
  481. }
  482. // fresh state and mock store
  483. func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) {
  484. stateDB := dbm.NewMemDB()
  485. state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
  486. state.SetLogger(log.TestingLogger().With("module", "state"))
  487. store := NewMockBlockStore(config, state.Params)
  488. return state, store
  489. }
  490. //----------------------------------
  491. // mock block store
  492. type mockBlockStore struct {
  493. config *cfg.Config
  494. params types.ConsensusParams
  495. chain []*types.Block
  496. commits []*types.Commit
  497. }
  498. // TODO: NewBlockStore(db.NewMemDB) ...
  499. func NewMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
  500. return &mockBlockStore{config, params, nil, nil}
  501. }
  502. func (bs *mockBlockStore) Height() int { return len(bs.chain) }
  503. func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] }
  504. func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta {
  505. block := bs.chain[height-1]
  506. return &types.BlockMeta{
  507. BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.params.BlockPartSizeBytes).Header()},
  508. Header: block.Header,
  509. }
  510. }
  511. func (bs *mockBlockStore) LoadBlockPart(height int, index int) *types.Part { return nil }
  512. func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
  513. }
  514. func (bs *mockBlockStore) LoadBlockCommit(height int) *types.Commit {
  515. return bs.commits[height-1]
  516. }
  517. func (bs *mockBlockStore) LoadSeenCommit(height int) *types.Commit {
  518. return bs.commits[height-1]
  519. }