You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

300 lines
8.9 KiB

7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package consensus
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. crypto "github.com/tendermint/go-crypto"
  7. data "github.com/tendermint/go-wire/data"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. . "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tmlibs/events"
  12. )
  13. func init() {
  14. config = ResetConfig("consensus_byzantine_test")
  15. }
  16. //----------------------------------------------
  17. // byzantine failures
  18. // 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
  19. // byzantine validator sends conflicting proposals into A and B,
  20. // and prevotes/precommits on both of them.
  21. // B sees a commit, A doesn't.
  22. // Byzantine validator refuses to prevote.
  23. // Heal partition and ensure A sees the commit
  24. func TestByzantine(t *testing.T) {
  25. N := 4
  26. logger := consensusLogger()
  27. css := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), newCounter)
  28. // give the byzantine validator a normal ticker
  29. css[0].SetTimeoutTicker(NewTimeoutTicker())
  30. switches := make([]*p2p.Switch, N)
  31. p2pLogger := logger.With("module", "p2p")
  32. for i := 0; i < N; i++ {
  33. switches[i] = p2p.NewSwitch(config.P2P)
  34. switches[i].SetLogger(p2pLogger.With("validator", i))
  35. }
  36. reactors := make([]p2p.Reactor, N)
  37. defer func() {
  38. for _, r := range reactors {
  39. if rr, ok := r.(*ByzantineReactor); ok {
  40. rr.reactor.Switch.Stop()
  41. } else {
  42. r.(*ConsensusReactor).Switch.Stop()
  43. }
  44. }
  45. }()
  46. eventChans := make([]chan interface{}, N)
  47. eventLogger := logger.With("module", "events")
  48. for i := 0; i < N; i++ {
  49. if i == 0 {
  50. css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator)
  51. // make byzantine
  52. css[i].decideProposal = func(j int) func(int, int) {
  53. return func(height, round int) {
  54. byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
  55. }
  56. }(i)
  57. css[i].doPrevote = func(height, round int) {}
  58. }
  59. eventSwitch := events.NewEventSwitch()
  60. eventSwitch.SetLogger(eventLogger.With("validator", i))
  61. _, err := eventSwitch.Start()
  62. if err != nil {
  63. t.Fatalf("Failed to start switch: %v", err)
  64. }
  65. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  66. conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
  67. conR.SetLogger(logger.With("validator", i))
  68. conR.SetEventSwitch(eventSwitch)
  69. var conRI p2p.Reactor
  70. conRI = conR
  71. if i == 0 {
  72. conRI = NewByzantineReactor(conR)
  73. }
  74. reactors[i] = conRI
  75. }
  76. p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
  77. // ignore new switch s, we already made ours
  78. switches[i].AddReactor("CONSENSUS", reactors[i])
  79. return switches[i]
  80. }, func(sws []*p2p.Switch, i, j int) {
  81. // the network starts partitioned with globally active adversary
  82. if i != 0 {
  83. return
  84. }
  85. p2p.Connect2Switches(sws, i, j)
  86. })
  87. // start the state machines
  88. byzR := reactors[0].(*ByzantineReactor)
  89. s := byzR.reactor.conS.GetState()
  90. byzR.reactor.SwitchToConsensus(s)
  91. for i := 1; i < N; i++ {
  92. cr := reactors[i].(*ConsensusReactor)
  93. cr.SwitchToConsensus(cr.conS.GetState())
  94. }
  95. // byz proposer sends one block to peers[0]
  96. // and the other block to peers[1] and peers[2].
  97. // note peers and switches order don't match.
  98. peers := switches[0].Peers().List()
  99. ind0 := getSwitchIndex(switches, peers[0])
  100. ind1 := getSwitchIndex(switches, peers[1])
  101. ind2 := getSwitchIndex(switches, peers[2])
  102. // connect the 2 peers in the larger partition
  103. p2p.Connect2Switches(switches, ind1, ind2)
  104. // wait for someone in the big partition to make a block
  105. <-eventChans[ind2]
  106. t.Log("A block has been committed. Healing partition")
  107. // connect the partitions
  108. p2p.Connect2Switches(switches, ind0, ind1)
  109. p2p.Connect2Switches(switches, ind0, ind2)
  110. // wait till everyone makes the first new block
  111. // (one of them already has)
  112. wg := new(sync.WaitGroup)
  113. wg.Add(2)
  114. for i := 1; i < N-1; i++ {
  115. go func(j int) {
  116. <-eventChans[j]
  117. wg.Done()
  118. }(i)
  119. }
  120. done := make(chan struct{})
  121. go func() {
  122. wg.Wait()
  123. close(done)
  124. }()
  125. tick := time.NewTicker(time.Second * 10)
  126. select {
  127. case <-done:
  128. case <-tick.C:
  129. for i, reactor := range reactors {
  130. t.Log(Fmt("Consensus Reactor %v", i))
  131. t.Log(Fmt("%v", reactor))
  132. }
  133. t.Fatalf("Timed out waiting for all validators to commit first block")
  134. }
  135. }
  136. //-------------------------------
  137. // byzantine consensus functions
  138. func byzantineDecideProposalFunc(t *testing.T, height, round int, cs *ConsensusState, sw *p2p.Switch) {
  139. // byzantine user should create two proposals and try to split the vote.
  140. // Avoid sending on internalMsgQueue and running consensus state.
  141. // Create a new proposal block from state/txs from the mempool.
  142. block1, blockParts1 := cs.createProposalBlock()
  143. polRound, polBlockID := cs.Votes.POLInfo()
  144. proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
  145. cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
  146. // Create a new proposal block from state/txs from the mempool.
  147. block2, blockParts2 := cs.createProposalBlock()
  148. polRound, polBlockID = cs.Votes.POLInfo()
  149. proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
  150. cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
  151. block1Hash := block1.Hash()
  152. block2Hash := block2.Hash()
  153. // broadcast conflicting proposals/block parts to peers
  154. peers := sw.Peers().List()
  155. t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
  156. for i, peer := range peers {
  157. if i < len(peers)/2 {
  158. go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
  159. } else {
  160. go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
  161. }
  162. }
  163. }
  164. func sendProposalAndParts(height, round int, cs *ConsensusState, peer p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
  165. // proposal
  166. msg := &ProposalMessage{Proposal: proposal}
  167. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  168. // parts
  169. for i := 0; i < parts.Total(); i++ {
  170. part := parts.GetPart(i)
  171. msg := &BlockPartMessage{
  172. Height: height, // This tells peer that this part applies to us.
  173. Round: round, // This tells peer that this part applies to us.
  174. Part: part,
  175. }
  176. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  177. }
  178. // votes
  179. cs.mtx.Lock()
  180. prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
  181. precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
  182. cs.mtx.Unlock()
  183. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
  184. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
  185. }
  186. //----------------------------------------
  187. // byzantine consensus reactor
  188. type ByzantineReactor struct {
  189. Service
  190. reactor *ConsensusReactor
  191. }
  192. func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
  193. return &ByzantineReactor{
  194. Service: conR,
  195. reactor: conR,
  196. }
  197. }
  198. func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
  199. func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
  200. func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
  201. if !br.reactor.IsRunning() {
  202. return
  203. }
  204. // Create peerState for peer
  205. peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
  206. peer.Set(types.PeerStateKey, peerState)
  207. // Send our state to peer.
  208. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  209. if !br.reactor.fastSync {
  210. br.reactor.sendNewRoundStepMessages(peer)
  211. }
  212. }
  213. func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  214. br.reactor.RemovePeer(peer, reason)
  215. }
  216. func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
  217. br.reactor.Receive(chID, peer, msgBytes)
  218. }
  219. //----------------------------------------
  220. // byzantine privValidator
  221. type ByzantinePrivValidator struct {
  222. types.Signer
  223. pv types.PrivValidator
  224. }
  225. // Return a priv validator that will sign anything
  226. func NewByzantinePrivValidator(pv types.PrivValidator) *ByzantinePrivValidator {
  227. return &ByzantinePrivValidator{
  228. Signer: pv.(*types.PrivValidatorFS).Signer,
  229. pv: pv,
  230. }
  231. }
  232. func (privVal *ByzantinePrivValidator) GetAddress() data.Bytes {
  233. return privVal.pv.GetAddress()
  234. }
  235. func (privVal *ByzantinePrivValidator) GetPubKey() crypto.PubKey {
  236. return privVal.pv.GetPubKey()
  237. }
  238. func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) (err error) {
  239. vote.Signature, err = privVal.Sign(types.SignBytes(chainID, vote))
  240. return err
  241. }
  242. func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) (err error) {
  243. proposal.Signature, err = privVal.Sign(types.SignBytes(chainID, proposal))
  244. return nil
  245. }
  246. func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) (err error) {
  247. heartbeat.Signature, err = privVal.Sign(types.SignBytes(chainID, heartbeat))
  248. return nil
  249. }
  250. func (privVal *ByzantinePrivValidator) String() string {
  251. return Fmt("PrivValidator{%X}", privVal.GetAddress())
  252. }