diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 567f4cee2..c40eb65ba 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -38,6 +38,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [evidence] [\#4780](https://github.com/tendermint/tendermint/pull/4780) Cap evidence to an absolute number (@cmwaters) Add `max_num` to consensus evidence parameters (default: 50 items). - [mempool] \#4940 Migrate mempool from amino binary encoding to Protobuf + - [statesync] \#4943 Migrate statesync reactor from amino binary encoding to Protobuf ### FEATURES: diff --git a/statesync/messages.go b/statesync/messages.go index 83aecd7f1..2f9312d23 100644 --- a/statesync/messages.go +++ b/statesync/messages.go @@ -4,9 +4,9 @@ import ( "errors" "fmt" - amino "github.com/tendermint/go-amino" + "github.com/gogo/protobuf/proto" - "github.com/tendermint/tendermint/types" + ssproto "github.com/tendermint/tendermint/proto/statesync" ) const ( @@ -14,116 +14,84 @@ const ( snapshotMsgSize = int(4e6) // chunkMsgSize is the maximum size of a chunkResponseMessage chunkMsgSize = int(16e6) - // maxMsgSize is the maximum size of any message - maxMsgSize = chunkMsgSize ) -var cdc = amino.NewCodec() - -func init() { - cdc.RegisterInterface((*Message)(nil), nil) - cdc.RegisterConcrete(&snapshotsRequestMessage{}, "tendermint/SnapshotsRequestMessage", nil) - cdc.RegisterConcrete(&snapshotsResponseMessage{}, "tendermint/SnapshotsResponseMessage", nil) - cdc.RegisterConcrete(&chunkRequestMessage{}, "tendermint/ChunkRequestMessage", nil) - cdc.RegisterConcrete(&chunkResponseMessage{}, "tendermint/ChunkResponseMessage", nil) - types.RegisterBlockAmino(cdc) -} - -// decodeMsg decodes a message. -func decodeMsg(bz []byte) (Message, error) { - if len(bz) > maxMsgSize { - return nil, fmt.Errorf("msg exceeds max size (%d > %d)", len(bz), maxMsgSize) +// mustEncodeMsg encodes a Protobuf message, panicing on error. +func mustEncodeMsg(pb proto.Message) []byte { + msg := ssproto.Message{} + switch pb := pb.(type) { + case *ssproto.ChunkRequest: + msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb} + case *ssproto.ChunkResponse: + msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb} + case *ssproto.SnapshotsRequest: + msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb} + case *ssproto.SnapshotsResponse: + msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb} + default: + panic(fmt.Errorf("unknown message type %T", pb)) } - var msg Message - err := cdc.UnmarshalBinaryBare(bz, &msg) + bz, err := proto.Marshal(&msg) if err != nil { - return nil, err + panic(fmt.Errorf("unable to marshal %T: %w", pb, err)) } - return msg, nil + return bz } -// Message is a message sent and received by the reactor. -type Message interface { - ValidateBasic() error -} - -// snapshotsRequestMessage requests recent snapshots from a peer. -type snapshotsRequestMessage struct{} - -// ValidateBasic implements Message. -func (m *snapshotsRequestMessage) ValidateBasic() error { - if m == nil { - return errors.New("nil message") - } - return nil -} - -// SnapshotResponseMessage contains information about a single snapshot. -type snapshotsResponseMessage struct { - Height uint64 - Format uint32 - Chunks uint32 - Hash []byte - Metadata []byte -} - -// ValidateBasic implements Message. -func (m *snapshotsResponseMessage) ValidateBasic() error { - if m == nil { - return errors.New("nil message") - } - if m.Height == 0 { - return errors.New("height cannot be 0") - } - if len(m.Hash) == 0 { - return errors.New("snapshot has no hash") - } - if m.Chunks == 0 { - return errors.New("snapshot has no chunks") - } - return nil -} - -// chunkRequestMessage requests a single chunk from a peer. -type chunkRequestMessage struct { - Height uint64 - Format uint32 - Index uint32 -} - -// ValidateBasic implements Message. -func (m *chunkRequestMessage) ValidateBasic() error { - if m == nil { - return errors.New("nil message") +// decodeMsg decodes a Protobuf message. +func decodeMsg(bz []byte) (proto.Message, error) { + pb := &ssproto.Message{} + err := proto.Unmarshal(bz, pb) + if err != nil { + return nil, err } - if m.Height == 0 { - return errors.New("height cannot be 0") + switch msg := pb.Sum.(type) { + case *ssproto.Message_ChunkRequest: + return msg.ChunkRequest, nil + case *ssproto.Message_ChunkResponse: + return msg.ChunkResponse, nil + case *ssproto.Message_SnapshotsRequest: + return msg.SnapshotsRequest, nil + case *ssproto.Message_SnapshotsResponse: + return msg.SnapshotsResponse, nil + default: + return nil, fmt.Errorf("unknown message type %T", msg) } - return nil } -// chunkResponseMessage contains a single chunk from a peer. -type chunkResponseMessage struct { - Height uint64 - Format uint32 - Index uint32 - Chunk []byte - Missing bool -} - -// ValidateBasic implements Message. -func (m *chunkResponseMessage) ValidateBasic() error { - if m == nil { - return errors.New("nil message") - } - if m.Height == 0 { - return errors.New("height cannot be 0") - } - if m.Missing && len(m.Chunk) > 0 { - return errors.New("missing chunk cannot have contents") +// validateMsg validates a message. +func validateMsg(pb proto.Message) error { + if pb == nil { + return errors.New("message cannot be nil") } - if !m.Missing && m.Chunk == nil { - return errors.New("chunk cannot be nil") + switch msg := pb.(type) { + case *ssproto.ChunkRequest: + if msg.Height == 0 { + return errors.New("height cannot be 0") + } + case *ssproto.ChunkResponse: + if msg.Height == 0 { + return errors.New("height cannot be 0") + } + if msg.Missing && len(msg.Chunk) > 0 { + return errors.New("missing chunk cannot have contents") + } + if !msg.Missing && msg.Chunk == nil { + return errors.New("chunk cannot be nil") + } + case *ssproto.SnapshotsRequest: + case *ssproto.SnapshotsResponse: + if msg.Height == 0 { + return errors.New("height cannot be 0") + } + if len(msg.Hash) == 0 { + return errors.New("snapshot has no hash") + } + if msg.Chunks == 0 { + return errors.New("snapshot has no chunks") + } + default: + return fmt.Errorf("unknown message type %T", msg) } return nil } diff --git a/statesync/messages_test.go b/statesync/messages_test.go index 68244a5da..cf7630e29 100644 --- a/statesync/messages_test.go +++ b/statesync/messages_test.go @@ -3,99 +3,76 @@ package statesync import ( "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" + + ssproto "github.com/tendermint/tendermint/proto/statesync" + tmproto "github.com/tendermint/tendermint/proto/types" ) -func TestSnapshotsRequestMessage_ValidateBasic(t *testing.T) { +func TestValidateMsg(t *testing.T) { testcases := map[string]struct { - msg *snapshotsRequestMessage + msg proto.Message valid bool }{ - "nil": {nil, false}, - "valid": {&snapshotsRequestMessage{}, true}, - } - for name, tc := range testcases { - tc := tc - t.Run(name, func(t *testing.T) { - err := tc.msg.ValidateBasic() - if tc.valid { - require.NoError(t, err) - } else { - require.Error(t, err) - } - }) - } -} + "nil": {nil, false}, + "unrelated": {&tmproto.Block{}, false}, -func TestSnapshotsResponseMessage_ValidateBasic(t *testing.T) { - testcases := map[string]struct { - msg *snapshotsResponseMessage - valid bool - }{ - "nil": {nil, false}, - "valid": {&snapshotsResponseMessage{Height: 1, Format: 1, Chunks: 2, Hash: []byte{1}}, true}, - "0 height": {&snapshotsResponseMessage{Height: 0, Format: 1, Chunks: 2, Hash: []byte{1}}, false}, - "0 format": {&snapshotsResponseMessage{Height: 1, Format: 0, Chunks: 2, Hash: []byte{1}}, true}, - "0 chunks": {&snapshotsResponseMessage{Height: 1, Format: 1, Hash: []byte{1}}, false}, - "no hash": {&snapshotsResponseMessage{Height: 1, Format: 1, Chunks: 2, Hash: []byte{}}, false}, - } - for name, tc := range testcases { - tc := tc - t.Run(name, func(t *testing.T) { - err := tc.msg.ValidateBasic() - if tc.valid { - require.NoError(t, err) - } else { - require.Error(t, err) - } - }) - } -} + "ChunkRequest valid": {&ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1}, true}, + "ChunkRequest 0 height": {&ssproto.ChunkRequest{Height: 0, Format: 1, Index: 1}, false}, + "ChunkRequest 0 format": {&ssproto.ChunkRequest{Height: 1, Format: 0, Index: 1}, true}, + "ChunkRequest 0 chunk": {&ssproto.ChunkRequest{Height: 1, Format: 1, Index: 0}, true}, -func TestChunkRequestMessage_ValidateBasic(t *testing.T) { - testcases := map[string]struct { - msg *chunkRequestMessage - valid bool - }{ - "nil": {nil, false}, - "valid": {&chunkRequestMessage{Height: 1, Format: 1, Index: 1}, true}, - "0 height": {&chunkRequestMessage{Height: 0, Format: 1, Index: 1}, false}, - "0 format": {&chunkRequestMessage{Height: 1, Format: 0, Index: 1}, true}, - "0 chunk": {&chunkRequestMessage{Height: 1, Format: 1, Index: 0}, true}, - } - for name, tc := range testcases { - tc := tc - t.Run(name, func(t *testing.T) { - err := tc.msg.ValidateBasic() - if tc.valid { - require.NoError(t, err) - } else { - require.Error(t, err) - } - }) - } -} + "ChunkResponse valid": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}}, + true}, + "ChunkResponse 0 height": { + &ssproto.ChunkResponse{Height: 0, Format: 1, Index: 1, Chunk: []byte{1}}, + false}, + "ChunkResponse 0 format": { + &ssproto.ChunkResponse{Height: 1, Format: 0, Index: 1, Chunk: []byte{1}}, + true}, + "ChunkResponse 0 chunk": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}}, + true}, + "ChunkResponse empty body": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}}, + true}, + "ChunkResponse nil body": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: nil}, + false}, + "ChunkResponse missing": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true}, + true}, + "ChunkResponse missing with empty": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true, Chunk: []byte{}}, + true}, + "ChunkResponse missing with body": { + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true, Chunk: []byte{1}}, + false}, -func TestChunkResponseMessage_ValidateBasic(t *testing.T) { - testcases := map[string]struct { - msg *chunkResponseMessage - valid bool - }{ - "nil message": {nil, false}, - "valid": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}}, true}, - "0 height": {&chunkResponseMessage{Height: 0, Format: 1, Index: 1, Chunk: []byte{1}}, false}, - "0 format": {&chunkResponseMessage{Height: 1, Format: 0, Index: 1, Chunk: []byte{1}}, true}, - "0 chunk": {&chunkResponseMessage{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}}, true}, - "empty body": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Chunk: []byte{}}, true}, - "nil body": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Chunk: nil}, false}, - "missing": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Missing: true}, true}, - "missing with empty": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Missing: true, Chunk: []byte{}}, true}, - "missing with body": {&chunkResponseMessage{Height: 1, Format: 1, Index: 1, Missing: true, Chunk: []byte{1}}, false}, + "SnapshotsRequest valid": {&ssproto.SnapshotsRequest{}, true}, + + "SnapshotsResponse valid": { + &ssproto.SnapshotsResponse{Height: 1, Format: 1, Chunks: 2, Hash: []byte{1}}, + true}, + "SnapshotsResponse 0 height": { + &ssproto.SnapshotsResponse{Height: 0, Format: 1, Chunks: 2, Hash: []byte{1}}, + false}, + "SnapshotsResponse 0 format": { + &ssproto.SnapshotsResponse{Height: 1, Format: 0, Chunks: 2, Hash: []byte{1}}, + true}, + "SnapshotsResponse 0 chunks": { + &ssproto.SnapshotsResponse{Height: 1, Format: 1, Hash: []byte{1}}, + false}, + "SnapshotsResponse no hash": { + &ssproto.SnapshotsResponse{Height: 1, Format: 1, Chunks: 2, Hash: []byte{}}, + false}, } for name, tc := range testcases { tc := tc t.Run(name, func(t *testing.T) { - err := tc.msg.ValidateBasic() + err := validateMsg(tc.msg) if tc.valid { require.NoError(t, err) } else { diff --git a/statesync/reactor.go b/statesync/reactor.go index b4221bff6..4c09a008d 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -7,6 +7,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/p2p" + ssproto "github.com/tendermint/tendermint/proto/statesync" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -99,7 +100,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { r.Switch.StopPeerForError(src, err) return } - err = msg.ValidateBasic() + err = validateMsg(msg) if err != nil { r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err) r.Switch.StopPeerForError(src, err) @@ -109,7 +110,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch chID { case SnapshotChannel: switch msg := msg.(type) { - case *snapshotsRequestMessage: + case *ssproto.SnapshotsRequest: snapshots, err := r.recentSnapshots(recentSnapshots) if err != nil { r.Logger.Error("Failed to fetch snapshots", "err", err) @@ -118,7 +119,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { for _, snapshot := range snapshots { r.Logger.Debug("Advertising snapshot", "height", snapshot.Height, "format", snapshot.Format, "peer", src.ID()) - src.Send(chID, cdc.MustMarshalBinaryBare(&snapshotsResponseMessage{ + src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{ Height: snapshot.Height, Format: snapshot.Format, Chunks: snapshot.Chunks, @@ -127,7 +128,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { })) } - case *snapshotsResponseMessage: + case *ssproto.SnapshotsResponse: r.mtx.RLock() defer r.mtx.RUnlock() if r.syncer == nil { @@ -154,7 +155,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { case ChunkChannel: switch msg := msg.(type) { - case *chunkRequestMessage: + case *ssproto.ChunkRequest: r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format, "chunk", msg.Index, "peer", src.ID()) resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{ @@ -169,7 +170,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format, "chunk", msg.Index, "peer", src.ID()) - src.Send(ChunkChannel, cdc.MustMarshalBinaryBare(&chunkResponseMessage{ + src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{ Height: msg.Height, Format: msg.Format, Index: msg.Index, @@ -177,7 +178,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { Missing: resp.Chunk == nil, })) - case *chunkResponseMessage: + case *ssproto.ChunkResponse: r.mtx.RLock() defer r.mtx.RUnlock() if r.syncer == nil { diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go index 402eba6d7..03ed44009 100644 --- a/statesync/reactor_test.go +++ b/statesync/reactor_test.go @@ -11,27 +11,28 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/p2p" p2pmocks "github.com/tendermint/tendermint/p2p/mocks" + ssproto "github.com/tendermint/tendermint/proto/statesync" proxymocks "github.com/tendermint/tendermint/proxy/mocks" ) -func TestReactor_Receive_ChunkRequestMessage(t *testing.T) { +func TestReactor_Receive_ChunkRequest(t *testing.T) { testcases := map[string]struct { - request *chunkRequestMessage + request *ssproto.ChunkRequest chunk []byte - expectResponse *chunkResponseMessage + expectResponse *ssproto.ChunkResponse }{ "chunk is returned": { - &chunkRequestMessage{Height: 1, Format: 1, Index: 1}, + &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1}, []byte{1, 2, 3}, - &chunkResponseMessage{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}}}, + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}}}, "empty chunk is returned, as nil": { - &chunkRequestMessage{Height: 1, Format: 1, Index: 1}, + &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1}, []byte{}, - &chunkResponseMessage{Height: 1, Format: 1, Index: 1, Chunk: nil}}, + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: nil}}, "nil (missing) chunk is returned as missing": { - &chunkRequestMessage{Height: 1, Format: 1, Index: 1}, + &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1}, nil, - &chunkResponseMessage{Height: 1, Format: 1, Index: 1, Missing: true}, + &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true}, }, } @@ -49,22 +50,22 @@ func TestReactor_Receive_ChunkRequestMessage(t *testing.T) { // Mock peer to store response, if found peer := &p2pmocks.Peer{} peer.On("ID").Return(p2p.ID("id")) - var response *chunkResponseMessage + var response *ssproto.ChunkResponse if tc.expectResponse != nil { peer.On("Send", ChunkChannel, mock.Anything).Run(func(args mock.Arguments) { msg, err := decodeMsg(args[1].([]byte)) require.NoError(t, err) - response = msg.(*chunkResponseMessage) + response = msg.(*ssproto.ChunkResponse) }).Return(true) } - // Start a reactor and send a chunkRequestMessage, then wait for and check response + // Start a reactor and send a ssproto.ChunkRequest, then wait for and check response r := NewReactor(conn, nil, "") err := r.Start() require.NoError(t, err) defer r.Stop() - r.Receive(ChunkChannel, peer, cdc.MustMarshalBinaryBare(tc.request)) + r.Receive(ChunkChannel, peer, mustEncodeMsg(tc.request)) time.Sleep(100 * time.Millisecond) assert.Equal(t, tc.expectResponse, response) @@ -74,12 +75,12 @@ func TestReactor_Receive_ChunkRequestMessage(t *testing.T) { } } -func TestReactor_Receive_SnapshotRequestMessage(t *testing.T) { +func TestReactor_Receive_SnapshotsRequest(t *testing.T) { testcases := map[string]struct { snapshots []*abci.Snapshot - expectResponses []*snapshotsResponseMessage + expectResponses []*ssproto.SnapshotsResponse }{ - "no snapshots": {nil, []*snapshotsResponseMessage{}}, + "no snapshots": {nil, []*ssproto.SnapshotsResponse{}}, ">10 unordered snapshots": { []*abci.Snapshot{ {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}}, @@ -95,7 +96,7 @@ func TestReactor_Receive_SnapshotRequestMessage(t *testing.T) { {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}}, {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}}, }, - []*snapshotsResponseMessage{ + []*ssproto.SnapshotsResponse{ {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}}, {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}}, {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}}, @@ -120,14 +121,14 @@ func TestReactor_Receive_SnapshotRequestMessage(t *testing.T) { }, nil) // Mock peer to catch responses and store them in a slice - responses := []*snapshotsResponseMessage{} + responses := []*ssproto.SnapshotsResponse{} peer := &p2pmocks.Peer{} if len(tc.expectResponses) > 0 { peer.On("ID").Return(p2p.ID("id")) peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) { msg, err := decodeMsg(args[1].([]byte)) require.NoError(t, err) - responses = append(responses, msg.(*snapshotsResponseMessage)) + responses = append(responses, msg.(*ssproto.SnapshotsResponse)) }).Return(true) } @@ -137,7 +138,7 @@ func TestReactor_Receive_SnapshotRequestMessage(t *testing.T) { require.NoError(t, err) defer r.Stop() - r.Receive(SnapshotChannel, peer, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})) + r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{})) time.Sleep(100 * time.Millisecond) assert.Equal(t, tc.expectResponses, responses) diff --git a/statesync/syncer.go b/statesync/syncer.go index 3386c746c..836dfb2ec 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -11,6 +11,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + ssproto "github.com/tendermint/tendermint/proto/statesync" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -115,7 +116,7 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) { // to discover snapshots, later we may want to do retries and stuff. func (s *syncer) AddPeer(peer p2p.Peer) { s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID()) - peer.Send(SnapshotChannel, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})) + peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})) } // RemovePeer removes a peer from the pool. @@ -411,7 +412,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { } s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height, "format", snapshot.Format, "chunk", chunk, "peer", peer.ID()) - peer.Send(ChunkChannel, cdc.MustMarshalBinaryBare(&chunkRequestMessage{ + peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{ Height: snapshot.Height, Format: snapshot.Format, Index: chunk, diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go index b532eac40..ee7158987 100644 --- a/statesync/syncer_test.go +++ b/statesync/syncer_test.go @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" p2pmocks "github.com/tendermint/tendermint/p2p/mocks" + ssproto "github.com/tendermint/tendermint/proto/statesync" "github.com/tendermint/tendermint/proxy" proxymocks "github.com/tendermint/tendermint/proxy/mocks" sm "github.com/tendermint/tendermint/state" @@ -90,13 +91,13 @@ func TestSyncer_SyncAny(t *testing.T) { // Adding a couple of peers should trigger snapshot discovery messages peerA := &p2pmocks.Peer{} peerA.On("ID").Return(p2p.ID("a")) - peerA.On("Send", SnapshotChannel, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})).Return(true) + peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true) syncer.AddPeer(peerA) peerA.AssertExpectations(t) peerB := &p2pmocks.Peer{} peerB.On("ID").Return(p2p.ID("b")) - peerB.On("Send", SnapshotChannel, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})).Return(true) + peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true) syncer.AddPeer(peerB) peerB.AssertExpectations(t) @@ -139,9 +140,9 @@ func TestSyncer_SyncAny(t *testing.T) { chunkRequests := make(map[uint32]int) chunkRequestsMtx := sync.Mutex{} onChunkRequest := func(args mock.Arguments) { - msg := &chunkRequestMessage{} - err := cdc.UnmarshalBinaryBare(args[1].([]byte), &msg) + pb, err := decodeMsg(args[1].([]byte)) require.NoError(t, err) + msg := pb.(*ssproto.ChunkRequest) require.EqualValues(t, 1, msg.Height) require.EqualValues(t, 1, msg.Format) require.LessOrEqual(t, msg.Index, uint32(len(chunks)))