You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

687 lines
20 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "path"
  11. "runtime"
  12. "testing"
  13. "time"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. "github.com/tendermint/tendermint/abci/example/kvstore"
  17. abci "github.com/tendermint/tendermint/abci/types"
  18. crypto "github.com/tendermint/tendermint/crypto"
  19. auto "github.com/tendermint/tmlibs/autofile"
  20. cmn "github.com/tendermint/tmlibs/common"
  21. dbm "github.com/tendermint/tmlibs/db"
  22. cfg "github.com/tendermint/tendermint/config"
  23. "github.com/tendermint/tendermint/privval"
  24. "github.com/tendermint/tendermint/proxy"
  25. sm "github.com/tendermint/tendermint/state"
  26. "github.com/tendermint/tendermint/types"
  27. "github.com/tendermint/tmlibs/log"
  28. )
// consensusReplayConfig is the configuration shared by the WAL tests in this
// file. It is assigned exactly once, in init.
var consensusReplayConfig *cfg.Config

// init sets consensusReplayConfig from ResetConfig("consensus_replay_test").
func init() {
	consensusReplayConfig = ResetConfig("consensus_replay_test")
}
  33. // These tests ensure we can always recover from failure at any part of the consensus process.
  34. // There are two general failure scenarios: failure during consensus, and failure while applying the block.
  35. // Only the latter interacts with the app and store,
  36. // but the former has to deal with restrictions on re-use of priv_validator keys.
  37. // The `WAL Tests` are for failures during the consensus;
  38. // the `Handshake Tests` are for failures in applying the block.
  39. // With the help of the WAL, we can recover from it all!
  40. //------------------------------------------------------------------------------------------
  41. // WAL Tests
  42. // TODO: It would be better to verify explicitly which states we can recover from without the wal
  43. // and which ones we need the wal for - then we'd also be able to only flush the
  44. // wal writer when we need to, instead of with every message.
  45. func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
  46. logger := log.TestingLogger()
  47. state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
  48. privValidator := loadPrivValidator(consensusReplayConfig)
  49. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  50. cs.SetLogger(logger)
  51. bytes, _ := ioutil.ReadFile(cs.config.WalFile())
  52. // fmt.Printf("====== WAL: \n\r%s\n", bytes)
  53. t.Logf("====== WAL: \n\r%X\n", bytes)
  54. err := cs.Start()
  55. require.NoError(t, err)
  56. defer cs.Stop()
  57. // This is just a signal that we haven't halted; its not something contained
  58. // in the WAL itself. Assuming the consensus state is running, replay of any
  59. // WAL, including the empty one, should eventually be followed by a new
  60. // block, or else something is wrong.
  61. newBlockCh := make(chan interface{}, 1)
  62. err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
  63. require.NoError(t, err)
  64. select {
  65. case <-newBlockCh:
  66. case <-time.After(60 * time.Second):
  67. t.Fatalf("Timed out waiting for new block (see trace above)")
  68. }
  69. }
  70. func sendTxs(cs *ConsensusState, ctx context.Context) {
  71. for i := 0; i < 256; i++ {
  72. select {
  73. case <-ctx.Done():
  74. return
  75. default:
  76. tx := []byte{byte(i)}
  77. cs.mempool.CheckTx(tx, nil)
  78. i++
  79. }
  80. }
  81. }
  82. // TestWALCrash uses crashing WAL to test we can recover from any WAL failure.
  83. func TestWALCrash(t *testing.T) {
  84. testCases := []struct {
  85. name string
  86. initFn func(dbm.DB, *ConsensusState, context.Context)
  87. heightToStop int64
  88. }{
  89. {"empty block",
  90. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
  91. 1},
  92. {"block with a smaller part size",
  93. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
  94. // XXX: is there a better way to change BlockPartSizeBytes?
  95. cs.state.ConsensusParams.BlockPartSizeBytes = 512
  96. sm.SaveState(stateDB, cs.state)
  97. go sendTxs(cs, ctx)
  98. },
  99. 1},
  100. {"many non-empty blocks",
  101. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
  102. go sendTxs(cs, ctx)
  103. },
  104. 3},
  105. }
  106. for _, tc := range testCases {
  107. t.Run(tc.name, func(t *testing.T) {
  108. crashWALandCheckLiveness(t, tc.initFn, tc.heightToStop)
  109. })
  110. }
  111. }
  112. func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
  113. walPaniced := make(chan error)
  114. crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
  115. i := 1
  116. LOOP:
  117. for {
  118. // fmt.Printf("====== LOOP %d\n", i)
  119. t.Logf("====== LOOP %d\n", i)
  120. // create consensus state from a clean slate
  121. logger := log.NewNopLogger()
  122. stateDB := dbm.NewMemDB()
  123. state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
  124. privValidator := loadPrivValidator(consensusReplayConfig)
  125. blockDB := dbm.NewMemDB()
  126. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  127. cs.SetLogger(logger)
  128. // start sending transactions
  129. ctx, cancel := context.WithCancel(context.Background())
  130. initFn(stateDB, cs, ctx)
  131. // clean up WAL file from the previous iteration
  132. walFile := cs.config.WalFile()
  133. os.Remove(walFile)
  134. // set crashing WAL
  135. csWal, err := cs.OpenWAL(walFile)
  136. require.NoError(t, err)
  137. crashingWal.next = csWal
  138. // reset the message counter
  139. crashingWal.msgIndex = 1
  140. cs.wal = crashingWal
  141. // start consensus state
  142. err = cs.Start()
  143. require.NoError(t, err)
  144. i++
  145. select {
  146. case err := <-walPaniced:
  147. t.Logf("WAL paniced: %v", err)
  148. // make sure we can make blocks after a crash
  149. startNewConsensusStateAndWaitForBlock(t, cs.Height, blockDB, stateDB)
  150. // stop consensus state and transactions sender (initFn)
  151. cs.Stop()
  152. cancel()
  153. // if we reached the required height, exit
  154. if _, ok := err.(ReachedHeightToStopError); ok {
  155. break LOOP
  156. }
  157. case <-time.After(10 * time.Second):
  158. t.Fatal("WAL did not panic for 10 seconds (check the log)")
  159. }
  160. }
  161. }
