diff --git a/CHANGELOG.md b/CHANGELOG.md index bfdb9a50e..e8cb63ba5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,52 @@ # Changelog +## v0.26.3 + +*November 17th, 2018* + +Special thanks to external contributors on this release: +@danil-lashin, @kevlubkcm, @krhubert, @srmo + +Friendly reminder, we have a [bug bounty +program](https://hackerone.com/tendermint). + +### BREAKING CHANGES: + +* Go API + - [rpc] [\#2791](https://github.com/tendermint/tendermint/issues/2791) Functions that start HTTP servers are now blocking: + - Impacts `StartHTTPServer`, `StartHTTPAndTLSServer`, and `StartGRPCServer` + - These functions now take a `net.Listener` instead of an address + - [rpc] [\#2767](https://github.com/tendermint/tendermint/issues/2767) Subscribing to events + `NewRound` and `CompleteProposal` return new types `EventDataNewRound` and + `EventDataCompleteProposal`, respectively, instead of the generic `EventDataRoundState`. (@kevlubkcm) + +### FEATURES: + +- [log] [\#2843](https://github.com/tendermint/tendermint/issues/2843) New `log_format` config option, which can be set to 'plain' for colored + text or 'json' for JSON output +- [types] [\#2767](https://github.com/tendermint/tendermint/issues/2767) New event types EventDataNewRound (with ProposerInfo) and EventDataCompleteProposal (with BlockID). (@kevlubkcm) + +### IMPROVEMENTS: + +- [dep] [\#2844](https://github.com/tendermint/tendermint/issues/2844) Dependencies are no longer pinned to an exact version in the + Gopkg.toml: + - Serialization libs are allowed to vary by patch release + - Other libs are allowed to vary by minor release +- [p2p] [\#2857](https://github.com/tendermint/tendermint/issues/2857) "Send failed" is logged at debug level instead of error. +- [rpc] [\#2780](https://github.com/tendermint/tendermint/issues/2780) Add read and write timeouts to HTTP servers +- [state] [\#2848](https://github.com/tendermint/tendermint/issues/2848) Make "Update to validators" msg value pretty (@danil-lashin) + +### BUG FIXES: +- [consensus] [\#2819](https://github.com/tendermint/tendermint/issues/2819) Don't send proposalHearbeat if not a validator +- [docs] [\#2859](https://github.com/tendermint/tendermint/issues/2859) Fix ConsensusParams details in spec +- [libs/autofile] [\#2760](https://github.com/tendermint/tendermint/issues/2760) Comment out autofile permissions check - should fix + running Tendermint on Windows +- [p2p] [\#2869](https://github.com/tendermint/tendermint/issues/2869) Set connection config properly instead of always using default +- [p2p/pex] [\#2802](https://github.com/tendermint/tendermint/issues/2802) Seed mode fixes: + - Only disconnect from inbound peers + - Use FlushStop instead of Sleep to ensure all messages are sent before + disconnecting + ## v0.26.2 *November 15th, 2018* diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 6494867d7..fd340d4da 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -1,6 +1,6 @@ # Pending -## v0.26.3 +## v0.26.4 *TBD* @@ -21,7 +21,6 @@ program](https://hackerone.com/tendermint). * P2P Protocol - ### FEATURES: ### IMPROVEMENTS: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a0057aae5..3dab3b8ab 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -69,16 +69,39 @@ vagrant ssh make test ``` -## Testing +## Changelog -All repos should be hooked up to [CircleCI](https://circleci.com/). +Every fix, improvement, feature, or breaking change should be made in a +pull-request that includes an update to the `CHANGELOG_PENDING.md` file. -If they have `.go` files in the root directory, they will be automatically -tested by circle using `go test -v -race ./...`. If not, they will need a -`circle.yml`. Ideally, every repo has a `Makefile` that defines `make test` and -includes its continuous integration status using a badge in the `README.md`. +Changelog entries should be formatted as follows: -## Changelog +``` +- [module] \#xxx Some description about the change (@contributor) +``` + +Here, `module` is the part of the code that changed (typically a +top-level Go package), `xxx` is the pull-request number, and `contributor` +is the author/s of the change. + +It's also acceptable for `xxx` to refer to the relevent issue number, but pull-request +numbers are preferred. +Note this means pull-requests should be opened first so the changelog can then +be updated with the pull-request's number. +There is no need to include the full link, as this will be added +automatically during release. But please include the backslash and pound, eg. `\#2313`. + +Changelog entries should be ordered alphabetically according to the +`module`, and numerically according to the pull-request number. + +Changes with multiple classifications should be doubly included (eg. a bug fix +that is also a breaking change should be recorded under both). + +Breaking changes are further subdivided according to the APIs/users they impact. +Any change that effects multiple APIs/users should be recorded multiply - for +instance, a change to the `Blockchain Protocol` that removes a field from the +header should also be recorded under `CLI/RPC/Config` since the field will be +removed from the header in rpc responses as well. ## Branching Model and Release @@ -104,13 +127,14 @@ master constitutes a tagged release. - start on `develop` - run integration tests (see `test_integrations` in Makefile) - prepare changelog: - - copy `CHANGELOG_PENDING.md` to `CHANGELOG.md` + - copy `CHANGELOG_PENDING.md` to top of `CHANGELOG.md` - run `python ./scripts/linkify_changelog.py CHANGELOG.md` to add links for all issues - run `bash ./scripts/authors.sh` to get a list of authors since the latest release, and add the github aliases of external contributors to the top of the changelog. To lookup an alias from an email, try `bash ./scripts/authors.sh ` + - reset the `CHANGELOG_PENDING.md` - bump versions - push to release/vX.X.X to run the extended integration tests on the CI - merge to master @@ -127,3 +151,13 @@ master constitutes a tagged release. - merge hotfix-vX.X.X to master - merge hotfix-vX.X.X to develop - delete the hotfix-vX.X.X branch + + +## Testing + +All repos should be hooked up to [CircleCI](https://circleci.com/). + +If they have `.go` files in the root directory, they will be automatically +tested by circle using `go test -v -race ./...`. If not, they will need a +`circle.yml`. Ideally, every repo has a `Makefile` that defines `make test` and +includes its continuous integration status using a badge in the `README.md`. diff --git a/Gopkg.lock b/Gopkg.lock index 8695205e7..0c4779c80 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -128,14 +128,6 @@ revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" version = "v1.2.0" -[[projects]] - digest = "1:b0c25f00bad20d783d259af2af8666969e2fc343fa0dc9efe52936bbd67fb758" - name = "github.com/rs/cors" - packages = ["."] - pruneopts = "UT" - revision = "9a47f48565a795472d43519dd49aac781f3034fb" - version = "v1.6.0" - [[projects]] digest = "1:ea40c24cdbacd054a6ae9de03e62c5f252479b96c716375aace5c120d68647c8" name = "github.com/hashicorp/hcl" @@ -226,14 +218,16 @@ version = "v1.0.0" [[projects]] - digest = "1:c1a04665f9613e082e1209cf288bf64f4068dcd6c87a64bf1c4ff006ad422ba0" + digest = "1:26663fafdea73a38075b07e8e9d82fc0056379d2be8bb4e13899e8fda7c7dd23" name = "github.com/prometheus/client_golang" packages = [ "prometheus", + "prometheus/internal", "prometheus/promhttp", ] pruneopts = "UT" - revision = "ae27198cdd90bf12cd134ad79d1366a6cf49f632" + revision = "abad2d1bd44235a26707c172eab6bca5bf2dbad3" + version = "v0.9.1" [[projects]] branch = "master" @@ -275,6 +269,14 @@ pruneopts = "UT" revision = "e2704e165165ec55d062f5919b4b29494e9fa790" +[[projects]] + digest = "1:b0c25f00bad20d783d259af2af8666969e2fc343fa0dc9efe52936bbd67fb758" + name = "github.com/rs/cors" + packages = ["."] + pruneopts = "UT" + revision = "9a47f48565a795472d43519dd49aac781f3034fb" + version = "v1.6.0" + [[projects]] digest = "1:6a4a11ba764a56d2758899ec6f3848d24698d48442ebce85ee7a3f63284526cd" name = "github.com/spf13/afero" @@ -524,6 +526,7 @@ "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/rcrowley/go-metrics", + "github.com/rs/cors", "github.com/spf13/cobra", "github.com/spf13/viper", "github.com/stretchr/testify/assert", diff --git a/Gopkg.toml b/Gopkg.toml index 1ab6a18e9..c5e625e9b 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -20,57 +20,60 @@ # unused-packages = true # ########################################################### -# NOTE: All packages should be pinned to specific versions. -# Packages without releases must pin to a commit. - +# Allow only patch releases for serialization libraries [[constraint]] - name = "github.com/go-kit/kit" - version = "=0.6.0" + name = "github.com/tendermint/go-amino" + version = "~0.14.1" [[constraint]] name = "github.com/gogo/protobuf" - version = "=1.1.1" + version = "~1.1.1" [[constraint]] name = "github.com/golang/protobuf" - version = "=1.1.0" + version = "~1.1.0" + +# Allow only minor releases for other libraries +[[constraint]] + name = "github.com/go-kit/kit" + version = "^0.6.0" [[constraint]] name = "github.com/gorilla/websocket" - version = "=1.2.0" + version = "^1.2.0" [[constraint]] name = "github.com/rs/cors" - version = "1.6.0" + version = "^1.6.0" [[constraint]] name = "github.com/pkg/errors" - version = "=0.8.0" + version = "^0.8.0" [[constraint]] name = "github.com/spf13/cobra" - version = "=0.0.1" + version = "^0.0.1" [[constraint]] name = "github.com/spf13/viper" - version = "=1.0.0" + version = "^1.0.0" [[constraint]] name = "github.com/stretchr/testify" - version = "=1.2.1" - -[[constraint]] - name = "github.com/tendermint/go-amino" - version = "v0.14.1" + version = "^1.2.1" [[constraint]] name = "google.golang.org/grpc" - version = "=1.13.0" + version = "^1.13.0" [[constraint]] name = "github.com/fortytw2/leaktest" - version = "=1.2.0" + version = "^1.2.0" + +[[constraint]] + name = "github.com/prometheus/client_golang" + version = "^0.9.1" ################################### ## Some repos dont have releases. @@ -94,11 +97,6 @@ name = "github.com/tendermint/btcd" revision = "e5840949ff4fff0c56f9b6a541e22b63581ea9df" -# Haven't made a release since 2016. -[[constraint]] - name = "github.com/prometheus/client_golang" - revision = "ae27198cdd90bf12cd134ad79d1366a6cf49f632" - [[constraint]] name = "github.com/rcrowley/go-metrics" revision = "e2704e165165ec55d062f5919b4b29494e9fa790" diff --git a/Vagrantfile b/Vagrantfile index 320f3b1c3..f058d78e7 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -53,6 +53,6 @@ Vagrant.configure("2") do |config| # get all deps and tools, ready to install/test su - vagrant -c 'source /home/vagrant/.bash_profile' - su - vagrant -c 'cd /home/vagrant/go/src/github.com/tendermint/tendermint && make get_tools && make get_vendor_deps' + su - vagrant -c 'cd /home/vagrant/go/src/github.com/tendermint/tendermint && make get_tools && make get_dev_tools && make get_vendor_deps' SHELL end diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index d5cd233a3..94aabc5e0 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -54,7 +54,7 @@ RETRY_LOOP: if cli.mustConnect { return err } - cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr)) + cli.Logger.Error(fmt.Sprintf("abci.grpcClient failed to connect to %v. Retrying...\n", cli.addr), "err", err) time.Sleep(time.Second * dialRetryIntervalSeconds) continue RETRY_LOOP } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 531d12bca..562676605 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -67,7 +67,7 @@ RETRY_LOOP: if cli.mustConnect { return err } - cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr)) + cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err) time.Sleep(time.Second * dialRetryIntervalSeconds) continue RETRY_LOOP } diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index fca063e0c..9b26f919a 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (tp *bcrTestPeer) FlushStop() {} func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } diff --git a/cmd/tendermint/commands/root.go b/cmd/tendermint/commands/root.go index 89ffbe749..6d79f75c0 100644 --- a/cmd/tendermint/commands/root.go +++ b/cmd/tendermint/commands/root.go @@ -54,6 +54,9 @@ var RootCmd = &cobra.Command{ if err != nil { return err } + if config.LogFormat == cfg.LogFormatJSON { + logger = log.NewTMJSONLogger(log.NewSyncWriter(os.Stdout)) + } logger, err = tmflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel()) if err != nil { return err diff --git a/config/config.go b/config/config.go index ea6582c09..23b033994 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,11 @@ const ( FuzzModeDrop = iota // FuzzModeDelay is a mode in which we randomly sleep FuzzModeDelay + + // LogFormatPlain is a format for colored text + LogFormatPlain = "plain" + // LogFormatJSON is a format for json output + LogFormatJSON = "json" ) // NOTE: Most of the structs & relevant comments + the @@ -94,6 +99,9 @@ func (cfg *Config) SetRoot(root string) *Config { // ValidateBasic performs basic validation (checking param bounds, etc.) and // returns an error if any check fails. func (cfg *Config) ValidateBasic() error { + if err := cfg.BaseConfig.ValidateBasic(); err != nil { + return err + } if err := cfg.RPC.ValidateBasic(); err != nil { return errors.Wrap(err, "Error in [rpc] section") } @@ -145,6 +153,9 @@ type BaseConfig struct { // Output level for logging LogLevel string `mapstructure:"log_level"` + // Output format: 'plain' (colored text) or 'json' + LogFormat string `mapstructure:"log_format"` + // Path to the JSON file containing the initial validator set and other meta data Genesis string `mapstructure:"genesis_file"` @@ -179,6 +190,7 @@ func DefaultBaseConfig() BaseConfig { ProxyApp: "tcp://127.0.0.1:26658", ABCI: "socket", LogLevel: DefaultPackageLogLevels(), + LogFormat: LogFormatPlain, ProfListenAddress: "", FastSync: true, FilterPeers: false, @@ -221,6 +233,17 @@ func (cfg BaseConfig) DBDir() string { return rootify(cfg.DBPath, cfg.RootDir) } +// ValidateBasic performs basic validation (checking param bounds, etc.) and +// returns an error if any check fails. +func (cfg BaseConfig) ValidateBasic() error { + switch cfg.LogFormat { + case LogFormatPlain, LogFormatJSON: + default: + return errors.New("unknown log_format (must be 'plain' or 'json')") + } + return nil +} + // DefaultLogLevel returns a default log level of "error" func DefaultLogLevel() string { return "error" diff --git a/config/toml.go b/config/toml.go index 89be3783d..6f0578e44 100644 --- a/config/toml.go +++ b/config/toml.go @@ -86,6 +86,9 @@ db_dir = "{{ js .BaseConfig.DBPath }}" # Output level for logging, including package level options log_level = "{{ .BaseConfig.LogLevel }}" +# Output format: 'plain' (colored text) or 'json' +log_format = "{{ .BaseConfig.LogFormat }}" + ##### additional base config options ##### # Path to the JSON file containing the initial validator set and other meta data diff --git a/consensus/common_test.go b/consensus/common_test.go index 4f48f4424..8a2d8a42f 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -405,8 +405,38 @@ func ensureNewVote(voteCh <-chan interface{}, height int64, round int) { } func ensureNewRound(roundCh <-chan interface{}, height int64, round int) { - ensureNewEvent(roundCh, height, round, ensureTimeout, - "Timeout expired while waiting for NewRound event") + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewRound event") + case ev := <-roundCh: + rs, ok := ev.(types.EventDataNewRound) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataNewRound, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + } +} + +func ensureProposalHeartbeat(heartbeatCh <-chan interface{}) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for ProposalHeartbeat event") + case ev := <-heartbeatCh: + heartbeat, ok := ev.(types.EventDataProposalHeartbeat) + if !ok { + panic(fmt.Sprintf("expected a *types.EventDataProposalHeartbeat, "+ + "got %v. wrong subscription channel?", + reflect.TypeOf(heartbeat))) + } + } } func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) { @@ -416,8 +446,24 @@ func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, tim } func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) { - ensureNewEvent(proposalCh, height, round, ensureTimeout, - "Timeout expired while waiting for NewProposal event") + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewProposal event") + case ev := <-proposalCh: + rs, ok := ev.(types.EventDataCompleteProposal) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataCompleteProposal, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + } } func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) { @@ -492,6 +538,30 @@ func ensureVote(voteCh <-chan interface{}, height int64, round int, } } +func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewProposal event") + case ev := <-proposalCh: + rs, ok := ev.(types.EventDataCompleteProposal) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataCompleteProposal, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + if !rs.BlockID.Equals(propId) { + panic("Proposed block does not match expected block") + } + } +} + func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) { ensureVote(voteCh, height, round, types.PrecommitType) } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 3dc1cd5ff..6d36d1e74 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -71,18 +71,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) { } startTestRound(cs, height, round) - ensureNewRoundStep(newRoundCh, height, round) // first round at first height + ensureNewRound(newRoundCh, height, round) // first round at first height ensureNewEventOnChannel(newBlockCh) // first block gets committed height = height + 1 // moving to the next height round = 0 - ensureNewRoundStep(newRoundCh, height, round) // first round at next height + ensureNewRound(newRoundCh, height, round) // first round at next height deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) round = round + 1 // moving to the next round - ensureNewRoundStep(newRoundCh, height, round) // wait for the next round + ensureNewRound(newRoundCh, height, round) // wait for the next round ensureNewEventOnChannel(newBlockCh) // now we can commit the block } diff --git a/consensus/reactor.go b/consensus/reactor.go index 1768a8f08..b3298e9dc 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -402,7 +402,7 @@ func (conR *ConsensusReactor) unsubscribeFromBroadcastEvents() { func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) { conR.Logger.Debug("Broadcasting proposal heartbeat message", - "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence) + "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence, "address", hb.ValidatorAddress) msg := &ProposalHeartbeatMessage{hb} conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg)) } diff --git a/consensus/state.go b/consensus/state.go index e8603011f..0f7b56bc5 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -772,7 +772,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping cs.triggeredTimeoutPrecommit = false - cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) + cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()) cs.metrics.Rounds.Set(float64(round)) // Wait for txs to be available in the mempool @@ -802,8 +802,14 @@ func (cs *ConsensusState) needProofBlock(height int64) bool { } func (cs *ConsensusState) proposalHeartbeat(height int64, round int) { - counter := 0 + logger := cs.Logger.With("height", height, "round", round) addr := cs.privValidator.GetAddress() + + if !cs.Validators.HasAddress(addr) { + logger.Debug("Not sending proposalHearbeat. This node is not a validator", "addr", addr, "vals", cs.Validators) + return + } + counter := 0 valIndex, _ := cs.Validators.GetByAddress(addr) chainID := cs.state.ChainID for { @@ -1404,7 +1410,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { return nil } - // Verify POLRound, which must be -1 or in range [0, proposal.Round). + // Verify POLRound, which must be -1 or in range [0, proposal.Round). if proposal.POLRound < -1 || (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) { return ErrInvalidProposalPOLRound @@ -1462,7 +1468,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p } // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) - cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent()) + cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()) // Update Valid* if we can. prevotes := cs.Votes.Prevotes(cs.Round) diff --git a/consensus/state_test.go b/consensus/state_test.go index 9bf4fada5..19dde0532 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/require" cstypes "github.com/tendermint/tendermint/consensus/types" + tmevents "github.com/tendermint/tendermint/libs/events" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" @@ -197,9 +199,8 @@ func TestStateBadProposal(t *testing.T) { stateHash[0] = byte((stateHash[0] + 1) % 255) propBlock.AppHash = stateHash propBlockParts := propBlock.MakePartSet(partSize) - proposal := types.NewProposal( - vs2.Height, round, -1, - types.BlockID{propBlock.Hash(), propBlockParts.Header()}) + blockID := types.BlockID{propBlock.Hash(), propBlockParts.Header()} + proposal := types.NewProposal(vs2.Height, round, -1, blockID) if err := vs2.SignProposal(config.ChainID(), proposal); err != nil { t.Fatal("failed to sign bad proposal", err) } @@ -213,7 +214,7 @@ func TestStateBadProposal(t *testing.T) { startTestRound(cs1, height, round) // wait for proposal - ensureNewProposal(proposalCh, height, round) + ensureProposal(proposalCh, height, round, blockID) // wait for prevote ensurePrevote(voteCh, height, round) @@ -1028,6 +1029,33 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) { assert.True(t, rs.ValidRound == round) } +// regression for #2518 +func TestNoHearbeatWhenNotValidator(t *testing.T) { + cs, _ := randConsensusState(4) + cs.Validators = types.NewValidatorSet(nil) // make sure we are not in the validator set + + cs.evsw.AddListenerForEvent("testing", types.EventProposalHeartbeat, + func(data tmevents.EventData) { + t.Errorf("Should not have broadcasted heartbeat") + }) + go cs.proposalHeartbeat(10, 1) + + cs.Stop() + + // if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method + time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second) +} + +// regression for #2518 +func TestHearbeatWhenWeAreValidator(t *testing.T) { + cs, _ := randConsensusState(4) + heartbeatCh := subscribe(cs.eventBus, types.EventQueryProposalHeartbeat) + + go cs.proposalHeartbeat(10, 1) + ensureProposalHeartbeat(heartbeatCh) + +} + // What we want: // P0 miss to lock B as Proposal Block is missing, but set valid block to B after // receiving delayed Block Proposal. diff --git a/consensus/types/round_state.go b/consensus/types/round_state.go index ef4236118..6359a6555 100644 --- a/consensus/types/round_state.go +++ b/consensus/types/round_state.go @@ -112,18 +112,50 @@ func (rs *RoundState) RoundStateSimple() RoundStateSimple { } } +// NewRoundEvent returns the RoundState with proposer information as an event. +func (rs *RoundState) NewRoundEvent() types.EventDataNewRound { + addr := rs.Validators.GetProposer().Address + idx, _ := rs.Validators.GetByAddress(addr) + + return types.EventDataNewRound{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + Proposer: types.ValidatorInfo{ + Address: addr, + Index: idx, + }, + } +} + +// CompleteProposalEvent returns information about a proposed block as an event. +func (rs *RoundState) CompleteProposalEvent() types.EventDataCompleteProposal { + // We must construct BlockID from ProposalBlock and ProposalBlockParts + // cs.Proposal is not guaranteed to be set when this function is called + blockId := types.BlockID{ + Hash: rs.ProposalBlock.Hash(), + PartsHeader: rs.ProposalBlockParts.Header(), + } + + return types.EventDataCompleteProposal{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + BlockID: blockId, + } +} + // RoundStateEvent returns the H/R/S of the RoundState as an event. func (rs *RoundState) RoundStateEvent() types.EventDataRoundState { // copy the RoundState. // TODO: if we want to avoid this, we may need synchronous events after all rsCopy := *rs - edrs := types.EventDataRoundState{ + return types.EventDataRoundState{ Height: rs.Height, Round: rs.Round, Step: rs.Step.String(), RoundState: &rsCopy, } - return edrs } // String returns a string diff --git a/docs/README.md b/docs/README.md index c32935477..ae4b4731e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -21,7 +21,7 @@ For more details on using Tendermint, see the respective documentation for ## Contribute -To contribute to the documentation, see [this file](./DOCS_README.md) for details of the build process and +To contribute to the documentation, see [this file](https://github.com/tendermint/tendermint/blob/master/docs/DOCS_README.md) for details of the build process and considerations when making changes. ## Version diff --git a/docs/architecture/adr-035-documentation.md b/docs/architecture/adr-035-documentation.md new file mode 100644 index 000000000..92cb07916 --- /dev/null +++ b/docs/architecture/adr-035-documentation.md @@ -0,0 +1,40 @@ +# ADR 035: Documentation + +Author: @zramsay (Zach Ramsay) + +## Changelog + +### November 2nd 2018 + +- initial write-up + +## Context + +The Tendermint documentation has undergone several changes until settling on the current model. Originally, the documentation was hosted on the website and had to be updated asynchronously from the code. Along with the other repositories requiring documentation, the whole stack moved to using Read The Docs to automatically generate, publish, and host the documentation. This, however, was insufficient; the RTD site had advertisement, it wasn't easily accessible to devs, didn't collect metrics, was another set of external links, etc. + +## Decision + +For two reasons, the decision was made to use VuePress: + +1) ability to get metrics (implemented on both Tendermint and SDK) +2) host the documentation on the website as a `/docs` endpoint. + +This is done while maintaining synchrony between the docs and code, i.e., the website is built whenever the docs are updated. + +## Status + +The two points above have been implemented; the `config.js` has a Google Analytics identifier and the documentation workflow has been up and running largely without problems for several months. Details about the documentation build & workflow can be found [here](../DOCS_README.md) + +## Consequences + +Because of the organizational seperation between Tendermint & Cosmos, there is a challenge of "what goes where" for certain aspects of documentation. + +### Positive + +This architecture is largely positive relative to prior docs arrangements. + +### Negative + +A significant portion of the docs automation / build process is in private repos with limited access/visibility to devs. However, these tasks are handled by the SRE team. + +### Neutral diff --git a/docs/architecture/adr-template.md b/docs/architecture/adr-template.md index 4879afc40..28a5ecfbb 100644 --- a/docs/architecture/adr-template.md +++ b/docs/architecture/adr-template.md @@ -1,19 +1,36 @@ -# ADR 000: Template for an ADR - -Author: +# ADR {ADR-NUMBER}: {TITLE} ## Changelog +* {date}: {changelog} ## Context +> This section contains all the context one needs to understand the current state, and why there is a problem. It should be as succinct as possible and introduce the high level idea behind the solution. ## Decision +> This section explains all of the details of the proposed solution, including implementation details. +It should also describe affects / corollary items that may need to be changed as a part of this. +If the proposed change will be large, please also indicate a way to do the change to maximize ease of review. +(e.g. the optimal split of things to do between separate PR's) + ## Status +> A decision may be "proposed" if it hasn't been agreed upon yet, or "accepted" once it is agreed upon. If a later ADR changes or reverses a decision, it may be marked as "deprecated" or "superseded" with a reference to its replacement. + +{Deprecated|Proposed|Accepted} + ## Consequences +> This section describes the consequences, after applying the decision. All consequences should be summarized here, not just the "positive" ones. + ### Positive ### Negative ### Neutral + +## References + +> Are there any relevant PR comments, issues that led up to this, or articles referrenced for why we made the given design choice? If so link them here! + +* {reference link} diff --git a/docs/spec/blockchain/state.md b/docs/spec/blockchain/state.md index 502f9d696..0a07890f8 100644 --- a/docs/spec/blockchain/state.md +++ b/docs/spec/blockchain/state.md @@ -79,30 +79,24 @@ func TotalVotingPower(vals []Validators) int64{ ConsensusParams define various limits for blockchain data structures. Like validator sets, they are set during genesis and can be updated by the application through ABCI. -``` +```go type ConsensusParams struct { BlockSize - TxSize - BlockGossip - EvidenceParams + Evidence + Validator } type BlockSize struct { - MaxBytes int + MaxBytes int64 MaxGas int64 } -type TxSize struct { - MaxBytes int - MaxGas int64 -} - -type BlockGossip struct { - BlockPartSizeBytes int +type Evidence struct { + MaxAge int64 } -type EvidenceParams struct { - MaxAge int64 +type Validator struct { + PubKeyTypes []string } ``` @@ -115,20 +109,15 @@ otherwise. Blocks should additionally be limited by the amount of "gas" consumed by the transactions in the block, though this is not yet implemented. -#### TxSize - -These parameters are not yet enforced and may disappear. See [issue -#2347](https://github.com/tendermint/tendermint/issues/2347). - -#### BlockGossip - -When gossipping blocks in the consensus, they are first split into parts. The -size of each part is `ConsensusParams.BlockGossip.BlockPartSizeBytes`. - -#### EvidenceParams +#### Evidence For evidence in a block to be valid, it must satisfy: ``` -block.Header.Height - evidence.Height < ConsensusParams.EvidenceParams.MaxAge +block.Header.Height - evidence.Height < ConsensusParams.Evidence.MaxAge ``` + +#### Validator + +Validators from genesis file and `ResponseEndBlock` must have pubkeys of type ∈ +`ConsensusParams.Validator.PubKeyTypes`. diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 7052ca507..13894a308 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -39,6 +39,9 @@ db_dir = "data" # Output level for logging log_level = "state:info,*:error" +# Output format: 'plain' (colored text) or 'json' +log_format = "plain" + ##### additional base config options ##### # The ID of the chain to join (should be signed with every transaction and vote) diff --git a/libs/autofile/autofile.go b/libs/autofile/autofile.go index a1e2f49e6..e428e26c5 100644 --- a/libs/autofile/autofile.go +++ b/libs/autofile/autofile.go @@ -8,7 +8,6 @@ import ( "time" cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/errors" ) /* AutoFile usage @@ -157,13 +156,13 @@ func (af *AutoFile) openFile() error { if err != nil { return err } - fileInfo, err := file.Stat() - if err != nil { - return err - } - if fileInfo.Mode() != autoFilePerms { - return errors.NewErrPermissionsChanged(file.Name(), fileInfo.Mode(), autoFilePerms) - } + // fileInfo, err := file.Stat() + // if err != nil { + // return err + // } + // if fileInfo.Mode() != autoFilePerms { + // return errors.NewErrPermissionsChanged(file.Name(), fileInfo.Mode(), autoFilePerms) + // } af.file = file return nil } diff --git a/libs/autofile/autofile_test.go b/libs/autofile/autofile_test.go index 0b3521c2a..9903f1e68 100644 --- a/libs/autofile/autofile_test.go +++ b/libs/autofile/autofile_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/require" cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/errors" ) func TestSIGHUP(t *testing.T) { @@ -58,32 +57,32 @@ func TestSIGHUP(t *testing.T) { } } -// Manually modify file permissions, close, and reopen using autofile: -// We expect the file permissions to be changed back to the intended perms. -func TestOpenAutoFilePerms(t *testing.T) { - file, err := ioutil.TempFile("", "permission_test") - require.NoError(t, err) - err = file.Close() - require.NoError(t, err) - name := file.Name() - - // open and change permissions - af, err := OpenAutoFile(name) - require.NoError(t, err) - err = af.file.Chmod(0755) - require.NoError(t, err) - err = af.Close() - require.NoError(t, err) - - // reopen and expect an ErrPermissionsChanged as Cause - af, err = OpenAutoFile(name) - require.Error(t, err) - if e, ok := err.(*errors.ErrPermissionsChanged); ok { - t.Logf("%v", e) - } else { - t.Errorf("unexpected error %v", e) - } -} +// // Manually modify file permissions, close, and reopen using autofile: +// // We expect the file permissions to be changed back to the intended perms. +// func TestOpenAutoFilePerms(t *testing.T) { +// file, err := ioutil.TempFile("", "permission_test") +// require.NoError(t, err) +// err = file.Close() +// require.NoError(t, err) +// name := file.Name() + +// // open and change permissions +// af, err := OpenAutoFile(name) +// require.NoError(t, err) +// err = af.file.Chmod(0755) +// require.NoError(t, err) +// err = af.Close() +// require.NoError(t, err) + +// // reopen and expect an ErrPermissionsChanged as Cause +// af, err = OpenAutoFile(name) +// require.Error(t, err) +// if e, ok := err.(*errors.ErrPermissionsChanged); ok { +// t.Logf("%v", e) +// } else { +// t.Errorf("unexpected error %v", e) +// } +// } func TestAutoFileSize(t *testing.T) { // First, create an AutoFile writing to a tempfile dir @@ -120,4 +119,4 @@ func TestAutoFileSize(t *testing.T) { // Cleanup _ = os.Remove(f.Name()) -} +} \ No newline at end of file diff --git a/libs/db/fsdb.go b/libs/db/fsdb.go index 92c059d42..b1d40c7b4 100644 --- a/libs/db/fsdb.go +++ b/libs/db/fsdb.go @@ -12,7 +12,6 @@ import ( "github.com/pkg/errors" cmn "github.com/tendermint/tendermint/libs/common" - tmerrors "github.com/tendermint/tendermint/libs/errors" ) const ( @@ -207,13 +206,13 @@ func write(path string, d []byte) error { return err } defer f.Close() - fInfo, err := f.Stat() - if err != nil { - return err - } - if fInfo.Mode() != keyPerm { - return tmerrors.NewErrPermissionsChanged(f.Name(), keyPerm, fInfo.Mode()) - } + // fInfo, err := f.Stat() + // if err != nil { + // return err + // } + // if fInfo.Mode() != keyPerm { + // return tmerrors.NewErrPermissionsChanged(f.Name(), keyPerm, fInfo.Mode()) + // } _, err = f.Write(d) if err != nil { return err diff --git a/libs/errors/errors.go b/libs/errors/errors.go index ae5d94392..a03382780 100644 --- a/libs/errors/errors.go +++ b/libs/errors/errors.go @@ -1,26 +1,21 @@ // Package errors contains errors that are thrown across packages. package errors -import ( - "fmt" - "os" -) +// // ErrPermissionsChanged occurs if the file permission have changed since the file was created. +// type ErrPermissionsChanged struct { +// name string +// got, want os.FileMode +// } -// ErrPermissionsChanged occurs if the file permission have changed since the file was created. -type ErrPermissionsChanged struct { - name string - got, want os.FileMode -} +// func NewErrPermissionsChanged(name string, got, want os.FileMode) *ErrPermissionsChanged { +// return &ErrPermissionsChanged{name: name, got: got, want: want} +// } -func NewErrPermissionsChanged(name string, got, want os.FileMode) *ErrPermissionsChanged { - return &ErrPermissionsChanged{name: name, got: got, want: want} -} - -func (e ErrPermissionsChanged) Error() string { - return fmt.Sprintf( - "file: [%v]\nexpected file permissions: %v, got: %v", - e.name, - e.want, - e.got, - ) -} +// func (e ErrPermissionsChanged) Error() string { +// return fmt.Sprintf( +// "file: [%v]\nexpected file permissions: %v, got: %v", +// e.name, +// e.want, +// e.got, +// ) +// } diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index ffd9db1d7..d7ffb27d0 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -9,7 +9,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" - rpc "github.com/tendermint/tendermint/rpc/lib/server" + rpcserver "github.com/tendermint/tendermint/rpc/lib/server" ) const ( @@ -19,6 +19,7 @@ const ( // StartProxy will start the websocket manager on the client, // set up the rpc routes to proxy via the given client, // and start up an http/rpc server on the location given by bind (eg. :1234) +// NOTE: This function blocks - you may want to call it in a go-routine. func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpenConnections int) error { err := c.Start() if err != nil { @@ -31,47 +32,49 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe // build the handler... mux := http.NewServeMux() - rpc.RegisterRPCFuncs(mux, r, cdc, logger) + rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) - wm := rpc.NewWebsocketManager(r, cdc, rpc.EventSubscriber(c)) + wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c)) wm.SetLogger(logger) core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) - _, err = rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{MaxOpenConnections: maxOpenConnections}) - - return err + l, err := rpcserver.Listen(listenAddr, rpcserver.Config{MaxOpenConnections: maxOpenConnections}) + if err != nil { + return err + } + return rpcserver.StartHTTPServer(l, mux, logger) } // RPCRoutes just routes everything to the given client, as if it were // a tendermint fullnode. // // if we want security, the client must implement it as a secure client -func RPCRoutes(c rpcclient.Client) map[string]*rpc.RPCFunc { +func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc { - return map[string]*rpc.RPCFunc{ + return map[string]*rpcserver.RPCFunc{ // Subscribe/unsubscribe are reserved for websocket events. // We can just use the core tendermint impl, which uses the // EventSwitch we registered in NewWebsocketManager above - "subscribe": rpc.NewWSRPCFunc(core.Subscribe, "query"), - "unsubscribe": rpc.NewWSRPCFunc(core.Unsubscribe, "query"), + "subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"), + "unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"), // info API - "status": rpc.NewRPCFunc(c.Status, ""), - "blockchain": rpc.NewRPCFunc(c.BlockchainInfo, "minHeight,maxHeight"), - "genesis": rpc.NewRPCFunc(c.Genesis, ""), - "block": rpc.NewRPCFunc(c.Block, "height"), - "commit": rpc.NewRPCFunc(c.Commit, "height"), - "tx": rpc.NewRPCFunc(c.Tx, "hash,prove"), - "validators": rpc.NewRPCFunc(c.Validators, ""), + "status": rpcserver.NewRPCFunc(c.Status, ""), + "blockchain": rpcserver.NewRPCFunc(c.BlockchainInfo, "minHeight,maxHeight"), + "genesis": rpcserver.NewRPCFunc(c.Genesis, ""), + "block": rpcserver.NewRPCFunc(c.Block, "height"), + "commit": rpcserver.NewRPCFunc(c.Commit, "height"), + "tx": rpcserver.NewRPCFunc(c.Tx, "hash,prove"), + "validators": rpcserver.NewRPCFunc(c.Validators, ""), // broadcast API - "broadcast_tx_commit": rpc.NewRPCFunc(c.BroadcastTxCommit, "tx"), - "broadcast_tx_sync": rpc.NewRPCFunc(c.BroadcastTxSync, "tx"), - "broadcast_tx_async": rpc.NewRPCFunc(c.BroadcastTxAsync, "tx"), + "broadcast_tx_commit": rpcserver.NewRPCFunc(c.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpcserver.NewRPCFunc(c.BroadcastTxSync, "tx"), + "broadcast_tx_async": rpcserver.NewRPCFunc(c.BroadcastTxAsync, "tx"), // abci API - "abci_query": rpc.NewRPCFunc(c.ABCIQuery, "path,data,prove"), - "abci_info": rpc.NewRPCFunc(c.ABCIInfo, ""), + "abci_query": rpcserver.NewRPCFunc(c.ABCIQuery, "path,data,prove"), + "abci_info": rpcserver.NewRPCFunc(c.ABCIInfo, ""), } } diff --git a/node/node.go b/node/node.go index 796bbc2a8..bfd8d02e2 100644 --- a/node/node.go +++ b/node/node.go @@ -371,7 +371,8 @@ func NewNode(config *cfg.Config, // Setup Transport. var ( - transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey) + mConnConfig = p2p.MConnConfig(config.P2P) + transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) @@ -653,6 +654,14 @@ func (n *Node) startRPC() ([]net.Listener, error) { mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) + listener, err := rpcserver.Listen( + listenAddr, + rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, + ) + if err != nil { + return nil, err + } + var rootHandler http.Handler = mux if n.config.RPC.IsCorsEnabled() { corsMiddleware := cors.New(cors.Options{ @@ -663,30 +672,23 @@ func (n *Node) startRPC() ([]net.Listener, error) { rootHandler = corsMiddleware.Handler(mux) } - listener, err := rpcserver.StartHTTPServer( - listenAddr, + go rpcserver.StartHTTPServer( + listener, rootHandler, rpcLogger, - rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, ) - if err != nil { - return nil, err - } listeners[i] = listener } // we expose a simplified api over grpc for convenience to app devs grpcListenAddr := n.config.RPC.GRPCListenAddress if grpcListenAddr != "" { - listener, err := grpccore.StartGRPCServer( - grpcListenAddr, - grpccore.Config{ - MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections, - }, - ) + listener, err := rpcserver.Listen( + grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections}) if err != nil { return nil, err } + go grpccore.StartGRPCServer(listener) listeners = append(listeners, listener) } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 80fc53ddb..c6aad038b 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -84,7 +84,11 @@ type MConnection struct { errored uint32 config MConnConfig - quit chan struct{} + // Closing quitSendRoutine will cause + // doneSendRoutine to close. + quitSendRoutine chan struct{} + doneSendRoutine chan struct{} + flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically @@ -190,7 +194,8 @@ func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { return err } - c.quit = make(chan struct{}) + c.quitSendRoutine = make(chan struct{}) + c.doneSendRoutine = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) @@ -200,15 +205,59 @@ func (c *MConnection) OnStart() error { return nil } +// FlushStop replicates the logic of OnStop. +// It additionally ensures that all successful +// .Send() calls will get flushed before closing +// the connection. +// NOTE: it is not safe to call this method more than once. +func (c *MConnection) FlushStop() { + c.BaseService.OnStop() + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() + if c.quitSendRoutine != nil { + close(c.quitSendRoutine) + // wait until the sendRoutine exits + // so we dont race on calling sendSomePacketMsgs + <-c.doneSendRoutine + } + + // Send and flush all pending msgs. + // By now, IsRunning == false, + // so any concurrent attempts to send will fail. + // Since sendRoutine has exited, we can call this + // safely + eof := c.sendSomePacketMsgs() + for !eof { + eof = c.sendSomePacketMsgs() + } + c.flush() + + // Now we can close the connection + c.conn.Close() // nolint: errcheck + + // We can't close pong safely here because + // recvRoutine may write to it after we've stopped. + // Though it doesn't need to get closed at all, + // we close it @ recvRoutine. + + // c.Stop() +} + // OnStop implements BaseService func (c *MConnection) OnStop() { + select { + case <-c.quitSendRoutine: + // already quit via FlushStop + return + default: + } + c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() - if c.quit != nil { - close(c.quit) - } + close(c.quitSendRoutine) c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -269,7 +318,7 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool { default: } } else { - c.Logger.Error("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes)) + c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes)) } return success } @@ -365,7 +414,8 @@ FOR_LOOP: } c.sendMonitor.Update(int(_n)) c.flush() - case <-c.quit: + case <-c.quitSendRoutine: + close(c.doneSendRoutine) break FOR_LOOP case <-c.send: // Send some PacketMsgs diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 59fe0d1df..a757f07a6 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -36,6 +36,43 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg return c } +func TestMConnectionSendFlushStop(t *testing.T) { + server, client := NetPipe() + defer server.Close() // nolint: errcheck + defer client.Close() // nolint: errcheck + + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + defer clientConn.Stop() + + msg := []byte("abc") + assert.True(t, clientConn.Send(0x01, msg)) + + aminoMsgLength := 14 + + // start the reader in a new routine, so we can flush + errCh := make(chan error) + go func() { + msgB := make([]byte, aminoMsgLength) + _, err := server.Read(msgB) + if err != nil { + t.Fatal(err) + } + errCh <- err + }() + + // stop the conn - it should flush all conns + clientConn.FlushStop() + + timer := time.NewTimer(3 * time.Second) + select { + case <-errCh: + case <-timer.C: + t.Error("timed out waiting for msgs to be read") + } +} + func TestMConnectionSend(t *testing.T) { server, client := NetPipe() defer server.Close() // nolint: errcheck diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go index 65ff65fb2..71def27e0 100644 --- a/p2p/dummy/peer.go +++ b/p2p/dummy/peer.go @@ -25,6 +25,11 @@ func NewPeer() *peer { return p } +// FlushStop just calls Stop. +func (p *peer) FlushStop() { + p.Stop() +} + // ID always returns dummy. func (p *peer) ID() p2p.ID { return p2p.ID("dummy") diff --git a/p2p/peer.go b/p2p/peer.go index e98c16d26..da301d497 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -8,7 +8,6 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/config" tmconn "github.com/tendermint/tendermint/p2p/conn" ) @@ -17,6 +16,7 @@ const metricsTickerDuration = 10 * time.Second // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + FlushStop() ID() ID // peer's cryptographic ID RemoteIP() net.IP // remote IP of the connection @@ -41,7 +41,6 @@ type Peer interface { type peerConn struct { outbound bool persistent bool - config *config.P2PConfig conn net.Conn // source connection originalAddr *NetAddress // nil for inbound connections @@ -52,7 +51,6 @@ type peerConn struct { func newPeerConn( outbound, persistent bool, - config *config.P2PConfig, conn net.Conn, originalAddr *NetAddress, ) peerConn { @@ -60,7 +58,6 @@ func newPeerConn( return peerConn{ outbound: outbound, persistent: persistent, - config: config, conn: conn, originalAddr: originalAddr, } @@ -184,6 +181,15 @@ func (p *peer) OnStart() error { return nil } +// FlushStop mimics OnStop but additionally ensures that all successful +// .Send() calls will get flushed before closing the connection. +// NOTE: it is not safe to call this method more than once. +func (p *peer) FlushStop() { + p.metricsTicker.Stop() + p.BaseService.OnStop() + p.mconn.FlushStop() // stop everything and close the conn +} + // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index daa9b2c82..04b877b0d 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,6 +18,7 @@ type mockPeer struct { id ID } +func (mp *mockPeer) FlushStop() { mp.Stop() } func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 85d292b09..057aadaa2 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -208,25 +208,38 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { switch msg := msg.(type) { case *pexRequestMessage: - // Check we're not receiving too many requests - if err := r.receiveRequest(src); err != nil { - r.Switch.StopPeerForError(src, err) - return - } - // Seeds disconnect after sending a batch of addrs - // NOTE: this is a prime candidate for amplification attacks + // NOTE: this is a prime candidate for amplification attacks, // so it's important we // 1) restrict how frequently peers can request // 2) limit the output size - if r.config.SeedMode { + + // If we're a seed and this is an inbound peer, + // respond once and disconnect. + if r.config.SeedMode && !src.IsOutbound() { + id := string(src.ID()) + v := r.lastReceivedRequests.Get(id) + if v != nil { + // FlushStop/StopPeer are already + // running in a go-routine. + return + } + r.lastReceivedRequests.Set(id, time.Now()) + + // Send addrs and disconnect r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { - // TODO Fix properly #2092 - time.Sleep(time.Second * 5) + // In a go-routine so it doesn't block .Receive. + src.FlushStop() r.Switch.StopPeerGracefully(src) }() + } else { + // Check we're not receiving requests too frequently. + if err := r.receiveRequest(src); err != nil { + r.Switch.StopPeerForError(src, err) + return + } r.SendAddrs(src, r.book.GetSelection()) } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 9d3f49bba..8f3ceb89c 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -387,6 +387,7 @@ func newMockPeer() mockPeer { return mp } +func (mp mockPeer) FlushStop() { mp.Stop() } func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent } diff --git a/p2p/switch.go b/p2p/switch.go index b70900ea9..4996ebd91 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -27,6 +27,17 @@ const ( reconnectBackOffBaseSeconds = 3 ) +// MConnConfig returns an MConnConfig with fields updated +// from the P2PConfig. +func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig { + mConfig := conn.DefaultMConnConfig() + mConfig.FlushThrottle = cfg.FlushThrottleTimeout + mConfig.SendRate = cfg.SendRate + mConfig.RecvRate = cfg.RecvRate + mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize + return mConfig +} + //----------------------------------------------------------------------------- // An AddrBook represents an address book from the pex package, which is used @@ -70,8 +81,6 @@ type Switch struct { filterTimeout time.Duration peerFilters []PeerFilterFunc - mConfig conn.MConnConfig - rng *cmn.Rand // seed for randomizing dial times and orders metrics *Metrics @@ -102,14 +111,6 @@ func NewSwitch( // Ensure we have a completely undeterministic PRNG. sw.rng = cmn.NewRand() - mConfig := conn.DefaultMConnConfig() - mConfig.FlushThrottle = cfg.FlushThrottleTimeout - mConfig.SendRate = cfg.SendRate - mConfig.RecvRate = cfg.RecvRate - mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize - - sw.mConfig = mConfig - sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) for _, option := range options { diff --git a/p2p/test_util.go b/p2p/test_util.go index d72c0c760..b8a34600c 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -135,7 +135,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { p := newPeer( pc, - sw.mConfig, + MConnConfig(sw.config), ni, sw.reactorsByCh, sw.chDescs, @@ -175,7 +175,7 @@ func MakeSwitch( } nodeInfo := testNodeInfo(nodeKey.ID(), fmt.Sprintf("node%d", i)) - t := NewMultiplexTransport(nodeInfo, nodeKey) + t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg)) addr := nodeInfo.NetAddress() if err := t.Listen(*addr); err != nil { @@ -232,7 +232,6 @@ func testPeerConn( // Only the information we already have return peerConn{ - config: cfg, outbound: outbound, persistent: persistent, conn: conn, diff --git a/p2p/transport.go b/p2p/transport.go index 0b9b436f0..b16db54db 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -6,7 +6,6 @@ import ( "net" "time" - "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/p2p/conn" ) @@ -129,11 +128,10 @@ type MultiplexTransport struct { nodeKey NodeKey resolver IPResolver - // TODO(xla): Those configs are still needed as we parameterise peerConn and + // TODO(xla): This config is still needed as we parameterise peerConn and // peer currently. All relevant configuration should be refactored into options // with sane defaults. - mConfig conn.MConnConfig - p2pConfig config.P2PConfig + mConfig conn.MConnConfig } // Test multiplexTransport for interface completeness. @@ -144,6 +142,7 @@ var _ transportLifecycle = (*MultiplexTransport)(nil) func NewMultiplexTransport( nodeInfo NodeInfo, nodeKey NodeKey, + mConfig conn.MConnConfig, ) *MultiplexTransport { return &MultiplexTransport{ acceptc: make(chan accept), @@ -151,7 +150,7 @@ func NewMultiplexTransport( dialTimeout: defaultDialTimeout, filterTimeout: defaultFilterTimeout, handshakeTimeout: defaultHandshakeTimeout, - mConfig: conn.DefaultMConnConfig(), + mConfig: mConfig, nodeInfo: nodeInfo, nodeKey: nodeKey, conns: NewConnSet(), @@ -405,7 +404,6 @@ func (mt *MultiplexTransport) wrapPeer( peerConn := newPeerConn( cfg.outbound, cfg.persistent, - &mt.p2pConfig, c, dialedAddr, ) diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 8a5c06bc3..182b28899 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/p2p/conn" ) var defaultNodeName = "host_peer" @@ -17,8 +18,20 @@ func emptyNodeInfo() NodeInfo { return DefaultNodeInfo{} } +// newMultiplexTransport returns a tcp connected multiplexed peer +// using the default MConnConfig. It's a convenience function used +// for testing. +func newMultiplexTransport( + nodeInfo NodeInfo, + nodeKey NodeKey, +) *MultiplexTransport { + return NewMultiplexTransport( + nodeInfo, nodeKey, conn.DefaultMConnConfig(), + ) +} + func TestTransportMultiplexConnFilter(t *testing.T) { - mt := NewMultiplexTransport( + mt := newMultiplexTransport( emptyNodeInfo(), NodeKey{ PrivKey: ed25519.GenPrivKey(), @@ -75,7 +88,7 @@ func TestTransportMultiplexConnFilter(t *testing.T) { } func TestTransportMultiplexConnFilterTimeout(t *testing.T) { - mt := NewMultiplexTransport( + mt := newMultiplexTransport( emptyNodeInfo(), NodeKey{ PrivKey: ed25519.GenPrivKey(), @@ -140,7 +153,7 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) { go func() { var ( pv = ed25519.GenPrivKey() - dialer = NewMultiplexTransport( + dialer = newMultiplexTransport( testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName), NodeKey{ PrivKey: pv, @@ -261,7 +274,7 @@ func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { <-slowc var ( - dialer = NewMultiplexTransport( + dialer = newMultiplexTransport( fastNodeInfo, NodeKey{ PrivKey: fastNodePV, @@ -307,7 +320,7 @@ func TestTransportMultiplexValidateNodeInfo(t *testing.T) { go func() { var ( pv = ed25519.GenPrivKey() - dialer = NewMultiplexTransport( + dialer = newMultiplexTransport( testNodeInfo(PubKeyToID(pv.PubKey()), ""), // Should not be empty NodeKey{ PrivKey: pv, @@ -350,7 +363,7 @@ func TestTransportMultiplexRejectMissmatchID(t *testing.T) { errc := make(chan error) go func() { - dialer := NewMultiplexTransport( + dialer := newMultiplexTransport( testNodeInfo( PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer", ), @@ -396,7 +409,7 @@ func TestTransportMultiplexRejectIncompatible(t *testing.T) { go func() { var ( pv = ed25519.GenPrivKey() - dialer = NewMultiplexTransport( + dialer = newMultiplexTransport( testNodeInfoWithNetwork(PubKeyToID(pv.PubKey()), "dialer", "incompatible-network"), NodeKey{ PrivKey: pv, @@ -553,7 +566,7 @@ func TestTransportHandshake(t *testing.T) { func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { var ( pv = ed25519.GenPrivKey() - mt = NewMultiplexTransport( + mt = newMultiplexTransport( testNodeInfo( PubKeyToID(pv.PubKey()), "transport", ), diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 598774921..7b3c368af 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -9,6 +9,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpcserver "github.com/tendermint/tendermint/rpc/lib/server" "github.com/tendermint/tendermint/types" ) @@ -194,7 +195,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { } // Wait for the tx to be included in a block or timeout. - var deliverTxTimeout = 10 * time.Second // TODO: configurable? + // TODO: configurable? + var deliverTxTimeout = rpcserver.WriteTimeout / 2 select { case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block. deliverTxRes := deliverTxResMsg.(types.EventDataTx) diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 188ea1c36..ae8ae056a 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,8 +1,6 @@ package core import ( - "time" - "github.com/tendermint/tendermint/consensus" crypto "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" @@ -10,6 +8,7 @@ import ( mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" + rpcserver "github.com/tendermint/tendermint/rpc/lib/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" @@ -21,7 +20,7 @@ const ( maxPerPage = 100 ) -var subscribeTimeout = 5 * time.Second +var subscribeTimeout = rpcserver.WriteTimeout / 2 //---------------------------------------------- // These interfaces are used by RPC and must be thread safe diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index c88989685..2bc89864d 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -1,12 +1,9 @@ package core_grpc import ( - "fmt" "net" - "strings" "time" - "golang.org/x/net/netutil" "google.golang.org/grpc" cmn "github.com/tendermint/tendermint/libs/common" @@ -17,28 +14,12 @@ type Config struct { MaxOpenConnections int } -// StartGRPCServer starts a new gRPC BroadcastAPIServer, listening on -// protoAddr, in a goroutine. Returns a listener and an error, if it fails to -// parse an address. -func StartGRPCServer(protoAddr string, config Config) (net.Listener, error) { - parts := strings.SplitN(protoAddr, "://", 2) - if len(parts) != 2 { - return nil, fmt.Errorf("Invalid listen address for grpc server (did you forget a tcp:// prefix?) : %s", protoAddr) - } - proto, addr := parts[0], parts[1] - ln, err := net.Listen(proto, addr) - if err != nil { - return nil, err - } - if config.MaxOpenConnections > 0 { - ln = netutil.LimitListener(ln, config.MaxOpenConnections) - } - +// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given net.Listener. +// NOTE: This function blocks - you may want to call it in a go-routine. +func StartGRPCServer(ln net.Listener) error { grpcServer := grpc.NewServer() RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) - go grpcServer.Serve(ln) // nolint: errcheck - - return ln, nil + return grpcServer.Serve(ln) } // StartGRPCClient dials the gRPC server using protoAddr and returns a new diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index dbdb362da..aa9638bfd 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -70,12 +70,9 @@ // wm := rpcserver.NewWebsocketManager(Routes) // mux.HandleFunc("/websocket", wm.WebsocketHandler) // logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) -// go func() { -// _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger) -// if err != nil { -// panic(err) -// } -// }() +// listener, err := rpc.Listen("0.0.0.0:8080", rpcserver.Config{}) +// if err != nil { panic(err) } +// go rpcserver.StartHTTPServer(listener, mux, logger) // // Note that unix sockets are supported as well (eg. `/path/to/socket` instead of `0.0.0.0:8008`) // Now see all available endpoints by sending a GET request to `0.0.0.0:8008`. diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 3d76db323..794ab462c 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -121,12 +121,11 @@ func setup() { wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - go func() { - _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger, server.Config{}) - if err != nil { - panic(err) - } - }() + listener1, err := server.Listen(tcpAddr, server.Config{}) + if err != nil { + panic(err) + } + go server.StartHTTPServer(listener1, mux, tcpLogger) unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() @@ -134,12 +133,11 @@ func setup() { wm = server.NewWebsocketManager(Routes, RoutesCdc) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - go func() { - _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger, server.Config{}) - if err != nil { - panic(err) - } - }() + listener2, err := server.Listen(unixAddr, server.Config{}) + if err != nil { + panic(err) + } + go server.StartHTTPServer(listener2, mux2, unixLogger) // wait for servers to start time.Sleep(time.Second * 2) diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 8cacaeefb..1fd422a9b 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -27,92 +27,56 @@ const ( // maxBodyBytes controls the maximum number of bytes the // server will read parsing the request body. maxBodyBytes = int64(1000000) // 1MB + + // same as the net/http default + maxHeaderBytes = 1 << 20 + + // Timeouts for reading/writing to the http connection. + // Public so handlers can read them - + // /broadcast_tx_commit has it's own timeout, which should + // be less than the WriteTimeout here. + // TODO: use a config instead. + ReadTimeout = 3 * time.Second + WriteTimeout = 20 * time.Second ) -// StartHTTPServer starts an HTTP server on listenAddr with the given handler. +// StartHTTPServer takes a listener and starts an HTTP server with the given handler. // It wraps handler with RecoverAndLogHandler. -func StartHTTPServer( - listenAddr string, - handler http.Handler, - logger log.Logger, - config Config, -) (listener net.Listener, err error) { - var proto, addr string - parts := strings.SplitN(listenAddr, "://", 2) - if len(parts) != 2 { - return nil, errors.Errorf( - "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", - listenAddr, - ) - } - proto, addr = parts[0], parts[1] - - logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listenAddr)) - listener, err = net.Listen(proto, addr) - if err != nil { - return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) +// NOTE: This function blocks - you may want to call it in a go-routine. +func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error { + logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr())) + s := &http.Server{ + Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), + ReadTimeout: ReadTimeout, + WriteTimeout: WriteTimeout, + MaxHeaderBytes: maxHeaderBytes, } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) - } - - go func() { - err := http.Serve( - listener, - RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), - ) - logger.Info("RPC HTTP server stopped", "err", err) - }() - return listener, nil + err := s.Serve(listener) + logger.Info("RPC HTTP server stopped", "err", err) + return err } -// StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given -// handler. +// StartHTTPAndTLSServer takes a listener and starts an HTTPS server with the given handler. // It wraps handler with RecoverAndLogHandler. +// NOTE: This function blocks - you may want to call it in a go-routine. func StartHTTPAndTLSServer( - listenAddr string, + listener net.Listener, handler http.Handler, certFile, keyFile string, logger log.Logger, - config Config, -) (listener net.Listener, err error) { - var proto, addr string - parts := strings.SplitN(listenAddr, "://", 2) - if len(parts) != 2 { - return nil, errors.Errorf( - "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", - listenAddr, - ) - } - proto, addr = parts[0], parts[1] - - logger.Info( - fmt.Sprintf( - "Starting RPC HTTPS server on %s (cert: %q, key: %q)", - listenAddr, - certFile, - keyFile, - ), - ) - listener, err = net.Listen(proto, addr) - if err != nil { - return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) - } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) +) error { + logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)", + listener.Addr(), certFile, keyFile)) + s := &http.Server{ + Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), + ReadTimeout: ReadTimeout, + WriteTimeout: WriteTimeout, + MaxHeaderBytes: maxHeaderBytes, } + err := s.ServeTLS(listener, certFile, keyFile) - err = http.ServeTLS( - listener, - RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), - certFile, - keyFile, - ) - if err != nil { - logger.Error("RPC HTTPS server stopped", "err", err) - return nil, err - } - return listener, nil + logger.Error("RPC HTTPS server stopped", "err", err) + return err } func WriteRPCResponseHTTPError( @@ -213,3 +177,25 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, h.n) h.h.ServeHTTP(w, r) } + +// Listen starts a new net.Listener on the given address. +// It returns an error if the address is invalid or the call to Listen() fails. +func Listen(addr string, config Config) (listener net.Listener, err error) { + parts := strings.SplitN(addr, "://", 2) + if len(parts) != 2 { + return nil, errors.Errorf( + "Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", + addr, + ) + } + proto, addr := parts[0], parts[1] + listener, err = net.Listen(proto, addr) + if err != nil { + return nil, errors.Errorf("Failed to listen on %v: %v", addr, err) + } + if config.MaxOpenConnections > 0 { + listener = netutil.LimitListener(listener, config.MaxOpenConnections) + } + + return listener, nil +} diff --git a/rpc/lib/server/http_server_test.go b/rpc/lib/server/http_server_test.go index 73ebc2e7e..6b852afae 100644 --- a/rpc/lib/server/http_server_test.go +++ b/rpc/lib/server/http_server_test.go @@ -30,11 +30,10 @@ func TestMaxOpenConnections(t *testing.T) { time.Sleep(10 * time.Millisecond) fmt.Fprint(w, "some body") }) - l, err := StartHTTPServer("tcp://127.0.0.1:0", mux, log.TestingLogger(), Config{MaxOpenConnections: max}) - if err != nil { - t.Fatal(err) - } + l, err := Listen("tcp://127.0.0.1:0", Config{MaxOpenConnections: max}) + require.NoError(t, err) defer l.Close() + go StartHTTPServer(l, mux, log.TestingLogger()) // Make N GET calls to the server. attempts := max * 2 @@ -67,11 +66,14 @@ func TestMaxOpenConnections(t *testing.T) { func TestStartHTTPAndTLSServer(t *testing.T) { // set up fixtures listenerAddr := "tcp://0.0.0.0:0" + listener, err := Listen(listenerAddr, Config{MaxOpenConnections: 1}) + require.NoError(t, err) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {}) // test failure - gotListener, err := StartHTTPAndTLSServer(listenerAddr, mux, "", "", log.TestingLogger(), Config{MaxOpenConnections: 1}) - require.Nil(t, gotListener) + err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger()) require.IsType(t, (*os.PathError)(nil), err) + + // TODO: test that starting the server can actually work } diff --git a/rpc/lib/test/main.go b/rpc/lib/test/main.go index 544284b9c..0a9684d76 100644 --- a/rpc/lib/test/main.go +++ b/rpc/lib/test/main.go @@ -28,11 +28,11 @@ func main() { cdc := amino.NewCodec() logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger) - _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger, rpcserver.Config{}) + listener, err := rpcserver.Listen("0.0.0.0:8008", rpcserver.Config{}) if err != nil { cmn.Exit(err.Error()) } - + go rpcserver.StartHTTPServer(listener, mux, logger) // Wait forever cmn.TrapSignal(func() { }) diff --git a/state/execution.go b/state/execution.go index 4f5a1545c..9aa714ebd 100644 --- a/state/execution.go +++ b/state/execution.go @@ -2,6 +2,7 @@ package state import ( "fmt" + "strings" "time" abci "github.com/tendermint/tendermint/abci/types" @@ -107,7 +108,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b fail.Fail() // XXX // Update the state with the block and responses. - state, err = updateState(state, blockID, &block.Header, abciResponses) + state, err = updateState(blockExec.logger, state, blockID, &block.Header, abciResponses) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) } @@ -254,12 +255,6 @@ func execBlockOnProxyApp( logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs) - valUpdates := abciResponses.EndBlock.ValidatorUpdates - if len(valUpdates) > 0 { - // TODO: cleanup the formatting - logger.Info("Updates to validators", "updates", valUpdates) - } - return abciResponses, nil } @@ -315,16 +310,16 @@ func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorS // If more or equal than 1/3 of total voting power changed in one block, then // a light client could never prove the transition externally. See // ./lite/doc.go for details on how a light client tracks validators. -func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.ValidatorUpdate) error { +func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.ValidatorUpdate) ([]*types.Validator, error) { updates, err := types.PB2TM.ValidatorUpdates(abciUpdates) if err != nil { - return err + return nil, err } // these are tendermint types now for _, valUpdate := range updates { if valUpdate.VotingPower < 0 { - return fmt.Errorf("Voting power can't be negative %v", valUpdate) + return nil, fmt.Errorf("Voting power can't be negative %v", valUpdate) } address := valUpdate.Address @@ -333,27 +328,28 @@ func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.Validat // remove val _, removed := currentSet.Remove(address) if !removed { - return fmt.Errorf("Failed to remove validator %X", address) + return nil, fmt.Errorf("Failed to remove validator %X", address) } } else if val == nil { // add val added := currentSet.Add(valUpdate) if !added { - return fmt.Errorf("Failed to add new validator %v", valUpdate) + return nil, fmt.Errorf("Failed to add new validator %v", valUpdate) } } else { // update val updated := currentSet.Update(valUpdate) if !updated { - return fmt.Errorf("Failed to update validator %X to %v", address, valUpdate) + return nil, fmt.Errorf("Failed to update validator %X to %v", address, valUpdate) } } } - return nil + return updates, nil } // updateState returns a new State updated according to the header and responses. func updateState( + logger log.Logger, state State, blockID types.BlockID, header *types.Header, @@ -367,12 +363,14 @@ func updateState( // Update the validator set with the latest abciResponses. lastHeightValsChanged := state.LastHeightValidatorsChanged if len(abciResponses.EndBlock.ValidatorUpdates) > 0 { - err := updateValidators(nValSet, abciResponses.EndBlock.ValidatorUpdates) + validatorUpdates, err := updateValidators(nValSet, abciResponses.EndBlock.ValidatorUpdates) if err != nil { return state, fmt.Errorf("Error changing validator set: %v", err) } // Change results from this height but only applies to the next next height. lastHeightValsChanged = header.Height + 1 + 1 + + logger.Info("Updates to validators", "updates", makeValidatorUpdatesLogString(validatorUpdates)) } // Update validator accums and set state variables. @@ -466,3 +464,13 @@ func ExecCommitBlock( // ResponseCommit has no error or log, just data return res.Data, nil } + +// Make pretty string for validatorUpdates logging +func makeValidatorUpdatesLogString(vals []*types.Validator) string { + chunks := make([]string, len(vals)) + for i, val := range vals { + chunks[i] = fmt.Sprintf("%s:%d", val.Address, val.VotingPower) + } + + return strings.Join(chunks, ",") +} diff --git a/state/execution_test.go b/state/execution_test.go index 273e9ebea..41d9a4849 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -218,7 +218,7 @@ func TestUpdateValidators(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - err := updateValidators(tc.currentSet, tc.abciUpdates) + _, err := updateValidators(tc.currentSet, tc.abciUpdates) if tc.shouldErr { assert.Error(t, err) } else { diff --git a/state/state_test.go b/state/state_test.go index 88200e17e..17293f6fe 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -3,6 +3,7 @@ package state import ( "bytes" "fmt" + "github.com/tendermint/tendermint/libs/log" "testing" "github.com/stretchr/testify/assert" @@ -228,7 +229,7 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) { power++ } header, blockID, responses := makeHeaderPartsResponsesValPowerChange(state, i, power) - state, err = updateState(state, blockID, &header, responses) + state, err = updateState(log.TestingLogger(), state, blockID, &header, responses) assert.Nil(t, err) nextHeight := state.LastBlockHeight + 1 saveValidatorsInfo(stateDB, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators) @@ -280,7 +281,7 @@ func TestManyValidatorChangesSaveLoad(t *testing.T) { // Save state etc. var err error - state, err = updateState(state, blockID, &header, responses) + state, err = updateState(log.TestingLogger(), state, blockID, &header, responses) require.Nil(t, err) nextHeight := state.LastBlockHeight + 1 saveValidatorsInfo(stateDB, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators) @@ -359,7 +360,7 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) { cp = params[changeIndex] } header, blockID, responses := makeHeaderPartsResponsesParams(state, i, cp) - state, err = updateState(state, blockID, &header, responses) + state, err = updateState(log.TestingLogger(), state, blockID, &header, responses) require.Nil(t, err) nextHeight := state.LastBlockHeight + 1 diff --git a/tools/tm-monitor/main.go b/tools/tm-monitor/main.go index 32897b978..6e4aea5f9 100644 --- a/tools/tm-monitor/main.go +++ b/tools/tm-monitor/main.go @@ -48,13 +48,13 @@ Examples: logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) } - m := startMonitor(flag.Arg(0)) + monitor := startMonitor(flag.Arg(0)) - startRPC(listenAddr, m, logger) + listener := startRPC(listenAddr, monitor, logger) var ton *Ton if !noton { - ton = NewTon(m) + ton = NewTon(monitor) ton.Start() } @@ -62,7 +62,8 @@ Examples: if !noton { ton.Stop() } - m.Stop() + monitor.Stop() + listener.Close() }) } diff --git a/tools/tm-monitor/rpc.go b/tools/tm-monitor/rpc.go index ab62e0462..1a08a9ecd 100644 --- a/tools/tm-monitor/rpc.go +++ b/tools/tm-monitor/rpc.go @@ -2,6 +2,7 @@ package main import ( "errors" + "net" "net/http" "github.com/tendermint/tendermint/libs/log" @@ -9,16 +10,19 @@ import ( monitor "github.com/tendermint/tendermint/tools/tm-monitor/monitor" ) -func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) { +func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.Listener { routes := routes(m) mux := http.NewServeMux() wm := rpc.NewWebsocketManager(routes, nil) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpc.RegisterRPCFuncs(mux, routes, cdc, logger) - if _, err := rpc.StartHTTPServer(listenAddr, mux, logger, rpc.Config{}); err != nil { + listener, err := rpc.Listen(listenAddr, rpc.Config{}) + if err != nil { panic(err) } + go rpc.StartHTTPServer(listener, mux, logger) + return listener } func routes(m *monitor.Monitor) map[string]*rpc.RPCFunc { diff --git a/types/event_bus.go b/types/event_bus.go index 65206e938..fbe5ac478 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -136,11 +136,11 @@ func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error { return b.Publish(EventTimeoutWait, data) } -func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error { +func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error { return b.Publish(EventNewRound, data) } -func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error { +func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error { return b.Publish(EventCompleteProposal, data) } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index f0e825d5d..4056dacd4 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -96,9 +96,9 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) err = eventBus.PublishEventTimeoutWait(EventDataRoundState{}) require.NoError(t, err) - err = eventBus.PublishEventNewRound(EventDataRoundState{}) + err = eventBus.PublishEventNewRound(EventDataNewRound{}) require.NoError(t, err) - err = eventBus.PublishEventCompleteProposal(EventDataRoundState{}) + err = eventBus.PublishEventCompleteProposal(EventDataCompleteProposal{}) require.NoError(t, err) err = eventBus.PublishEventPolka(EventDataRoundState{}) require.NoError(t, err) diff --git a/types/events.go b/types/events.go index 33aa712ef..2f9dc76ee 100644 --- a/types/events.go +++ b/types/events.go @@ -43,6 +43,8 @@ func RegisterEventDatas(cdc *amino.Codec) { cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/event/NewBlockHeader", nil) cdc.RegisterConcrete(EventDataTx{}, "tendermint/event/Tx", nil) cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil) + cdc.RegisterConcrete(EventDataNewRound{}, "tendermint/event/NewRound", nil) + cdc.RegisterConcrete(EventDataCompleteProposal{}, "tendermint/event/CompleteProposal", nil) cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil) cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil) cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil) @@ -80,6 +82,27 @@ type EventDataRoundState struct { RoundState interface{} `json:"-"` } +type ValidatorInfo struct { + Address Address `json:"address"` + Index int `json:"index"` +} + +type EventDataNewRound struct { + Height int64 `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + Proposer ValidatorInfo `json:"proposer"` +} + +type EventDataCompleteProposal struct { + Height int64 `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + BlockID BlockID `json:"block_id"` +} + type EventDataVote struct { Vote *Vote } diff --git a/types/part_set_test.go b/types/part_set_test.go index e597088c6..daa2fa5c5 100644 --- a/types/part_set_test.go +++ b/types/part_set_test.go @@ -84,8 +84,7 @@ func TestWrongProof(t *testing.T) { } } -func TestPartSetHeaderSetValidateBasic(t *testing.T) { - +func TestPartSetHeaderValidateBasic(t *testing.T) { testCases := []struct { testName string malleatePartSetHeader func(*PartSetHeader) @@ -107,7 +106,6 @@ func TestPartSetHeaderSetValidateBasic(t *testing.T) { } func TestPartValidateBasic(t *testing.T) { - testCases := []struct { testName string malleatePart func(*Part) diff --git a/version/version.go b/version/version.go index aae545129..aa52a82ec 100644 --- a/version/version.go +++ b/version/version.go @@ -18,7 +18,7 @@ const ( // TMCoreSemVer is the current version of Tendermint Core. // It's the Semantic Version of the software. // Must be a string because scripts like dist.sh read this file. - TMCoreSemVer = "0.26.2" + TMCoreSemVer = "0.26.3" // ABCISemVer is the semantic version of the ABCI library ABCISemVer = "0.15.0"