|
|
@ -17,6 +17,11 @@ import ( |
|
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
) |
|
|
|
|
|
|
|
const ( |
|
|
|
// must be greater than params.BlockGossipParams.BlockPartSizeBytes + a few bytes
|
|
|
|
maxMsgSizeBytes = 1024 * 1024 // 1MB
|
|
|
|
) |
|
|
|
|
|
|
|
//--------------------------------------------------------
|
|
|
|
// types and functions for savings consensus messages
|
|
|
|
|
|
|
@ -48,7 +53,7 @@ var _ = wire.RegisterInterface( |
|
|
|
type WAL interface { |
|
|
|
Save(WALMessage) |
|
|
|
Group() *auto.Group |
|
|
|
SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) |
|
|
|
SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) |
|
|
|
|
|
|
|
Start() error |
|
|
|
Stop() error |
|
|
@ -133,12 +138,18 @@ func (wal *baseWAL) Save(msg WALMessage) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// WALSearchOptions are optional arguments to SearchForEndHeight.
|
|
|
|
type WALSearchOptions struct { |
|
|
|
// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors.
|
|
|
|
IgnoreDataCorruptionErrors bool |
|
|
|
} |
|
|
|
|
|
|
|
// SearchForEndHeight searches for the EndHeightMessage with the height and
|
|
|
|
// returns an auto.GroupReader, whenever it was found or not and an error.
|
|
|
|
// Group reader will be nil if found equals false.
|
|
|
|
//
|
|
|
|
// CONTRACT: caller must close group reader.
|
|
|
|
func (wal *baseWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { |
|
|
|
func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { |
|
|
|
var msg *TimedWALMessage |
|
|
|
|
|
|
|
// NOTE: starting from the last file in the group because we're usually
|
|
|
@ -158,7 +169,9 @@ func (wal *baseWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, foun |
|
|
|
// check next file
|
|
|
|
break |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) { |
|
|
|
// do nothing
|
|
|
|
} else if err != nil { |
|
|
|
gr.Close() |
|
|
|
return nil, false, err |
|
|
|
} |
|
|
@ -210,6 +223,25 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error { |
|
|
|
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
|
|
|
|
// IsDataCorruptionError returns true if data has been corrupted inside WAL.
|
|
|
|
func IsDataCorruptionError(err error) bool { |
|
|
|
_, ok := err.(DataCorruptionError) |
|
|
|
return ok |
|
|
|
} |
|
|
|
|
|
|
|
// DataCorruptionError is an error that occures if data on disk was corrupted.
|
|
|
|
type DataCorruptionError struct { |
|
|
|
cause error |
|
|
|
} |
|
|
|
|
|
|
|
func (e DataCorruptionError) Error() string { |
|
|
|
return fmt.Sprintf("DataCorruptionError[%v]", e.cause) |
|
|
|
} |
|
|
|
|
|
|
|
func (e DataCorruptionError) Cause() error { |
|
|
|
return e.cause |
|
|
|
} |
|
|
|
|
|
|
|
// A WALDecoder reads and decodes custom-encoded WAL messages from an input
|
|
|
|
// stream. See WALEncoder for the format used.
|
|
|
|
//
|
|
|
@ -228,7 +260,7 @@ func NewWALDecoder(rd io.Reader) *WALDecoder { |
|
|
|
func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { |
|
|
|
b := make([]byte, 4) |
|
|
|
|
|
|
|
n, err := dec.rd.Read(b) |
|
|
|
_, err := dec.rd.Read(b) |
|
|
|
if err == io.EOF { |
|
|
|
return nil, err |
|
|
|
} |
|
|
@ -238,7 +270,7 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { |
|
|
|
crc := binary.BigEndian.Uint32(b) |
|
|
|
|
|
|
|
b = make([]byte, 4) |
|
|
|
n, err = dec.rd.Read(b) |
|
|
|
_, err = dec.rd.Read(b) |
|
|
|
if err == io.EOF { |
|
|
|
return nil, err |
|
|
|
} |
|
|
@ -247,26 +279,30 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) { |
|
|
|
} |
|
|
|
length := binary.BigEndian.Uint32(b) |
|
|
|
|
|
|
|
if length > maxMsgSizeBytes { |
|
|
|
return nil, DataCorruptionError{fmt.Errorf("length %d exceeded maximum possible value of %d bytes", length, maxMsgSizeBytes)} |
|
|
|
} |
|
|
|
|
|
|
|
data := make([]byte, length) |
|
|
|
n, err = dec.rd.Read(data) |
|
|
|
_, err = dec.rd.Read(data) |
|
|
|
if err == io.EOF { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("not enough bytes for data: %v (want: %d, read: %v)", err, length, n) |
|
|
|
return nil, fmt.Errorf("failed to read data: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
// check checksum before decoding data
|
|
|
|
actualCRC := crc32.Checksum(data, crc32c) |
|
|
|
if actualCRC != crc { |
|
|
|
return nil, fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC) |
|
|
|
return nil, DataCorruptionError{fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)} |
|
|
|
} |
|
|
|
|
|
|
|
var nn int |
|
|
|
var res *TimedWALMessage // nolint: gosimple
|
|
|
|
res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("failed to decode data: %v", err) |
|
|
|
return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)} |
|
|
|
} |
|
|
|
|
|
|
|
return res, err |
|
|
@ -276,7 +312,7 @@ type nilWAL struct{} |
|
|
|
|
|
|
|
func (nilWAL) Save(m WALMessage) {} |
|
|
|
func (nilWAL) Group() *auto.Group { return nil } |
|
|
|
func (nilWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { |
|
|
|
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) { |
|
|
|
return nil, false, nil |
|
|
|
} |
|
|
|
func (nilWAL) Start() error { return nil } |
|
|
|