diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md new file mode 100644 index 000000000..c51a2ab6f --- /dev/null +++ b/CHANGELOG_PENDING.md @@ -0,0 +1,10 @@ +# Pending + +BREAKING CHANGES: +- [types] CanonicalTime uses nanoseconds instead of clipping to ms + - breaks serialization/signing of all messages with a timestamp + +IMPROVEMENTS: +- [blockchain] Improve fast-sync logic + - tweak params + - only process one block at a time to avoid starving diff --git a/Gopkg.lock b/Gopkg.lock index 5efb7254f..3c305f02b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -131,7 +131,6 @@ version = "v1.0" [[projects]] - branch = "master" name = "github.com/jmhodges/levigo" packages = ["."] revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" diff --git a/Gopkg.toml b/Gopkg.toml index f53c5bb94..3ddd36deb 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -91,6 +91,10 @@ name = "google.golang.org/genproto" revision = "7fd901a49ba6a7f87732eb344f6e3c5b19d1b200" +[[override]] + name = "github.com/jmhodges/levigo" + revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" + [[constraint]] name = "github.com/ebuchman/fail-test" revision = "95f809107225be108efcf10a3509e4ea6ceef3c4" diff --git a/Makefile b/Makefile index 50e626cdc..b567fe793 100644 --- a/Makefile +++ b/Makefile @@ -5,9 +5,10 @@ GOTOOLS = \ github.com/gogo/protobuf/protoc-gen-gogo \ github.com/gogo/protobuf/gogoproto \ github.com/square/certstrap -PACKAGES=$(shell go list ./...) +PACKAGES=$(shell go list ./... | grep -v '/vendor/') + INCLUDE = -I=. -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf -BUILD_TAGS?=tendermint +BUILD_TAGS?='tendermint' BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" all: check build test install @@ -19,13 +20,13 @@ check: check_tools ensure_deps ### Build Tendermint build: - CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ + CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/ build_race: - CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint + CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint install: - CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' ./cmd/tendermint + CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags $(BUILD_TAGS) ./cmd/tendermint ######################################## ### Protobuf @@ -57,7 +58,7 @@ install_abci: # dist builds binaries for all platforms and packages them for distribution # TODO add abci to these scripts dist: - @BUILD_TAGS='$(BUILD_TAGS)' sh -c "'$(CURDIR)/scripts/dist.sh'" + @BUILD_TAGS=$(BUILD_TAGS) sh -c "'$(CURDIR)/scripts/dist.sh'" ######################################## ### Tools & dependencies @@ -107,7 +108,7 @@ draw_deps: get_deps_bin_size: @# Copy of build recipe with additional flags to perform binary size analysis - $(eval $(shell go build -work -a $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ 2>&1)) + $(eval $(shell go build -work -a $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/ 2>&1)) @find $(WORK) -type f -name "*.a" | xargs -I{} du -hxs "{}" | sort -rh | sed -e s:${WORK}/::g > deps_bin_size.log @echo "Results can be found here: $(CURDIR)/deps_bin_size.log" @@ -207,7 +208,7 @@ vagrant_test: ### go tests test: @echo "--> Running go test" - @go test $(PACKAGES) + @GOCACHE=off go test $(PACKAGES) test_race: @echo "--> Running go test --race" diff --git a/blockchain/pool.go b/blockchain/pool.go index e379d846a..a881c7cb7 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -29,10 +29,10 @@ eg, L = latency = 0.1s */ const ( - requestIntervalMS = 100 - maxTotalRequesters = 1000 + requestIntervalMS = 2 + maxTotalRequesters = 600 maxPendingRequests = maxTotalRequesters - maxPendingRequestsPerPeer = 50 + maxPendingRequestsPerPeer = 20 // Minimum recv rate to ensure we're receiving blocks from a peer fast // enough. If a peer is not sending us data at at least that rate, we @@ -219,14 +219,12 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID { defer pool.mtx.Unlock() request := pool.requesters[height] - - if request.block == nil { - panic("Expected block to be non-nil") + peerID := request.getPeerID() + if peerID != p2p.ID("") { + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) } - - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(request.peerID) - return request.peerID + return peerID } // TODO: ensure that blocks come in order for each peer. diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 449a42ff0..eadeedc91 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -18,7 +18,8 @@ const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) - trySyncIntervalMS = 50 + trySyncIntervalMS = 10 + // stop syncing when last block's time is // within this much of the system time. // stopSyncingDurationMinutes = 10 @@ -76,8 +77,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } - const capacity = 1000 // must be bigger than peers count - requestsCh := make(chan BlockRequest, capacity) + requestsCh := make(chan BlockRequest, maxTotalRequesters) + + const capacity = 1000 // must be bigger than peers count errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock pool := NewBlockPool( @@ -209,7 +211,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) func (bcR *BlockchainReactor) poolRoutine() { trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) @@ -224,6 +225,8 @@ func (bcR *BlockchainReactor) poolRoutine() { lastHundred := time.Now() lastRate := 0.0 + didProcessCh := make(chan struct{}, 1) + FOR_LOOP: for { select { @@ -239,14 +242,17 @@ FOR_LOOP: // The pool handles timeouts, just let it go. continue FOR_LOOP } + case err := <-bcR.errorsCh: peer := bcR.Switch.Peers().Get(err.peerID) if peer != nil { bcR.Switch.StopPeerForError(peer, err) } + case <-statusUpdateTicker.C: // ask for status updates go bcR.BroadcastStatusRequest() // nolint: errcheck + case <-switchToConsensusTicker.C: height, numPending, lenRequesters := bcR.pool.GetStatus() outbound, inbound, _ := bcR.Switch.NumPeers() @@ -261,60 +267,78 @@ FOR_LOOP: break FOR_LOOP } + case <-trySyncTicker.C: // chan time - // This loop can be slow as long as it's doing syncing work. - SYNC_LOOP: - for i := 0; i < 10; i++ { - // See if there are any blocks to sync. - first, second := bcR.pool.PeekTwoBlocks() - //bcR.Logger.Info("TrySync peeked", "first", first, "second", second) - if first == nil || second == nil { - // We need both to sync the first block. - break SYNC_LOOP + select { + case didProcessCh <- struct{}{}: + default: + } + + case <-didProcessCh: + // NOTE: It is a subtle mistake to process more than a single block + // at a time (e.g. 10) here, because we only TrySend 1 request per + // loop. The ratio mismatch can result in starving of blocks, a + // sudden burst of requests and responses, and repeat. + // Consequently, it is better to split these routines rather than + // coupling them as it's written here. TODO uncouple from request + // routine. + + // See if there are any blocks to sync. + first, second := bcR.pool.PeekTwoBlocks() + //bcR.Logger.Info("TrySync peeked", "first", first, "second", second) + if first == nil || second == nil { + // We need both to sync the first block. + continue FOR_LOOP + } else { + // Try again quickly next loop. + didProcessCh <- struct{}{} + } + + firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) + firstPartsHeader := firstParts.Header() + firstID := types.BlockID{first.Hash(), firstPartsHeader} + // Finally, verify the first block using the second's commit + // NOTE: we can probably make this more efficient, but note that calling + // first.Hash() doesn't verify the tx contents, so MakePartSet() is + // currently necessary. + err := state.Validators.VerifyCommit( + chainID, firstID, first.Height, second.LastCommit) + if err != nil { + bcR.Logger.Error("Error in validation", "err", err) + peerID := bcR.pool.RedoRequest(first.Height) + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + // NOTE: we've already removed the peer's request, but we + // still need to clean up the rest. + bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) } - firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) - firstPartsHeader := firstParts.Header() - firstID := types.BlockID{first.Hash(), firstPartsHeader} - // Finally, verify the first block using the second's commit - // NOTE: we can probably make this more efficient, but note that calling - // first.Hash() doesn't verify the tx contents, so MakePartSet() is - // currently necessary. - err := state.Validators.VerifyCommit( - chainID, firstID, first.Height, second.LastCommit) + continue FOR_LOOP + } else { + bcR.pool.PopRequest() + + // TODO: batch saves so we dont persist to disk every block + bcR.store.SaveBlock(first, firstParts, second.LastCommit) + + // TODO: same thing for app - but we would need a way to + // get the hash without persisting the state + var err error + state, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { - bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) - } - break SYNC_LOOP - } else { - bcR.pool.PopRequest() - - // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) - - // TODO: same thing for app - but we would need a way to - // get the hash without persisting the state - var err error - state, err = bcR.blockExec.ApplyBlock(state, firstID, first) - if err != nil { - // TODO This is bad, are we zombie? - cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", - first.Height, first.Hash(), err)) - } - blocksSynced++ - - if blocksSynced%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, - "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) - lastHundred = time.Now() - } + // TODO This is bad, are we zombie? + cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", + first.Height, first.Hash(), err)) + } + blocksSynced++ + + if blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, + "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + lastHundred = time.Now() } } continue FOR_LOOP + case <-bcR.Quit(): break FOR_LOOP } diff --git a/privval/priv_validator_test.go b/privval/priv_validator_test.go index 548ca6cae..7c9c93fcf 100644 --- a/privval/priv_validator_test.go +++ b/privval/priv_validator_test.go @@ -184,7 +184,7 @@ func TestDifferByTimestamp(t *testing.T) { assert.NoError(t, err, "expected no error signing proposal") signBytes := proposal.SignBytes(chainID) sig := proposal.Signature - timeStamp := clipToMS(proposal.Timestamp) + timeStamp := proposal.Timestamp // manipulate the timestamp. should get changed back proposal.Timestamp = proposal.Timestamp.Add(time.Millisecond) @@ -208,7 +208,7 @@ func TestDifferByTimestamp(t *testing.T) { signBytes := vote.SignBytes(chainID) sig := vote.Signature - timeStamp := clipToMS(vote.Timestamp) + timeStamp := vote.Timestamp // manipulate the timestamp. should get changed back vote.Timestamp = vote.Timestamp.Add(time.Millisecond) @@ -243,10 +243,3 @@ func newProposal(height int64, round int, partsHeader types.PartSetHeader) *type Timestamp: time.Now().UTC(), } } - -func clipToMS(t time.Time) time.Time { - nano := t.UnixNano() - million := int64(1000000) - nano = (nano / million) * million - return time.Unix(0, nano).UTC() -} diff --git a/state/store.go b/state/store.go index 9e94e36fa..3a1a6231b 100644 --- a/state/store.go +++ b/state/store.go @@ -80,6 +80,7 @@ func loadState(db dbm.DB, key []byte) (state State) { } // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. +// This flushes the writes (e.g. calls SetSync). func SaveState(db dbm.DB, state State) { saveState(db, state, stateKey) } @@ -218,7 +219,7 @@ func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types if changeHeight == nextHeight { valInfo.ValidatorSet = valSet } - db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) + db.Set(calcValidatorsKey(nextHeight), valInfo.Bytes()) } //----------------------------------------------------------------------------- @@ -289,5 +290,5 @@ func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params t if changeHeight == nextHeight { paramsInfo.ConsensusParams = params } - db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) + db.Set(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) } diff --git a/types/canonical_json.go b/types/canonical_json.go index 189a8a7a2..aca9e9b79 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -9,7 +9,7 @@ import ( // Canonical json is amino's json for structs with fields in alphabetical order // TimeFormat is used for generating the sigs -const TimeFormat = "2006-01-02T15:04:05.000Z" +const TimeFormat = time.RFC3339Nano type CanonicalJSONBlockID struct { Hash cmn.HexBytes `json:"hash,omitempty"`