Browse Source

p2p: peer.Key -> peer.ID

pull/1048/head
Ethan Buchman 7 years ago
parent
commit
a17105fd46
20 changed files with 137 additions and 117 deletions
  1. +3
    -3
      benchmarks/codec_test.go
  2. +17
    -16
      blockchain/pool.go
  3. +8
    -7
      blockchain/pool_test.go
  4. +5
    -5
      blockchain/reactor.go
  5. +6
    -6
      blockchain/reactor_test.go
  6. +5
    -5
      consensus/reactor.go
  7. +6
    -6
      consensus/replay.go
  8. +24
    -23
      consensus/state.go
  9. +8
    -7
      consensus/types/height_vote_set.go
  10. +1
    -1
      consensus/wal.go
  11. +14
    -3
      p2p/key.go
  12. +8
    -8
      p2p/peer.go
  13. +12
    -12
      p2p/peer_set.go
  14. +7
    -5
      p2p/peer_set_test.go
  15. +2
    -1
      p2p/pex_reactor.go
  16. +1
    -1
      p2p/switch.go
  17. +2
    -2
      p2p/switch_test.go
  18. +3
    -2
      rpc/core/consensus.go
  19. +1
    -1
      rpc/core/types/responses.go
  20. +4
    -3
      types/vote_set.go

+ 3
- 3
benchmarks/codec_test.go View File

