diff --git a/consensus/wal.go b/consensus/wal.go index ba89cd1aa..d56ede26d 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -112,11 +112,20 @@ func (wal *baseWAL) OnStart() error { return err } +// Stop the underlying autofile group. +// Use Wait() to ensure it's finished shutting down +// before cleaning up files. func (wal *baseWAL) OnStop() { wal.group.Stop() wal.group.Close() } +// Wait for the underlying autofile group to finish shutting down +// so it's safe to cleanup files. +func (wal *baseWAL) Wait() { + wal.group.Wait() +} + // Write is called in newStep and for each receive on the // peerMsgQueue and the timeoutTicker. // NOTE: does not call fsync() diff --git a/consensus/wal_test.go b/consensus/wal_test.go index c2a7d8bba..93beb68bb 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -39,7 +39,12 @@ func TestWALTruncate(t *testing.T) { wal.SetLogger(log.TestingLogger()) err = wal.Start() require.NoError(t, err) - defer wal.Stop() + defer func() { + wal.Stop() + // wait for the wal to finish shutting down so we + // can safely remove the directory + wal.Wait() + }() //60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file. //at this time, RotateFile is called, truncate content exist in each file. diff --git a/libs/autofile/group.go b/libs/autofile/group.go index 1ec6b240d..7e9269461 100644 --- a/libs/autofile/group.go +++ b/libs/autofile/group.go @@ -67,6 +67,11 @@ type Group struct { minIndex int // Includes head maxIndex int // Includes head, where Head will move to + // close this when the processTicks routine is done. + // this ensures we can cleanup the dir after calling Stop + // and the routine won't be trying to access it anymore + doneProcessTicks chan struct{} + // TODO: When we start deleting files, we need to start tracking GroupReaders // and their dependencies. } @@ -90,6 +95,7 @@ func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err err groupCheckDuration: defaultGroupCheckDuration, minIndex: 0, maxIndex: 0, + doneProcessTicks: make(chan struct{}), } for _, option := range groupOptions { @@ -140,6 +146,11 @@ func (g *Group) OnStop() { g.Flush() // flush any uncommitted data } +func (g *Group) Wait() { + // wait for processTicks routine to finish + <-g.doneProcessTicks +} + // Close closes the head file. The group must be stopped by this moment. func (g *Group) Close() { g.Flush() // flush any uncommitted data @@ -211,6 +222,7 @@ func (g *Group) Flush() error { } func (g *Group) processTicks() { + defer close(g.doneProcessTicks) for { select { case <-g.ticker.C: diff --git a/node/node_test.go b/node/node_test.go index 06561e07d..3218c8327 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -88,13 +88,13 @@ func TestSplitAndTrimEmpty(t *testing.T) { } } -func TestNodeDelayedStop(t *testing.T) { - config := cfg.ResetTestRoot("node_delayed_node_test") +func TestNodeDelayedStart(t *testing.T) { + config := cfg.ResetTestRoot("node_delayed_start_test") now := tmtime.Now() // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) - n.GenesisDoc().GenesisTime = now.Add(5 * time.Second) + n.GenesisDoc().GenesisTime = now.Add(2 * time.Second) require.NoError(t, err) n.Start() @@ -133,6 +133,7 @@ func TestNodeSetPrivValTCP(t *testing.T) { types.NewMockPV(), dialer, ) + privval.RemoteSignerConnDeadline(100 * time.Millisecond)(pvsc) go func() { err := pvsc.Start() @@ -172,20 +173,18 @@ func TestNodeSetPrivValIPC(t *testing.T) { types.NewMockPV(), dialer, ) + privval.RemoteSignerConnDeadline(100 * time.Millisecond)(pvsc) - done := make(chan struct{}) go func() { - defer close(done) - n, err := DefaultNewNode(config, log.TestingLogger()) + err := pvsc.Start() require.NoError(t, err) - assert.IsType(t, &privval.SocketVal{}, n.PrivValidator()) }() + defer pvsc.Stop() - err := pvsc.Start() + n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - defer pvsc.Stop() + assert.IsType(t, &privval.SocketVal{}, n.PrivValidator()) - <-done } // testFreeAddr claims a free port so we don't block on listener being ready. diff --git a/p2p/test_util.go b/p2p/test_util.go index aad32ef8f..2d320df85 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -125,7 +125,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { return err } - ni, err := handshake(conn, 50*time.Millisecond, sw.nodeInfo) + ni, err := handshake(conn, time.Second, sw.nodeInfo) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) diff --git a/privval/client.go b/privval/client.go index 1ad104d8d..11151fee3 100644 --- a/privval/client.go +++ b/privval/client.go @@ -53,9 +53,11 @@ type SocketVal struct { // reset if the connection fails. // failures are detected by a background // ping routine. + // All messages are request/response, so we hold the mutex + // so only one request/response pair can happen at a time. // Methods on the underlying net.Conn itself // are already gorountine safe. - mtx sync.RWMutex + mtx sync.Mutex signer *RemoteSignerClient } @@ -82,22 +84,22 @@ func NewSocketVal( // GetPubKey implements PrivValidator. func (sc *SocketVal) GetPubKey() crypto.PubKey { - sc.mtx.RLock() - defer sc.mtx.RUnlock() + sc.mtx.Lock() + defer sc.mtx.Unlock() return sc.signer.GetPubKey() } // SignVote implements PrivValidator. func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error { - sc.mtx.RLock() - defer sc.mtx.RUnlock() + sc.mtx.Lock() + defer sc.mtx.Unlock() return sc.signer.SignVote(chainID, vote) } // SignProposal implements PrivValidator. func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error { - sc.mtx.RLock() - defer sc.mtx.RUnlock() + sc.mtx.Lock() + defer sc.mtx.Unlock() return sc.signer.SignProposal(chainID, proposal) } @@ -106,15 +108,15 @@ func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) erro // Ping is used to check connection health. func (sc *SocketVal) Ping() error { - sc.mtx.RLock() - defer sc.mtx.RUnlock() + sc.mtx.Lock() + defer sc.mtx.Unlock() return sc.signer.Ping() } // Close closes the underlying net.Conn. func (sc *SocketVal) Close() { - sc.mtx.RLock() - defer sc.mtx.RUnlock() + sc.mtx.Lock() + defer sc.mtx.Unlock() if sc.signer != nil { if err := sc.signer.Close(); err != nil { sc.Logger.Error("OnStop", "err", err)