|
|
@ -31,25 +31,13 @@ const ( |
|
|
|
maxActiveIDs = math.MaxUint16 |
|
|
|
) |
|
|
|
|
|
|
|
// MempoolWithWait extends the standard Mempool interface to allow reactor to
|
|
|
|
// wait for transactions and iterate on them once there are any. Also it
|
|
|
|
// includes ReapMaxTxs function, which is useful for testing.
|
|
|
|
//
|
|
|
|
// UNSTABLE
|
|
|
|
type MempoolWithWait interface { |
|
|
|
Mempool |
|
|
|
TxsFront() *clist.CElement |
|
|
|
TxsWaitChan() <-chan struct{} |
|
|
|
ReapMaxTxs(n int) types.Txs |
|
|
|
} |
|
|
|
|
|
|
|
// MempoolReactor handles mempool tx broadcasting amongst peers.
|
|
|
|
// Reactor handles mempool tx broadcasting amongst peers.
|
|
|
|
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
|
|
|
|
// peers you received it from.
|
|
|
|
type MempoolReactor struct { |
|
|
|
type Reactor struct { |
|
|
|
p2p.BaseReactor |
|
|
|
config *cfg.MempoolConfig |
|
|
|
Mempool MempoolWithWait |
|
|
|
mempool Mempool |
|
|
|
ids *mempoolIDs |
|
|
|
} |
|
|
|
|
|
|
@ -116,14 +104,14 @@ func newMempoolIDs() *mempoolIDs { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
|
|
|
|
func NewMempoolReactor(config *cfg.MempoolConfig, mempool MempoolWithWait) *MempoolReactor { |
|
|
|
memR := &MempoolReactor{ |
|
|
|
// NewReactor returns a new Reactor with the given config and mempool.
|
|
|
|
func NewReactor(config *cfg.MempoolConfig, mempool Mempool) *Reactor { |
|
|
|
memR := &Reactor{ |
|
|
|
config: config, |
|
|
|
Mempool: mempool, |
|
|
|
mempool: mempool, |
|
|
|
ids: newMempoolIDs(), |
|
|
|
} |
|
|
|
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) |
|
|
|
memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) |
|
|
|
return memR |
|
|
|
} |
|
|
|
|
|
|
@ -131,18 +119,18 @@ type mempoolWithLogger interface { |
|
|
|
SetLogger(log.Logger) |
|
|
|
} |
|
|
|
|
|
|
|
// SetLogger sets the Logger on the reactor and the underlying Mempool.
|
|
|
|
func (memR *MempoolReactor) SetLogger(l log.Logger) { |
|
|
|
// SetLogger sets the Logger on the reactor and the underlying mempool.
|
|
|
|
func (memR *Reactor) SetLogger(l log.Logger) { |
|
|
|
memR.Logger = l |
|
|
|
|
|
|
|
// set mempoolWithLogger if mempool supports it
|
|
|
|
if mem, ok := memR.Mempool.(mempoolWithLogger); ok { |
|
|
|
if mem, ok := memR.mempool.(mempoolWithLogger); ok { |
|
|
|
mem.SetLogger(l) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// OnStart implements p2p.BaseReactor.
|
|
|
|
func (memR *MempoolReactor) OnStart() error { |
|
|
|
func (memR *Reactor) OnStart() error { |
|
|
|
if !memR.config.Broadcast { |
|
|
|
memR.Logger.Info("Tx broadcasting is disabled") |
|
|
|
} |
|
|
@ -151,7 +139,7 @@ func (memR *MempoolReactor) OnStart() error { |
|
|
|
|
|
|
|
// GetChannels implements Reactor.
|
|
|
|
// It returns the list of channels for this reactor.
|
|
|
|
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { |
|
|
|
func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { |
|
|
|
return []*p2p.ChannelDescriptor{ |
|
|
|
{ |
|
|
|
ID: MempoolChannel, |
|
|
@ -162,20 +150,20 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { |
|
|
|
|
|
|
|
// AddPeer implements Reactor.
|
|
|
|
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
|
|
|
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) { |
|
|
|
func (memR *Reactor) AddPeer(peer p2p.Peer) { |
|
|
|
memR.ids.ReserveForPeer(peer) |
|
|
|
go memR.broadcastTxRoutine(peer) |
|
|
|
} |
|
|
|
|
|
|
|
// RemovePeer implements Reactor.
|
|
|
|
func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { |
|
|
|
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { |
|
|
|
memR.ids.Reclaim(peer) |
|
|
|
// broadcast routine checks if peer is gone and returns
|
|
|
|
} |
|
|
|
|
|
|
|
// Receive implements Reactor.
|
|
|
|
// It adds any received transactions to the mempool.
|
|
|
|
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { |
|
|
|
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { |
|
|
|
msg, err := decodeMsg(msgBytes) |
|
|
|
if err != nil { |
|
|
|
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) |
|
|
@ -187,7 +175,7 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { |
|
|
|
switch msg := msg.(type) { |
|
|
|
case *TxMessage: |
|
|
|
peerID := memR.ids.GetForPeer(src) |
|
|
|
err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID}) |
|
|
|
err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) |
|
|
|
if err != nil { |
|
|
|
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) |
|
|
|
} |
|
|
@ -203,7 +191,7 @@ type PeerState interface { |
|
|
|
} |
|
|
|
|
|
|
|
// Send new mempool txs to peer.
|
|
|
|
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { |
|
|
|
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { |
|
|
|
if !memR.config.Broadcast { |
|
|
|
return |
|
|
|
} |
|
|
@ -220,8 +208,8 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { |
|
|
|
// start from the beginning.
|
|
|
|
if next == nil { |
|
|
|
select { |
|
|
|
case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
|
|
|
|
if next = memR.Mempool.TxsFront(); next == nil { |
|
|
|
case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available
|
|
|
|
if next = memR.mempool.TxsFront(); next == nil { |
|
|
|
continue |
|
|
|
} |
|
|
|
case <-peer.Quit(): |
|
|
@ -275,7 +263,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { |
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
// Messages
|
|
|
|
|
|
|
|
// MempoolMessage is a message sent or received by the MempoolReactor.
|
|
|
|
// MempoolMessage is a message sent or received by the Reactor.
|
|
|
|
type MempoolMessage interface{} |
|
|
|
|
|
|
|
func RegisterMempoolMessages(cdc *amino.Codec) { |
|
|
|