You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

351 lines
9.8 KiB

  1. package consensus
  2. import (
  3. "bytes"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/tendermint/tendermint/config/tendermint_test"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-crypto"
  11. "github.com/tendermint/go-events"
  12. "github.com/tendermint/go-logger"
  13. "github.com/tendermint/go-p2p"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. func init() {
  17. config = tendermint_test.ResetConfig("consensus_reactor_test")
  18. }
  19. func resetConfigTimeouts() {
  20. logger.SetLogLevel("notice")
  21. //config.Set("log_level", "notice")
  22. config.Set("timeout_propose", 2000)
  23. // config.Set("timeout_propose_delta", 500)
  24. // config.Set("timeout_prevote", 1000)
  25. // config.Set("timeout_prevote_delta", 500)
  26. // config.Set("timeout_precommit", 1000)
  27. // config.Set("timeout_precommit_delta", 500)
  28. // config.Set("timeout_commit", 1000)
  29. }
  30. func TestReactor(t *testing.T) {
  31. resetConfigTimeouts()
  32. N := 4
  33. css := randConsensusNet(N)
  34. reactors := make([]*ConsensusReactor, N)
  35. eventChans := make([]chan interface{}, N)
  36. for i := 0; i < N; i++ {
  37. reactors[i] = NewConsensusReactor(css[i], false)
  38. reactors[i].SetPrivValidator(css[i].privValidator)
  39. eventSwitch := events.NewEventSwitch()
  40. _, err := eventSwitch.Start()
  41. if err != nil {
  42. t.Fatalf("Failed to start switch: %v", err)
  43. }
  44. reactors[i].SetEventSwitch(eventSwitch)
  45. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  46. }
  47. p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
  48. s.AddReactor("CONSENSUS", reactors[i])
  49. return s
  50. }, p2p.Connect2Switches)
  51. // wait till everyone makes the first new block
  52. wg := new(sync.WaitGroup)
  53. wg.Add(N)
  54. for i := 0; i < N; i++ {
  55. go func(j int) {
  56. <-eventChans[j]
  57. wg.Done()
  58. }(i)
  59. }
  60. // Make wait into a channel
  61. done := make(chan struct{})
  62. go func() {
  63. wg.Wait()
  64. close(done)
  65. }()
  66. tick := time.NewTicker(time.Second * 3)
  67. select {
  68. case <-done:
  69. case <-tick.C:
  70. t.Fatalf("Timed out waiting for all validators to commit first block")
  71. }
  72. }
  73. // 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
  74. // byzantine validator sends conflicting proposals into A and B,
  75. // and prevotes/precommits on both of them.
  76. // B sees a commit, A doesn't.
  77. // Byzantine validator refuses to prevote.
  78. // Heal partition and ensure A sees the commit
  79. func TestByzantine(t *testing.T) {
  80. resetConfigTimeouts()
  81. N := 4
  82. css := randConsensusNet(N)
  83. switches := make([]*p2p.Switch, N)
  84. for i := 0; i < N; i++ {
  85. switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil))
  86. }
  87. reactors := make([]p2p.Reactor, N)
  88. eventChans := make([]chan interface{}, N)
  89. for i := 0; i < N; i++ {
  90. var privVal PrivValidator
  91. privVal = css[i].privValidator
  92. if i == 0 {
  93. privVal = NewByzantinePrivValidator(privVal.(*types.PrivValidator))
  94. // make byzantine
  95. css[i].decideProposal = func(j int) func(int, int) {
  96. return func(height, round int) {
  97. byzantineDecideProposalFunc(height, round, css[j], switches[j])
  98. }
  99. }(i)
  100. css[i].doPrevote = func(height, round int) {}
  101. }
  102. eventSwitch := events.NewEventSwitch()
  103. _, err := eventSwitch.Start()
  104. if err != nil {
  105. t.Fatalf("Failed to start switch: %v", err)
  106. }
  107. eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1)
  108. conR := NewConsensusReactor(css[i], false)
  109. conR.SetPrivValidator(privVal)
  110. conR.SetEventSwitch(eventSwitch)
  111. var conRI p2p.Reactor
  112. conRI = conR
  113. if i == 0 {
  114. conRI = NewByzantineReactor(conR)
  115. }
  116. reactors[i] = conRI
  117. }
  118. p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch {
  119. // ignore new switch s, we already made ours
  120. switches[i].AddReactor("CONSENSUS", reactors[i])
  121. return switches[i]
  122. }, func(sws []*p2p.Switch, i, j int) {
  123. // the network starts partitioned with globally active adversary
  124. if i != 0 {
  125. return
  126. }
  127. p2p.Connect2Switches(sws, i, j)
  128. })
  129. // byz proposer sends one block to peers[0]
  130. // and the other block to peers[1] and peers[2].
  131. // note peers and switches order don't match.
  132. peers := switches[0].Peers().List()
  133. ind0 := getSwitchIndex(switches, peers[0])
  134. ind1 := getSwitchIndex(switches, peers[1])
  135. ind2 := getSwitchIndex(switches, peers[2])
  136. // connect the 2 peers in the larger partition
  137. p2p.Connect2Switches(switches, ind1, ind2)
  138. // wait for someone in the big partition to make a block
  139. select {
  140. case <-eventChans[ind2]:
  141. }
  142. log.Notice("A block has been committed. Healing partition")
  143. // connect the partitions
  144. p2p.Connect2Switches(switches, ind0, ind1)
  145. p2p.Connect2Switches(switches, ind0, ind2)
  146. // wait till everyone makes the first new block
  147. // (one of them already has)
  148. wg := new(sync.WaitGroup)
  149. wg.Add(2)
  150. for i := 1; i < N-1; i++ {
  151. go func(j int) {
  152. <-eventChans[j]
  153. wg.Done()
  154. }(i)
  155. }
  156. done := make(chan struct{})
  157. go func() {
  158. wg.Wait()
  159. close(done)
  160. }()
  161. tick := time.NewTicker(time.Second * 10)
  162. select {
  163. case <-done:
  164. case <-tick.C:
  165. for i, reactor := range reactors {
  166. t.Log(Fmt("Consensus Reactor %v", i))
  167. t.Log(Fmt("%v", reactor))
  168. }
  169. t.Fatalf("Timed out waiting for all validators to commit first block")
  170. }
  171. }
  172. func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int {
  173. for i, s := range switches {
  174. if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) {
  175. return i
  176. }
  177. }
  178. panic("didnt find peer in switches")
  179. return -1
  180. }
  181. //-------------------------------
  182. // byzantine consensus functions
  183. func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p.Switch) {
  184. // byzantine user should create two proposals and try to split the vote.
  185. // Avoid sending on internalMsgQueue and running consensus state.
  186. // Create a new proposal block from state/txs from the mempool.
  187. block1, blockParts1 := cs.createProposalBlock()
  188. polRound, polBlockID := cs.Votes.POLInfo()
  189. proposal1 := types.NewProposal(height, round, blockParts1.Header(), polRound, polBlockID)
  190. cs.privValidator.SignProposal(cs.state.ChainID, proposal1) // byzantine doesnt err
  191. // Create a new proposal block from state/txs from the mempool.
  192. block2, blockParts2 := cs.createProposalBlock()
  193. polRound, polBlockID = cs.Votes.POLInfo()
  194. proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID)
  195. cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err
  196. block1Hash := block1.Hash()
  197. block2Hash := block2.Hash()
  198. // broadcast conflicting proposals/block parts to peers
  199. peers := sw.Peers().List()
  200. log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers))
  201. for i, peer := range peers {
  202. if i < len(peers)/2 {
  203. go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
  204. } else {
  205. go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
  206. }
  207. }
  208. }
  209. func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
  210. // proposal
  211. msg := &ProposalMessage{Proposal: proposal}
  212. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  213. // parts
  214. for i := 0; i < parts.Total(); i++ {
  215. part := parts.GetPart(i)
  216. msg := &BlockPartMessage{
  217. Height: height, // This tells peer that this part applies to us.
  218. Round: round, // This tells peer that this part applies to us.
  219. Part: part,
  220. }
  221. peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
  222. }
  223. // votes
  224. cs.mtx.Lock()
  225. prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header())
  226. precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
  227. cs.mtx.Unlock()
  228. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}})
  229. peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}})
  230. }
  231. //----------------------------------------
  232. // byzantine consensus reactor
  233. type ByzantineReactor struct {
  234. Service
  235. reactor *ConsensusReactor
  236. }
  237. func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
  238. return &ByzantineReactor{
  239. Service: conR,
  240. reactor: conR,
  241. }
  242. }
  243. func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
  244. func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
  245. func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) {
  246. if !br.reactor.IsRunning() {
  247. return
  248. }
  249. // Create peerState for peer
  250. peerState := NewPeerState(peer)
  251. peer.Data.Set(types.PeerStateKey, peerState)
  252. // Send our state to peer.
  253. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
  254. if !br.reactor.fastSync {
  255. br.reactor.sendNewRoundStepMessage(peer)
  256. }
  257. }
  258. func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  259. br.reactor.RemovePeer(peer, reason)
  260. }
  261. func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
  262. br.reactor.Receive(chID, peer, msgBytes)
  263. }
  264. //----------------------------------------
  265. // byzantine privValidator
  266. type ByzantinePrivValidator struct {
  267. Address []byte `json:"address"`
  268. types.Signer `json:"-"`
  269. mtx sync.Mutex
  270. }
  271. // Return a priv validator that will sign anything
  272. func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator {
  273. return &ByzantinePrivValidator{
  274. Address: pv.Address,
  275. Signer: pv.Signer,
  276. }
  277. }
  278. func (privVal *ByzantinePrivValidator) GetAddress() []byte {
  279. return privVal.Address
  280. }
  281. func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) error {
  282. privVal.mtx.Lock()
  283. defer privVal.mtx.Unlock()
  284. // Sign
  285. vote.Signature = privVal.Sign(types.SignBytes(chainID, vote)).(crypto.SignatureEd25519)
  286. return nil
  287. }
  288. func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) error {
  289. privVal.mtx.Lock()
  290. defer privVal.mtx.Unlock()
  291. // Sign
  292. proposal.Signature = privVal.Sign(types.SignBytes(chainID, proposal)).(crypto.SignatureEd25519)
  293. return nil
  294. }
  295. func (privVal *ByzantinePrivValidator) String() string {
  296. return Fmt("PrivValidator{%X}", privVal.Address)
  297. }