You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
6.7 KiB

8 years ago
  1. package consensus
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "path"
  7. "strings"
  8. "testing"
  9. "time"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
  15. // the priv validator changes step at these lines for a block with 1 val and 1 part
  16. var baseStepChanges = []int{3, 6, 8}
  17. // test recovery from each line in each testCase
  18. var testCases = []*testCase{
  19. newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
  20. newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
  21. newTestCase("small_block2", []int{3, 10, 12}), // small block with txs across 5 smaller block parts
  22. }
  23. type testCase struct {
  24. name string
  25. log string //full cs wal
  26. stepMap map[int]int8 // map lines of log to privval step
  27. proposeLine int
  28. prevoteLine int
  29. precommitLine int
  30. }
  31. func newTestCase(name string, stepChanges []int) *testCase {
  32. if len(stepChanges) != 3 {
  33. panic(Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
  34. }
  35. return &testCase{
  36. name: name,
  37. log: readWAL(path.Join(data_dir, name+".cswal")),
  38. stepMap: newMapFromChanges(stepChanges),
  39. proposeLine: stepChanges[0],
  40. prevoteLine: stepChanges[1],
  41. precommitLine: stepChanges[2],
  42. }
  43. }
  44. func newMapFromChanges(changes []int) map[int]int8 {
  45. changes = append(changes, changes[2]+1) // so we add the last step change to the map
  46. m := make(map[int]int8)
  47. var count int
  48. for changeNum, nextChange := range changes {
  49. for ; count < nextChange; count++ {
  50. m[count] = int8(changeNum)
  51. }
  52. }
  53. return m
  54. }
  55. func readWAL(p string) string {
  56. b, err := ioutil.ReadFile(p)
  57. if err != nil {
  58. panic(err)
  59. }
  60. return string(b)
  61. }
  62. func writeWAL(walMsgs string) string {
  63. tempDir := os.TempDir()
  64. walDir := tempDir + "/wal" + RandStr(12)
  65. // Create WAL directory
  66. err := EnsureDir(walDir, 0700)
  67. if err != nil {
  68. panic(err)
  69. }
  70. // Write the needed WAL to file
  71. err = WriteFile(walDir+"/wal", []byte(walMsgs), 0600)
  72. if err != nil {
  73. panic(err)
  74. }
  75. return walDir
  76. }
  77. func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
  78. after := time.After(time.Second * 10)
  79. select {
  80. case <-newBlockCh:
  81. case <-after:
  82. panic(Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
  83. }
  84. }
  85. func runReplayTest(t *testing.T, cs *ConsensusState, walDir string, newBlockCh chan interface{},
  86. thisCase *testCase, i int) {
  87. cs.config.Set("cs_wal_dir", walDir)
  88. cs.Start()
  89. // Wait to make a new block.
  90. // This is just a signal that we haven't halted; its not something contained in the WAL itself.
  91. // Assuming the consensus state is running, replay of any WAL, including the empty one,
  92. // should eventually be followed by a new block, or else something is wrong
  93. waitForBlock(newBlockCh, thisCase, i)
  94. cs.Stop()
  95. cs.Wait()
  96. }
  97. func toPV(pv PrivValidator) *types.PrivValidator {
  98. return pv.(*types.PrivValidator)
  99. }
  100. func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
  101. fmt.Println("-------------------------------------")
  102. log.Notice(Fmt("Starting replay test of %d lines of WAL. Crash after = %v", nLines, crashAfter))
  103. lineStep := nLines
  104. if crashAfter {
  105. lineStep -= 1
  106. }
  107. split := strings.Split(thisCase.log, "\n")
  108. lastMsg := split[nLines]
  109. // we write those lines up to (not including) one with the signature
  110. walDir := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
  111. cs := fixedConsensusStateDummy()
  112. // set the last step according to when we crashed vs the wal
  113. toPV(cs.privValidator).LastHeight = 1 // first block
  114. toPV(cs.privValidator).LastStep = thisCase.stepMap[lineStep]
  115. log.Warn("setupReplayTest", "LastStep", toPV(cs.privValidator).LastStep)
  116. newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
  117. return cs, newBlockCh, lastMsg, walDir
  118. }
  119. //-----------------------------------------------
  120. // Test the log at every iteration, and set the privVal last step
  121. // as if the log was written after signing, before the crash
  122. func TestReplayCrashAfterWrite(t *testing.T) {
  123. for _, thisCase := range testCases {
  124. split := strings.Split(thisCase.log, "\n")
  125. for i := 0; i < len(split)-1; i++ {
  126. cs, newBlockCh, _, walDir := setupReplayTest(thisCase, i+1, true)
  127. runReplayTest(t, cs, walDir, newBlockCh, thisCase, i+1)
  128. }
  129. }
  130. }
  131. //-----------------------------------------------
  132. // Test the log as if we crashed after signing but before writing.
  133. // This relies on privValidator.LastSignature being set
  134. func TestReplayCrashBeforeWritePropose(t *testing.T) {
  135. for _, thisCase := range testCases {
  136. lineNum := thisCase.proposeLine
  137. cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose
  138. // Set LastSig
  139. var err error
  140. var msg TimedWALMessage
  141. wire.ReadJSON(&msg, []byte(proposalMsg), &err)
  142. proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
  143. if err != nil {
  144. t.Fatalf("Error reading json data: %v", err)
  145. }
  146. toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
  147. toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
  148. runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
  149. }
  150. }
  151. func TestReplayCrashBeforeWritePrevote(t *testing.T) {
  152. for _, thisCase := range testCases {
  153. lineNum := thisCase.prevoteLine
  154. cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote
  155. types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
  156. // Set LastSig
  157. var err error
  158. var msg TimedWALMessage
  159. wire.ReadJSON(&msg, []byte(voteMsg), &err)
  160. vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
  161. if err != nil {
  162. t.Fatalf("Error reading json data: %v", err)
  163. }
  164. toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
  165. toPV(cs.privValidator).LastSignature = vote.Vote.Signature
  166. })
  167. runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
  168. }
  169. }
  170. func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
  171. for _, thisCase := range testCases {
  172. lineNum := thisCase.precommitLine
  173. cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // precommit
  174. types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
  175. // Set LastSig
  176. var err error
  177. var msg TimedWALMessage
  178. wire.ReadJSON(&msg, []byte(voteMsg), &err)
  179. vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
  180. if err != nil {
  181. t.Fatalf("Error reading json data: %v", err)
  182. }
  183. toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
  184. toPV(cs.privValidator).LastSignature = vote.Vote.Signature
  185. })
  186. runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
  187. }
  188. }