Browse Source

cs: panic only when WAL#WriteSync fails

- modify WAL#Write and WAL#WriteSync to return an error
v0.31
Anton Kaliaev 5 years ago
committed by Jack Zampolin
parent
commit
2abad8528b
5 changed files with 74 additions and 37 deletions
  1. +10
    -9
      consensus/replay_test.go
  2. +8
    -2
      consensus/state.go
  3. +26
    -16
      consensus/wal.go
  4. +7
    -5
      consensus/wal_generator.go
  5. +23
    -5
      consensus/wal_test.go

+ 10
- 9
consensus/replay_test.go View File

@ -228,15 +228,15 @@ func (e ReachedHeightToStopError) Error() string {
// Write simulate WAL's crashing by sending an error to the panicCh and then
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) {
func (w *crashingWAL) Write(m WALMessage) error {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
} else {
w.next.Write(m)
return nil
}
return
return w.next.Write(m)
}
if w.msgIndex > w.lastPanickedForMsgIndex {
@ -244,14 +244,15 @@ func (w *crashingWAL) Write(m WALMessage) {
_, file, line, _ := runtime.Caller(1)
w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
runtime.Goexit()
} else {
w.msgIndex++
w.next.Write(m)
return nil
}
w.msgIndex++
return w.next.Write(m)
}
func (w *crashingWAL) WriteSync(m WALMessage) {
w.Write(m)
func (w *crashingWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}
func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() }


+ 8
- 2
consensus/state.go View File

@ -632,7 +632,10 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue:
cs.wal.WriteSync(mi) // NOTE: fsync
err := cs.wal.WriteSync(mi) // NOTE: fsync
if err != nil {
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err))
}
if _, ok := mi.Msg.(*VoteMessage); ok {
// we actually want to simulate failing during
@ -1313,7 +1316,10 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Either way, the ConsensusState should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync
me := EndHeightMessage{height}
if err := cs.wal.WriteSync(me); err != nil { // NOTE: fsync
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", me, err))
}
fail.Fail() // XXX


+ 26
- 16
consensus/wal.go View File

@ -29,8 +29,9 @@ const (
//--------------------------------------------------------
// types and functions for savings consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"` // for debugging purposes
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
@ -55,8 +56,8 @@ func RegisterWALMessages(cdc *amino.Codec) {
// WAL is an interface for any write-ahead logger.
type WAL interface {
Write(WALMessage)
WriteSync(WALMessage)
Write(WALMessage) error
WriteSync(WALMessage) error
FlushAndSync() error
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
@ -174,29 +175,39 @@ func (wal *baseWAL) Wait() {
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *baseWAL) Write(msg WALMessage) {
func (wal *baseWAL) Write(msg WALMessage) error {
if wal == nil {
return
return nil
}
// Write the wal message
if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
panic(fmt.Sprintf("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
}
return nil
}
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *baseWAL) WriteSync(msg WALMessage) {
func (wal *baseWAL) WriteSync(msg WALMessage) error {
if wal == nil {
return
return nil
}
if err := wal.Write(msg); err != nil {
return err
}
wal.Write(msg)
if err := wal.FlushAndSync(); err != nil {
panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
"err", err)
return err
}
return nil
}
// WALSearchOptions are optional arguments to SearchForEndHeight.
@ -285,7 +296,7 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
crc := crc32.Checksum(data, crc32c)
length := uint32(len(data))
if length > maxMsgSizeBytes {
return fmt.Errorf("Msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
}
totalLength := 8 + int(length)
@ -295,7 +306,6 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
copy(msg[8:], data)
_, err := enc.wr.Write(msg)
return err
}
@ -383,9 +393,9 @@ type nilWAL struct{}
var _ WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) {}
func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) Write(m WALMessage) error { return nil }
func (nilWAL) WriteSync(m WALMessage) error { return nil }
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}


+ 7
- 5
consensus/wal_generator.go View File

@ -167,10 +167,10 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS
// Save writes message to the internal buffer except when heightToStop is
// reached, in which case it will signal the caller via signalWhenStopsTo and
// skip writing.
func (w *byteBufferWAL) Write(m WALMessage) {
func (w *byteBufferWAL) Write(m WALMessage) error {
if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return
return nil
}
if endMsg, ok := m.(EndHeightMessage); ok {
@ -179,7 +179,7 @@ func (w *byteBufferWAL) Write(m WALMessage) {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
w.signalWhenStopsTo <- struct{}{}
w.stopped = true
return
return nil
}
}
@ -188,10 +188,12 @@ func (w *byteBufferWAL) Write(m WALMessage) {
if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m))
}
return nil
}
func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
func (w *byteBufferWAL) WriteSync(m WALMessage) error {
return w.Write(m)
}
func (w *byteBufferWAL) FlushAndSync() error { return nil }


+ 23
- 5
consensus/wal_test.go View File

@ -11,14 +11,15 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/autofile"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@ -101,7 +102,7 @@ func TestWALEncoderDecoder(t *testing.T) {
}
}
func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
func TestWALWrite(t *testing.T) {
walDir, err := ioutil.TempDir("", "wal")
require.NoError(t, err)
defer os.RemoveAll(walDir)
@ -118,7 +119,24 @@ func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) {
wal.Wait()
}()
assert.Panics(t, func() { wal.Write(make([]byte, maxMsgSizeBytes+1)) })
// 1) Write returns an error if msg is too big
msg := &BlockPartMessage{
Height: 1,
Round: 1,
Part: &tmtypes.Part{
Index: 1,
Bytes: make([]byte, 1),
Proof: merkle.SimpleProof{
Total: 1,
Index: 1,
LeafHash: make([]byte, maxMsgSizeBytes-30),
},
},
}
err = wal.Write(msg)
if assert.Error(t, err) {
assert.Equal(t, "msg is too big: 1048593 bytes, max: 1048576 bytes", err.Error())
}
}
func TestWALSearchForEndHeight(t *testing.T) {


Loading…
Cancel
Save