You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

309 lines
12 KiB

8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/tendermint/tendermint/config/tendermint_test"
  8. "github.com/tendermint/tmlibs/events"
  9. "github.com/tendermint/go-p2p"
  10. "github.com/tendermint/tendermint/types"
  11. "github.com/tendermint/abci/example/dummy"
  12. )
  13. func init() {
  14. config = tendermint_test.ResetConfig("consensus_reactor_test")
  15. }
  16. //----------------------------------------------
  17. // in-process testnets
  18. func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) {
  19. reactors := make([]*ConsensusReactor, N)
  20. eventChans := make([]chan interface{}, N)
  21. for i := 0; i < N; i++ {
  22. reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
  23. eventSwitch := events.NewEventSwitch()
  24. _, err := eventSwitch.Start()
  25. if err != nil {
  26. t.Fatalf("Failed to start switch: %v", err)
  27. }
  28. reactors[i].SetEventSwitch(eventSwitch)
  29. if subscribeEventRespond {
  30. eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
  31. } else {
  32. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  33. }
  34. }
  35. // make connected switches and start all reactors
  36. p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
  37. s.AddReactor("CONSENSUS", reactors[i])
  38. return s
  39. }, p2p.Connect2Switches)
  40. // now that everyone is connected, start the state machines
  41. // If we started the state machines before everyone was connected,
  42. // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
  43. for i := 0; i < N; i++ {
  44. s := reactors[i].conS.GetState()
  45. reactors[i].SwitchToConsensus(s)
  46. }
  47. return reactors, eventChans
  48. }
  49. func stopConsensusNet(reactors []*ConsensusReactor) {
  50. for _, r := range reactors {
  51. r.Switch.Stop()
  52. }
  53. }
  54. // Ensure a testnet makes blocks
  55. func TestReactor(t *testing.T) {
  56. N := 4
  57. css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
  58. reactors, eventChans := startConsensusNet(t, css, N, false)
  59. defer stopConsensusNet(reactors)
  60. // wait till everyone makes the first new block
  61. timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
  62. <-eventChans[j]
  63. wg.Done()
  64. }, css)
  65. }
  66. //-------------------------------------------------------------
  67. // ensure we can make blocks despite cycling a validator set
  68. func TestVotingPowerChange(t *testing.T) {
  69. nVals := 4
  70. css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
  71. reactors, eventChans := startConsensusNet(t, css, nVals, true)
  72. defer stopConsensusNet(reactors)
  73. // map of active validators
  74. activeVals := make(map[string]struct{})
  75. for i := 0; i < nVals; i++ {
  76. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  77. }
  78. // wait till everyone makes block 1
  79. timeoutWaitGroup(t, nVals, func(wg *sync.WaitGroup, j int) {
  80. <-eventChans[j]
  81. eventChans[j] <- struct{}{}
  82. wg.Done()
  83. }, css)
  84. //---------------------------------------------------------------------------
  85. log.Info("---------------------------- Testing changing the voting power of one validator a few times")
  86. val1PubKey := css[0].privValidator.(*types.PrivValidator).PubKey
  87. updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25)
  88. previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
  89. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  90. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  91. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  92. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  93. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  94. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  95. }
  96. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 2)
  97. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  98. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  99. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  100. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  101. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  102. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  103. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  104. }
  105. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 100)
  106. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  107. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  108. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  109. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  110. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  111. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  112. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  113. }
  114. }
  115. func TestValidatorSetChanges(t *testing.T) {
  116. nPeers := 7
  117. nVals := 4
  118. css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
  119. reactors, eventChans := startConsensusNet(t, css, nPeers, true)
  120. defer stopConsensusNet(reactors)
  121. // map of active validators
  122. activeVals := make(map[string]struct{})
  123. for i := 0; i < nVals; i++ {
  124. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  125. }
  126. // wait till everyone makes block 1
  127. timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) {
  128. <-eventChans[j]
  129. eventChans[j] <- struct{}{}
  130. wg.Done()
  131. }, css)
  132. //---------------------------------------------------------------------------
  133. log.Info("---------------------------- Testing adding one validator")
  134. newValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  135. newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), uint64(testMinPower))
  136. // wait till everyone makes block 2
  137. // ensure the commit includes all validators
  138. // send newValTx to change vals in block 3
  139. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
  140. // wait till everyone makes block 3.
  141. // it includes the commit for block 2, which is by the original validator set
  142. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  143. // wait till everyone makes block 4.
  144. // it includes the commit for block 3, which is by the original validator set
  145. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  146. // the commits for block 4 should be with the updated validator set
  147. activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
  148. // wait till everyone makes block 5
  149. // it includes the commit for block 4, which should have the updated validator set
  150. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  151. //---------------------------------------------------------------------------
  152. log.Info("---------------------------- Testing changing the voting power of one validator")
  153. updateValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  154. updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25)
  155. previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
  156. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
  157. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  158. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  159. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  160. if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  161. t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
  162. }
  163. //---------------------------------------------------------------------------
  164. log.Info("---------------------------- Testing adding two validators at once")
  165. newValidatorPubKey2 := css[nVals+1].privValidator.(*types.PrivValidator).PubKey
  166. newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), uint64(testMinPower))
  167. newValidatorPubKey3 := css[nVals+2].privValidator.(*types.PrivValidator).PubKey
  168. newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), uint64(testMinPower))
  169. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
  170. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  171. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  172. activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
  173. activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
  174. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  175. //---------------------------------------------------------------------------
  176. log.Info("---------------------------- Testing removing two validators at once")
  177. removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0)
  178. removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0)
  179. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
  180. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  181. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  182. delete(activeVals, string(newValidatorPubKey2.Address()))
  183. delete(activeVals, string(newValidatorPubKey3.Address()))
  184. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  185. }
  186. // Check we can make blocks with skip_timeout_commit=false
  187. func TestReactorWithTimeoutCommit(t *testing.T) {
  188. N := 4
  189. css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter)
  190. // override default SkipTimeoutCommit == true for tests
  191. for i := 0; i < N; i++ {
  192. css[i].timeoutParams.SkipTimeoutCommit = false
  193. }
  194. reactors, eventChans := startConsensusNet(t, css, N-1, false)
  195. defer stopConsensusNet(reactors)
  196. // wait till everyone makes the first new block
  197. timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {
  198. <-eventChans[j]
  199. wg.Done()
  200. }, css)
  201. }
  202. func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
  203. timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
  204. newBlockI := <-eventChans[j]
  205. newBlock := newBlockI.(types.EventDataNewBlock).Block
  206. log.Warn("Got block", "height", newBlock.Height, "validator", j)
  207. err := validateBlock(newBlock, activeVals)
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. for _, tx := range txs {
  212. css[j].mempool.CheckTx(tx, nil)
  213. }
  214. eventChans[j] <- struct{}{}
  215. wg.Done()
  216. log.Warn("Done wait group", "height", newBlock.Height, "validator", j)
  217. }, css)
  218. }
  219. // expects high synchrony!
  220. func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
  221. if block.LastCommit.Size() != len(activeVals) {
  222. return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals))
  223. }
  224. for _, vote := range block.LastCommit.Precommits {
  225. if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok {
  226. return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress)
  227. }
  228. }
  229. return nil
  230. }
  231. func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int), css []*ConsensusState) {
  232. wg := new(sync.WaitGroup)
  233. wg.Add(n)
  234. for i := 0; i < n; i++ {
  235. go f(wg, i)
  236. }
  237. done := make(chan struct{})
  238. go func() {
  239. wg.Wait()
  240. close(done)
  241. }()
  242. select {
  243. case <-done:
  244. case <-time.After(time.Second * 10):
  245. for i, cs := range css {
  246. fmt.Println("#################")
  247. fmt.Println("Validator", i)
  248. fmt.Println(cs.GetRoundState())
  249. fmt.Println("")
  250. }
  251. panic("Timed out waiting for all validators to commit a block")
  252. }
  253. }