Browse Source

Channel bytes are spelled fully, "XXXChannel"

pull/43/merge
Jae Kwon 10 years ago
parent
commit
0237d284cc
10 changed files with 70 additions and 43 deletions
  1. +5
    -0
      consensus/pol.go
  2. +21
    -21
      consensus/reactor.go
  3. +2
    -2
      consensus/state.go
  4. +4
    -4
      mempool/reactor.go
  5. +19
    -7
      p2p/connection.go
  6. +6
    -0
      p2p/peer_set.go
  7. +4
    -4
      p2p/pex_reactor.go
  8. +2
    -2
      p2p/switch.go
  9. +3
    -3
      state/state_test.go
  10. +4
    -0
      types/block.go

+ 5
- 0
consensus/pol.go View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/tendermint/tendermint/account" "github.com/tendermint/tendermint/account"
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -94,3 +95,7 @@ func (pol *POL) StringShort() string {
Fingerprint(pol.BlockHash), pol.BlockParts) Fingerprint(pol.BlockHash), pol.BlockParts)
} }
} }
func (pol *POL) MakePartSet() *types.PartSet {
return types.NewPartSetFromData(binary.BinaryBytes(pol))
}

+ 21
- 21
consensus/reactor.go View File

@ -17,9 +17,9 @@ import (
) )
const ( const (
StateCh = byte(0x20)
DataCh = byte(0x21)
VoteCh = byte(0x22)
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
peerStateKey = "ConsensusReactor.peerState" peerStateKey = "ConsensusReactor.peerState"
@ -75,15 +75,15 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// TODO optimize // TODO optimize
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
Id: StateCh,
Id: StateChannel,
Priority: 5, Priority: 5,
}, },
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
Id: DataCh,
Id: DataChannel,
Priority: 5, Priority: 5,
}, },
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
Id: VoteCh,
Id: VoteChannel,
Priority: 5, Priority: 5,
}, },
} }
@ -122,7 +122,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes) log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "bytes", msgBytes)
switch chId { switch chId {
case StateCh:
case StateChannel:
switch msg := msg_.(type) { switch msg := msg_.(type) {
case *NewRoundStepMessage: case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg, rs) ps.ApplyNewRoundStepMessage(msg, rs)
@ -134,7 +134,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// Ignore unknown message // Ignore unknown message
} }
case DataCh:
case DataChannel:
switch msg := msg_.(type) { switch msg := msg_.(type) {
case *Proposal: case *Proposal:
ps.SetHasProposal(msg) ps.SetHasProposal(msg)
@ -155,7 +155,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// Ignore unknown message // Ignore unknown message
} }
case VoteCh:
case VoteChannel:
switch msg := msg_.(type) { switch msg := msg_.(type) {
case *VoteMessage: case *VoteMessage:
vote := msg.Vote vote := msg.Vote
@ -192,7 +192,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
Type: vote.Type, Type: vote.Type,
Index: index, Index: index,
} }
conR.sw.Broadcast(StateCh, msg)
conR.sw.Broadcast(StateChannel, msg)
} }
default: default:
@ -252,10 +252,10 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
conR.sw.Broadcast(StateCh, nrsMsg)
conR.sw.Broadcast(StateChannel, nrsMsg)
} }
if csMsg != nil { if csMsg != nil {
conR.sw.Broadcast(StateCh, csMsg)
conR.sw.Broadcast(StateChannel, csMsg)
} }
} }
} }
@ -264,10 +264,10 @@ func (conR *ConsensusReactor) sendNewRoundStepRoutine(peer *p2p.Peer) {
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
peer.Send(StateCh, nrsMsg)
peer.Send(StateChannel, nrsMsg)
} }
if csMsg != nil { if csMsg != nil {
peer.Send(StateCh, nrsMsg)
peer.Send(StateChannel, nrsMsg)
} }
} }
@ -296,7 +296,7 @@ OUTER_LOOP:
Type: partTypeProposalBlock, Type: partTypeProposalBlock,
Part: part, Part: part,
} }
peer.Send(DataCh, msg)
peer.Send(DataChannel, msg)
ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -306,7 +306,7 @@ OUTER_LOOP:
if 0 < prs.Height && prs.Height < rs.Height { if 0 < prs.Height && prs.Height < rs.Height {
//log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray) //log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockBitArray", prs.ProposalBlockBitArray)
if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeaeder is correct
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { if !blockMeta.Parts.Equals(prs.ProposalBlockParts) {
log.Debug("Peer ProposalBlockParts mismatch, sleeping", log.Debug("Peer ProposalBlockParts mismatch, sleeping",
@ -329,7 +329,7 @@ OUTER_LOOP:
Type: partTypeProposalBlock, Type: partTypeProposalBlock,
Part: part, Part: part,
} }
peer.Send(DataCh, msg)
peer.Send(DataChannel, msg)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
continue OUTER_LOOP continue OUTER_LOOP
} else { } else {
@ -349,7 +349,7 @@ OUTER_LOOP:
// Send proposal? // Send proposal?
if rs.Proposal != nil && !prs.Proposal { if rs.Proposal != nil && !prs.Proposal {
msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal} msg := p2p.TypedMessage{msgTypeProposal, rs.Proposal}
peer.Send(DataCh, msg)
peer.Send(DataChannel, msg)
ps.SetHasProposal(rs.Proposal) ps.SetHasProposal(rs.Proposal)
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -363,7 +363,7 @@ OUTER_LOOP:
Type: partTypeProposalPOL, Type: partTypeProposalPOL,
Part: rs.ProposalPOLParts.GetPart(index), Part: rs.ProposalPOLParts.GetPart(index),
} }
peer.Send(DataCh, msg)
peer.Send(DataChannel, msg)
ps.SetHasProposalPOLPart(rs.Height, rs.Round, index) ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -397,7 +397,7 @@ OUTER_LOOP:
vote := voteSet.GetByIndex(index) vote := voteSet.GetByIndex(index)
// NOTE: vote may be a commit. // NOTE: vote may be a commit.
msg := &VoteMessage{index, vote} msg := &VoteMessage{index, vote}
peer.Send(VoteCh, msg)
peer.Send(VoteChannel, msg)
ps.SetHasVote(vote, index) ps.SetHasVote(vote, index)
return true return true
} }
@ -421,7 +421,7 @@ OUTER_LOOP:
Signature: commit.Signature, Signature: commit.Signature,
} }
msg := &VoteMessage{index, vote} msg := &VoteMessage{index, vote}
peer.Send(VoteCh, msg)
peer.Send(VoteChannel, msg)
ps.SetHasVote(vote, index) ps.SetHasVote(vote, index)
return true return true
} }


