diff --git a/blockchain/pool.go b/blockchain/pool.go index 0409976f3..06556154f 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -15,10 +15,11 @@ const ( maxTotalRequesters = 300 maxPendingRequests = maxTotalRequesters maxPendingRequestsPerPeer = 75 - peerTimeoutSeconds = 15 minRecvRate = 10240 // 10Kb/s ) +var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests + /* Peers self report their heights when we join the block pool. Starting from our latest pool.height, we request blocks diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index c91f33680..8e6dbee94 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -1,4 +1,3 @@ - package blockchain import ( @@ -10,6 +9,10 @@ import ( "github.com/tendermint/tendermint/types" ) +func init() { + peerTimeoutSeconds = time.Duration(2) +} + type testPeer struct { id string height int diff --git a/config/tendermint/config.go b/config/tendermint/config.go index bdeb6add5..6378607fe 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -62,12 +62,24 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("priv_validator_file", rootDir+"/priv_validator.json") mapConfig.SetDefault("db_backend", "leveldb") mapConfig.SetDefault("db_dir", rootDir+"/data") - mapConfig.SetDefault("vm_log", true) mapConfig.SetDefault("log_level", "info") mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/cswal") + mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + mapConfig.SetDefault("cswal_light", false) + + mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("timeout_propose", 3000) + mapConfig.SetDefault("timeout_propose_delta", 500) + mapConfig.SetDefault("timeout_prevote", 1000) + mapConfig.SetDefault("timeout_prevote_delta", 500) + mapConfig.SetDefault("timeout_precommit", 1000) + mapConfig.SetDefault("timeout_precommit_delta", 500) + mapConfig.SetDefault("timeout_commit", 1000) + mapConfig.SetDefault("mempool_recheck", true) + mapConfig.SetDefault("mempool_broadcast", true) + return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 21dbe9332..bb886a2b7 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -87,11 +87,23 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("db_backend", "memdb") mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "debug") - mapConfig.SetDefault("vm_log", true) mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cswal", rootDir+"/cswal") + mapConfig.SetDefault("cswal", rootDir+"/data/cswal") + mapConfig.SetDefault("cswal_light", false) + + mapConfig.SetDefault("block_size", 10000) + mapConfig.SetDefault("timeout_propose", 100) + mapConfig.SetDefault("timeout_propose_delta", 1) + mapConfig.SetDefault("timeout_prevote", 1) + mapConfig.SetDefault("timeout_prevote_delta", 1) + mapConfig.SetDefault("timeout_precommit", 1) + mapConfig.SetDefault("timeout_precommit_delta", 1) + mapConfig.SetDefault("timeout_commit", 1) + mapConfig.SetDefault("mempool_recheck", true) + mapConfig.SetDefault("mempool_broadcast", true) + return mapConfig } diff --git a/consensus/reactor.go b/consensus/reactor.go index c013f6a12..664deb6bd 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -797,8 +797,8 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() - // Ignore duplicate messages. - if ps.Height == msg.Height && ps.Round == msg.Round && ps.Step == msg.Step { + // Ignore duplicates or decreases + if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 { return } diff --git a/consensus/replay.go b/consensus/replay.go index aa06d468e..e929847c0 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -130,7 +130,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { } else if len(msgBytes) == 1 && msgBytes[0] == '\n' { continue } - // the first msg is (usually) the NewHeight event, so we can ignore it + // the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it if !beginning && i == 1 { continue } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 2e93450a5..f18cc0ccb 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -63,7 +63,7 @@ func TestReplayCatchup(t *testing.T) { func openWAL(t *testing.T, cs *ConsensusState, file string) { // open the wal - wal, err := NewWAL(file) + wal, err := NewWAL(file, config.GetBool("cswal_light")) if err != nil { t.Fatal(err) } diff --git a/consensus/state.go b/consensus/state.go index 201586643..6b12d05de 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -18,16 +18,55 @@ import ( "github.com/tendermint/tendermint/types" ) -var ( - timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal - timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N - timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers. - timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N - timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers. - timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N - timeoutCommit = 1000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight. +//----------------------------------------------------------------------------- +// Timeout Parameters + +// All in milliseconds +type TimeoutParams struct { + Propose0 int + ProposeDelta int + Prevote0 int + PrevoteDelta int + Precommit0 int + PrecommitDelta int + Commit0 int +} -) +// Wait this long for a proposal +func (tp *TimeoutParams) Propose(round int) time.Duration { + return time.Duration(tp.Propose0+tp.ProposeDelta*round) * time.Millisecond +} + +// After receiving any +2/3 prevote, wait this long for stragglers +func (tp *TimeoutParams) Prevote(round int) time.Duration { + return time.Duration(tp.Prevote0+tp.PrevoteDelta*round) * time.Millisecond +} + +// After receiving any +2/3 precommits, wait this long for stragglers +func (tp *TimeoutParams) Precommit(round int) time.Duration { + return time.Duration(tp.Precommit0+tp.PrecommitDelta*round) * time.Millisecond +} + +// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight +func (tp *TimeoutParams) Commit(t time.Time) time.Time { + return t.Add(time.Duration(tp.Commit0) * time.Millisecond) +} + +// Initialize parameters from config file +func InitTimeoutParamsFromConfig() *TimeoutParams { + return &TimeoutParams{ + Propose0: config.GetInt("timeout_propose"), + ProposeDelta: config.GetInt("timeout_propose_delta"), + Prevote0: config.GetInt("timeout_prevote"), + PrevoteDelta: config.GetInt("timeout_prevote_delta"), + Precommit0: config.GetInt("timeout_precommit"), + PrecommitDelta: config.GetInt("timeout_precommit_delta"), + Commit0: config.GetInt("timeout_commit"), + } +} + +//----------------------------------------------------------------------------- +// Errors var ( ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") @@ -188,6 +227,7 @@ type ConsensusState struct { timeoutTicker *time.Ticker // ticker for timeouts tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine + timeoutParams *TimeoutParams // parameters and functions for timeout intervals evsw *events.EventSwitch @@ -206,6 +246,7 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore * timeoutTicker: new(time.Ticker), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), + timeoutParams: InitTimeoutParamsFromConfig(), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -278,13 +319,16 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + if cs.wal != nil && cs.IsRunning() { + cs.wal.Wait() + } } // Open file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - wal, err := NewWAL(file) + wal, err := NewWAL(file, config.GetBool("cswal_light")) if err != nil { return err } @@ -453,9 +497,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) { // to be gathered for the first block. // And alternative solution that relies on clocks: // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) - cs.StartTime = time.Now().Add(timeoutCommit) + cs.StartTime = cs.timeoutParams.Commit(time.Now()) } else { - cs.StartTime = cs.CommitTime.Add(timeoutCommit) + cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime) } cs.CommitTime = time.Time{} cs.Validators = validators @@ -575,6 +619,19 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: + + // drain the internalMsgQueue in case we eg. signed a proposal but it didn't hit the wal + FLUSH: + for { + select { + case mi = <-cs.internalMsgQueue: + cs.wal.Save(mi) + cs.handleMsg(mi, rs) + default: + break FLUSH + } + } + // close wal now that we're done writing to it if cs.wal != nil { cs.wal.Close() @@ -598,8 +655,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { err = cs.setProposal(msg.Proposal) case *BlockPartMessage: // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit - // if we're the only validator, the enterPrevote may take us through to the next round _, err = cs.addProposalBlockPart(msg.Height, msg.Part) + if err != nil && msg.Round != cs.Round { + err = nil + } case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition @@ -618,7 +677,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { log.Warn("Unknown msg type", reflect.TypeOf(msg)) } if err != nil { - log.Error("Error with msg", "type", reflect.TypeOf(msg), "error", err, "msg", msg) + log.Error("Error with msg", "type", reflect.TypeOf(msg), "peer", peerKey, "error", err, "msg", msg) } } @@ -726,8 +785,8 @@ func (cs *ConsensusState) enterPropose(height int, round int) { } }() - // This step times out after `timeoutPropose` - cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose) + // If we don't get the proposal and all block parts quick enough, enterPrevote + cs.scheduleTimeout(cs.timeoutParams.Propose(round), height, round, RoundStepPropose) // Nothing more to do if we're not a validator if cs.privValidator == nil { @@ -818,8 +877,19 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts return } + maxBlockSize := config.GetInt("block_size") + // Mempool validated transactions - txs := cs.mempool.Reap() + // if block_size < 0, no txs will be included + var txs []types.Tx + if maxBlockSize >= 0 { + txs = cs.mempool.Reap() + } + + // Cap the number of txs in a block + if maxBlockSize > 0 && maxBlockSize < len(txs) { + txs = txs[:maxBlockSize] + } block = &types.Block{ Header: &types.Header{ @@ -928,8 +998,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) { cs.newStep() }() - // After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit() - cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) + // Wait for some more prevotes; enterPrecommit + cs.scheduleTimeout(cs.timeoutParams.Prevote(round), height, round, RoundStepPrevoteWait) } // Enter: +2/3 precomits for block or nil. @@ -978,9 +1048,9 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { // +2/3 prevoted nil. Unlock and precommit nil. if len(hash) == 0 { if cs.LockedBlock == nil { - log.Info("enterPrecommit: +2/3 prevoted for nil.") + log.Notice("enterPrecommit: +2/3 prevoted for nil.") } else { - log.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking") + log.Notice("enterPrecommit: +2/3 prevoted for nil. Unlocking") cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil @@ -994,7 +1064,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { // If we're already locked on that block, precommit it, and update the LockedRound if cs.LockedBlock.HashesTo(hash) { - log.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") + log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) @@ -1003,7 +1073,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { // If +2/3 prevoted for proposal block, stage and precommit it if cs.ProposalBlock.HashesTo(hash) { - log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) + log.Notice("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) // Validate the block. if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil { PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) @@ -1049,8 +1119,8 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) { cs.newStep() }() - // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound() - cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) + // Wait for some more precommits; enterNewRound + cs.scheduleTimeout(cs.timeoutParams.Precommit(round), height, round, RoundStepPrecommitWait) } @@ -1064,7 +1134,7 @@ func (cs *ConsensusState) enterCommit(height int, commitRound int) { defer func() { // Done enterCommit: - // keep ca.Round the same, it points to the right Precommits set. + // keep cs.Round the same, commitRound points to the right Precommits set. cs.updateRoundStep(cs.Round, RoundStepCommit) cs.CommitRound = commitRound cs.newStep() @@ -1232,7 +1302,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { } // NOTE: block is not necessarily valid. -// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit +// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, once we have the full block. func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { // Blocks might be reused, so round mismatch is OK if cs.Height != height { diff --git a/consensus/state_test.go b/consensus/state_test.go index 1def6ae8e..b6e55d6ca 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -11,6 +11,10 @@ import ( "github.com/tendermint/tendermint/types" ) +func (tp *TimeoutParams) ensureProposeTimeout() time.Duration { + return time.Duration(tp.Propose0*2) * time.Millisecond +} + /* ProposeSuite @@ -44,12 +48,6 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh //---------------------------------------------------------------------------------------------------- // ProposeSuite -func init() { - fmt.Println("") - timeoutPropose0 = 100 * time.Millisecond - timeoutProposeDelta = 1 * time.Millisecond -} - func TestProposerSelection0(t *testing.T) { cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round @@ -124,7 +122,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { startTestRound(cs, height, round) // if we're not a validator, EnterPropose should timeout - ticker := time.NewTicker(timeoutPropose0 * 2) + ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout()) select { case <-timeoutCh: case <-ticker.C: @@ -165,7 +163,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } // if we're a validator, enterPropose should not timeout - ticker := time.NewTicker(timeoutPropose0 * 2) + ticker := time.NewTicker(cs.timeoutParams.ensureProposeTimeout()) select { case <-timeoutCh: t.Fatal("Expected EnterPropose not to timeout") diff --git a/consensus/wal.go b/consensus/wal.go index 46f7a9df2..56bb294e4 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -37,9 +37,13 @@ var _ = wire.RegisterInterface( type WAL struct { fp *os.File exists bool // if the file already existed (restarted process) + + done chan struct{} + + light bool // ignore block parts } -func NewWAL(file string) (*WAL, error) { +func NewWAL(file string, light bool) (*WAL, error) { var walExists bool if _, err := os.Stat(file); err == nil { walExists = true @@ -51,12 +55,21 @@ func NewWAL(file string) (*WAL, error) { return &WAL{ fp: fp, exists: walExists, + done: make(chan struct{}), + light: light, }, nil } // called in newStep and for each pass in receiveRoutine func (wal *WAL) Save(msg ConsensusLogMessageInterface) { if wal != nil { + if wal.light { + if m, ok := msg.(msgInfo); ok { + if _, ok := m.Msg.(*BlockPartMessage); ok { + return + } + } + } var n int var err error wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) @@ -67,11 +80,16 @@ func (wal *WAL) Save(msg ConsensusLogMessageInterface) { } } -// Must not be called concurrently. +// Must not be called concurrently with a write. func (wal *WAL) Close() { if wal != nil { wal.fp.Close() } + wal.done <- struct{}{} +} + +func (wal *WAL) Wait() { + <-wal.done } func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 8348ca24d..808a5e414 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -30,7 +30,7 @@ func TestSeek(t *testing.T) { } f.Close() - wal, err := NewWAL(path.Join(os.TempDir(), name)) + wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light")) if err != nil { t.Fatal(err) } diff --git a/glide.lock b/glide.lock index 03c9c79ca..dd884dd42 100644 --- a/glide.lock +++ b/glide.lock @@ -1,70 +1,111 @@ -hash: 3a6f5e164fee64da0e1cc9f6617851fb11c7741159d906cc981cd2fc19bb0396 -updated: 2016-01-06T12:53:34.83503023-08:00 +hash: f3eab3f91c9d2c07574e8ec6f2f5d56bd946af1b061533a0baf9db8765f97a51 +updated: 2016-03-05T17:20:40.721925401-05:00 imports: -- name: github.com/codegangsta/cli - version: c31a7975863e7810c92e2e288a9ab074f9a88f29 - name: github.com/gogo/protobuf - version: b25331f05786694d634d632a8043fef600a84e62 + version: f4cc07910fc38f5b6b8d6e75d7457cf504157b6c subpackages: - - /proto + - proto - name: github.com/golang/protobuf - version: 04eac41517df87aa1adec44b1c032a027b546f05 + version: c75fbf01dc6cb73649c4cd4326182c3e44aa9dbb + subpackages: + - proto - name: github.com/golang/snappy - version: 723cc1e459b8eea2dea4583200fd60757d40097a + version: 5f1c01d9f64b941dd9582c638279d046eda6ca31 - name: github.com/gorilla/websocket - version: 3986be78bf859e01f01af631ad76da5b269d270c + version: c45a635370221f34fea2d5163fd156fcb4e38e8a - name: github.com/inconshreveable/log15 version: 210d6fdc4d979ef6579778f1b6ed84571454abb4 + subpackages: + - stack + - term - name: github.com/mattn/go-colorable - version: 3dac7b4f76f6e17fb39b768b89e3783d16e237fe + version: 9cbef7c35391cca05f15f8181dc0b18bc9736dbb +- name: github.com/mattn/go-isatty + version: 56b76bdf51f7708750eac80fa38b952bb9f32639 - name: github.com/naoina/go-stringutil version: 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b - name: github.com/naoina/toml version: 751171607256bb66e64c9f0220c00662420c38e9 -- name: github.com/onsi/ginkgo - version: e43390e35a4a88f3f95d5ddf9055efb7a1170469 -- name: github.com/onsi/gomega - version: 0fe204460da2c8fa1babcaac196e694de8f1aaa1 + subpackages: + - ast - name: github.com/sfreiberg/gotwilio - version: b7230c284bd0c1614c94d00b9998c49f9a2737d8 + version: f024bbefe80fdb7bcc8c43b6add05dae97744e0e - name: github.com/spf13/pflag version: 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7 - name: github.com/syndtr/goleveldb - version: 5acacf6e72d3aeaf26dd3d3f163c635d3ef1e6e6 + version: ad0d8b2ab58a55ed5c58073aa46451d5e1ca1280 + subpackages: + - leveldb + - leveldb/errors + - leveldb/opt + - leveldb/cache + - leveldb/comparer + - leveldb/filter + - leveldb/iterator + - leveldb/journal + - leveldb/memdb + - leveldb/storage + - leveldb/table + - leveldb/util - name: github.com/tendermint/ed25519 version: fdac6641497281ed1cc368687ec6377e96e02b24 + subpackages: + - extra25519 + - edwards25519 - name: github.com/tendermint/flowcontrol version: 84d9671090430e8ec80e35b339907e0579b999eb +- name: github.com/tendermint/go-alert + version: b824a5721dd8bdda6e5f3033ea35df3d82e4516d - name: github.com/tendermint/go-clist version: 634527f5b60fd7c71ca811262493df2ad65ee0ca - name: github.com/tendermint/go-common - version: f592570310c9512c9bdc78335927ffb15fd58975 + version: 1559ae1ac90c88b1373ff114c409399c5a1cedac - name: github.com/tendermint/go-config - version: 3b895c7ce4999ee6fff7b7ca6253f0b41d9bf85c + version: c077af2c1ecf584fb797fd1956758545b25d952b - name: github.com/tendermint/go-crypto - version: ea56ed5ea24c2d6928be62d9921ff99643dfe8db + version: 76ba23e4c0c627b8c66d1f97b6a18dc77f4f0297 - name: github.com/tendermint/go-db - version: 28d39f8726c76b163e881c3d05dad227c93200ae + version: a7878f1d0d8eaebf15f87bc2df15f7a1088cce7f +- name: github.com/tendermint/go-events + version: 7b75ca7bb55aa25e9ef765eb8c0b69486b227357 - name: github.com/tendermint/go-logger - version: 980f02a5001b46f02ab3fbb036531d4ea789d2bf + version: 4901b71ade2b834ca0f4c2ca69edb96792dca05b - name: github.com/tendermint/go-logio version: 04f3aa0a3b38d06dcadefbafd988c8b85e499225 - name: github.com/tendermint/go-merkle - version: 0df23fe1f8e5d82baeeea02e902b2d3abd6bece4 + version: 67b535ce9633be7df575dc3a7833fa2301020c25 - name: github.com/tendermint/go-p2p - version: 1f2c1d07600b9612a1f92f42a3c8c893eafd922c + version: 7f6aad20fbad6ef1a132d5a8bebd18f3521fff1a subpackages: - upnp +- name: github.com/tendermint/go-rpc + version: 1410693eae5400a50efbbac3f23b9e3e94b7d6c8 + subpackages: + - client + - types + - server - name: github.com/tendermint/go-wire - version: 009f9185cd906f7edd33334c6d9fcf1b938138e0 + version: 9acb294893c790427e2b9abf2877e69690cd5b6c - name: github.com/tendermint/log15 version: 6e460758f10ef42a4724b8e4a82fee59aaa0e41d - name: github.com/tendermint/tmsp - version: 1b7243a9909b86dc668f374fd68b17a405740bc4 + version: 72540f9cac4840989cb05b147cc89be8cd91f043 subpackages: - - /types + - types + - example/dummy + - example/nil + - client - name: golang.org/x/crypto - version: 552e9d568fde9701ea1944fb01c8aadaceaa7353 + version: 5dc8cb4b8a8eb076cbb5a06bc3b8682c15bdbbd3 + subpackages: + - ripemd160 + - nacl/box + - nacl/secretbox + - curve25519 + - salsa20/salsa + - poly1305 +- name: golang.org/x/sys + version: 7a56174f0086b32866ebd746a794417edbc678a1 subpackages: - - /ripemd160 + - unix devImports: [] diff --git a/mempool/mempool.go b/mempool/mempool.go index 2af9bc0e1..1d0a375d6 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -223,12 +223,12 @@ func (mem *Mempool) Update(height int, txs []types.Tx) { // Remove transactions that are already in txs. goodTxs := mem.filterTxs(txsMap) // Recheck mempool txs - // TODO: make optional - mem.recheckTxs(goodTxs) - - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + if config.GetBool("mempool_recheck") { + mem.recheckTxs(goodTxs) + // At this point, mem.txs are being rechecked. + // mem.recheckCursor re-scans mem.txs and possibly removes some txs. + // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + } } func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 1d93efc90..82738c180 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + _ "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/counter" diff --git a/mempool/reactor.go b/mempool/reactor.go index 9a8649fd2..dc4c9fede 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -101,6 +101,10 @@ type Peer interface { // TODO: Handle mempool or reactor shutdown? // As is this routine may block forever if no new txs come in. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { + if !config.GetBool("mempool_broadcast") { + return + } + var next *clist.CElement for { if !memR.IsRunning() { diff --git a/node/node.go b/node/node.go index 53805e211..8beafcee8 100644 --- a/node/node.go +++ b/node/node.go @@ -58,8 +58,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) - // add the chainid to the global config + // add the chainid and number of validators to the global config config.Set("chain_id", state.ChainID) + config.Set("num_vals", state.Validators.Size()) // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 23cd56163..53604c0fc 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -7,8 +7,10 @@ import ( ) var Routes = map[string]*rpc.RPCFunc{ - "subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), - "unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), + "unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), + "status": rpc.NewRPCFunc(StatusResult, ""), "net_info": rpc.NewRPCFunc(NetInfoResult, ""), "dial_seeds": rpc.NewRPCFunc(DialSeedsResult, "seeds"), @@ -20,7 +22,8 @@ var Routes = map[string]*rpc.RPCFunc{ "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSyncResult, "tx"), "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsyncResult, "tx"), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxsResult, ""), - // subscribe/unsubscribe are reserved for websocket events. + + "unsafe_set_config": rpc.NewRPCFunc(UnsafeSetConfigResult, "type,key,value"), } func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { @@ -126,3 +129,11 @@ func BroadcastTxAsyncResult(tx []byte) (ctypes.TMResult, error) { return r, nil } } + +func UnsafeSetConfigResult(typ, key, value string) (ctypes.TMResult, error) { + if r, err := UnsafeSetConfig(typ, key, value); err != nil { + return nil, err + } else { + return r, nil + } +} diff --git a/rpc/core/status.go b/rpc/core/status.go index bf3d69ffe..8f056fd4e 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -1,6 +1,9 @@ package core import ( + "fmt" + "strconv" + ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -28,3 +31,28 @@ func Status() (*ctypes.ResultStatus, error) { LatestBlockHeight: latestHeight, LatestBlockTime: latestBlockTime}, nil } + +func UnsafeSetConfig(typ, key, value string) (*ctypes.ResultUnsafeSetConfig, error) { + switch typ { + case "string": + config.Set(key, value) + case "int": + val, err := strconv.Atoi(value) + if err != nil { + return nil, fmt.Errorf("non-integer value found. key:%s; value:%s; err:%v", key, value, err) + } + config.Set(key, val) + case "bool": + switch value { + case "true": + config.Set(key, true) + case "false": + config.Set(key, false) + default: + return nil, fmt.Errorf("bool value must be true or false. got %s", value) + } + default: + return nil, fmt.Errorf("Unknown type %s", typ) + } + return &ctypes.ResultUnsafeSetConfig{}, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index b56d701b9..660c6e0c1 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -68,6 +68,8 @@ type ResultUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } +type ResultUnsafeSetConfig struct{} + type ResultSubscribe struct { } @@ -105,6 +107,9 @@ const ( ResultTypeSubscribe = byte(0x80) ResultTypeUnsubscribe = byte(0x81) ResultTypeEvent = byte(0x82) + + // 0xa bytes for testing + ResultTypeUnsafeSetConfig = byte(0xa0) ) type TMResult interface { @@ -127,4 +132,5 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, + wire.ConcreteType{&ResultUnsafeSetConfig{}, ResultTypeUnsafeSetConfig}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 64f557586..4b5b66db4 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -11,6 +11,10 @@ import ( //-------------------------------------------------------------------------------- // Test the HTTP client +//-------------------------------------------------------------------------------- + +//-------------------------------------------------------------------------------- +// status func TestURIStatus(t *testing.T) { tmResult := new(ctypes.TMResult) @@ -39,6 +43,63 @@ func testStatus(t *testing.T, statusI interface{}) { } } +//-------------------------------------------------------------------------------- +// unsafe_set_config + +var stringVal = "my string" +var intVal = 987654321 +var boolVal = true + +// don't change these +var testCasesUnsafeSetConfig = [][]string{ + []string{"string", "key1", stringVal}, + []string{"int", "key2", fmt.Sprintf("%v", intVal)}, + []string{"bool", "key3", fmt.Sprintf("%v", boolVal)}, +} + +func TestURIUnsafeSetConfig(t *testing.T) { + for _, testCase := range testCasesUnsafeSetConfig { + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("unsafe_set_config", map[string]interface{}{ + "type": testCase[0], + "key": testCase[1], + "value": testCase[2], + }, tmResult) + if err != nil { + t.Fatal(err) + } + } + testUnsafeSetConfig(t) +} + +func TestJSONUnsafeSetConfig(t *testing.T) { + for _, testCase := range testCasesUnsafeSetConfig { + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("unsafe_set_config", []interface{}{testCase[0], testCase[1], testCase[2]}, tmResult) + if err != nil { + t.Fatal(err) + } + } + testUnsafeSetConfig(t) +} + +func testUnsafeSetConfig(t *testing.T) { + s := config.GetString("key1") + if s != stringVal { + t.Fatalf("got %v, expected %v", s, stringVal) + } + + i := config.GetInt("key2") + if i != intVal { + t.Fatalf("got %v, expected %v", i, intVal) + } + + b := config.GetBool("key3") + if b != boolVal { + t.Fatalf("got %v, expected %v", b, boolVal) + } +} + /*func TestURIBroadcastTx(t *testing.T) { testBroadcastTx(t, "HTTP") }*/