You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

686 lines
21 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "runtime"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "github.com/tendermint/tendermint/abci/example/kvstore"
  16. abci "github.com/tendermint/tendermint/abci/types"
  17. cfg "github.com/tendermint/tendermint/config"
  18. "github.com/tendermint/tendermint/crypto"
  19. auto "github.com/tendermint/tendermint/libs/autofile"
  20. dbm "github.com/tendermint/tendermint/libs/db"
  21. "github.com/tendermint/tendermint/libs/log"
  22. "github.com/tendermint/tendermint/privval"
  23. "github.com/tendermint/tendermint/proxy"
  24. sm "github.com/tendermint/tendermint/state"
  25. "github.com/tendermint/tendermint/types"
  26. "github.com/tendermint/tendermint/version"
  27. )
  28. func TestMain(m *testing.M) {
  29. config = ResetConfig("consensus_reactor_test")
  30. consensusReplayConfig = ResetConfig("consensus_replay_test")
  31. configStateTest := ResetConfig("consensus_state_test")
  32. configMempoolTest := ResetConfig("consensus_mempool_test")
  33. configByzantineTest := ResetConfig("consensus_byzantine_test")
  34. code := m.Run()
  35. os.RemoveAll(config.RootDir)
  36. os.RemoveAll(consensusReplayConfig.RootDir)
  37. os.RemoveAll(configStateTest.RootDir)
  38. os.RemoveAll(configMempoolTest.RootDir)
  39. os.RemoveAll(configByzantineTest.RootDir)
  40. os.Exit(code)
  41. }
  42. // These tests ensure we can always recover from failure at any part of the consensus process.
  43. // There are two general failure scenarios: failure during consensus, and failure while applying the block.
  44. // Only the latter interacts with the app and store,
  45. // but the former has to deal with restrictions on re-use of priv_validator keys.
  46. // The `WAL Tests` are for failures during the consensus;
  47. // the `Handshake Tests` are for failures in applying the block.
  48. // With the help of the WAL, we can recover from it all!
  49. //------------------------------------------------------------------------------------------
  50. // WAL Tests
  51. // TODO: It would be better to verify explicitly which states we can recover from without the wal
  52. // and which ones we need the wal for - then we'd also be able to only flush the
  53. // wal writer when we need to, instead of with every message.
  54. func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
  55. lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
  56. logger := log.TestingLogger()
  57. state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
  58. privValidator := loadPrivValidator(consensusReplayConfig)
  59. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  60. cs.SetLogger(logger)
  61. bytes, _ := ioutil.ReadFile(cs.config.WalFile())
  62. t.Logf("====== WAL: \n\r%X\n", bytes)
  63. err := cs.Start()
  64. require.NoError(t, err)
  65. defer cs.Stop()
  66. // This is just a signal that we haven't halted; its not something contained
  67. // in the WAL itself. Assuming the consensus state is running, replay of any
  68. // WAL, including the empty one, should eventually be followed by a new
  69. // block, or else something is wrong.
  70. newBlockCh := make(chan interface{}, 1)
  71. err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
  72. require.NoError(t, err)
  73. select {
  74. case <-newBlockCh:
  75. case <-time.After(60 * time.Second):
  76. t.Fatalf("Timed out waiting for new block (see trace above)")
  77. }
  78. }
  79. func sendTxs(cs *ConsensusState, ctx context.Context) {
  80. for i := 0; i < 256; i++ {
  81. select {
  82. case <-ctx.Done():
  83. return
  84. default:
  85. tx := []byte{byte(i)}
  86. assertMempool(cs.txNotifier).CheckTx(tx, nil)
  87. i++
  88. }
  89. }
  90. }
  91. // TestWALCrash uses crashing WAL to test we can recover from any WAL failure.
  92. func TestWALCrash(t *testing.T) {
  93. testCases := []struct {
  94. name string
  95. initFn func(dbm.DB, *ConsensusState, context.Context)
  96. heightToStop int64
  97. }{
  98. {"empty block",
  99. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
  100. 1},
  101. {"many non-empty blocks",
  102. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
  103. go sendTxs(cs, ctx)
  104. },
  105. 3},
  106. }
  107. for i, tc := range testCases {
  108. consensusReplayConfig := ResetConfig(fmt.Sprintf("%s_%d", t.Name(), i))
  109. t.Run(tc.name, func(t *testing.T) {
  110. crashWALandCheckLiveness(t, consensusReplayConfig, tc.initFn, tc.heightToStop)
  111. })
  112. }
  113. }
  114. func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
  115. initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
  116. walPaniced := make(chan error)
  117. crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
  118. i := 1
  119. LOOP:
  120. for {
  121. t.Logf("====== LOOP %d\n", i)
  122. // create consensus state from a clean slate
  123. logger := log.NewNopLogger()
  124. stateDB := dbm.NewMemDB()
  125. state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
  126. privValidator := loadPrivValidator(consensusReplayConfig)
  127. blockDB := dbm.NewMemDB()
  128. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  129. cs.SetLogger(logger)
  130. // start sending transactions
  131. ctx, cancel := context.WithCancel(context.Background())
  132. initFn(stateDB, cs, ctx)
  133. // clean up WAL file from the previous iteration
  134. walFile := cs.config.WalFile()
  135. os.Remove(walFile)
  136. // set crashing WAL
  137. csWal, err := cs.OpenWAL(walFile)
  138. require.NoError(t, err)
  139. crashingWal.next = csWal
  140. // reset the message counter
  141. crashingWal.msgIndex = 1
  142. cs.wal = crashingWal
  143. // start consensus state
  144. err = cs.Start()
  145. require.NoError(t, err)
  146. i++
  147. select {
  148. case err := <-walPaniced:
  149. t.Logf("WAL paniced: %v", err)
  150. // make sure we can make blocks after a crash
  151. startNewConsensusStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateDB)
  152. // stop consensus state and transactions sender (initFn)
  153. cs.Stop()
  154. cancel()
  155. // if we reached the required height, exit
  156. if _, ok := err.(ReachedHeightToStopError); ok {
  157. break LOOP
  158. }
  159. case <-time.After(10 * time.Second):
  160. t.Fatal("WAL did not panic for 10 seconds (check the log)")
  161. }
  162. }
  163. }
  164. // crashingWAL is a WAL which crashes or rather simulates a crash during Save
  165. // (before and after). It remembers a message for which we last panicked
  166. // (lastPanicedForMsgIndex), so we don't panic for it in subsequent iterations.
  167. type crashingWAL struct {
  168. next WAL
  169. panicCh chan error
  170. heightToStop int64
  171. msgIndex int // current message index
  172. lastPanicedForMsgIndex int // last message for which we panicked
  173. }
  174. // WALWriteError indicates a WAL crash.
  175. type WALWriteError struct {
  176. msg string
  177. }
  178. func (e WALWriteError) Error() string {
  179. return e.msg
  180. }
  181. // ReachedHeightToStopError indicates we've reached the required consensus
  182. // height and may exit.
  183. type ReachedHeightToStopError struct {
  184. height int64
  185. }
  186. func (e ReachedHeightToStopError) Error() string {
  187. return fmt.Sprintf("reached height to stop %d", e.height)
  188. }
  189. // Write simulate WAL's crashing by sending an error to the panicCh and then
  190. // exiting the cs.receiveRoutine.
  191. func (w *crashingWAL) Write(m WALMessage) {
  192. if endMsg, ok := m.(EndHeightMessage); ok {
  193. if endMsg.Height == w.heightToStop {
  194. w.panicCh <- ReachedHeightToStopError{endMsg.Height}
  195. runtime.Goexit()
  196. } else {
  197. w.next.Write(m)
  198. }
  199. return
  200. }
  201. if w.msgIndex > w.lastPanicedForMsgIndex {
  202. w.lastPanicedForMsgIndex = w.msgIndex
  203. _, file, line, _ := runtime.Caller(1)
  204. w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
  205. runtime.Goexit()
  206. } else {
  207. w.msgIndex++
  208. w.next.Write(m)
  209. }
  210. }
  211. func (w *crashingWAL) WriteSync(m WALMessage) {
  212. w.Write(m)
  213. }
  214. func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
  215. func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  216. return w.next.SearchForEndHeight(height, options)
  217. }
  218. func (w *crashingWAL) Start() error { return w.next.Start() }
  219. func (w *crashingWAL) Stop() error { return w.next.Stop() }
  220. func (w *crashingWAL) Wait() { w.next.Wait() }
  221. //------------------------------------------------------------------------------------------
  222. // Handshake Tests
  223. const (
  224. NUM_BLOCKS = 6
  225. )
  226. var (
  227. mempool = sm.MockMempool{}
  228. evpool = sm.MockEvidencePool{}
  229. )
  230. //---------------------------------------
  231. // Test handshake/replay
  232. // 0 - all synced up
  233. // 1 - saved block but app and state are behind
  234. // 2 - save block and committed but state is behind
  235. var modes = []uint{0, 1, 2}
  236. // Sync from scratch
  237. func TestHandshakeReplayAll(t *testing.T) {
  238. for i, m := range modes {
  239. config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
  240. defer os.RemoveAll(config.RootDir)
  241. testHandshakeReplay(t, config, 0, m)
  242. }
  243. }
  244. // Sync many, not from scratch
  245. func TestHandshakeReplaySome(t *testing.T) {
  246. for i, m := range modes {
  247. config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
  248. defer os.RemoveAll(config.RootDir)
  249. testHandshakeReplay(t, config, 1, m)
  250. }
  251. }
  252. // Sync from lagging by one
  253. func TestHandshakeReplayOne(t *testing.T) {
  254. for i, m := range modes {
  255. config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
  256. defer os.RemoveAll(config.RootDir)
  257. testHandshakeReplay(t, config, NUM_BLOCKS-1, m)
  258. }
  259. }
  260. // Sync from caught up
  261. func TestHandshakeReplayNone(t *testing.T) {
  262. for i, m := range modes {
  263. config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
  264. defer os.RemoveAll(config.RootDir)
  265. testHandshakeReplay(t, config, NUM_BLOCKS, m)
  266. }
  267. }
  268. func tempWALWithData(data []byte) string {
  269. walFile, err := ioutil.TempFile("", "wal")
  270. if err != nil {
  271. panic(fmt.Errorf("failed to create temp WAL file: %v", err))
  272. }
  273. _, err = walFile.Write(data)
  274. if err != nil {
  275. panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
  276. }
  277. if err := walFile.Close(); err != nil {
  278. panic(fmt.Errorf("failed to close temp WAL file: %v", err))
  279. }
  280. return walFile.Name()
  281. }
  282. // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
  283. func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) {
  284. walBody, err := WALWithNBlocks(t, NUM_BLOCKS)
  285. require.NoError(t, err)
  286. walFile := tempWALWithData(walBody)
  287. config.Consensus.SetWalFile(walFile)
  288. privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
  289. wal, err := NewWAL(walFile)
  290. require.NoError(t, err)
  291. wal.SetLogger(log.TestingLogger())
  292. err = wal.Start()
  293. require.NoError(t, err)
  294. defer wal.Stop()
  295. chain, commits, err := makeBlockchainFromWAL(wal)
  296. require.NoError(t, err)
  297. stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
  298. store.chain = chain
  299. store.commits = commits
  300. // run the chain through state.ApplyBlock to build up the tendermint state
  301. state = buildTMStateFromChain(config, stateDB, state, chain, mode)
  302. latestAppHash := state.AppHash
  303. // make a new client creator
  304. kvstoreApp := kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "2"))
  305. clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
  306. if nBlocks > 0 {
  307. // run nBlocks against a new client to build up the app state.
  308. // use a throwaway tendermint state
  309. proxyApp := proxy.NewAppConns(clientCreator2)
  310. stateDB, state, _ := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
  311. buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
  312. }
  313. // now start the app using the handshake - it should sync
  314. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  315. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  316. proxyApp := proxy.NewAppConns(clientCreator2)
  317. if err := proxyApp.Start(); err != nil {
  318. t.Fatalf("Error starting proxy app connections: %v", err)
  319. }
  320. defer proxyApp.Stop()
  321. if err := handshaker.Handshake(proxyApp); err != nil {
  322. t.Fatalf("Error on abci handshake: %v", err)
  323. }
  324. // get the latest app hash from the app
  325. res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""})
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. // the app hash should be synced up
  330. if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
  331. t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
  332. }
  333. expectedBlocksToSync := NUM_BLOCKS - nBlocks
  334. if nBlocks == NUM_BLOCKS && mode > 0 {
  335. expectedBlocksToSync++
  336. } else if nBlocks > 0 && mode == 1 {
  337. expectedBlocksToSync++
  338. }
  339. if handshaker.NBlocks() != expectedBlocksToSync {
  340. t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
  341. }
  342. }
  343. func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
  344. testPartSize := types.BlockPartSizeBytes
  345. blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  346. blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
  347. newState, err := blockExec.ApplyBlock(st, blkID, blk)
  348. if err != nil {
  349. panic(err)
  350. }
  351. return newState
  352. }
  353. func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
  354. state sm.State, chain []*types.Block, nBlocks int, mode uint) {
  355. // start a new app without handshake, play nBlocks blocks
  356. if err := proxyApp.Start(); err != nil {
  357. panic(err)
  358. }
  359. defer proxyApp.Stop()
  360. validators := types.TM2PB.ValidatorUpdates(state.Validators)
  361. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  362. Validators: validators,
  363. }); err != nil {
  364. panic(err)
  365. }
  366. switch mode {
  367. case 0:
  368. for i := 0; i < nBlocks; i++ {
  369. block := chain[i]
  370. state = applyBlock(stateDB, state, block, proxyApp)
  371. }
  372. case 1, 2:
  373. for i := 0; i < nBlocks-1; i++ {
  374. block := chain[i]
  375. state = applyBlock(stateDB, state, block, proxyApp)
  376. }
  377. if mode == 2 {
  378. // update the kvstore height and apphash
  379. // as if we ran commit but not
  380. state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
  381. }
  382. }
  383. }
  384. func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
  385. // run the whole chain against this client to build up the tendermint state
  386. clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1")))
  387. proxyApp := proxy.NewAppConns(clientCreator)
  388. if err := proxyApp.Start(); err != nil {
  389. panic(err)
  390. }
  391. defer proxyApp.Stop()
  392. validators := types.TM2PB.ValidatorUpdates(state.Validators)
  393. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  394. Validators: validators,
  395. }); err != nil {
  396. panic(err)
  397. }
  398. switch mode {
  399. case 0:
  400. // sync right up
  401. for _, block := range chain {
  402. state = applyBlock(stateDB, state, block, proxyApp)
  403. }
  404. case 1, 2:
  405. // sync up to the penultimate as if we stored the block.
  406. // whether we commit or not depends on the appHash
  407. for _, block := range chain[:len(chain)-1] {
  408. state = applyBlock(stateDB, state, block, proxyApp)
  409. }
  410. // apply the final block to a state copy so we can
  411. // get the right next appHash but keep the state back
  412. applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
  413. }
  414. return state
  415. }
  416. //--------------------------
  417. // utils for making blocks
  418. func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
  419. // Search for height marker
  420. gr, found, err := wal.SearchForEndHeight(0, &WALSearchOptions{})
  421. if err != nil {
  422. return nil, nil, err
  423. }
  424. if !found {
  425. return nil, nil, fmt.Errorf("WAL does not contain height %d.", 1)
  426. }
  427. defer gr.Close() // nolint: errcheck
  428. // log.Notice("Build a blockchain by reading from the WAL")
  429. var blocks []*types.Block
  430. var commits []*types.Commit
  431. var thisBlockParts *types.PartSet
  432. var thisBlockCommit *types.Commit
  433. var height int64
  434. dec := NewWALDecoder(gr)
  435. for {
  436. msg, err := dec.Decode()
  437. if err == io.EOF {
  438. break
  439. } else if err != nil {
  440. return nil, nil, err
  441. }
  442. piece := readPieceFromWAL(msg)
  443. if piece == nil {
  444. continue
  445. }
  446. switch p := piece.(type) {
  447. case EndHeightMessage:
  448. // if its not the first one, we have a full block
  449. if thisBlockParts != nil {
  450. var block = new(types.Block)
  451. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
  452. if err != nil {
  453. panic(err)
  454. }
  455. if block.Height != height+1 {
  456. panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  457. }
  458. commitHeight := thisBlockCommit.Precommits[0].Height
  459. if commitHeight != height+1 {
  460. panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  461. }
  462. blocks = append(blocks, block)
  463. commits = append(commits, thisBlockCommit)
  464. height++
  465. }
  466. case *types.PartSetHeader:
  467. thisBlockParts = types.NewPartSetFromHeader(*p)
  468. case *types.Part:
  469. _, err := thisBlockParts.AddPart(p)
  470. if err != nil {
  471. return nil, nil, err
  472. }
  473. case *types.Vote:
  474. if p.Type == types.PrecommitType {
  475. commitSigs := []*types.CommitSig{p.CommitSig()}
  476. thisBlockCommit = types.NewCommit(p.BlockID, commitSigs)
  477. }
  478. }
  479. }
  480. // grab the last block too
  481. var block = new(types.Block)
  482. _, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
  483. if err != nil {
  484. panic(err)
  485. }
  486. if block.Height != height+1 {
  487. panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  488. }
  489. commitHeight := thisBlockCommit.Precommits[0].Height
  490. if commitHeight != height+1 {
  491. panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  492. }
  493. blocks = append(blocks, block)
  494. commits = append(commits, thisBlockCommit)
  495. return blocks, commits, nil
  496. }
  497. func readPieceFromWAL(msg *TimedWALMessage) interface{} {
  498. // for logging
  499. switch m := msg.Msg.(type) {
  500. case msgInfo:
  501. switch msg := m.Msg.(type) {
  502. case *ProposalMessage:
  503. return &msg.Proposal.BlockID.PartsHeader
  504. case *BlockPartMessage:
  505. return msg.Part
  506. case *VoteMessage:
  507. return msg.Vote
  508. }
  509. case EndHeightMessage:
  510. return m
  511. }
  512. return nil
  513. }
  514. // fresh state and mock store
  515. func stateAndStore(config *cfg.Config, pubKey crypto.PubKey, appVersion version.Protocol) (dbm.DB, sm.State, *mockBlockStore) {
  516. stateDB := dbm.NewMemDB()
  517. state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
  518. state.Version.Consensus.App = appVersion
  519. store := NewMockBlockStore(config, state.ConsensusParams)
  520. return stateDB, state, store
  521. }
  522. //----------------------------------
  523. // mock block store
  524. type mockBlockStore struct {
  525. config *cfg.Config
  526. params types.ConsensusParams
  527. chain []*types.Block
  528. commits []*types.Commit
  529. }
  530. // TODO: NewBlockStore(db.NewMemDB) ...
  531. func NewMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
  532. return &mockBlockStore{config, params, nil, nil}
  533. }
  534. func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
  535. func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
  536. func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
  537. block := bs.chain[height-1]
  538. return &types.BlockMeta{
  539. BlockID: types.BlockID{block.Hash(), block.MakePartSet(types.BlockPartSizeBytes).Header()},
  540. Header: block.Header,
  541. }
  542. }
  543. func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
  544. func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
  545. }
  546. func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
  547. return bs.commits[height-1]
  548. }
  549. func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
  550. return bs.commits[height-1]
  551. }
  552. //----------------------------------------
  553. func TestInitChainUpdateValidators(t *testing.T) {
  554. val, _ := types.RandValidator(true, 10)
  555. vals := types.NewValidatorSet([]*types.Validator{val})
  556. app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
  557. clientCreator := proxy.NewLocalClientCreator(app)
  558. config := ResetConfig("proxy_test_")
  559. defer os.RemoveAll(config.RootDir)
  560. privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
  561. stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), 0x0)
  562. oldValAddr := state.Validators.Validators[0].Address
  563. // now start the app using the handshake - it should sync
  564. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  565. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  566. proxyApp := proxy.NewAppConns(clientCreator)
  567. if err := proxyApp.Start(); err != nil {
  568. t.Fatalf("Error starting proxy app connections: %v", err)
  569. }
  570. defer proxyApp.Stop()
  571. if err := handshaker.Handshake(proxyApp); err != nil {
  572. t.Fatalf("Error on abci handshake: %v", err)
  573. }
  574. // reload the state, check the validator set was updated
  575. state = sm.LoadState(stateDB)
  576. newValAddr := state.Validators.Validators[0].Address
  577. expectValAddr := val.Address
  578. assert.NotEqual(t, oldValAddr, newValAddr)
  579. assert.Equal(t, newValAddr, expectValAddr)
  580. }
  581. // returns the vals on InitChain
  582. type initChainApp struct {
  583. abci.BaseApplication
  584. vals []abci.ValidatorUpdate
  585. }
  586. func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
  587. return abci.ResponseInitChain{
  588. Validators: ica.vals,
  589. }
  590. }