cs: sync WAL more frequently (#3300)

As per #3043, this adds a ticker to sync the WAL every 2s while the WAL is running.

* Flush WAL every 2s

  This adds a ticker that flushes the WAL every 2s while the WAL is running. This is related to #3043.

* Fix spelling
* Increase timeout to 2mins for slower build environments
* Make WAL sync interval configurable
* Add TODO to replace testChan with more comprehensive testBus
* Remove extraneous debug statement
* Remove testChan in favour of using system time

  As per https://github.com/tendermint/tendermint/pull/3300#discussion_r255886586, this removes the `testChan` WAL member and replaces the approach with a system-time-oriented one. In this new approach, we keep track of the system time at which each flush and periodic flush successfully occurred. The naming of the various functions is also updated here to be more consistent with "flushing" as opposed to "sync'ing".

* Update naming convention and ensure lock for timestamp update
* Add Flush method as part of WAL interface

  Adds a `Flush` method as part of the WAL interface to enforce the idea that we can manually trigger a WAL flush from outside of the WAL. This is employed in the consensus state management to flush the WAL prior to signing votes/proposals, as per https://github.com/tendermint/tendermint/issues/3043#issuecomment-453853630

* Update CHANGELOG_PENDING
* Remove mutex approach and replace with DI

  The dependency injection approach to dealing with testing concerns could allow similar effects to some kind of "testing bus"-based approach. This commit introduces an example of this: instead of relying on (potentially fragile) timing between the code and the test, we inject code into the function under test that can signal the test through a channel. This allows us to avoid the `time.Sleep()`-based approach previously employed.

* Update comment on WAL flushing during vote signing (Co-Authored-By: thanethomson <connect@thanethomson.com>)
* Simplify flush interval definition (Co-Authored-By: thanethomson <connect@thanethomson.com>)
* Expand commentary on WAL disk flushing (Co-Authored-By: thanethomson <connect@thanethomson.com>)
* Add broken test to illustrate WAL sync test problem

  Removes test-related state (dependency injection code) from the WAL data structure and adds test code to illustrate the problem with using `WALGenerateNBlocks` and `wal.SearchForEndHeight` to test periodic sync'ing.

* Fix test error messages
* Use WAL group buffer size to check for flush

  A function is added to `libs/autofile/group.go#Group` in order to return the size of the buffered data (i.e. data that has not yet been flushed to disk). The test now checks that, prior to a `time.Sleep`, the group buffer has data in it. After the `time.Sleep` (during which time the periodic flush should have been called), the buffer should be empty.

* Remove config root dir removal from #3291
* Add godoc for NewWAL mentioning periodic sync
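The commit above describes a ticker that flushes the WAL to disk on a fixed interval while the WAL service is running, plus a Flush method that callers (such as the consensus state, before signing votes/proposals) can invoke directly. Below is a minimal, self-contained Go sketch of that ticker-driven flush pattern; the wal type, processFlushTicks, and flushInterval names here are illustrative stand-ins and not the actual Tendermint WAL implementation.

package main

import (
	"bufio"
	"io/ioutil"
	"log"
	"sync"
	"time"
)

// wal is a simplified, hypothetical stand-in for a write-ahead log backed by a
// buffered writer; it is not the Tendermint WAL type.
type wal struct {
	mtx  sync.Mutex
	buf  *bufio.Writer
	quit chan struct{}
}

// Flush writes any buffered data through to the underlying writer.
func (w *wal) Flush() error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	return w.buf.Flush()
}

// processFlushTicks flushes the WAL every flushInterval until quit is closed,
// mirroring the "flush the WAL every 2s while it is running" behaviour.
func (w *wal) processFlushTicks(flushInterval time.Duration) {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := w.Flush(); err != nil {
				log.Printf("periodic WAL flush failed: %v", err)
			}
		case <-w.quit:
			return
		}
	}
}

func main() {
	w := &wal{buf: bufio.NewWriter(ioutil.Discard), quit: make(chan struct{})}
	go w.processFlushTicks(2 * time.Second)

	// ... writes to w.buf would go here, holding w.mtx ...

	time.Sleep(5 * time.Second)
	close(w.quit)
}

In the actual change the interval is configurable and, per the commit notes, tests verify that a flush happened by checking the autofile group's buffered-data size rather than sleeping and hoping.

The file below is the consensus replay test suite touched by this commit.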
package consensus

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
	"runtime"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/crypto"
	auto "github.com/tendermint/tendermint/libs/autofile"
	dbm "github.com/tendermint/tendermint/libs/db"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
	"github.com/tendermint/tendermint/version"
)
func TestMain(m *testing.M) {
	config = ResetConfig("consensus_reactor_test")
	consensusReplayConfig = ResetConfig("consensus_replay_test")
	configStateTest := ResetConfig("consensus_state_test")
	configMempoolTest := ResetConfig("consensus_mempool_test")
	configByzantineTest := ResetConfig("consensus_byzantine_test")
	code := m.Run()
	os.RemoveAll(config.RootDir)
	os.RemoveAll(consensusReplayConfig.RootDir)
	os.RemoveAll(configStateTest.RootDir)
	os.RemoveAll(configMempoolTest.RootDir)
	os.RemoveAll(configByzantineTest.RootDir)
	os.Exit(code)
}
// These tests ensure we can always recover from failure at any part of the consensus process.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// Only the latter interacts with the app and store,
// but the former has to deal with restrictions on re-use of priv_validator keys.
// The `WAL Tests` are for failures during the consensus;
// the `Handshake Tests` are for failures in applying the block.
// With the help of the WAL, we can recover from it all!

//------------------------------------------------------------------------------------------
// WAL Tests

// TODO: It would be better to verify explicitly which states we can recover from without the wal
// and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.
func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
	lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
	logger := log.TestingLogger()
	state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
	privValidator := loadPrivValidator(consensusReplayConfig)
	cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
	cs.SetLogger(logger)

	bytes, _ := ioutil.ReadFile(cs.config.WalFile())
	t.Logf("====== WAL: \n\r%X\n", bytes)

	err := cs.Start()
	require.NoError(t, err)
	defer cs.Stop()

	// This is just a signal that we haven't halted; it's not something contained
	// in the WAL itself. Assuming the consensus state is running, replay of any
	// WAL, including the empty one, should eventually be followed by a new
	// block, or else something is wrong.
	newBlockCh := make(chan interface{}, 1)
	err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
	require.NoError(t, err)
	select {
	case <-newBlockCh:
	case <-time.After(120 * time.Second):
		t.Fatalf("Timed out waiting for new block (see trace above)")
	}
}
func sendTxs(cs *ConsensusState, ctx context.Context) {
	for i := 0; i < 256; i++ {
		select {
		case <-ctx.Done():
			return
		default:
			tx := []byte{byte(i)}
			assertMempool(cs.txNotifier).CheckTx(tx, nil)
			i++
		}
	}
}
// TestWALCrash uses a crashing WAL to test that we can recover from any WAL failure.
func TestWALCrash(t *testing.T) {
	testCases := []struct {
		name         string
		initFn       func(dbm.DB, *ConsensusState, context.Context)
		heightToStop int64
	}{
		{"empty block",
			func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
			1},
		{"many non-empty blocks",
			func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
				go sendTxs(cs, ctx)
			},
			3},
	}

	for i, tc := range testCases {
		consensusReplayConfig := ResetConfig(fmt.Sprintf("%s_%d", t.Name(), i))
		t.Run(tc.name, func(t *testing.T) {
			crashWALandCheckLiveness(t, consensusReplayConfig, tc.initFn, tc.heightToStop)
		})
	}
}
func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
	initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
	walPanicked := make(chan error)
	crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}

	i := 1
LOOP:
	for {
		t.Logf("====== LOOP %d\n", i)

		// create consensus state from a clean slate
		logger := log.NewNopLogger()
		stateDB := dbm.NewMemDB()
		state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
		privValidator := loadPrivValidator(consensusReplayConfig)
		blockDB := dbm.NewMemDB()
		cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
		cs.SetLogger(logger)

		// start sending transactions
		ctx, cancel := context.WithCancel(context.Background())
		initFn(stateDB, cs, ctx)

		// clean up WAL file from the previous iteration
		walFile := cs.config.WalFile()
		os.Remove(walFile)

		// set crashing WAL
		csWal, err := cs.OpenWAL(walFile)
		require.NoError(t, err)
		crashingWal.next = csWal
		// reset the message counter
		crashingWal.msgIndex = 1
		cs.wal = crashingWal

		// start consensus state
		err = cs.Start()
		require.NoError(t, err)

		i++

		select {
		case err := <-walPanicked:
			t.Logf("WAL panicked: %v", err)

			// make sure we can make blocks after a crash
			startNewConsensusStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateDB)

			// stop consensus state and transactions sender (initFn)
			cs.Stop()
			cancel()

			// if we reached the required height, exit
			if _, ok := err.(ReachedHeightToStopError); ok {
				break LOOP
			}
		case <-time.After(10 * time.Second):
			t.Fatal("WAL did not panic for 10 seconds (check the log)")
		}
	}
}
// crashingWAL is a WAL that simulates a crash during Save (before and after).
// It remembers the message for which it last panicked (lastPanickedForMsgIndex),
// so we don't panic for it in subsequent iterations.
type crashingWAL struct {
	next         WAL
	panicCh      chan error
	heightToStop int64

	msgIndex                int // current message index
	lastPanickedForMsgIndex int // last message for which we panicked
}

// WALWriteError indicates a WAL crash.
type WALWriteError struct {
	msg string
}

func (e WALWriteError) Error() string {
	return e.msg
}

// ReachedHeightToStopError indicates we've reached the required consensus
// height and may exit.
type ReachedHeightToStopError struct {
	height int64
}

func (e ReachedHeightToStopError) Error() string {
	return fmt.Sprintf("reached height to stop %d", e.height)
}

// Write simulates a WAL crash by sending an error to panicCh and then
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) {
	if endMsg, ok := m.(EndHeightMessage); ok {
		if endMsg.Height == w.heightToStop {
			w.panicCh <- ReachedHeightToStopError{endMsg.Height}
			runtime.Goexit()
		} else {
			w.next.Write(m)
		}
		return
	}

	if w.msgIndex > w.lastPanickedForMsgIndex {
		w.lastPanickedForMsgIndex = w.msgIndex
		_, file, line, _ := runtime.Caller(1)
		w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
		runtime.Goexit()
	} else {
		w.msgIndex++
		w.next.Write(m)
	}
}

func (w *crashingWAL) WriteSync(m WALMessage) {
	w.Write(m)
}

func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
	return w.next.SearchForEndHeight(height, options)
}

func (w *crashingWAL) Start() error { return w.next.Start() }
func (w *crashingWAL) Stop() error  { return w.next.Stop() }
func (w *crashingWAL) Wait()        { w.next.Wait() }
func (w *crashingWAL) Flush() error { return w.Group().Flush() }
//------------------------------------------------------------------------------------------
// Handshake Tests

const (
	NUM_BLOCKS = 6
)

var (
	mempool = sm.MockMempool{}
	evpool  = sm.MockEvidencePool{}
)

//---------------------------------------
// Test handshake/replay

// 0 - all synced up
// 1 - saved block but app and state are behind
// 2 - saved block and committed but state is behind
var modes = []uint{0, 1, 2}
// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, 0, m)
	}
}

// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, 1, m)
	}
}

// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, NUM_BLOCKS-1, m)
	}
}

// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, NUM_BLOCKS, m)
	}
}

func tempWALWithData(data []byte) string {
	walFile, err := ioutil.TempFile("", "wal")
	if err != nil {
		panic(fmt.Errorf("failed to create temp WAL file: %v", err))
	}
	_, err = walFile.Write(data)
	if err != nil {
		panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
	}
	if err := walFile.Close(); err != nil {
		panic(fmt.Errorf("failed to close temp WAL file: %v", err))
	}
	return walFile.Name()
}
// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks.
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) {
	walBody, err := WALWithNBlocks(t, NUM_BLOCKS)
	require.NoError(t, err)
	walFile := tempWALWithData(walBody)
	config.Consensus.SetWalFile(walFile)

	privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())

	wal, err := NewWAL(walFile)
	require.NoError(t, err)
	wal.SetLogger(log.TestingLogger())
	err = wal.Start()
	require.NoError(t, err)
	defer wal.Stop()

	chain, commits, err := makeBlockchainFromWAL(wal)
	require.NoError(t, err)

	stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
	store.chain = chain
	store.commits = commits

	// run the chain through state.ApplyBlock to build up the tendermint state
	state = buildTMStateFromChain(config, stateDB, state, chain, mode)
	latestAppHash := state.AppHash

	// make a new client creator
	kvstoreApp := kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "2"))
	clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
	if nBlocks > 0 {
		// run nBlocks against a new client to build up the app state.
		// use a throwaway tendermint state
		proxyApp := proxy.NewAppConns(clientCreator2)
		stateDB, state, _ := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
		buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
	}

	// now start the app using the handshake - it should sync
	genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
	handshaker := NewHandshaker(stateDB, state, store, genDoc)
	proxyApp := proxy.NewAppConns(clientCreator2)
	if err := proxyApp.Start(); err != nil {
		t.Fatalf("Error starting proxy app connections: %v", err)
	}
	defer proxyApp.Stop()
	if err := handshaker.Handshake(proxyApp); err != nil {
		t.Fatalf("Error on abci handshake: %v", err)
	}

	// get the latest app hash from the app
	res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""})
	if err != nil {
		t.Fatal(err)
	}

	// the app hash should be synced up
	if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
		t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
	}

	expectedBlocksToSync := NUM_BLOCKS - nBlocks
	if nBlocks == NUM_BLOCKS && mode > 0 {
		expectedBlocksToSync++
	} else if nBlocks > 0 && mode == 1 {
		expectedBlocksToSync++
	}

	if handshaker.NBlocks() != expectedBlocksToSync {
		t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
	}
}
func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
	testPartSize := types.BlockPartSizeBytes
	blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

	blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
	newState, err := blockExec.ApplyBlock(st, blkID, blk)
	if err != nil {
		panic(err)
	}
	return newState
}

func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
	state sm.State, chain []*types.Block, nBlocks int, mode uint) {
	// start a new app without handshake, play nBlocks blocks
	if err := proxyApp.Start(); err != nil {
		panic(err)
	}
	defer proxyApp.Stop()

	validators := types.TM2PB.ValidatorUpdates(state.Validators)
	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
		Validators: validators,
	}); err != nil {
		panic(err)
	}

	switch mode {
	case 0:
		for i := 0; i < nBlocks; i++ {
			block := chain[i]
			state = applyBlock(stateDB, state, block, proxyApp)
		}
	case 1, 2:
		for i := 0; i < nBlocks-1; i++ {
			block := chain[i]
			state = applyBlock(stateDB, state, block, proxyApp)
		}

		if mode == 2 {
			// update the kvstore height and apphash
			// as if we ran commit but not
			state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
		}
	}
}
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
	// run the whole chain against this client to build up the tendermint state
	clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1")))
	proxyApp := proxy.NewAppConns(clientCreator)
	if err := proxyApp.Start(); err != nil {
		panic(err)
	}
	defer proxyApp.Stop()

	validators := types.TM2PB.ValidatorUpdates(state.Validators)
	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
		Validators: validators,
	}); err != nil {
		panic(err)
	}

	switch mode {
	case 0:
		// sync right up
		for _, block := range chain {
			state = applyBlock(stateDB, state, block, proxyApp)
		}
	case 1, 2:
		// sync up to the penultimate as if we stored the block.
		// whether we commit or not depends on the appHash
		for _, block := range chain[:len(chain)-1] {
			state = applyBlock(stateDB, state, block, proxyApp)
		}

		// apply the final block to a state copy so we can
		// get the right next appHash but keep the state back
		applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
	}

	return state
}
//--------------------------
// utils for making blocks

func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
	// Search for height marker
	gr, found, err := wal.SearchForEndHeight(0, &WALSearchOptions{})
	if err != nil {
		return nil, nil, err
	}
	if !found {
		return nil, nil, fmt.Errorf("WAL does not contain height %d.", 1)
	}
	defer gr.Close() // nolint: errcheck

	// log.Notice("Build a blockchain by reading from the WAL")

	var blocks []*types.Block
	var commits []*types.Commit

	var thisBlockParts *types.PartSet
	var thisBlockCommit *types.Commit
	var height int64

	dec := NewWALDecoder(gr)
	for {
		msg, err := dec.Decode()
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, nil, err
		}

		piece := readPieceFromWAL(msg)
		if piece == nil {
			continue
		}

		switch p := piece.(type) {
		case EndHeightMessage:
			// if it's not the first one, we have a full block
			if thisBlockParts != nil {
				var block = new(types.Block)
				_, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
				if err != nil {
					panic(err)
				}
				if block.Height != height+1 {
					panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
				}
				commitHeight := thisBlockCommit.Precommits[0].Height
				if commitHeight != height+1 {
					panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
				}
				blocks = append(blocks, block)
				commits = append(commits, thisBlockCommit)
				height++
			}
		case *types.PartSetHeader:
			thisBlockParts = types.NewPartSetFromHeader(*p)
		case *types.Part:
			_, err := thisBlockParts.AddPart(p)
			if err != nil {
				return nil, nil, err
			}
		case *types.Vote:
			if p.Type == types.PrecommitType {
				commitSigs := []*types.CommitSig{p.CommitSig()}
				thisBlockCommit = types.NewCommit(p.BlockID, commitSigs)
			}
		}
	}

	// grab the last block too
	var block = new(types.Block)
	_, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
	if err != nil {
		panic(err)
	}
	if block.Height != height+1 {
		panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
	}
	commitHeight := thisBlockCommit.Precommits[0].Height
	if commitHeight != height+1 {
		panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
	}
	blocks = append(blocks, block)
	commits = append(commits, thisBlockCommit)
	return blocks, commits, nil
}
func readPieceFromWAL(msg *TimedWALMessage) interface{} {
	// for logging
	switch m := msg.Msg.(type) {
	case msgInfo:
		switch msg := m.Msg.(type) {
		case *ProposalMessage:
			return &msg.Proposal.BlockID.PartsHeader
		case *BlockPartMessage:
			return msg.Part
		case *VoteMessage:
			return msg.Vote
		}
	case EndHeightMessage:
		return m
	}

	return nil
}
// fresh state and mock store
func stateAndStore(config *cfg.Config, pubKey crypto.PubKey, appVersion version.Protocol) (dbm.DB, sm.State, *mockBlockStore) {
	stateDB := dbm.NewMemDB()
	state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
	state.Version.Consensus.App = appVersion
	store := NewMockBlockStore(config, state.ConsensusParams)
	return stateDB, state, store
}

//----------------------------------
// mock block store

type mockBlockStore struct {
	config  *cfg.Config
	params  types.ConsensusParams
	chain   []*types.Block
	commits []*types.Commit
}

// TODO: NewBlockStore(db.NewMemDB) ...
func NewMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
	return &mockBlockStore{config, params, nil, nil}
}

func (bs *mockBlockStore) Height() int64                       { return int64(len(bs.chain)) }
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
	block := bs.chain[height-1]
	return &types.BlockMeta{
		BlockID: types.BlockID{block.Hash(), block.MakePartSet(types.BlockPartSizeBytes).Header()},
		Header:  block.Header,
	}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
	return bs.commits[height-1]
}
func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
	return bs.commits[height-1]
}
//----------------------------------------

func TestInitChainUpdateValidators(t *testing.T) {
	val, _ := types.RandValidator(true, 10)
	vals := types.NewValidatorSet([]*types.Validator{val})
	app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
	clientCreator := proxy.NewLocalClientCreator(app)

	config := ResetConfig("proxy_test_")
	defer os.RemoveAll(config.RootDir)
	privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
	stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), 0x0)

	oldValAddr := state.Validators.Validators[0].Address

	// now start the app using the handshake - it should sync
	genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
	handshaker := NewHandshaker(stateDB, state, store, genDoc)
	proxyApp := proxy.NewAppConns(clientCreator)
	if err := proxyApp.Start(); err != nil {
		t.Fatalf("Error starting proxy app connections: %v", err)
	}
	defer proxyApp.Stop()
	if err := handshaker.Handshake(proxyApp); err != nil {
		t.Fatalf("Error on abci handshake: %v", err)
	}

	// reload the state, check the validator set was updated
	state = sm.LoadState(stateDB)

	newValAddr := state.Validators.Validators[0].Address
	expectValAddr := val.Address
	assert.NotEqual(t, oldValAddr, newValAddr)
	assert.Equal(t, newValAddr, expectValAddr)
}

// returns the vals on InitChain
type initChainApp struct {
	abci.BaseApplication
	vals []abci.ValidatorUpdate
}

func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
	return abci.ResponseInitChain{
		Validators: ica.vals,
	}
}