// crashingWAL is a WAL which crashes or rather simulates a crash during Save
// (before and after). It remembers a message for which we last panicked
// (lastPanicedForMsgIndex), so we don't panic for it in subsequent iterations.
//
// All operations other than Write/WriteSync are forwarded to next.
type crashingWAL struct {
	next         WAL        // wrapped WAL that receives the writes which are let through
	panicCh      chan error // receives a WALWriteError or ReachedHeightToStopError for each simulated crash
	heightToStop int64      // EndHeightMessage height at which Write reports ReachedHeightToStopError

	msgIndex               int // current message index
	lastPanicedForMsgIndex int // last message for which we panicked
}
  172. // WALWriteError indicates a WAL crash.
  173. type WALWriteError struct {
  174. msg string
  175. }
  176. func (e WALWriteError) Error() string {
  177. return e.msg
  178. }
  179. // ReachedHeightToStopError indicates we've reached the required consensus
  180. // height and may exit.
  181. type ReachedHeightToStopError struct {
  182. height int64
  183. }
  184. func (e ReachedHeightToStopError) Error() string {
  185. return fmt.Sprintf("reached height to stop %d", e.height)
  186. }
  187. // Write simulate WAL's crashing by sending an error to the panicCh and then
  188. // exiting the cs.receiveRoutine.
  189. func (w *crashingWAL) Write(m WALMessage) {
  190. if endMsg, ok := m.(EndHeightMessage); ok {
  191. if endMsg.Height == w.heightToStop {
  192. w.panicCh <- ReachedHeightToStopError{endMsg.Height}
  193. runtime.Goexit()
  194. } else {
  195. w.next.Write(m)
  196. }
  197. return
  198. }
  199. if w.msgIndex > w.lastPanicedForMsgIndex {
  200. w.lastPanicedForMsgIndex = w.msgIndex
  201. _, file, line, _ := runtime.Caller(1)
  202. w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
  203. runtime.Goexit()
  204. } else {
  205. w.msgIndex++
  206. w.next.Write(m)
  207. }
  208. }
// WriteSync behaves exactly like Write: it applies the same crash simulation
// and adds no synchronisation of its own.
func (w *crashingWAL) WriteSync(m WALMessage) {
	w.Write(m)
}
  212. func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
  213. func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  214. return w.next.SearchForEndHeight(height, options)
  215. }
  216. func (w *crashingWAL) Start() error { return w.next.Start() }
  217. func (w *crashingWAL) Stop() error { return w.next.Stop() }
  218. func (w *crashingWAL) Wait() { w.next.Wait() }
  219. //------------------------------------------------------------------------------------------
  220. // Handshake Tests
