|
|
@ -19,8 +19,7 @@ import ( |
|
|
|
const ( |
|
|
|
MempoolChannel = byte(0x30) |
|
|
|
|
|
|
|
maxMsgSize = 1048576 // 1MB TODO make it configurable
|
|
|
|
maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage
|
|
|
|
aminoOverheadForTxMessage = 8 |
|
|
|
|
|
|
|
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
|
|
|
|
|
|
|
@ -156,7 +155,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { |
|
|
|
// Receive implements Reactor.
|
|
|
|
// It adds any received transactions to the mempool.
|
|
|
|
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { |
|
|
|
msg, err := decodeMsg(msgBytes) |
|
|
|
msg, err := memR.decodeMsg(msgBytes) |
|
|
|
if err != nil { |
|
|
|
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) |
|
|
|
memR.Switch.StopPeerForError(src, err) |
|
|
@ -263,9 +262,9 @@ func RegisterMempoolMessages(cdc *amino.Codec) { |
|
|
|
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) |
|
|
|
} |
|
|
|
|
|
|
|
func decodeMsg(bz []byte) (msg MempoolMessage, err error) { |
|
|
|
if len(bz) > maxMsgSize { |
|
|
|
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) |
|
|
|
func (memR *Reactor) decodeMsg(bz []byte) (msg MempoolMessage, err error) { |
|
|
|
if l := len(bz); l > memR.config.MaxMsgBytes { |
|
|
|
return msg, ErrTxTooLarge{memR.config.MaxMsgBytes, l} |
|
|
|
} |
|
|
|
err = cdc.UnmarshalBinaryBare(bz, &msg) |
|
|
|
return |
|
|
@ -282,3 +281,9 @@ type TxMessage struct { |
|
|
|
func (m *TxMessage) String() string { |
|
|
|
return fmt.Sprintf("[TxMessage %v]", m.Tx) |
|
|
|
} |
|
|
|
|
|
|
|
// calcMaxTxSize returns the max size of Tx
|
|
|
|
// account for amino overhead of TxMessage
|
|
|
|
func calcMaxTxSize(maxMsgSize int) int { |
|
|
|
return maxMsgSize - aminoOverheadForTxMessage |
|
|
|
} |