Browse Source

Fix mempool

pull/1347/head
Jae Kwon 7 years ago
parent
commit
35a1d747b0
2 changed files with 20 additions and 17 deletions
  1. +9
    -17
      mempool/reactor.go
  2. +11
    -0
      mempool/wire.go

+ 9
- 17
mempool/reactor.go View File

@ -1,13 +1,12 @@
package mempool
import (
"bytes"
"fmt"
"reflect"
"time"
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/go-amino"
"github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log"
@ -71,7 +70,7 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
msg, err := DecodeMessage(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
memR.Switch.StopPeerForError(src, err)
@ -137,7 +136,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
}
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
@ -158,24 +157,17 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
const (
msgTypeTx = byte(0x01)
)
// MempoolMessage is a message sent or received by the MempoolReactor.
type MempoolMessage interface{}
var _ = wire.RegisterInterface(
struct{ MempoolMessage }{},
wire.ConcreteType{&TxMessage{}, msgTypeTx},
)
func RegisterMempoolMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*MempoolMessage)(nil), nil)
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
}
// DecodeMessage decodes a byte-array into a MempoolMessage.
func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
msgType = bz[0]
n := new(int)
r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
func DecodeMessage(bz []byte) (msg MempoolMessage, err error) {
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}


+ 11
- 0
mempool/wire.go View File

@ -0,0 +1,11 @@
package mempool
import (
"github.com/tendermint/go-amino"
)
var cdc = amino.NewCodec()
func init() {
RegisterMempoolMessages(cdc)
}

Loading…
Cancel
Save