From ffe2742a6c47b88c67b32bee9dcf2b8b5b89b9ff Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 23 Sep 2020 14:13:13 +0400 Subject: [PATCH] mempool: batch txs per peer in broadcastTxRoutine (#5321) Closes #625 --- CHANGELOG_PENDING.md | 2 +- config/config.go | 41 +++++--- config/toml.go | 4 + docs/tendermint-core/configuration.md | 4 + go.sum | 2 - mempool/clist_mempool.go | 3 - mempool/clist_mempool_test.go | 27 ++--- mempool/reactor.go | 112 ++++++++++++++------- mempool/reactor_test.go | 49 ++++++++- proto/tendermint/mempool/types.pb.go | 139 +++++++++++++------------- proto/tendermint/mempool/types.proto | 6 +- 11 files changed, 240 insertions(+), 149 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e9d2c54c9..00899c5ea 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -12,10 +12,10 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [rpc/client, rpc/jsonrpc/client] \#5347 All client methods now accept `context.Context` as 1st param (@melekes) - Apps - - [abci] [\#5324](https://github.com/tendermint/tendermint/pull/5324) abci evidence type is an enum with two types of possible evidence (@cmwaters) - P2P Protocol + - [mempool] \#5321 Batch transactions when broadcasting them to peers (@melekes) `MaxBatchBytes` new config setting defines the max size of one batch. - Go API - [evidence] \#5317 Remove ConflictingHeaders evidence type & CompositeEvidence Interface. (@marbar3778) diff --git a/config/config.go b/config/config.go index 044514ed4..74726009d 100644 --- a/config/config.go +++ b/config/config.go @@ -644,14 +644,24 @@ func DefaultFuzzConnConfig() *FuzzConnConfig { // MempoolConfig defines the configuration options for the Tendermint mempool type MempoolConfig struct { - RootDir string `mapstructure:"home"` - Recheck bool `mapstructure:"recheck"` - Broadcast bool `mapstructure:"broadcast"` - WalPath string `mapstructure:"wal_dir"` - Size int `mapstructure:"size"` - MaxTxsBytes int64 `mapstructure:"max_txs_bytes"` - CacheSize int `mapstructure:"cache_size"` - MaxTxBytes int `mapstructure:"max_tx_bytes"` + RootDir string `mapstructure:"home"` + Recheck bool `mapstructure:"recheck"` + Broadcast bool `mapstructure:"broadcast"` + WalPath string `mapstructure:"wal_dir"` + // Maximum number of transactions in the mempool + Size int `mapstructure:"size"` + // Limit the total size of all txs in the mempool. + // This only accounts for raw transactions (e.g. given 1MB transactions and + // max_txs_bytes=5MB, mempool will only accept 5 transactions). + MaxTxsBytes int64 `mapstructure:"max_txs_bytes"` + // Size of the cache (used to filter transactions we saw earlier) in transactions + CacheSize int `mapstructure:"cache_size"` + // Maximum size of a single transaction + // NOTE: the max size of a tx transmitted over the network is {max_tx_bytes}. + MaxTxBytes int `mapstructure:"max_tx_bytes"` + // Maximum size of a batch of transactions to send to a peer + // Including space needed by encoding (one varint per transaction). + MaxBatchBytes int `mapstructure:"max_batch_bytes"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -662,10 +672,11 @@ func DefaultMempoolConfig() *MempoolConfig { WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - MaxTxBytes: 1024 * 1024, // 1MB + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + MaxTxBytes: 1024 * 1024, // 1MB + MaxBatchBytes: 10 * 1024 * 1024, // 10MB } } @@ -701,6 +712,12 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.MaxTxBytes < 0 { return errors.New("max_tx_bytes can't be negative") } + if cfg.MaxBatchBytes < 0 { + return errors.New("max_batch_bytes can't be negative") + } + if cfg.MaxBatchBytes <= cfg.MaxTxBytes { + return errors.New("max_batch_bytes can't be less or equal to max_tx_bytes") + } return nil } diff --git a/config/toml.go b/config/toml.go index db2f263ba..35de96875 100644 --- a/config/toml.go +++ b/config/toml.go @@ -329,6 +329,10 @@ cache_size = {{ .Mempool.CacheSize }} # NOTE: the max size of a tx transmitted over the network is {max_tx_bytes}. max_tx_bytes = {{ .Mempool.MaxTxBytes }} +# Maximum size of a batch of transactions to send to a peer +# Including space needed by encoding (one varint per transaction). +max_batch_bytes = {{ .Mempool.MaxBatchBytes }} + ####################################################### ### State Sync Configuration Options ### ####################################################### diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index a368f6fc5..1d79e9068 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -282,6 +282,10 @@ cache_size = 10000 # NOTE: the max size of a tx transmitted over the network is {max_tx_bytes}. max_tx_bytes = 1048576 +# Maximum size of a batch of transactions to send to a peer +# Including space needed by encoding (one varint per transaction). +max_batch_bytes = 10485760 + ####################################################### ### State Sync Configuration Options ### ####################################################### diff --git a/go.sum b/go.sum index f6ec82d6f..3146c274e 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= -github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= -github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 6118ad58a..48a25e795 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -242,9 +242,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return err } - // The size of the corresponding TxMessage - // can't be larger than the maxMsgSize, otherwise we can't - // relay it to peers. if txSize > mem.config.MaxTxBytes { return ErrTxTooLarge{mem.config.MaxTxBytes, txSize} } diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 5d7074f14..5c27db5a7 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -420,52 +420,43 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m3), "expecting the wal match in") } -func TestMempoolMaxMsgSize(t *testing.T) { +func TestMempool_CheckTxChecksTxSize(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) mempl, cleanup := newMempoolWithApp(cc) defer cleanup() maxTxSize := mempl.config.MaxTxBytes - maxMsgSize := calcMaxMsgSize(maxTxSize) testCases := []struct { len int err bool }{ // check small txs. no error - {10, false}, - {1000, false}, - {1000000, false}, + 0: {10, false}, + 1: {1000, false}, + 2: {1000000, false}, // check around maxTxSize - // changes from no error to error - {maxTxSize - 2, false}, - {maxTxSize - 1, false}, - {maxTxSize, false}, - {maxTxSize + 1, true}, - {maxTxSize + 2, true}, - - // check around maxMsgSize. all error - {maxMsgSize - 1, true}, - {maxMsgSize, true}, - {maxMsgSize + 1, true}, + 3: {maxTxSize - 1, false}, + 4: {maxTxSize, false}, + 5: {maxTxSize + 1, true}, } for i, testCase := range testCases { caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) tx := tmrand.Bytes(testCase.len) + err := mempl.CheckTx(tx, nil, TxInfo{}) bv := gogotypes.BytesValue{Value: tx} bz, err2 := bv.Marshal() require.NoError(t, err2) require.Equal(t, len(bz), proto.Size(&bv), caseString) + if !testCase.err { - require.True(t, len(bz) <= maxMsgSize, caseString) require.NoError(t, err, caseString) } else { - require.True(t, len(bz) > maxMsgSize, caseString) require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString) } } diff --git a/mempool/reactor.go b/mempool/reactor.go index 6e30e4627..13e4b16f2 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -1,6 +1,7 @@ package mempool import ( + "errors" "fmt" "math" "time" @@ -17,8 +18,6 @@ import ( const ( MempoolChannel = byte(0x30) - protoOverheadForTxMessage = 4 - peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount // UnknownPeerID is the peer ID to use when running CheckTx when there is @@ -132,10 +131,10 @@ func (memR *Reactor) OnStart() error { return nil } -// GetChannels implements Reactor. -// It returns the list of channels for this reactor. +// GetChannels implements Reactor by returning the list of channels for this +// reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - maxMsgSize := calcMaxMsgSize(memR.config.MaxTxBytes) + maxMsgSize := memR.config.MaxBatchBytes return []*p2p.ChannelDescriptor{ { ID: MempoolChannel, @@ -174,9 +173,11 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - err = memR.mempool.CheckTx(msg.Tx, nil, txInfo) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) + for _, tx := range msg.Txs { + err = memR.mempool.CheckTx(tx, nil, txInfo) + if err != nil { + memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) + } } // broadcasting happens from go routines per peer } @@ -190,6 +191,7 @@ type PeerState interface { func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { peerID := memR.ids.GetForPeer(peer) var next *clist.CElement + for { // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time if !memR.IsRunning() || !peer.IsRunning() { @@ -211,9 +213,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } - memTx := next.Value.(*mempoolTx) - - // make sure the peer is up to date + // Make sure the peer is up to date. peerState, ok := peer.Get(types.PeerStateKey).(PeerState) if !ok { // Peer does not have a state yet. We set it in the consensus reactor, but @@ -224,25 +224,28 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } - if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block + + // Allow for a lag of 1 block. + memTx := next.Value.(*mempoolTx) + if peerState.GetHeight() < memTx.Height()-1 { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } - // ensure peer hasn't already sent us this tx - if _, ok := memTx.senders.Load(peerID); !ok { + txs := memR.txs(next, peerID, peerState.GetHeight()) // WARNING: mutates next! + + // send txs + if len(txs) > 0 { msg := protomem.Message{ - Sum: &protomem.Message_Tx{ - Tx: &protomem.Tx{ - Tx: []byte(memTx.tx), - }, + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: txs}, }, } - bz, err := msg.Marshal() if err != nil { panic(err) } + memR.Logger.Debug("Sending N txs to peer", "N", len(txs), "peer", peer) success := peer.Send(MempoolChannel, bz) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) @@ -262,21 +265,62 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } +// txs iterates over the transaction list and builds a batch of txs. next is +// included. +// WARNING: mutates next! +func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64) [][]byte { + batch := make([][]byte, 0) + + for { + memTx := next.Value.(*mempoolTx) + + if _, ok := memTx.senders.Load(peerID); !ok { + // If current batch + this tx size is greater than max => return. + batchMsg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: append(batch, memTx.tx)}, + }, + } + if batchMsg.Size() > memR.config.MaxBatchBytes { + return batch + } + + batch = append(batch, memTx.tx) + } + + if next.Next() == nil { + return batch + } + next = next.Next() + } +} + //----------------------------------------------------------------------------- // Messages -func (memR *Reactor) decodeMsg(bz []byte) (TxMessage, error) { +func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) { msg := protomem.Message{} err := msg.Unmarshal(bz) if err != nil { - return TxMessage{}, err + return TxsMessage{}, err } - var message TxMessage + var message TxsMessage + + if i, ok := msg.Sum.(*protomem.Message_Txs); ok { + txs := i.Txs.GetTxs() + + if len(txs) == 0 { + return message, errors.New("empty TxsMessage") + } + + decoded := make([]types.Tx, len(txs)) + for j, tx := range txs { + decoded[j] = types.Tx(tx) + } - if i, ok := msg.Sum.(*protomem.Message_Tx); ok { - message = TxMessage{ - Tx: types.Tx(i.Tx.GetTx()), + message = TxsMessage{ + Txs: decoded, } return message, nil } @@ -285,18 +329,12 @@ func (memR *Reactor) decodeMsg(bz []byte) (TxMessage, error) { //------------------------------------- -// TxMessage is a Message containing a transaction. -type TxMessage struct { - Tx types.Tx -} - -// String returns a string representation of the TxMessage. -func (m *TxMessage) String() string { - return fmt.Sprintf("[TxMessage %v]", m.Tx) +// TxsMessage is a Message containing transactions. +type TxsMessage struct { + Txs []types.Tx } -// calcMaxMsgSize returns the max size of TxMessage -// account for proto overhead of bytesValue -func calcMaxMsgSize(maxTxSize int) int { - return maxTxSize + protoOverheadForTxMessage +// String returns a string representation of the TxsMessage. +func (m *TxsMessage) String() string { + return fmt.Sprintf("[TxsMessage %v]", m.Txs) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 071800185..3bc5597d9 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -16,6 +16,7 @@ import ( "github.com/tendermint/tendermint/abci/example/kvstore" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" + tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/mock" memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" @@ -38,7 +39,7 @@ func (ps peerState) GetHeight() int64 { // Send a bunch of txs to the first reactor's mempool and wait for them all to // be received in the others. -func TestReactorBroadcastTxMessage(t *testing.T) { +func TestReactorBroadcastTxsMessage(t *testing.T) { config := cfg.TestConfig() // if there were more than two reactors, the order of transactions could not be // asserted in waitForTxsOnReactors (due to transactions gossiping). If we @@ -87,6 +88,46 @@ func TestReactorNoBroadcastToSender(t *testing.T) { ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) } +func TestReactor_MaxBatchBytes(t *testing.T) { + config := cfg.TestConfig() + config.Mempool.MaxBatchBytes = 1024 + + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + + // Broadcast a tx, which has the max size (minus proto overhead) + // => ensure it's received by the second reactor. + tx1 := tmrand.Bytes(1018) + err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) + require.NoError(t, err) + waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) + + reactors[0].mempool.Flush() + reactors[1].mempool.Flush() + + // Broadcast a tx, which is beyond the max size + // => ensure it's not sent + tx2 := tmrand.Bytes(1020) + err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) + require.NoError(t, err) + ensureNoTxs(t, reactors[1], 100*time.Millisecond) + // => ensure the second reactor did not disconnect from us + out, in, _ := reactors[1].Switch.NumPeers() + assert.Equal(t, 1, out+in) +} + func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") @@ -266,7 +307,6 @@ func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { } func TestMempoolVectors(t *testing.T) { - testCases := []struct { testName string tx []byte @@ -280,8 +320,8 @@ func TestMempoolVectors(t *testing.T) { tc := tc msg := memproto.Message{ - Sum: &memproto.Message_Tx{ - Tx: &memproto.Tx{Tx: tc.tx}, + Sum: &memproto.Message_Txs{ + Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, }, } bz, err := msg.Marshal() @@ -289,5 +329,4 @@ func TestMempoolVectors(t *testing.T) { require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) } - } diff --git a/proto/tendermint/mempool/types.pb.go b/proto/tendermint/mempool/types.pb.go index a40e49d16..3487652bc 100644 --- a/proto/tendermint/mempool/types.pb.go +++ b/proto/tendermint/mempool/types.pb.go @@ -22,22 +22,22 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type Tx struct { - Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` +type Txs struct { + Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` } -func (m *Tx) Reset() { *m = Tx{} } -func (m *Tx) String() string { return proto.CompactTextString(m) } -func (*Tx) ProtoMessage() {} -func (*Tx) Descriptor() ([]byte, []int) { +func (m *Txs) Reset() { *m = Txs{} } +func (m *Txs) String() string { return proto.CompactTextString(m) } +func (*Txs) ProtoMessage() {} +func (*Txs) Descriptor() ([]byte, []int) { return fileDescriptor_2af51926fdbcbc05, []int{0} } -func (m *Tx) XXX_Unmarshal(b []byte) error { +func (m *Txs) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *Tx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *Txs) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_Tx.Marshal(b, m, deterministic) + return xxx_messageInfo_Txs.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -47,28 +47,28 @@ func (m *Tx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return b[:n], nil } } -func (m *Tx) XXX_Merge(src proto.Message) { - xxx_messageInfo_Tx.Merge(m, src) +func (m *Txs) XXX_Merge(src proto.Message) { + xxx_messageInfo_Txs.Merge(m, src) } -func (m *Tx) XXX_Size() int { +func (m *Txs) XXX_Size() int { return m.Size() } -func (m *Tx) XXX_DiscardUnknown() { - xxx_messageInfo_Tx.DiscardUnknown(m) +func (m *Txs) XXX_DiscardUnknown() { + xxx_messageInfo_Txs.DiscardUnknown(m) } -var xxx_messageInfo_Tx proto.InternalMessageInfo +var xxx_messageInfo_Txs proto.InternalMessageInfo -func (m *Tx) GetTx() []byte { +func (m *Txs) GetTxs() [][]byte { if m != nil { - return m.Tx + return m.Txs } return nil } type Message struct { // Types that are valid to be assigned to Sum: - // *Message_Tx + // *Message_Txs Sum isMessage_Sum `protobuf_oneof:"sum"` } @@ -111,11 +111,11 @@ type isMessage_Sum interface { Size() int } -type Message_Tx struct { - Tx *Tx `protobuf:"bytes,1,opt,name=tx,proto3,oneof" json:"tx,omitempty"` +type Message_Txs struct { + Txs *Txs `protobuf:"bytes,1,opt,name=txs,proto3,oneof" json:"txs,omitempty"` } -func (*Message_Tx) isMessage_Sum() {} +func (*Message_Txs) isMessage_Sum() {} func (m *Message) GetSum() isMessage_Sum { if m != nil { @@ -124,9 +124,9 @@ func (m *Message) GetSum() isMessage_Sum { return nil } -func (m *Message) GetTx() *Tx { - if x, ok := m.GetSum().(*Message_Tx); ok { - return x.Tx +func (m *Message) GetTxs() *Txs { + if x, ok := m.GetSum().(*Message_Txs); ok { + return x.Txs } return nil } @@ -134,33 +134,34 @@ func (m *Message) GetTx() *Tx { // XXX_OneofWrappers is for the internal use of the proto package. func (*Message) XXX_OneofWrappers() []interface{} { return []interface{}{ - (*Message_Tx)(nil), + (*Message_Txs)(nil), } } func init() { - proto.RegisterType((*Tx)(nil), "tendermint.mempool.Tx") + proto.RegisterType((*Txs)(nil), "tendermint.mempool.Txs") proto.RegisterType((*Message)(nil), "tendermint.mempool.Message") } func init() { proto.RegisterFile("tendermint/mempool/types.proto", fileDescriptor_2af51926fdbcbc05) } var fileDescriptor_2af51926fdbcbc05 = []byte{ - // 175 bytes of a gzipped FileDescriptorProto + // 179 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2b, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0xcf, 0x4d, 0xcd, 0x2d, 0xc8, 0xcf, 0xcf, 0xd1, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x42, 0xc8, 0xeb, 0x41, - 0xe5, 0x95, 0x44, 0xb8, 0x98, 0x42, 0x2a, 0x84, 0xf8, 0xb8, 0x98, 0x4a, 0x2a, 0x24, 0x18, 0x15, - 0x18, 0x35, 0x78, 0x82, 0x98, 0x4a, 0x2a, 0x94, 0xac, 0xb8, 0xd8, 0x7d, 0x53, 0x8b, 0x8b, 0x13, - 0xd3, 0x53, 0x85, 0x34, 0xe0, 0x52, 0xdc, 0x46, 0x62, 0x7a, 0x98, 0x26, 0xe8, 0x85, 0x54, 0x78, - 0x30, 0x80, 0x34, 0x39, 0xb1, 0x72, 0x31, 0x17, 0x97, 0xe6, 0x3a, 0x05, 0x9f, 0x78, 0x24, 0xc7, - 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, - 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x94, 0x65, 0x7a, 0x66, 0x49, 0x46, 0x69, 0x92, 0x5e, 0x72, - 0x7e, 0xae, 0x3e, 0x92, 0x53, 0x91, 0x98, 0x60, 0x77, 0xea, 0x63, 0x7a, 0x23, 0x89, 0x0d, 0x2c, - 0x63, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x21, 0xe2, 0x11, 0x57, 0xe3, 0x00, 0x00, 0x00, -} - -func (m *Tx) Marshal() (dAtA []byte, err error) { + 0xe5, 0x95, 0xc4, 0xb9, 0x98, 0x43, 0x2a, 0x8a, 0x85, 0x04, 0xb8, 0x98, 0x4b, 0x2a, 0x8a, 0x25, + 0x18, 0x15, 0x98, 0x35, 0x78, 0x82, 0x40, 0x4c, 0x25, 0x5b, 0x2e, 0x76, 0xdf, 0xd4, 0xe2, 0xe2, + 0xc4, 0xf4, 0x54, 0x21, 0x6d, 0x98, 0x24, 0xa3, 0x06, 0xb7, 0x91, 0xb8, 0x1e, 0xa6, 0x29, 0x7a, + 0x21, 0x15, 0xc5, 0x1e, 0x0c, 0x60, 0x7d, 0x4e, 0xac, 0x5c, 0xcc, 0xc5, 0xa5, 0xb9, 0x4e, 0xc1, + 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, + 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x65, 0x99, 0x9e, 0x59, 0x92, 0x51, + 0x9a, 0xa4, 0x97, 0x9c, 0x9f, 0xab, 0x8f, 0xe4, 0x60, 0x24, 0x26, 0xd8, 0xb5, 0xfa, 0x98, 0x9e, + 0x49, 0x62, 0x03, 0xcb, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xca, 0xc3, 0xa0, 0xfc, 0xe9, + 0x00, 0x00, 0x00, +} + +func (m *Txs) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -170,22 +171,24 @@ func (m *Tx) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *Tx) MarshalTo(dAtA []byte) (int, error) { +func (m *Txs) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *Tx) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Tx) > 0 { - i -= len(m.Tx) - copy(dAtA[i:], m.Tx) - i = encodeVarintTypes(dAtA, i, uint64(len(m.Tx))) - i-- - dAtA[i] = 0xa + if len(m.Txs) > 0 { + for iNdEx := len(m.Txs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Txs[iNdEx]) + copy(dAtA[i:], m.Txs[iNdEx]) + i = encodeVarintTypes(dAtA, i, uint64(len(m.Txs[iNdEx]))) + i-- + dAtA[i] = 0xa + } } return len(dAtA) - i, nil } @@ -222,16 +225,16 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *Message_Tx) MarshalTo(dAtA []byte) (int, error) { +func (m *Message_Txs) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *Message_Tx) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *Message_Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) - if m.Tx != nil { + if m.Txs != nil { { - size, err := m.Tx.MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Txs.MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -254,15 +257,17 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *Tx) Size() (n int) { +func (m *Txs) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Tx) - if l > 0 { - n += 1 + l + sovTypes(uint64(l)) + if len(m.Txs) > 0 { + for _, b := range m.Txs { + l = len(b) + n += 1 + l + sovTypes(uint64(l)) + } } return n } @@ -279,14 +284,14 @@ func (m *Message) Size() (n int) { return n } -func (m *Message_Tx) Size() (n int) { +func (m *Message_Txs) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.Tx != nil { - l = m.Tx.Size() + if m.Txs != nil { + l = m.Txs.Size() n += 1 + l + sovTypes(uint64(l)) } return n @@ -298,7 +303,7 @@ func sovTypes(x uint64) (n int) { func sozTypes(x uint64) (n int) { return sovTypes(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *Tx) Unmarshal(dAtA []byte) error { +func (m *Txs) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -321,15 +326,15 @@ func (m *Tx) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: Tx: wiretype end group for non-group") + return fmt.Errorf("proto: Txs: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: Tx: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: Txs: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tx", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Txs", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -356,10 +361,8 @@ func (m *Tx) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Tx = append(m.Tx[:0], dAtA[iNdEx:postIndex]...) - if m.Tx == nil { - m.Tx = []byte{} - } + m.Txs = append(m.Txs, make([]byte, postIndex-iNdEx)) + copy(m.Txs[len(m.Txs)-1], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex @@ -416,7 +419,7 @@ func (m *Message) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tx", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Txs", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -443,11 +446,11 @@ func (m *Message) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - v := &Tx{} + v := &Txs{} if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } - m.Sum = &Message_Tx{v} + m.Sum = &Message_Txs{v} iNdEx = postIndex default: iNdEx = preIndex diff --git a/proto/tendermint/mempool/types.proto b/proto/tendermint/mempool/types.proto index 876c0172b..b55d9717b 100644 --- a/proto/tendermint/mempool/types.proto +++ b/proto/tendermint/mempool/types.proto @@ -3,12 +3,12 @@ package tendermint.mempool; option go_package = "github.com/tendermint/tendermint/proto/tendermint/mempool"; -message Tx { - bytes tx = 1; +message Txs { + repeated bytes txs = 1; } message Message { oneof sum { - Tx tx = 1; + Txs txs = 1; } }