diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 813d8f6b7..efa6e2f05 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -111,19 +111,19 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { } // AddPeer implements Reactor by sending our state to peer. -func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { +func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) { // doing nothing, will try later in `poolRoutine` } } // RemovePeer implements Reactor by removing peer from the pool. -func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { - bcR.pool.RemovePeer(peer.Key) +func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + bcR.pool.RemovePeer(peer.Key()) } // Receive implements Reactor by handling 4 types of messages (look below). -func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { +func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { bcR.Logger.Error("Error decoding message", "err", err) @@ -148,7 +148,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case *bcBlockResponseMessage: // Got a block. - bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes)) + bcR.pool.AddBlock(src.Key(), msg.Block, len(msgBytes)) case *bcStatusRequestMessage: // Send peer our state. queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) @@ -157,7 +157,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case *bcStatusResponseMessage: // Got a peer status. Unverified. - bcR.pool.SetPeerHeight(src.Key, msg.Height) + bcR.pool.SetPeerHeight(src.Key(), msg.Height) default: bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 94a03c7aa..236ba2d40 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -188,7 +188,7 @@ func byzantineDecideProposalFunc(t *testing.T, height, round int, cs *ConsensusS } } -func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { +func sendProposalAndParts(height, round int, cs *ConsensusState, peer p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { // proposal msg := &ProposalMessage{Proposal: proposal} peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) @@ -231,14 +231,14 @@ func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } -func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { +func (br *ByzantineReactor) AddPeer(peer p2p.Peer) { if !br.reactor.IsRunning() { return } // Create peerState for peer - peerState := NewPeerState(peer) - peer.Data.Set(types.PeerStateKey, peerState) + peerState := NewPeerState(peer).SetLogger(br.reactor.Logger) + peer.Set(types.PeerStateKey, peerState) // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). @@ -246,10 +246,10 @@ func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { br.reactor.sendNewRoundStepMessages(peer) } } -func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { +func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { br.reactor.RemovePeer(peer, reason) } -func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { +func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { br.reactor.Receive(chID, peer, msgBytes) } diff --git a/consensus/common_test.go b/consensus/common_test.go index c59a6d969..84f47d021 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -380,9 +380,9 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF return css } -func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { +func getSwitchIndex(switches []*p2p.Switch, peer p2p.Peer) int { for i, s := range switches { - if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { + if bytes.Equal(peer.NodeInfo().PubKey.Address(), s.NodeInfo().PubKey.Address()) { return i } } diff --git a/consensus/reactor.go b/consensus/reactor.go index f4107c653..59cbfea76 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -120,14 +120,14 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { } // AddPeer implements Reactor -func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { +func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) { if !conR.IsRunning() { return } // Create peerState for peer - peerState := NewPeerState(peer) - peer.Data.Set(types.PeerStateKey, peerState) + peerState := NewPeerState(peer).SetLogger(conR.Logger) + peer.Set(types.PeerStateKey, peerState) // Begin routines for this peer. go conR.gossipDataRoutine(peer, peerState) @@ -142,12 +142,12 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { } // RemovePeer implements Reactor -func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { +func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) { if !conR.IsRunning() { return } // TODO - //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() + //peer.Get(PeerStateKey).(*PeerState).Disconnect() } // Receive implements Reactor @@ -156,7 +156,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Peer state updates can happen in parallel, but processing of // proposals, block parts, and votes are ordered by the receiveRoutine // NOTE: blocks on consensus state for proposals, block parts, and votes -func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { +func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes) return @@ -171,7 +171,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) // Get peer states - ps := src.Data.Get(types.PeerStateKey).(*PeerState) + ps := src.Get(types.PeerStateKey).(*PeerState) switch chID { case StateChannel: @@ -191,7 +191,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) return } // Peer claims to have a maj23 for some BlockID at H,R,S, - votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) + votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID) // Respond with a VoteSetBitsMessage showing which votes we have. // (and consequently shows which we don't have) var ourVotes *cmn.BitArray @@ -228,12 +228,12 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) - conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} + conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} + conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()} default: conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -253,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - cs.peerMsgQueue <- msgInfo{msg, src.Key} + cs.peerMsgQueue <- msgInfo{msg, src.Key()} default: // don't punish (leave room for soft upgrades) @@ -367,7 +367,7 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) { /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().List() { - ps := peer.Data.Get(PeerStateKey).(*PeerState) + ps := peer.Get(PeerStateKey).(*PeerState) prs := ps.GetRoundState() if prs.Height == vote.Height { // TODO: Also filter on round? @@ -399,7 +399,7 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg * return } -func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) { +func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { @@ -410,7 +410,7 @@ func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) { } } -func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { +func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) OUTER_LOOP: @@ -492,7 +492,7 @@ OUTER_LOOP: } func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState, - prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) { + prs *PeerRoundState, ps *PeerState, peer p2p.Peer) { if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { // Ensure that the peer's PartSetHeader is correct @@ -534,7 +534,7 @@ func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundS } } -func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { +func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) // Simple hack to throttle logs upon sleep. @@ -644,7 +644,7 @@ func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundS // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. -func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { +func (conR *ConsensusReactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) OUTER_LOOP: @@ -743,7 +743,7 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { s := "ConsensusReactor{\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" for _, peer := range conR.Switch.Peers().List() { - ps := peer.Data.Get(types.PeerStateKey).(*PeerState) + ps := peer.Get(types.PeerStateKey).(*PeerState) s += indent + " " + ps.StringIndented(indent+" ") + "\n" } s += indent + "}" @@ -808,16 +808,18 @@ var ( // PeerState contains the known state of a peer, including its connection // and threadsafe access to its PeerRoundState. type PeerState struct { - Peer *p2p.Peer + Peer p2p.Peer + logger log.Logger mtx sync.Mutex PeerRoundState } // NewPeerState returns a new PeerState for the given Peer -func NewPeerState(peer *p2p.Peer) *PeerState { +func NewPeerState(peer p2p.Peer) *PeerState { return &PeerState{ - Peer: peer, + Peer: peer, + logger: log.NewNopLogger(), PeerRoundState: PeerRoundState{ Round: -1, ProposalPOLRound: -1, @@ -827,6 +829,11 @@ func NewPeerState(peer *p2p.Peer) *PeerState { } } +func (ps *PeerState) SetLogger(logger log.Logger) *PeerState { + ps.logger = logger + return ps +} + // GetRoundState returns an atomic snapshot of the PeerRoundState. // There's no point in mutating it since it won't change PeerState. func (ps *PeerState) GetRoundState() *PeerRoundState { @@ -1025,7 +1032,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { } func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { - logger := ps.Peer.Logger.With("peerRound", ps.Round, "height", height, "round", round) + logger := ps.logger.With("peerRound", ps.Round, "height", height, "round", round) logger.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) // NOTE: some may be nil BitArrays -> no side effects. @@ -1163,7 +1170,7 @@ func (ps *PeerState) StringIndented(indent string) string { %s Key %v %s PRS %v %s}`, - indent, ps.Peer.Key, + indent, ps.Peer.Key(), indent, ps.PeerRoundState.StringIndented(indent+" "), indent) } diff --git a/mempool/reactor.go b/mempool/reactor.go index 7dbfa2924..87bac5d92 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -60,18 +60,18 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. -func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) { +func (memR *MempoolReactor) AddPeer(peer p2p.Peer) { go memR.broadcastTxRoutine(peer) } // RemovePeer implements Reactor. -func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { +func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { // broadcast routine checks if peer is gone and returns } // Receive implements Reactor. // It adds any received transactions to the mempool. -func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { +func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "err", err) diff --git a/p2p/peer.go b/p2p/peer.go index c15f61d32..2b986a7a7 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,12 +12,29 @@ import ( cmn "github.com/tendermint/tmlibs/common" ) +// Peer is an interface representing a peer connected on a reactor. +type Peer interface { + cmn.Service + + Key() string + IsOutbound() bool + IsPersistent() bool + NodeInfo() *NodeInfo + Status() ConnectionStatus + + Send(byte, interface{}) bool + TrySend(byte, interface{}) bool + + Set(string, interface{}) + Get(string) interface{} +} + // Peer could be marked as persistent, in which case you can use // Redial function to reconnect. Note that inbound peers can't be // made persistent. They should be made persistent on the other end. // // Before using a peer, you will need to perform a handshake on connection. -type Peer struct { +type peer struct { cmn.BaseService outbound bool @@ -28,9 +45,9 @@ type Peer struct { persistent bool config *PeerConfig - *NodeInfo - Key string - Data *cmn.CMap // User data. + nodeInfo *NodeInfo + key string + Data *cmn.CMap // User data. } // PeerConfig is a Peer configuration. @@ -60,7 +77,7 @@ func DefaultPeerConfig() *PeerConfig { } func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peer, error) { conn, err := dial(addr, config) if err != nil { @@ -76,13 +93,13 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] } func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*peer, error) { conn := rawConn @@ -104,7 +121,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ } // Key and NodeInfo are set after Handshake - p := &Peer{ + p := &peer{ outbound: outbound, conn: conn, config: config, @@ -119,12 +136,12 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ } // CloseConn should be used when the peer was created, but never started. -func (p *Peer) CloseConn() { +func (p *peer) CloseConn() { p.conn.Close() } // makePersistent marks the peer as persistent. -func (p *Peer) makePersistent() { +func (p *peer) makePersistent() { if !p.outbound { panic("inbound peers can't be made persistent") } @@ -133,13 +150,13 @@ func (p *Peer) makePersistent() { } // IsPersistent returns true if the peer is persitent, false otherwise. -func (p *Peer) IsPersistent() bool { +func (p *peer) IsPersistent() bool { return p.persistent } // HandshakeTimeout performs a handshake between a given node and the peer. // NOTE: blocking -func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { +func (p *peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) error { // Set deadline for handshake so we don't block forever on conn.ReadFull p.conn.SetDeadline(time.Now().Add(timeout)) @@ -176,19 +193,19 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er peerNodeInfo.RemoteAddr = p.Addr().String() - p.NodeInfo = peerNodeInfo - p.Key = peerNodeInfo.PubKey.KeyString() + p.nodeInfo = peerNodeInfo + p.key = peerNodeInfo.PubKey.KeyString() return nil } // Addr returns peer's remote network address. -func (p *Peer) Addr() net.Addr { +func (p *peer) Addr() net.Addr { return p.conn.RemoteAddr() } // PubKey returns peer's public key. -func (p *Peer) PubKey() crypto.PubKeyEd25519 { +func (p *peer) PubKey() crypto.PubKeyEd25519 { if p.config.AuthEnc { return p.conn.(*SecretConnection).RemotePubKey() } @@ -199,31 +216,31 @@ func (p *Peer) PubKey() crypto.PubKeyEd25519 { } // OnStart implements BaseService. -func (p *Peer) OnStart() error { +func (p *peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err } // OnStop implements BaseService. -func (p *Peer) OnStop() { +func (p *peer) OnStop() { p.BaseService.OnStop() p.mconn.Stop() } // Connection returns underlying MConnection. -func (p *Peer) Connection() *MConnection { +func (p *peer) Connection() *MConnection { return p.mconn } // IsOutbound returns true if the connection is outbound, false otherwise. -func (p *Peer) IsOutbound() bool { +func (p *peer) IsOutbound() bool { return p.outbound } // Send msg to the channel identified by chID byte. Returns false if the send // queue is full after timeout, specified by MConnection. -func (p *Peer) Send(chID byte, msg interface{}) bool { +func (p *peer) Send(chID byte, msg interface{}) bool { if !p.IsRunning() { // see Switch#Broadcast, where we fetch the list of peers and loop over // them - while we're looping, one peer may be removed and stopped. @@ -234,7 +251,7 @@ func (p *Peer) Send(chID byte, msg interface{}) bool { // TrySend msg to the channel identified by chID byte. Immediately returns // false if the send queue is full. -func (p *Peer) TrySend(chID byte, msg interface{}) bool { +func (p *peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } @@ -242,7 +259,7 @@ func (p *Peer) TrySend(chID byte, msg interface{}) bool { } // CanSend returns true if the send queue is not full, false otherwise. -func (p *Peer) CanSend(chID byte) bool { +func (p *peer) CanSend(chID byte) bool { if !p.IsRunning() { return false } @@ -250,32 +267,53 @@ func (p *Peer) CanSend(chID byte) bool { } // WriteTo writes the peer's public key to w. -func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { +func (p *peer) WriteTo(w io.Writer) (n int64, err error) { var n_ int - wire.WriteString(p.Key, w, &n_, &err) + wire.WriteString(p.key, w, &n_, &err) n += int64(n_) return } // String representation. -func (p *Peer) String() string { +func (p *peer) String() string { if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12]) + return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.key[:12]) } - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12]) + return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.key[:12]) } // Equals reports whenever 2 peers are actually represent the same node. -func (p *Peer) Equals(other *Peer) bool { - return p.Key == other.Key +func (p *peer) Equals(other Peer) bool { + return p.key == other.Key() } // Get the data for a given key. -func (p *Peer) Get(key string) interface{} { +func (p *peer) Get(key string) interface{} { return p.Data.Get(key) } +// Set sets the data for the given key. +func (p *peer) Set(key string, data interface{}) { + p.Data.Set(key, data) +} + +// Key returns the peer's id key. +func (p *peer) Key() string { + return p.key +} + +// NodeInfo returns a copy of the peer's NodeInfo. +func (p *peer) NodeInfo() *NodeInfo { + n := *p.nodeInfo // copy + return &n +} + +// Status returns the peer's ConnectionStatus. +func (p *peer) Status() ConnectionStatus { + return p.mconn.Status() +} + func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { conn, err := addr.DialTimeout(config.DialTimeout * time.Second) if err != nil { @@ -284,8 +322,8 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] diff --git a/p2p/peer_set.go b/p2p/peer_set.go index a5a443d87..c21748cf9 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -7,8 +7,8 @@ import ( // IPeerSet has a (immutable) subset of the methods of PeerSet. type IPeerSet interface { Has(key string) bool - Get(key string) *Peer - List() []*Peer + Get(key string) Peer + List() []Peer Size() int } @@ -19,11 +19,11 @@ type IPeerSet interface { type PeerSet struct { mtx sync.Mutex lookup map[string]*peerSetItem - list []*Peer + list []Peer } type peerSetItem struct { - peer *Peer + peer Peer index int } @@ -31,16 +31,16 @@ type peerSetItem struct { func NewPeerSet() *PeerSet { return &PeerSet{ lookup: make(map[string]*peerSetItem), - list: make([]*Peer, 0, 256), + list: make([]Peer, 0, 256), } } // Add adds the peer to the PeerSet. // It returns ErrSwitchDuplicatePeer if the peer is already present. -func (ps *PeerSet) Add(peer *Peer) error { +func (ps *PeerSet) Add(peer Peer) error { ps.mtx.Lock() defer ps.mtx.Unlock() - if ps.lookup[peer.Key] != nil { + if ps.lookup[peer.Key()] != nil { return ErrSwitchDuplicatePeer } @@ -48,7 +48,7 @@ func (ps *PeerSet) Add(peer *Peer) error { // Appending is safe even with other goroutines // iterating over the ps.list slice. ps.list = append(ps.list, peer) - ps.lookup[peer.Key] = &peerSetItem{peer, index} + ps.lookup[peer.Key()] = &peerSetItem{peer, index} return nil } @@ -62,7 +62,7 @@ func (ps *PeerSet) Has(peerKey string) bool { } // Get looks up a peer by the provided peerKey. -func (ps *PeerSet) Get(peerKey string) *Peer { +func (ps *PeerSet) Get(peerKey string) Peer { ps.mtx.Lock() defer ps.mtx.Unlock() item, ok := ps.lookup[peerKey] @@ -74,10 +74,10 @@ func (ps *PeerSet) Get(peerKey string) *Peer { } // Remove discards peer by its Key, if the peer was previously memoized. -func (ps *PeerSet) Remove(peer *Peer) { +func (ps *PeerSet) Remove(peer Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() - item := ps.lookup[peer.Key] + item := ps.lookup[peer.Key()] if item == nil { return } @@ -85,23 +85,23 @@ func (ps *PeerSet) Remove(peer *Peer) { index := item.index // Create a new copy of the list but with one less item. // (we must copy because we'll be mutating the list). - newList := make([]*Peer, len(ps.list)-1) + newList := make([]Peer, len(ps.list)-1) copy(newList, ps.list) // If it's the last peer, that's an easy special case. if index == len(ps.list)-1 { ps.list = newList - delete(ps.lookup, peer.Key) + delete(ps.lookup, peer.Key()) return } // Replace the popped item with the last item in the old list. lastPeer := ps.list[len(ps.list)-1] - lastPeerKey := lastPeer.Key + lastPeerKey := lastPeer.Key() lastPeerItem := ps.lookup[lastPeerKey] newList[index] = lastPeer lastPeerItem.index = index ps.list = newList - delete(ps.lookup, peer.Key) + delete(ps.lookup, peer.Key()) } // Size returns the number of unique items in the peerSet. @@ -112,7 +112,7 @@ func (ps *PeerSet) Size() int { } // List returns the threadsafe list of peers. -func (ps *PeerSet) List() []*Peer { +func (ps *PeerSet) List() []Peer { ps.mtx.Lock() defer ps.mtx.Unlock() return ps.list diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 309b7e970..e37455256 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -11,10 +11,10 @@ import ( ) // Returns an empty dummy peer -func randPeer() *Peer { - return &Peer{ - Key: cmn.RandStr(12), - NodeInfo: &NodeInfo{ +func randPeer() *peer { + return &peer{ + key: cmn.RandStr(12), + nodeInfo: &NodeInfo{ RemoteAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), }, @@ -25,7 +25,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) { t.Parallel() peerSet := NewPeerSet() - var peerList []*Peer + var peerList []Peer for i := 0; i < 5; i++ { p := randPeer() peerSet.Add(p) @@ -38,7 +38,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) { peerSet.Remove(peerAtFront) wantSize := n - i - 1 for j := 0; j < 2; j++ { - assert.Equal(t, false, peerSet.Has(peerAtFront.Key), "#%d Run #%d: failed to remove peer", i, j) + assert.Equal(t, false, peerSet.Has(peerAtFront.Key()), "#%d Run #%d: failed to remove peer", i, j) assert.Equal(t, wantSize, peerSet.Size(), "#%d Run #%d: failed to remove peer and decrement size", i, j) // Test the route of removing the now non-existent element peerSet.Remove(peerAtFront) @@ -55,7 +55,7 @@ func TestPeerSetAddRemoveOne(t *testing.T) { for i := n - 1; i >= 0; i-- { peerAtEnd := peerList[i] peerSet.Remove(peerAtEnd) - assert.Equal(t, false, peerSet.Has(peerAtEnd.Key), "#%d: failed to remove item at end", i) + assert.Equal(t, false, peerSet.Has(peerAtEnd.Key()), "#%d: failed to remove item at end", i) assert.Equal(t, i, peerSet.Size(), "#%d: differing sizes after peerSet.Remove(atEndPeer)", i) } } @@ -64,7 +64,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { t.Parallel() peerSet := NewPeerSet() - peers := []*Peer{} + peers := []Peer{} N := 100 for i := 0; i < N; i++ { peer := randPeer() @@ -79,7 +79,7 @@ func TestPeerSetAddRemoveMany(t *testing.T) { for i, peer := range peers { peerSet.Remove(peer) - if peerSet.Has(peer.Key) { + if peerSet.Has(peer.Key()) { t.Errorf("Failed to remove peer") } if peerSet.Size() != len(peers)-i-1 { @@ -126,7 +126,7 @@ func TestPeerSetGet(t *testing.T) { t.Parallel() peerSet := NewPeerSet() peer := randPeer() - assert.Nil(t, peerSet.Get(peer.Key), "expecting a nil lookup, before .Add") + assert.Nil(t, peerSet.Get(peer.Key()), "expecting a nil lookup, before .Add") if err := peerSet.Add(peer); err != nil { t.Fatalf("Failed to add new peer: %v", err) @@ -139,7 +139,7 @@ func TestPeerSetGet(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - got, want := peerSet.Get(peer.Key), peer + got, want := peerSet.Get(peer.Key()), peer assert.Equal(t, got, want, "#%d: got=%v want=%v", i, got, want) }(i) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 347de784c..ba52b22a4 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -76,13 +76,13 @@ func TestPeerSend(t *testing.T) { assert.True(p.Send(0x01, "Asylum")) } -func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) { +func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) { chDescs := []*ChannelDescriptor{ &ChannelDescriptor{ID: 0x01, Priority: 1}, } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} pk := crypto.GenPrivKeyEd25519() - p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) + p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, pk, config) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) + peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 30ebc3a7e..69ab55cc9 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -92,7 +92,7 @@ func (r *PEXReactor) GetChannels() []*ChannelDescriptor { // AddPeer implements Reactor by adding peer to the address book (if inbound) // or by requesting more addresses (if outbound). -func (r *PEXReactor) AddPeer(p *Peer) { +func (r *PEXReactor) AddPeer(p Peer) { if p.IsOutbound() { // For outbound peers, the address is already in the books. // Either it was added in DialSeeds or when we @@ -101,10 +101,10 @@ func (r *PEXReactor) AddPeer(p *Peer) { r.RequestPEX(p) } } else { // For inbound connections, the peer is its own source - addr, err := NewNetAddressString(p.ListenAddr) + addr, err := NewNetAddressString(p.NodeInfo().ListenAddr) if err != nil { // this should never happen - r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "err", err) + r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.NodeInfo().ListenAddr, "err", err) return } r.book.AddAddress(addr, addr) @@ -112,15 +112,15 @@ func (r *PEXReactor) AddPeer(p *Peer) { } // RemovePeer implements Reactor. -func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { +func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { // If we aren't keeping track of local temp data for each peer here, then we // don't have to do anything. } // Receive implements Reactor by handling incoming PEX messages. -func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { - srcAddr := src.Connection().RemoteAddress - srcAddrStr := srcAddr.String() +func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { + srcAddrStr := src.NodeInfo().RemoteAddr + srcAddr, _ := NewNetAddressString(srcAddrStr) r.IncrementMsgCountForPeer(srcAddrStr) if r.ReachedMaxMsgCountForPeer(srcAddrStr) { @@ -154,12 +154,12 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { } // RequestPEX asks peer for more addresses. -func (r *PEXReactor) RequestPEX(p *Peer) { +func (r *PEXReactor) RequestPEX(p Peer) { p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}) } // SendAddrs sends addrs to the peer. -func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) { +func (r *PEXReactor) SendAddrs(p Peer, addrs []*NetAddress) { p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) } diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go index 0748486e5..b2c15ed89 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex_reactor_test.go @@ -129,7 +129,7 @@ func TestPEXReactorReceive(t *testing.T) { peer := createRandomPeer(false) size := book.Size() - netAddr, _ := NewNetAddressString(peer.ListenAddr) + netAddr, _ := NewNetAddressString(peer.NodeInfo().ListenAddr) addrs := []*NetAddress{netAddr} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) @@ -159,16 +159,17 @@ func TestPEXReactorAbuseFromPeer(t *testing.T) { r.Receive(PexChannel, peer, msg) } - assert.True(r.ReachedMaxMsgCountForPeer(peer.ListenAddr)) + assert.True(r.ReachedMaxMsgCountForPeer(peer.NodeInfo().ListenAddr)) } -func createRandomPeer(outbound bool) *Peer { +func createRandomPeer(outbound bool) *peer { addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) netAddr, _ := NewNetAddressString(addr) - p := &Peer{ - Key: cmn.RandStr(12), - NodeInfo: &NodeInfo{ + p := &peer{ + key: cmn.RandStr(12), + nodeInfo: &NodeInfo{ ListenAddr: addr, + RemoteAddr: netAddr.String(), }, outbound: outbound, mconn: &MConnection{RemoteAddress: netAddr}, diff --git a/p2p/switch.go b/p2p/switch.go index d92dd6370..d1d615398 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -22,9 +22,9 @@ type Reactor interface { SetSwitch(*Switch) GetChannels() []*ChannelDescriptor - AddPeer(peer *Peer) - RemovePeer(peer *Peer, reason interface{}) - Receive(chID byte, peer *Peer, msgBytes []byte) + AddPeer(peer Peer) + RemovePeer(peer Peer, reason interface{}) + Receive(chID byte, peer Peer, msgBytes []byte) } //-------------------------------------- @@ -44,10 +44,10 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor { func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw } -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } -func (_ *BaseReactor) AddPeer(peer *Peer) {} -func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} -func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {} +func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) AddPeer(peer Peer) {} +func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} //----------------------------------------------------------------------------- @@ -213,7 +213,8 @@ func (sw *Switch) OnStop() { // and to all registered reactors. // NOTE: This performs a blocking handshake before the peer is added. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed. -func (sw *Switch) AddPeer(peer *Peer) error { +func (sw *Switch) AddPeer(peer *peer) error { + if err := sw.FilterConnByAddr(peer.Addr()); err != nil { return err } @@ -232,12 +233,12 @@ func (sw *Switch) AddPeer(peer *Peer) error { } // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil { + if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo()); err != nil { return err } // Check for duplicate peer - if sw.peers.Has(peer.Key) { + if sw.peers.Has(peer.Key()) { return ErrSwitchDuplicatePeer } @@ -285,7 +286,7 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { sw.filterConnByPubKey = f } -func (sw *Switch) startInitPeer(peer *Peer) { +func (sw *Switch) startInitPeer(peer *peer) { peer.Start() // spawn send/recv routines for _, reactor := range sw.reactors { reactor.AddPeer(peer) @@ -337,7 +338,7 @@ func (sw *Switch) dialSeed(addr *NetAddress) { // DialPeerWithAddress dials the given peer and runs sw.AddPeer if it connects successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. -func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { +func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) @@ -375,7 +376,7 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) for _, peer := range sw.peers.List() { - go func(peer *Peer) { + go func(peer Peer) { success := peer.Send(chID, msg) successChan <- success }(peer) @@ -387,7 +388,7 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() for _, peer := range peers { - if peer.outbound { + if peer.IsOutbound() { outbound++ } else { inbound++ @@ -405,8 +406,8 @@ func (sw *Switch) Peers() IPeerSet { // StopPeerForError disconnects from a peer due to external error. // If the peer is persistent, it will attempt to reconnect. // TODO: make record depending on reason. -func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { - addr := NewNetAddress(peer.Addr()) +func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) { + addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr) sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason) sw.stopAndRemovePeer(peer, reason) @@ -438,12 +439,12 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { // StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. -func (sw *Switch) StopPeerGracefully(peer *Peer) { +func (sw *Switch) StopPeerGracefully(peer Peer) { sw.Logger.Info("Stopping peer gracefully") sw.stopAndRemovePeer(peer, nil) } -func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { +func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { sw.peers.Remove(peer) peer.Stop() for _, reactor := range sw.reactors { @@ -483,11 +484,11 @@ func (sw *Switch) listenerRoutine(l Listener) { //----------------------------------------------------------------------------- type SwitchEventNewPeer struct { - Peer *Peer + Peer Peer } type SwitchEventDonePeer struct { - Peer *Peer + Peer Peer Error interface{} } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index c686f9e46..d2435dfff 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -37,8 +37,8 @@ type TestReactor struct { mtx sync.Mutex channels []*ChannelDescriptor - peersAdded []*Peer - peersRemoved []*Peer + peersAdded []Peer + peersRemoved []Peer logMessages bool msgsCounter int msgsReceived map[byte][]PeerMessage @@ -59,24 +59,24 @@ func (tr *TestReactor) GetChannels() []*ChannelDescriptor { return tr.channels } -func (tr *TestReactor) AddPeer(peer *Peer) { +func (tr *TestReactor) AddPeer(peer Peer) { tr.mtx.Lock() defer tr.mtx.Unlock() tr.peersAdded = append(tr.peersAdded, peer) } -func (tr *TestReactor) RemovePeer(peer *Peer, reason interface{}) { +func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) { tr.mtx.Lock() defer tr.mtx.Unlock() tr.peersRemoved = append(tr.peersRemoved, peer) } -func (tr *TestReactor) Receive(chID byte, peer *Peer, msgBytes []byte) { +func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) { if tr.logMessages { tr.mtx.Lock() defer tr.mtx.Unlock() //fmt.Printf("Received: %X, %X\n", chID, msgBytes) - tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key, msgBytes, tr.msgsCounter}) + tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter}) tr.msgsCounter++ } } diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 848288447..0429c8d45 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -86,9 +86,9 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { peerRoundStates := []string{} for _, peer := range p2pSwitch.Peers().List() { // TODO: clean this up? - peerState := peer.Data.Get(types.PeerStateKey).(*cm.PeerState) + peerState := peer.Get(types.PeerStateKey).(*cm.PeerState) peerRoundState := peerState.GetRoundState() - peerRoundStateStr := peer.Key + ":" + string(wire.JSONBytes(peerRoundState)) + peerRoundStateStr := peer.Key() + ":" + string(wire.JSONBytes(peerRoundState)) peerRoundStates = append(peerRoundStates, peerRoundStateStr) } return &ctypes.ResultDumpConsensusState{roundState.String(), peerRoundStates}, nil diff --git a/rpc/core/net.go b/rpc/core/net.go index 6c2dc587e..c6749941c 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -42,9 +42,9 @@ func NetInfo() (*ctypes.ResultNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pSwitch.Peers().List() { peers = append(peers, ctypes.Peer{ - NodeInfo: *peer.NodeInfo, + NodeInfo: *peer.NodeInfo(), IsOutbound: peer.IsOutbound(), - ConnectionStatus: peer.Connection().Status(), + ConnectionStatus: peer.Status(), }) } return &ctypes.ResultNetInfo{