@ -17,7 +17,7 @@ func BenchmarkEncodeStatusWire(b *testing.B) {
pubKey := crypto.GenPrivKeyEd25519().PubKey()
status := &ctypes.ResultStatus{
NodeInfo: &p2p.NodeInfo{
PubKey: pubKey.Unwrap().(crypto.PubKeyEd25519),
PubKey: pubKey,
Moniker: "SOMENAME",
Network: "SOMENAME",
RemoteAddr: "SOMEADDR",
@ -42,7 +42,7 @@ func BenchmarkEncodeStatusWire(b *testing.B) {
func BenchmarkEncodeNodeInfoWire(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().Unwrap().(crypto.PubKeyEd25519)
pubKey := crypto.GenPrivKeyEd25519().PubKey()
nodeInfo := &p2p.NodeInfo{
PubKey: pubKey,
Moniker: "SOMENAME",
@ -63,7 +63,7 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) {
func BenchmarkEncodeNodeInfoBinary(b *testing.B) {
b.StopTimer()
pubKey := crypto.GenPrivKeyEd25519().PubKey().Unwrap().(crypto.PubKeyEd25519)
pubKey := crypto.GenPrivKeyEd25519().PubKey()
nodeInfo := &p2p.NodeInfo{
PubKey: pubKey,
Moniker: "SOMENAME",


+ 17
- 16
blockchain/pool.go View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
flow "github.com/tendermint/tmlibs/flowrate"
@ -56,16 +57,16 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
numPending int32 // number of requests pending assignment or block response
// peers
peers map[string]*bpPeer
peers map[p2p.ID]*bpPeer
maxPeerHeight int64
requestsCh chan<- BlockRequest
timeoutsCh chan<- string
timeoutsCh chan<- p2p.ID
}
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- p2p.ID) *BlockPool {
bp := &BlockPool{
peers: make(map[string]*bpPeer),
peers: make(map[p2p.ID]*bpPeer),
requesters: make(map[int64]*bpRequester),
height: start,
@ -210,7 +211,7 @@ func (pool *BlockPool) RedoRequest(height int64) {
}
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@ -240,7 +241,7 @@ func (pool *BlockPool) MaxPeerHeight() int64 {
}
// Sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID string, height int64) {
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@ -258,14 +259,14 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int64) {
}
}
func (pool *BlockPool) RemovePeer(peerID string) {
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.removePeer(peerID)
}
func (pool *BlockPool) removePeer(peerID string) {
func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
if requester.getBlock() != nil {
@ -321,14 +322,14 @@ func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}
func (pool *BlockPool) sendRequest(height int64, peerID string) {
func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
if !pool.IsRunning() {
return
}
pool.requestsCh <- BlockRequest{height, peerID}
}
func (pool *BlockPool) sendTimeout(peerID string) {
func (pool *BlockPool) sendTimeout(peerID p2p.ID) {
if !pool.IsRunning() {
return
}
@ -357,7 +358,7 @@ func (pool *BlockPool) debug() string {
type bpPeer struct {
pool *BlockPool
id string
id p2p.ID
recvMonitor *flow.Monitor
height int64
@ -368,7 +369,7 @@ type bpPeer struct {
logger log.Logger
}
func newBPPeer(pool *BlockPool, peerID string, height int64) *bpPeer {
func newBPPeer(pool *BlockPool, peerID p2p.ID, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
@ -434,7 +435,7 @@ type bpRequester struct {
redoCh chan struct{}
mtx sync.Mutex
peerID string
peerID p2p.ID
block *types.Block
}
@ -458,7 +459,7 @@ func (bpr *bpRequester) OnStart() error {
}
// Returns true if the peer matches
func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
@ -477,7 +478,7 @@ func (bpr *bpRequester) getBlock() *types.Block {
return bpr.block
}
func (bpr *bpRequester) getPeerID() string {
func (bpr *bpRequester) getPeerID() p2p.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID
@ -551,5 +552,5 @@ OUTER_LOOP:
type BlockRequest struct {
Height int64
PeerID string
PeerID p2p.ID
}

+ 8
- 7
blockchain/pool_test.go View File

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -15,14 +16,14 @@ func init() {
}
type testPeer struct {
id string
id p2p.ID
height int64
}
func makePeers(numPeers int, minHeight, maxHeight int64) map[string]testPeer {
peers := make(map[string]testPeer, numPeers)
func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer {
peers := make(map[p2p.ID]testPeer, numPeers)
for i := 0; i < numPeers; i++ {
peerID := cmn.RandStr(12)
peerID := p2p.ID(cmn.RandStr(12))
height := minHeight + rand.Int63n(maxHeight-minHeight)
peers[peerID] = testPeer{peerID, height}
}
@ -32,7 +33,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[string]testPeer {
func TestBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan string, 100)
timeoutsCh := make(chan p2p.ID, 100)
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger())
@ -89,7 +90,7 @@ func TestBasic(t *testing.T) {
func TestTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan string, 100)
timeoutsCh := make(chan p2p.ID, 100)
requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger())
@ -127,7 +128,7 @@ func TestTimeout(t *testing.T) {
// Pull from channels
counter := 0
timedOut := map[string]struct{}{}
timedOut := map[p2p.ID]struct{}{}
for {
select {
case peerID := <-timeoutsCh:


+ 5
- 5
blockchain/reactor.go View File

@ -52,7 +52,7 @@ type BlockchainReactor struct {
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
timeoutsCh chan string
timeoutsCh chan p2p.ID
}
// NewBlockchainReactor returns new reactor instance.
@ -62,7 +62,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
}
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
timeoutsCh := make(chan p2p.ID, defaultChannelCapacity)
pool := NewBlockPool(
store.Height()+1,
requestsCh,
@ -131,7 +131,7 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.pool.RemovePeer(peer.Key())
bcR.pool.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
@ -170,7 +170,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
}
case *bcBlockResponseMessage:
// Got a block.
bcR.pool.AddBlock(src.Key(), msg.Block, len(msgBytes))
bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes))
case *bcStatusRequestMessage:
// Send peer our state.
queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
@ -179,7 +179,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
}
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.Key(), msg.Height)
bcR.pool.SetPeerHeight(src.ID(), msg.Height)
default:
bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}


+ 6
- 6
blockchain/reactor_test.go View File

@ -55,7 +55,7 @@ func TestNoBlockMessageResponse(t *testing.T) {
defer bcr.Stop()
// Add some peers in
peer := newbcrTestPeer(cmn.RandStr(12))
peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12)))
bcr.AddPeer(peer)
chID := byte(0x01)
@ -113,16 +113,16 @@ func makeBlock(height int64, state sm.State) *types.Block {
// The Test peer
type bcrTestPeer struct {
cmn.Service
key string
ch chan interface{}
id p2p.ID
ch chan interface{}
}
var _ p2p.Peer = (*bcrTestPeer)(nil)
func newbcrTestPeer(key string) *bcrTestPeer {
func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
return &bcrTestPeer{
Service: cmn.NewBaseService(nil, "bcrTestPeer", nil),
key: key,
id: id,
ch: make(chan interface{}, 2),
}
}
@ -144,7 +144,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, value interface{}) bool {
func (tp *bcrTestPeer) Send(chID byte, data interface{}) bool { return tp.TrySend(chID, data) }
func (tp *bcrTestPeer) NodeInfo() *p2p.NodeInfo { return nil }
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
func (tp *bcrTestPeer) Key() string { return tp.key }
func (tp *bcrTestPeer) ID() p2p.ID { return tp.id }
func (tp *bcrTestPeer) IsOutbound() bool { return false }
func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s }


+ 5
- 5
consensus/reactor.go View File

@ -205,7 +205,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID)
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.ID(), msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *cmn.BitArray
@ -242,12 +242,12 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@ -267,7 +267,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.Key()}
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
// don't punish (leave room for soft upgrades)
@ -1200,7 +1200,7 @@ func (ps *PeerState) StringIndented(indent string) string {
%s Key %v
%s PRS %v
%s}`,
indent, ps.Peer.Key(),
indent, ps.Peer.ID(),
indent, ps.PeerRoundState.StringIndented(indent+" "),
indent)
}


+ 6
- 6
consensus/replay.go View File

@ -61,21 +61,21 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan
}
}
case msgInfo:
peerKey := m.PeerKey
if peerKey == "" {
peerKey = "local"
peerID := m.PeerID
if peerID == "" {
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerKey)
"blockID", v.BlockID, "peer", peerID)
}
cs.handleMsg(m)


+ 24
- 23
consensus/state.go View File

@ -17,6 +17,7 @@ import (
cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -46,8 +47,8 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
Msg ConsensusMessage `json:"msg"`
PeerKey string `json:"peer_key"`
Msg ConsensusMessage `json:"msg"`
PeerID p2p.ID `json:"peer_key"`
}
// internally generated messages which may update the state
@ -303,17 +304,17 @@ func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) {
//------------------------------------------------------------
// Public interface for passing messages into the consensus state, possibly causing a state transition.
// If peerKey == "", the msg is considered internal.
// If peerID == "", the msg is considered internal.
// Messages are added to the appropriate queue (peer or internal).
// If the queue is full, the function may block.
// TODO: should these return anything or let callers just use events?
// AddVote inputs a vote.
func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool, err error) {
if peerKey == "" {
func (cs *ConsensusState) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerKey}
cs.peerMsgQueue <- msgInfo{&VoteMessage{vote}, peerID}
}
// TODO: wait for event?!
@ -321,12 +322,12 @@ func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool,
}
// SetProposal inputs a proposal.
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error {
func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerID p2p.ID) error {
if peerKey == "" {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerKey}
cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerID}
}
// TODO: wait for event?!
@ -334,12 +335,12 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string)
}
// AddProposalBlockPart inputs a part of the proposal block.
func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *types.Part, peerKey string) error {
func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *types.Part, peerID p2p.ID) error {
if peerKey == "" {
if peerID == "" {
cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerKey}
cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerID}
}
// TODO: wait for event?!
@ -347,13 +348,13 @@ func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *ty
}
// SetProposalAndBlock inputs the proposal and all block parts.
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error {
if err := cs.SetProposal(proposal, peerKey); err != nil {
func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerID p2p.ID) error {
if err := cs.SetProposal(proposal, peerID); err != nil {
return err
}
for i := 0; i < parts.Total(); i++ {
part := parts.GetPart(i)
if err := cs.AddProposalBlockPart(proposal.Height, proposal.Round, part, peerKey); err != nil {
if err := cs.AddProposalBlockPart(proposal.Height, proposal.Round, part, peerID); err != nil {
return err
}
}
@ -561,7 +562,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
defer cs.mtx.Unlock()
var err error
msg, peerKey := mi.Msg, mi.PeerKey
msg, peerID := mi.Msg, mi.PeerID
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
@ -569,14 +570,14 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
err = cs.setProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
_, err = cs.addProposalBlockPart(msg.Height, msg.Part, peerKey != "")
_, err = cs.addProposalBlockPart(msg.Height, msg.Part, peerID != "")
if err != nil && msg.Round != cs.Round {
err = nil
}
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
err := cs.tryAddVote(msg.Vote, peerKey)
err := cs.tryAddVote(msg.Vote, peerID)
if err == ErrAddingVote {
// TODO: punish peer
}
@ -591,7 +592,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
cs.Logger.Error("Unknown msg type", reflect.TypeOf(msg))
}
if err != nil {
cs.Logger.Error("Error with msg", "type", reflect.TypeOf(msg), "peer", peerKey, "err", err, "msg", msg)
cs.Logger.Error("Error with msg", "type", reflect.TypeOf(msg), "peer", peerID, "err", err, "msg", msg)
}
}
@ -1308,8 +1309,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int64, part *types.Part, v
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error {
_, err := cs.addVote(vote, peerKey)
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) error {
_, err := cs.addVote(vote, peerID)
if err != nil {
// If the vote height is off, we'll just ignore it,
// But if it's a conflicting sig, add it to the cs.evpool.
@ -1335,7 +1336,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error {
//-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, err error) {
func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
cs.Logger.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "valIndex", vote.ValidatorIndex, "csHeight", cs.Height)
// A precommit for the previous height?
@ -1365,7 +1366,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
// A prevote/precommit for this height?
if vote.Height == cs.Height {
height := cs.Height
added, err = cs.Votes.AddVote(vote, peerKey)
added, err = cs.Votes.AddVote(vote, peerID)
if added {
cs.eventBus.PublishEventVote(types.EventDataVote{vote})


+ 8
- 7
consensus/types/height_vote_set.go View File

@ -4,6 +4,7 @@ import (
"strings"
"sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
@ -35,7 +36,7 @@ type HeightVoteSet struct {
mtx sync.Mutex
round int // max tracked round
roundVoteSets map[int]RoundVoteSet // keys: [0...round]
peerCatchupRounds map[string][]int // keys: peer.Key; values: at most 2 rounds
peerCatchupRounds map[p2p.ID][]int // keys: peer.ID; values: at most 2 rounds
}
func NewHeightVoteSet(chainID string, height int64, valSet *types.ValidatorSet) *HeightVoteSet {
@ -53,7 +54,7 @@ func (hvs *HeightVoteSet) Reset(height int64, valSet *types.ValidatorSet) {
hvs.height = height
hvs.valSet = valSet
hvs.roundVoteSets = make(map[int]RoundVoteSet)
hvs.peerCatchupRounds = make(map[string][]int)
hvs.peerCatchupRounds = make(map[p2p.ID][]int)
hvs.addRound(0)
hvs.round = 0
@ -101,8 +102,8 @@ func (hvs *HeightVoteSet) addRound(round int) {
}
// Duplicate votes return added=false, err=nil.
// By convention, peerKey is "" if origin is self.
func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerKey string) (added bool, err error) {
// By convention, peerID is "" if origin is self.
func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, err error) {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(vote.Type) {
@ -110,10 +111,10 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerKey string) (added bool,
}
voteSet := hvs.getVoteSet(vote.Round, vote.Type)
if voteSet == nil {
if rndz := hvs.peerCatchupRounds[peerKey]; len(rndz) < 2 {
if rndz := hvs.peerCatchupRounds[peerID]; len(rndz) < 2 {
hvs.addRound(vote.Round)
voteSet = hvs.getVoteSet(vote.Round, vote.Type)
hvs.peerCatchupRounds[peerKey] = append(rndz, vote.Round)
hvs.peerCatchupRounds[peerID] = append(rndz, vote.Round)
} else {
// Peer has sent a vote that does not match our round,
// for more than one round. Bad peer!
@ -206,7 +207,7 @@ func (hvs *HeightVoteSet) StringIndented(indent string) string {
// NOTE: if there are too many peers, or too much peer churn,
// this can cause memory issues.
// TODO: implement ability to remove peers too
func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID string, blockID types.BlockID) {
func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID p2p.ID, blockID types.BlockID) {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(type_) {


+ 1
- 1
consensus/wal.go View File

@ -121,7 +121,7 @@ func (wal *baseWAL) Save(msg WALMessage) {
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := msg.(msgInfo); ok {
if mi.PeerKey != "" {
if mi.PeerID != "" {
return
}
}


+ 14
- 3
p2p/key.go View File

@ -2,6 +2,7 @@ package p2p
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
@ -21,8 +22,14 @@ type NodeKey struct {
PrivKey crypto.PrivKey `json:"priv_key"` // our priv key
}
type ID string
// ID returns the peer's canonical ID - the hash of its public key.
func (nodeKey *NodeKey) ID() []byte {
func (nodeKey *NodeKey) ID() ID {
return ID(hex.EncodeToString(nodeKey.id()))
}
func (nodeKey *NodeKey) id() []byte {
return nodeKey.PrivKey.PubKey().Address()
}
@ -31,6 +38,10 @@ func (nodeKey *NodeKey) PubKey() crypto.PubKey {
return nodeKey.PrivKey.PubKey()
}
func (nodeKey *NodeKey) SatisfiesTarget(target []byte) bool {
return bytes.Compare(nodeKey.id(), target) < 0
}
// LoadOrGenNodeKey attempts to load the NodeKey from the given filePath,
// and checks that the corresponding ID is less than the target.
// If the file does not exist, it generates and saves a new NodeKey
@ -41,8 +52,8 @@ func LoadOrGenNodeKey(filePath string, target []byte) (*NodeKey, error) {
if err != nil {
return nil, err
}
if bytes.Compare(nodeKey.ID(), target) >= 0 {
return nil, fmt.Errorf("Loaded ID (%X) does not satisfy target (%X)", nodeKey.ID(), target)
if !nodeKey.SatisfiesTarget(target) {
return nil, fmt.Errorf("Loaded ID (%s) does not satisfy target (%X)", nodeKey.ID(), target)
}
return nodeKey, nil
} else {


+ 8
- 8
p2p/peer.go View File

@ -1,6 +1,7 @@
package p2p
import (
"encoding/hex"
"fmt"
"net"
"time"
@ -17,7 +18,7 @@ import (
type Peer interface {
cmn.Service
Key() string
ID() ID
IsOutbound() bool
IsPersistent() bool
NodeInfo() *NodeInfo
@ -282,15 +283,15 @@ func (p *peer) CanSend(chID byte) bool {
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key())
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
}
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key())
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
}
// Equals reports whenever 2 peers are actually represent the same node.
func (p *peer) Equals(other Peer) bool {
return p.Key() == other.Key()
return p.ID() == other.ID()
}
// Get the data for a given key.
@ -303,10 +304,9 @@ func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
// Key returns the peer's id key.
// TODO: call this ID
func (p *peer) Key() string {
return p.nodeInfo.ListenAddr // XXX: should probably be PubKey.KeyString()
// Key returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() ID {
return ID(hex.EncodeToString(p.nodeInfo.PubKey.Address()))
}
// NodeInfo returns a copy of the peer's NodeInfo.


+ 12
- 12
p2p/peer_set.go View File

@ -6,8 +6,8 @@ import (
// IPeerSet has a (immutable) subset of the methods of PeerSet.
type IPeerSet interface {
Has(key string) bool
Get(key string) Peer
Has(key ID) bool
Get(key ID) Peer
List() []Peer
Size() int
}
@ -18,7 +18,7 @@ type IPeerSet interface {
// Iteration over the peers is super fast and thread-safe.
type PeerSet struct {
mtx sync.Mutex
lookup map[string]*peerSetItem
lookup map[ID]*peerSetItem
list []Peer
}
@ -30,7 +30,7 @@ type peerSetItem struct {
// NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.
func NewPeerSet() *PeerSet {
return &PeerSet{
lookup: make(map[string]*peerSetItem),
lookup: make(map[ID]*peerSetItem),
list: make([]Peer, 0, 256),
}
}
@ -40,7 +40,7 @@ func NewPeerSet() *PeerSet {
func (ps *PeerSet) Add(peer Peer) error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.lookup[peer.Key()] != nil {
if ps.lookup[peer.ID()] != nil {
return ErrSwitchDuplicatePeer
}
@ -48,13 +48,13 @@ func (ps *PeerSet) Add(peer Peer) error {
// Appending is safe even with other goroutines
// iterating over the ps.list slice.
ps.list = append(ps.list, peer)
ps.lookup[peer.Key()] = &peerSetItem{peer, index}
ps.lookup[peer.ID()] = &peerSetItem{peer, index}
return nil
}
// Has returns true iff the PeerSet contains
// the peer referred to by this peerKey.
func (ps *PeerSet) Has(peerKey string) bool {
func (ps *PeerSet) Has(peerKey ID) bool {
ps.mtx.Lock()
_, ok := ps.lookup[peerKey]
ps.mtx.Unlock()
@ -62,7 +62,7 @@ func (ps *PeerSet) Has(peerKey string) bool {
}
// Get looks up a peer by the provided peerKey.
func (ps *PeerSet) Get(peerKey string) Peer {
func (ps *PeerSet) Get(peerKey ID) Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item, ok := ps.lookup[peerKey]
@ -77,7 +77,7 @@ func (ps *PeerSet) Get(peerKey string) Peer {
func (ps *PeerSet) Remove(peer Peer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item := ps.lookup[peer.Key()]
item := ps.lookup[peer.ID()]
if item == nil {
return
}
@ -90,18 +90,18 @@ func (ps *PeerSet) Remove(peer Peer) {
// If it's the last peer, that's an easy special case.
if index == len(ps.list)-1 {
ps.list = newList
delete(ps.lookup, peer.Key())
delete(ps.lookup, peer.ID())
return
}
// Replace the popped item with the last item in the old list.
lastPeer := ps.list[len(ps.list)-1]
lastPeerKey := lastPeer.Key()
lastPeerKey := lastPeer.ID()
lastPeerItem := ps.lookup[lastPeerKey]
newList[index] = lastPeer
lastPeerItem.index = index
ps.list = newList
delete(ps.lookup, peer.Key())
delete(ps.lookup, peer.ID())
}
// Size returns the number of unique items in the peerSet.


+ 7
- 5
p2p/peer_set_test.go View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
crypto "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
)
@ -16,6 +17,7 @@ func randPeer() *peer {
nodeInfo: &NodeInfo{
RemoteAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
}
}
@ -39,7 +41,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) {
peerSet.Remove(peerAtFront)
wantSize := n - i - 1
for j := 0; j < 2; j++ {
assert.Equal(t, false, peerSet.Has(peerAtFront.Key()), "#%d Run #%d: failed to remove peer", i, j)
assert.Equal(t, false, peerSet.Has(peerAtFront.ID()), "#%d Run #%d: failed to remove peer", i, j)
assert.Equal(t, wantSize, peerSet.Size(), "#%d Run #%d: failed to remove peer and decrement size", i, j)
// Test the route of removing the now non-existent element
peerSet.Remove(peerAtFront)
@ -58,7 +60,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) {
for i := n - 1; i >= 0; i-- {
peerAtEnd := peerList[i]
peerSet.Remove(peerAtEnd)
assert.Equal(t, false, peerSet.Has(peerAtEnd.Key()), "#%d: failed to remove item at end", i)
assert.Equal(t, false, peerSet.Has(peerAtEnd.ID()), "#%d: failed to remove item at end", i)
assert.Equal(t, i, peerSet.Size(), "#%d: differing sizes after peerSet.Remove(atEndPeer)", i)
}
}
@ -82,7 +84,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) {
for i, peer := range peers {
peerSet.Remove(peer)
if peerSet.Has(peer.Key()) {
if peerSet.Has(peer.ID()) {
t.Errorf("Failed to remove peer")
}
if peerSet.Size() != len(peers)-i-1 {
@ -129,7 +131,7 @@ func TestPeerSetGet(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet()
peer := randPeer()
assert.Nil(t, peerSet.Get(peer.Key()), "expecting a nil lookup, before .Add")
assert.Nil(t, peerSet.Get(peer.ID()), "expecting a nil lookup, before .Add")
if err := peerSet.Add(peer); err != nil {
t.Fatalf("Failed to add new peer: %v", err)
@ -142,7 +144,7 @@ func TestPeerSetGet(t *testing.T) {
wg.Add(1)
go func(i int) {
defer wg.Done()
got, want := peerSet.Get(peer.Key()), peer
got, want := peerSet.Get(peer.ID()), peer
assert.Equal(t, got, want, "#%d: got=%v want=%v", i, got, want)
}(i)
}


+ 2
- 1
p2p/pex_reactor.go View File

@ -265,7 +265,8 @@ func (r *PEXReactor) ensurePeers() {
continue
}
// XXX: Should probably use pubkey as peer key ...
if connected := r.Switch.Peers().Has(try.String()); connected {
// TODO: use the ID correctly
if connected := r.Switch.Peers().Has(ID(try.String())); connected {
continue
}
r.Logger.Info("Will dial address", "addr", try)


+ 1
- 1
p2p/switch.go View File

@ -257,7 +257,7 @@ func (sw *Switch) addPeer(peer *peer) error {
}
// Avoid duplicate
if sw.peers.Has(peer.Key()) {
if sw.peers.Has(peer.ID()) {
return ErrSwitchDuplicatePeer
}


+ 2
- 2
p2p/switch_test.go View File

@ -28,7 +28,7 @@ func init() {
}
type PeerMessage struct {
PeerKey string
PeerID ID
Bytes []byte
Counter int
}
@ -77,7 +77,7 @@ func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
tr.mtx.Lock()
defer tr.mtx.Unlock()
//fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter})
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
tr.msgsCounter++
}
}


+ 3
- 2
rpc/core/consensus.go View File

@ -3,6 +3,7 @@ package core
import (
cm "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types"
p2p "github.com/tendermint/tendermint/p2p"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -82,11 +83,11 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
// }
// ```
func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
peerRoundStates := make(map[string]*cstypes.PeerRoundState)
peerRoundStates := make(map[p2p.ID]*cstypes.PeerRoundState)
for _, peer := range p2pSwitch.Peers().List() {
peerState := peer.Get(types.PeerStateKey).(*cm.PeerState)
peerRoundState := peerState.GetRoundState()
peerRoundStates[peer.Key()] = peerRoundState
peerRoundStates[peer.ID()] = peerRoundState
}
return &ctypes.ResultDumpConsensusState{consensusState.GetRoundState(), peerRoundStates}, nil
}

+ 1
- 1
rpc/core/types/responses.go View File

@ -99,7 +99,7 @@ type ResultValidators struct {
type ResultDumpConsensusState struct {
RoundState *cstypes.RoundState `json:"round_state"`
PeerRoundStates map[string]*cstypes.PeerRoundState `json:"peer_round_states"`
PeerRoundStates map[p2p.ID]*cstypes.PeerRoundState `json:"peer_round_states"`
}
type ResultBroadcastTx struct {


+ 4
- 3
types/vote_set.go View File

@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/tendermint/tendermint/p2p"
cmn "github.com/tendermint/tmlibs/common"
)
@ -58,7 +59,7 @@ type VoteSet struct {
sum int64 // Sum of voting power for seen votes, discounting conflicts
maj23 *BlockID // First 2/3 majority seen
votesByBlock map[string]*blockVotes // string(blockHash|blockParts) -> blockVotes
peerMaj23s map[string]BlockID // Maj23 for each peer
peerMaj23s map[p2p.ID]BlockID // Maj23 for each peer
}
// Constructs a new VoteSet struct used to accumulate votes for given height/round.
@ -77,7 +78,7 @@ func NewVoteSet(chainID string, height int64, round int, type_ byte, valSet *Val
sum: 0,
maj23: nil,
votesByBlock: make(map[string]*blockVotes, valSet.Size()),
peerMaj23s: make(map[string]BlockID),
peerMaj23s: make(map[p2p.ID]BlockID),
}
}
@ -290,7 +291,7 @@ func (voteSet *VoteSet) addVerifiedVote(vote *Vote, blockKey string, votingPower
// this can cause memory issues.
// TODO: implement ability to remove peers too
// NOTE: VoteSet must not be nil
func (voteSet *VoteSet) SetPeerMaj23(peerID string, blockID BlockID) {
func (voteSet *VoteSet) SetPeerMaj23(peerID p2p.ID, blockID BlockID) {
if voteSet == nil {
cmn.PanicSanity("SetPeerMaj23() on nil VoteSet")
}


Loading…
Cancel
Save