You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

683 lines
20 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "runtime"
  11. "testing"
  12. "time"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. "github.com/tendermint/tendermint/abci/example/kvstore"
  16. abci "github.com/tendermint/tendermint/abci/types"
  17. crypto "github.com/tendermint/tendermint/crypto"
  18. auto "github.com/tendermint/tendermint/libs/autofile"
  19. dbm "github.com/tendermint/tendermint/libs/db"
  20. cfg "github.com/tendermint/tendermint/config"
  21. "github.com/tendermint/tendermint/libs/log"
  22. "github.com/tendermint/tendermint/privval"
  23. "github.com/tendermint/tendermint/proxy"
  24. sm "github.com/tendermint/tendermint/state"
  25. "github.com/tendermint/tendermint/types"
  26. )
  27. var consensusReplayConfig *cfg.Config
  28. func init() {
  29. consensusReplayConfig = ResetConfig("consensus_replay_test")
  30. }
  31. // These tests ensure we can always recover from failure at any part of the consensus process.
  32. // There are two general failure scenarios: failure during consensus, and failure while applying the block.
  33. // Only the latter interacts with the app and store,
  34. // but the former has to deal with restrictions on re-use of priv_validator keys.
  35. // The `WAL Tests` are for failures during the consensus;
  36. // the `Handshake Tests` are for failures in applying the block.
  37. // With the help of the WAL, we can recover from it all!
  38. //------------------------------------------------------------------------------------------
  39. // WAL Tests
  40. // TODO: It would be better to verify explicitly which states we can recover from without the wal
  41. // and which ones we need the wal for - then we'd also be able to only flush the
  42. // wal writer when we need to, instead of with every message.
  43. func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
  44. logger := log.TestingLogger()
  45. state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
  46. privValidator := loadPrivValidator(consensusReplayConfig)
  47. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  48. cs.SetLogger(logger)
  49. bytes, _ := ioutil.ReadFile(cs.config.WalFile())
  50. // fmt.Printf("====== WAL: \n\r%s\n", bytes)
  51. t.Logf("====== WAL: \n\r%X\n", bytes)
  52. err := cs.Start()
  53. require.NoError(t, err)
  54. defer cs.Stop()
  55. // This is just a signal that we haven't halted; its not something contained
  56. // in the WAL itself. Assuming the consensus state is running, replay of any
  57. // WAL, including the empty one, should eventually be followed by a new
  58. // block, or else something is wrong.
  59. newBlockCh := make(chan interface{}, 1)
  60. err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
  61. require.NoError(t, err)
  62. select {
  63. case <-newBlockCh:
  64. case <-time.After(60 * time.Second):
  65. t.Fatalf("Timed out waiting for new block (see trace above)")
  66. }
  67. }
  68. func sendTxs(cs *ConsensusState, ctx context.Context) {
  69. for i := 0; i < 256; i++ {
  70. select {
  71. case <-ctx.Done():
  72. return
  73. default:
  74. tx := []byte{byte(i)}
  75. cs.mempool.CheckTx(tx, nil)
  76. i++
  77. }
  78. }
  79. }
  80. // TestWALCrash uses crashing WAL to test we can recover from any WAL failure.
  81. func TestWALCrash(t *testing.T) {
  82. testCases := []struct {
  83. name string
  84. initFn func(dbm.DB, *ConsensusState, context.Context)
  85. heightToStop int64
  86. }{
  87. {"empty block",
  88. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
  89. 1},
  90. {"many non-empty blocks",
  91. func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
  92. go sendTxs(cs, ctx)
  93. },
  94. 3},
  95. }
  96. for _, tc := range testCases {
  97. t.Run(tc.name, func(t *testing.T) {
  98. crashWALandCheckLiveness(t, tc.initFn, tc.heightToStop)
  99. })
  100. }
  101. }
  102. func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
  103. walPaniced := make(chan error)
  104. crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
  105. i := 1
  106. LOOP:
  107. for {
  108. // fmt.Printf("====== LOOP %d\n", i)
  109. t.Logf("====== LOOP %d\n", i)
  110. // create consensus state from a clean slate
  111. logger := log.NewNopLogger()
  112. stateDB := dbm.NewMemDB()
  113. state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
  114. privValidator := loadPrivValidator(consensusReplayConfig)
  115. blockDB := dbm.NewMemDB()
  116. cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
  117. cs.SetLogger(logger)
  118. // start sending transactions
  119. ctx, cancel := context.WithCancel(context.Background())
  120. initFn(stateDB, cs, ctx)
  121. // clean up WAL file from the previous iteration
  122. walFile := cs.config.WalFile()
  123. os.Remove(walFile)
  124. // set crashing WAL
  125. csWal, err := cs.OpenWAL(walFile)
  126. require.NoError(t, err)
  127. crashingWal.next = csWal
  128. // reset the message counter
  129. crashingWal.msgIndex = 1
  130. cs.wal = crashingWal
  131. // start consensus state
  132. err = cs.Start()
  133. require.NoError(t, err)
  134. i++
  135. select {
  136. case err := <-walPaniced:
  137. t.Logf("WAL paniced: %v", err)
  138. // make sure we can make blocks after a crash
  139. startNewConsensusStateAndWaitForBlock(t, cs.Height, blockDB, stateDB)
  140. // stop consensus state and transactions sender (initFn)
  141. cs.Stop()
  142. cancel()
  143. // if we reached the required height, exit
  144. if _, ok := err.(ReachedHeightToStopError); ok {
  145. break LOOP
  146. }
  147. case <-time.After(10 * time.Second):
  148. t.Fatal("WAL did not panic for 10 seconds (check the log)")
  149. }
  150. }
  151. }
  152. // crashingWAL is a WAL which crashes or rather simulates a crash during Save
  153. // (before and after). It remembers a message for which we last panicked
  154. // (lastPanicedForMsgIndex), so we don't panic for it in subsequent iterations.
  155. type crashingWAL struct {
  156. next WAL
  157. panicCh chan error
  158. heightToStop int64
  159. msgIndex int // current message index
  160. lastPanicedForMsgIndex int // last message for which we panicked
  161. }
  // WALWriteError is the error reported for a simulated WAL crash.
  type WALWriteError struct {
  	msg string
  }

  // Error returns the message the error was created with.
  func (werr WALWriteError) Error() string {
  	text := werr.msg
  	return text
  }
  // ReachedHeightToStopError is reported when the required consensus height
  // has been reached and the test may exit.
  type ReachedHeightToStopError struct {
  	height int64
  }

  // Error renders the height the error was created for.
  func (stop ReachedHeightToStopError) Error() string {
  	const format = "reached height to stop %d"
  	return fmt.Sprintf(format, stop.height)
  }
  177. // Write simulate WAL's crashing by sending an error to the panicCh and then
  178. // exiting the cs.receiveRoutine.
  179. func (w *crashingWAL) Write(m WALMessage) {
  180. if endMsg, ok := m.(EndHeightMessage); ok {
  181. if endMsg.Height == w.heightToStop {
  182. w.panicCh <- ReachedHeightToStopError{endMsg.Height}
  183. runtime.Goexit()
  184. } else {
  185. w.next.Write(m)
  186. }
  187. return
  188. }
  189. if w.msgIndex > w.lastPanicedForMsgIndex {
  190. w.lastPanicedForMsgIndex = w.msgIndex
  191. _, file, line, _ := runtime.Caller(1)
  192. w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
  193. runtime.Goexit()
  194. } else {
  195. w.msgIndex++
  196. w.next.Write(m)
  197. }
  198. }
  199. func (w *crashingWAL) WriteSync(m WALMessage) {
  200. w.Write(m)
  201. }
  202. func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
  203. func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  204. return w.next.SearchForEndHeight(height, options)
  205. }
  206. func (w *crashingWAL) Start() error { return w.next.Start() }
  207. func (w *crashingWAL) Stop() error { return w.next.Stop() }
  208. func (w *crashingWAL) Wait() { w.next.Wait() }
  209. //------------------------------------------------------------------------------------------
  210. // Handshake Tests
  211. const (
  212. NUM_BLOCKS = 6
  213. )
  214. var (
  215. mempool = sm.MockMempool{}
  216. evpool = sm.MockEvidencePool{}
  217. )
  218. //---------------------------------------
  // Test handshake/replay. Every handshake test is run once per mode:
  // 0 - all synced up
  // 1 - saved block but app and state are behind
  // 2 - saved block and committed but state is behind
  var modes = []uint{0, 1, 2}
  224. // Sync from scratch
  225. func TestHandshakeReplayAll(t *testing.T) {
  226. for _, m := range modes {
  227. testHandshakeReplay(t, 0, m)
  228. }
  229. }
  230. // Sync many, not from scratch
  231. func TestHandshakeReplaySome(t *testing.T) {
  232. for _, m := range modes {
  233. testHandshakeReplay(t, 1, m)
  234. }
  235. }
  236. // Sync from lagging by one
  237. func TestHandshakeReplayOne(t *testing.T) {
  238. for _, m := range modes {
  239. testHandshakeReplay(t, NUM_BLOCKS-1, m)
  240. }
  241. }
  242. // Sync from caught up
  243. func TestHandshakeReplayNone(t *testing.T) {
  244. for _, m := range modes {
  245. testHandshakeReplay(t, NUM_BLOCKS, m)
  246. }
  247. }
  // tempWALWithData writes data to a newly created temporary file (name
  // pattern "wal" in the default temp directory) and returns the file's path.
  // It panics if the file cannot be created, written, or closed.
  func tempWALWithData(data []byte) string {
  	f, createErr := ioutil.TempFile("", "wal")
  	if createErr != nil {
  		panic(fmt.Errorf("failed to create temp WAL file: %v", createErr))
  	}

  	if _, writeErr := f.Write(data); writeErr != nil {
  		panic(fmt.Errorf("failed to write to temp WAL file: %v", writeErr))
  	}

  	closeErr := f.Close()
  	if closeErr != nil {
  		panic(fmt.Errorf("failed to close temp WAL file: %v", closeErr))
  	}

  	return f.Name()
  }
  262. // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
  263. func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
  264. config := ResetConfig("proxy_test_")
  265. walBody, err := WALWithNBlocks(NUM_BLOCKS)
  266. if err != nil {
  267. t.Fatal(err)
  268. }
  269. walFile := tempWALWithData(walBody)
  270. config.Consensus.SetWalFile(walFile)
  271. privVal := privval.LoadFilePV(config.PrivValidatorFile())
  272. wal, err := NewWAL(walFile)
  273. if err != nil {
  274. t.Fatal(err)
  275. }
  276. wal.SetLogger(log.TestingLogger())
  277. if err := wal.Start(); err != nil {
  278. t.Fatal(err)
  279. }
  280. defer wal.Stop()
  281. chain, commits, err := makeBlockchainFromWAL(wal)
  282. if err != nil {
  283. t.Fatalf(err.Error())
  284. }
  285. stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
  286. store.chain = chain
  287. store.commits = commits
  288. // run the chain through state.ApplyBlock to build up the tendermint state
  289. state = buildTMStateFromChain(config, stateDB, state, chain, mode)
  290. latestAppHash := state.AppHash
  291. // make a new client creator
  292. kvstoreApp := kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "2"))
  293. clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
  294. if nBlocks > 0 {
  295. // run nBlocks against a new client to build up the app state.
  296. // use a throwaway tendermint state
  297. proxyApp := proxy.NewAppConns(clientCreator2)
  298. stateDB, state, _ := stateAndStore(config, privVal.GetPubKey())
  299. buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
  300. }
  301. // now start the app using the handshake - it should sync
  302. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  303. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  304. proxyApp := proxy.NewAppConns(clientCreator2)
  305. if err := proxyApp.Start(); err != nil {
  306. t.Fatalf("Error starting proxy app connections: %v", err)
  307. }
  308. defer proxyApp.Stop()
  309. if err := handshaker.Handshake(proxyApp); err != nil {
  310. t.Fatalf("Error on abci handshake: %v", err)
  311. }
  312. // get the latest app hash from the app
  313. res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""})
  314. if err != nil {
  315. t.Fatal(err)
  316. }
  317. // the app hash should be synced up
  318. if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
  319. t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
  320. }
  321. expectedBlocksToSync := NUM_BLOCKS - nBlocks
  322. if nBlocks == NUM_BLOCKS && mode > 0 {
  323. expectedBlocksToSync++
  324. } else if nBlocks > 0 && mode == 1 {
  325. expectedBlocksToSync++
  326. }
  327. if handshaker.NBlocks() != expectedBlocksToSync {
  328. t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
  329. }
  330. }
  331. func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
  332. testPartSize := types.BlockPartSizeBytes
  333. blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
  334. blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
  335. newState, err := blockExec.ApplyBlock(st, blkID, blk)
  336. if err != nil {
  337. panic(err)
  338. }
  339. return newState
  340. }
  341. func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
  342. state sm.State, chain []*types.Block, nBlocks int, mode uint) {
  343. // start a new app without handshake, play nBlocks blocks
  344. if err := proxyApp.Start(); err != nil {
  345. panic(err)
  346. }
  347. defer proxyApp.Stop()
  348. validators := types.TM2PB.ValidatorUpdates(state.Validators)
  349. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  350. Validators: validators,
  351. }); err != nil {
  352. panic(err)
  353. }
  354. switch mode {
  355. case 0:
  356. for i := 0; i < nBlocks; i++ {
  357. block := chain[i]
  358. state = applyBlock(stateDB, state, block, proxyApp)
  359. }
  360. case 1, 2:
  361. for i := 0; i < nBlocks-1; i++ {
  362. block := chain[i]
  363. state = applyBlock(stateDB, state, block, proxyApp)
  364. }
  365. if mode == 2 {
  366. // update the kvstore height and apphash
  367. // as if we ran commit but not
  368. state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
  369. }
  370. }
  371. }
  372. func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
  373. // run the whole chain against this client to build up the tendermint state
  374. clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1")))
  375. proxyApp := proxy.NewAppConns(clientCreator) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
  376. if err := proxyApp.Start(); err != nil {
  377. panic(err)
  378. }
  379. defer proxyApp.Stop()
  380. validators := types.TM2PB.ValidatorUpdates(state.Validators)
  381. if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
  382. Validators: validators,
  383. }); err != nil {
  384. panic(err)
  385. }
  386. switch mode {
  387. case 0:
  388. // sync right up
  389. for _, block := range chain {
  390. state = applyBlock(stateDB, state, block, proxyApp)
  391. }
  392. case 1, 2:
  393. // sync up to the penultimate as if we stored the block.
  394. // whether we commit or not depends on the appHash
  395. for _, block := range chain[:len(chain)-1] {
  396. state = applyBlock(stateDB, state, block, proxyApp)
  397. }
  398. // apply the final block to a state copy so we can
  399. // get the right next appHash but keep the state back
  400. applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
  401. }
  402. return state
  403. }
  404. //--------------------------
  405. // utils for making blocks
  406. func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
  407. // Search for height marker
  408. gr, found, err := wal.SearchForEndHeight(0, &WALSearchOptions{})
  409. if err != nil {
  410. return nil, nil, err
  411. }
  412. if !found {
  413. return nil, nil, fmt.Errorf("WAL does not contain height %d.", 1)
  414. }
  415. defer gr.Close() // nolint: errcheck
  416. // log.Notice("Build a blockchain by reading from the WAL")
  417. var blocks []*types.Block
  418. var commits []*types.Commit
  419. var thisBlockParts *types.PartSet
  420. var thisBlockCommit *types.Commit
  421. var height int64
  422. dec := NewWALDecoder(gr)
  423. for {
  424. msg, err := dec.Decode()
  425. if err == io.EOF {
  426. break
  427. } else if err != nil {
  428. return nil, nil, err
  429. }
  430. piece := readPieceFromWAL(msg)
  431. if piece == nil {
  432. continue
  433. }
  434. switch p := piece.(type) {
  435. case EndHeightMessage:
  436. // if its not the first one, we have a full block
  437. if thisBlockParts != nil {
  438. var block = new(types.Block)
  439. _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
  440. if err != nil {
  441. panic(err)
  442. }
  443. if block.Height != height+1 {
  444. panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  445. }
  446. commitHeight := thisBlockCommit.Precommits[0].Height
  447. if commitHeight != height+1 {
  448. panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  449. }
  450. blocks = append(blocks, block)
  451. commits = append(commits, thisBlockCommit)
  452. height++
  453. }
  454. case *types.PartSetHeader:
  455. thisBlockParts = types.NewPartSetFromHeader(*p)
  456. case *types.Part:
  457. _, err := thisBlockParts.AddPart(p)
  458. if err != nil {
  459. return nil, nil, err
  460. }
  461. case *types.Vote:
  462. if p.Type == types.PrecommitType {
  463. thisBlockCommit = &types.Commit{
  464. BlockID: p.BlockID,
  465. Precommits: []*types.Vote{p},
  466. }
  467. }
  468. }
  469. }
  470. // grab the last block too
  471. var block = new(types.Block)
  472. _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
  473. if err != nil {
  474. panic(err)
  475. }
  476. if block.Height != height+1 {
  477. panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
  478. }
  479. commitHeight := thisBlockCommit.Precommits[0].Height
  480. if commitHeight != height+1 {
  481. panic(fmt.Sprintf("commit doesnt match. got height %d, expected %d", commitHeight, height+1))
  482. }
  483. blocks = append(blocks, block)
  484. commits = append(commits, thisBlockCommit)
  485. return blocks, commits, nil
  486. }
  487. func readPieceFromWAL(msg *TimedWALMessage) interface{} {
  488. // for logging
  489. switch m := msg.Msg.(type) {
  490. case msgInfo:
  491. switch msg := m.Msg.(type) {
  492. case *ProposalMessage:
  493. return &msg.Proposal.BlockPartsHeader
  494. case *BlockPartMessage:
  495. return msg.Part
  496. case *VoteMessage:
  497. return msg.Vote
  498. }
  499. case EndHeightMessage:
  500. return m
  501. }
  502. return nil
  503. }
  504. // fresh state and mock store
  505. func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) {
  506. stateDB := dbm.NewMemDB()
  507. state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
  508. store := NewMockBlockStore(config, state.ConsensusParams)
  509. return stateDB, state, store
  510. }
  511. //----------------------------------
  512. // mock block store
  513. type mockBlockStore struct {
  514. config *cfg.Config
  515. params types.ConsensusParams
  516. chain []*types.Block
  517. commits []*types.Commit
  518. }
  519. // TODO: NewBlockStore(db.NewMemDB) ...
  520. func NewMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
  521. return &mockBlockStore{config, params, nil, nil}
  522. }
  523. func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) }
  524. func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
  525. func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
  526. block := bs.chain[height-1]
  527. return &types.BlockMeta{
  528. BlockID: types.BlockID{block.Hash(), block.MakePartSet(types.BlockPartSizeBytes).Header()},
  529. Header: block.Header,
  530. }
  531. }
  532. func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
  533. func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
  534. }
  535. func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
  536. return bs.commits[height-1]
  537. }
  538. func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
  539. return bs.commits[height-1]
  540. }
  541. //----------------------------------------
  542. func TestInitChainUpdateValidators(t *testing.T) {
  543. val, _ := types.RandValidator(true, 10)
  544. vals := types.NewValidatorSet([]*types.Validator{val})
  545. app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
  546. clientCreator := proxy.NewLocalClientCreator(app)
  547. config := ResetConfig("proxy_test_")
  548. privVal := privval.LoadFilePV(config.PrivValidatorFile())
  549. stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
  550. oldValAddr := state.Validators.Validators[0].Address
  551. // now start the app using the handshake - it should sync
  552. genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
  553. handshaker := NewHandshaker(stateDB, state, store, genDoc)
  554. proxyApp := proxy.NewAppConns(clientCreator)
  555. if err := proxyApp.Start(); err != nil {
  556. t.Fatalf("Error starting proxy app connections: %v", err)
  557. }
  558. defer proxyApp.Stop()
  559. if err := handshaker.Handshake(proxyApp); err != nil {
  560. t.Fatalf("Error on abci handshake: %v", err)
  561. }
  562. // reload the state, check the validator set was updated
  563. state = sm.LoadState(stateDB)
  564. newValAddr := state.Validators.Validators[0].Address
  565. expectValAddr := val.Address
  566. assert.NotEqual(t, oldValAddr, newValAddr)
  567. assert.Equal(t, newValAddr, expectValAddr)
  568. }
  569. func newInitChainApp(vals []abci.ValidatorUpdate) *initChainApp {
  570. return &initChainApp{
  571. vals: vals,
  572. }
  573. }
  574. // returns the vals on InitChain
  575. type initChainApp struct {
  576. abci.BaseApplication
  577. vals []abci.ValidatorUpdate
  578. }
  579. func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
  580. return abci.ResponseInitChain{
  581. Validators: ica.vals,
  582. }
  583. }