You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

218 lines
6.9 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/pkg/errors"
  12. "github.com/tendermint/tendermint/abci/example/kvstore"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. cfg "github.com/tendermint/tendermint/config"
  15. auto "github.com/tendermint/tendermint/libs/autofile"
  16. cmn "github.com/tendermint/tendermint/libs/common"
  17. "github.com/tendermint/tendermint/libs/db"
  18. "github.com/tendermint/tendermint/libs/log"
  19. "github.com/tendermint/tendermint/privval"
  20. "github.com/tendermint/tendermint/proxy"
  21. sm "github.com/tendermint/tendermint/state"
  22. "github.com/tendermint/tendermint/types"
  23. )
  24. // WALGenerateNBlocks generates a consensus WAL. It does this by spining up a
  25. // stripped down version of node (proxy app, event bus, consensus state) with a
  26. // persistent kvstore application and special consensus wal instance
  27. // (byteBufferWAL) and waits until numBlocks are created. If the node fails to produce given numBlocks, it returns an error.
  28. func WALGenerateNBlocks(wr io.Writer, numBlocks int) (err error) {
  29. config := getConfig()
  30. app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))
  31. logger := log.TestingLogger().With("wal_generator", "wal_generator")
  32. logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)
  33. /////////////////////////////////////////////////////////////////////////////
  34. // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
  35. // NOTE: we can't import node package because of circular dependency.
  36. // NOTE: we don't do handshake so need to set state.Version.Consensus.App directly.
  37. privValidatorKeyFile := config.PrivValidatorKeyFile()
  38. privValidatorStateFile := config.PrivValidatorStateFile()
  39. privValidator := privval.LoadOrGenFilePV(privValidatorKeyFile, privValidatorStateFile)
  40. genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
  41. if err != nil {
  42. return errors.Wrap(err, "failed to read genesis file")
  43. }
  44. stateDB := db.NewMemDB()
  45. blockStoreDB := db.NewMemDB()
  46. state, err := sm.MakeGenesisState(genDoc)
  47. if err != nil {
  48. return errors.Wrap(err, "failed to make genesis state")
  49. }
  50. state.Version.Consensus.App = kvstore.ProtocolVersion
  51. blockStore := bc.NewBlockStore(blockStoreDB)
  52. proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
  53. proxyApp.SetLogger(logger.With("module", "proxy"))
  54. if err := proxyApp.Start(); err != nil {
  55. return errors.Wrap(err, "failed to start proxy app connections")
  56. }
  57. defer proxyApp.Stop()
  58. eventBus := types.NewEventBus()
  59. eventBus.SetLogger(logger.With("module", "events"))
  60. if err := eventBus.Start(); err != nil {
  61. return errors.Wrap(err, "failed to start event bus")
  62. }
  63. defer eventBus.Stop()
  64. mempool := sm.MockMempool{}
  65. evpool := sm.MockEvidencePool{}
  66. blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  67. consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
  68. consensusState.SetLogger(logger)
  69. consensusState.SetEventBus(eventBus)
  70. if privValidator != nil {
  71. consensusState.SetPrivValidator(privValidator)
  72. }
  73. // END OF COPY PASTE
  74. /////////////////////////////////////////////////////////////////////////////
  75. // set consensus wal to buffered WAL, which will write all incoming msgs to buffer
  76. numBlocksWritten := make(chan struct{})
  77. wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
  78. // see wal.go#103
  79. wal.Write(EndHeightMessage{0})
  80. consensusState.wal = wal
  81. if err := consensusState.Start(); err != nil {
  82. return errors.Wrap(err, "failed to start consensus state")
  83. }
  84. select {
  85. case <-numBlocksWritten:
  86. consensusState.Stop()
  87. return nil
  88. case <-time.After(1 * time.Minute):
  89. consensusState.Stop()
  90. return fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
  91. }
  92. }
  93. //WALWithNBlocks returns a WAL content with numBlocks.
  94. func WALWithNBlocks(numBlocks int) (data []byte, err error) {
  95. var b bytes.Buffer
  96. wr := bufio.NewWriter(&b)
  97. if err := WALGenerateNBlocks(wr, numBlocks); err != nil {
  98. return []byte{}, err
  99. }
  100. wr.Flush()
  101. return b.Bytes(), nil
  102. }
  103. // f**ing long, but unique for each test
  104. func makePathname() string {
  105. // get path
  106. p, err := os.Getwd()
  107. if err != nil {
  108. panic(err)
  109. }
  110. // fmt.Println(p)
  111. sep := string(filepath.Separator)
  112. return strings.Replace(p, sep, "_", -1)
  113. }
  114. func randPort() int {
  115. // returns between base and base + spread
  116. base, spread := 20000, 20000
  117. return base + cmn.RandIntn(spread)
  118. }
  119. func makeAddrs() (string, string, string) {
  120. start := randPort()
  121. return fmt.Sprintf("tcp://0.0.0.0:%d", start),
  122. fmt.Sprintf("tcp://0.0.0.0:%d", start+1),
  123. fmt.Sprintf("tcp://0.0.0.0:%d", start+2)
  124. }
  125. // getConfig returns a config for test cases
  126. func getConfig() *cfg.Config {
  127. pathname := makePathname()
  128. c := cfg.ResetTestRoot(fmt.Sprintf("%s_%d", pathname, cmn.RandInt()))
  129. // and we use random ports to run in parallel
  130. tm, rpc, grpc := makeAddrs()
  131. c.P2P.ListenAddress = tm
  132. c.RPC.ListenAddress = rpc
  133. c.RPC.GRPCListenAddress = grpc
  134. return c
  135. }
  136. // byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
  137. // when the heightToStop is reached. Client will be notified via
  138. // signalWhenStopsTo channel.
  139. type byteBufferWAL struct {
  140. enc *WALEncoder
  141. stopped bool
  142. heightToStop int64
  143. signalWhenStopsTo chan<- struct{}
  144. logger log.Logger
  145. }
  146. // needed for determinism
  147. var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
  148. func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL {
  149. return &byteBufferWAL{
  150. enc: enc,
  151. heightToStop: nBlocks,
  152. signalWhenStopsTo: signalStop,
  153. logger: logger,
  154. }
  155. }
  156. // Save writes message to the internal buffer except when heightToStop is
  157. // reached, in which case it will signal the caller via signalWhenStopsTo and
  158. // skip writing.
  159. func (w *byteBufferWAL) Write(m WALMessage) {
  160. if w.stopped {
  161. w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
  162. return
  163. }
  164. if endMsg, ok := m.(EndHeightMessage); ok {
  165. w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
  166. if endMsg.Height == w.heightToStop {
  167. w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
  168. w.signalWhenStopsTo <- struct{}{}
  169. w.stopped = true
  170. return
  171. }
  172. }
  173. w.logger.Debug("WAL Write Message", "msg", m)
  174. err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
  175. if err != nil {
  176. panic(fmt.Sprintf("failed to encode the msg %v", m))
  177. }
  178. }
  179. func (w *byteBufferWAL) WriteSync(m WALMessage) {
  180. w.Write(m)
  181. }
  182. func (w *byteBufferWAL) Group() *auto.Group {
  183. panic("not implemented")
  184. }
  185. func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  186. return nil, false, nil
  187. }
  188. func (w *byteBufferWAL) Start() error { return nil }
  189. func (w *byteBufferWAL) Stop() error { return nil }
  190. func (w *byteBufferWAL) Wait() {}