|
|
@ -12,10 +12,15 @@ import ( |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
PexChannel = byte(0x00) |
|
|
|
PexChannel = byte(0x00) |
|
|
|
// period to ensure peers connected
|
|
|
|
defaultEnsurePeersPeriod = 30 * time.Second |
|
|
|
minNumOutboundPeers = 10 |
|
|
|
maxPexMessageSize = 1048576 // 1MB
|
|
|
|
|
|
|
|
// maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
|
|
|
|
defaultMaxMsgCountByPeer = 1000 |
|
|
|
msgCountByPeerFlushInterval = 1 * time.Hour |
|
|
|
) |
|
|
|
|
|
|
|
// PEXReactor handles PEX (peer exchange) and ensures that an
|
|
|
@ -26,12 +31,18 @@ type PEXReactor struct { |
|
|
|
sw *Switch |
|
|
|
book *AddrBook |
|
|
|
ensurePeersPeriod time.Duration |
|
|
|
|
|
|
|
// tracks message count by peer, so we can prevent abuse
|
|
|
|
msgCountByPeer map[string]uint16 |
|
|
|
maxMsgCountByPeer uint16 |
|
|
|
} |
|
|
|
|
|
|
|
func NewPEXReactor(b *AddrBook) *PEXReactor { |
|
|
|
r := &PEXReactor{ |
|
|
|
book: b, |
|
|
|
ensurePeersPeriod: defaultEnsurePeersPeriod, |
|
|
|
msgCountByPeer: make(map[string]uint16), |
|
|
|
maxMsgCountByPeer: defaultMaxMsgCountByPeer, |
|
|
|
} |
|
|
|
r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r) |
|
|
|
return r |
|
|
@ -41,6 +52,7 @@ func (r *PEXReactor) OnStart() error { |
|
|
|
r.BaseReactor.OnStart() |
|
|
|
r.book.Start() |
|
|
|
go r.ensurePeersRoutine() |
|
|
|
go r.flushMsgCountByPeer() |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
@ -89,24 +101,29 @@ func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) { |
|
|
|
|
|
|
|
// Receive implements Reactor by handling incoming PEX messages.
|
|
|
|
func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { |
|
|
|
srcAddr := src.Connection().RemoteAddress |
|
|
|
srcAddrStr := srcAddr.String() |
|
|
|
r.msgCountByPeer[srcAddrStr]++ |
|
|
|
if r.ReachedMaxMsgCountForPeer(srcAddrStr) { |
|
|
|
log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr) |
|
|
|
// TODO remove src from peers?
|
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
_, msg, err := DecodeMessage(msgBytes) |
|
|
|
if err != nil { |
|
|
|
log.Warn("Error decoding message", "error", err) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
log.Notice("Received message", "msg", msg) |
|
|
|
|
|
|
|
switch msg := msg.(type) { |
|
|
|
case *pexRequestMessage: |
|
|
|
// src requested some peers.
|
|
|
|
// TODO: prevent abuse.
|
|
|
|
r.SendAddrs(src, r.book.GetSelection()) |
|
|
|
case *pexAddrsMessage: |
|
|
|
// We received some peer addresses from src.
|
|
|
|
// TODO: prevent abuse.
|
|
|
|
// (We don't want to get spammed with bad peers)
|
|
|
|
srcAddr := src.Connection().RemoteAddress |
|
|
|
for _, addr := range msg.Addrs { |
|
|
|
if addr != nil { |
|
|
|
r.book.AddAddress(addr, srcAddr) |
|
|
@ -132,6 +149,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) { |
|
|
|
r.ensurePeersPeriod = d |
|
|
|
} |
|
|
|
|
|
|
|
// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
|
|
|
|
func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) { |
|
|
|
r.maxMsgCountByPeer = v |
|
|
|
} |
|
|
|
|
|
|
|
// ReachedMaxMsgCountForPeer returns true if we received too many
|
|
|
|
// messages from peer with address `addr`.
|
|
|
|
func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool { |
|
|
|
return r.msgCountByPeer[addr] >= r.maxMsgCountByPeer |
|
|
|
} |
|
|
|
|
|
|
|
// Ensures that sufficient peers are connected. (continuous)
|
|
|
|
func (r *PEXReactor) ensurePeersRoutine() { |
|
|
|
// Randomize when routine starts
|
|
|
@ -143,17 +171,16 @@ func (r *PEXReactor) ensurePeersRoutine() { |
|
|
|
|
|
|
|
// fire periodically
|
|
|
|
ticker := time.NewTicker(r.ensurePeersPeriod) |
|
|
|
FOR_LOOP: |
|
|
|
|
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-ticker.C: |
|
|
|
r.ensurePeers() |
|
|
|
case <-r.Quit: |
|
|
|
break FOR_LOOP |
|
|
|
ticker.Stop() |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
ticker.Stop() |
|
|
|
} |
|
|
|
|
|
|
|
// ensurePeers ensures that sufficient peers are connected. (once)
|
|
|
@ -222,6 +249,20 @@ func (r *PEXReactor) ensurePeers() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (r *PEXReactor) flushMsgCountByPeer() { |
|
|
|
ticker := time.NewTicker(msgCountByPeerFlushInterval) |
|
|
|
|
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-ticker.C: |
|
|
|
r.msgCountByPeer = make(map[string]uint16) |
|
|
|
case <-r.Quit: |
|
|
|
ticker.Stop() |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
// Messages
|
|
|
|
|
|
|
|