You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

370 lines
8.7 KiB

package p2p
import (
tmconn ""
//go:generate mockery --case underscore --name Peer
const metricsTickerDuration = 10 * time.Second
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
ID() NodeID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
RemoteAddr() net.Addr // remote address of the connection
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
CloseConn() error // close original connection
NodeInfo() NodeInfo // peer's info
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Set(string, interface{})
Get(string) interface{}
// peerConn contains the raw connection and its config.
type peerConn struct {
outbound bool
persistent bool
conn Connection
ip net.IP // cached RemoteIP()
func newPeerConn(outbound, persistent bool, conn Connection) peerConn {
return peerConn{
outbound: outbound,
persistent: persistent,
conn: conn,
// Return the IP from the connection RemoteAddr
func (pc peerConn) RemoteIP() net.IP {
if pc.ip == nil {
pc.ip = pc.conn.RemoteEndpoint().IP
return pc.ip
// peer implements Peer.
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
// raw peerConn and the multiplex connection
// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
nodeInfo NodeInfo
channels []byte
reactors map[byte]Reactor
onPeerError func(Peer, interface{})
// User data
Data *cmap.CMap
metrics *Metrics
metricsTicker *time.Ticker
type PeerOption func(*peer)
func newPeer(
nodeInfo NodeInfo,
pc peerConn,
reactorsByCh map[byte]Reactor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
reactors: reactorsByCh,
onPeerError: onPeerError,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
p.BaseService = *service.NewBaseService(nil, "Peer", p)
for _, option := range options {
return p
// onError calls the peer error callback.
func (p *peer) onError(err interface{}) {
p.onPeerError(p, err)
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID())
return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID())
// Implements service.Service
// SetLogger implements BaseService.
func (p *peer) SetLogger(l log.Logger) {
p.Logger = l
// OnStart implements BaseService.
func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil {
return err
go p.processMessages()
go p.metricsReporter()
return nil
// processMessages processes messages received from the connection.
func (p *peer) processMessages() {
defer func() {
if r := recover(); r != nil {
p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack()))
p.onError(fmt.Errorf("panic during peer message processing: %v", r))
for {
chID, msg, err := p.conn.ReceiveMessage()
if err != nil {
reactor, ok := p.reactors[byte(chID)]
if !ok {
p.onError(fmt.Errorf("unknown channel %v", chID))
reactor.Receive(byte(chID), p, msg)
// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
if err := p.conn.FlushClose(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
// OnStop implements BaseService.
func (p *peer) OnStop() {
if err := p.conn.Close(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
// Implements Peer
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() NodeID {
return p.nodeInfo.ID()
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *peer) IsOutbound() bool {
return p.peerConn.outbound
// IsPersistent returns true if the peer is persitent, false otherwise.
func (p *peer) IsPersistent() bool {
return p.peerConn.persistent
// NodeInfo returns a copy of the peer's NodeInfo.
func (p *peer) NodeInfo() NodeInfo {
return p.nodeInfo
// SocketAddr returns the address of the socket.
// For outbound peers, it's the address dialed (after DNS resolution).
// For inbound peers, it's the address returned by the underlying connection
// (not what's reported in the peer's NodeInfo).
func (p *peer) SocketAddr() *NetAddress {
endpoint := p.peerConn.conn.RemoteEndpoint()
return &NetAddress{
ID: p.ID(),
IP: endpoint.IP,
Port: endpoint.Port,
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() tmconn.ConnectionStatus {
return p.conn.Status()
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
return false
res, err := p.conn.SendMessage(ChannelID(chID), msgBytes)
if err == io.EOF {
return false
} else if err != nil {
return false
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
return res
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
res, err := p.conn.TrySendMessage(ChannelID(chID), msgBytes)
if err == io.EOF {
return false
} else if err != nil {
return false
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
return res
// Get the data for a given key.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)
// Set sets the data for the given key.
func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
// NOTE: probably will want to remove this
// but could be helpful while the feature is new
"Unknown channel for peer",
return false
// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
// methods only used for testing
// TODO: can we remove these?
// CloseConn closes the underlying connection
func (pc *peerConn) CloseConn() {
// RemoteAddr returns peer's remote network address.
func (p *peer) RemoteAddr() net.Addr {
endpoint := p.conn.RemoteEndpoint()
return &net.TCPAddr{
IP: endpoint.IP,
Port: int(endpoint.Port),
func PeerMetrics(metrics *Metrics) PeerOption {
return func(p *peer) {
p.metrics = metrics
func (p *peer) metricsReporter() {
for {
select {
case <-p.metricsTicker.C:
status := p.conn.Status()
var sendQueueSize float64
for _, chStatus := range status.Channels {
sendQueueSize += float64(chStatus.SendQueueSize)
p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
case <-p.Quit():