package p2p
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tendermint/tendermint/libs/cmap"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/libs/service"
|
|
|
|
tmconn "github.com/tendermint/tendermint/p2p/conn"
|
|
)
|
|
|
|
// PeerStatus specifies peer statuses.
|
|
type PeerStatus string
|
|
|
|
const (
|
|
PeerStatusNew = PeerStatus("new") // New peer which we haven't tried to contact yet.
|
|
PeerStatusUp = PeerStatus("up") // Peer which we have an active connection to.
|
|
PeerStatusDown = PeerStatus("down") // Peer which we're temporarily disconnected from.
|
|
PeerStatusRemoved = PeerStatus("removed") // Peer which has been removed.
|
|
PeerStatusBanned = PeerStatus("banned") // Peer which is banned for misbehavior.
|
|
)
|
|
|
|
// PeerPriority specifies peer priorities.
|
|
type PeerPriority int
|
|
|
|
const (
|
|
PeerPriorityNormal PeerPriority = iota + 1
|
|
PeerPriorityValidator
|
|
PeerPriorityPersistent
|
|
)
|
|
|
|
// PeerError is a peer error reported by a reactor via the Error channel. The
|
|
// severity may cause the peer to be disconnected or banned depending on policy.
|
|
type PeerError struct {
|
|
PeerID NodeID
|
|
Err error
|
|
Severity PeerErrorSeverity
|
|
}
|
|
|
|
// PeerErrorSeverity determines the severity of a peer error.
|
|
type PeerErrorSeverity string
|
|
|
|
const (
|
|
PeerErrorSeverityLow PeerErrorSeverity = "low" // Mostly ignored.
|
|
PeerErrorSeverityHigh PeerErrorSeverity = "high" // May disconnect.
|
|
PeerErrorSeverityCritical PeerErrorSeverity = "critical" // Ban.
|
|
)
|
|
|
|
// PeerUpdatesCh defines a wrapper around a PeerUpdate go channel that allows
|
|
// a reactor to listen for peer updates and safely close it when stopping.
|
|
type PeerUpdatesCh struct {
|
|
closeOnce sync.Once
|
|
|
|
// updatesCh defines the go channel in which the router sends peer updates to
|
|
// reactors. Each reactor will have its own PeerUpdatesCh to listen for updates
|
|
// from.
|
|
updatesCh chan PeerUpdate
|
|
|
|
// doneCh is used to signal that a PeerUpdatesCh is closed. It is the
|
|
// reactor's responsibility to invoke Close.
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
// NewPeerUpdates returns a reference to a new PeerUpdatesCh.
|
|
func NewPeerUpdates() *PeerUpdatesCh {
|
|
return &PeerUpdatesCh{
|
|
updatesCh: make(chan PeerUpdate),
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Updates returns a read-only go channel where a consuming reactor can listen
|
|
// for peer updates sent from the router.
|
|
func (puc *PeerUpdatesCh) Updates() <-chan PeerUpdate {
|
|
return puc.updatesCh
|
|
}
|
|
|
|
// Close closes the PeerUpdatesCh channel. It should only be closed by the respective
|
|
// reactor when stopping and ensure nothing is listening for updates.
|
|
//
|
|
// NOTE: After a PeerUpdatesCh is closed, the router may safely assume it can no
|
|
// longer send on the internal updatesCh, however it should NEVER explicitly close
|
|
// it as that could result in panics by sending on a closed channel.
|
|
func (puc *PeerUpdatesCh) Close() {
|
|
puc.closeOnce.Do(func() {
|
|
close(puc.doneCh)
|
|
})
|
|
}
|
|
|
|
// Done returns a read-only version of the PeerUpdatesCh's internal doneCh go
|
|
// channel that should be used by a router to signal when it is safe to explicitly
|
|
// not send any peer updates.
|
|
func (puc *PeerUpdatesCh) Done() <-chan struct{} {
|
|
return puc.doneCh
|
|
}
|
|
|
|
// PeerUpdate is a peer status update for reactors.
|
|
type PeerUpdate struct {
|
|
PeerID NodeID
|
|
Status PeerStatus
|
|
}
|
|
|
|
// ============================================================================
|
|
// Types and business logic below may be deprecated.
|
|
//
|
|
// TODO: Rename once legacy p2p types are removed.
|
|
// ref: https://github.com/tendermint/tendermint/issues/5670
|
|
// ============================================================================
|
|
|
|
//go:generate mockery --case underscore --name Peer
|
|
|
|
const metricsTickerDuration = 10 * time.Second
|
|
|
|
// Peer is an interface representing a peer connected on a reactor.
|
|
type Peer interface {
|
|
service.Service
|
|
FlushStop()
|
|
|
|
ID() NodeID // peer's cryptographic ID
|
|
RemoteIP() net.IP // remote IP of the connection
|
|
RemoteAddr() net.Addr // remote address of the connection
|
|
|
|
IsOutbound() bool // did we dial the peer
|
|
IsPersistent() bool // do we redial this peer when we disconnect
|
|
|
|
CloseConn() error // close original connection
|
|
|
|
NodeInfo() NodeInfo // peer's info
|
|
Status() tmconn.ConnectionStatus
|
|
SocketAddr() *NetAddress // actual address of the socket
|
|
|
|
Send(byte, []byte) bool
|
|
TrySend(byte, []byte) bool
|
|
|
|
Set(string, interface{})
|
|
Get(string) interface{}
|
|
}
|
|
|
|
//----------------------------------------------------------
|
|
|
|
// peerConn contains the raw connection and its config.
|
|
type peerConn struct {
|
|
outbound bool
|
|
persistent bool
|
|
conn Connection
|
|
ip net.IP // cached RemoteIP()
|
|
}
|
|
|
|
func newPeerConn(outbound, persistent bool, conn Connection) peerConn {
|
|
return peerConn{
|
|
outbound: outbound,
|
|
persistent: persistent,
|
|
conn: conn,
|
|
}
|
|
}
|
|
|
|
// ID only exists for SecretConnection.
|
|
func (pc peerConn) ID() NodeID {
|
|
return NodeIDFromPubKey(pc.conn.PubKey())
|
|
}
|
|
|
|
// Return the IP from the connection RemoteAddr
|
|
func (pc peerConn) RemoteIP() net.IP {
|
|
if pc.ip == nil {
|
|
pc.ip = pc.conn.RemoteEndpoint().IP
|
|
}
|
|
return pc.ip
|
|
}
|
|
|
|
// peer implements Peer.
|
|
//
|
|
// Before using a peer, you will need to perform a handshake on connection.
|
|
type peer struct {
|
|
service.BaseService
|
|
|
|
// raw peerConn and the multiplex connection
|
|
peerConn
|
|
|
|
// peer's node info and the channel it knows about
|
|
// channels = nodeInfo.Channels
|
|
// cached to avoid copying nodeInfo in hasChannel
|
|
nodeInfo NodeInfo
|
|
channels []byte
|
|
reactors map[byte]Reactor
|
|
onPeerError func(Peer, interface{})
|
|
|
|
// User data
|
|
Data *cmap.CMap
|
|
|
|
metrics *Metrics
|
|
metricsTicker *time.Ticker
|
|
}
|
|
|
|
type PeerOption func(*peer)
|
|
|
|
func newPeer(
|
|
pc peerConn,
|
|
reactorsByCh map[byte]Reactor,
|
|
onPeerError func(Peer, interface{}),
|
|
options ...PeerOption,
|
|
) *peer {
|
|
nodeInfo := pc.conn.NodeInfo()
|
|
p := &peer{
|
|
peerConn: pc,
|
|
nodeInfo: nodeInfo,
|
|
channels: nodeInfo.Channels, // TODO
|
|
reactors: reactorsByCh,
|
|
onPeerError: onPeerError,
|
|
Data: cmap.NewCMap(),
|
|
metricsTicker: time.NewTicker(metricsTickerDuration),
|
|
metrics: NopMetrics(),
|
|
}
|
|
|
|
p.BaseService = *service.NewBaseService(nil, "Peer", p)
|
|
for _, option := range options {
|
|
option(p)
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// onError calls the peer error callback.
|
|
func (p *peer) onError(err interface{}) {
|
|
p.onPeerError(p, err)
|
|
}
|
|
|
|
// String representation.
|
|
func (p *peer) String() string {
|
|
if p.outbound {
|
|
return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID())
|
|
}
|
|
|
|
return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID())
|
|
}
|
|
|
|
//---------------------------------------------------
|
|
// Implements service.Service
|
|
|
|
// SetLogger implements BaseService.
|
|
func (p *peer) SetLogger(l log.Logger) {
|
|
p.Logger = l
|
|
}
|
|
|
|
// OnStart implements BaseService.
|
|
func (p *peer) OnStart() error {
|
|
if err := p.BaseService.OnStart(); err != nil {
|
|
return err
|
|
}
|
|
|
|
go p.processMessages()
|
|
go p.metricsReporter()
|
|
|
|
return nil
|
|
}
|
|
|
|
// processMessages processes messages received from the connection.
|
|
func (p *peer) processMessages() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack()))
|
|
p.onError(fmt.Errorf("panic during peer message processing: %v", r))
|
|
}
|
|
}()
|
|
|
|
for {
|
|
chID, msg, err := p.conn.ReceiveMessage()
|
|
if err != nil {
|
|
p.onError(err)
|
|
return
|
|
}
|
|
reactor, ok := p.reactors[chID]
|
|
if !ok {
|
|
p.onError(fmt.Errorf("unknown channel %v", chID))
|
|
return
|
|
}
|
|
reactor.Receive(chID, p, msg)
|
|
}
|
|
}
|
|
|
|
// FlushStop mimics OnStop but additionally ensures that all successful
|
|
// .Send() calls will get flushed before closing the connection.
|
|
// NOTE: it is not safe to call this method more than once.
|
|
func (p *peer) FlushStop() {
|
|
p.metricsTicker.Stop()
|
|
p.BaseService.OnStop()
|
|
if err := p.conn.FlushClose(); err != nil {
|
|
p.Logger.Debug("error while stopping peer", "err", err)
|
|
}
|
|
}
|
|
|
|
// OnStop implements BaseService.
|
|
func (p *peer) OnStop() {
|
|
p.metricsTicker.Stop()
|
|
p.BaseService.OnStop()
|
|
if err := p.conn.Close(); err != nil {
|
|
p.Logger.Debug("error while stopping peer", "err", err)
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------
|
|
// Implements Peer
|
|
|
|
// ID returns the peer's ID - the hex encoded hash of its pubkey.
|
|
func (p *peer) ID() NodeID {
|
|
return p.nodeInfo.ID()
|
|
}
|
|
|
|
// IsOutbound returns true if the connection is outbound, false otherwise.
|
|
func (p *peer) IsOutbound() bool {
|
|
return p.peerConn.outbound
|
|
}
|
|
|
|
// IsPersistent returns true if the peer is persitent, false otherwise.
|
|
func (p *peer) IsPersistent() bool {
|
|
return p.peerConn.persistent
|
|
}
|
|
|
|
// NodeInfo returns a copy of the peer's NodeInfo.
|
|
func (p *peer) NodeInfo() NodeInfo {
|
|
return p.nodeInfo
|
|
}
|
|
|
|
// SocketAddr returns the address of the socket.
|
|
// For outbound peers, it's the address dialed (after DNS resolution).
|
|
// For inbound peers, it's the address returned by the underlying connection
|
|
// (not what's reported in the peer's NodeInfo).
|
|
func (p *peer) SocketAddr() *NetAddress {
|
|
return p.peerConn.conn.RemoteEndpoint().NetAddress()
|
|
}
|
|
|
|
// Status returns the peer's ConnectionStatus.
|
|
func (p *peer) Status() tmconn.ConnectionStatus {
|
|
return p.conn.Status()
|
|
}
|
|
|
|
// Send msg bytes to the channel identified by chID byte. Returns false if the
|
|
// send queue is full after timeout, specified by MConnection.
|
|
func (p *peer) Send(chID byte, msgBytes []byte) bool {
|
|
if !p.IsRunning() {
|
|
// see Switch#Broadcast, where we fetch the list of peers and loop over
|
|
// them - while we're looping, one peer may be removed and stopped.
|
|
return false
|
|
} else if !p.hasChannel(chID) {
|
|
return false
|
|
}
|
|
res, err := p.conn.SendMessage(chID, msgBytes)
|
|
if err == io.EOF {
|
|
return false
|
|
} else if err != nil {
|
|
p.onError(err)
|
|
return false
|
|
}
|
|
if res {
|
|
labels := []string{
|
|
"peer_id", string(p.ID()),
|
|
"chID", fmt.Sprintf("%#x", chID),
|
|
}
|
|
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
|
}
|
|
return res
|
|
}
|
|
|
|
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
|
|
// false if the send queue is full.
|
|
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
|
|
if !p.IsRunning() {
|
|
return false
|
|
} else if !p.hasChannel(chID) {
|
|
return false
|
|
}
|
|
res, err := p.conn.TrySendMessage(chID, msgBytes)
|
|
if err == io.EOF {
|
|
return false
|
|
} else if err != nil {
|
|
p.onError(err)
|
|
return false
|
|
}
|
|
if res {
|
|
labels := []string{
|
|
"peer_id", string(p.ID()),
|
|
"chID", fmt.Sprintf("%#x", chID),
|
|
}
|
|
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Get the data for a given key.
|
|
func (p *peer) Get(key string) interface{} {
|
|
return p.Data.Get(key)
|
|
}
|
|
|
|
// Set sets the data for the given key.
|
|
func (p *peer) Set(key string, data interface{}) {
|
|
p.Data.Set(key, data)
|
|
}
|
|
|
|
// hasChannel returns true if the peer reported
|
|
// knowing about the given chID.
|
|
func (p *peer) hasChannel(chID byte) bool {
|
|
for _, ch := range p.channels {
|
|
if ch == chID {
|
|
return true
|
|
}
|
|
}
|
|
// NOTE: probably will want to remove this
|
|
// but could be helpful while the feature is new
|
|
p.Logger.Debug(
|
|
"Unknown channel for peer",
|
|
"channel",
|
|
chID,
|
|
"channels",
|
|
p.channels,
|
|
)
|
|
return false
|
|
}
|
|
|
|
// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
|
|
func (p *peer) CloseConn() error {
|
|
return p.peerConn.conn.Close()
|
|
}
|
|
|
|
//---------------------------------------------------
|
|
// methods only used for testing
|
|
// TODO: can we remove these?
|
|
|
|
// CloseConn closes the underlying connection
|
|
func (pc *peerConn) CloseConn() {
|
|
pc.conn.Close()
|
|
}
|
|
|
|
// RemoteAddr returns peer's remote network address.
|
|
func (p *peer) RemoteAddr() net.Addr {
|
|
endpoint := p.conn.RemoteEndpoint()
|
|
return &net.TCPAddr{
|
|
IP: endpoint.IP,
|
|
Port: int(endpoint.Port),
|
|
}
|
|
}
|
|
|
|
//---------------------------------------------------
|
|
|
|
func PeerMetrics(metrics *Metrics) PeerOption {
|
|
return func(p *peer) {
|
|
p.metrics = metrics
|
|
}
|
|
}
|
|
|
|
func (p *peer) metricsReporter() {
|
|
for {
|
|
select {
|
|
case <-p.metricsTicker.C:
|
|
status := p.conn.Status()
|
|
var sendQueueSize float64
|
|
for _, chStatus := range status.Channels {
|
|
sendQueueSize += float64(chStatus.SendQueueSize)
|
|
}
|
|
|
|
p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
|
|
case <-p.Quit():
|
|
return
|
|
}
|
|
}
|
|
}
|