const (
	// NUM_BLOCKS is the number of blocks requested from WALWithNBlocks for the
	// handshake tests, and the reference against which the expected number of
	// replayed blocks is computed.
	NUM_BLOCKS = 6
)

var (
	// mempool and evpool are the mocks handed to sm.NewBlockExecutor in applyBlock.
	mempool = sm.MockMempool{}
	evpool  = sm.MockEvidencePool{}
)
//---------------------------------------
// Test handshake/replay

// modes lists the scenarios each handshake test is run with:
// 0 - all synced up
// 1 - saved block but app and state are behind
// 2 - saved block and committed, but state is behind
var modes = []uint{0, 1, 2}
  234. // Sync from scratch
  235. func TestHandshakeReplayAll(t *testing.T) {
  236. for _, m := range modes {
  237. testHandshakeReplay(t, 0, m)
  238. }
  239. }
  240. // Sync many, not from scratch
  241. func TestHandshakeReplaySome(t *testing.T) {
  242. for _, m := range modes {
  243. testHandshakeReplay(t, 1, m)
  244. }
  245. }
  246. // Sync from lagging by one
  247. func TestHandshakeReplayOne(t *testing.T) {
  248. for _, m := range modes {
  249. testHandshakeReplay(t, NUM_BLOCKS-1, m)
  250. }
  251. }
  252. // Sync from caught up
  253. func TestHandshakeReplayNone(t *testing.T) {
  254. for _, m := range modes {
  255. testHandshakeReplay(t, NUM_BLOCKS, m)
  256. }
  257. }
  258. func tempWALWithData(data []byte) string {
  259. walFile, err := ioutil.TempFile("", "wal")
  260. if err != nil {
  261. panic(fmt.Errorf("failed to create temp WAL file: %v", err))
  262. }
  263. _, err = walFile.Write(data)
  264. if err != nil {
  265. panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
  266. }
  267. if err := walFile.Close(); err != nil {
  268. panic(fmt.Errorf("failed to close temp WAL file: %v", err))
  269. }
  270. return walFile.Name()
  271. }
  272. // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
  273. func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
  274. config := ResetConfig("proxy_test_")
  275. walBody, err := WALWithNBlocks(NUM_BLOCKS)
  276. if err != nil {
  277. t.Fatal(err)
  278. }
  279. walFile := tempWALWithData(walBody)
  280. config.Consensus.SetWalFile(walFile)
  281. privVal := privval.LoadFilePV(config.PrivValidatorFile())
  282. wal, err := NewWAL(walFile)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. wal.SetLogger(log.TestingLogger())
  287. if err := wal.Start(); err != nil {
  288. t.Fatal(err)
  289. }
  290. defer wal.Stop()
  291. chain, commits, err := makeBlockchainFromWAL(wal)
  292. if err != nil {
  293. t.Fatalf(err.Error())
  294. }
  295. stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
  296. store.chain = chain
  297. store.commits = commits
  298. // run the chain through state.ApplyBlock to build up the tendermint state
  299. state = buildTMStateFromChain(config, stateDB, state, chain, mode)
  300. latestAppHash := state.AppHash
  301. // make a new client creator
  302. kvstoreApp := kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "2"))
  303. clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
  304. if nBlocks > 0 {
  305. // run nBlocks against a new client to build up the app state.
  306. // use a throwaway tendermint state
  307. proxyApp := proxy.NewAppConns(clientCreator2, nil)
  308. stateDB, state, _ := stateAndStore(config, privVal.GetPubKey())
  309. buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
  310. }
  311. // now start the app using the handshake - it should sync
  312. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  313. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  314. proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
  315. if err := proxyApp.Start(); err != nil {
  316. t.Fatalf("Error starting proxy app connections: %v", err)
  317. }
  318. defer proxyApp.Stop()
  319. // get the latest app hash from the app
  320. res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""})
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. // the app hash should be synced up
  325. if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
  326. t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
  327. }
  328. expectedBlocksToSync := NUM_BLOCKS - nBlocks
  329. if nBlocks == NUM_BLOCKS && mode > 0 {
  330. expectedBlocksToSync++
  331. } else if nBlocks > 0 && mode == 1 {
  332. expectedBlocksToSync++
  333. }
  334. if handshaker.NBlocks() != expectedBlocksToSync {
  335. t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
  336. }
  337. }
  338. func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
  339. testPartSize := st.ConsensusParams.BlockPartSizeBytes
  340. blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  341. blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
  342. newState, err := blockExec.ApplyBlock(st, blkID, blk)
  343. if err != nil {
  344. panic(err)
  345. }
  346. return newState
  347. }
  348. func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
  349. state sm.State, chain []*types.Block, nBlocks int, mode uint) {
  350. // start a new app without handshake, play nBlocks blocks
  351. if err := proxyApp.Start(); err != nil {
  352. panic(err)
  353. }
  354. defer proxyApp.Stop()
  355. validators := types.TM2PB.Validators(state.Validators)
  356. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  357. Validators: validators,
  358. }); err != nil {
  359. panic(err)
  360. }
  361. switch mode {
  362. case 0:
  363. for i := 0; i < nBlocks; i++ {
  364. block := chain[i]
  365. state = applyBlock(stateDB, state, block, proxyApp)
  366. }
  367. case 1, 2:
  368. for i := 0; i < nBlocks-1; i++ {
  369. block := chain[i]
  370. state = applyBlock(stateDB, state, block, proxyApp)
  371. }
  372. if mode == 2 {
  373. // update the kvstore height and apphash
  374. // as if we ran commit but not
  375. state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
  376. }
  377. }
  378. }
  379. func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
  380. // run the whole chain against this client to build up the tendermint state
  381. clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1")))
  382. proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
  383. if err := proxyApp.Start(); err != nil {
  384. panic(err)
  385. }
  386. defer proxyApp.Stop()
  387. validators := types.TM2PB.Validators(state.Validators)
  388. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  389. Validators: validators,
  390. }); err != nil {
  391. panic(err)
  392. }
  393. switch mode {
  394. case 0:
  395. // sync right up
  396. for _, block := range chain {
  397. state = applyBlock(stateDB, state, block, proxyApp)
  398. }
  399. case 1, 2:
  400. // sync up to the penultimate as if we stored the block.
  401. // whether we commit or not depends on the appHash
  402. for _, block := range chain[:len(chain)-1] {
  403. state = applyBlock(stateDB, state, block, proxyApp)
  404. }
  405. // apply the final block to a state copy so we can
  406. // get the right next appHash but keep the state back
  407. applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
  408. }
  409. return state
  410. }
  411. //--------------------------
  412. // utils for making blocks
  413. func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
  414. // Search for height marker
  415. gr, found, err := wal.SearchForEndHeight(0, &WALSearchOptions{})
  416. if err != nil {
  417. return nil, nil, err
  418. }
  419. if !found {
  420. return nil, nil, errors.New(cmn.Fmt("WAL does not contain height %d.", 1))
  421. }
  422. defer gr.Close() // nolint: errcheck
  423. // log.Notice("Build a blockchain by reading from the WAL")
  424. var blocks []*types.Block
  425. var commits []*types.Commit
  426. var thisBlockParts *types.PartSet
  427. var thisBlockCommit *types.Commit
  428. var height int64
  429. dec := NewWALDecoder(gr)
  430. for {
  431. msg, err := dec.Decode()
  432. if err == io.EOF {
  433. break
  434. } else if err != nil {
  435. return nil, nil, err
  436. }
  437. piece := readPieceFromWAL(msg)
  438. if piece == nil {
  439. continue
  440. }
  441. switch p := piece.(type) {
  442. case EndHeightMessage:
  443. // if its not the first one, we have a full block
  444. if thisBlockParts != nil {
  445. var block = new(types.Block)
  446. _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
  447. if err != nil {
  448. panic(err)
  449. }
  450. if block.Height != height+1 {
  451. panic(cmn.Fmt("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  452. }
  453. commitHeight := thisBlockCommit.Precommits[0].Height
  454. if commitHeight != height+1 {
  455. panic(cmn.Fmt("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  456. }
  457. blocks = append(blocks, block)
  458. commits = append(commits, thisBlockCommit)
  459. height++
  460. }
  461. case *types.PartSetHeader:
  462. thisBlockParts = types.NewPartSetFromHeader(*p)
  463. case *types.Part:
  464. _, err := thisBlockParts.AddPart(p)
  465. if err != nil {
  466. return nil, nil, err
  467. }
  468. case *types.Vote:
  469. if p.Type == types.VoteTypePrecommit {
  470. thisBlockCommit = &types.Commit{
  471. BlockID: p.BlockID,
  472. Precommits: []*types.Vote{p},
  473. }
  474. }
  475. }
  476. }
  477. // grab the last block too
  478. var block = new(types.Block)
  479. _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
  480. if err != nil {
  481. panic(err)
  482. }
  483. if block.Height != height+1 {
  484. panic(cmn.Fmt("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  485. }
  486. commitHeight := thisBlockCommit.Precommits[0].Height
  487. if commitHeight != height+1 {
  488. panic(cmn.Fmt("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  489. }
  490. blocks = append(blocks, block)
  491. commits = append(commits, thisBlockCommit)
  492. return blocks, commits, nil
  493. }
  494. func readPieceFromWAL(msg *TimedWALMessage) interface{} {
  495. // for logging
  496. switch m := msg.Msg.(type) {
  497. case msgInfo:
  498. switch msg := m.Msg.(type) {
  499. case *ProposalMessage:
  500. return &msg.Proposal.BlockPartsHeader
  501. case *BlockPartMessage:
  502. return msg.Part
  503. case *VoteMessage:
  504. return msg.Vote
  505. }
  506. case EndHeightMessage:
  507. return m
  508. }
  509. return nil
  510. }
  511. // fresh state and mock store
  512. func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) {
  513. stateDB := dbm.NewMemDB()
  514. state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
  515. store := NewMockBlockStore(config, state.ConsensusParams)
  516. return stateDB, state, store
  517. }
  518. //----------------------------------
  519. // mock block store
  520. type mockBlockStore struct {
  521. config *cfg.Config
  522. params types.ConsensusParams
  523. chain []*types.Block
  524. commits []*types.Commit
  525. }
  526. // TODO: NewBlockStore(db.NewMemDB) ...
  527. func NewMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
  528. return &mockBlockStore{config, params, nil, nil}
  529. }
  530. func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
  531. func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
  532. func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
  533. block := bs.chain[height-1]
  534. return &types.BlockMeta{
  535. BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.params.BlockPartSizeBytes).Header()},
  536. Header: block.Header,
  537. }
  538. }
  539. func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
  540. func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
  541. }
  542. func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
  543. return bs.commits[height-1]
  544. }
  545. func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
  546. return bs.commits[height-1]
  547. }
  548. //----------------------------------------
  549. func TestInitChainUpdateValidators(t *testing.T) {
  550. val, _ := types.RandValidator(true, 10)
  551. vals := types.NewValidatorSet([]*types.Validator{val})
  552. app := &initChainApp{vals: types.TM2PB.Validators(vals)}
  553. clientCreator := proxy.NewLocalClientCreator(app)
  554. config := ResetConfig("proxy_test_")
  555. privVal := privval.LoadFilePV(config.PrivValidatorFile())
  556. stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
  557. oldValAddr := state.Validators.Validators[0].Address
  558. // now start the app using the handshake - it should sync
  559. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  560. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  561. proxyApp := proxy.NewAppConns(clientCreator, handshaker)
  562. if err := proxyApp.Start(); err != nil {
  563. t.Fatalf("Error starting proxy app connections: %v", err)
  564. }
  565. defer proxyApp.Stop()
  566. // reload the state, check the validator set was updated
  567. state = sm.LoadState(stateDB)
  568. newValAddr := state.Validators.Validators[0].Address
  569. expectValAddr := val.Address
  570. assert.NotEqual(t, oldValAddr, newValAddr)
  571. assert.Equal(t, newValAddr, expectValAddr)
  572. }
  573. func newInitChainApp(vals []abci.Validator) *initChainApp {
  574. return &initChainApp{
  575. vals: vals,
  576. }
  577. }
  578. // returns the vals on InitChain
  579. type initChainApp struct {
  580. abci.BaseApplication
  581. vals []abci.Validator
  582. }
  583. func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
  584. return abci.ResponseInitChain{
  585. Validators: ica.vals,
  586. }
  587. }