From 077cf13a1f58727e99df0e3942aa717f5b817aa4 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 27 Mar 2017 15:41:45 -0400 Subject: [PATCH 1/4] consensus: timeout on replayLastBlock --- config/tendermint/config.go | 1 + config/tendermint_test/config.go | 1 + consensus/replay.go | 25 +++++++++++++++++++++---- consensus/state.go | 14 +++++++------- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/config/tendermint/config.go b/config/tendermint/config.go index c210d6e01..ea2f1d43a 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -79,6 +79,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("block_size", 10000) // max number of txs mapConfig.SetDefault("block_part_size", 65536) // part size 64K mapConfig.SetDefault("disable_data_hash", false) + mapConfig.SetDefault("timeout_handshake", 10000) mapConfig.SetDefault("timeout_propose", 3000) mapConfig.SetDefault("timeout_propose_delta", 500) mapConfig.SetDefault("timeout_prevote", 1000) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 55e3adb4e..26a483358 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -93,6 +93,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("block_part_size", 65536) // part size 64K mapConfig.SetDefault("disable_data_hash", false) + mapConfig.SetDefault("timeout_handshake", 10000) mapConfig.SetDefault("timeout_propose", 2000) mapConfig.SetDefault("timeout_propose_delta", 1) mapConfig.SetDefault("timeout_prevote", 10) diff --git a/consensus/replay.go b/consensus/replay.go index 6c4e65a03..4bdc2e870 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -190,6 +190,8 @@ func (h *Handshaker) NBlocks() int { return h.nBlocks } +var ErrReplayLastBlockTimeout = errors.New("Timed out waiting for last block to be replayed") + // TODO: retry the handshake/replay if it fails ? func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // handshake is done via info request on the query conn @@ -207,7 +209,11 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // replay blocks up to the latest in the blockstore _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) - if err != nil { + if err == ErrReplayLastBlockTimeout { + log.Warn("Failed to sync via handshake. Trying other means. If they fail, please increase the timeout_handshake parameter") + return nil + + } else if err != nil { return errors.New(Fmt("Error on replay: %v", err)) } @@ -320,6 +326,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) + defer cs.Stop() evsw := types.NewEventSwitch() evsw.Start() @@ -328,9 +335,19 @@ func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, e newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) // run through the WAL, commit new block, stop - cs.Start() - <-newBlockCh // TODO: use a timeout and return err? - cs.Stop() + if _, err := cs.Start(); err != nil { + return nil, err + } + + timeout := h.config.GetInt("timeout_handshake") + timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) + log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout) + + select { + case <-newBlockCh: + case <-timer.C: + return nil, ErrReplayLastBlockTimeout + } h.nBlocks += 1 diff --git a/consensus/state.go b/consensus/state.go index 23eaff748..cca9d2ed4 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -343,13 +343,7 @@ func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() walFile := cs.config.GetString("cs_wal_file") - err := EnsureDir(path.Dir(walFile), 0700) - if err != nil { - log.Error("Error ensuring ConsensusState wal dir", "error", err.Error()) - return err - } - err = cs.OpenWAL(walFile) - if err != nil { + if err := cs.OpenWAL(walFile); err != nil { log.Error("Error loading ConsensusState wal", "error", err.Error()) return err } @@ -404,6 +398,12 @@ func (cs *ConsensusState) Wait() { // Open file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(walFile string) (err error) { + err = EnsureDir(path.Dir(walFile), 0700) + if err != nil { + log.Error("Error ensuring ConsensusState wal dir", "error", err.Error()) + return err + } + cs.mtx.Lock() defer cs.mtx.Unlock() wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light")) From 85e83934a10d10f08dfd7bc7f0bd55573fe1c7bb Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 28 Mar 2017 12:07:32 -0400 Subject: [PATCH 2/4] fixes from review --- config/tendermint/config.go | 3 +++ consensus/replay.go | 2 +- consensus/state.go | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/config/tendermint/config.go b/config/tendermint/config.go index ea2f1d43a..fd9075e3b 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -79,6 +79,8 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("block_size", 10000) // max number of txs mapConfig.SetDefault("block_part_size", 65536) // part size 64K mapConfig.SetDefault("disable_data_hash", false) + + // all timeouts are in ms mapConfig.SetDefault("timeout_handshake", 10000) mapConfig.SetDefault("timeout_propose", 3000) mapConfig.SetDefault("timeout_propose_delta", 500) @@ -87,6 +89,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("timeout_precommit", 1000) mapConfig.SetDefault("timeout_precommit_delta", 500) mapConfig.SetDefault("timeout_commit", 1000) + // make progress asap (no `timeout_commit`) on full precommit votes mapConfig.SetDefault("skip_timeout_commit", false) mapConfig.SetDefault("mempool_recheck", true) diff --git a/consensus/replay.go b/consensus/replay.go index 4bdc2e870..ff11a390b 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -326,7 +326,6 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) - defer cs.Stop() evsw := types.NewEventSwitch() evsw.Start() @@ -338,6 +337,7 @@ func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, e if _, err := cs.Start(); err != nil { return nil, err } + defer cs.Stop() timeout := h.config.GetInt("timeout_handshake") timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) diff --git a/consensus/state.go b/consensus/state.go index cca9d2ed4..d4c63a681 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -358,8 +358,9 @@ func (cs *ConsensusState) OnStart() error { // we may have lost some votes if the process crashed // reload from consensus log to catchup if err := cs.catchupReplay(cs.Height); err != nil { - log.Error("Error on catchup replay", "error", err.Error()) - // let's go for it anyways, maybe we're fine + log.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "error", err.Error()) + // NOTE: if we ever do return an error here, + // make sure to stop the timeoutTicker } // now start the receiveRoutine From 4fd1471f11ba814b0edb3a6a4e8debc2510edbe2 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 28 Mar 2017 12:09:11 -0400 Subject: [PATCH 3/4] remove BaseService.OnStart --- blockchain/pool.go | 2 -- consensus/replay_file.go | 2 -- consensus/state.go | 1 - consensus/ticker.go | 1 - consensus/wal.go | 1 - node/node.go | 1 - proxy/multi_app_conn.go | 1 - 7 files changed, 9 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index ef673b342..32db956c1 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -63,7 +63,6 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s } func (pool *BlockPool) OnStart() error { - pool.BaseService.OnStart() go pool.makeRequestersRoutine() pool.startTime = time.Now() return nil @@ -409,7 +408,6 @@ func newBPRequester(pool *BlockPool, height int) *bpRequester { } func (bpr *bpRequester) OnStart() error { - bpr.BaseService.OnStart() go bpr.requestRoutine() return nil } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 6ff380880..5ad1b9457 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -127,8 +127,6 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } func (cs *ConsensusState) startForReplay() { - // don't want to start full cs - cs.BaseService.OnStart() log.Warn("Replay commands are disabled until someone updates them and writes tests") /* TODO:! diff --git a/consensus/state.go b/consensus/state.go index d4c63a681..63616cbd3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -340,7 +340,6 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit { } func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() walFile := cs.config.GetString("cs_wal_file") if err := cs.OpenWAL(walFile); err != nil { diff --git a/consensus/ticker.go b/consensus/ticker.go index 06a8f7d20..b318597d3 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -45,7 +45,6 @@ func NewTimeoutTicker() TimeoutTicker { } func (t *timeoutTicker) OnStart() error { - t.BaseService.OnStart() go t.timeoutRoutine() diff --git a/consensus/wal.go b/consensus/wal.go index 99035ee2e..6d8eb3819 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -55,7 +55,6 @@ func NewWAL(walFile string, light bool) (*WAL, error) { } func (wal *WAL) OnStart() error { - wal.BaseService.OnStart() size, err := wal.group.Head.Size() if err != nil { return err diff --git a/node/node.go b/node/node.go index f9c69c289..6815c779b 100644 --- a/node/node.go +++ b/node/node.go @@ -194,7 +194,6 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato } func (n *Node) OnStart() error { - n.BaseService.OnStart() // Create & add listener protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr")) diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 353ade35c..81e01aa29 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -72,7 +72,6 @@ func (app *multiAppConn) Query() AppConnQuery { } func (app *multiAppConn) OnStart() error { - app.BaseService.OnStart() // query connection querycli, err := app.clientCreator.NewABCIClient() From 09f7dabd5ef31bb16c3fa6b511a7d36ecae00229 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 28 Mar 2017 14:06:03 -0400 Subject: [PATCH 4/4] update comment --- consensus/replay.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index ff11a390b..731e7d216 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -292,10 +292,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. - // We replay all blocks from appBlockHeight+1 to storeBlockHeight-1, - // and let the final block be replayed through ReplayBlocks. + // We replay all blocks from appBlockHeight+1. + // If useReplayFunc == true, stop short of the last block + // so it can be replayed using the WAL in ReplayBlocks. // Note that we don't have an old version of the state, - // so we by-pass state validation using applyBlock here. + // so we by-pass state validation using sm.ApplyBlock. var appHash []byte var err error