+ 2
- 2
consensus/state.go View File

@ -641,12 +641,12 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) {
return return
} }
blockParts = types.NewPartSetFromData(binary.BinaryBytes(block))
blockParts = block.MakePartSet()
pol = cs.LockedPOL // If exists, is a PoUnlock. pol = cs.LockedPOL // If exists, is a PoUnlock.
} }
if pol != nil { if pol != nil {
polParts = types.NewPartSetFromData(binary.BinaryBytes(pol))
polParts = pol.MakePartSet()
} }
// Make proposal // Make proposal


+ 4
- 4
mempool/reactor.go View File

@ -11,7 +11,7 @@ import (
) )
var ( var (
MempoolCh = byte(0x30)
MempoolChannel = byte(0x30)
) )
// MempoolReactor handles mempool tx broadcasting amongst peers. // MempoolReactor handles mempool tx broadcasting amongst peers.
@ -52,7 +52,7 @@ func (memR *MempoolReactor) Stop() {
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
Id: MempoolCh,
Id: MempoolChannel,
Priority: 5, Priority: 5,
}, },
} }
@ -92,7 +92,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
if peer.Key == src.Key { if peer.Key == src.Key {
continue continue
} }
peer.TrySend(MempoolCh, msg)
peer.TrySend(MempoolChannel, msg)
} }
default: default:
@ -106,7 +106,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
return err return err
} }
msg := &TxMessage{Tx: tx} msg := &TxMessage{Tx: tx}
memR.sw.Broadcast(MempoolCh, msg)
memR.sw.Broadcast(MempoolChannel, msg)
return nil return nil
} }


+ 19
- 7
p2p/connection.go View File

