You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

371 lines
8.8 KiB

package p2p
import (
"fmt"
"io"
"net"
"runtime/debug"
"time"
tmconn "github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types"
)
//go:generate mockery --case underscore --name Peer
const metricsTickerDuration = 10 * time.Second
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
service.Service
FlushStop()
ID() types.NodeID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
RemoteAddr() net.Addr // remote address of the connection
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
CloseConn() error // close original connection
NodeInfo() types.NodeInfo // peer's info
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Set(string, interface{})
Get(string) interface{}
}
//----------------------------------------------------------
// peerConn contains the raw connection and its config.
type peerConn struct {
outbound bool
persistent bool
conn Connection
ip net.IP // cached RemoteIP()
}
func newPeerConn(outbound, persistent bool, conn Connection) peerConn {
return peerConn{
outbound: outbound,
persistent: persistent,
conn: conn,
}
}
// Return the IP from the connection RemoteAddr
func (pc peerConn) RemoteIP() net.IP {
if pc.ip == nil {
pc.ip = pc.conn.RemoteEndpoint().IP
}
return pc.ip
}
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
service.BaseService
// raw peerConn and the multiplex connection
peerConn
// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
nodeInfo types.NodeInfo
channels []byte
reactors map[byte]Reactor
onPeerError func(Peer, interface{})
// User data
Data *cmap.CMap
metrics *Metrics
metricsTicker *time.Ticker
}
type PeerOption func(*peer)
func newPeer(
nodeInfo types.NodeInfo,
pc peerConn,
reactorsByCh map[byte]Reactor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
reactors: reactorsByCh,
onPeerError: onPeerError,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
}
p.BaseService = *service.NewBaseService(nil, "Peer", p)
for _, option := range options {
option(p)
}
return p
}
// onError calls the peer error callback.
func (p *peer) onError(err interface{}) {
p.onPeerError(p, err)
}
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID())
}
return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID())
}
//---------------------------------------------------
// Implements service.Service
// SetLogger implements BaseService.
func (p *peer) SetLogger(l log.Logger) {
p.Logger = l
}
// OnStart implements BaseService.
func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil {
return err
}
go p.processMessages()
go p.metricsReporter()
return nil
}
// processMessages processes messages received from the connection.
func (p *peer) processMessages() {
defer func() {
if r := recover(); r != nil {
p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack()))
p.onError(fmt.Errorf("panic during peer message processing: %v", r))
}
}()
for {
chID, msg, err := p.conn.ReceiveMessage()
if err != nil {
p.onError(err)
return
}
reactor, ok := p.reactors[byte(chID)]
if !ok {
p.onError(fmt.Errorf("unknown channel %v", chID))
return
}
reactor.Receive(byte(chID), p, msg)
}
}
// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
if err := p.conn.FlushClose(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
}
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
if err := p.conn.Close(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
}
}
//---------------------------------------------------
// Implements Peer
// ID returns the peer's ID - the hex encoded hash of its pubkey.
func (p *peer) ID() types.NodeID {
return p.nodeInfo.ID()
}
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *peer) IsOutbound() bool {
return p.peerConn.outbound
}
// IsPersistent returns true if the peer is persitent, false otherwise.
func (p *peer) IsPersistent() bool {
return p.peerConn.persistent
}
// NodeInfo returns a copy of the peer's NodeInfo.
func (p *peer) NodeInfo() types.NodeInfo {
return p.nodeInfo
}
// SocketAddr returns the address of the socket.
// For outbound peers, it's the address dialed (after DNS resolution).
// For inbound peers, it's the address returned by the underlying connection
// (not what's reported in the peer's NodeInfo).
func (p *peer) SocketAddr() *NetAddress {
endpoint := p.peerConn.conn.RemoteEndpoint()
return &NetAddress{
ID: p.ID(),
IP: endpoint.IP,
Port: endpoint.Port,
}
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() tmconn.ConnectionStatus {
return p.conn.Status()
}
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
return false
}
res, err := p.conn.SendMessage(ChannelID(chID), msgBytes)
if err == io.EOF {
return false
} else if err != nil {
p.onError(err)
return false
}
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
}
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res, err := p.conn.TrySendMessage(ChannelID(chID), msgBytes)
if err == io.EOF {
return false
} else if err != nil {
p.onError(err)
return false
}
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
return res
}
// Get the data for a given key.
func (p *peer) Get(key string) interface{} {
return p.Data.Get(key)
}
// Set sets the data for the given key.
func (p *peer) Set(key string, data interface{}) {
p.Data.Set(key, data)
}
// hasChannel returns true if the peer reported
// knowing about the given chID.
func (p *peer) hasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
}
}
// NOTE: probably will want to remove this
// but could be helpful while the feature is new
p.Logger.Debug(
"Unknown channel for peer",
"channel",
chID,
"channels",
p.channels,
)
return false
}
// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}
//---------------------------------------------------
// methods only used for testing
// TODO: can we remove these?
// CloseConn closes the underlying connection
func (pc *peerConn) CloseConn() {
pc.conn.Close()
}
// RemoteAddr returns peer's remote network address.
func (p *peer) RemoteAddr() net.Addr {
endpoint := p.conn.RemoteEndpoint()
return &net.TCPAddr{
IP: endpoint.IP,
Port: int(endpoint.Port),
}
}
//---------------------------------------------------
func PeerMetrics(metrics *Metrics) PeerOption {
return func(p *peer) {
p.metrics = metrics
}
}
func (p *peer) metricsReporter() {
for {
select {
case <-p.metricsTicker.C:
status := p.conn.Status()
var sendQueueSize float64
for _, chStatus := range status.Channels {
sendQueueSize += float64(chStatus.SendQueueSize)
}
p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
case <-p.Quit():
return
}
}
}