package mempool import ( "fmt" "math" "reflect" "sync" "time" amino "github.com/tendermint/go-amino" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) const ( MempoolChannel = byte(0x30) maxMsgSize = 1048576 // 1MB TODO make it configurable maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 maxActiveIDs = math.MaxUint16 ) // Reactor handles mempool tx broadcasting amongst peers. // It maintains a map from peer ID to counter, to prevent gossiping txs to the // peers you received it from. type Reactor struct { p2p.BaseReactor config *cfg.MempoolConfig mempool Mempool ids *mempoolIDs // txs to send to peers txs []Tx } type mempoolIDs struct { mtx sync.RWMutex peerMap map[p2p.ID]uint16 nextID uint16 // assumes that a node will never have over 65536 active peers activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } // Reserve searches for the next unused ID and assignes it to the // peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() defer ids.mtx.Unlock() curID := ids.nextPeerID() ids.peerMap[peer.ID()] = curID ids.activeIDs[curID] = struct{}{} } // nextPeerID returns the next unused peer ID to use. // This assumes that ids's mutex is already locked. func (ids *mempoolIDs) nextPeerID() uint16 { if len(ids.activeIDs) == maxActiveIDs { panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) } _, idExists := ids.activeIDs[ids.nextID] for idExists { ids.nextID++ _, idExists = ids.activeIDs[ids.nextID] } curID := ids.nextID ids.nextID++ return curID } // Reclaim returns the ID reserved for the peer back to unused pool. func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { ids.mtx.Lock() defer ids.mtx.Unlock() removedID, ok := ids.peerMap[peer.ID()] if ok { delete(ids.activeIDs, removedID) delete(ids.peerMap, peer.ID()) } } // GetForPeer returns an ID reserved for the peer. func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { ids.mtx.RLock() defer ids.mtx.RUnlock() return ids.peerMap[peer.ID()] } func newMempoolIDs() *mempoolIDs { return &mempoolIDs{ peerMap: make(map[p2p.ID]uint16), activeIDs: map[uint16]struct{}{0: {}}, nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx } } // NewReactor returns a new Reactor with the given config and mempool. func NewReactor(config *cfg.MempoolConfig, mempool Mempool) *Reactor { memR := &Reactor{ config: config, mempool: mempool, ids: newMempoolIDs(), txs: make([]Tx, 0), } memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) if config.Broadcast { mempool.OnNewTx(func(tx Tx) { memR.txs = append(memR.txs, tx) }) } return memR } type mempoolWithLogger interface { SetLogger(log.Logger) } // SetLogger sets the Logger on the reactor and the underlying mempool. func (memR *Reactor) SetLogger(l log.Logger) { memR.Logger = l // set mempoolWithLogger if mempool supports it if mem, ok := memR.mempool.(mempoolWithLogger); ok { mem.SetLogger(l) } } // OnStart implements p2p.BaseReactor. func (memR *Reactor) OnStart() error { if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } return nil } // GetChannels implements Reactor. // It returns the list of channels for this reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ { ID: MempoolChannel, Priority: 5, }, } } // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(0, peer) } // RemovePeer implements Reactor. func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { memR.ids.Reclaim(peer) // broadcast routine checks if peer is gone and returns } // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { msg, err := decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) memR.Switch.StopPeerForError(src, err) return } memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) switch msg := msg.(type) { case *TxMessage: peerID := memR.ids.GetForPeer(src) err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) } // broadcasting happens from go routines per peer default: memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 } func (memR *Reactor) broadcastTxToPeers(tx Tx) { for pc := range memR.txsC { select { case pc <- tx: default: memR.Logger.Error("Peer's queue is full. Tx won't be send", "queue_size", cap(pc), "tx", tx) } } } func (memR *Reactor) broadcastTxRoutine(pc <-chan Tx, peer p2p.Peer) { for { select { case tx := <-pc: // Try to send the tx until success OR peer/mempool reactor is stopped. var success bool for !success { if !memR.IsRunning() || !peer.IsRunning() { return } success = memR.sendTxToPeer(tx, peer) } case <-memR.Quit(): return case <-peer.Quit(): return } } } func (memR *Reactor) sendTxToPeer(tx Tx, peer p2p.Peer) (success bool) { peerID := memR.ids.GetForPeer(peer) // make sure the peer is up to date peerState, ok := peer.Get(types.PeerStateKey).(PeerState) if !ok { // Peer does not have a state yet. We set it in the consensus reactor, but // when we add peer in Switch, the order we call reactors#AddPeer is // different every time due to us using a map. Sometimes other reactors // will be initialized before the consensus reactor. We should wait a few // milliseconds and retry. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) return false } if peerState.GetHeight() < tx.Height()-1 { // Allow for a lag of 1 block time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) return false } // ensure peer hasn't already sent us this tx if !tx.HasSender(peerID) { // send memTx msg := &TxMessage{Tx: tx.Raw()} success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) return false } } return true } //----------------------------------------------------------------------------- // Messages // MempoolMessage is a message sent or received by the Reactor. type MempoolMessage interface{} func RegisterMempoolMessages(cdc *amino.Codec) { cdc.RegisterInterface((*MempoolMessage)(nil), nil) cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) } func decodeMsg(bz []byte) (msg MempoolMessage, err error) { if len(bz) > maxMsgSize { return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) } err = cdc.UnmarshalBinaryBare(bz, &msg) return } //------------------------------------- // TxMessage is a MempoolMessage containing a transaction. type TxMessage struct { Tx types.Tx } // String returns a string representation of the TxMessage. func (m *TxMessage) String() string { return fmt.Sprintf("[TxMessage %v]", m.Tx) }