You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

202 lines
6.5 KiB

7 years ago
7 years ago
  1. package consensus
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "math/rand"
  7. "os"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/pkg/errors"
  12. "github.com/tendermint/abci/example/kvstore"
  13. bc "github.com/tendermint/tendermint/blockchain"
  14. cfg "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/proxy"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. pvm "github.com/tendermint/tendermint/types/priv_validator"
  19. auto "github.com/tendermint/tmlibs/autofile"
  20. cmn "github.com/tendermint/tmlibs/common"
  21. "github.com/tendermint/tmlibs/db"
  22. "github.com/tendermint/tmlibs/log"
  23. )
  24. // WALWithNBlocks generates a consensus WAL. It does this by spining up a
  25. // stripped down version of node (proxy app, event bus, consensus state) with a
  26. // persistent kvstore application and special consensus wal instance
  27. // (byteBufferWAL) and waits until numBlocks are created. Then it returns a WAL
  28. // content.
  29. func WALWithNBlocks(numBlocks int) (data []byte, err error) {
  30. config := getConfig()
  31. app := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "wal_generator"))
  32. logger := log.TestingLogger().With("wal_generator", "wal_generator")
  33. logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)
  34. /////////////////////////////////////////////////////////////////////////////
  35. // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
  36. // NOTE: we can't import node package because of circular dependency
  37. privValidatorFile := config.PrivValidatorFile()
  38. privValidator := pvm.LoadOrGenFilePV(privValidatorFile)
  39. genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
  40. if err != nil {
  41. return nil, errors.Wrap(err, "failed to read genesis file")
  42. }
  43. stateDB := db.NewMemDB()
  44. blockStoreDB := db.NewMemDB()
  45. state, err := sm.MakeGenesisState(genDoc)
  46. if err != nil {
  47. return nil, errors.Wrap(err, "failed to make genesis state")
  48. }
  49. blockStore := bc.NewBlockStore(blockStoreDB)
  50. handshaker := NewHandshaker(stateDB, state, blockStore, genDoc.AppState())
  51. proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
  52. proxyApp.SetLogger(logger.With("module", "proxy"))
  53. if err := proxyApp.Start(); err != nil {
  54. return nil, errors.Wrap(err, "failed to start proxy app connections")
  55. }
  56. defer proxyApp.Stop()
  57. eventBus := types.NewEventBus()
  58. eventBus.SetLogger(logger.With("module", "events"))
  59. if err := eventBus.Start(); err != nil {
  60. return nil, errors.Wrap(err, "failed to start event bus")
  61. }
  62. defer eventBus.Stop()
  63. mempool := types.MockMempool{}
  64. evpool := types.MockEvidencePool{}
  65. blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  66. consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
  67. consensusState.SetLogger(logger)
  68. consensusState.SetEventBus(eventBus)
  69. if privValidator != nil {
  70. consensusState.SetPrivValidator(privValidator)
  71. }
  72. // END OF COPY PASTE
  73. /////////////////////////////////////////////////////////////////////////////
  74. // set consensus wal to buffered WAL, which will write all incoming msgs to buffer
  75. var b bytes.Buffer
  76. wr := bufio.NewWriter(&b)
  77. numBlocksWritten := make(chan struct{})
  78. wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
  79. // see wal.go#103
  80. wal.Save(EndHeightMessage{0})
  81. consensusState.wal = wal
  82. if err := consensusState.Start(); err != nil {
  83. return nil, errors.Wrap(err, "failed to start consensus state")
  84. }
  85. defer consensusState.Stop()
  86. select {
  87. case <-numBlocksWritten:
  88. wr.Flush()
  89. return b.Bytes(), nil
  90. case <-time.After(1 * time.Minute):
  91. wr.Flush()
  92. return b.Bytes(), fmt.Errorf("waited too long for tendermint to produce %d blocks (grep logs for `wal_generator`)", numBlocks)
  93. }
  94. }
  95. // f**ing long, but unique for each test
  96. func makePathname() string {
  97. // get path
  98. p, err := os.Getwd()
  99. if err != nil {
  100. panic(err)
  101. }
  102. // fmt.Println(p)
  103. sep := string(filepath.Separator)
  104. return strings.Replace(p, sep, "_", -1)
  105. }
  106. func randPort() int {
  107. // returns between base and base + spread
  108. base, spread := 20000, 20000
  109. return base + rand.Intn(spread)
  110. }
  111. func makeAddrs() (string, string, string) {
  112. start := randPort()
  113. return fmt.Sprintf("tcp://0.0.0.0:%d", start),
  114. fmt.Sprintf("tcp://0.0.0.0:%d", start+1),
  115. fmt.Sprintf("tcp://0.0.0.0:%d", start+2)
  116. }
  117. // getConfig returns a config for test cases
  118. func getConfig() *cfg.Config {
  119. pathname := makePathname()
  120. c := cfg.ResetTestRoot(fmt.Sprintf("%s_%d", pathname, cmn.RandInt()))
  121. // and we use random ports to run in parallel
  122. tm, rpc, grpc := makeAddrs()
  123. c.P2P.ListenAddress = tm
  124. c.RPC.ListenAddress = rpc
  125. c.RPC.GRPCListenAddress = grpc
  126. return c
  127. }
  128. // byteBufferWAL is a WAL which writes all msgs to a byte buffer. Writing stops
  129. // when the heightToStop is reached. Client will be notified via
  130. // signalWhenStopsTo channel.
  131. type byteBufferWAL struct {
  132. enc *WALEncoder
  133. stopped bool
  134. heightToStop int64
  135. signalWhenStopsTo chan<- struct{}
  136. logger log.Logger
  137. }
  138. // needed for determinism
  139. var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
  140. func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan<- struct{}) *byteBufferWAL {
  141. return &byteBufferWAL{
  142. enc: enc,
  143. heightToStop: nBlocks,
  144. signalWhenStopsTo: signalStop,
  145. logger: logger,
  146. }
  147. }
  148. // Save writes message to the internal buffer except when heightToStop is
  149. // reached, in which case it will signal the caller via signalWhenStopsTo and
  150. // skip writing.
  151. func (w *byteBufferWAL) Save(m WALMessage) {
  152. if w.stopped {
  153. w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
  154. return
  155. }
  156. if endMsg, ok := m.(EndHeightMessage); ok {
  157. w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
  158. if endMsg.Height == w.heightToStop {
  159. w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
  160. w.signalWhenStopsTo <- struct{}{}
  161. w.stopped = true
  162. return
  163. }
  164. }
  165. w.logger.Debug("WAL Write Message", "msg", m)
  166. err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
  167. if err != nil {
  168. panic(fmt.Sprintf("failed to encode the msg %v", m))
  169. }
  170. }
  171. func (w *byteBufferWAL) Group() *auto.Group {
  172. panic("not implemented")
  173. }
  174. func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  175. return nil, false, nil
  176. }
  177. func (w *byteBufferWAL) Start() error { return nil }
  178. func (w *byteBufferWAL) Stop() error { return nil }
  179. func (w *byteBufferWAL) Wait() {}