From 1b6df6783def47b318a36e321d54971b8032d0a4 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 23 Dec 2020 13:37:05 +0100 Subject: [PATCH] p2p: replace PeerID with NodeID --- p2p/channel.go | 4 +-- p2p/peer.go | 36 ++----------------------- p2p/peer_test.go | 45 ------------------------------- p2p/shim.go | 44 +++++++++--------------------- p2p/shim_test.go | 10 +++---- statesync/chunks.go | 12 ++++----- statesync/chunks_test.go | 38 +++++++++++++------------- statesync/reactor.go | 22 +++++++-------- statesync/reactor_test.go | 12 ++++----- statesync/snapshots.go | 54 ++++++++++++++++++------------------- statesync/snapshots_test.go | 53 +++++++++++++++++------------------- statesync/syncer.go | 30 +++++++++------------ statesync/syncer_test.go | 34 +++++++++++------------ 13 files changed, 145 insertions(+), 249 deletions(-) diff --git a/p2p/channel.go b/p2p/channel.go index 54c076a6b..26808afc7 100644 --- a/p2p/channel.go +++ b/p2p/channel.go @@ -11,8 +11,8 @@ type ChannelID uint16 // Envelope specifies the message receiver and sender. type Envelope struct { - From PeerID // Message sender, or empty for outbound messages. - To PeerID // Message receiver, or empty for inbound messages. + From NodeID // Message sender, or empty for outbound messages. + To NodeID // Message receiver, or empty for inbound messages. Broadcast bool // Send message to all connected peers, ignoring To. Message proto.Message // Payload. } diff --git a/p2p/peer.go b/p2p/peer.go index 0d65dad39..f5bb9fa8e 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,13 +1,10 @@ package p2p import ( - "bytes" - "encoding/hex" "fmt" "io" "net" "runtime/debug" - "strings" "sync" "time" @@ -18,35 +15,6 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) -// PeerID is a unique peer ID, generally expressed in hex form. -type PeerID []byte - -// String implements the fmt.Stringer interface for the PeerID type. -func (pid PeerID) String() string { - return strings.ToLower(hex.EncodeToString(pid)) -} - -// Empty returns true if the PeerID is considered empty. -func (pid PeerID) Empty() bool { - return len(pid) == 0 -} - -// PeerIDFromString returns a PeerID from an encoded string or an error upon -// decode failure. -func PeerIDFromString(s string) (PeerID, error) { - bz, err := hex.DecodeString(s) - if err != nil { - return nil, fmt.Errorf("failed to decode PeerID (%s): %w", s, err) - } - - return PeerID(bz), nil -} - -// Equal reports whether two PeerID are equal. -func (pid PeerID) Equal(other PeerID) bool { - return bytes.Equal(pid, other) -} - // PeerStatus specifies peer statuses. type PeerStatus string @@ -70,7 +38,7 @@ const ( // PeerError is a peer error reported by a reactor via the Error channel. The // severity may cause the peer to be disconnected or banned depending on policy. type PeerError struct { - PeerID PeerID + PeerID NodeID Err error Severity PeerErrorSeverity } @@ -134,7 +102,7 @@ func (puc *PeerUpdatesCh) Done() <-chan struct{} { // PeerUpdate is a peer status update for reactors. type PeerUpdate struct { - PeerID PeerID + PeerID NodeID Status PeerStatus } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0bfc0002f..e8c53e354 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -19,51 +19,6 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) -func TestPeerIDFromString(t *testing.T) { - testCases := map[string]struct { - input string - expectedID PeerID - expectErr bool - }{ - "empty peer ID string": {"", PeerID{}, false}, - "invalid peer ID string": {"foo", nil, true}, - "valid peer ID string": {"ff", PeerID{0xFF}, false}, - } - - for name, tc := range testCases { - tc := tc - t.Run(name, func(t *testing.T) { - pID, err := PeerIDFromString(tc.input) - require.Equal(t, tc.expectErr, err != nil, err) - require.Equal(t, tc.expectedID, pID) - }) - } -} - -func TestPeerID_String(t *testing.T) { - require.Equal(t, "", PeerID{}.String()) - require.Equal(t, "ff", PeerID{0xFF}.String()) -} - -func TestPeerID_Equal(t *testing.T) { - testCases := map[string]struct { - idA PeerID - idB PeerID - equal bool - }{ - "empty IDs": {PeerID{}, PeerID{}, true}, - "not equal": {PeerID{0xFF}, PeerID{0xAA}, false}, - "equal": {PeerID{0xFF}, PeerID{0xFF}, true}, - } - - for name, tc := range testCases { - tc := tc - t.Run(name, func(t *testing.T) { - require.Equal(t, tc.equal, tc.idA.Equal(tc.idB)) - }) - } -} - func TestPeerBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) diff --git a/p2p/shim.go b/p2p/shim.go index bf57ac8fa..398c8eb2e 100644 --- a/p2p/shim.go +++ b/p2p/shim.go @@ -123,14 +123,14 @@ func (rs *ReactorShim) proxyPeerEnvelopes() { case e.Broadcast: rs.Switch.Broadcast(cs.Descriptor.ID, bz) - case !e.To.Empty(): - src := rs.Switch.peers.Get(NodeID(e.To.String())) + case e.To != "": + src := rs.Switch.peers.Get(e.To) if src == nil { rs.Logger.Error( "failed to proxy envelope; failed to find peer", "ch_id", cs.Descriptor.ID, "msg", e.Message, - "peer", e.To.String(), + "peer", e.To, ) continue } @@ -140,7 +140,7 @@ func (rs *ReactorShim) proxyPeerEnvelopes() { "failed to proxy message to peer", "ch_id", cs.Descriptor.ID, "msg", e.Message, - "peer", e.To.String(), + "peer", e.To, ) } @@ -160,10 +160,10 @@ func (rs *ReactorShim) handlePeerErrors() { for _, cs := range rs.Channels { go func(cs *ChannelShim) { for pErr := range cs.Channel.errCh { - if !pErr.PeerID.Empty() { - peer := rs.Switch.peers.Get(NodeID(pErr.PeerID.String())) + if pErr.PeerID != "" { + peer := rs.Switch.peers.Get(pErr.PeerID) if peer == nil { - rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID.String()) + rs.Logger.Error("failed to handle peer error; failed to find peer", "peer", pErr.PeerID) continue } @@ -222,15 +222,9 @@ func (rs *ReactorShim) GetChannels() []*ChannelDescriptor { // The embedding reactor must be sure to listen for messages on this channel to // handle adding a peer. func (rs *ReactorShim) AddPeer(peer Peer) { - peerID, err := PeerIDFromString(string(peer.ID())) - if err != nil { - rs.Logger.Error("failed to add peer", "peer", peer.ID(), "err", err) - return - } - select { - case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peerID, Status: PeerStatusUp}: - rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peerID.String(), "status", PeerStatusUp) + case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusUp}: + rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp) case <-rs.PeerUpdates.Done(): // NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel. @@ -247,18 +241,12 @@ func (rs *ReactorShim) AddPeer(peer Peer) { // The embedding reactor must be sure to listen for messages on this channel to // handle removing a peer. func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) { - peerID, err := PeerIDFromString(string(peer.ID())) - if err != nil { - rs.Logger.Error("failed to remove peer", "peer", peer.ID(), "err", err) - return - } - select { - case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peerID, Status: PeerStatusDown}: + case rs.PeerUpdates.updatesCh <- PeerUpdate{PeerID: peer.ID(), Status: PeerStatusDown}: rs.Logger.Debug( "sent peer update", "reactor", rs.Name, - "peer", peerID.String(), + "peer", peer.ID(), "reason", reason, "status", PeerStatusDown, ) @@ -291,12 +279,6 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) { return } - peerID, err := PeerIDFromString(string(src.ID())) - if err != nil { - rs.Logger.Error("failed to convert peer ID", "peer", src, "ch_id", chID, "err", err) - return - } - msg := proto.Clone(channelShim.Channel.messageType) msg.Reset() @@ -327,8 +309,8 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) { } select { - case channelShim.Channel.inCh <- Envelope{From: peerID, Message: msg}: - rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", peerID.String()) + case channelShim.Channel.inCh <- Envelope{From: src.ID(), Message: msg}: + rs.Logger.Debug("proxied envelope", "reactor", rs.Name, "ch_id", cID, "peer", src.ID()) case <-channelShim.Channel.Done(): // NOTE: We explicitly DO NOT close the p2p Channel's inbound go channel. diff --git a/p2p/shim_test.go b/p2p/shim_test.go index 918666d5b..2b9f9fbc4 100644 --- a/p2p/shim_test.go +++ b/p2p/shim_test.go @@ -76,16 +76,14 @@ func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite { return rts } -func simplePeer(t *testing.T, id string) (*p2pmocks.Peer, p2p.PeerID) { +func simplePeer(t *testing.T, id string) (*p2pmocks.Peer, p2p.NodeID) { t.Helper() + peerID := p2p.NodeID(id) peer := &p2pmocks.Peer{} - peer.On("ID").Return(p2p.NodeID(id)) + peer.On("ID").Return(peerID) - pID, err := p2p.PeerIDFromString(string(peer.ID())) - require.NoError(t, err) - - return peer, pID + return peer, peerID } func TestReactorShim_GetChannel(t *testing.T) { diff --git a/statesync/chunks.go b/statesync/chunks.go index 21b48bad2..c53fca8ed 100644 --- a/statesync/chunks.go +++ b/statesync/chunks.go @@ -22,7 +22,7 @@ type chunk struct { Format uint32 Index uint32 Chunk []byte - Sender p2p.PeerID + Sender p2p.NodeID } // chunkQueue manages chunks for a state sync process, ordering them if requested. It acts as an @@ -33,7 +33,7 @@ type chunkQueue struct { snapshot *snapshot // if this is nil, the queue has been closed dir string // temp dir for on-disk chunk storage chunkFiles map[uint32]string // path to temporary chunk file - chunkSenders map[uint32]p2p.PeerID // the peer who sent the given chunk + chunkSenders map[uint32]p2p.NodeID // the peer who sent the given chunk chunkAllocated map[uint32]bool // chunks that have been allocated via Allocate() chunkReturned map[uint32]bool // chunks returned via Next() waiters map[uint32][]chan<- uint32 // signals WaitFor() waiters about chunk arrival @@ -54,7 +54,7 @@ func newChunkQueue(snapshot *snapshot, tempDir string) (*chunkQueue, error) { snapshot: snapshot, dir: dir, chunkFiles: make(map[uint32]string, snapshot.Chunks), - chunkSenders: make(map[uint32]p2p.PeerID, snapshot.Chunks), + chunkSenders: make(map[uint32]p2p.NodeID, snapshot.Chunks), chunkAllocated: make(map[uint32]bool, snapshot.Chunks), chunkReturned: make(map[uint32]bool, snapshot.Chunks), waiters: make(map[uint32][]chan<- uint32), @@ -188,12 +188,12 @@ func (q *chunkQueue) discard(index uint32) error { // DiscardSender discards all *unreturned* chunks from a given sender. If the caller wants to // discard already returned chunks, this can be done via Discard(). -func (q *chunkQueue) DiscardSender(peerID p2p.PeerID) error { +func (q *chunkQueue) DiscardSender(peerID p2p.NodeID) error { q.Lock() defer q.Unlock() for index, sender := range q.chunkSenders { - if sender.Equal(peerID) && !q.chunkReturned[index] { + if sender == peerID && !q.chunkReturned[index] { err := q.discard(index) if err != nil { return err @@ -208,7 +208,7 @@ func (q *chunkQueue) DiscardSender(peerID p2p.PeerID) error { // GetSender returns the sender of the chunk with the given index, or empty if // not found. -func (q *chunkQueue) GetSender(index uint32) p2p.PeerID { +func (q *chunkQueue) GetSender(index uint32) p2p.NodeID { q.Lock() defer q.Unlock() return q.chunkSenders[index] diff --git a/statesync/chunks_test.go b/statesync/chunks_test.go index 40258b5e7..26ec2e8a5 100644 --- a/statesync/chunks_test.go +++ b/statesync/chunks_test.go @@ -274,7 +274,7 @@ func TestChunkQueue_DiscardSender(t *testing.T) { defer teardown() // Allocate and add all chunks to the queue - senders := []p2p.PeerID{p2p.PeerID("a"), p2p.PeerID("b"), p2p.PeerID("c")} + senders := []p2p.NodeID{p2p.NodeID("a"), p2p.NodeID("b"), p2p.NodeID("c")} for i := uint32(0); i < queue.Size(); i++ { _, err := queue.Allocate() require.NoError(t, err) @@ -295,14 +295,14 @@ func TestChunkQueue_DiscardSender(t *testing.T) { } // Discarding an unknown sender should do nothing - err := queue.DiscardSender(p2p.PeerID("x")) + err := queue.DiscardSender(p2p.NodeID("x")) require.NoError(t, err) _, err = queue.Allocate() assert.Equal(t, errDone, err) // Discarding sender b should discard chunk 4, but not chunk 1 which has already been // returned. - err = queue.DiscardSender(p2p.PeerID("b")) + err = queue.DiscardSender(p2p.NodeID("b")) require.NoError(t, err) index, err := queue.Allocate() require.NoError(t, err) @@ -315,24 +315,24 @@ func TestChunkQueue_GetSender(t *testing.T) { queue, teardown := setupChunkQueue(t) defer teardown() - peerAID := p2p.PeerID{0xaa} - peerBID := p2p.PeerID{0xbb} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") _, err := queue.Add(&chunk{Height: 3, Format: 1, Index: 0, Chunk: []byte{1}, Sender: peerAID}) require.NoError(t, err) _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 1, Chunk: []byte{2}, Sender: peerBID}) require.NoError(t, err) - assert.Equal(t, "aa", queue.GetSender(0).String()) - assert.Equal(t, "bb", queue.GetSender(1).String()) - assert.Equal(t, "", queue.GetSender(2).String()) + assert.EqualValues(t, "aa", queue.GetSender(0)) + assert.EqualValues(t, "bb", queue.GetSender(1)) + assert.EqualValues(t, "", queue.GetSender(2)) // After the chunk has been processed, we should still know who the sender was chunk, err := queue.Next() require.NoError(t, err) require.NotNil(t, chunk) require.EqualValues(t, 0, chunk.Index) - assert.Equal(t, "aa", queue.GetSender(0).String()) + assert.EqualValues(t, "aa", queue.GetSender(0)) } func TestChunkQueue_Next(t *testing.T) { @@ -354,7 +354,7 @@ func TestChunkQueue_Next(t *testing.T) { }() assert.Empty(t, chNext) - _, err := queue.Add(&chunk{Height: 3, Format: 1, Index: 1, Chunk: []byte{3, 1, 1}, Sender: p2p.PeerID("b")}) + _, err := queue.Add(&chunk{Height: 3, Format: 1, Index: 1, Chunk: []byte{3, 1, 1}, Sender: p2p.NodeID("b")}) require.NoError(t, err) select { case <-chNext: @@ -362,17 +362,17 @@ func TestChunkQueue_Next(t *testing.T) { default: } - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 0, Chunk: []byte{3, 1, 0}, Sender: p2p.PeerID("a")}) + _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 0, Chunk: []byte{3, 1, 0}, Sender: p2p.NodeID("a")}) require.NoError(t, err) assert.Equal(t, - &chunk{Height: 3, Format: 1, Index: 0, Chunk: []byte{3, 1, 0}, Sender: p2p.PeerID("a")}, + &chunk{Height: 3, Format: 1, Index: 0, Chunk: []byte{3, 1, 0}, Sender: p2p.NodeID("a")}, <-chNext) assert.Equal(t, - &chunk{Height: 3, Format: 1, Index: 1, Chunk: []byte{3, 1, 1}, Sender: p2p.PeerID("b")}, + &chunk{Height: 3, Format: 1, Index: 1, Chunk: []byte{3, 1, 1}, Sender: p2p.NodeID("b")}, <-chNext) - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 4, Chunk: []byte{3, 1, 4}, Sender: p2p.PeerID("e")}) + _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 4, Chunk: []byte{3, 1, 4}, Sender: p2p.NodeID("e")}) require.NoError(t, err) select { case <-chNext: @@ -380,19 +380,19 @@ func TestChunkQueue_Next(t *testing.T) { default: } - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 2, Chunk: []byte{3, 1, 2}, Sender: p2p.PeerID("c")}) + _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 2, Chunk: []byte{3, 1, 2}, Sender: p2p.NodeID("c")}) require.NoError(t, err) - _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 3, Chunk: []byte{3, 1, 3}, Sender: p2p.PeerID("d")}) + _, err = queue.Add(&chunk{Height: 3, Format: 1, Index: 3, Chunk: []byte{3, 1, 3}, Sender: p2p.NodeID("d")}) require.NoError(t, err) assert.Equal(t, - &chunk{Height: 3, Format: 1, Index: 2, Chunk: []byte{3, 1, 2}, Sender: p2p.PeerID("c")}, + &chunk{Height: 3, Format: 1, Index: 2, Chunk: []byte{3, 1, 2}, Sender: p2p.NodeID("c")}, <-chNext) assert.Equal(t, - &chunk{Height: 3, Format: 1, Index: 3, Chunk: []byte{3, 1, 3}, Sender: p2p.PeerID("d")}, + &chunk{Height: 3, Format: 1, Index: 3, Chunk: []byte{3, 1, 3}, Sender: p2p.NodeID("d")}, <-chNext) assert.Equal(t, - &chunk{Height: 3, Format: 1, Index: 4, Chunk: []byte{3, 1, 4}, Sender: p2p.PeerID("e")}, + &chunk{Height: 3, Format: 1, Index: 4, Chunk: []byte{3, 1, 4}, Sender: p2p.NodeID("e")}, <-chNext) _, ok := <-chNext diff --git a/statesync/reactor.go b/statesync/reactor.go index 0a7ca68ce..071d4e534 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -167,7 +167,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { "advertising snapshot", "height", snapshot.Height, "format", snapshot.Format, - "peer", envelope.From.String(), + "peer", envelope.From, ) r.snapshotCh.Out() <- p2p.Envelope{ To: envelope.From, @@ -194,7 +194,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { "received snapshot", "height", msg.Height, "format", msg.Format, - "peer", envelope.From.String(), + "peer", envelope.From, ) _, err := r.syncer.AddSnapshot(envelope.From, &snapshot{ Height: msg.Height, @@ -215,7 +215,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { } default: - r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From.String()) + r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) return fmt.Errorf("received unknown message: %T", msg) } @@ -233,7 +233,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "height", msg.Height, "format", msg.Format, "chunk", msg.Index, - "peer", envelope.From.String(), + "peer", envelope.From, ) resp, err := r.conn.LoadSnapshotChunkSync(context.Background(), abci.RequestLoadSnapshotChunk{ Height: msg.Height, @@ -247,7 +247,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "format", msg.Format, "chunk", msg.Index, "err", err, - "peer", envelope.From.String(), + "peer", envelope.From, ) return nil } @@ -257,7 +257,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "height", msg.Height, "format", msg.Format, "chunk", msg.Index, - "peer", envelope.From.String(), + "peer", envelope.From, ) r.chunkCh.Out() <- p2p.Envelope{ To: envelope.From, @@ -275,7 +275,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { defer r.mtx.RUnlock() if r.syncer == nil { - r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From.String()) + r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From) return nil } @@ -284,7 +284,7 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "height", msg.Height, "format", msg.Format, "chunk", msg.Index, - "peer", envelope.From.String(), + "peer", envelope.From, ) _, err := r.syncer.AddChunk(&chunk{ Height: msg.Height, @@ -300,13 +300,13 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { "format", msg.Format, "chunk", msg.Index, "err", err, - "peer", envelope.From.String(), + "peer", envelope.From, ) return nil } default: - r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From.String()) + r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) return fmt.Errorf("received unknown message: %T", msg) } @@ -400,7 +400,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) { } }() - r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID.String(), "status", peerUpdate.Status) + r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) r.mtx.RLock() defer r.mtx.RUnlock() diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go index 9d527f08a..824d2432a 100644 --- a/statesync/reactor_test.go +++ b/statesync/reactor_test.go @@ -119,7 +119,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) { rts := setup(t, nil, nil, nil, 2) rts.chunkInCh <- p2p.Envelope{ - From: p2p.PeerID{0xAA}, + From: p2p.NodeID("aa"), Message: &ssproto.SnapshotsRequest{}, } @@ -127,7 +127,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) { require.Error(t, response.Err) require.Empty(t, rts.chunkOutCh) require.Contains(t, response.Err.Error(), "received unknown message") - require.Equal(t, p2p.PeerID{0xAA}, response.PeerID) + require.Equal(t, p2p.NodeID("aa"), response.PeerID) } func TestReactor_ChunkRequest(t *testing.T) { @@ -173,7 +173,7 @@ func TestReactor_ChunkRequest(t *testing.T) { rts := setup(t, conn, nil, nil, 2) rts.chunkInCh <- p2p.Envelope{ - From: p2p.PeerID{0xAA}, + From: p2p.NodeID("aa"), Message: tc.request, } @@ -190,7 +190,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) { rts := setup(t, nil, nil, nil, 2) rts.snapshotInCh <- p2p.Envelope{ - From: p2p.PeerID{0xAA}, + From: p2p.NodeID("aa"), Message: &ssproto.ChunkRequest{}, } @@ -198,7 +198,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) { require.Error(t, response.Err) require.Empty(t, rts.snapshotOutCh) require.Contains(t, response.Err.Error(), "received unknown message") - require.Equal(t, p2p.PeerID{0xAA}, response.PeerID) + require.Equal(t, p2p.NodeID("aa"), response.PeerID) } func TestReactor_SnapshotsRequest(t *testing.T) { @@ -250,7 +250,7 @@ func TestReactor_SnapshotsRequest(t *testing.T) { rts := setup(t, conn, nil, nil, 100) rts.snapshotInCh <- p2p.Envelope{ - From: p2p.PeerID{0xAA}, + From: p2p.NodeID("aa"), Message: &ssproto.SnapshotsRequest{}, } diff --git a/statesync/snapshots.go b/statesync/snapshots.go index ab8fa8e2e..53ffed4e7 100644 --- a/statesync/snapshots.go +++ b/statesync/snapshots.go @@ -1,12 +1,12 @@ package statesync import ( - "bytes" "context" "crypto/sha256" "fmt" "math/rand" "sort" + "strings" "time" tmsync "github.com/tendermint/tendermint/libs/sync" @@ -47,16 +47,16 @@ type snapshotPool struct { tmsync.Mutex snapshots map[snapshotKey]*snapshot - snapshotPeers map[snapshotKey]map[string]p2p.PeerID + snapshotPeers map[snapshotKey]map[p2p.NodeID]p2p.NodeID // indexes for fast searches formatIndex map[uint32]map[snapshotKey]bool heightIndex map[uint64]map[snapshotKey]bool - peerIndex map[string]map[snapshotKey]bool + peerIndex map[p2p.NodeID]map[snapshotKey]bool // blacklists for rejected items formatBlacklist map[uint32]bool - peerBlacklist map[string]bool + peerBlacklist map[p2p.NodeID]bool snapshotBlacklist map[snapshotKey]bool } @@ -65,12 +65,12 @@ func newSnapshotPool(stateProvider StateProvider) *snapshotPool { return &snapshotPool{ stateProvider: stateProvider, snapshots: make(map[snapshotKey]*snapshot), - snapshotPeers: make(map[snapshotKey]map[string]p2p.PeerID), + snapshotPeers: make(map[snapshotKey]map[p2p.NodeID]p2p.NodeID), formatIndex: make(map[uint32]map[snapshotKey]bool), heightIndex: make(map[uint64]map[snapshotKey]bool), - peerIndex: make(map[string]map[snapshotKey]bool), + peerIndex: make(map[p2p.NodeID]map[snapshotKey]bool), formatBlacklist: make(map[uint32]bool), - peerBlacklist: make(map[string]bool), + peerBlacklist: make(map[p2p.NodeID]bool), snapshotBlacklist: make(map[snapshotKey]bool), } } @@ -79,7 +79,7 @@ func newSnapshotPool(stateProvider StateProvider) *snapshotPool { // snapshots. It returns true if this was a new, non-blacklisted snapshot. The // snapshot height is verified using the light client, and the expected app hash // is set for the snapshot. -func (p *snapshotPool) Add(peer p2p.PeerID, snapshot *snapshot) (bool, error) { +func (p *snapshotPool) Add(peerID p2p.NodeID, snapshot *snapshot) (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -96,23 +96,23 @@ func (p *snapshotPool) Add(peer p2p.PeerID, snapshot *snapshot) (bool, error) { switch { case p.formatBlacklist[snapshot.Format]: return false, nil - case p.peerBlacklist[peer.String()]: + case p.peerBlacklist[peerID]: return false, nil case p.snapshotBlacklist[key]: return false, nil - case len(p.peerIndex[peer.String()]) >= recentSnapshots: + case len(p.peerIndex[peerID]) >= recentSnapshots: return false, nil } if p.snapshotPeers[key] == nil { - p.snapshotPeers[key] = make(map[string]p2p.PeerID) + p.snapshotPeers[key] = make(map[p2p.NodeID]p2p.NodeID) } - p.snapshotPeers[key][peer.String()] = peer + p.snapshotPeers[key][peerID] = peerID - if p.peerIndex[peer.String()] == nil { - p.peerIndex[peer.String()] = make(map[snapshotKey]bool) + if p.peerIndex[peerID] == nil { + p.peerIndex[peerID] = make(map[snapshotKey]bool) } - p.peerIndex[peer.String()][key] = true + p.peerIndex[peerID][key] = true if p.snapshots[key] != nil { return false, nil @@ -142,29 +142,29 @@ func (p *snapshotPool) Best() *snapshot { } // GetPeer returns a random peer for a snapshot, if any. -func (p *snapshotPool) GetPeer(snapshot *snapshot) p2p.PeerID { +func (p *snapshotPool) GetPeer(snapshot *snapshot) p2p.NodeID { peers := p.GetPeers(snapshot) if len(peers) == 0 { - return nil + return "" } return peers[rand.Intn(len(peers))] // nolint:gosec // G404: Use of weak random number generator } // GetPeers returns the peers for a snapshot. -func (p *snapshotPool) GetPeers(snapshot *snapshot) []p2p.PeerID { +func (p *snapshotPool) GetPeers(snapshot *snapshot) []p2p.NodeID { key := snapshot.Key() p.Lock() defer p.Unlock() - peers := make([]p2p.PeerID, 0, len(p.snapshotPeers[key])) + peers := make([]p2p.NodeID, 0, len(p.snapshotPeers[key])) for _, peer := range p.snapshotPeers[key] { peers = append(peers, peer) } // sort results, for testability (otherwise order is random, so tests randomly fail) sort.Slice(peers, func(a int, b int) bool { - return bytes.Compare(peers[a], peers[b]) < 0 + return strings.Compare(string(peers[a]), string(peers[b])) < 0 }) return peers @@ -227,7 +227,7 @@ func (p *snapshotPool) RejectFormat(format uint32) { } // RejectPeer rejects a peer. It will never be used again. -func (p *snapshotPool) RejectPeer(peerID p2p.PeerID) { +func (p *snapshotPool) RejectPeer(peerID p2p.NodeID) { if len(peerID) == 0 { return } @@ -236,26 +236,26 @@ func (p *snapshotPool) RejectPeer(peerID p2p.PeerID) { defer p.Unlock() p.removePeer(peerID) - p.peerBlacklist[peerID.String()] = true + p.peerBlacklist[peerID] = true } // RemovePeer removes a peer from the pool, and any snapshots that no longer have peers. -func (p *snapshotPool) RemovePeer(peerID p2p.PeerID) { +func (p *snapshotPool) RemovePeer(peerID p2p.NodeID) { p.Lock() defer p.Unlock() p.removePeer(peerID) } // removePeer removes a peer. The caller must hold the mutex lock. -func (p *snapshotPool) removePeer(peerID p2p.PeerID) { - for key := range p.peerIndex[peerID.String()] { - delete(p.snapshotPeers[key], peerID.String()) +func (p *snapshotPool) removePeer(peerID p2p.NodeID) { + for key := range p.peerIndex[peerID] { + delete(p.snapshotPeers[key], peerID) if len(p.snapshotPeers[key]) == 0 { p.removeSnapshot(key) } } - delete(p.peerIndex, peerID.String()) + delete(p.peerIndex, peerID) } // removeSnapshot removes a snapshot. The caller must hold the mutex lock. diff --git a/statesync/snapshots_test.go b/statesync/snapshots_test.go index 866267fb7..085121f32 100644 --- a/statesync/snapshots_test.go +++ b/statesync/snapshots_test.go @@ -42,7 +42,7 @@ func TestSnapshotPool_Add(t *testing.T) { stateProvider := &mocks.StateProvider{} stateProvider.On("AppHash", mock.Anything, uint64(1)).Return([]byte("app_hash"), nil) - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") // Adding to the pool should work pool := newSnapshotPool(stateProvider) @@ -56,8 +56,8 @@ func TestSnapshotPool_Add(t *testing.T) { require.True(t, added) // Adding again from a different peer should return false - otherPeerID := p2p.PeerID{0xBB} - added, err = pool.Add(otherPeerID, &snapshot{ + otherNodeID := p2p.NodeID("bb") + added, err = pool.Add(otherNodeID, &snapshot{ Height: 1, Format: 1, Chunks: 1, @@ -81,8 +81,8 @@ func TestSnapshotPool_GetPeer(t *testing.T) { s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}} - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") _, err := pool.Add(peerAID, s) require.NoError(t, err) @@ -98,17 +98,17 @@ func TestSnapshotPool_GetPeer(t *testing.T) { seenB := false for !seenA || !seenB { peer := pool.GetPeer(s) - if peer.Equal(peerAID) { + if peer == peerAID { seenA = true } - if peer.Equal(peerBID) { + if peer == peerBID { seenB = true } } - // GetPeer should return nil for an unknown snapshot + // GetPeer should return empty for an unknown snapshot peer := pool.GetPeer(&snapshot{Height: 9, Format: 9}) - require.Nil(t, peer) + require.EqualValues(t, "", peer) } func TestSnapshotPool_GetPeers(t *testing.T) { @@ -118,8 +118,8 @@ func TestSnapshotPool_GetPeers(t *testing.T) { s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}} - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") _, err := pool.Add(peerAID, s) require.NoError(t, err) @@ -146,22 +146,19 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) { // tie-breaker. expectSnapshots := []struct { snapshot *snapshot - peers []string + peers []p2p.NodeID }{ - {&snapshot{Height: 2, Format: 2, Chunks: 4, Hash: []byte{1, 3}}, []string{"AA", "BB", "CC"}}, - {&snapshot{Height: 2, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []string{"AA"}}, - {&snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2}}, []string{"AA", "BB"}}, - {&snapshot{Height: 1, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []string{"AA", "BB"}}, - {&snapshot{Height: 1, Format: 1, Chunks: 4, Hash: []byte{1, 2}}, []string{"AA", "BB", "CC"}}, + {&snapshot{Height: 2, Format: 2, Chunks: 4, Hash: []byte{1, 3}}, []p2p.NodeID{"AA", "BB", "CC"}}, + {&snapshot{Height: 2, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []p2p.NodeID{"AA"}}, + {&snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB"}}, + {&snapshot{Height: 1, Format: 2, Chunks: 5, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB"}}, + {&snapshot{Height: 1, Format: 1, Chunks: 4, Hash: []byte{1, 2}}, []p2p.NodeID{"AA", "BB", "CC"}}, } // Add snapshots in reverse order, to make sure the pool enforces some order. for i := len(expectSnapshots) - 1; i >= 0; i-- { - for _, peerIDStr := range expectSnapshots[i].peers { - peerID, err := p2p.PeerIDFromString(peerIDStr) - require.NoError(t, err) - - _, err = pool.Add(peerID, expectSnapshots[i].snapshot) + for _, peerID := range expectSnapshots[i].peers { + _, err := pool.Add(peerID, expectSnapshots[i].snapshot) require.NoError(t, err) } } @@ -189,7 +186,7 @@ func TestSnapshotPool_Reject(t *testing.T) { stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) pool := newSnapshotPool(stateProvider) - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") snapshots := []*snapshot{ {Height: 2, Format: 2, Chunks: 1, Hash: []byte{1, 2}}, @@ -219,7 +216,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) { stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) pool := newSnapshotPool(stateProvider) - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") snapshots := []*snapshot{ {Height: 2, Format: 2, Chunks: 1, Hash: []byte{1, 2}}, @@ -250,8 +247,8 @@ func TestSnapshotPool_RejectPeer(t *testing.T) { stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) pool := newSnapshotPool(stateProvider) - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") s1 := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}} s2 := &snapshot{Height: 2, Format: 1, Chunks: 1, Hash: []byte{2}} @@ -292,8 +289,8 @@ func TestSnapshotPool_RemovePeer(t *testing.T) { stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil) pool := newSnapshotPool(stateProvider) - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") s1 := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}} s2 := &snapshot{Height: 2, Format: 1, Chunks: 1, Hash: []byte{2}} diff --git a/statesync/syncer.go b/statesync/syncer.go index 7e09a4b7b..f87129c87 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -107,8 +107,8 @@ func (s *syncer) AddChunk(chunk *chunk) (bool, error) { // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen // snapshot was accepted and added. -func (s *syncer) AddSnapshot(peer p2p.PeerID, snapshot *snapshot) (bool, error) { - added, err := s.snapshots.Add(peer, snapshot) +func (s *syncer) AddSnapshot(peerID p2p.NodeID, snapshot *snapshot) (bool, error) { + added, err := s.snapshots.Add(peerID, snapshot) if err != nil { return false, err } @@ -121,18 +121,18 @@ func (s *syncer) AddSnapshot(peer p2p.PeerID, snapshot *snapshot) (bool, error) // AddPeer adds a peer to the pool. For now we just keep it simple and send a // single request to discover snapshots, later we may want to do retries and stuff. -func (s *syncer) AddPeer(peer p2p.PeerID) { - s.logger.Debug("Requesting snapshots from peer", "peer", peer.String()) +func (s *syncer) AddPeer(peerID p2p.NodeID) { + s.logger.Debug("Requesting snapshots from peer", "peer", peerID) s.snapshotCh <- p2p.Envelope{ - To: peer, + To: peerID, Message: &ssproto.SnapshotsRequest{}, } } // RemovePeer removes a peer from the pool. -func (s *syncer) RemovePeer(peer p2p.PeerID) { - s.logger.Debug("Removing peer from sync", "peer", peer.String()) - s.snapshots.RemovePeer(peer) +func (s *syncer) RemovePeer(peerID p2p.NodeID) { + s.logger.Debug("Removing peer from sync", "peer", peerID) + s.snapshots.RemovePeer(peerID) } // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further @@ -206,7 +206,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, "hash", fmt.Sprintf("%X", snapshot.Hash)) for _, peer := range s.snapshots.GetPeers(snapshot) { s.snapshots.RejectPeer(peer) - s.logger.Info("Snapshot sender rejected", "peer", peer.String()) + s.logger.Info("Snapshot sender rejected", "peer", peer) } default: @@ -335,7 +335,7 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error { resp, err := s.conn.ApplySnapshotChunkSync(context.Background(), abci.RequestApplySnapshotChunk{ Index: chunk.Index, Chunk: chunk.Chunk, - Sender: chunk.Sender.String(), + Sender: string(chunk.Sender), }) if err != nil { return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err) @@ -354,11 +354,7 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error { // Reject any senders as requested by the app for _, sender := range resp.RejectSenders { if sender != "" { - peerID, err := p2p.PeerIDFromString(sender) - if err != nil { - return err - } - + peerID := p2p.NodeID(sender) s.snapshots.RejectPeer(peerID) if err := chunks.DiscardSender(peerID); err != nil { @@ -423,7 +419,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *ch // requestChunk requests a chunk from a peer. func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { peer := s.snapshots.GetPeer(snapshot) - if peer == nil { + if peer == "" { s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height, "format", snapshot.Format, "hash", snapshot.Hash) return @@ -434,7 +430,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { "height", snapshot.Height, "format", snapshot.Format, "chunk", chunk, - "peer", peer.String(), + "peer", peer, ) s.chunkCh <- p2p.Envelope{ diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go index 229e91c98..cb2da3595 100644 --- a/statesync/syncer_test.go +++ b/statesync/syncer_test.go @@ -66,8 +66,8 @@ func TestSyncer_SyncAny(t *testing.T) { connSnapshot := &proxymocks.AppConnSnapshot{} connQuery := &proxymocks.AppConnQuery{} - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") rts := setup(t, connSnapshot, connQuery, stateProvider, 3) @@ -207,7 +207,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) { rts := setup(t, nil, nil, stateProvider, 2) s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") _, err := rts.syncer.AddSnapshot(peerID, s) require.NoError(t, err) @@ -232,7 +232,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) { s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") _, err := rts.syncer.AddSnapshot(peerID, s22) require.NoError(t, err) @@ -271,7 +271,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) { s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}} s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") _, err := rts.syncer.AddSnapshot(peerID, s22) require.NoError(t, err) @@ -301,9 +301,9 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) { rts := setup(t, nil, nil, stateProvider, 2) - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} - peerCID := p2p.PeerID{0xCC} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") + peerCID := p2p.NodeID("cc") // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and @@ -350,7 +350,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) { errBoom := errors.New("boom") s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}} - peerID := p2p.PeerID{0xAA} + peerID := p2p.NodeID("aa") _, err := rts.syncer.AddSnapshot(peerID, s) require.NoError(t, err) @@ -551,9 +551,9 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { // Set up three peers across two snapshots, and ask for one of them to be banned. // It should be banned from all snapshots. - peerAID := p2p.PeerID{0xAA} - peerBID := p2p.PeerID{0xBB} - peerCID := p2p.PeerID{0xCC} + peerAID := p2p.NodeID("aa") + peerBID := p2p.NodeID("bb") + peerCID := p2p.NodeID("cc") s1 := &snapshot{Height: 1, Format: 1, Chunks: 3} s2 := &snapshot{Height: 2, Format: 1, Chunks: 3} @@ -602,7 +602,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { Index: 2, Chunk: []byte{2}, Sender: "cc", }).Once().Return(&abci.ResponseApplySnapshotChunk{ Result: tc.result, - RejectSenders: []string{peerBID.String()}, + RejectSenders: []string{string(peerBID)}, }, nil) // On retry, the last chunk will be tried again, so we just accept it then. @@ -623,13 +623,13 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) { s1peers := rts.syncer.snapshots.GetPeers(s1) require.Len(t, s1peers, 2) - require.EqualValues(t, "aa", s1peers[0].String()) - require.EqualValues(t, "cc", s1peers[1].String()) + require.EqualValues(t, "aa", s1peers[0]) + require.EqualValues(t, "cc", s1peers[1]) rts.syncer.snapshots.GetPeers(s1) require.Len(t, s1peers, 2) - require.EqualValues(t, "aa", s1peers[0].String()) - require.EqualValues(t, "cc", s1peers[1].String()) + require.EqualValues(t, "aa", s1peers[0]) + require.EqualValues(t, "cc", s1peers[1]) require.NoError(t, chunks.Close()) })