Browse Source

Merge pull request #1601 from tendermint/bucky/wal-sync

consensus: only fsync wal after internal msgs
pull/1599/head v0.19.5-rc1
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
018e096748
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 43 additions and 25 deletions
  1. +0
    -4
      .github/ISSUE_TEMPLATE
  2. +1
    -0
      CHANGELOG.md
  3. +8
    -4
      consensus/replay_test.go
  4. +5
    -5
      consensus/state.go
  5. +22
    -9
      consensus/wal.go
  6. +6
    -2
      consensus/wal_generator.go
  7. +1
    -1
      node/node_test.go

+ 0
- 4
.github/ISSUE_TEMPLATE View File

@ -19,10 +19,6 @@ in a case of bug.
**ABCI app** (name for built-in, URL for self-written if it's publicly available):
**Merkleeyes version** (use `git rev-parse --verify HEAD`, skip if you don't use it):
**Environment**:
- **OS** (e.g. from /etc/os-release):
- **Install tools**:


+ 1
- 0
CHANGELOG.md View File

@ -20,6 +20,7 @@ FEATURES
IMPROVEMENTS
- [docs] Lots of updates
- [consensus] Only Fsync() the WAL before executing msgs from ourselves
BUG FIXES


+ 8
- 4
consensus/replay_test.go View File

@ -218,15 +218,15 @@ func (e ReachedHeightToStopError) Error() string {
return fmt.Sprintf("reached height to stop %d", e.height)
}
// Save simulates WAL's crashing by sending an error to the panicCh and then
// Write simulates WAL's crashing by sending an error to the panicCh and then
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Save(m WALMessage) {
func (w *crashingWAL) Write(m WALMessage) {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
} else {
w.next.Save(m)
w.next.Write(m)
}
return
}
@ -238,10 +238,14 @@ func (w *crashingWAL) Save(m WALMessage) {
runtime.Goexit()
} else {
w.msgIndex++
w.next.Save(m)
w.next.Write(m)
}
}
// WriteSync forwards m to Write, so the same crash-simulation logic applies;
// it adds no flush step of its own.
func (w *crashingWAL) WriteSync(m WALMessage) {
w.Write(m)
}
// Group returns the Group of the wrapped WAL (w.next).
func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return w.next.SearchForEndHeight(height, options)


+ 5
- 5
consensus/state.go View File

@ -504,7 +504,7 @@ func (cs *ConsensusState) updateToState(state sm.State) {
func (cs *ConsensusState) newStep() {
rs := cs.RoundStateEvent()
cs.wal.Save(rs)
cs.wal.Write(rs)
cs.nSteps++
// newStep is called by updateToStep in NewConsensusState before the eventBus is set!
if cs.eventBus != nil {
@ -542,16 +542,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
case height := <-cs.mempool.TxsAvailable():
cs.handleTxsAvailable(height)
case mi = <-cs.peerMsgQueue:
cs.wal.Save(mi)
cs.wal.Write(mi)
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue:
cs.wal.Save(mi)
cs.wal.WriteSync(mi) // NOTE: fsync
// handles proposals, block parts, votes
cs.handleMsg(mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
cs.wal.Save(ti)
cs.wal.Write(ti)
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
@ -1241,7 +1241,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
cs.wal.Save(EndHeightMessage{height})
cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
fail.Fail() // XXX


+ 22
- 9
consensus/wal.go View File

@ -50,7 +50,8 @@ func RegisterWALMessages(cdc *amino.Codec) {
// WAL is an interface for any write-ahead logger.
type WAL interface {
Save(WALMessage)
Write(WALMessage)
WriteSync(WALMessage)
Group() *auto.Group
SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)
@ -98,7 +99,7 @@ func (wal *baseWAL) OnStart() error {
if err != nil {
return err
} else if size == 0 {
wal.Save(EndHeightMessage{0})
wal.WriteSync(EndHeightMessage{0})
}
err = wal.group.Start()
return err
@ -109,20 +110,31 @@ func (wal *baseWAL) OnStop() {
wal.group.Stop()
}
// called in newStep and for each pass in receiveRoutine
func (wal *baseWAL) Save(msg WALMessage) {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *baseWAL) Write(msg WALMessage) {
if wal == nil {
return
}
// Write the wal message
if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil {
cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
panic(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
}
}
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *baseWAL) WriteSync(msg WALMessage) {
if wal == nil {
return
}
// TODO: only flush when necessary
wal.Write(msg)
if err := wal.group.Flush(); err != nil {
cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
panic(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
}
}
@ -297,8 +309,9 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
type nilWAL struct{}
func (nilWAL) Save(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) Write(m WALMessage) {}
func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil
}


+ 6
- 2
consensus/wal_generator.go View File

@ -83,7 +83,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103
wal.Save(EndHeightMessage{0})
wal.Write(EndHeightMessage{0})
consensusState.wal = wal
if err := consensusState.Start(); err != nil {
@ -166,7 +166,7 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Write writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Save(m WALMessage) {
func (w *byteBufferWAL) Write(m WALMessage) {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return
@ -189,6 +189,10 @@ func (w *byteBufferWAL) Save(m WALMessage) {
}
}
// WriteSync forwards m to Write; no additional sync step is performed here.
func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
}
// Group is not implemented for byteBufferWAL; calling it panics.
func (w *byteBufferWAL) Group() *auto.Group {
panic("not implemented")
}


+ 1
- 1
node/node_test.go View File

@ -31,7 +31,7 @@ func TestNodeStartStop(t *testing.T) {
assert.NoError(t, err)
select {
case <-blockCh:
case <-time.After(5 * time.Second):
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for the node to produce a block")
}


Loading…
Cancel
Save