From b33f73eaf13258147c191c416f9b7a11226ee460 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Aug 2018 16:33:34 +0400 Subject: [PATCH 1/6] stop autofile and autogroup properly NOTE: from the ticker#Stop documentation: ``` Stop does not close the channel, to prevent a read from the channel succeeding incorrectly. https://golang.org/src/time/tick.go?s=1318:1341#L35 ``` Refs #2072 --- libs/autofile/autofile.go | 28 +++++++++++++++------------- libs/autofile/group.go | 14 ++++---------- libs/autofile/group_test.go | 2 +- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/libs/autofile/autofile.go b/libs/autofile/autofile.go index 313da6789..2f1bb4fd5 100644 --- a/libs/autofile/autofile.go +++ b/libs/autofile/autofile.go @@ -35,18 +35,20 @@ const autoFileOpenDuration = 1000 * time.Millisecond // Automatically closes and re-opens file for writing. // This is useful for using a log file with the logrotate tool. type AutoFile struct { - ID string - Path string - ticker *time.Ticker - mtx sync.Mutex - file *os.File + ID string + Path string + ticker *time.Ticker + tickerStopped chan struct{} // closed when ticker is stopped + mtx sync.Mutex + file *os.File } func OpenAutoFile(path string) (af *AutoFile, err error) { af = &AutoFile{ - ID: cmn.RandStr(12) + ":" + path, - Path: path, - ticker: time.NewTicker(autoFileOpenDuration), + ID: cmn.RandStr(12) + ":" + path, + Path: path, + ticker: time.NewTicker(autoFileOpenDuration), + tickerStopped: make(chan struct{}), } if err = af.openFile(); err != nil { return @@ -58,18 +60,18 @@ func OpenAutoFile(path string) (af *AutoFile, err error) { func (af *AutoFile) Close() error { af.ticker.Stop() + close(af.tickerStopped) err := af.closeFile() sighupWatchers.removeAutoFile(af) return err } func (af *AutoFile) processTicks() { - for { - _, ok := <-af.ticker.C - if !ok { - return // Done. - } + select { + case <-af.ticker.C: af.closeFile() + case <-af.tickerStopped: + return } } diff --git a/libs/autofile/group.go b/libs/autofile/group.go index b4368ed9e..9b78c5110 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -199,21 +199,15 @@ func (g *Group) Flush() error { } func (g *Group) processTicks() { - for { - _, ok := <-g.ticker.C - if !ok { - return // Done. - } + select { + case <-g.ticker.C: g.checkHeadSizeLimit() g.checkTotalSizeLimit() + case <-g.Quit(): + return } } -// NOTE: for testing -func (g *Group) stopTicker() { - g.ticker.Stop() -} - // NOTE: this function is called manually in tests. func (g *Group) checkHeadSizeLimit() { limit := g.HeadSizeLimit() diff --git a/libs/autofile/group_test.go b/libs/autofile/group_test.go index c7e8725cf..d6b10a420 100644 --- a/libs/autofile/group_test.go +++ b/libs/autofile/group_test.go @@ -26,7 +26,7 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group { g, err := OpenGroup(headPath) require.NoError(t, err, "Error opening Group") g.SetHeadSizeLimit(headSizeLimit) - g.stopTicker() + g.ticker.Stop() require.NotEqual(t, nil, g, "Failed to create Group") return g } From 4c5a143a70c804126fe169e44ebf2bdd95fd8a5f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Aug 2018 16:36:28 +0400 Subject: [PATCH 2/6] respawn receiveRoutine so we can properly exit Closes #2072 --- consensus/state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/state.go b/consensus/state.go index f66a872eb..435427a91 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -556,6 +556,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { defer func() { if r := recover(); r != nil { cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack())) + go cs.receiveRoutine(0) } }() @@ -588,7 +589,6 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit(): - // NOTE: the internalMsgQueue may have signed messages from our // priv_val that haven't hit the WAL, but its ok because // priv_val tracks LastSig From 8ed99c2c136d4833376be3cc41bc673729cf722a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Aug 2018 16:42:25 +0400 Subject: [PATCH 3/6] exit from initSighupWatcher child goroutine also, remove excessive log message Refs #2072 --- libs/autofile/sighup_watcher.go | 12 +++++++++--- p2p/pex/addrbook.go | 1 - 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libs/autofile/sighup_watcher.go b/libs/autofile/sighup_watcher.go index 56fbd4d86..f72f12fcd 100644 --- a/libs/autofile/sighup_watcher.go +++ b/libs/autofile/sighup_watcher.go @@ -18,13 +18,19 @@ var sighupCounter int32 // For testing func initSighupWatcher() { sighupWatchers = newSighupWatcher() - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGHUP) + hup := make(chan os.Signal, 1) + signal.Notify(hup, syscall.SIGHUP) + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) go func() { - for range c { + select { + case <-hup: sighupWatchers.closeAll() atomic.AddInt32(&sighupCounter, 1) + case <-quit: + return } }() } diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index ef7d7edaa..9596b1d76 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -496,7 +496,6 @@ out: } saveFileTicker.Stop() a.saveToFile(a.filePath) - a.Logger.Info("Address handler done") } //---------------------------------------------------------- From b82138b0025d19cdb155c003ce7a8bd019bdfdc5 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 2 Aug 2018 16:48:12 +0400 Subject: [PATCH 4/6] update changelog --- CHANGELOG_PENDING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index e85a6ed64..010aea40e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,3 +26,4 @@ BUG FIXES: - [common] Safely handle cases where atomic write files already exist [#2109](https://github.com/tendermint/tendermint/issues/2109) - [privval] fix a deadline for accepting new connections in socket private validator. +- [node] Fully exit when CTRL-C is pressed even if consensus state panics [#2072] From d09a3a6d3a3bcc43f1007c323e57375d2a10d783 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 3 Aug 2018 11:24:55 +0400 Subject: [PATCH 5/6] stop gracefully instead of trying to resume ops Refs #2072 We most probably shouldn't be running any further when there is some unexpected panic. Some unknown error happened, and so we don't know if that will result in the validator signing an invalid thing. It might be worthwhile to explore a mechanism for manual resuming via some console or secure RPC system, but for now, halting the chain upon unexpected consensus bugs sounds like the better option. --- consensus/state.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 435427a91..6ffe6ef64 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -553,10 +553,30 @@ func (cs *ConsensusState) newStep() { // Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. // ConsensusState must be locked before any internal state is updated. func (cs *ConsensusState) receiveRoutine(maxSteps int) { + onExit := func(cs *ConsensusState) { + // NOTE: the internalMsgQueue may have signed messages from our + // priv_val that haven't hit the WAL, but its ok because + // priv_val tracks LastSig + + // close wal now that we're done writing to it + cs.wal.Stop() + cs.wal.Wait() + + close(cs.done) + } + defer func() { if r := recover(); r != nil { cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack())) - go cs.receiveRoutine(0) + // stop gracefully + // + // NOTE: We most probably shouldn't be running any further when there is + // some unexpected panic. Some unknown error happened, and so we don't + // know if that will result in the validator signing an invalid thing. It + // might be worthwhile to explore a mechanism for manual resuming via + // some console or secure RPC system, but for now, halting the chain upon + // unexpected consensus bugs sounds like the better option. + onExit(cs) } }() @@ -589,15 +609,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit(): - // NOTE: the internalMsgQueue may have signed messages from our - // priv_val that haven't hit the WAL, but its ok because - // priv_val tracks LastSig - - // close wal now that we're done writing to it - cs.wal.Stop() - cs.wal.Wait() - - close(cs.done) + onExit(cs) return } } From b1cff0f9bffd4d88ca2c6e9b9bc8e6d44cd3b463 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 3 Aug 2018 11:34:58 +0400 Subject: [PATCH 6/6] [libs/autofile] create a Group ticker on Start 1) no need to stop the ticker in createTestGroup() method 2) now there is a symmetry - we start the ticker in OnStart(), we stop it in OnStop() Refs #2072 --- libs/autofile/group.go | 2 +- libs/autofile/group_test.go | 34 ++++++++++++++++++---------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/libs/autofile/group.go b/libs/autofile/group.go index 9b78c5110..e747f04dd 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -85,7 +85,6 @@ func OpenGroup(headPath string) (g *Group, err error) { Head: head, headBuf: bufio.NewWriterSize(head, 4096*10), Dir: dir, - ticker: time.NewTicker(groupCheckDuration), headSizeLimit: defaultHeadSizeLimit, totalSizeLimit: defaultTotalSizeLimit, minIndex: 0, @@ -102,6 +101,7 @@ func OpenGroup(headPath string) (g *Group, err error) { // OnStart implements Service by starting the goroutine that checks file and // group limits. func (g *Group) OnStart() error { + g.ticker = time.NewTicker(groupCheckDuration) go g.processTicks() return nil } diff --git a/libs/autofile/group_test.go b/libs/autofile/group_test.go index d6b10a420..d87bdba82 100644 --- a/libs/autofile/group_test.go +++ b/libs/autofile/group_test.go @@ -16,23 +16,25 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ) -// NOTE: Returned group has ticker stopped -func createTestGroup(t *testing.T, headSizeLimit int64) *Group { +func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group { testID := cmn.RandStr(12) testDir := "_test_" + testID err := cmn.EnsureDir(testDir, 0700) require.NoError(t, err, "Error creating dir") + headPath := testDir + "/myfile" g, err := OpenGroup(headPath) require.NoError(t, err, "Error opening Group") - g.SetHeadSizeLimit(headSizeLimit) - g.ticker.Stop() require.NotEqual(t, nil, g, "Failed to create Group") + + g.SetHeadSizeLimit(headSizeLimit) + return g } func destroyTestGroup(t *testing.T, g *Group) { g.Close() + err := os.RemoveAll(g.Dir) require.NoError(t, err, "Error removing test Group directory") } @@ -45,7 +47,7 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota } func TestCheckHeadSizeLimit(t *testing.T) { - g := createTestGroup(t, 1000*1000) + g := createTestGroupWithHeadSizeLimit(t, 1000*1000) // At first, there are no files. assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) @@ -107,7 +109,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { } func TestSearch(t *testing.T) { - g := createTestGroup(t, 10*1000) + g := createTestGroupWithHeadSizeLimit(t, 10*1000) // Create some files in the group that have several INFO lines in them. // Try to put the INFO lines in various spots. @@ -208,7 +210,7 @@ func TestSearch(t *testing.T) { } func TestRotateFile(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") @@ -238,7 +240,7 @@ func TestRotateFile(t *testing.T) { } func TestFindLast1(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -262,7 +264,7 @@ func TestFindLast1(t *testing.T) { } func TestFindLast2(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -286,7 +288,7 @@ func TestFindLast2(t *testing.T) { } func TestFindLast3(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("# a") @@ -310,7 +312,7 @@ func TestFindLast3(t *testing.T) { } func TestFindLast4(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) g.WriteLine("Line 1") g.WriteLine("Line 2") @@ -332,7 +334,7 @@ func TestFindLast4(t *testing.T) { } func TestWrite(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) written := []byte("Medusa") g.Write(written) @@ -353,7 +355,7 @@ func TestWrite(t *testing.T) { // test that Read reads the required amount of bytes from all the files in the // group and returns no error if n == size of the given slice. func TestGroupReaderRead(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) professor := []byte("Professor Monster") g.Write(professor) @@ -382,7 +384,7 @@ func TestGroupReaderRead(t *testing.T) { // test that Read returns an error if number of bytes read < size of // the given slice. Subsequent call should return 0, io.EOF. func TestGroupReaderRead2(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) professor := []byte("Professor Monster") g.Write(professor) @@ -413,7 +415,7 @@ func TestGroupReaderRead2(t *testing.T) { } func TestMinIndex(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) assert.Zero(t, g.MinIndex(), "MinIndex should be zero at the beginning") @@ -422,7 +424,7 @@ func TestMinIndex(t *testing.T) { } func TestMaxIndex(t *testing.T) { - g := createTestGroup(t, 0) + g := createTestGroupWithHeadSizeLimit(t, 0) assert.Zero(t, g.MaxIndex(), "MaxIndex should be zero at the beginning")