From 87ce804b4a11ef1e0172da3747f69af85f9966b6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 26 Feb 2018 17:33:55 +0400 Subject: [PATCH 01/14] cmn.PanicSanity is deprecated --- blockchain/pool.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index d0f4d2976..688be0e72 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -199,7 +199,7 @@ func (pool *BlockPool) PopRequest() { delete(pool.requesters, pool.height) pool.height++ } else { - cmn.PanicSanity(cmn.Fmt("Expected requester to pop, got nothing at height %v", pool.height)) + panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } } @@ -213,8 +213,9 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID { request := pool.requesters[height] if request.block == nil { - cmn.PanicSanity("Expected block to be non-nil") + panic("Expected block to be non-nil") } + // RemovePeer will redo all requesters associated with this peer. pool.removePeer(request.peerID) return request.peerID From 0c7e871ef071f7bdb8b125327707ecc2654aeaf6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 26 Feb 2018 17:35:01 +0400 Subject: [PATCH 02/14] [blockchain] replace timeoutsCh with more abstract errorsCh --- blockchain/pool.go | 27 +++++++++++++++------------ blockchain/pool_test.go | 25 +++++++++++++------------ blockchain/reactor.go | 31 +++++++++++++++++++------------ 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 688be0e72..5de719792 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -1,6 +1,7 @@ package blockchain import ( + "errors" "fmt" "math" "sync" @@ -41,7 +42,7 @@ const ( minRecvRate = 7680 ) -var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests +var peerTimeout = 15 * time.Second // not const so we can override with tests /* Peers self report their heights when we join the block pool. @@ -68,10 +69,10 @@ type BlockPool struct { maxPeerHeight int64 requestsCh chan<- BlockRequest - timeoutsCh chan<- p2p.ID + errorsCh chan<- peerError } -func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- p2p.ID) *BlockPool { +func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), @@ -80,7 +81,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- numPending: 0, requestsCh: requestsCh, - timeoutsCh: timeoutsCh, + errorsCh: errorsCh, } bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp) return bp @@ -128,9 +129,10 @@ func (pool *BlockPool) removeTimedoutPeers() { curRate := peer.recvMonitor.Status().CurRate // curRate can be 0 on start if curRate != 0 && curRate < minRecvRate { - pool.sendTimeout(peer.id) + err := errors.New("peer is not sending us data fast enough") + pool.sendError(err, peer.id) pool.Logger.Error("SendTimeout", "peer", peer.id, - "reason", "peer is not sending us data fast enough", + "reason", err, "curRate", fmt.Sprintf("%d KB/s", curRate/1024), "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true @@ -340,11 +342,11 @@ func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) { pool.requestsCh <- BlockRequest{height, peerID} } -func (pool *BlockPool) sendTimeout(peerID p2p.ID) { +func (pool *BlockPool) sendError(err error, peerID p2p.ID) { if !pool.IsRunning() { return } - pool.timeoutsCh <- peerID + pool.errorsCh <- peerError{err, peerID} } // unused by tendermint; left for debugging purposes @@ -403,9 +405,9 @@ func (peer *bpPeer) resetMonitor() { func (peer *bpPeer) resetTimeout() { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) + peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout) } else { - peer.timeout.Reset(time.Second * peerTimeoutSeconds) + peer.timeout.Reset(peerTimeout) } } @@ -431,8 +433,9 @@ func (peer *bpPeer) onTimeout() { peer.pool.mtx.Lock() defer peer.pool.mtx.Unlock() - peer.pool.sendTimeout(peer.id) - peer.logger.Error("SendTimeout", "reason", "onTimeout") + err := errors.New("peer did not send us anything") + peer.pool.sendError(err, peer.id) + peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) peer.didTimeout = true } diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index ce16899a7..790216ac0 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -13,7 +13,7 @@ import ( ) func init() { - peerTimeoutSeconds = time.Duration(2) + peerTimeout = 2 * time.Second } type testPeer struct { @@ -34,9 +34,9 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer { func TestBasic(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - timeoutsCh := make(chan p2p.ID, 100) - requestsCh := make(chan BlockRequest, 100) - pool := NewBlockPool(start, requestsCh, timeoutsCh) + errorsCh := make(chan peerError) + requestsCh := make(chan BlockRequest) + pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() @@ -71,8 +71,8 @@ func TestBasic(t *testing.T) { // Pull from channels for { select { - case peerID := <-timeoutsCh: - t.Errorf("timeout: %v", peerID) + case err := <-errorsCh: + t.Error(err) case request := <-requestsCh: t.Logf("Pulled new BlockRequest %v", request) if request.Height == 300 { @@ -91,9 +91,9 @@ func TestBasic(t *testing.T) { func TestTimeout(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - timeoutsCh := make(chan p2p.ID, 100) - requestsCh := make(chan BlockRequest, 100) - pool := NewBlockPool(start, requestsCh, timeoutsCh) + errorsCh := make(chan peerError) + requestsCh := make(chan BlockRequest) + pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() if err != nil { @@ -132,9 +132,10 @@ func TestTimeout(t *testing.T) { timedOut := map[p2p.ID]struct{}{} for { select { - case peerID := <-timeoutsCh: - t.Logf("Peer %v timeouted", peerID) - if _, ok := timedOut[peerID]; !ok { + case err := <-errorsCh: + t.Log(err) + // consider error to be always timeout here + if _, ok := timedOut[err.peerID]; !ok { counter++ if counter == len(peers) { return // Done! diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 2ad6770be..cae55e807 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -22,8 +22,7 @@ const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) - defaultChannelCapacity = 1000 - trySyncIntervalMS = 50 + trySyncIntervalMS = 50 // stop syncing when last block's time is // within this much of the system time. // stopSyncingDurationMinutes = 10 @@ -40,6 +39,15 @@ type consensusReactor interface { SwitchToConsensus(sm.State, int) } +type peerError struct { + err error + peerID p2p.ID +} + +func (e peerError) Error() string { + return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) +} + // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor @@ -56,7 +64,7 @@ type BlockchainReactor struct { fastSync bool requestsCh <-chan BlockRequest - timeoutsCh <-chan p2p.ID + errorsCh <-chan peerError } // NewBlockchainReactor returns new reactor instance. @@ -68,12 +76,12 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - requestsCh := make(chan BlockRequest, defaultChannelCapacity) - timeoutsCh := make(chan p2p.ID, defaultChannelCapacity) + requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError) pool := NewBlockPool( store.Height()+1, requestsCh, - timeoutsCh, + errorsCh, ) bcR := &BlockchainReactor{ params: state.ConsensusParams, @@ -83,7 +91,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl pool: pool, fastSync: fastSync, requestsCh: requestsCh, - timeoutsCh: timeoutsCh, + errorsCh: errorsCh, } bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) return bcR @@ -230,7 +238,7 @@ func (bcR *BlockchainReactor) poolRoutine() { FOR_LOOP: for { select { - case request := <-bcR.requestsCh: // chan BlockRequest + case request := <-bcR.requestsCh: peer := bcR.Switch.Peers().Get(request.PeerID) if peer == nil { continue FOR_LOOP // Peer has since been disconnected. @@ -242,11 +250,10 @@ FOR_LOOP: // The pool handles timeouts, just let it go. continue FOR_LOOP } - case peerID := <-bcR.timeoutsCh: // chan string - // Peer timed out. - peer := bcR.Switch.Peers().Get(peerID) + case err := <-bcR.errorsCh: + peer := bcR.Switch.Peers().Get(err.peerID) if peer != nil { - bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + bcR.Switch.StopPeerForError(peer, err) } case <-statusUpdateTicker.C: // ask for status updates From baf457e6d4a8ff38e0976119cf0cca590264ec3c Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 13:42:08 +0400 Subject: [PATCH 03/14] return error if peer sent us a block we didn't expect with a height too far ahead/behind --- blockchain/pool.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 5de719792..603b4bf2a 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -40,6 +40,9 @@ const ( // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, // sending data across atlantic ~ 7.5 KB/s. minRecvRate = 7680 + + // Maximum difference between current and new block's height. + maxDiffBetweenCurrentAndReceivedBlockHeight = 100 ) var peerTimeout = 15 * time.Second // not const so we can override with tests @@ -230,8 +233,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int requester := pool.requesters[block.Height] if requester == nil { - // a block we didn't expect. - // TODO:if height is too far ahead, punish peer + pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height) + diff := pool.height - block.Height + if diff < 0 { + diff *= -1 + } + if diff > maxDiffBetweenCurrentAndReceivedBlockHeight { + pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + } return } From f2996895738b2f19d9d192133bba9e91acd4ea01 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 13:42:26 +0400 Subject: [PATCH 04/14] return back defaultChannelCapacity --- blockchain/pool_test.go | 8 ++++---- blockchain/reactor.go | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 790216ac0..82120eaef 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -34,8 +34,8 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer { func TestBasic(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError) - requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError, 1000) + requestsCh := make(chan BlockRequest, 1000) pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) @@ -91,8 +91,8 @@ func TestBasic(t *testing.T) { func TestTimeout(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError) - requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError, 1000) + requestsCh := make(chan BlockRequest, 1000) pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) err := pool.Start() diff --git a/blockchain/reactor.go b/blockchain/reactor.go index cae55e807..7462580b6 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -76,8 +76,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - requestsCh := make(chan BlockRequest) - errorsCh := make(chan peerError) + const cap = 1000 // must be bigger than peers count + requestsCh := make(chan BlockRequest, cap) + errorsCh := make(chan peerError, cap) // so we don't block in #Receive#pool.AddBlock pool := NewBlockPool( store.Height()+1, requestsCh, From 42423528522b23dc2dba4f6dc1d80c972a430e98 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 13:42:45 +0400 Subject: [PATCH 05/14] stop peer on decoding error --- blockchain/reactor.go | 3 ++- consensus/reactor.go | 2 +- evidence/reactor.go | 3 ++- mempool/reactor.go | 3 ++- p2p/pex/pex_reactor.go | 3 ++- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7462580b6..23d399426 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -175,7 +175,8 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize()) if err != nil { - bcR.Logger.Error("Error decoding message", "err", err) + bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + bcR.Switch.StopPeerForError(src, err) return } diff --git a/consensus/reactor.go b/consensus/reactor.go index b63793670..8318f2bb0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -179,7 +179,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) _, msg, err := DecodeMessage(msgBytes) if err != nil { conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) - // TODO punish peer? + conR.Switch.StopPeerForError(src, err) return } conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) diff --git a/evidence/reactor.go b/evidence/reactor.go index 169a274d3..7eb6bfb65 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -84,7 +84,8 @@ func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { - evR.Logger.Error("Error decoding message", "err", err) + evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + evR.Switch.StopPeerForError(src, err) return } evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) diff --git a/mempool/reactor.go b/mempool/reactor.go index 58650a197..514347e94 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -73,7 +73,8 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { - memR.Logger.Error("Error decoding message", "err", err) + memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + memR.Switch.StopPeerForError(src, err) return } memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 193efc88d..441010aa7 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -167,7 +167,8 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) { func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) if err != nil { - r.Logger.Error("Error decoding message", "err", err) + r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + r.Switch.StopPeerForError(src, err) return } r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg) From d0c67bbe1684aa21f3ca2ad0c8324f81dddcd6d7 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 14:22:58 +0400 Subject: [PATCH 06/14] stop peer if evidence is not valid --- evidence/reactor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/evidence/reactor.go b/evidence/reactor.go index 7eb6bfb65..6647db969 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -96,7 +96,8 @@ func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { err := evR.evpool.AddEvidence(ev) if err != nil { evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) - // TODO: punish peer + // punish peer + evR.Switch.StopPeerForError(src, err) } } default: From d86855ad7af2cccfaa19f922c213b448272c4256 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 14:36:06 +0400 Subject: [PATCH 07/14] stop peer if it sends us msg with unknown channel --- p2p/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/peer.go b/p2p/peer.go index 2e876d11b..e20271149 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -358,7 +358,7 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { - cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) + onPeerError(p, fmt.Errorf("Unknown channel %X", chID)) } reactor.Receive(chID, p, msgBytes) } From bcf54b0aa37f3f44756b00fd790e306bfd2cf879 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Sun, 4 Mar 2018 14:58:43 +0400 Subject: [PATCH 08/14] PanicSanity is deprecated --- blockchain/reactor.go | 4 +++- blockchain/store.go | 14 +++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 23d399426..2b334c232 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -72,18 +72,20 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl fastSync bool) *BlockchainReactor { if state.LastBlockHeight != store.Height() { - cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, + panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } const cap = 1000 // must be bigger than peers count requestsCh := make(chan BlockRequest, cap) errorsCh := make(chan peerError, cap) // so we don't block in #Receive#pool.AddBlock + pool := NewBlockPool( store.Height()+1, requestsCh, errorsCh, ) + bcR := &BlockchainReactor{ params: state.ConsensusParams, initialState: state, diff --git a/blockchain/store.go b/blockchain/store.go index a9a543436..b949bc904 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -76,7 +76,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { } blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading block meta: %v", err)) + panic(fmt.Sprintf("Error reading block meta: %v", err)) } bytez := []byte{} for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ { @@ -85,7 +85,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { } block := wire.ReadBinary(&types.Block{}, bytes.NewReader(bytez), 0, &n, &err).(*types.Block) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading block: %v", err)) + panic(fmt.Sprintf("Error reading block: %v", err)) } return block } @@ -102,7 +102,7 @@ func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part { } part := wire.ReadBinary(&types.Part{}, r, 0, &n, &err).(*types.Part) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading block part: %v", err)) + panic(fmt.Sprintf("Error reading block part: %v", err)) } return part } @@ -118,7 +118,7 @@ func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading block meta: %v", err)) + panic(fmt.Sprintf("Error reading block meta: %v", err)) } return blockMeta } @@ -136,7 +136,7 @@ func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit { } commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading commit: %v", err)) + panic(fmt.Sprintf("Error reading commit: %v", err)) } return commit } @@ -153,7 +153,7 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { } commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Error reading commit: %v", err)) + panic(fmt.Sprintf("Error reading commit: %v", err)) } return commit } @@ -262,7 +262,7 @@ func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON { bsj := BlockStoreStateJSON{} err := json.Unmarshal(bytes, &bsj) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Could not unmarshal bytes: %X", bytes)) + panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes)) } return bsj } From 266974cb5952e267b5185d8ffd7f15aee7be01ab Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Mar 2018 13:04:31 +0400 Subject: [PATCH 09/14] stop peer if it sends invalid vote --- consensus/reactor.go | 38 ++++++++++++++++++++++++++++++++++++++ consensus/state.go | 16 +++++++++++++++- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 8318f2bb0..b57d85573 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -64,6 +64,11 @@ func (conR *ConsensusReactor) OnStart() error { return err } + err = conR.startPeerErrorsRoutine() + if err != nil { + return err + } + if !conR.FastSync() { err := conR.conS.Start() if err != nil { @@ -334,6 +339,39 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- +// startPeerErrorsRoutine spawns a new gororoutine listening for errors from +// consensus/state or other consensus modules. +func (conR *ConsensusReactor) startPeerErrorsRoutine() error { + const subscriber = "consensus-reactor" + ctx := context.Background() + + errorsCh := make(chan interface{}) + err := conR.eventBus.Subscribe(ctx, subscriber, types.QueryForEvent(peerErrorEvent), errorsCh) + if err != nil { + return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, peerErrorEvent) + } + + go func() { + for { + select { + case data, ok := <-errorsCh: + if ok { + pErr := data.(types.TMEventData).Unwrap().(peerError) + peer := conR.Switch.Peers().Get(pErr.peerID) + if peer != nil { + conR.Switch.StopPeerForError(peer, pErr.err) + } + } + case <-conR.Quit(): + conR.eventBus.UnsubscribeAll(ctx, subscriber) + return + } + } + }() + + return nil +} + // startBroadcastRoutine subscribes for new round steps, votes and proposal // heartbeats using the event bus and starts a go routine to broadcasts events // to peers upon receiving them. diff --git a/consensus/state.go b/consensus/state.go index 30bd56f10..958574dd0 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -63,6 +63,19 @@ func (ti *timeoutInfo) String() string { return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } +type peerError struct { + err error + peerID p2p.ID +} + +func (e peerError) Error() string { + return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) +} + +const ( + peerErrorEvent = "cs.PeerError" +) + // ConsensusState handles execution of the consensus algorithm. // It processes votes and proposals, and upon reaching agreement, // commits blocks to the chain and executes them against the application. @@ -582,7 +595,8 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { // if the vote gives us a 2/3-any or 2/3-one, we transition err := cs.tryAddVote(msg.Vote, peerID) if err == ErrAddingVote { - // TODO: punish peer + // punish peer + cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}}) } // NOTE: the vote is broadcast to peers by the reactor listening From 63cb69cb96c4251c07df592120dfea1da630d77b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Mar 2018 15:26:36 +0400 Subject: [PATCH 10/14] comment out ErrAddingVote because it breaks byzantine_test --- consensus/state.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 958574dd0..342ea9830 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -595,8 +595,9 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { // if the vote gives us a 2/3-any or 2/3-one, we transition err := cs.tryAddVote(msg.Vote, peerID) if err == ErrAddingVote { - // punish peer - cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}}) + // TODO: punish peer + // breaks byzantine_test + // cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}}) } // NOTE: the vote is broadcast to peers by the reactor listening From b0d8f552c505ae0adc5a8fe22525af31b9a71081 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Mar 2018 15:35:50 +0400 Subject: [PATCH 11/14] return err if peer has sent a vote that does not match our round --- consensus/types/height_vote_set.go | 11 +++++++---- consensus/types/height_vote_set_test.go | 4 ++-- types/vote.go | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/consensus/types/height_vote_set.go b/consensus/types/height_vote_set.go index 7db932045..a155bce08 100644 --- a/consensus/types/height_vote_set.go +++ b/consensus/types/height_vote_set.go @@ -1,6 +1,7 @@ package types import ( + "errors" "fmt" "strings" "sync" @@ -15,6 +16,10 @@ type RoundVoteSet struct { Precommits *types.VoteSet } +var ( + GotVoteFromUnwantedRoundError = errors.New("Peer has sent a vote that does not match our round for more than one round") +) + /* Keeps track of all VoteSets from round 0 to round 'round'. @@ -117,10 +122,8 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool, voteSet = hvs.getVoteSet(vote.Round, vote.Type) hvs.peerCatchupRounds[peerID] = append(rndz, vote.Round) } else { - // Peer has sent a vote that does not match our round, - // for more than one round. Bad peer! - // TODO punish peer. - // log.Warn("Deal with peer giving votes from unwanted rounds") + // punish peer + err = GotVoteFromUnwantedRoundError return } } diff --git a/consensus/types/height_vote_set_test.go b/consensus/types/height_vote_set_test.go index 5719d7eea..246c0b711 100644 --- a/consensus/types/height_vote_set_test.go +++ b/consensus/types/height_vote_set_test.go @@ -34,8 +34,8 @@ func TestPeerCatchupRounds(t *testing.T) { vote1001_0 := makeVoteHR(t, 1, 1001, privVals, 0) added, err = hvs.AddVote(vote1001_0, "peer1") - if err != nil { - t.Error("AddVote error", err) + if err != GotVoteFromUnwantedRoundError { + t.Errorf("Expected GotVoteFromUnwantedRoundError, but got %v", err) } if added { t.Error("Expected to *not* add vote from peer, too many catchup rounds.") diff --git a/types/vote.go b/types/vote.go index 6b36e0f4f..ceb6e985e 100644 --- a/types/vote.go +++ b/types/vote.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/wire" cmn "github.com/tendermint/tmlibs/common" ) From 714f885dac20dd7e58589c27aa11e9de25f8565a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Mar 2018 16:51:52 +0400 Subject: [PATCH 12/14] mark peer as good if it contributed enough votes or block parts Refs #1147 --- consensus/reactor.go | 49 ++++++++++++++++++++++++++++++++++++++++++++ node/node.go | 2 ++ p2p/switch.go | 15 ++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/consensus/reactor.go b/consensus/reactor.go index b57d85573..f19b0a9ad 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -256,6 +256,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) + if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 { + conR.Switch.MarkPeerAsGood(src) + } conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} default: conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) @@ -275,6 +278,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) + if blocks := ps.RecordVote(msg.Vote); blocks > 10000 { + conR.Switch.MarkPeerAsGood(src) + } cs.peerMsgQueue <- msgInfo{msg, src.ID()} @@ -869,6 +875,17 @@ type PeerState struct { mtx sync.Mutex cstypes.PeerRoundState + + stats *peerStateStats +} + +// peerStateStats holds internal statistics for a peer. +type peerStateStats struct { + lastVoteHeight int64 + votes int + + lastBlockPartHeight int64 + blockParts int } // NewPeerState returns a new PeerState for the given Peer @@ -882,6 +899,7 @@ func NewPeerState(peer p2p.Peer) *PeerState { LastCommitRound: -1, CatchupCommitRound: -1, }, + stats: &peerStateStats{}, } } @@ -1093,6 +1111,37 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { } } +// RecordVote updates internal statistics for this peer by recording the vote. +// It returns the total number of votes (1 per block). This essentially means +// the number of blocks for which peer has been sending us block parts. +func (ps *PeerState) RecordVote(vote *types.Vote) int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.stats.lastVoteHeight == vote.Height { + return ps.stats.votes + } + ps.stats.lastVoteHeight = vote.Height + ps.stats.votes += 1 + return ps.stats.votes +} + +// RecordVote updates internal statistics for this peer by recording the block part. +// It returns the total number of block parts (1 per block). This essentially means +// the number of blocks for which peer has been sending us block parts. +func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if ps.stats.lastBlockPartHeight == bp.Height { + return ps.stats.blockParts + } + + ps.stats.lastBlockPartHeight = bp.Height + ps.stats.blockParts += 1 + return ps.stats.blockParts +} + // SetHasVote sets the given vote as known by the peer func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.mtx.Lock() diff --git a/node/node.go b/node/node.go index d40322fad..83ac50ec6 100644 --- a/node/node.go +++ b/node/node.go @@ -287,6 +287,8 @@ func NewNode(config *cfg.Config, sw.AddReactor("PEX", pexReactor) } + sw.SetAddrBook(addrBook) + // Filter peers by addr or pubkey with an ABCI query. // If the query return code is OK, add peer. // XXX: Query format subject to change diff --git a/p2p/switch.go b/p2p/switch.go index cffadf3be..63deace29 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -35,6 +35,7 @@ const ( type AddrBook interface { AddAddress(addr *NetAddress, src *NetAddress) error + MarkGood(*NetAddress) Save() } @@ -57,6 +58,7 @@ type Switch struct { dialing *cmn.CMap nodeInfo NodeInfo // our node info nodeKey *NodeKey // our node privkey + addrBook AddrBook filterConnByAddr func(net.Addr) error filterConnByID func(ID) error @@ -317,6 +319,19 @@ func (sw *Switch) reconnectToPeer(peer Peer) { sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) } +// SetAddrBook allows to set address book on Switch. +func (sw *Switch) SetAddrBook(addrBook AddrBook) { + sw.addrBook = addrBook +} + +// MarkPeerAsGood marks the given peer as good when it did something useful +// like contributed to consensus. +func (sw *Switch) MarkPeerAsGood(peer Peer) { + if sw.addrBook != nil { + sw.addrBook.MarkGood(peer.NodeInfo().NetAddress()) + } +} + //--------------------------------------------------------------------- // Dialing From a655500047b776fcbd263ed4f19e16c255837a97 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 6 Mar 2018 11:41:02 +0400 Subject: [PATCH 13/14] fix copy-pasted comment [ci skip] --- consensus/reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index f19b0a9ad..2eacc38db 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1113,7 +1113,7 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) { // RecordVote updates internal statistics for this peer by recording the vote. // It returns the total number of votes (1 per block). This essentially means -// the number of blocks for which peer has been sending us block parts. +// the number of blocks for which peer has been sending us votes. func (ps *PeerState) RecordVote(vote *types.Vote) int { ps.mtx.Lock() defer ps.mtx.Unlock() From 86ddf17db01b88f1f7b49ab5f161ef4d4bc60b11 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 6 Mar 2018 13:40:09 +0400 Subject: [PATCH 14/14] add a todo Refs #1281 --- consensus/byzantine_test.go | 14 ++++++++------ consensus/reactor.go | 38 ------------------------------------- consensus/state.go | 19 ++++--------------- 3 files changed, 12 insertions(+), 59 deletions(-) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0d817215d..f8163c07d 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -46,9 +46,9 @@ func TestByzantine(t *testing.T) { eventChans := make([]chan interface{}, N) reactors := make([]p2p.Reactor, N) for i := 0; i < N; i++ { + // make first val byzantine if i == 0 { css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator) - // make byzantine css[i].decideProposal = func(j int) func(int64, int) { return func(height int64, round int) { byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) @@ -74,9 +74,11 @@ func TestByzantine(t *testing.T) { var conRI p2p.Reactor // nolint: gotype, gosimple conRI = conR + // make first val byzantine if i == 0 { conRI = NewByzantineReactor(conR) } + reactors[i] = conRI } @@ -115,19 +117,19 @@ func TestByzantine(t *testing.T) { // and the other block to peers[1] and peers[2]. // note peers and switches order don't match. peers := switches[0].Peers().List() + + // partition A ind0 := getSwitchIndex(switches, peers[0]) + + // partition B ind1 := getSwitchIndex(switches, peers[1]) ind2 := getSwitchIndex(switches, peers[2]) - - // connect the 2 peers in the larger partition p2p.Connect2Switches(switches, ind1, ind2) - // wait for someone in the big partition to make a block + // wait for someone in the big partition (B) to make a block <-eventChans[ind2] t.Log("A block has been committed. Healing partition") - - // connect the partitions p2p.Connect2Switches(switches, ind0, ind1) p2p.Connect2Switches(switches, ind0, ind2) diff --git a/consensus/reactor.go b/consensus/reactor.go index 2eacc38db..5c672a0c7 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -64,11 +64,6 @@ func (conR *ConsensusReactor) OnStart() error { return err } - err = conR.startPeerErrorsRoutine() - if err != nil { - return err - } - if !conR.FastSync() { err := conR.conS.Start() if err != nil { @@ -345,39 +340,6 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- -// startPeerErrorsRoutine spawns a new gororoutine listening for errors from -// consensus/state or other consensus modules. -func (conR *ConsensusReactor) startPeerErrorsRoutine() error { - const subscriber = "consensus-reactor" - ctx := context.Background() - - errorsCh := make(chan interface{}) - err := conR.eventBus.Subscribe(ctx, subscriber, types.QueryForEvent(peerErrorEvent), errorsCh) - if err != nil { - return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, peerErrorEvent) - } - - go func() { - for { - select { - case data, ok := <-errorsCh: - if ok { - pErr := data.(types.TMEventData).Unwrap().(peerError) - peer := conR.Switch.Peers().Get(pErr.peerID) - if peer != nil { - conR.Switch.StopPeerForError(peer, pErr.err) - } - } - case <-conR.Quit(): - conR.eventBus.UnsubscribeAll(ctx, subscriber) - return - } - } - }() - - return nil -} - // startBroadcastRoutine subscribes for new round steps, votes and proposal // heartbeats using the event bus and starts a go routine to broadcasts events // to peers upon receiving them. diff --git a/consensus/state.go b/consensus/state.go index 342ea9830..3cde13bd2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -63,19 +63,6 @@ func (ti *timeoutInfo) String() string { return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } -type peerError struct { - err error - peerID p2p.ID -} - -func (e peerError) Error() string { - return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) -} - -const ( - peerErrorEvent = "cs.PeerError" -) - // ConsensusState handles execution of the consensus algorithm. // It processes votes and proposals, and upon reaching agreement, // commits blocks to the chain and executes them against the application. @@ -596,8 +583,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) { err := cs.tryAddVote(msg.Vote, peerID) if err == ErrAddingVote { // TODO: punish peer - // breaks byzantine_test - // cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}}) + // We probably don't want to stop the peer here. The vote does not + // necessarily comes from a malicious peer but can be just broadcasted by + // a typical peer. + // https://github.com/tendermint/tendermint/issues/1281 } // NOTE: the vote is broadcast to peers by the reactor listening