You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
8.9 KiB

7 years ago
7 years ago
7 years ago
7 years ago
  1. package consensus
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/tendermint/tendermint/p2p"
  7. "github.com/tendermint/tendermint/types"
  8. . "github.com/tendermint/tmlibs/common"
  9. "github.com/tendermint/tmlibs/events"
  10. )
  11. func init() {
  12. config = ResetConfig("consensus_byzantine_test")
  13. }
  14. //----------------------------------------------
  15. // byzantine failures
  16. // 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
  17. // byzantine validator sends conflicting proposals into A and B,
  18. // and prevotes/precommits on both of them.
  19. // B sees a commit, A doesn't.
  20. // Byzantine validator refuses to prevote.
  21. // Heal partition and ensure A sees the commit
  22. func TestByzantine(t *testing.T) {
  23. N := 4
  24. logger := consensusLogger()
  25. css := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), newCounter)
  26. // give the byzantine validator a normal ticker
  27. css[0].SetTimeoutTicker(NewTimeoutTicker())
  28. switches := make([]*p2p.Switch, N)
  29. p2pLogger := logger.With("module", "p2p")
  30. for i := 0; i < N; i++ {
  31. switches[i] = p2p.NewSwitch(config.P2P)
  32. switches[i].SetLogger(p2pLogger.With("validator", i))
  33. }
  34. reactors := make([]p2p.Reactor, N)
  35. defer func() {
  36. for _, r := range reactors {
  37. if rr, ok := r.(*ByzantineReactor); ok {
  38. rr.reactor.Switch.Stop()
  39. } else {
  40. r.(*ConsensusReactor).Switch.Stop()
  41. }
  42. }
  43. }()
  44. eventChans := make([]chan interface{}, N)
  45. eventLogger := logger.With("module", "events")
  46. for i := 0; i < N; i++ {
  47. if i == 0 {
  48. css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator.(*types.PrivValidator))
  49. // make byzantine
  50. css[i].decideProposal = func(j int) func(int, int) {
  51. return func(height, round int) {
  52. byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
  53. }
  54. }(i)
  55. css[i].doPrevote = func(height, round int) {}
  56. }
  57. eventSwitch := events.NewEventSwitch()
  58. eventSwitch.SetLogger(eventLogger.With("validator", i))
  59. _, err := eventSwitch.Start()
  60. if err != nil {
  61. t.Fatalf("Failed to start switch: %v", err)
  62. }
  63. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  64. conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
  65. conR.SetLogger(logger.With("validator", i))
  66. conR.SetEventSwitch(eventSwitch)
  67. var conRI p2p.Reactor
  68. conRI = conR
  69. if i == 0 {
  70. conRI = NewByzantineReactor(conR)
  71. }
  72. reactors[i] = conRI
  73. }
  74. p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
  75. // ignore new switch s, we already made ours
  76. switches[i].AddReactor("CONSENSUS", reactors[i])
  77. return switches[i]
  78. }, func(sws []*p2p.Switch, i, j int) {
  79. // the network starts partitioned with globally active adversary
  80. if i != 0 {
  81. return
  82. }
  83. p2p.Connect2Switches(sws, i, j)
  84. })
  85. // start the state machines
  86. byzR := reactors[0].(*ByzantineReactor)
  87. s := byzR.reactor.conS.GetState()
  88. byzR.reactor.SwitchToConsensus(s)
  89. for i := 1; i < N; i++ {
  90. cr := reactors[i].(*ConsensusReactor)
  91. cr.SwitchToConsensus(cr.conS.GetState())
  92. }
  93. // byz proposer sends one block to peers[0]
  94. // and the other block to peers[1] and peers[2].
  95. // note peers and switches order don't match.
  96. peers := switches[0].Peers().List()
  97. ind0 := getSwitchIndex(switches, peers[0])
  98. ind1 := getSwitchIndex(switches, peers[1])
  99. ind2 := getSwitchIndex(switches, peers[2])
  100. // connect the 2 peers in the larger partition
  101. p2p.Connect2Switches(switches, ind1, ind2)
  102. // wait for someone in the big partition to make a block
  103. <-eventChans[ind2]
  104. t.Log("A block has been committed. Healing partition")
  105. // connect the partitions
  106. p2p.Connect2Switches(switches, ind0, ind1)
  107. p2p.Connect2Switches(switches, ind0, ind2)
  108. // wait till everyone makes the first new block
  109. // (one of them already has)
  110. wg := new(sync.WaitGroup)
  111. wg.Add(2)
  112. for i := 1; i < N-1; i++ {
  113. go func(j int) {
  114. <-eventChans[j]
  115. wg.Done()
  116. }(i)
  117. }
  118. done := make(chan struct{})
  119. go func() {
  120. wg.Wait()
  121. close(done)
  122. }()
  123. tick := time.NewTicker(time.Second * 10)
  124. select {
  125. case <-done:
  126. case <-tick.C:
  127. for i, reactor := range reactors {
  128. t.Log(Fmt("Consensus Reactor %v", i))
  129. t.Log(Fmt("%v", reactor))
  130. }
  131. t.Fatalf("Timed out waiting for all validators to commit first block")
  132. }
  133. }
  134. //-------------------------------
  135. // byzantine consensus functions
  136. func byzantineDecideProposalFunc(t *testing.T, height, round int, cs *ConsensusState, sw *p2p.Switch) {
  137. // byzantine user should create two proposals and try to split the vote.
  138. // Avoid sending on internalMsgQueue and running consensus state.
  139. // Create a new proposal block from state/txs from the mempool.
  140. block1, blockParts1 := cs.createProposalBlock()
  141. polRound, polBlockID := cs.Votes.POLInfo()
  142. proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
  143. cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
  144. // Create a new proposal block from state/txs from the mempool.
  145. block2, blockParts2 := cs.createProposalBlock()
  146. polRound, polBlockID = cs.Votes.POLInfo()
  147. proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
  148. cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
  149. block1Hash := block1.Hash()
  150. block2Hash := block2.Hash()
  151. // broadcast conflicting proposals/block parts to peers
  152. peers := sw.Peers().List()
  153. t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
  154. for i, peer := range peers {
  155. if i < len(peers)/2 {
  156. go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
  157. } else {
  158. go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
  159. }
  160. }
  161. }
  162. func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
  163. // proposal
  164. msg := &ProposalMessage{Proposal: proposal}
  165. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  166. // parts
  167. for i := 0; i < parts.Total(); i++ {
  168. part := parts.GetPart(i)
  169. msg := &BlockPartMessage{
  170. Height: height, // This tells peer that this part applies to us.
  171. Round: round, // This tells peer that this part applies to us.
  172. Part: part,
  173. }
  174. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  175. }
  176. // votes
  177. cs.mtx.Lock()
  178. prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
  179. precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
  180. cs.mtx.Unlock()
  181. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
  182. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
  183. }
  184. //----------------------------------------
  185. // byzantine consensus reactor
  186. type ByzantineReactor struct {
  187. Service
  188. reactor *ConsensusReactor
  189. }
  190. func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
  191. return &ByzantineReactor{
  192. Service: conR,
  193. reactor: conR,
  194. }
  195. }
  196. func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
  197. func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
  198. func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
  199. if !br.reactor.IsRunning() {
  200. return
  201. }
  202. // Create peerState for peer
  203. peerState := NewPeerState(peer)
  204. peer.Data.Set(types.PeerStateKey, peerState)
  205. // Send our state to peer.
  206. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  207. if !br.reactor.fastSync {
  208. br.reactor.sendNewRoundStepMessages(peer)
  209. }
  210. }
  211. func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  212. br.reactor.RemovePeer(peer, reason)
  213. }
  214. func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  215. br.reactor.Receive(chID, peer, msgBytes)
  216. }
  217. //----------------------------------------
  218. // byzantine privValidator
  219. type ByzantinePrivValidator struct {
  220. Address []byte `json:"address"`
  221. types.Signer `json:"-"`
  222. mtx sync.Mutex
  223. }
  224. // Return a priv validator that will sign anything
  225. func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
  226. return &ByzantinePrivValidator{
  227. Address: pv.Address,
  228. Signer: pv.Signer,
  229. }
  230. }
  231. func (privVal *ByzantinePrivValidator) GetAddress() []byte {
  232. return privVal.Address
  233. }
  234. func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
  235. privVal.mtx.Lock()
  236. defer privVal.mtx.Unlock()
  237. // Sign
  238. vote.Signature = privVal.Sign(types.SignBytes(chainID, vote))
  239. return nil
  240. }
  241. func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
  242. privVal.mtx.Lock()
  243. defer privVal.mtx.Unlock()
  244. // Sign
  245. proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal))
  246. return nil
  247. }
  248. func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
  249. privVal.mtx.Lock()
  250. defer privVal.mtx.Unlock()
  251. // Sign
  252. heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat))
  253. return nil
  254. }
  255. func (privVal *ByzantinePrivValidator) String() string {
  256. return Fmt("PrivValidator{%X}", privVal.Address)
  257. }