You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

181 lines
5.6 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "math/rand"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/pkg/errors"
  12. "github.com/tendermint/abci/example/dummy"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. cfg "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/proxy"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. auto "github.com/tendermint/tmlibs/autofile"
  19. "github.com/tendermint/tmlibs/db"
  20. "github.com/tendermint/tmlibs/log"
  21. )
  22. // WALWithNBlocks generates a consensus WAL. It does this by spining up a
  23. // stripped down version of node (proxy app, event bus, consensus state) with a
  24. // persistent dummy application and special consensus wal instance
  25. // (byteBufferWAL) and waits until numBlocks are created. Then it returns a WAL
  26. // content.
  27. func WALWithNBlocks(numBlocks int) (data []byte, err error) {
  28. config := getConfig()
  29. app := dummy.NewPersistentDummyApplication(filepath.Join(config.DBDir(), "wal_generator"))
  30. logger := log.NewNopLogger() // log.TestingLogger().With("wal_generator", "wal_generator")
  31. /////////////////////////////////////////////////////////////////////////////
  32. // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
  33. // NOTE: we can't import node package because of circular dependency
  34. privValidatorFile := config.PrivValidatorFile()
  35. privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile)
  36. genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
  37. if err != nil {
  38. return nil, errors.Wrap(err, "failed to read genesis file")
  39. }
  40. stateDB := db.NewMemDB()
  41. blockStoreDB := db.NewMemDB()
  42. state, err := sm.MakeGenesisState(stateDB, genDoc)
  43. state.SetLogger(logger.With("module", "state"))
  44. if err != nil {
  45. return nil, errors.Wrap(err, "failed to make genesis state")
  46. }
  47. blockStore := bc.NewBlockStore(blockStoreDB)
  48. handshaker := NewHandshaker(state, blockStore)
  49. proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
  50. proxyApp.SetLogger(logger.With("module", "proxy"))
  51. if err := proxyApp.Start(); err != nil {
  52. return nil, errors.Wrap(err, "failed to start proxy app connections")
  53. }
  54. defer proxyApp.Stop()
  55. eventBus := types.NewEventBus()
  56. eventBus.SetLogger(logger.With("module", "events"))
  57. if err := eventBus.Start(); err != nil {
  58. return nil, errors.Wrap(err, "failed to start event bus")
  59. }
  60. mempool := types.MockMempool{}
  61. consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
  62. consensusState.SetLogger(logger)
  63. consensusState.SetEventBus(eventBus)
  64. if privValidator != nil {
  65. consensusState.SetPrivValidator(privValidator)
  66. }
  67. // END OF COPY PASTE
  68. /////////////////////////////////////////////////////////////////////////////
  69. // set consensus wal to buffered WAL, which will write all incoming msgs to buffer
  70. var b bytes.Buffer
  71. wr := bufio.NewWriter(&b)
  72. numBlocksWritten := make(chan struct{})
  73. wal := &byteBufferWAL{enc: NewWALEncoder(wr), heightToStop: int64(numBlocks), signalWhenStopsTo: numBlocksWritten}
  74. // see wal.go#103
  75. wal.Save(EndHeightMessage{0})
  76. consensusState.wal = wal
  77. if err := consensusState.Start(); err != nil {
  78. return nil, errors.Wrap(err, "failed to start consensus state")
  79. }
  80. defer consensusState.Stop()
  81. select {
  82. case <-numBlocksWritten:
  83. wr.Flush()
  84. return b.Bytes(), nil
  85. case <-time.After(time.Duration(2*numBlocks) * time.Second):
  86. return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks", numBlocks)
  87. }
  88. }
  89. // f**ing long, but unique for each test
  90. func makePathname() string {
  91. // get path
  92. p, err := os.Getwd()
  93. if err != nil {
  94. panic(err)
  95. }
  96. fmt.Println(p)
  97. sep := string(filepath.Separator)
  98. return strings.Replace(p, sep, "_", -1)
  99. }
  100. func randPort() int {
  101. // returns between base and base + spread
  102. base, spread := 20000, 20000
  103. return base + rand.Intn(spread)
  104. }
  105. func makeAddrs() (string, string, string) {
  106. start := randPort()
  107. return fmt.Sprintf("tcp://0.0.0.0:%d", start),
  108. fmt.Sprintf("tcp://0.0.0.0:%d", start+1),
  109. fmt.Sprintf("tcp://0.0.0.0:%d", start+2)
  110. }
  111. // getConfig returns a config for test cases
  112. func getConfig() *cfg.Config {
  113. pathname := makePathname()
  114. c := cfg.ResetTestRoot(pathname)
  115. // and we use random ports to run in parallel
  116. tm, rpc, grpc := makeAddrs()
  117. c.P2P.ListenAddress = tm
  118. c.RPC.ListenAddress = rpc
  119. c.RPC.GRPCListenAddress = grpc
  120. return c
  121. }
  122. // byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
  123. // when the heightToStop is reached. Client will be notified via
  124. // signalWhenStopsTo channel.
  125. type byteBufferWAL struct {
  126. enc *WALEncoder
  127. stopped bool
  128. heightToStop int64
  129. signalWhenStopsTo chan struct{}
  130. }
  131. // needed for determinism
  132. var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
  133. // Save writes message to the internal buffer except when heightToStop is
  134. // reached, in which case it will signal the caller via signalWhenStopsTo and
  135. // skip writing.
  136. func (w *byteBufferWAL) Save(m WALMessage) {
  137. if w.stopped {
  138. return
  139. }
  140. if endMsg, ok := m.(EndHeightMessage); ok {
  141. if endMsg.Height == w.heightToStop {
  142. w.signalWhenStopsTo <- struct{}{}
  143. w.stopped = true
  144. return
  145. }
  146. }
  147. err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
  148. if err != nil {
  149. panic(fmt.Sprintf("failed to encode the msg %v", m))
  150. }
  151. }
  152. func (w *byteBufferWAL) Group() *auto.Group {
  153. panic("not implemented")
  154. }
  155. func (w *byteBufferWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) {
  156. return nil, false, nil
  157. }
  158. func (w *byteBufferWAL) Start() error { return nil }
  159. func (w *byteBufferWAL) Stop() error { return nil }
  160. func (w *byteBufferWAL) Wait() {}