Browse Source

Fixing issues from review in #229

pull/314/head
Jae Kwon 8 years ago
committed by Ethan Buchman
parent
commit
95c8bb4252
2 changed files with 32 additions and 63 deletions
  1. +7
    -1
      consensus/height_vote_set_test.go
  2. +25
    -62
      consensus/reactor.go

+ 7
- 1
consensus/height_vote_set_test.go View File

@ -25,11 +25,17 @@ func TestPeerCatchupRounds(t *testing.T) {
vote1000_0 := makeVoteHR(t, 1, 1000, privVals, 0)
added, err = hvs.AddVote(vote1000_0, "peer1")
if !added || err != nil {
t.Error("Expected to successfully add vote from peer", added, err)
}
vote1001_0 := makeVoteHR(t, 1, 1001, privVals, 0)
added, err = hvs.AddVote(vote1001_0, "peer1")
if added {
t.Error("Expected to *not* add vote from peer, too many catchup rounds.")
}
added, err = hvs.AddVote(vote1000_0, "peer2")
added, err = hvs.AddVote(vote1001_0, "peer2")
if !added || err != nil {
t.Error("Expected to successfully add vote from another peer")
}


+ 25
- 62
consensus/reactor.go View File

@ -123,7 +123,6 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
go conR.gossipDataRoutine(peer, peerState)
go conR.gossipVotesRoutine(peer, peerState)
go conR.queryMaj23Routine(peer, peerState)
go conR.replyMaj23Routine(peer, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
@ -178,10 +177,31 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
if height != msg.Height {
return
}
ps.ApplyVoteSetMaj23Message(msg)
// Peer claims to have a maj23 for some BlockID at H,R,S,
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
log.Warn("Bad VoteSetBitsMessage field Type")
return
}
src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}})
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@ -660,44 +680,6 @@ OUTER_LOOP:
}
}
func (conR *ConsensusReactor) replyMaj23Routine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer)
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
log.Notice(Fmt("Stopping replyMaj23Routine for %v.", peer))
return
}
rs := conR.conS.GetRoundState()
// Process a VoteSetMaj23Message
msg := <-ps.Maj23Queue
if rs.Height == msg.Height {
var ourVotes *BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = rs.Votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = rs.Votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
log.Warn("Bad VoteSetBitsMessage field Type")
return
}
peer.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}})
}
continue OUTER_LOOP
}
}
func (conR *ConsensusReactor) String() string {
return conR.StringIndented("")
}
@ -770,7 +752,6 @@ type PeerState struct {
mtx sync.Mutex
PeerRoundState
Maj23Queue chan *VoteSetMaj23Message
}
func NewPeerState(peer *p2p.Peer) *PeerState {
@ -782,7 +763,6 @@ func NewPeerState(peer *p2p.Peer) *PeerState {
LastCommitRound: -1,
CatchupCommitRound: -1,
},
Maj23Queue: make(chan *VoteSetMaj23Message, 2),
}
}
@ -873,7 +853,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
}
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
if !types.IsVoteTypeValid(type_) {
PanicSanity("Invalid vote type")
}
@ -1077,21 +1057,6 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}
// When a peer claims to have a maj23 for some BlockID at H,R,S,
// we will try to respond with a VoteSetBitsMessage showing which
// bits we already have (and which we don't yet have),
// but that happens in another goroutine.
func (ps *PeerState) ApplyVoteSetMaj23Message(msg *VoteSetMaj23Message) {
// ps.mtx.Lock()
// defer ps.mtx.Unlock()
select {
case ps.Maj23Queue <- msg:
default:
// Just ignore if we're already processing messages.
}
}
// The peer has responded with a bitarray of votes that it has
// of the corresponding BlockID.
// ourVotes: BitArray of votes we have for msg.BlockID
@ -1121,11 +1086,9 @@ func (ps *PeerState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerState{
%s Key %v
%s PRS %v
%s MjQ %v
%s}`,
indent, ps.Peer.Key,
indent, ps.PeerRoundState.StringIndented(indent+" "),
indent, len(ps.Maj23Queue),
indent)
}


Loading…
Cancel
Save