
cs: sync WAL more frequently (#3300)

As per #3043, this adds a ticker to sync the WAL every 2s while the WAL is running.

* Flush WAL every 2s

  This adds a ticker that flushes the WAL every 2s while the WAL is running. This is related to #3043.

* Fix spelling
* Increase timeout to 2mins for slower build environments
* Make WAL sync interval configurable
* Add TODO to replace testChan with more comprehensive testBus
* Remove extraneous debug statement
* Remove testChan in favour of using system time

  As per https://github.com/tendermint/tendermint/pull/3300#discussion_r255886586, this removes the `testChan` WAL member and replaces the approach with a system-time-oriented one. In this new approach, we keep track of the system time at which each flush and periodic flush successfully occurred. The naming of the various functions is also updated here to be more consistent with "flushing" as opposed to "sync'ing".

* Update naming convention and ensure lock for timestamp update
* Add Flush method as part of WAL interface

  Adds a `Flush` method as part of the WAL interface to enforce the idea that we can manually trigger a WAL flush from outside of the WAL. This is employed in the consensus state management to flush the WAL prior to signing votes/proposals, as per https://github.com/tendermint/tendermint/issues/3043#issuecomment-453853630

* Update CHANGELOG_PENDING
* Remove mutex approach and replace with DI

  The dependency injection approach to dealing with testing concerns could allow similar effects to some kind of "testing bus"-based approach. This commit introduces an example of this, where instead of relying on (potentially fragile) timing of things between the code and the test, we inject code into the function under test that can signal the test through a channel. This allows us to avoid the `time.Sleep()`-based approach previously employed.

* Update comment on WAL flushing during vote signing
  Co-Authored-By: thanethomson <connect@thanethomson.com>
* Simplify flush interval definition
  Co-Authored-By: thanethomson <connect@thanethomson.com>
* Expand commentary on WAL disk flushing
  Co-Authored-By: thanethomson <connect@thanethomson.com>
* Add broken test to illustrate WAL sync test problem

  Removes test-related state (dependency injection code) from the WAL data structure and adds test code to illustrate the problem with using `WALGenerateNBlocks` and `wal.SearchForEndHeight` to test periodic sync'ing.

* Fix test error messages
* Use WAL group buffer size to check for flush

  A function is added to `libs/autofile/group.go#Group` in order to return the size of the buffered data (i.e. data that has not yet been flushed to disk). The test now checks that, prior to a `time.Sleep`, the group buffer has data in it. After the `time.Sleep` (during which time the periodic flush should have been called), the buffer should be empty.

* Remove config root dir removal from #3291
* Add godoc for NewWAL mentioning periodic sync
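The ticker-based flushing this commit describes can be pictured with the standalone sketch below. It is illustrative only: the flushableWAL interface, the periodicFlush function, and the 2-second default are assumptions standing in for the actual fields and methods in consensus/wal.go, which this page does not show.

package walsketch

import (
	"log"
	"time"
)

// flushableWAL is a stand-in for anything that can flush its buffered
// writes to disk (illustrative, not the exact tendermint WAL interface).
type flushableWAL interface {
	FlushAndSync() error
}

// periodicFlush flushes the WAL every flushInterval until quit is closed,
// mirroring the ticker-based approach described in the commit message
// (2s by default, configurable).
func periodicFlush(wal flushableWAL, flushInterval time.Duration, quit <-chan struct{}) {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := wal.FlushAndSync(); err != nil {
				log.Printf("periodic WAL flush failed: %v", err)
			}
		case <-quit:
			return
		}
	}
}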
package consensus

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"path/filepath"
	"testing"
	"time"

	"github.com/pkg/errors"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	bc "github.com/tendermint/tendermint/blockchain"
	cfg "github.com/tendermint/tendermint/config"
	cmn "github.com/tendermint/tendermint/libs/common"
	"github.com/tendermint/tendermint/libs/db"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
// stripped-down version of a node (proxy app, event bus, consensus state) with a
// persistent kvstore application and a special consensus WAL instance
// (byteBufferWAL), and waits until numBlocks are created. If the node fails to
// produce the given numBlocks, an error is returned.
func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
	config := getConfig(t)

	app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))

	logger := log.TestingLogger().With("wal_generator", "wal_generator")
	logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)

	/////////////////////////////////////////////////////////////////////////////
	// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
	// NOTE: we can't import node package because of circular dependency.
	// NOTE: we don't do handshake so need to set state.Version.Consensus.App directly.
	privValidatorKeyFile := config.PrivValidatorKeyFile()
	privValidatorStateFile := config.PrivValidatorStateFile()
	privValidator := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile)
	genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
	if err != nil {
		return errors.Wrap(err, "failed to read genesis file")
	}
	stateDB := db.NewMemDB()
	blockStoreDB := db.NewMemDB()
	state, err := sm.MakeGenesisState(genDoc)
	if err != nil {
		return errors.Wrap(err, "failed to make genesis state")
	}
	state.Version.Consensus.App = kvstore.ProtocolVersion
	blockStore := bc.NewBlockStore(blockStoreDB)
	proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
		return errors.Wrap(err, "failed to start proxy app connections")
	}
	defer proxyApp.Stop()
	eventBus := types.NewEventBus()
	eventBus.SetLogger(logger.With("module", "events"))
	if err := eventBus.Start(); err != nil {
		return errors.Wrap(err, "failed to start event bus")
	}
	defer eventBus.Stop()
	mempool := sm.MockMempool{}
	evpool := sm.MockEvidencePool{}
	blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
	consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
	consensusState.SetLogger(logger)
	consensusState.SetEventBus(eventBus)
	if privValidator != nil {
		consensusState.SetPrivValidator(privValidator)
	}
	// END OF COPY PASTE
	/////////////////////////////////////////////////////////////////////////////

	// set consensus wal to buffered WAL, which will write all incoming msgs to buffer
	numBlocksWritten := make(chan struct{})
	wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
	// see wal.go#103
	wal.Write(EndHeightMessage{0})

	consensusState.wal = wal

	if err := consensusState.Start(); err != nil {
		return errors.Wrap(err, "failed to start consensus state")
	}

	select {
	case <-numBlocksWritten:
		consensusState.Stop()
		return nil
	case <-time.After(1 * time.Minute):
		consensusState.Stop()
		return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
	}
}
// WALWithNBlocks returns the WAL content for numBlocks generated blocks.
func WALWithNBlocks(t *testing.T, numBlocks int) (data []byte, err error) {
	var b bytes.Buffer
	wr := bufio.NewWriter(&b)

	if err := WALGenerateNBlocks(t, wr, numBlocks); err != nil {
		return []byte{}, err
	}

	wr.Flush()
	return b.Bytes(), nil
}
// randPort returns a pseudo-random port for the test node to listen on.
func randPort() int {
	// returns between base and base + spread
	base, spread := 20000, 20000
	return base + cmn.RandIntn(spread)
}

// makeAddrs returns P2P, RPC and gRPC listen addresses on consecutive random ports.
func makeAddrs() (string, string, string) {
	start := randPort()
	return fmt.Sprintf("tcp://0.0.0.0:%d", start),
		fmt.Sprintf("tcp://0.0.0.0:%d", start+1),
		fmt.Sprintf("tcp://0.0.0.0:%d", start+2)
}
// getConfig returns a config for test cases
func getConfig(t *testing.T) *cfg.Config {
	c := cfg.ResetTestRoot(t.Name())

	// and we use random ports to run in parallel
	tm, rpc, grpc := makeAddrs()
	c.P2P.ListenAddress = tm
	c.RPC.ListenAddress = rpc
	c.RPC.GRPCListenAddress = grpc
	return c
}
// byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
// when the heightToStop is reached. Client will be notified via
// signalWhenStopsTo channel.
type byteBufferWAL struct {
	enc               *WALEncoder
	stopped           bool
	heightToStop      int64
	signalWhenStopsTo chan<- struct{}
	logger            log.Logger
}

// needed for determinism
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")

func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL {
	return &byteBufferWAL{
		enc:               enc,
		heightToStop:      nBlocks,
		signalWhenStopsTo: signalStop,
		logger:            logger,
	}
}
// Write writes the message to the internal buffer, except when heightToStop is
// reached, in which case it signals the caller via signalWhenStopsTo and
// skips writing.
func (w *byteBufferWAL) Write(m WALMessage) {
	if w.stopped {
		w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
		return
	}

	if endMsg, ok := m.(EndHeightMessage); ok {
		w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
		if endMsg.Height == w.heightToStop {
			w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
			w.signalWhenStopsTo <- struct{}{}
			w.stopped = true
			return
		}
	}

	w.logger.Debug("WAL Write Message", "msg", m)
	err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
	if err != nil {
		panic(fmt.Sprintf("failed to encode the msg %v", m))
	}
}
// WriteSync behaves exactly like Write: everything is kept in memory, so
// there is nothing extra to sync.
func (w *byteBufferWAL) WriteSync(m WALMessage) {
	w.Write(m)
}

// FlushAndSync is a no-op: writes go straight to the in-memory buffer.
func (w *byteBufferWAL) FlushAndSync() error { return nil }

// SearchForEndHeight is not supported by byteBufferWAL.
func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
	return nil, false, nil
}

// Start, Stop and Wait are no-ops: byteBufferWAL has no background routines.
func (w *byteBufferWAL) Start() error { return nil }
func (w *byteBufferWAL) Stop() error  { return nil }
func (w *byteBufferWAL) Wait()        {}
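
As a usage note, a test elsewhere in the package might consume WALWithNBlocks roughly as in the sketch below. The NewWALDecoder call and the TimedWALMessage.Msg field are assumptions based on the NewWALEncoder used above, not definitions from this file.

// Sketch: decode the WAL bytes produced by WALWithNBlocks message by message.
func exampleReadGeneratedWAL(t *testing.T) {
	walBody, err := WALWithNBlocks(t, 6)
	if err != nil {
		t.Fatal(err)
	}

	dec := NewWALDecoder(bytes.NewReader(walBody))
	for {
		msg, err := dec.Decode()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		t.Logf("WAL message: %v", msg.Msg)
	}
}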