From 0da7d873514ce84533d3864a057ce5c6e61f09ca Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 3 Jun 2020 07:30:52 +0200 Subject: [PATCH] proto: move mempool to proto (#4940) * proto: move mempool to proto - changes according to moving the mempool reactor to proto Signed-off-by: Marko Baricevic Closes: #2883 --- CHANGELOG_PENDING.md | 1 + docs/app-dev/getting-started.md | 7 ++-- mempool/clist_mempool.go | 7 ++-- mempool/clist_mempool_test.go | 50 ++++++++++++--------------- mempool/codec.go | 11 ------ mempool/mempool.go | 15 +++----- mempool/reactor.go | 61 +++++++++++++++------------------ state/tx_filter.go | 2 +- types/block.go | 1 - types/tx.go | 17 --------- types/tx_test.go | 57 ------------------------------ 11 files changed, 62 insertions(+), 167 deletions(-) delete mode 100644 mempool/codec.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 266efe99f..79241726f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -29,6 +29,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [types] [\#4792](https://github.com/tendermint/tendermint/pull/4792) Sort validators by voting power to enable faster commit verification (@melekes) - [evidence] [\#4780](https://github.com/tendermint/tendermint/pull/4780) Cap evidence to an absolute number (@cmwaters) Add `max_num` to consensus evidence parameters (default: 50 items). + - [mempool] \#4940 Migrate mempool from amino binary encoding to Protobuf ### FEATURES: diff --git a/docs/app-dev/getting-started.md b/docs/app-dev/getting-started.md index 4a83b0ad2..531012833 100644 --- a/docs/app-dev/getting-started.md +++ b/docs/app-dev/getting-started.md @@ -23,6 +23,7 @@ using Tendermint. The first apps we will work with are written in Go. To install them, you need to [install Go](https://golang.org/doc/install), put `$GOPATH/bin` in your `$PATH` and enable go modules with these instructions: + ```bash echo export GOPATH=\"\$HOME/go\" >> ~/.bash_profile echo export PATH=\"\$PATH:\$GOPATH/bin\" >> ~/.bash_profile @@ -58,9 +59,9 @@ Let's start a kvstore application. abci-cli kvstore ``` -In another terminal, we can start Tendermint. You should already have the -Tendermint binary installed. If not, follow the steps from -[here](../introduction/install.md). If you have never run Tendermint +In another terminal, we can start Tendermint. You should already have the +Tendermint binary installed. If not, follow the steps from +[here](../introduction/install.md). If you have never run Tendermint before, use: ``` diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index d73105165..36c26fa75 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -234,7 +234,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return err } - // The size of the corresponding amino-encoded TxMessage + // The size of the corresponding TxMessage // can't be larger than the maxMsgSize, otherwise we can't // relay it to peers. if txSize > mem.config.MaxTxBytes { @@ -518,11 +518,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { for e := mem.txs.Front(); e != nil; e = e.Next() { memTx := e.Value.(*mempoolTx) // Check total size requirement - aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1) - if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes { + if maxBytes > -1 && totalBytes+int64(len(memTx.tx)) > maxBytes { return txs } - totalBytes += int64(len(memTx.tx)) + aminoOverhead + totalBytes += int64(len(memTx.tx)) // Check total gas requirement. // If maxGas is negative, skip this check. // Since newTotalGas < masGas, which diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 17ab83f33..245fda53d 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -12,11 +12,11 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + gogotypes "github.com/gogo/protobuf/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" abciserver "github.com/tendermint/tendermint/abci/server" @@ -107,7 +107,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { mempool.Flush() // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas + // each tx has 20 bytes tests := []struct { numTxsToCreate int maxBytes int64 @@ -121,11 +121,11 @@ func TestReapMaxBytesMaxGas(t *testing.T) { {20, 0, -1, 0}, {20, 0, 10, 0}, {20, 10, 10, 0}, - {20, 22, 10, 1}, - {20, 220, -1, 10}, - {20, 220, 5, 5}, - {20, 220, 10, 10}, - {20, 220, 15, 10}, + {20, 20, 10, 1}, + {20, 100, 5, 5}, + {20, 200, -1, 10}, + {20, 200, 10, 10}, + {20, 200, 15, 10}, {20, 20000, -1, 20}, {20, 20000, 5, 5}, {20, 20000, 30, 20}, @@ -150,7 +150,7 @@ func TestMempoolFilters(t *testing.T) { nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes + amino overhead = 21 bytes, 1 gas + // each tx has 20 bytes tests := []struct { numTxsToCreate int preFilter PreCheckFunc @@ -158,17 +158,16 @@ func TestMempoolFilters(t *testing.T) { expectedNumTxs int }{ {10, nopPreFilter, nopPostFilter, 10}, - {10, PreCheckAminoMaxBytes(10), nopPostFilter, 0}, - {10, PreCheckAminoMaxBytes(20), nopPostFilter, 0}, - {10, PreCheckAminoMaxBytes(22), nopPostFilter, 10}, + {10, PreCheckMaxBytes(10), nopPostFilter, 0}, + {10, PreCheckMaxBytes(20), nopPostFilter, 10}, {10, nopPreFilter, PostCheckMaxGas(-1), 10}, {10, nopPreFilter, PostCheckMaxGas(0), 0}, {10, nopPreFilter, PostCheckMaxGas(1), 10}, {10, nopPreFilter, PostCheckMaxGas(3000), 10}, - {10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0}, - {10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10}, - {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10}, - {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0}, + {10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0}, + {10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10}, + {10, PreCheckMaxBytes(20), PostCheckMaxGas(1), 10}, + {10, PreCheckMaxBytes(20), PostCheckMaxGas(0), 0}, } for tcIndex, tt := range tests { mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) @@ -414,13 +413,6 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m3), "expecting the wal match in") } -// Size of the amino encoded TxMessage is the length of the -// encoded byte array, plus 1 for the struct field, plus 4 -// for the amino prefix. -func txMessageSize(tx types.Tx) int { - return amino.ByteSliceSize(tx) + 1 + 4 -} - func TestMempoolMaxMsgSize(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) @@ -458,18 +450,18 @@ func TestMempoolMaxMsgSize(t *testing.T) { tx := tmrand.Bytes(testCase.len) err := mempl.CheckTx(tx, nil, TxInfo{}) - msg := &TxMessage{tx} - encoded := cdc.MustMarshalBinaryBare(msg) - require.Equal(t, len(encoded), txMessageSize(tx), caseString) + bv := gogotypes.BytesValue{Value: tx} + bz, err2 := bv.Marshal() + require.NoError(t, err2) + require.Equal(t, len(bz), proto.Size(&bv), caseString) if !testCase.err { - require.True(t, len(encoded) <= maxMsgSize, caseString) + require.True(t, len(bz) <= maxMsgSize, caseString) require.NoError(t, err, caseString) } else { - require.True(t, len(encoded) > maxMsgSize, caseString) + require.True(t, len(bz) > maxMsgSize, caseString) require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString) } } - } func TestMempoolTxsBytes(t *testing.T) { diff --git a/mempool/codec.go b/mempool/codec.go deleted file mode 100644 index 9647e8c2c..000000000 --- a/mempool/codec.go +++ /dev/null @@ -1,11 +0,0 @@ -package mempool - -import ( - amino "github.com/tendermint/go-amino" -) - -var cdc = amino.NewCodec() - -func init() { - RegisterMessages(cdc) -} diff --git a/mempool/mempool.go b/mempool/mempool.go index 68eec8674..3df88a676 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -102,19 +102,12 @@ type TxInfo struct { //-------------------------------------------------------------------------------- -// PreCheckAminoMaxBytes checks that the size of the transaction plus the amino -// overhead is smaller or equal to the expected maxBytes. -func PreCheckAminoMaxBytes(maxBytes int64) PreCheckFunc { +// PreCheckMaxBytes checks that the size of the transaction is smaller or equal to the expected maxBytes. +func PreCheckMaxBytes(maxBytes int64) PreCheckFunc { return func(tx types.Tx) error { - // We have to account for the amino overhead in the tx size as well - // NOTE: fieldNum = 1 as types.Block.Data contains Txs []Tx as first field. - // If this field order ever changes this needs to updated here accordingly. - // NOTE: if some []Tx are encoded without a parenting struct, the - // fieldNum is also equal to 1. - aminoOverhead := types.ComputeAminoOverhead(tx, 1) - txSize := int64(len(tx)) + aminoOverhead + txSize := int64(len(tx)) if txSize > maxBytes { - return fmt.Errorf("tx size (including amino overhead) is too big: %d, max: %d", + return fmt.Errorf("tx size is too big: %d, max: %d", txSize, maxBytes) } return nil diff --git a/mempool/reactor.go b/mempool/reactor.go index bf34b2615..4ca4bee5e 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -3,12 +3,10 @@ package mempool import ( "fmt" "math" - "reflect" "sync" "time" - amino "github.com/tendermint/go-amino" - + gogotypes "github.com/gogo/protobuf/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -19,7 +17,7 @@ import ( const ( MempoolChannel = byte(0x30) - aminoOverheadForTxMessage = 8 + protoOverheadForTxMessage = 4 peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount @@ -172,20 +170,15 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) - switch msg := msg.(type) { - case *TxMessage: - txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)} - if src != nil { - txInfo.SenderP2PID = src.ID() - } - err := memR.mempool.CheckTx(msg.Tx, nil, txInfo) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) - } - // broadcasting happens from go routines per peer - default: - memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)} + if src != nil { + txInfo.SenderP2PID = src.ID() } + err = memR.mempool.CheckTx(msg.Tx, nil, txInfo) + if err != nil { + memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) + } + // broadcasting happens from go routines per peer } // PeerState describes the state of a peer. @@ -238,9 +231,12 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // ensure peer hasn't already sent us this tx if _, ok := memTx.senders.Load(peerID); !ok { - // send memTx - msg := &TxMessage{Tx: memTx.tx} - success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) + msg := gogotypes.BytesValue{Value: []byte(memTx.tx)} + bz, err := msg.Marshal() + if err != nil { + panic(err) + } + success := peer.Send(MempoolChannel, bz) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue @@ -262,17 +258,16 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { //----------------------------------------------------------------------------- // Messages -// Message is a message sent or received by the Reactor. -type Message interface{} - -func RegisterMessages(cdc *amino.Codec) { - cdc.RegisterInterface((*Message)(nil), nil) - cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) -} - -func (memR *Reactor) decodeMsg(bz []byte) (msg Message, err error) { - err = cdc.UnmarshalBinaryBare(bz, &msg) - return +func (memR *Reactor) decodeMsg(bz []byte) (TxMessage, error) { + msg := gogotypes.BytesValue{} + err := msg.Unmarshal(bz) + if err != nil { + return TxMessage{}, err + } + txMsg := TxMessage{ + Tx: types.Tx(msg.Value), + } + return txMsg, err } //------------------------------------- @@ -288,7 +283,7 @@ func (m *TxMessage) String() string { } // calcMaxMsgSize returns the max size of TxMessage -// account for amino overhead of TxMessage +// account for proto overhead of bytesValue func calcMaxMsgSize(maxTxSize int) int { - return maxTxSize + aminoOverheadForTxMessage + return maxTxSize + protoOverheadForTxMessage } diff --git a/state/tx_filter.go b/state/tx_filter.go index 0754be3b1..acc4aad8f 100644 --- a/state/tx_filter.go +++ b/state/tx_filter.go @@ -13,7 +13,7 @@ func TxPreCheck(state State) mempl.PreCheckFunc { state.Validators.Size(), state.ConsensusParams.Evidence.MaxNum, ) - return mempl.PreCheckAminoMaxBytes(maxDataBytes) + return mempl.PreCheckMaxBytes(maxDataBytes) } // TxPostCheck returns a function to filter transactions after processing. diff --git a/types/block.go b/types/block.go index 6bdc4bdf1..c4c9dacfa 100644 --- a/types/block.go +++ b/types/block.go @@ -26,7 +26,6 @@ const ( // MaxAminoOverheadForBlock - maximum amino overhead to encode a block (up to // MaxBlockSizeBytes in size) not including it's parts except Data. // This means it also excludes the overhead for individual transactions. - // To compute individual transactions' overhead use types.ComputeAminoOverhead(tx types.Tx, fieldNum int). // // Uvarint length of MaxBlockSizeBytes: 4 bytes // 2 fields (2 embedded): 2 bytes diff --git a/types/tx.go b/types/tx.go index 311730228..ffaa27f12 100644 --- a/types/tx.go +++ b/types/tx.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" - amino "github.com/tendermint/go-amino" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/crypto/tmhash" @@ -121,18 +119,3 @@ type TxResult struct { Tx Tx `json:"tx"` Result abci.ResponseDeliverTx `json:"result"` } - -// ComputeAminoOverhead calculates the overhead for amino encoding a transaction. -// The overhead consists of varint encoding the field number and the wire type -// (= length-delimited = 2), and another varint encoding the length of the -// transaction. -// The field number can be the field number of the particular transaction, or -// the field number of the parenting struct that contains the transactions []Tx -// as a field (this field number is repeated for each contained Tx). -// If some []Tx are encoded directly (without a parenting struct), the default -// fieldNum is also 1 (see BinFieldNum in amino.MarshalBinaryBare). -func ComputeAminoOverhead(tx Tx, fieldNum int) int64 { - fnum := uint64(fieldNum) - typ3AndFieldNum := (fnum << 3) | uint64(amino.Typ3_ByteLength) - return int64(amino.UvarintSize(typ3AndFieldNum)) + int64(amino.UvarintSize(uint64(len(tx)))) -} diff --git a/types/tx_test.go b/types/tx_test.go index 84fe1f2fc..6bf190aed 100644 --- a/types/tx_test.go +++ b/types/tx_test.go @@ -95,63 +95,6 @@ func TestTxProofUnchangable(t *testing.T) { } } -func TestComputeTxsOverhead(t *testing.T) { - cases := []struct { - txs Txs - wantOverhead int - }{ - {Txs{[]byte{6, 6, 6, 6, 6, 6}}, 2}, - // one 21 Mb transaction: - {Txs{make([]byte, 22020096)}, 5}, - // two 21Mb/2 sized transactions: - {Txs{make([]byte, 11010048), make([]byte, 11010048)}, 10}, - {Txs{[]byte{1, 2, 3}, []byte{1, 2, 3}, []byte{4, 5, 6}}, 6}, - {Txs{[]byte{100, 5, 64}, []byte{42, 116, 118}, []byte{6, 6, 6}, []byte{6, 6, 6}}, 8}, - } - - for _, tc := range cases { - totalBytes := int64(0) - totalOverhead := int64(0) - for _, tx := range tc.txs { - aminoOverhead := ComputeAminoOverhead(tx, 1) - totalOverhead += aminoOverhead - totalBytes += aminoOverhead + int64(len(tx)) - } - bz, err := cdc.MarshalBinaryBare(tc.txs) - assert.EqualValues(t, tc.wantOverhead, totalOverhead) - assert.NoError(t, err) - assert.EqualValues(t, len(bz), totalBytes) - } -} - -func TestComputeAminoOverhead(t *testing.T) { - cases := []struct { - tx Tx - fieldNum int - want int - }{ - {[]byte{6, 6, 6}, 1, 2}, - {[]byte{6, 6, 6}, 16, 3}, - {[]byte{6, 6, 6}, 32, 3}, - {[]byte{6, 6, 6}, 64, 3}, - {[]byte{6, 6, 6}, 512, 3}, - {[]byte{6, 6, 6}, 1024, 3}, - {[]byte{6, 6, 6}, 2048, 4}, - {make([]byte, 64), 1, 2}, - {make([]byte, 65), 1, 2}, - {make([]byte, 127), 1, 2}, - {make([]byte, 128), 1, 3}, - {make([]byte, 256), 1, 3}, - {make([]byte, 512), 1, 3}, - {make([]byte, 1024), 1, 3}, - {make([]byte, 128), 16, 4}, - } - for _, tc := range cases { - got := ComputeAminoOverhead(tc.tx, tc.fieldNum) - assert.EqualValues(t, tc.want, got) - } -} - func testTxProofUnchangable(t *testing.T) { // make some proof txs := makeTxs(randInt(2, 100), randInt(16, 128))