From 8128627f085ccdfb6992c08de7363b890f3da9b2 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 21 Jun 2018 01:57:35 -0700 Subject: [PATCH 1/3] Optimizing blockchain reactor. Should be paired with https://github.com/tendermint/iavl/pull/65. --- Gopkg.lock | 16 +---- Gopkg.toml | 4 ++ Makefile | 14 ++-- blockchain/pool.go | 18 +++-- blockchain/reactor.go | 128 +++++++++++++++++++-------------- privval/priv_validator_test.go | 11 +-- state/store.go | 7 +- types/canonical_json.go | 2 +- 8 files changed, 103 insertions(+), 97 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index f9729ffab..8d73c93ff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -130,7 +130,6 @@ version = "v1.0" [[projects]] - branch = "master" name = "github.com/jmhodges/levigo" packages = ["."] revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" @@ -286,19 +285,6 @@ ] revision = "e2150783cd35f5b607daca48afd8c57ec54cc995" -[[projects]] - name = "github.com/tendermint/abci" - packages = [ - "client", - "example/code", - "example/counter", - "example/kvstore", - "server", - "types" - ] - revision = "198dccf0ddfd1bb176f87657e3286a05a6ed9540" - version = "v0.12.0" - [[projects]] branch = "master" name = "github.com/tendermint/ed25519" @@ -435,6 +421,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "d17038089dd6383ff5028229d4026bb92f5c7adc7e9c1cd52584237e2e5fd431" + inputs-digest = "400de835ace8c8a69747afd675d1952daf750c251a02b9dac82a3c9dce4f65a8" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 4c32f3d80..ff245d47e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -86,6 +86,10 @@ name = "google.golang.org/genproto" revision = "7fd901a49ba6a7f87732eb344f6e3c5b19d1b200" +[[override]] + name = "github.com/jmhodges/levigo" + revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" + [prune] go-tests = true unused-packages = true diff --git a/Makefile b/Makefile index 079c58f90..9525560a1 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ GOTOOLS = \ github.com/golang/dep/cmd/dep \ gopkg.in/alecthomas/gometalinter.v2 PACKAGES=$(shell go list ./... 
| grep -v '/vendor/') -BUILD_TAGS?=tendermint +BUILD_TAGS?='tendermint' BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" all: check build test install @@ -14,20 +14,20 @@ check: check_tools ensure_deps ### Build build: - CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ + CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/ build_race: - CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint + CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint install: - CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' ./cmd/tendermint + CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags $(BUILD_TAGS) ./cmd/tendermint ######################################## ### Distribution # dist builds binaries for all platforms and packages them for distribution dist: - @BUILD_TAGS='$(BUILD_TAGS)' sh -c "'$(CURDIR)/scripts/dist.sh'" + @BUILD_TAGS=$(BUILD_TAGS) sh -c "'$(CURDIR)/scripts/dist.sh'" ######################################## ### Tools & dependencies @@ -66,7 +66,7 @@ draw_deps: get_deps_bin_size: @# Copy of build recipe with additional flags to perform binary size analysis - $(eval $(shell go build -work -a $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ 2>&1)) + $(eval $(shell go build -work -a $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/ 2>&1)) @find $(WORK) -type f -name "*.a" | xargs -I{} du -hxs "{}" | sort -rh | sed -e s:${WORK}/::g > deps_bin_size.log @echo "Results can be found here: $(CURDIR)/deps_bin_size.log" @@ -132,7 +132,7 @@ vagrant_test: ### go tests test: @echo "--> Running go test" - @go test $(PACKAGES) + @GOCACHE=off go test $(PACKAGES) test_race: @echo "--> Running go test --race" diff --git a/blockchain/pool.go b/blockchain/pool.go index 8b964e81a..efd5c6a3a 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -29,10 +29,10 @@ eg, L = latency = 0.1s */ const ( - requestIntervalMS = 100 - maxTotalRequesters = 1000 + requestIntervalMS = 2 + maxTotalRequesters = 600 maxPendingRequests = maxTotalRequesters - maxPendingRequestsPerPeer = 50 + maxPendingRequestsPerPeer = 20 // Minimum recv rate to ensure we're receiving blocks from a peer fast // enough. If a peer is not sending us data at at least that rate, we @@ -219,14 +219,12 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID { defer pool.mtx.Unlock() request := pool.requesters[height] - - if request.block == nil { - panic("Expected block to be non-nil") + peerID := request.getPeerID() + if peerID != p2p.ID("") { + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) } - - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(request.peerID) - return request.peerID + return peerID } // TODO: ensure that blocks come in order for each peer. diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 33dfdd288..bf6214e01 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -17,7 +17,8 @@ const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) - trySyncIntervalMS = 50 + trySyncIntervalMS = 10 + // stop syncing when last block's time is // within this much of the system time. 
// stopSyncingDurationMinutes = 10 @@ -75,8 +76,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - const capacity = 1000 // must be bigger than peers count - requestsCh := make(chan BlockRequest, capacity) + requestsCh := make(chan BlockRequest, maxTotalRequesters) + + const capacity = 1000 // must be bigger than peers count errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock pool := NewBlockPool( @@ -208,7 +210,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) @@ -223,6 +224,8 @@ func (bcR *BlockchainReactor) poolRoutine() { lastHundred := time.Now() lastRate := 0.0 + didProcessCh := make(chan struct{}, 1) + FOR_LOOP: for { select { @@ -238,14 +241,17 @@ FOR_LOOP: // The pool handles timeouts, just let it go. continue FOR_LOOP } + case err := <-bcR.errorsCh: peer := bcR.Switch.Peers().Get(err.peerID) if peer != nil { bcR.Switch.StopPeerForError(peer, err) } + case <-statusUpdateTicker.C: // ask for status updates go bcR.BroadcastStatusRequest() // nolint: errcheck + case <-switchToConsensusTicker.C: height, numPending, lenRequesters := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() @@ -260,60 +266,78 @@ FOR_LOOP: break FOR_LOOP } + case <-trySyncTicker.C: // chan time - // This loop can be slow as long as it's doing syncing work. - SYNC_LOOP: - for i := 0; i < 10; i++ { - // See if there are any blocks to sync. - first, second := bcR.pool.PeekTwoBlocks() - //bcR.Logger.Info("TrySync peeked", "first", first, "second", second) - if first == nil || second == nil { - // We need both to sync the first block. - break SYNC_LOOP + select { + case didProcessCh <- struct{}{}: + default: + } + + case <-didProcessCh: + // NOTE: It is a subtle mistake to process more than a single block + // at a time (e.g. 10) here, because we only TrySend 1 request per + // loop. The ratio mismatch can result in starving of blocks, a + // sudden burst of requests and responses, and repeat. + // Consequently, it is better to split these routines rather than + // coupling them as it's written here. TODO uncouple from request + // routine. + + // See if there are any blocks to sync. + first, second := bcR.pool.PeekTwoBlocks() + //bcR.Logger.Info("TrySync peeked", "first", first, "second", second) + if first == nil || second == nil { + // We need both to sync the first block. + continue FOR_LOOP + } else { + // Try again quickly next loop. + didProcessCh <- struct{}{} + } + + firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) + firstPartsHeader := firstParts.Header() + firstID := types.BlockID{first.Hash(), firstPartsHeader} + // Finally, verify the first block using the second's commit + // NOTE: we can probably make this more efficient, but note that calling + // first.Hash() doesn't verify the tx contents, so MakePartSet() is + // currently necessary. 
+ err := state.Validators.VerifyCommit( + chainID, firstID, first.Height, second.LastCommit) + if err != nil { + bcR.Logger.Error("Error in validation", "err", err) + peerID := bcR.pool.RedoRequest(first.Height) + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + // NOTE: we've already removed the peer's request, but we + // still need to clean up the rest. + bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) } - firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) - firstPartsHeader := firstParts.Header() - firstID := types.BlockID{first.Hash(), firstPartsHeader} - // Finally, verify the first block using the second's commit - // NOTE: we can probably make this more efficient, but note that calling - // first.Hash() doesn't verify the tx contents, so MakePartSet() is - // currently necessary. - err := state.Validators.VerifyCommit( - chainID, firstID, first.Height, second.LastCommit) + continue FOR_LOOP + } else { + bcR.pool.PopRequest() + + // TODO: batch saves so we dont persist to disk every block + bcR.store.SaveBlock(first, firstParts, second.LastCommit) + + // TODO: same thing for app - but we would need a way to + // get the hash without persisting the state + var err error + state, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { - bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) - } - break SYNC_LOOP - } else { - bcR.pool.PopRequest() - - // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) - - // TODO: same thing for app - but we would need a way to - // get the hash without persisting the state - var err error - state, err = bcR.blockExec.ApplyBlock(state, firstID, first) - if err != nil { - // TODO This is bad, are we zombie? - cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", - first.Height, first.Hash(), err)) - } - blocksSynced++ - - if blocksSynced%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, - "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) - lastHundred = time.Now() - } + // TODO This is bad, are we zombie? + cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", + first.Height, first.Hash(), err)) + } + blocksSynced++ + + if blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, + "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + lastHundred = time.Now() } } continue FOR_LOOP + case <-bcR.Quit(): break FOR_LOOP } diff --git a/privval/priv_validator_test.go b/privval/priv_validator_test.go index 4fc8f97fc..314101632 100644 --- a/privval/priv_validator_test.go +++ b/privval/priv_validator_test.go @@ -183,7 +183,7 @@ func TestDifferByTimestamp(t *testing.T) { assert.NoError(t, err, "expected no error signing proposal") signBytes := proposal.SignBytes(chainID) sig := proposal.Signature - timeStamp := clipToMS(proposal.Timestamp) + timeStamp := proposal.Timestamp // manipulate the timestamp. 
should get changed back proposal.Timestamp = proposal.Timestamp.Add(time.Millisecond) @@ -207,7 +207,7 @@ func TestDifferByTimestamp(t *testing.T) { signBytes := vote.SignBytes(chainID) sig := vote.Signature - timeStamp := clipToMS(vote.Timestamp) + timeStamp := vote.Timestamp // manipulate the timestamp. should get changed back vote.Timestamp = vote.Timestamp.Add(time.Millisecond) @@ -242,10 +242,3 @@ func newProposal(height int64, round int, partsHeader types.PartSetHeader) *type Timestamp: time.Now().UTC(), } } - -func clipToMS(t time.Time) time.Time { - nano := t.UnixNano() - million := int64(1000000) - nano = (nano / million) * million - return time.Unix(0, nano).UTC() -} diff --git a/state/store.go b/state/store.go index 798932541..c9d268996 100644 --- a/state/store.go +++ b/state/store.go @@ -80,6 +80,7 @@ func loadState(db dbm.DB, key []byte) (state State) { } // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. +// This flushes the writes (e.g. calls SetSync). func SaveState(db dbm.DB, state State) { saveState(db, state, stateKey) } @@ -148,7 +149,7 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) { // This is useful in case we crash after app.Commit and before s.Save(). // Responses are indexed by height so they can also be loaded later to produce Merkle proofs. func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) { - db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()) + db.Set(calcABCIResponsesKey(height), abciResponses.Bytes()) } //----------------------------------------------------------------------------- @@ -213,7 +214,7 @@ func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types if changeHeight == nextHeight { valInfo.ValidatorSet = valSet } - db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) + db.Set(calcValidatorsKey(nextHeight), valInfo.Bytes()) } //----------------------------------------------------------------------------- @@ -278,5 +279,5 @@ func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params t if changeHeight == nextHeight { paramsInfo.ConsensusParams = params } - db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) + db.Set(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) } diff --git a/types/canonical_json.go b/types/canonical_json.go index 258f7714b..88509712f 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -9,7 +9,7 @@ import ( // Canonical json is amino's json for structs with fields in alphabetical order // TimeFormat is used for generating the sigs -const TimeFormat = "2006-01-02T15:04:05.000Z" +const TimeFormat = time.RFC3339Nano type CanonicalJSONBlockID struct { Hash cmn.HexBytes `json:"hash,omitempty"` From b41b89732df7c8fdedad7dbd7fa005e0d4e92788 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 20 Jul 2018 14:38:27 -0700 Subject: [PATCH 2/3] Update store.go Revert to SetSync for saveABCIResponses() as per Ethan's feedback --- state/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/store.go b/state/store.go index c9d268996..13e25a958 100644 --- a/state/store.go +++ b/state/store.go @@ -149,7 +149,7 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) { // This is useful in case we crash after app.Commit and before s.Save(). // Responses are indexed by height so they can also be loaded later to produce Merkle proofs. 
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) { - db.Set(calcABCIResponsesKey(height), abciResponses.Bytes()) + db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()) } //----------------------------------------------------------------------------- From 54d753e64eaa8294a8b7900623c904bc6bf074f4 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 23 Jul 2018 22:35:55 -0400 Subject: [PATCH 3/3] fix Gopkg, add changelog --- CHANGELOG_PENDING.md | 10 ++++++++++ Gopkg.toml | 4 ---- 2 files changed, 10 insertions(+), 4 deletions(-) create mode 100644 CHANGELOG_PENDING.md diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md new file mode 100644 index 000000000..c51a2ab6f --- /dev/null +++ b/CHANGELOG_PENDING.md @@ -0,0 +1,10 @@ +# Pending + +BREAKING CHANGES: +- [types] CanonicalTime uses nanoseconds instead of clipping to ms + - breaks serialization/signing of all messages with a timestamp + +IMPROVEMENTS: +- [blockchain] Improve fast-sync logic + - tweak params + - only process one block at a time to avoid starving diff --git a/Gopkg.toml b/Gopkg.toml index e0eb2e011..3ddd36deb 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -95,10 +95,6 @@ name = "github.com/jmhodges/levigo" revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" -[prune] - go-tests = true - unused-packages = true - [[constraint]] name = "github.com/ebuchman/fail-test" revision = "95f809107225be108efcf10a3509e4ea6ceef3c4"
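
Note on the main change in PATCH 1/3: poolRoutine no longer drains up to ten blocks per tick inside SYNC_LOOP; instead a one-slot didProcessCh channel re-arms the processing case, so exactly one block is handled per pass and block processing stays in step with the single TrySend the request side issues per loop (the "ratio mismatch" the new NOTE warns about). The following is a minimal standalone sketch of that self-re-arming channel pattern, not code from the patch — queue and the printed item are placeholders for the block pool and block processing:

package main

import (
	"fmt"
	"time"
)

func main() {
	trySyncTicker := time.NewTicker(10 * time.Millisecond)
	defer trySyncTicker.Stop()

	// One-slot channel: at most one pending "try to process" signal.
	didProcessCh := make(chan struct{}, 1)

	queue := []int{1, 2, 3, 4, 5} // stand-in for the block pool
	quit := time.After(200 * time.Millisecond)

FOR_LOOP:
	for {
		select {
		case <-trySyncTicker.C:
			// Arm the processing case without blocking; if a signal is
			// already pending, this tick is simply dropped.
			select {
			case didProcessCh <- struct{}{}:
			default:
			}

		case <-didProcessCh:
			if len(queue) == 0 {
				continue FOR_LOOP // nothing ready; wait for the next tick
			}
			// Handle exactly one item per pass.
			item := queue[0]
			queue = queue[1:]
			fmt.Println("processed", item)
			if len(queue) > 0 {
				// Re-arm immediately so remaining work is not throttled to
				// the ticker interval. Only this goroutine sends on the
				// channel and it was just drained, so this cannot block.
				didProcessCh <- struct{}{}
			}

		case <-quit:
			break FOR_LOOP
		}
	}
}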
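Note on the breaking change flagged in CHANGELOG_PENDING.md: types/canonical_json.go switches TimeFormat from a millisecond-clipped layout to time.RFC3339Nano, which is also why the clipToMS helper is removed from privval/priv_validator_test.go. A quick standalone illustration (not part of the patch) of how the two layouts render the same time.Time, and hence why sign-bytes built from the canonical JSON differ:

package main

import (
	"fmt"
	"time"
)

const (
	oldTimeFormat = "2006-01-02T15:04:05.000Z" // previous layout: clipped to milliseconds
	newTimeFormat = time.RFC3339Nano           // new layout: full nanosecond precision
)

func main() {
	t := time.Date(2018, 7, 23, 22, 35, 55, 123456789, time.UTC)
	fmt.Println(t.Format(oldTimeFormat)) // 2018-07-23T22:35:55.123Z
	fmt.Println(t.Format(newTimeFormat)) // 2018-07-23T22:35:55.123456789Z
}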