From 701df09971781179918a05db0e14dd5b9d1eabc0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 08:22:45 +0300 Subject: [PATCH 1/7] do not use keywords Refs #1317 --- blockchain/reactor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 2b334c232..3c25eed2f 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -76,9 +76,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - const cap = 1000 // must be bigger than peers count - requestsCh := make(chan BlockRequest, cap) - errorsCh := make(chan peerError, cap) // so we don't block in #Receive#pool.AddBlock + const capacity = 1000 // must be bigger than peers count + requestsCh := make(chan BlockRequest, capacity) + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock pool := NewBlockPool( store.Height()+1, From 5fab8e404de52a3e031ae12d9117e2b448cd69fc Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 09:55:08 +0300 Subject: [PATCH 2/7] replace magic number with blocksToContributeToBecomeGoodPeer const Refs #1317 --- consensus/reactor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 5c672a0c7..2c61464d5 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -27,6 +27,8 @@ const ( VoteSetBitsChannel = byte(0x23) maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes. + + blocksToContributeToBecomeGoodPeer = 10000 ) //----------------------------------------------------------------------------- @@ -251,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 { + if numBlocks := ps.RecordBlockPart(msg); numBlocks > blocksToContributeToBecomeGoodPeer { conR.Switch.MarkPeerAsGood(src) } conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -273,7 +275,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - if blocks := ps.RecordVote(msg.Vote); blocks > 10000 { + if blocks := ps.RecordVote(msg.Vote); blocks > blocksToContributeToBecomeGoodPeer { conR.Switch.MarkPeerAsGood(src) } From 31f3dd42e7c420ae6a018c7b9a965f22c5736186 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 09:57:21 +0300 Subject: [PATCH 3/7] mark peer as good only once or should we do it every N blocks? Refs #1317 --- consensus/reactor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 2c61464d5..6b1eb9b6f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -253,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - if numBlocks := ps.RecordBlockPart(msg); numBlocks > blocksToContributeToBecomeGoodPeer { + if numBlocks := ps.RecordBlockPart(msg); numBlocks == blocksToContributeToBecomeGoodPeer { conR.Switch.MarkPeerAsGood(src) } conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -275,7 +275,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - if blocks := ps.RecordVote(msg.Vote); blocks > blocksToContributeToBecomeGoodPeer { + if blocks := ps.RecordVote(msg.Vote); blocks == blocksToContributeToBecomeGoodPeer { conR.Switch.MarkPeerAsGood(src) } From 42e345788422ea732d45aa5ef0273db4db08b466 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 10:06:25 +0300 Subject: [PATCH 4/7] fix tracking of votes and blockparts to not allow old information also remove mutex Refs #1317 --- consensus/reactor.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 6b1eb9b6f..5c4446bf5 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1079,10 +1079,7 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { // It returns the total number of votes (1 per block). This essentially means // the number of blocks for which peer has been sending us votes. func (ps *PeerState) RecordVote(vote *types.Vote) int { - ps.mtx.Lock() - defer ps.mtx.Unlock() - - if ps.stats.lastVoteHeight == vote.Height { + if ps.stats.lastVoteHeight >= vote.Height { return ps.stats.votes } ps.stats.lastVoteHeight = vote.Height @@ -1094,10 +1091,7 @@ func (ps *PeerState) RecordVote(vote *types.Vote) int { // It returns the total number of block parts (1 per block). This essentially means // the number of blocks for which peer has been sending us block parts. func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int { - ps.mtx.Lock() - defer ps.mtx.Unlock() - - if ps.stats.lastBlockPartHeight == bp.Height { + if ps.stats.lastBlockPartHeight >= bp.Height { return ps.stats.blockParts } From ab59f64f57c59ef26cbe328c518c2eaecbd74d44 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 13:13:19 +0300 Subject: [PATCH 5/7] test we record votes and block parts Refs #1317 --- consensus/reactor.go | 22 +++++++- consensus/reactor_test.go | 109 ++++++++++++++++++++++++++++++++++++++ p2p/dummy/peer.go | 72 +++++++++++++++++++++++++ 3 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 p2p/dummy/peer.go diff --git a/consensus/reactor.go b/consensus/reactor.go index 5c4446bf5..60ac3d9c9 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -852,6 +852,10 @@ type peerStateStats struct { blockParts int } +func (pss peerStateStats) String() string { + return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.votes, pss.blockParts) +} + // NewPeerState returns a new PeerState for the given Peer func NewPeerState(peer p2p.Peer) *PeerState { return &PeerState{ @@ -1087,6 +1091,12 @@ func (ps *PeerState) RecordVote(vote *types.Vote) int { return ps.stats.votes } +// VotesSent returns the number of blocks for which peer has been sending us +// votes. +func (ps *PeerState) VotesSent() int { + return ps.stats.votes +} + // RecordVote updates internal statistics for this peer by recording the block part. // It returns the total number of block parts (1 per block). This essentially means // the number of blocks for which peer has been sending us block parts. @@ -1100,6 +1110,12 @@ func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int { return ps.stats.blockParts } +// BlockPartsSent returns the number of blocks for which peer has been sending +// us block parts. +func (ps *PeerState) BlockPartsSent() int { + return ps.stats.blockParts +} + // SetHasVote sets the given vote as known by the peer func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.mtx.Lock() @@ -1246,11 +1262,13 @@ func (ps *PeerState) StringIndented(indent string) string { ps.mtx.Lock() defer ps.mtx.Unlock() return fmt.Sprintf(`PeerState{ -%s Key %v -%s PRS %v +%s Key %v +%s PRS %v +%s Stats %v %s}`, indent, ps.Peer.ID(), indent, ps.PeerRoundState.StringIndented(indent+" "), + indent, ps.stats, indent) } diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 8e96de2bb..26fc7e171 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -11,10 +11,13 @@ import ( "time" "github.com/tendermint/abci/example/kvstore" + wire "github.com/tendermint/tendermint/wire" + cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" + p2pdummy "github.com/tendermint/tendermint/p2p/dummy" "github.com/tendermint/tendermint/types" "github.com/stretchr/testify/assert" @@ -121,6 +124,112 @@ func TestReactorProposalHeartbeats(t *testing.T) { }, css) } +// Test we record block parts from other peers +func TestReactorRecordsBlockParts(t *testing.T) { + // create dummy peer + peer := p2pdummy.NewPeer() + ps := NewPeerState(peer).SetLogger(log.TestingLogger()) + peer.Set(types.PeerStateKey, ps) + + // create reactor + css := randConsensusNet(1, "consensus_reactor_records_block_parts_test", newMockTickerFunc(true), newPersistentKVStore) + reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states + reactor.SetEventBus(css[0].eventBus) + reactor.SetLogger(log.TestingLogger()) + sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + reactor.SetSwitch(sw) + err := reactor.Start() + require.NoError(t, err) + defer reactor.Stop() + + // 1) new block part + parts := types.NewPartSetFromData(cmn.RandBytes(100), 10) + msg := &BlockPartMessage{ + Height: 2, + Round: 0, + Part: parts.GetPart(0), + } + bz, err := wire.MarshalBinary(struct{ ConsensusMessage }{msg}) + require.NoError(t, err) + + reactor.Receive(DataChannel, peer, bz) + assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should have increased by 1") + + // 2) block part with the same height, but different round + msg.Round = 1 + + bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{msg}) + require.NoError(t, err) + + reactor.Receive(DataChannel, peer, bz) + assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same") + + // 3) block part from earlier height + msg.Height = 1 + msg.Round = 0 + + bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{msg}) + require.NoError(t, err) + + reactor.Receive(DataChannel, peer, bz) + assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same") +} + +// Test we record votes from other peers +func TestReactorRecordsVotes(t *testing.T) { + // create dummy peer + peer := p2pdummy.NewPeer() + ps := NewPeerState(peer).SetLogger(log.TestingLogger()) + peer.Set(types.PeerStateKey, ps) + + // create reactor + css := randConsensusNet(1, "consensus_reactor_records_votes_test", newMockTickerFunc(true), newPersistentKVStore) + reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states + reactor.SetEventBus(css[0].eventBus) + reactor.SetLogger(log.TestingLogger()) + sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + reactor.SetSwitch(sw) + err := reactor.Start() + require.NoError(t, err) + defer reactor.Stop() + _, val := css[0].state.Validators.GetByIndex(0) + + // 1) new vote + vote := &types.Vote{ + ValidatorIndex: 0, + ValidatorAddress: val.Address, + Height: 2, + Round: 0, + Timestamp: time.Now().UTC(), + Type: types.VoteTypePrevote, + BlockID: types.BlockID{}, + } + bz, err := wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}}) + require.NoError(t, err) + + reactor.Receive(VoteChannel, peer, bz) + assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should have increased by 1") + + // 2) vote with the same height, but different round + vote.Round = 1 + + bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}}) + require.NoError(t, err) + + reactor.Receive(VoteChannel, peer, bz) + assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same") + + // 3) vote from earlier height + vote.Height = 1 + vote.Round = 0 + + bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}}) + require.NoError(t, err) + + reactor.Receive(VoteChannel, peer, bz) + assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same") +} + //------------------------------------------------------------- // ensure we can make blocks despite cycling a validator set diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go new file mode 100644 index 000000000..61c3a8ace --- /dev/null +++ b/p2p/dummy/peer.go @@ -0,0 +1,72 @@ +package dummy + +import ( + p2p "github.com/tendermint/tendermint/p2p" + tmconn "github.com/tendermint/tendermint/p2p/conn" + cmn "github.com/tendermint/tmlibs/common" +) + +type peer struct { + cmn.BaseService + kv map[string]interface{} +} + +var _ p2p.Peer = (*peer)(nil) + +// NewPeer creates new dummy peer. +func NewPeer() *peer { + p := &peer{ + kv: make(map[string]interface{}), + } + p.BaseService = *cmn.NewBaseService(nil, "peer", p) + return p +} + +// ID always returns dummy. +func (p *peer) ID() p2p.ID { + return p2p.ID("dummy") +} + +// IsOutbound always returns false. +func (p *peer) IsOutbound() bool { + return false +} + +// IsPersistent always returns false. +func (p *peer) IsPersistent() bool { + return false +} + +// NodeInfo always returns empty node info. +func (p *peer) NodeInfo() p2p.NodeInfo { + return p2p.NodeInfo{} +} + +// Status always returns empry connection status. +func (p *peer) Status() tmconn.ConnectionStatus { + return tmconn.ConnectionStatus{} +} + +// Send does not do anything and just returns true. +func (p *peer) Send(byte, interface{}) bool { + return true +} + +// TrySend does not do anything and just returns true. +func (p *peer) TrySend(byte, interface{}) bool { + return true +} + +// Set records value under key specified in the map. +func (p *peer) Set(key string, value interface{}) { + p.kv[key] = value +} + +// Get returns a value associated with the key. Nil is returned if no value +// found. +func (p *peer) Get(key string) interface{} { + if value, ok := p.kv[key]; ok { + return value + } + return nil +} From d8b08cd943be0ad097dae940c4c2e44a15b6047a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 13:19:05 +0300 Subject: [PATCH 6/7] return back panic in peer#onReceive Refs #1317 --- p2p/peer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/peer.go b/p2p/peer.go index e20271149..4af6eeaae 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -358,7 +358,9 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { - onPeerError(p, fmt.Errorf("Unknown channel %X", chID)) + // Note that its ok to panic here as it's caught in the conn._recover, + // which does onPeerError. + panic(cmn.Fmt("Unknown channel %X", chID)) } reactor.Receive(chID, p, msgBytes) } From 2c445059f2379a75a6e10c9da75b3ea428f6d5f4 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 19 Mar 2018 14:10:25 +0300 Subject: [PATCH 7/7] mark peer as good every blocksToContributeToBecomeGoodPeer blocks if enough peers are marked good eventually some will become unmarked, so good to have a force that will continue to cycle them back into good territory! Refs #1317 --- consensus/reactor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 60ac3d9c9..a265f76c0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -253,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) - if numBlocks := ps.RecordBlockPart(msg); numBlocks == blocksToContributeToBecomeGoodPeer { + if numBlocks := ps.RecordBlockPart(msg); numBlocks%blocksToContributeToBecomeGoodPeer == 0 { conR.Switch.MarkPeerAsGood(src) } conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -275,7 +275,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - if blocks := ps.RecordVote(msg.Vote); blocks == blocksToContributeToBecomeGoodPeer { + if blocks := ps.RecordVote(msg.Vote); blocks%blocksToContributeToBecomeGoodPeer == 0 { conR.Switch.MarkPeerAsGood(src) }