package mempool

import (
	"bytes"
	"fmt"
	"reflect"
	"time"

	"github.com/tendermint/go-clist"
	. "github.com/tendermint/go-common"
	"github.com/tendermint/go-p2p"
	"github.com/tendermint/go-wire"
	"github.com/tendermint/tendermint/events"
	"github.com/tendermint/tendermint/types"
)

const (
	// MempoolChannel is the p2p channel ID used for mempool messages.
	MempoolChannel = byte(0x30)

	maxMempoolMessageSize      = 1048576 // 1MB TODO make it configurable
	peerCatchupSleepIntervalMS = 100     // If peer is behind, sleep this amount
)

// MempoolReactor handles mempool tx broadcasting amongst peers.
type MempoolReactor struct {
	p2p.BaseReactor
	Mempool *Mempool // TODO: un-expose
	evsw    events.Fireable
}

// NewMempoolReactor returns a MempoolReactor wrapping the given mempool,
// with its embedded BaseReactor initialised under the name "MempoolReactor".
func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
	memR := &MempoolReactor{
		Mempool: mempool,
	}
	memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
	return memR
}

// GetChannels implements Reactor. It declares a single channel,
// MempoolChannel, with priority 5.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
	return []*p2p.ChannelDescriptor{
		&p2p.ChannelDescriptor{
			ID:       MempoolChannel,
			Priority: 5,
		},
	}
}

// AddPeer implements Reactor. It starts a dedicated goroutine that
// streams mempool txs to the peer.
func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
	go memR.broadcastTxRoutine(peer)
}

// RemovePeer implements Reactor. It is intentionally a no-op.
func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	// broadcast routine checks if peer is gone and returns
}

// Receive implements Reactor. It decodes msgBytes and, for a TxMessage,
// appends the carried tx to the mempool. Decode failures and unknown
// message types are logged and otherwise ignored.
func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
	_, msg, err := DecodeMessage(msgBytes)
	if err != nil {
		log.Warn("Error decoding message", "error", err)
		return
	}
	log.Notice("MempoolReactor received message", "msg", msg)

	switch msg := msg.(type) {
	case *TxMessage:
		if err := memR.Mempool.AppendTx(msg.Tx); err != nil {
			// Bad, seen, or conflicting tx.
			// Log the reason too so rejected txs can be diagnosed.
			log.Info("Could not add tx", "tx", msg.Tx, "error", err)
			return
		}
		log.Info("Added valid tx", "tx", msg.Tx)
		// broadcasting happens from go routines per peer
	default:
		log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
	}
}

// BroadcastTx is just an alias for AppendTx since broadcasting happens
// in the per-peer routines.
func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
	return memR.Mempool.AppendTx(tx)
}

// PeerState is the part of a peer's state the broadcast routine reads:
// the value of GetHeight is compared against the height of each mempool
// tx before that tx is sent.
type PeerState interface {
	GetHeight() int
}

// Peer is the subset of peer behaviour used by broadcastTxRoutine.
type Peer interface {
	IsRunning() bool
	Send(byte, interface{}) bool
	Get(string) interface{}
}

// broadcastTxRoutine sends new mempool txs to peer, one at a time, in
// list order. It returns once either the reactor or the peer is no
// longer running.
//
// TODO: Handle mempool or reactor shutdown?
// As is this routine may block forever if no new txs come in.
func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
	var next *clist.CElement
	for {
		// Also stop when the peer is gone; otherwise a failing Send below
		// would make this goroutine sleep and retry forever.
		if !memR.IsRunning() || !peer.IsRunning() {
			return // Quit!
		}
		if next == nil {
			// This happens because the CElement we were looking at got
			// garbage collected (removed). That is, .NextWait() returned nil.
			// Go ahead and start from the beginning.
			next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
		}
		memTx := next.Value.(*mempoolTx)

		// make sure the peer is up to date
		height := memTx.Height()
		if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
			peerState := peerState_i.(PeerState)
			if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
				time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
				continue
			}
		}

		// send memTx
		msg := &TxMessage{Tx: memTx.tx}
		if success := peer.Send(MempoolChannel, msg); !success {
			// Retry the same element after a short pause.
			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
			continue
		}

		next = next.NextWait()
	}
}

// SetFireable implements events.Eventable.
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
	memR.evsw = evsw
}

//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeTx = byte(0x01)
)

// MempoolMessage is the interface under which mempool wire messages are
// registered and decoded.
type MempoolMessage interface{}

var _ = wire.RegisterInterface(
	struct{ MempoolMessage }{},
	wire.ConcreteType{&TxMessage{}, msgTypeTx},
)

// DecodeMessage decodes bz into a MempoolMessage. msgType is the first
// byte of bz. An empty bz yields a non-nil error instead of panicking on
// the bz[0] access, since bz comes straight from a remote peer.
func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
	if len(bz) == 0 {
		return 0, nil, fmt.Errorf("empty mempool message")
	}
	msgType = bz[0]
	n := new(int)
	r := bytes.NewReader(bz)
	// ReadBinary is handed &err, so decode failures surface via err.
	msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
	return msgType, msg, err
}

//-------------------------------------

// TxMessage is the mempool message carrying a single tx.
type TxMessage struct {
	Tx types.Tx
}

// String returns a short human-readable form of the message.
func (m *TxMessage) String() string {
	return fmt.Sprintf("[TxMessage %v]", m.Tx)
}