From 0237d284cc673cd5a37d3022c920b84a868b986c Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 24 Mar 2015 12:00:27 -0700 Subject: [PATCH] Channel bytes are spelled fully, "XXXChannel" --- consensus/pol.go | 5 +++++ consensus/reactor.go | 42 +++++++++++++++++++++--------------------- consensus/state.go | 4 ++-- mempool/reactor.go | 8 ++++---- p2p/connection.go | 26 +++++++++++++++++++------- p2p/peer_set.go | 6 ++++++ p2p/pex_reactor.go | 8 ++++---- p2p/switch.go | 4 ++-- state/state_test.go | 6 +++--- types/block.go | 4 ++++ 10 files changed, 70 insertions(+), 43 deletions(-) diff --git a/consensus/pol.go b/consensus/pol.go index c87b4ee5d..06784d588 100644 --- a/consensus/pol.go +++ b/consensus/pol.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/tendermint/tendermint/account" + "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -94,3 +95,7 @@ func (pol *POL) StringShort() string { Fingerprint(pol.BlockHash), pol.BlockParts) } } + +func (pol *POL) MakePartSet() *types.PartSet { + return types.NewPartSetFromData(binary.BinaryBytes(pol)) +} diff --git a/consensus/reactor.go b/consensus/reactor.go index 83c244995..6e6f33662 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -17,9 +17,9 @@ import ( ) const ( - StateCh = byte(0x20) - DataCh = byte(0x21) - VoteCh = byte(0x22) + StateChannel = byte(0x20) + DataChannel = byte(0x21) + VoteChannel = byte(0x22) peerStateKey = "ConsensusReactor.peerState" @@ -75,15 +75,15 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: StateCh, + Id: StateChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: DataCh, + Id: DataChannel, Priority: 5, }, &p2p.ChannelDescriptor{ - Id: VoteCh, + Id: VoteChannel, Priority: 5, }, } @@ -122,7 +122,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes) switch chId { - case StateCh: + case StateChannel: switch msg := msg_.(type) { case *NewRoundStepMessage: ps.ApplyNewRoundStepMessage(msg, rs) @@ -134,7 +134,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case DataCh: + case DataChannel: switch msg := msg_.(type) { case *Proposal: ps.SetHasProposal(msg) @@ -155,7 +155,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Ignore unknown message } - case VoteCh: + case VoteChannel: switch msg := msg_.(type) { case *VoteMessage: vote := msg.Vote @@ -192,7 +192,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte Type: vote.Type, Index: index, } - conR.sw.Broadcast(StateCh, msg) + conR.sw.Broadcast(StateChannel, msg) } default: @@ -252,10 +252,10 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - conR.sw.Broadcast(StateCh, nrsMsg) + conR.sw.Broadcast(StateChannel, nrsMsg) } if csMsg != nil { - conR.sw.Broadcast(StateCh, csMsg) + conR.sw.Broadcast(StateChannel, csMsg) } } } @@ -264,10 +264,10 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } if csMsg != nil { - peer.Send(StateCh, nrsMsg) + peer.Send(StateChannel, nrsMsg) } } @@ -296,7 +296,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -306,7 +306,7 @@ OUTER_LOOP: if 0 < prs.Height && prs.Height < rs.Height { //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray) if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { - // Ensure that the peer's PartSetHeaeder is correct + // Ensure that the peer's PartSetHeader is correct blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { log.Debug("Peer ProposalBlockParts mismatch, sleeping", @@ -329,7 +329,7 @@ OUTER_LOOP: Type: partTypeProposalBlock, Part: part, } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) continue OUTER_LOOP } else { @@ -349,7 +349,7 @@ OUTER_LOOP: // Send proposal? if rs.Proposal != nil && !prs.Proposal { msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal} - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposal(rs.Proposal) continue OUTER_LOOP } @@ -363,7 +363,7 @@ OUTER_LOOP: Type: partTypeProposalPOL, Part: rs.ProposalPOLParts.GetPart(index), } - peer.Send(DataCh, msg) + peer.Send(DataChannel, msg) ps.SetHasProposalPOLPart(rs.Height, rs.Round, index) continue OUTER_LOOP } @@ -397,7 +397,7 @@ OUTER_LOOP: vote := voteSet.GetByIndex(index) // NOTE: vote may be a commit. msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } @@ -421,7 +421,7 @@ OUTER_LOOP: Signature: commit.Signature, } msg := &VoteMessage{index, vote} - peer.Send(VoteCh, msg) + peer.Send(VoteChannel, msg) ps.SetHasVote(vote, index) return true } diff --git a/consensus/state.go b/consensus/state.go index d44dd7b5a..5fb4268d8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -641,12 +641,12 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { return } - blockParts = types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts = block.MakePartSet() pol = cs.LockedPOL // If exists, is a PoUnlock. } if pol != nil { - polParts = types.NewPartSetFromData(binary.BinaryBytes(pol)) + polParts = pol.MakePartSet() } // Make proposal diff --git a/mempool/reactor.go b/mempool/reactor.go index 5bed4e18b..e16cf9332 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -11,7 +11,7 @@ import ( ) var ( - MempoolCh = byte(0x30) + MempoolChannel = byte(0x30) ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -52,7 +52,7 @@ func (memR *MempoolReactor) Stop() { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{ - Id: MempoolCh, + Id: MempoolChannel, Priority: 5, }, } @@ -92,7 +92,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { if peer.Key == src.Key { continue } - peer.TrySend(MempoolCh, msg) + peer.TrySend(MempoolChannel, msg) } default: @@ -106,7 +106,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return err } msg := &TxMessage{Tx: tx} - memR.sw.Broadcast(MempoolCh, msg) + memR.sw.Broadcast(MempoolChannel, msg) return nil } diff --git a/p2p/connection.go b/p2p/connection.go index 89086bc2a..578dea1b3 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -50,8 +50,9 @@ There are two methods for sending messages: func (m MConnection) TrySend(chId byte, msg interface{}) bool {} `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued -for the channel with the given id byte `chId`. The message `msg` is serialized -using the `tendermint/binary` submodule's `WriteBinary()` reflection routine. +for the channel with the given id byte `chId`, or until the request times out. +The message `msg` is serialized using the `tendermint/binary` submodule's +`WriteBinary()` reflection routine. `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's queue is full. @@ -437,8 +438,19 @@ FOR_LOOP: //----------------------------------------------------------------------------- type ChannelDescriptor struct { - Id byte - Priority uint + Id byte + Priority uint + SendQueueCapacity uint + RecvBufferCapacity uint +} + +func (chDesc *ChannelDescriptor) FillDefaults() { + if chDesc.SendQueueCapacity == 0 { + chDesc.SendQueueCapacity = defaultSendQueueCapacity + } + if chDesc.RecvBufferCapacity == 0 { + chDesc.RecvBufferCapacity = defaultRecvBufferCapacity + } } // TODO: lowercase. @@ -448,7 +460,7 @@ type Channel struct { desc *ChannelDescriptor id byte sendQueue chan []byte - sendQueueSize uint32 + sendQueueSize uint32 // atomic. recving []byte sending []byte priority uint @@ -463,8 +475,8 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { conn: conn, desc: desc, id: desc.Id, - sendQueue: make(chan []byte, defaultSendQueueCapacity), - recving: make([]byte, 0, defaultRecvBufferCapacity), + sendQueue: make(chan []byte, desc.SendQueueCapacity), + recving: make([]byte, 0, desc.RecvBufferCapacity), priority: desc.Priority, } } diff --git a/p2p/peer_set.go b/p2p/peer_set.go index b4230ffa3..f365cd8ea 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -55,6 +55,12 @@ func (ps *PeerSet) Has(peerKey string) bool { return ok } +func (ps *PeerSet) Get(peerKey string) *Peer { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.lookup[peerKey].peer +} + func (ps *PeerSet) Remove(peer *Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 90be9b24c..de742645f 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -14,7 +14,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - PexCh = byte(0x00) + PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -62,7 +62,7 @@ func (pexR *PEXReactor) Stop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexCh, + Id: PexChannel, Priority: 1, }, } @@ -122,11 +122,11 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // Asks peer for more addresses. func (pexR *PEXReactor) RequestPEX(peer *Peer) { - peer.Send(PexCh, &pexRequestMessage{}) + peer.Send(PexChannel, &pexRequestMessage{}) } func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { - peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs}) + peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs}) } // Ensures that sufficient peers are connected. (continuous) diff --git a/p2p/switch.go b/p2p/switch.go index 635e0ecaa..12267fa10 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -133,7 +133,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er // Send handshake msg := &pexHandshakeMessage{ChainId: sw.chainId} - peer.Send(PexCh, msg) + peer.Send(PexChannel, msg) return peer, nil } @@ -164,7 +164,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.String()) } -// Broadcast runs a go routine for each attemptted send, which will block +// Broadcast runs a go routine for each attempted send, which will block // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { diff --git a/state/state_test.go b/state/state_test.go index da7960a77..71efd80f2 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -91,7 +91,7 @@ func TestGenesisSaveLoad(t *testing.T) { // Make complete block and blockParts block := makeBlock(t, s0, nil, nil) - blockParts := types.NewPartSetFromData(binary.BinaryBytes(block)) + blockParts := block.MakePartSet() // Now append the block to s0. err := s0.AppendBlock(block, blockParts.Header()) @@ -338,7 +338,7 @@ func TestAddValidator(t *testing.T) { // Make complete block and blockParts block0 := makeBlock(t, s0, nil, []types.Tx{bondTx}) - block0Parts := types.NewPartSetFromData(binary.BinaryBytes(block0)) + block0Parts := block0.MakePartSet() // Sanity check if s0.BondedValidators.Size() != 1 { @@ -379,7 +379,7 @@ func TestAddValidator(t *testing.T) { }, }, nil, ) - block1Parts := types.NewPartSetFromData(binary.BinaryBytes(block1)) + block1Parts := block1.MakePartSet() err = s0.AppendBlock(block1, block1Parts.Header()) if err != nil { t.Error("Error appending secondary block:", err) diff --git a/types/block.go b/types/block.go index d56f4e467..11dfb3b9d 100644 --- a/types/block.go +++ b/types/block.go @@ -66,6 +66,10 @@ func (b *Block) Hash() []byte { return merkle.HashFromHashes(hashes) } +func (b *Block) MakePartSet() *PartSet { + return NewPartSetFromData(binary.BinaryBytes(b)) +} + // Convenience. // A nil block never hashes to anything. // Nothing hashes to a nil hash.