You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
12 KiB

8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/tendermint/abci/example/dummy"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. "github.com/tendermint/tmlibs/events"
  11. )
  12. func init() {
  13. config = ResetConfig("consensus_reactor_test")
  14. }
  15. //----------------------------------------------
  16. // in-process testnets
  17. func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) {
  18. reactors := make([]*ConsensusReactor, N)
  19. eventChans := make([]chan interface{}, N)
  20. for i := 0; i < N; i++ {
  21. reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
  22. eventSwitch := events.NewEventSwitch()
  23. _, err := eventSwitch.Start()
  24. if err != nil {
  25. t.Fatalf("Failed to start switch: %v", err)
  26. }
  27. reactors[i].SetEventSwitch(eventSwitch)
  28. if subscribeEventRespond {
  29. eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
  30. } else {
  31. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  32. }
  33. }
  34. // make connected switches and start all reactors
  35. p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
  36. s.AddReactor("CONSENSUS", reactors[i])
  37. return s
  38. }, p2p.Connect2Switches)
  39. // now that everyone is connected, start the state machines
  40. // If we started the state machines before everyone was connected,
  41. // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
  42. for i := 0; i < N; i++ {
  43. s := reactors[i].conS.GetState()
  44. reactors[i].SwitchToConsensus(s)
  45. }
  46. return reactors, eventChans
  47. }
  48. func stopConsensusNet(reactors []*ConsensusReactor) {
  49. for _, r := range reactors {
  50. r.Switch.Stop()
  51. }
  52. }
  53. // Ensure a testnet makes blocks
  54. func TestReactor(t *testing.T) {
  55. N := 4
  56. css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
  57. reactors, eventChans := startConsensusNet(t, css, N, false)
  58. defer stopConsensusNet(reactors)
  59. // wait till everyone makes the first new block
  60. timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
  61. <-eventChans[j]
  62. wg.Done()
  63. }, css)
  64. }
  65. //-------------------------------------------------------------
  66. // ensure we can make blocks despite cycling a validator set
  67. func TestVotingPowerChange(t *testing.T) {
  68. nVals := 4
  69. css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
  70. reactors, eventChans := startConsensusNet(t, css, nVals, true)
  71. defer stopConsensusNet(reactors)
  72. // map of active validators
  73. activeVals := make(map[string]struct{})
  74. for i := 0; i < nVals; i++ {
  75. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  76. }
  77. // wait till everyone makes block 1
  78. timeoutWaitGroup(t, nVals, func(wg *sync.WaitGroup, j int) {
  79. <-eventChans[j]
  80. eventChans[j] <- struct{}{}
  81. wg.Done()
  82. }, css)
  83. //---------------------------------------------------------------------------
  84. log.Info("---------------------------- Testing changing the voting power of one validator a few times")
  85. val1PubKey := css[0].privValidator.(*types.PrivValidator).PubKey
  86. updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25)
  87. previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
  88. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  89. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  90. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  91. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  92. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  93. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  94. }
  95. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 2)
  96. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  97. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  98. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  99. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  100. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  101. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  102. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  103. }
  104. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 100)
  105. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  106. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  107. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  108. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  109. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  110. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  111. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  112. }
  113. }
  114. func TestValidatorSetChanges(t *testing.T) {
  115. nPeers := 7
  116. nVals := 4
  117. css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
  118. reactors, eventChans := startConsensusNet(t, css, nPeers, true)
  119. defer stopConsensusNet(reactors)
  120. // map of active validators
  121. activeVals := make(map[string]struct{})
  122. for i := 0; i < nVals; i++ {
  123. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  124. }
  125. // wait till everyone makes block 1
  126. timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) {
  127. <-eventChans[j]
  128. eventChans[j] <- struct{}{}
  129. wg.Done()
  130. }, css)
  131. //---------------------------------------------------------------------------
  132. log.Info("---------------------------- Testing adding one validator")
  133. newValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  134. newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), uint64(testMinPower))
  135. // wait till everyone makes block 2
  136. // ensure the commit includes all validators
  137. // send newValTx to change vals in block 3
  138. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
  139. // wait till everyone makes block 3.
  140. // it includes the commit for block 2, which is by the original validator set
  141. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  142. // wait till everyone makes block 4.
  143. // it includes the commit for block 3, which is by the original validator set
  144. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  145. // the commits for block 4 should be with the updated validator set
  146. activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
  147. // wait till everyone makes block 5
  148. // it includes the commit for block 4, which should have the updated validator set
  149. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  150. //---------------------------------------------------------------------------
  151. log.Info("---------------------------- Testing changing the voting power of one validator")
  152. updateValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  153. updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25)
  154. previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
  155. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
  156. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  157. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  158. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  159. if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  160. t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
  161. }
  162. //---------------------------------------------------------------------------
  163. log.Info("---------------------------- Testing adding two validators at once")
  164. newValidatorPubKey2 := css[nVals+1].privValidator.(*types.PrivValidator).PubKey
  165. newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), uint64(testMinPower))
  166. newValidatorPubKey3 := css[nVals+2].privValidator.(*types.PrivValidator).PubKey
  167. newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), uint64(testMinPower))
  168. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
  169. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  170. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  171. activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
  172. activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
  173. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  174. //---------------------------------------------------------------------------
  175. log.Info("---------------------------- Testing removing two validators at once")
  176. removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0)
  177. removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0)
  178. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
  179. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  180. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  181. delete(activeVals, string(newValidatorPubKey2.Address()))
  182. delete(activeVals, string(newValidatorPubKey3.Address()))
  183. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  184. }
  185. // Check we can make blocks with skip_timeout_commit=false
  186. func TestReactorWithTimeoutCommit(t *testing.T) {
  187. N := 4
  188. css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter)
  189. // override default SkipTimeoutCommit == true for tests
  190. for i := 0; i < N; i++ {
  191. css[i].config.SkipTimeoutCommit = false
  192. }
  193. reactors, eventChans := startConsensusNet(t, css, N-1, false)
  194. defer stopConsensusNet(reactors)
  195. // wait till everyone makes the first new block
  196. timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {
  197. <-eventChans[j]
  198. wg.Done()
  199. }, css)
  200. }
  201. func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
  202. timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
  203. newBlockI := <-eventChans[j]
  204. newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
  205. log.Warn("Got block", "height", newBlock.Height, "validator", j)
  206. err := validateBlock(newBlock, activeVals)
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. for _, tx := range txs {
  211. css[j].mempool.CheckTx(tx, nil)
  212. }
  213. eventChans[j] <- struct{}{}
  214. wg.Done()
  215. log.Warn("Done wait group", "height", newBlock.Height, "validator", j)
  216. }, css)
  217. }
  218. // expects high synchrony!
  219. func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
  220. if block.LastCommit.Size() != len(activeVals) {
  221. return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals))
  222. }
  223. for _, vote := range block.LastCommit.Precommits {
  224. if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok {
  225. return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress)
  226. }
  227. }
  228. return nil
  229. }
  230. func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int), css []*ConsensusState) {
  231. wg := new(sync.WaitGroup)
  232. wg.Add(n)
  233. for i := 0; i < n; i++ {
  234. go f(wg, i)
  235. }
  236. done := make(chan struct{})
  237. go func() {
  238. wg.Wait()
  239. close(done)
  240. }()
  241. select {
  242. case <-done:
  243. case <-time.After(time.Second * 10):
  244. for i, cs := range css {
  245. fmt.Println("#################")
  246. fmt.Println("Validator", i)
  247. fmt.Println(cs.GetRoundState())
  248. fmt.Println("")
  249. }
  250. panic("Timed out waiting for all validators to commit a block")
  251. }
  252. }