@ -50,8 +50,9 @@ There are two methods for sending messages:
func (m MConnection) TrySend(chId byte, msg interface{}) bool {} func (m MConnection) TrySend(chId byte, msg interface{}) bool {}
`Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued `Send(chId, msg)` is a blocking call that waits until `msg` is successfully queued
for the channel with the given id byte `chId`. The message `msg` is serialized
using the `tendermint/binary` submodule's `WriteBinary()` reflection routine.
for the channel with the given id byte `chId`, or until the request times out.
The message `msg` is serialized using the `tendermint/binary` submodule's
`WriteBinary()` reflection routine.
`TrySend(chId, msg)` is a nonblocking call that returns false if the channel's `TrySend(chId, msg)` is a nonblocking call that returns false if the channel's
queue is full. queue is full.
@ -437,8 +438,19 @@ FOR_LOOP:
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type ChannelDescriptor struct { type ChannelDescriptor struct {
Id byte
Priority uint
Id byte
Priority uint
SendQueueCapacity uint
RecvBufferCapacity uint
}
func (chDesc *ChannelDescriptor) FillDefaults() {
if chDesc.SendQueueCapacity == 0 {
chDesc.SendQueueCapacity = defaultSendQueueCapacity
}
if chDesc.RecvBufferCapacity == 0 {
chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
}
} }
// TODO: lowercase. // TODO: lowercase.
@ -448,7 +460,7 @@ type Channel struct {
desc *ChannelDescriptor desc *ChannelDescriptor
id byte id byte
sendQueue chan []byte sendQueue chan []byte
sendQueueSize uint32
sendQueueSize uint32 // atomic.
recving []byte recving []byte
sending []byte sending []byte
priority uint priority uint
@ -463,8 +475,8 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
conn: conn, conn: conn,
desc: desc, desc: desc,
id: desc.Id, id: desc.Id,
sendQueue: make(chan []byte, defaultSendQueueCapacity),
recving: make([]byte, 0, defaultRecvBufferCapacity),
sendQueue: make(chan []byte, desc.SendQueueCapacity),
recving: make([]byte, 0, desc.RecvBufferCapacity),
priority: desc.Priority, priority: desc.Priority,
} }
} }


+ 6
- 0
p2p/peer_set.go View File

@ -55,6 +55,12 @@ func (ps *PeerSet) Has(peerKey string) bool {
return ok return ok
} }
func (ps *PeerSet) Get(peerKey string) *Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.lookup[peerKey].peer
}
func (ps *PeerSet) Remove(peer *Peer) { func (ps *PeerSet) Remove(peer *Peer) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()


+ 4
- 4
p2p/pex_reactor.go View File

@ -14,7 +14,7 @@ import (
var pexErrInvalidMessage = errors.New("Invalid PEX message") var pexErrInvalidMessage = errors.New("Invalid PEX message")
const ( const (
PexCh = byte(0x00)
PexChannel = byte(0x00)
ensurePeersPeriodSeconds = 30 ensurePeersPeriodSeconds = 30
minNumOutboundPeers = 10 minNumOutboundPeers = 10
maxNumPeers = 50 maxNumPeers = 50
@ -62,7 +62,7 @@ func (pexR *PEXReactor) Stop() {
func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{ return []*ChannelDescriptor{
&ChannelDescriptor{ &ChannelDescriptor{
Id: PexCh,
Id: PexChannel,
Priority: 1, Priority: 1,
}, },
} }
@ -122,11 +122,11 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
// Asks peer for more addresses. // Asks peer for more addresses.
func (pexR *PEXReactor) RequestPEX(peer *Peer) { func (pexR *PEXReactor) RequestPEX(peer *Peer) {
peer.Send(PexCh, &pexRequestMessage{})
peer.Send(PexChannel, &pexRequestMessage{})
} }
func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) { func (pexR *PEXReactor) SendAddrs(peer *Peer, addrs []*NetAddress) {
peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs})
peer.Send(PexChannel, &pexAddrsMessage{Addrs: addrs})
} }
// Ensures that sufficient peers are connected. (continuous) // Ensures that sufficient peers are connected. (continuous)


+ 2
- 2
p2p/switch.go View File

@ -133,7 +133,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
// Send handshake // Send handshake
msg := &pexHandshakeMessage{ChainId: sw.chainId} msg := &pexHandshakeMessage{ChainId: sw.chainId}
peer.Send(PexCh, msg)
peer.Send(PexChannel, msg)
return peer, nil return peer, nil
} }
@ -164,7 +164,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.String()) return sw.dialing.Has(addr.String())
} }
// Broadcast runs a go routine for each attemptted send, which will block
// Broadcast runs a go routine for each attempted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel // trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out) // which receives success values for each attempted send (false if times out)
func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {


+ 3
- 3
state/state_test.go View File

@ -91,7 +91,7 @@ func TestGenesisSaveLoad(t *testing.T) {
// Make complete block and blockParts // Make complete block and blockParts
block := makeBlock(t, s0, nil, nil) block := makeBlock(t, s0, nil, nil)
blockParts := types.NewPartSetFromData(binary.BinaryBytes(block))
blockParts := block.MakePartSet()
// Now append the block to s0. // Now append the block to s0.
err := s0.AppendBlock(block, blockParts.Header()) err := s0.AppendBlock(block, blockParts.Header())
@ -338,7 +338,7 @@ func TestAddValidator(t *testing.T) {
// Make complete block and blockParts // Make complete block and blockParts
block0 := makeBlock(t, s0, nil, []types.Tx{bondTx}) block0 := makeBlock(t, s0, nil, []types.Tx{bondTx})
block0Parts := types.NewPartSetFromData(binary.BinaryBytes(block0))
block0Parts := block0.MakePartSet()
// Sanity check // Sanity check
if s0.BondedValidators.Size() != 1 { if s0.BondedValidators.Size() != 1 {
@ -379,7 +379,7 @@ func TestAddValidator(t *testing.T) {
}, },
}, nil, }, nil,
) )
block1Parts := types.NewPartSetFromData(binary.BinaryBytes(block1))
block1Parts := block1.MakePartSet()
err = s0.AppendBlock(block1, block1Parts.Header()) err = s0.AppendBlock(block1, block1Parts.Header())
if err != nil { if err != nil {
t.Error("Error appending secondary block:", err) t.Error("Error appending secondary block:", err)


+ 4
- 0
types/block.go View File

@ -66,6 +66,10 @@ func (b *Block) Hash() []byte {
return merkle.HashFromHashes(hashes) return merkle.HashFromHashes(hashes)
} }
func (b *Block) MakePartSet() *PartSet {
return NewPartSetFromData(binary.BinaryBytes(b))
}
// Convenience. // Convenience.
// A nil block never hashes to anything. // A nil block never hashes to anything.
// Nothing hashes to a nil hash. // Nothing hashes to a nil hash.


Loading…
Cancel
Save