diff --git a/README.md b/README.md index 5e94a196b..3fe82051a 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ TenderMint - proof of concept * **[mempool](https://github.com/tendermint/tendermint/blob/master/mempool):** Handles the broadcasting of uncommitted transactions. * **[crypto](https://github.com/tendermint/tendermint/blob/master/crypto):** Includes cgo bindings of ed25519. -### Status +### Development Status * Mempool *now* * Consensus *complete* @@ -17,3 +17,7 @@ TenderMint - proof of concept * p2p/* *complete* * Ed25519 bindings *complete* * merkle/* *complete* + +### Issues + +* merkle/* does not free old children nodes. Implement something memory-aware that makes merkle/* act like a weakly referenced map. diff --git a/consensus/consensus.go b/consensus/consensus.go index 0aa0c245e..2b5491108 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -49,7 +49,7 @@ func calcRoundStartTime(round uint16, startTime time.Time) time.Time { roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2))) } -// calcs the current round given startTime of round zero. +// calculates the current round given startTime of round zero. // NOTE: round is zero if startTime is in the future. func calcRound(startTime time.Time) uint16 { now := time.Now() @@ -90,14 +90,14 @@ func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, //----------------------------------------------------------------------------- -type ConsensusManager struct { +type ConsensusAgent struct { sw *p2p.Switch swEvents chan interface{} quit chan struct{} started uint32 stopped uint32 - cs *ConsensusState + conS *ConsensusState blockStore *BlockStore doActionCh chan RoundAction @@ -109,61 +109,61 @@ type ConsensusManager struct { stagedState *State } -func NewConsensusManager(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusManager { +func NewConsensusAgent(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusAgent { swEvents := make(chan interface{}) - sw.AddEventListener("ConsensusManager.swEvents", swEvents) - cs := NewConsensusState(state) - cm := &ConsensusManager{ + sw.AddEventListener("ConsensusAgent.swEvents", swEvents) + conS := NewConsensusState(state) + conA := &ConsensusAgent{ sw: sw, swEvents: swEvents, quit: make(chan struct{}), - cs: cs, + conS: conS, blockStore: blockStore, doActionCh: make(chan RoundAction, 1), state: state, peerStates: make(map[string]*PeerState), } - return cm + return conA } // Sets our private validator account for signing votes. -func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) { - cm.mtx.Lock() - defer cm.mtx.Unlock() - cm.privValidator = priv +func (conA *ConsensusAgent) SetPrivValidator(priv *PrivValidator) { + conA.mtx.Lock() + defer conA.mtx.Unlock() + conA.privValidator = priv } -func (cm *ConsensusManager) PrivValidator() *PrivValidator { - cm.mtx.Lock() - defer cm.mtx.Unlock() - return cm.privValidator +func (conA *ConsensusAgent) PrivValidator() *PrivValidator { + conA.mtx.Lock() + defer conA.mtx.Unlock() + return conA.privValidator } -func (cm *ConsensusManager) Start() { - if atomic.CompareAndSwapUint32(&cm.started, 0, 1) { - log.Info("Starting ConsensusManager") - go cm.switchEventsRoutine() - go cm.gossipProposalRoutine() - go cm.knownPartsRoutine() - go cm.gossipVoteRoutine() - go cm.proposeAndVoteRoutine() +func (conA *ConsensusAgent) Start() { + if atomic.CompareAndSwapUint32(&conA.started, 0, 1) { + log.Info("Starting ConsensusAgent") + go conA.switchEventsRoutine() + go conA.gossipProposalRoutine() + go conA.knownPartsRoutine() + go conA.gossipVoteRoutine() + go conA.proposeAndVoteRoutine() } } -func (cm *ConsensusManager) Stop() { - if atomic.CompareAndSwapUint32(&cm.stopped, 0, 1) { - log.Info("Stopping ConsensusManager") - close(cm.quit) - close(cm.swEvents) +func (conA *ConsensusAgent) Stop() { + if atomic.CompareAndSwapUint32(&conA.stopped, 0, 1) { + log.Info("Stopping ConsensusAgent") + close(conA.quit) + close(conA.swEvents) } } // Handle peer new/done events -func (cm *ConsensusManager) switchEventsRoutine() { +func (conA *ConsensusAgent) switchEventsRoutine() { for { - swEvent, ok := <-cm.swEvents + swEvent, ok := <-conA.swEvents if !ok { break } @@ -171,24 +171,24 @@ func (cm *ConsensusManager) switchEventsRoutine() { case p2p.SwitchEventNewPeer: event := swEvent.(p2p.SwitchEventNewPeer) // Create peerState for event.Peer - cm.mtx.Lock() - cm.peerStates[event.Peer.Key] = NewPeerState(event.Peer) - cm.mtx.Unlock() + conA.mtx.Lock() + conA.peerStates[event.Peer.Key] = NewPeerState(event.Peer) + conA.mtx.Unlock() // Share our state with event.Peer // By sending KnownBlockPartsMessage, // we send our height/round + startTime, and known block parts, // which is sufficient for the peer to begin interacting with us. - event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage(cm.cs.RoundState())) + event.Peer.TrySend(ProposalCh, conA.makeKnownBlockPartsMessage(conA.conS.RoundState())) case p2p.SwitchEventDonePeer: event := swEvent.(p2p.SwitchEventDonePeer) // Delete peerState for event.Peer - cm.mtx.Lock() - peerState := cm.peerStates[event.Peer.Key] + conA.mtx.Lock() + peerState := conA.peerStates[event.Peer.Key] if peerState != nil { peerState.Disconnect() - delete(cm.peerStates, event.Peer.Key) + delete(conA.peerStates, event.Peer.Key) } - cm.mtx.Unlock() + conA.mtx.Unlock() default: log.Warning("Unhandled switch event type") } @@ -196,7 +196,7 @@ func (cm *ConsensusManager) switchEventsRoutine() { } // Like, how large is it and how often can we send it? -func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage { +func (conA *ConsensusAgent) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage { return &KnownBlockPartsMessage{ Height: rs.Height, SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()), @@ -205,20 +205,20 @@ func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlo } // NOTE: may return nil, but (nil).Wants*() returns false. -func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState { - cm.mtx.Lock() - defer cm.mtx.Unlock() - return cm.peerStates[peer.Key] +func (conA *ConsensusAgent) getPeerState(peer *p2p.Peer) *PeerState { + conA.mtx.Lock() + defer conA.mtx.Unlock() + return conA.peerStates[peer.Key] } -func (cm *ConsensusManager) gossipProposalRoutine() { +func (conA *ConsensusAgent) gossipProposalRoutine() { OUTER_LOOP: for { // Get round state - rs := cm.cs.RoundState() + rs := conA.conS.RoundState() // Receive incoming message on ProposalCh - inMsg, ok := cm.sw.Receive(ProposalCh) + inMsg, ok := conA.sw.Receive(ProposalCh) if !ok { break OUTER_LOOP // Client has stopped } @@ -243,7 +243,7 @@ OUTER_LOOP: // If we are the proposer, then don't do anything else. // We're already sending peers our proposal on another routine. - privValidator := cm.PrivValidator() + privValidator := conA.PrivValidator() if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { continue OUTER_LOOP } @@ -258,10 +258,10 @@ OUTER_LOOP: if added { // If peer wants this part, send peer the part // and our new blockParts state. - kbpMsg := cm.makeKnownBlockPartsMessage(rs) + kbpMsg := conA.makeKnownBlockPartsMessage(rs) partMsg := &BlockPartMessage{BlockPart: msg.BlockPart} - for _, peer := range cm.sw.Peers().List() { - peerState := cm.getPeerState(peer) + for _, peer := range conA.sw.Peers().List() { + peerState := conA.getPeerState(peer) if peerState.WantsBlockPart(msg.BlockPart) { peer.TrySend(KnownPartsCh, kbpMsg) peer.TrySend(ProposalCh, partMsg) @@ -277,18 +277,18 @@ OUTER_LOOP: default: // Ignore unknown message - // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + // conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) } } // Cleanup } -func (cm *ConsensusManager) knownPartsRoutine() { +func (conA *ConsensusAgent) knownPartsRoutine() { OUTER_LOOP: for { // Receive incoming message on ProposalCh - inMsg, ok := cm.sw.Receive(KnownPartsCh) + inMsg, ok := conA.sw.Receive(KnownPartsCh) if !ok { break OUTER_LOOP // Client has stopped } @@ -298,10 +298,10 @@ OUTER_LOOP: msg, ok := msg_.(*KnownBlockPartsMessage) if !ok { // Ignore unknown message type - // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + // conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) continue OUTER_LOOP } - peerState := cm.getPeerState(inMsg.MConn.Peer) + peerState := conA.getPeerState(inMsg.MConn.Peer) if !peerState.IsConnected() { // Peer disconnected before we were able to process. continue OUTER_LOOP @@ -314,27 +314,27 @@ OUTER_LOOP: // Signs a vote document and broadcasts it. // hash can be nil to vote "nil" -func (cm *ConsensusManager) signAndVote(vote *Vote) error { - privValidator := cm.PrivValidator() +func (conA *ConsensusAgent) signAndVote(vote *Vote) error { + privValidator := conA.PrivValidator() if privValidator != nil { err := privValidator.SignVote(vote) if err != nil { return err } msg := p2p.TypedMessage{msgTypeVote, vote} - cm.sw.Broadcast(VoteCh, msg) + conA.sw.Broadcast(VoteCh, msg) } return nil } -func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { +func (conA *ConsensusAgent) stageProposal(proposal *BlockPartSet) error { // Already staged? - cm.mtx.Lock() - if cm.stagedProposal == proposal { - cm.mtx.Unlock() + conA.mtx.Lock() + if conA.stagedProposal == proposal { + conA.mtx.Unlock() return nil } else { - cm.mtx.Unlock() + conA.mtx.Unlock() } // Basic validation @@ -348,9 +348,9 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { } // Create a copy of the state for staging - cm.mtx.Lock() - stateCopy := cm.state.Copy() // Deep copy the state before staging. - cm.mtx.Unlock() + conA.mtx.Lock() + stateCopy := conA.state.Copy() // Deep copy the state before staging. + conA.mtx.Unlock() // Commit block onto the copied state. err = stateCopy.CommitBlock(block) @@ -359,15 +359,15 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { } // Looks good! - cm.mtx.Lock() - cm.stagedProposal = proposal - cm.stagedState = stateCopy - cm.mtx.Unlock() + conA.mtx.Lock() + conA.stagedProposal = proposal + conA.stagedState = stateCopy + conA.mtx.Unlock() return nil } // Constructs an unsigned proposal -func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, error) { +func (conA *ConsensusAgent) constructProposal(rs *RoundState) (*BlockPartSet, error) { // XXX implement, first implement mempool // proposal := block.ToBlockPartSet() return nil, nil @@ -376,12 +376,12 @@ func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, er // Vote for (or against) the proposal for this round. // Call during transition from RoundStepProposal to RoundStepVote. // We may not have received a full proposal. -func (cm *ConsensusManager) voteProposal(rs *RoundState) error { +func (conA *ConsensusAgent) voteProposal(rs *RoundState) error { // If we're locked, must vote that. - locked := cm.cs.LockedProposal() + locked := conA.conS.LockedProposal() if locked != nil { block := locked.Block() - err := cm.signAndVote(&Vote{ + err := conA.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, Type: VoteTypeBare, @@ -390,10 +390,10 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { return err } // Stage proposal - err := cm.stageProposal(rs.Proposal) + err := conA.stageProposal(rs.Proposal) if err != nil { // Vote for nil, whatever the error. - err := cm.signAndVote(&Vote{ + err := conA.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, Type: VoteTypeBare, @@ -402,7 +402,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { return err } // Vote for block. - err = cm.signAndVote(&Vote{ + err = conA.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, Type: VoteTypeBare, @@ -413,7 +413,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { // Precommit proposal if we see enough votes for it. // Call during transition from RoundStepVote to RoundStepPrecommit. -func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { +func (conA *ConsensusAgent) precommitProposal(rs *RoundState) error { // If we see a 2/3 majority for votes for a block, precommit. // TODO: maybe could use commitTime here and avg it with later commitTime? @@ -426,16 +426,16 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // If proposal is invalid or unknown, do nothing. // See note on ZombieValidators to see why. - if cm.stageProposal(rs.Proposal) != nil { + if conA.stageProposal(rs.Proposal) != nil { return nil } // Lock this proposal. // NOTE: we're unlocking any prior locks. - cm.cs.LockProposal(rs.Proposal) + conA.conS.LockProposal(rs.Proposal) // Send precommit vote. - err := cm.signAndVote(&Vote{ + err := conA.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, Type: VoteTypePrecommit, @@ -451,7 +451,7 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // Commit or unlock. // Call after RoundStepPrecommit, after round has completely expired. -func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) { +func (conA *ConsensusAgent) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) { // If there exists a 2/3 majority of precommits. // Validate the block and commit. if hash, commitTime, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok { @@ -460,13 +460,13 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t // do not commit. // TODO If we were just late to receive the block, when // do we actually get it? Document it. - if cm.stageProposal(rs.Proposal) != nil { + if conA.stageProposal(rs.Proposal) != nil { return time.Time{}, nil } // TODO: Remove? - cm.cs.LockProposal(rs.Proposal) + conA.conS.LockProposal(rs.Proposal) // Vote commit. - err := cm.signAndVote(&Vote{ + err := conA.signAndVote(&Vote{ Height: rs.Height, Round: rs.Round, Type: VoteTypePrecommit, @@ -476,11 +476,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t return time.Time{}, err } // Commit block. - cm.commitProposal(rs.Proposal, commitTime) + conA.commitProposal(rs.Proposal, commitTime) return commitTime, nil } else { // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock. - locked := cm.cs.LockedProposal() + locked := conA.conS.LockedProposal() if locked != nil { for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() { if hashOrNil == nil { @@ -488,7 +488,7 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t } if !bytes.Equal(hashOrNil, locked.Block().Hash()) { // Unlock our lock. - cm.cs.LockProposal(nil) + conA.conS.LockProposal(nil) } } } @@ -496,50 +496,50 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t } } -func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime time.Time) error { - cm.mtx.Lock() - defer cm.mtx.Unlock() +func (conA *ConsensusAgent) commitProposal(proposal *BlockPartSet, commitTime time.Time) error { + conA.mtx.Lock() + defer conA.mtx.Unlock() - if cm.stagedProposal != proposal { + if conA.stagedProposal != proposal { panic("Unexpected stagedProposal.") // Shouldn't happen. } // Save to blockStore block, blockParts := proposal.Block(), proposal.BlockParts() - err := cm.blockStore.SaveBlockParts(block.Height, blockParts) + err := conA.blockStore.SaveBlockParts(block.Height, blockParts) if err != nil { return err } // What was staged becomes committed. - cm.state = cm.stagedState - cm.state.Save(commitTime) - cm.cs.Update(cm.state) - cm.stagedProposal = nil - cm.stagedState = nil + conA.state = conA.stagedState + conA.state.Save(commitTime) + conA.conS.Update(conA.state) + conA.stagedProposal = nil + conA.stagedState = nil return nil } // Given a RoundState where we are the proposer, // broadcast rs.proposal to all the peers. -func (cm *ConsensusManager) shareProposal(rs *RoundState) { - privValidator := cm.PrivValidator() +func (conA *ConsensusAgent) shareProposal(rs *RoundState) { + privValidator := conA.PrivValidator() proposal := rs.Proposal if privValidator == nil || proposal == nil { return } privValidator.SignProposal(rs.Round, proposal) blockParts := proposal.BlockParts() - peers := cm.sw.Peers().List() + peers := conA.sw.Peers().List() if len(peers) == 0 { log.Warning("Could not propose: no peers") return } numBlockParts := uint16(len(blockParts)) - kbpMsg := cm.makeKnownBlockPartsMessage(rs) + kbpMsg := conA.makeKnownBlockPartsMessage(rs) for i, peer := range peers { - peerState := cm.getPeerState(peer) + peerState := conA.getPeerState(peer) if !peerState.IsConnected() { continue // Peer was disconnected. } @@ -554,7 +554,7 @@ func (cm *ConsensusManager) shareProposal(rs *RoundState) { for i := uint16(0); i < numBlockParts; i++ { part := blockParts[(startIndex+i)%numBlockParts] // Ensure round hasn't expired on our end. - currentRS := cm.cs.RoundState() + currentRS := conA.conS.RoundState() if currentRS != rs { return } @@ -571,14 +571,14 @@ func (cm *ConsensusManager) shareProposal(rs *RoundState) { } } -func (cm *ConsensusManager) gossipVoteRoutine() { +func (conA *ConsensusAgent) gossipVoteRoutine() { OUTER_LOOP: for { // Get round state - rs := cm.cs.RoundState() + rs := conA.conS.RoundState() // Receive incoming message on VoteCh - inMsg, ok := cm.sw.Receive(VoteCh) + inMsg, ok := conA.sw.Receive(VoteCh) if !ok { break // Client has stopped } @@ -622,8 +622,8 @@ OUTER_LOOP: } // Gossip vote. - for _, peer := range cm.sw.Peers().List() { - peerState := cm.getPeerState(peer) + for _, peer := range conA.sw.Peers().List() { + peerState := conA.getPeerState(peer) wantsVote, unsolicited := peerState.WantsVote(vote) if wantsVote { if unsolicited { @@ -641,7 +641,7 @@ OUTER_LOOP: case *VoteRankMessage: msg := msg_.(*VoteRankMessage) - peerState := cm.getPeerState(inMsg.MConn.Peer) + peerState := conA.getPeerState(inMsg.MConn.Peer) if !peerState.IsConnected() { // Peer disconnected before we were able to process. continue OUTER_LOOP @@ -650,7 +650,7 @@ OUTER_LOOP: default: // Ignore unknown message - // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + // conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) } } @@ -666,17 +666,17 @@ type RoundAction struct { // Source of all round state transitions and votes. // It can be preemptively woken up via amessage to // doActionCh. -func (cm *ConsensusManager) proposeAndVoteRoutine() { +func (conA *ConsensusAgent) proposeAndVoteRoutine() { // Figure out when to wake up next (in the absence of other events) setAlarm := func() { - if len(cm.doActionCh) > 0 { + if len(conA.doActionCh) > 0 { return // Already going to wake up later. } // Figure out which height/round/step we're at, // then schedule an action for when it is due. - rs := cm.cs.RoundState() + rs := conA.conS.RoundState() _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) switch rs.Step() { case RoundStepStart: @@ -685,19 +685,19 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { // startTime is in the future. time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration) } - cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal} + conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal} case RoundStepProposal: // Wake up when it's time to vote. time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration) - cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes} + conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes} case RoundStepBareVotes: // Wake up when it's time to precommit. time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration) - cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits} + conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits} case RoundStepPrecommits: // Wake up when the round is over. time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration) - cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock} + conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock} case RoundStepCommitOrUnlock: // This shouldn't happen. // Before setAlarm() got called, @@ -708,7 +708,7 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { for { func() { - roundAction := <-cm.doActionCh + roundAction := <-conA.doActionCh // Always set the alarm after any processing below. defer setAlarm() @@ -718,21 +718,21 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { // We only consider transitioning to given step. step := roundAction.XnToStep // This is the current state. - rs := cm.cs.RoundState() + rs := conA.conS.RoundState() if height != rs.Height || round != rs.Round { return // Not relevant. } if step == RoundStepProposal && rs.Step() == RoundStepStart { // Propose a block if I am the proposer. - privValidator := cm.PrivValidator() + privValidator := conA.PrivValidator() if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { // If we're already locked on a proposal, use that. - proposal := cm.cs.LockedProposal() + proposal := conA.conS.LockedProposal() if proposal != nil { // Otherwise, construct a new proposal. var err error - proposal, err = cm.constructProposal(rs) + proposal, err = conA.constructProposal(rs) if err != nil { log.Error("Error attempting to construct a proposal: %v", err) return // Pretend like we weren't the proposer. Shrug. @@ -743,31 +743,31 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { // Share the parts. // We send all parts to all of our peers, but everyone receives parts // starting at a different index, wrapping around back to 0. - cm.shareProposal(rs) + conA.shareProposal(rs) } } else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal { - err := cm.voteProposal(rs) + err := conA.voteProposal(rs) if err != nil { log.Info("Error attempting to vote for proposal: %v", err) } } else if step == RoundStepPrecommits && rs.Step() <= RoundStepBareVotes { - err := cm.precommitProposal(rs) + err := conA.precommitProposal(rs) if err != nil { log.Info("Error attempting to precommit for proposal: %v", err) } } else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits { - commitTime, err := cm.commitOrUnlockProposal(rs) + commitTime, err := conA.commitOrUnlockProposal(rs) if err != nil { log.Info("Error attempting to commit or update for proposal: %v", err) } if !commitTime.IsZero() { // We already set up ConsensusState for the next height - // (it happens in the call to cm.commitProposal). + // (it happens in the call to conA.commitProposal). } else { // Round is over. This is a special case. // Prepare a new RoundState for the next state. - cm.cs.SetupRound(rs.Round + 1) + conA.conS.SetupRound(rs.Round + 1) return // setAlarm() takes care of the rest. } } else { diff --git a/consensus/state.go b/consensus/state.go index fbc80d39f..3d41f863e 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -55,7 +55,7 @@ func (cs *ConsensusState) RoundState() *RoundState { return cs.roundState } -// Primarily gets called upon block commit by ConsensusManager. +// Primarily gets called upon block commit by ConsensusAgent. func (cs *ConsensusState) Update(state *State) { cs.mtx.Lock() defer cs.mtx.Unlock() diff --git a/main.go b/main.go index 3e436b089..d807c017f 100644 --- a/main.go +++ b/main.go @@ -10,10 +10,10 @@ import ( ) type Node struct { - lz []p2p.Listener - sw *p2p.Switch - book *p2p.AddrBook - pmgr *p2p.PeerManager + lz []p2p.Listener + sw *p2p.Switch + book *p2p.AddrBook + pexAgent *p2p.PEXAgent } func NewNode() *Node { @@ -53,12 +53,12 @@ func NewNode() *Node { } sw := p2p.NewSwitch(chDescs) book := p2p.NewAddrBook(config.RootDir + "/addrbook.json") - pmgr := p2p.NewPeerManager(sw, book) + pexAgent := p2p.NewPEXAgent(sw, book) return &Node{ - sw: sw, - book: book, - pmgr: pmgr, + sw: sw, + book: book, + pexAgent: pexAgent, } } @@ -69,7 +69,7 @@ func (n *Node) Start() { } n.sw.Start() n.book.Start() - n.pmgr.Start() + n.pexAgent.Start() } func (n *Node) Stop() { @@ -77,7 +77,7 @@ func (n *Node) Stop() { // TODO: gracefully disconnect from peers. n.sw.Stop() n.book.Stop() - n.pmgr.Stop() + n.pexAgent.Stop() } // Add a Listener to accept inbound peer connections. @@ -102,7 +102,7 @@ func (n *Node) inboundConnectionRoutine(l p2p.Listener) { } // NOTE: We don't yet have the external address of the // remote (if they have a listener at all). - // PeerManager's pexRoutine will handle that. + // PEXAgent's pexRoutine will handle that. } // cleanup diff --git a/mempool/agent.go b/mempool/agent.go new file mode 100644 index 000000000..4bbce327f --- /dev/null +++ b/mempool/agent.go @@ -0,0 +1,127 @@ +package mempol + +import ( + "github.com/tendermint/tendermint/p2p" +) + +type MempoolAgent struct { + sw *p2p.Switch + swEvents chan interface{} + quit chan struct{} + started uint32 + stopped uint32 +} + +func NewMempoolAgent(sw *p2p.Switch) *MempoolAgent { + swEvents := make(chan interface{}) + sw.AddEventListener("MempoolAgent.swEvents", swEvents) + memA := &MempoolAgent{ + sw: sw, + swEvents: swEvents, + quit: make(chan struct{}), + } + return memA +} + +func (memA *MempoolAgent) Start() { + if atomic.CompareAndSwapUint32(&memA.started, 0, 1) { + log.Info("Starting MempoolAgent") + go memA.switchEventsRoutine() + go memA.gossipTxRoutine() + } +} + +func (memA *MempoolAgent) Stop() { + if atomic.CompareAndSwapUint32(&memA.stopped, 0, 1) { + log.Info("Stopping MempoolAgent") + close(memA.quit) + close(memA.swEvents) + } +} + +// Handle peer new/done events +func (memA *MempoolAgent) switchEventsRoutine() { + for { + swEvent, ok := <-memA.swEvents + if !ok { + break + } + switch swEvent.(type) { + case p2p.SwitchEventNewPeer: + // event := swEvent.(p2p.SwitchEventNewPeer) + case p2p.SwitchEventDonePeer: + // event := swEvent.(p2p.SwitchEventDonePeer) + default: + log.Warning("Unhandled switch event type") + } + } +} + +func (memA *MempoolAgent) gossipTxRoutine() { +OUTER_LOOP: + for { + // Receive incoming message on ProposalCh + inMsg, ok := memA.sw.Receive(ProposalCh) + if !ok { + break OUTER_LOOP // Client has stopped + } + _, msg_ := decodeMessage(inMsg.Bytes) + log.Info("gossipProposalRoutine received %v", msg_) + + switch msg_.(type) { + case *TxMessage: + // msg := msg_.(*TxMessage) + // XXX + + default: + // Ignore unknown message + // memA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) + } + } + + // Cleanup +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeUnknown = byte(0x00) + msgTypeTx = byte(0x10) +) + +// TODO: check for unnecessary extra bytes at the end. +func decodeMessage(bz []byte) (msgType byte, msg interface{}) { + n, err := new(int64), new(error) + // log.Debug("decoding msg bytes: %X", bz) + msgType = bz[0] + switch msgType { + case msgTypeTx: + msg = readTxMessage(bytes.NewReader(bz[1:]), n, err) + default: + msg = nil + } + return +} + +//------------------------------------- + +type TxMessage struct { + Tx Tx +} + +func readTxMessage(r io.Reader, n *int64, err *error) *TxMessage { + return &TxMessage{ + Tx: ReadTx(r, n, err), + } +} + +func (m *TxMessage) WriteTo(w io.Writer) (n int64, err error) { + WriteByte(w, msgTypeTx, &n, &err) + WriteBinary(w, m.Tx, &n, &err) + return +} + +func (m *TxMessage) String() string { + return fmt.Sprintf("[TxMessage %v]", m.Tx) +} diff --git a/p2p/peer_manager.go b/p2p/pex_agent.go similarity index 74% rename from p2p/peer_manager.go rename to p2p/pex_agent.go index 0a88b0f50..ea5f55891 100644 --- a/p2p/peer_manager.go +++ b/p2p/pex_agent.go @@ -22,10 +22,10 @@ const ( ) /* -PeerManager handles PEX (peer exchange) and ensures that an +PEXAgent handles PEX (peer exchange) and ensures that an adequate number of peers are connected to the switch. */ -type PeerManager struct { +type PEXAgent struct { sw *Switch swEvents chan interface{} quit chan struct{} @@ -35,49 +35,49 @@ type PeerManager struct { book *AddrBook } -func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { +func NewPEXAgent(sw *Switch, book *AddrBook) *PEXAgent { swEvents := make(chan interface{}) - sw.AddEventListener("PeerManager.swEvents", swEvents) - pm := &PeerManager{ + sw.AddEventListener("PEXAgent.swEvents", swEvents) + pexA := &PEXAgent{ sw: sw, swEvents: swEvents, quit: make(chan struct{}), book: book, } - return pm + return pexA } -func (pm *PeerManager) Start() { - if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { - log.Info("Starting PeerManager") - go pm.switchEventsRoutine() - go pm.requestRoutine() - go pm.ensurePeersRoutine() +func (pexA *PEXAgent) Start() { + if atomic.CompareAndSwapUint32(&pexA.started, 0, 1) { + log.Info("Starting PEXAgent") + go pexA.switchEventsRoutine() + go pexA.requestRoutine() + go pexA.ensurePeersRoutine() } } -func (pm *PeerManager) Stop() { - if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { - log.Info("Stopping PeerManager") - close(pm.quit) - close(pm.swEvents) +func (pexA *PEXAgent) Stop() { + if atomic.CompareAndSwapUint32(&pexA.stopped, 0, 1) { + log.Info("Stopping PEXAgent") + close(pexA.quit) + close(pexA.swEvents) } } // Asks peer for more addresses. -func (pm *PeerManager) RequestPEX(peer *Peer) { +func (pexA *PEXAgent) RequestPEX(peer *Peer) { peer.TrySend(PexCh, &pexRequestMessage{}) } -func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) { +func (pexA *PEXAgent) SendAddrs(peer *Peer, addrs []*NetAddress) { peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs}) } // For new outbound peers, announce our listener addresses if any, // and if .book needs more addresses, ask for them. -func (pm *PeerManager) switchEventsRoutine() { +func (pexA *PEXAgent) switchEventsRoutine() { for { - swEvent, ok := <-pm.swEvents + swEvent, ok := <-pexA.swEvents if !ok { break } @@ -85,9 +85,9 @@ func (pm *PeerManager) switchEventsRoutine() { case SwitchEventNewPeer: event := swEvent.(SwitchEventNewPeer) if event.Peer.IsOutbound() { - pm.SendAddrs(event.Peer, pm.book.OurAddresses()) - if pm.book.NeedMoreAddrs() { - pm.RequestPEX(event.Peer) + pexA.SendAddrs(event.Peer, pexA.book.OurAddresses()) + if pexA.book.NeedMoreAddrs() { + pexA.RequestPEX(event.Peer) } } case SwitchEventDonePeer: @@ -97,17 +97,17 @@ func (pm *PeerManager) switchEventsRoutine() { } // Ensures that sufficient peers are connected. (continuous) -func (pm *PeerManager) ensurePeersRoutine() { +func (pexA *PEXAgent) ensurePeersRoutine() { // fire once immediately. - pm.ensurePeers() + pexA.ensurePeers() // fire periodically timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) FOR_LOOP: for { select { case <-timer.Ch: - pm.ensurePeers() - case <-pm.quit: + pexA.ensurePeers() + case <-pexA.quit: break FOR_LOOP } } @@ -117,8 +117,8 @@ FOR_LOOP: } // Ensures that sufficient peers are connected. (once) -func (pm *PeerManager) ensurePeers() { - numOutPeers, _, numDialing := pm.sw.NumPeers() +func (pexA *PEXAgent) ensurePeers() { + numOutPeers, _, numDialing := pexA.sw.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) if numToDial <= 0 { return @@ -133,13 +133,13 @@ func (pm *PeerManager) ensurePeers() { // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. for j := 0; i < 3; j++ { - picked = pm.book.PickAddress(newBias) + picked = pexA.book.PickAddress(newBias) if picked == nil { return } if toDial.Has(picked.String()) || - pm.sw.IsDialing(picked) || - pm.sw.Peers().Has(picked.String()) { + pexA.sw.IsDialing(picked) || + pexA.sw.Peers().Has(picked.String()) { continue } else { break @@ -155,19 +155,19 @@ func (pm *PeerManager) ensurePeers() { for _, item := range toDial.Values() { picked := item.(*NetAddress) go func() { - _, err := pm.sw.DialPeerWithAddress(picked) + _, err := pexA.sw.DialPeerWithAddress(picked) if err != nil { - pm.book.MarkAttempt(picked) + pexA.book.MarkAttempt(picked) } }() } } // Handles incoming PEX messages. -func (pm *PeerManager) requestRoutine() { +func (pexA *PEXAgent) requestRoutine() { for { - inMsg, ok := pm.sw.Receive(PexCh) // {Peer, Time, Packet} + inMsg, ok := pexA.sw.Receive(PexCh) // {Peer, Time, Packet} if !ok { // Client has stopped break @@ -181,7 +181,7 @@ func (pm *PeerManager) requestRoutine() { case *pexRequestMessage: // inMsg.MConn.Peer requested some peers. // TODO: prevent abuse. - addrs := pm.book.GetSelection() + addrs := pexA.book.GetSelection() msg := &pexAddrsMessage{Addrs: addrs} queued := inMsg.MConn.Peer.TrySend(PexCh, msg) if !queued { @@ -193,11 +193,11 @@ func (pm *PeerManager) requestRoutine() { // (We don't want to get spammed with bad peers) srcAddr := inMsg.MConn.RemoteAddress for _, addr := range msg.(*pexAddrsMessage).Addrs { - pm.book.AddAddress(addr, srcAddr) + pexA.book.AddAddress(addr, srcAddr) } default: // Ignore unknown message. - // pm.sw.StopPeerForError(inMsg.MConn.Peer, pexErrInvalidMessage) + // pexA.sw.StopPeerForError(inMsg.MConn.Peer, pexErrInvalidMessage) } } diff --git a/state/state.go b/state/state.go index 936dd45dc..f0e9d453f 100644 --- a/state/state.go +++ b/state/state.go @@ -56,7 +56,7 @@ func LoadState(db db_.Db) *State { } // Save this state into the db. -// For convenience, the commitTime (required by ConsensusManager) +// For convenience, the commitTime (required by ConsensusAgent) // is saved here. func (s *State) Save(commitTime time.Time) { s.mtx.Lock()