You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

340 lines
13 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/tendermint/abci/example/dummy"
  8. "github.com/tendermint/tmlibs/events"
  9. cfg "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/p2p"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. func init() {
  14. config = ResetConfig("consensus_reactor_test")
  15. }
  16. //----------------------------------------------
  17. // in-process testnets
  18. func startConsensusNet(t *testing.T, css []*ConsensusState, N int, subscribeEventRespond bool) ([]*ConsensusReactor, []chan interface{}) {
  19. reactors := make([]*ConsensusReactor, N)
  20. eventChans := make([]chan interface{}, N)
  21. logger := consensusLogger()
  22. for i := 0; i < N; i++ {
  23. reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states
  24. reactors[i].SetLogger(logger.With("validator", i))
  25. eventSwitch := events.NewEventSwitch()
  26. eventSwitch.SetLogger(logger.With("module", "events", "validator", i))
  27. _, err := eventSwitch.Start()
  28. if err != nil {
  29. t.Fatalf("Failed to start switch: %v", err)
  30. }
  31. reactors[i].SetEventSwitch(eventSwitch)
  32. if subscribeEventRespond {
  33. eventChans[i] = subscribeToEventRespond(eventSwitch, "tester", types.EventStringNewBlock())
  34. } else {
  35. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  36. }
  37. }
  38. // make connected switches and start all reactors
  39. p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
  40. s.AddReactor("CONSENSUS", reactors[i])
  41. return s
  42. }, p2p.Connect2Switches)
  43. // now that everyone is connected, start the state machines
  44. // If we started the state machines before everyone was connected,
  45. // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
  46. for i := 0; i < N; i++ {
  47. s := reactors[i].conS.GetState()
  48. reactors[i].SwitchToConsensus(s)
  49. }
  50. return reactors, eventChans
  51. }
  52. func stopConsensusNet(reactors []*ConsensusReactor) {
  53. for _, r := range reactors {
  54. r.Switch.Stop()
  55. }
  56. }
  57. // Ensure a testnet makes blocks
  58. func TestReactor(t *testing.T) {
  59. N := 4
  60. css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
  61. reactors, eventChans := startConsensusNet(t, css, N, false)
  62. defer stopConsensusNet(reactors)
  63. // wait till everyone makes the first new block
  64. timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
  65. <-eventChans[j]
  66. wg.Done()
  67. }, css)
  68. }
  69. // Ensure a testnet sends proposal heartbeats and makes blocks when there are txs
  70. func TestReactorProposalHeartbeats(t *testing.T) {
  71. N := 4
  72. css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter,
  73. func(c *cfg.Config) {
  74. c.Consensus.CreateEmptyBlocks = false
  75. })
  76. reactors, eventChans := startConsensusNet(t, css, N, false)
  77. defer stopConsensusNet(reactors)
  78. heartbeatChans := make([]chan interface{}, N)
  79. for i := 0; i < N; i++ {
  80. heartbeatChans[i] = subscribeToEvent(css[i].evsw, "tester", types.EventStringProposalHeartbeat(), 1)
  81. }
  82. // wait till everyone sends a proposal heartbeat
  83. timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
  84. <-heartbeatChans[j]
  85. wg.Done()
  86. }, css)
  87. // send a tx
  88. css[3].mempool.CheckTx([]byte{1, 2, 3}, nil)
  89. // wait till everyone makes the first new block
  90. timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
  91. <-eventChans[j]
  92. wg.Done()
  93. }, css)
  94. }
  95. //-------------------------------------------------------------
  96. // ensure we can make blocks despite cycling a validator set
  97. func TestVotingPowerChange(t *testing.T) {
  98. nVals := 4
  99. css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
  100. reactors, eventChans := startConsensusNet(t, css, nVals, true)
  101. defer stopConsensusNet(reactors)
  102. // map of active validators
  103. activeVals := make(map[string]struct{})
  104. for i := 0; i < nVals; i++ {
  105. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  106. }
  107. // wait till everyone makes block 1
  108. timeoutWaitGroup(t, nVals, func(wg *sync.WaitGroup, j int) {
  109. <-eventChans[j]
  110. eventChans[j] <- struct{}{}
  111. wg.Done()
  112. }, css)
  113. //---------------------------------------------------------------------------
  114. t.Log("---------------------------- Testing changing the voting power of one validator a few times")
  115. val1PubKey := css[0].privValidator.(*types.PrivValidator).PubKey
  116. updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25)
  117. previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
  118. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  119. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  120. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  121. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  122. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  123. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  124. }
  125. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 2)
  126. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  127. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  128. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  129. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  130. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  131. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  132. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  133. }
  134. updateValidatorTx = dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 100)
  135. previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
  136. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
  137. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  138. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  139. waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
  140. if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  141. t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
  142. }
  143. }
  144. func TestValidatorSetChanges(t *testing.T) {
  145. nPeers := 7
  146. nVals := 4
  147. css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
  148. reactors, eventChans := startConsensusNet(t, css, nPeers, true)
  149. defer stopConsensusNet(reactors)
  150. // map of active validators
  151. activeVals := make(map[string]struct{})
  152. for i := 0; i < nVals; i++ {
  153. activeVals[string(css[i].privValidator.GetAddress())] = struct{}{}
  154. }
  155. // wait till everyone makes block 1
  156. timeoutWaitGroup(t, nPeers, func(wg *sync.WaitGroup, j int) {
  157. <-eventChans[j]
  158. eventChans[j] <- struct{}{}
  159. wg.Done()
  160. }, css)
  161. //---------------------------------------------------------------------------
  162. t.Log("---------------------------- Testing adding one validator")
  163. newValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  164. newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), uint64(testMinPower))
  165. // wait till everyone makes block 2
  166. // ensure the commit includes all validators
  167. // send newValTx to change vals in block 3
  168. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
  169. // wait till everyone makes block 3.
  170. // it includes the commit for block 2, which is by the original validator set
  171. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  172. // wait till everyone makes block 4.
  173. // it includes the commit for block 3, which is by the original validator set
  174. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  175. // the commits for block 4 should be with the updated validator set
  176. activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
  177. // wait till everyone makes block 5
  178. // it includes the commit for block 4, which should have the updated validator set
  179. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  180. //---------------------------------------------------------------------------
  181. t.Log("---------------------------- Testing changing the voting power of one validator")
  182. updateValidatorPubKey1 := css[nVals].privValidator.(*types.PrivValidator).PubKey
  183. updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25)
  184. previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
  185. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
  186. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  187. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  188. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  189. if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
  190. t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
  191. }
  192. //---------------------------------------------------------------------------
  193. t.Log("---------------------------- Testing adding two validators at once")
  194. newValidatorPubKey2 := css[nVals+1].privValidator.(*types.PrivValidator).PubKey
  195. newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), uint64(testMinPower))
  196. newValidatorPubKey3 := css[nVals+2].privValidator.(*types.PrivValidator).PubKey
  197. newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), uint64(testMinPower))
  198. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
  199. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  200. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  201. activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
  202. activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
  203. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  204. //---------------------------------------------------------------------------
  205. t.Log("---------------------------- Testing removing two validators at once")
  206. removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0)
  207. removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0)
  208. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
  209. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  210. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  211. delete(activeVals, string(newValidatorPubKey2.Address()))
  212. delete(activeVals, string(newValidatorPubKey3.Address()))
  213. waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
  214. }
  215. // Check we can make blocks with skip_timeout_commit=false
  216. func TestReactorWithTimeoutCommit(t *testing.T) {
  217. N := 4
  218. css := randConsensusNet(N, "consensus_reactor_with_timeout_commit_test", newMockTickerFunc(false), newCounter)
  219. // override default SkipTimeoutCommit == true for tests
  220. for i := 0; i < N; i++ {
  221. css[i].config.SkipTimeoutCommit = false
  222. }
  223. reactors, eventChans := startConsensusNet(t, css, N-1, false)
  224. defer stopConsensusNet(reactors)
  225. // wait till everyone makes the first new block
  226. timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {
  227. <-eventChans[j]
  228. wg.Done()
  229. }, css)
  230. }
  231. func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
  232. timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
  233. newBlockI := <-eventChans[j]
  234. newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
  235. t.Logf("[WARN] Got block height=%v validator=%v", newBlock.Height, j)
  236. err := validateBlock(newBlock, activeVals)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. for _, tx := range txs {
  241. css[j].mempool.CheckTx(tx, nil)
  242. }
  243. eventChans[j] <- struct{}{}
  244. wg.Done()
  245. }, css)
  246. }
  247. // expects high synchrony!
  248. func validateBlock(block *types.Block, activeVals map[string]struct{}) error {
  249. if block.LastCommit.Size() != len(activeVals) {
  250. return fmt.Errorf("Commit size doesn't match number of active validators. Got %d, expected %d", block.LastCommit.Size(), len(activeVals))
  251. }
  252. for _, vote := range block.LastCommit.Precommits {
  253. if _, ok := activeVals[string(vote.ValidatorAddress)]; !ok {
  254. return fmt.Errorf("Found vote for unactive validator %X", vote.ValidatorAddress)
  255. }
  256. }
  257. return nil
  258. }
  259. func timeoutWaitGroup(t *testing.T, n int, f func(*sync.WaitGroup, int), css []*ConsensusState) {
  260. wg := new(sync.WaitGroup)
  261. wg.Add(n)
  262. for i := 0; i < n; i++ {
  263. go f(wg, i)
  264. }
  265. done := make(chan struct{})
  266. go func() {
  267. wg.Wait()
  268. close(done)
  269. }()
  270. select {
  271. case <-done:
  272. case <-time.After(time.Second * 10):
  273. for i, cs := range css {
  274. fmt.Println("#################")
  275. fmt.Println("Validator", i)
  276. fmt.Println(cs.GetRoundState())
  277. fmt.Println("")
  278. }
  279. panic("Timed out waiting for all validators to commit a block")
  280. }
  281. }