diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 724dd056b..4a272fa30 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -228,15 +228,15 @@ func (e ReachedHeightToStopError) Error() string { // Write simulate WAL's crashing by sending an error to the panicCh and then // exiting the cs.receiveRoutine. -func (w *crashingWAL) Write(m WALMessage) { +func (w *crashingWAL) Write(m WALMessage) error { if endMsg, ok := m.(EndHeightMessage); ok { if endMsg.Height == w.heightToStop { w.panicCh <- ReachedHeightToStopError{endMsg.Height} runtime.Goexit() - } else { - w.next.Write(m) + return nil } - return + + return w.next.Write(m) } if w.msgIndex > w.lastPanickedForMsgIndex { @@ -244,14 +244,15 @@ func (w *crashingWAL) Write(m WALMessage) { _, file, line, _ := runtime.Caller(1) w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)} runtime.Goexit() - } else { - w.msgIndex++ - w.next.Write(m) + return nil } + + w.msgIndex++ + return w.next.Write(m) } -func (w *crashingWAL) WriteSync(m WALMessage) { - w.Write(m) +func (w *crashingWAL) WriteSync(m WALMessage) error { + return w.Write(m) } func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() } diff --git a/consensus/state.go b/consensus/state.go index 1f6bad9ab..78c04d999 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -632,7 +632,10 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi) case mi = <-cs.internalMsgQueue: - cs.wal.WriteSync(mi) // NOTE: fsync + err := cs.wal.WriteSync(mi) // NOTE: fsync + if err != nil { + panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", mi, err)) + } if _, ok := mi.Msg.(*VoteMessage); ok { // we actually want to simulate failing during @@ -1313,7 +1316,10 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // Either way, the ConsensusState should not be resumed until we // successfully call ApplyBlock (ie. later here, or in Handshake after // restart). - cs.wal.WriteSync(EndHeightMessage{height}) // NOTE: fsync + me := EndHeightMessage{height} + if err := cs.wal.WriteSync(me); err != nil { // NOTE: fsync + panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", me, err)) + } fail.Fail() // XXX diff --git a/consensus/wal.go b/consensus/wal.go index c63c6b940..373f01a16 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -29,8 +29,9 @@ const ( //-------------------------------------------------------- // types and functions for savings consensus messages +// TimedWALMessage wraps WALMessage and adds Time for debugging purposes. type TimedWALMessage struct { - Time time.Time `json:"time"` // for debugging purposes + Time time.Time `json:"time"` Msg WALMessage `json:"msg"` } @@ -55,8 +56,8 @@ func RegisterWALMessages(cdc *amino.Codec) { // WAL is an interface for any write-ahead logger. type WAL interface { - Write(WALMessage) - WriteSync(WALMessage) + Write(WALMessage) error + WriteSync(WALMessage) error FlushAndSync() error SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) @@ -174,29 +175,39 @@ func (wal *baseWAL) Wait() { // Write is called in newStep and for each receive on the // peerMsgQueue and the timeoutTicker. // NOTE: does not call fsync() -func (wal *baseWAL) Write(msg WALMessage) { +func (wal *baseWAL) Write(msg WALMessage) error { if wal == nil { - return + return nil } - // Write the wal message if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil { - panic(fmt.Sprintf("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg)) + wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height", + "err", err, "msg", msg) + return err } + + return nil } // WriteSync is called when we receive a msg from ourselves // so that we write to disk before sending signed messages. // NOTE: calls fsync() -func (wal *baseWAL) WriteSync(msg WALMessage) { +func (wal *baseWAL) WriteSync(msg WALMessage) error { if wal == nil { - return + return nil + } + + if err := wal.Write(msg); err != nil { + return err } - wal.Write(msg) if err := wal.FlushAndSync(); err != nil { - panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err)) + wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted", + "err", err) + return err } + + return nil } // WALSearchOptions are optional arguments to SearchForEndHeight. @@ -285,7 +296,7 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error { crc := crc32.Checksum(data, crc32c) length := uint32(len(data)) if length > maxMsgSizeBytes { - return fmt.Errorf("Msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes) + return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes) } totalLength := 8 + int(length) @@ -295,7 +306,6 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error { copy(msg[8:], data) _, err := enc.wr.Write(msg) - return err } @@ -383,9 +393,9 @@ type nilWAL struct{} var _ WAL = nilWAL{} -func (nilWAL) Write(m WALMessage) {} -func (nilWAL) WriteSync(m WALMessage) {} -func (nilWAL) FlushAndSync() error { return nil } +func (nilWAL) Write(m WALMessage) error { return nil } +func (nilWAL) WriteSync(m WALMessage) error { return nil } +func (nilWAL) FlushAndSync() error { return nil } func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) { return nil, false, nil } diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 2faff27b5..2ad59bd07 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -167,10 +167,10 @@ func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalS // Save writes message to the internal buffer except when heightToStop is // reached, in which case it will signal the caller via signalWhenStopsTo and // skip writing. -func (w *byteBufferWAL) Write(m WALMessage) { +func (w *byteBufferWAL) Write(m WALMessage) error { if w.stopped { w.logger.Debug("WAL already stopped. Not writing message", "msg", m) - return + return nil } if endMsg, ok := m.(EndHeightMessage); ok { @@ -179,7 +179,7 @@ func (w *byteBufferWAL) Write(m WALMessage) { w.logger.Debug("Stopping WAL at height", "height", endMsg.Height) w.signalWhenStopsTo <- struct{}{} w.stopped = true - return + return nil } } @@ -188,10 +188,12 @@ func (w *byteBufferWAL) Write(m WALMessage) { if err != nil { panic(fmt.Sprintf("failed to encode the msg %v", m)) } + + return nil } -func (w *byteBufferWAL) WriteSync(m WALMessage) { - w.Write(m) +func (w *byteBufferWAL) WriteSync(m WALMessage) error { + return w.Write(m) } func (w *byteBufferWAL) FlushAndSync() error { return nil } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 5cb73fb7f..442cdb6fc 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -11,14 +11,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/libs/autofile" "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( @@ -101,7 +102,7 @@ func TestWALEncoderDecoder(t *testing.T) { } } -func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) { +func TestWALWrite(t *testing.T) { walDir, err := ioutil.TempDir("", "wal") require.NoError(t, err) defer os.RemoveAll(walDir) @@ -118,7 +119,24 @@ func TestWALWritePanicsIfMsgIsTooBig(t *testing.T) { wal.Wait() }() - assert.Panics(t, func() { wal.Write(make([]byte, maxMsgSizeBytes+1)) }) + // 1) Write returns an error if msg is too big + msg := &BlockPartMessage{ + Height: 1, + Round: 1, + Part: &tmtypes.Part{ + Index: 1, + Bytes: make([]byte, 1), + Proof: merkle.SimpleProof{ + Total: 1, + Index: 1, + LeafHash: make([]byte, maxMsgSizeBytes-30), + }, + }, + } + err = wal.Write(msg) + if assert.Error(t, err) { + assert.Equal(t, "msg is too big: 1048593 bytes, max: 1048576 bytes", err.Error()) + } } func TestWALSearchForEndHeight(t *testing.T) {