diff --git a/blockchain/pool.go b/blockchain/pool.go index bd52e280f..348ba09b3 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -11,11 +11,25 @@ import ( "github.com/tendermint/tmlibs/log" ) +/* + +eg, L = latency = 0.1s + P = num peers = 10 + FN = num full nodes + BS = 1kB block size + CB = 1 Mbit/s = 128 kB/s + CB/P = 12.8 kB + B/S = CB/P/BS = 12.8 blocks/s + + 12.8 * 0.1 = 1.28 blocks on conn + +*/ + const ( requestIntervalMS = 250 maxTotalRequesters = 300 maxPendingRequests = maxTotalRequesters - maxPendingRequestsPerPeer = 75 + maxPendingRequestsPerPeer = 10 minRecvRate = 10240 // 10Kb/s ) @@ -69,9 +83,7 @@ func (pool *BlockPool) OnStart() error { return nil } -func (pool *BlockPool) OnStop() { - pool.BaseService.OnStop() -} +func (pool *BlockPool) OnStop() {} // Run spawns requesters as needed. func (pool *BlockPool) makeRequestersRoutine() { @@ -188,15 +200,16 @@ func (pool *BlockPool) PopRequest() { // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { pool.mtx.Lock() + defer pool.mtx.Unlock() + request := pool.requesters[height] - pool.mtx.Unlock() if request.block == nil { cmn.PanicSanity("Expected block to be non-nil") } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) + pool.removePeer(request.peerID) } // TODO: ensure that blocks come in order for each peer. @@ -206,13 +219,17 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int requester := pool.requesters[block.Height] if requester == nil { + // a block we didn't expect. + // TODO:if height is too far ahead, punish peer return } if requester.setBlock(block, peerID) { pool.numPending-- peer := pool.peers[peerID] - peer.decrPending(blockSize) + if peer != nil { + peer.decrPending(blockSize) + } } else { // Bad peer? } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 9cc01fbac..b46ad40fa 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -12,6 +12,7 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" ) const ( @@ -79,7 +80,13 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, return bcR } -// OnStart implements BaseService +// SetLogger implements cmn.Service by setting the logger on reactor and pool. +func (bcR *BlockchainReactor) SetLogger(l log.Logger) { + bcR.BaseService.Logger = l + bcR.pool.Logger = l +} + +// OnStart implements cmn.Service. func (bcR *BlockchainReactor) OnStart() error { bcR.BaseReactor.OnStart() if bcR.fastSync { @@ -92,7 +99,7 @@ func (bcR *BlockchainReactor) OnStart() error { return nil } -// OnStop implements BaseService +// OnStop implements cmn.Service. func (bcR *BlockchainReactor) OnStop() { bcR.BaseReactor.OnStop() bcR.pool.Stop() @@ -214,9 +221,9 @@ FOR_LOOP: // ask for status updates go bcR.BroadcastStatusRequest() case <-switchToConsensusTicker.C: - height, numPending, _ := bcR.pool.GetStatus() + height, numPending, lenRequesters := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() - bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters), + bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", lenRequesters, "outbound", outbound, "inbound", inbound) if bcR.pool.IsCaughtUp() { bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 633cae169..584aadf39 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -1,12 +1,11 @@ package blockchain import ( - "bytes" "testing" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/db" + dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" @@ -15,28 +14,24 @@ import ( "github.com/tendermint/tendermint/types" ) -func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReactor { - config := cfg.ResetTestRoot("node_node_test") +func newBlockchainReactor(maxBlockHeight int) *BlockchainReactor { + logger := log.TestingLogger() + config := cfg.ResetTestRoot("blockchain_reactor_test") - blockStoreDB := db.NewDB("blockstore", config.DBBackend, config.DBDir()) - blockStore := NewBlockStore(blockStoreDB) - - stateLogger := logger.With("module", "state") + blockStore := NewBlockStore(dbm.NewMemDB()) // Get State - stateDB := db.NewDB("state", config.DBBackend, config.DBDir()) - state, _ := sm.GetState(stateDB, config.GenesisFile()) - - state.SetLogger(stateLogger) + state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile()) + state.SetLogger(logger.With("module", "state")) state.Save() // Make the blockchainReactor itself fastSync := true bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) // Next: we need to set a switch in order for peers to be added in bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) - bcReactor.SetLogger(logger.With("module", "blockchain")) // Lastly: let's add some blocks in for blockHeight := 1; blockHeight <= maxBlockHeight; blockHeight++ { @@ -50,12 +45,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReac } func TestNoBlockMessageResponse(t *testing.T) { - logBuf := new(bytes.Buffer) - logger := log.NewTMLogger(logBuf) maxBlockHeight := 20 - bcr := newBlockchainReactor(logger, maxBlockHeight) - go bcr.OnStart() + bcr := newBlockchainReactor(maxBlockHeight) + bcr.Start() defer bcr.Stop() // Add some peers in diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 69ab55cc9..54c2d06b5 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -139,6 +139,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. + // NOTE: we might send an empty selection r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src.