package consensus

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"runtime"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/crypto"
	cmn "github.com/tendermint/tendermint/libs/common"
	dbm "github.com/tendermint/tendermint/libs/db"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
	tmtime "github.com/tendermint/tendermint/types/time"
	"github.com/tendermint/tendermint/version"
)

func TestMain(m *testing.M) {
	config = ResetConfig("consensus_reactor_test")
	consensusReplayConfig = ResetConfig("consensus_replay_test")
	configStateTest := ResetConfig("consensus_state_test")
	configMempoolTest := ResetConfig("consensus_mempool_test")
	configByzantineTest := ResetConfig("consensus_byzantine_test")
	code := m.Run()
	os.RemoveAll(config.RootDir)
	os.RemoveAll(consensusReplayConfig.RootDir)
	os.RemoveAll(configStateTest.RootDir)
	os.RemoveAll(configMempoolTest.RootDir)
	os.RemoveAll(configByzantineTest.RootDir)
	os.Exit(code)
}

// These tests ensure we can always recover from failure at any part of the consensus process.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// Only the latter interacts with the app and store,
// but the former has to deal with restrictions on re-use of priv_validator keys.
// The `WAL Tests` are for failures during the consensus;
// the `Handshake Tests` are for failures in applying the block.
// With the help of the WAL, we can recover from it all!

//------------------------------------------------------------------------------------------
// WAL Tests

// TODO: It would be better to verify explicitly which states we can recover from without the wal
// and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.

func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
	lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
	logger := log.TestingLogger()
	state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
	privValidator := loadPrivValidator(consensusReplayConfig)
	cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
	cs.SetLogger(logger)

	bytes, _ := ioutil.ReadFile(cs.config.WalFile())
	t.Logf("====== WAL: \n\r%X\n", bytes)

	err := cs.Start()
	require.NoError(t, err)
	defer cs.Stop()

	// This is just a signal that we haven't halted; it's not something contained
	// in the WAL itself. Assuming the consensus state is running, replay of any
	// WAL, including the empty one, should eventually be followed by a new
	// block, or else something is wrong.
	newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
	require.NoError(t, err)
	select {
	case <-newBlockSub.Out():
	case <-newBlockSub.Cancelled():
		t.Fatal("newBlockSub was cancelled")
	case <-time.After(120 * time.Second):
		t.Fatal("Timed out waiting for new block (see trace above)")
	}
}

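// sendTxs feeds one-byte transactions into the mempool until the loop
// finishes or the given context is cancelled.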
func sendTxs(ctx context.Context, cs *ConsensusState) {
	for i := 0; i < 256; i++ {
		select {
		case <-ctx.Done():
			return
		default:
			tx := []byte{byte(i)}
			assertMempool(cs.txNotifier).CheckTx(tx, nil)
			i++
		}
	}
}

// TestWALCrash uses a crashing WAL to test that we can recover from any WAL failure.
func TestWALCrash(t *testing.T) {
	testCases := []struct {
		name         string
		initFn       func(dbm.DB, *ConsensusState, context.Context)
		heightToStop int64
	}{
		{"empty block",
			func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
			1},
		{"many non-empty blocks",
			func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
				go sendTxs(ctx, cs)
			},
			3},
	}

	for i, tc := range testCases {
		consensusReplayConfig := ResetConfig(fmt.Sprintf("%s_%d", t.Name(), i))
		t.Run(tc.name, func(t *testing.T) {
			crashWALandCheckLiveness(t, consensusReplayConfig, tc.initFn, tc.heightToStop)
		})
	}
}

func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
	initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
	walPanicked := make(chan error)
	crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}

	i := 1
LOOP:
	for {
		t.Logf("====== LOOP %d\n", i)

		// create consensus state from a clean slate
		logger := log.NewNopLogger()
		stateDB := dbm.NewMemDB()
		state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
		privValidator := loadPrivValidator(consensusReplayConfig)
		blockDB := dbm.NewMemDB()
		cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
		cs.SetLogger(logger)

		// start sending transactions
		ctx, cancel := context.WithCancel(context.Background())
		initFn(stateDB, cs, ctx)

		// clean up WAL file from the previous iteration
		walFile := cs.config.WalFile()
		os.Remove(walFile)

		// set crashing WAL
		csWal, err := cs.OpenWAL(walFile)
		require.NoError(t, err)
		crashingWal.next = csWal
		// reset the message counter
		crashingWal.msgIndex = 1
		cs.wal = crashingWal

		// start consensus state
		err = cs.Start()
		require.NoError(t, err)

		i++

		select {
		case err := <-walPanicked:
			t.Logf("WAL panicked: %v", err)

			// make sure we can make blocks after a crash
			startNewConsensusStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateDB)

			// stop consensus state and transactions sender (initFn)
			cs.Stop()
			cancel()

			// if we reached the required height, exit
			if _, ok := err.(ReachedHeightToStopError); ok {
				break LOOP
			}
		case <-time.After(10 * time.Second):
			t.Fatal("WAL did not panic for 10 seconds (check the log)")
		}
	}
}

// crashingWAL is a WAL that simulates a crash during Save
// (before and after). It remembers the message for which we last panicked
// (lastPanickedForMsgIndex), so we don't panic for it again in subsequent iterations.
type crashingWAL struct {
	next         WAL
	panicCh      chan error
	heightToStop int64

	msgIndex                int // current message index
	lastPanickedForMsgIndex int // last message for which we panicked
}

var _ WAL = &crashingWAL{}

// WALWriteError indicates a WAL crash.
type WALWriteError struct {
	msg string
}

func (e WALWriteError) Error() string {
	return e.msg
}

// ReachedHeightToStopError indicates we've reached the required consensus
// height and may exit.
type ReachedHeightToStopError struct {
	height int64
}

func (e ReachedHeightToStopError) Error() string {
	return fmt.Sprintf("reached height to stop %d", e.height)
}

// Write simulates a WAL crash by sending an error to the panicCh and then
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) {
	if endMsg, ok := m.(EndHeightMessage); ok {
		if endMsg.Height == w.heightToStop {
			w.panicCh <- ReachedHeightToStopError{endMsg.Height}
			runtime.Goexit()
		} else {
			w.next.Write(m)
		}
		return
	}

	if w.msgIndex > w.lastPanickedForMsgIndex {
		w.lastPanickedForMsgIndex = w.msgIndex
		_, file, line, _ := runtime.Caller(1)
		w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
		runtime.Goexit()
	} else {
		w.msgIndex++
		w.next.Write(m)
	}
}

func (w *crashingWAL) WriteSync(m WALMessage) {
	w.Write(m)
}

func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() }

func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
	return w.next.SearchForEndHeight(height, options)
}

func (w *crashingWAL) Start() error { return w.next.Start() }
func (w *crashingWAL) Stop() error  { return w.next.Stop() }
func (w *crashingWAL) Wait()        { w.next.Wait() }

//------------------------------------------------------------------------------------------
// Handshake Tests

const (
	numBlocks = 6
)

var (
	mempool = sm.MockMempool{}
	evpool  = sm.MockEvidencePool{}
)

//---------------------------------------
// Test handshake/replay

// 0 - all synced up
// 1 - saved block but app and state are behind
// 2 - saved block and committed, but state is behind
var modes = []uint{0, 1, 2}

// Sync from scratch
func TestHandshakeReplayAll(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, 0, m)
	}
}

// Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, 1, m)
	}
}

// Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, numBlocks-1, m)
	}
}

// Sync from caught up
func TestHandshakeReplayNone(t *testing.T) {
	for i, m := range modes {
		config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
		defer os.RemoveAll(config.RootDir)
		testHandshakeReplay(t, config, numBlocks, m)
	}
}

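// tempWALWithData writes the given bytes to a fresh temporary file and
// returns its path, panicking on any I/O error.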
func tempWALWithData(data []byte) string {
	walFile, err := ioutil.TempFile("", "wal")
	if err != nil {
		panic(fmt.Sprintf("failed to create temp WAL file: %v", err))
	}
	_, err = walFile.Write(data)
	if err != nil {
		panic(fmt.Sprintf("failed to write to temp WAL file: %v", err))
	}
	if err := walFile.Close(); err != nil {
		panic(fmt.Sprintf("failed to close temp WAL file: %v", err))
	}
	return walFile.Name()
}

// Make some blocks. Start a fresh app and apply nBlocks blocks.
// Then restart the app and sync it up with the remaining blocks.
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) {
	walBody, err := WALWithNBlocks(t, numBlocks)
	require.NoError(t, err)
	walFile := tempWALWithData(walBody)
	config.Consensus.SetWalFile(walFile)

	privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())

	wal, err := NewWAL(walFile)
	require.NoError(t, err)
	wal.SetLogger(log.TestingLogger())
	err = wal.Start()
	require.NoError(t, err)
	defer wal.Stop()

	chain, commits, err := makeBlockchainFromWAL(wal)
	require.NoError(t, err)

	stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
	store.chain = chain
	store.commits = commits

	// run the chain through state.ApplyBlock to build up the tendermint state
	clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "1")))
	proxyApp := proxy.NewAppConns(clientCreator)
	err = proxyApp.Start()
	require.NoError(t, err)
	state = buildTMStateFromChain(config, stateDB, state, chain, proxyApp, mode)
	proxyApp.Stop()
	latestAppHash := state.AppHash

	// make a new client creator
	kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "2"))
	clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
	if nBlocks > 0 {
		// run nBlocks against a new client to build up the app state.
		// use a throwaway tendermint state
		proxyApp := proxy.NewAppConns(clientCreator2)
		stateDB, state, _ := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
		buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
	}

	// now start the app using the handshake - it should sync
	genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
	handshaker := NewHandshaker(stateDB, state, store, genDoc)
	proxyApp = proxy.NewAppConns(clientCreator2)
	if err := proxyApp.Start(); err != nil {
		t.Fatalf("Error starting proxy app connections: %v", err)
	}
	defer proxyApp.Stop()
	if err := handshaker.Handshake(proxyApp); err != nil {
		t.Fatalf("Error on abci handshake: %v", err)
	}

	// get the latest app hash from the app
	res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""})
	if err != nil {
		t.Fatal(err)
	}

	// the app hash should be synced up
	if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
		t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
	}

	expectedBlocksToSync := numBlocks - nBlocks
	if nBlocks == numBlocks && mode > 0 {
		expectedBlocksToSync++
	} else if nBlocks > 0 && mode == 1 {
		expectedBlocksToSync++
	}

	if handshaker.NBlocks() != expectedBlocksToSync {
		t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
	}
}

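// applyBlock runs a single block through the block executor and returns the
// resulting state, panicking on error.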
func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
	testPartSize := types.BlockPartSizeBytes
	blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

	blkID := types.BlockID{Hash: blk.Hash(), PartsHeader: blk.MakePartSet(testPartSize).Header()}
	newState, err := blockExec.ApplyBlock(st, blkID, blk)
	if err != nil {
		panic(err)
	}
	return newState
}

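// buildAppStateFromChain starts the app (without a handshake) and replays
// nBlocks blocks against it to build up the application state.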
func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
	state sm.State, chain []*types.Block, nBlocks int, mode uint) {
	// start a new app without handshake, play nBlocks blocks
	if err := proxyApp.Start(); err != nil {
		panic(err)
	}
	defer proxyApp.Stop()

	validators := types.TM2PB.ValidatorUpdates(state.Validators)
	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
		Validators: validators,
	}); err != nil {
		panic(err)
	}

	switch mode {
	case 0:
		for i := 0; i < nBlocks; i++ {
			block := chain[i]
			state = applyBlock(stateDB, state, block, proxyApp)
		}
	case 1, 2:
		for i := 0; i < nBlocks-1; i++ {
			block := chain[i]
			state = applyBlock(stateDB, state, block, proxyApp)
		}

		if mode == 2 {
			// update the kvstore height and apphash
			// as if we ran commit but did not save the state
			state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
		}
	}
}

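// buildTMStateFromChain replays the chain through state.ApplyBlock to build
// up the tendermint state for the given replay mode.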
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State,
	chain []*types.Block, proxyApp proxy.AppConns, mode uint) sm.State {
	validators := types.TM2PB.ValidatorUpdates(state.Validators)
	if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
		Validators: validators,
	}); err != nil {
		panic(err)
	}

	switch mode {
	case 0:
		// sync right up
		for _, block := range chain {
			state = applyBlock(stateDB, state, block, proxyApp)
		}
	case 1, 2:
		// sync up to the penultimate as if we stored the block.
		// whether we commit or not depends on the appHash
		for _, block := range chain[:len(chain)-1] {
			state = applyBlock(stateDB, state, block, proxyApp)
		}

		// apply the final block to a state copy so we can
		// get the right next appHash but keep the state back
		applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
	}

	return state
}

func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
	// 1. Initialize tendermint and commit 3 blocks with the following app hashes:
	//   - 0x01
	//   - 0x02
	//   - 0x03
	config := ResetConfig("handshake_test_")
	defer os.RemoveAll(config.RootDir)
	privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
	const appVersion = 0x0
	stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), appVersion)
	genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
	state.LastValidators = state.Validators.Copy()
	// mode = 0 for committing all the blocks
	blocks := makeBlocks(3, &state, privVal)
	store.chain = blocks

	// 2. Tendermint must panic if app returns wrong hash for the first block
	//   - RANDOM HASH
	//   - 0x02
	//   - 0x03
	{
		app := &badApp{numBlocks: 3, allHashesAreWrong: true}
		clientCreator := proxy.NewLocalClientCreator(app)
		proxyApp := proxy.NewAppConns(clientCreator)
		err := proxyApp.Start()
		require.NoError(t, err)
		defer proxyApp.Stop()

		assert.Panics(t, func() {
			h := NewHandshaker(stateDB, state, store, genDoc)
			h.Handshake(proxyApp)
		})
	}

	// 3. Tendermint must panic if app returns wrong hash for the last block
	//   - 0x01
	//   - 0x02
	//   - RANDOM HASH
	{
		app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
		clientCreator := proxy.NewLocalClientCreator(app)
		proxyApp := proxy.NewAppConns(clientCreator)
		err := proxyApp.Start()
		require.NoError(t, err)
		defer proxyApp.Stop()

		assert.Panics(t, func() {
			h := NewHandshaker(stateDB, state, store, genDoc)
			h.Handshake(proxyApp)
		})
	}
}

func makeBlocks(n int, state *sm.State, privVal types.PrivValidator) []*types.Block {
	blocks := make([]*types.Block, 0)

	var (
		prevBlock     *types.Block
		prevBlockMeta *types.BlockMeta
	)

	appHeight := byte(0x01)
	for i := 0; i < n; i++ {
		height := int64(i + 1)

		block, parts := makeBlock(*state, prevBlock, prevBlockMeta, privVal, height)
		blocks = append(blocks, block)

		prevBlock = block
		prevBlockMeta = types.NewBlockMeta(block, parts)

		// update state
		state.AppHash = []byte{appHeight}
		appHeight++
		state.LastBlockHeight = height
	}

	return blocks
}

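// makeVote builds and signs a precommit vote for the given header on behalf
// of privVal.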
func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote {
	addr := privVal.GetPubKey().Address()
	idx, _ := valset.GetByAddress(addr)
	vote := &types.Vote{
		ValidatorAddress: addr,
		ValidatorIndex:   idx,
		Height:           header.Height,
		Round:            1,
		Timestamp:        tmtime.Now(),
		Type:             types.PrecommitType,
		BlockID:          blockID,
	}

	privVal.SignVote(header.ChainID, vote)

	return vote
}

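// makeBlock builds the block at the given height on top of lastBlock,
// including a commit for lastBlock when height > 1.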
func makeBlock(state sm.State, lastBlock *types.Block, lastBlockMeta *types.BlockMeta,
	privVal types.PrivValidator, height int64) (*types.Block, *types.PartSet) {
	lastCommit := types.NewCommit(types.BlockID{}, nil)
	if height > 1 {
		vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVal).CommitSig()
		lastCommit = types.NewCommit(lastBlockMeta.BlockID, []*types.CommitSig{vote})
	}

	return state.MakeBlock(height, []types.Tx{}, lastCommit, nil, state.Validators.GetProposer().Address)
}

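// badApp is an ABCI application that deliberately returns wrong app hashes
// from Commit, either for every block or only for the last one.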
type badApp struct {
	abci.BaseApplication

	numBlocks           byte
	height              byte
	allHashesAreWrong   bool
	onlyLastHashIsWrong bool
}

func (app *badApp) Commit() abci.ResponseCommit {
	app.height++
	if app.onlyLastHashIsWrong {
		if app.height == app.numBlocks {
			return abci.ResponseCommit{Data: cmn.RandBytes(8)}
		}
		return abci.ResponseCommit{Data: []byte{app.height}}
	} else if app.allHashesAreWrong {
		return abci.ResponseCommit{Data: cmn.RandBytes(8)}
	}

	panic("either allHashesAreWrong or onlyLastHashIsWrong must be set")
}

//--------------------------
// utils for making blocks

func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
	var height int64

	// Search for height marker
	gr, found, err := wal.SearchForEndHeight(height, &WALSearchOptions{})
	if err != nil {
		return nil, nil, err
	}
	if !found {
		return nil, nil, fmt.Errorf("WAL does not contain height %d", height)
	}
	defer gr.Close() // nolint: errcheck

	// log.Notice("Build a blockchain by reading from the WAL")

	var (
		blocks          []*types.Block
		commits         []*types.Commit
		thisBlockParts  *types.PartSet
		thisBlockCommit *types.Commit
	)

	dec := NewWALDecoder(gr)
	for {
		msg, err := dec.Decode()
		if err == io.EOF {
			break
		} else if err != nil {
			return nil, nil, err
		}

		piece := readPieceFromWAL(msg)
		if piece == nil {
			continue
		}

		switch p := piece.(type) {
		case EndHeightMessage:
			// if it's not the first one, we have a full block
			if thisBlockParts != nil {
				var block = new(types.Block)
				_, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
				if err != nil {
					panic(err)
				}
				if block.Height != height+1 {
					panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
				}
				commitHeight := thisBlockCommit.Precommits[0].Height
				if commitHeight != height+1 {
					panic(fmt.Sprintf("commit doesn't match. got height %d, expected %d", commitHeight, height+1))
				}
				blocks = append(blocks, block)
				commits = append(commits, thisBlockCommit)
				height++
			}
		case *types.PartSetHeader:
			thisBlockParts = types.NewPartSetFromHeader(*p)
		case *types.Part:
			_, err := thisBlockParts.AddPart(p)
			if err != nil {
				return nil, nil, err
			}
		case *types.Vote:
			if p.Type == types.PrecommitType {
				commitSigs := []*types.CommitSig{p.CommitSig()}
				thisBlockCommit = types.NewCommit(p.BlockID, commitSigs)
			}
		}
	}

	// grab the last block too
	var block = new(types.Block)
	_, err = cdc.UnmarshalBinaryLengthPrefixedReader(thisBlockParts.GetReader(), block, 0)
	if err != nil {
		panic(err)
	}
	if block.Height != height+1 {
		panic(fmt.Sprintf("read bad block from wal. got height %d, expected %d", block.Height, height+1))
	}
	commitHeight := thisBlockCommit.Precommits[0].Height
	if commitHeight != height+1 {
		panic(fmt.Sprintf("commit doesn't match. got height %d, expected %d", commitHeight, height+1))
	}
	blocks = append(blocks, block)
	commits = append(commits, thisBlockCommit)

	return blocks, commits, nil
}

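// readPieceFromWAL extracts the proposal part-set header, block part, vote,
// or end-of-height marker from a WAL message; other messages return nil.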
func readPieceFromWAL(msg *TimedWALMessage) interface{} {
	// for logging
	switch m := msg.Msg.(type) {
	case msgInfo:
		switch msg := m.Msg.(type) {
		case *ProposalMessage:
			return &msg.Proposal.BlockID.PartsHeader
		case *BlockPartMessage:
			return msg.Part
		case *VoteMessage:
			return msg.Vote
		}
	case EndHeightMessage:
		return m
	}

	return nil
}

// fresh state and mock store
func stateAndStore(config *cfg.Config, pubKey crypto.PubKey, appVersion version.Protocol) (dbm.DB, sm.State, *mockBlockStore) {
	stateDB := dbm.NewMemDB()
	state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
	state.Version.Consensus.App = appVersion
	store := newMockBlockStore(config, state.ConsensusParams)
	return stateDB, state, store
}

//----------------------------------
// mock block store

type mockBlockStore struct {
	config  *cfg.Config
	params  types.ConsensusParams
	chain   []*types.Block
	commits []*types.Commit
}

// TODO: NewBlockStore(db.NewMemDB) ...
func newMockBlockStore(config *cfg.Config, params types.ConsensusParams) *mockBlockStore {
	return &mockBlockStore{config, params, nil, nil}
}

func (bs *mockBlockStore) Height() int64                       { return int64(len(bs.chain)) }
func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] }
func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
	block := bs.chain[height-1]
	return &types.BlockMeta{
		BlockID: types.BlockID{Hash: block.Hash(), PartsHeader: block.MakePartSet(types.BlockPartSizeBytes).Header()},
		Header:  block.Header,
	}
}
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
	return bs.commits[height-1]
}
func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
	return bs.commits[height-1]
}

//---------------------------------------
// Test handshake/init chain

func TestHandshakeUpdatesValidators(t *testing.T) {
	val, _ := types.RandValidator(true, 10)
	vals := types.NewValidatorSet([]*types.Validator{val})
	app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
	clientCreator := proxy.NewLocalClientCreator(app)

	config := ResetConfig("handshake_test_")
	defer os.RemoveAll(config.RootDir)
	privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
	stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), 0x0)

	oldValAddr := state.Validators.Validators[0].Address

	// now start the app using the handshake - it should sync
	genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
	handshaker := NewHandshaker(stateDB, state, store, genDoc)
	proxyApp := proxy.NewAppConns(clientCreator)
	if err := proxyApp.Start(); err != nil {
		t.Fatalf("Error starting proxy app connections: %v", err)
	}
	defer proxyApp.Stop()
	if err := handshaker.Handshake(proxyApp); err != nil {
		t.Fatalf("Error on abci handshake: %v", err)
	}

	// reload the state, check the validator set was updated
	state = sm.LoadState(stateDB)

	newValAddr := state.Validators.Validators[0].Address
	expectValAddr := val.Address
	assert.NotEqual(t, oldValAddr, newValAddr)
	assert.Equal(t, newValAddr, expectValAddr)
}

// returns the vals on InitChain
type initChainApp struct {
	abci.BaseApplication
	vals []abci.ValidatorUpdate
}

func (ica *initChainApp) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
	return abci.ResponseInitChain{
		Validators: ica.vals,
	}
}