You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

323 lines
8.7 KiB

  1. package consensus
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "hash/crc32"
  6. "io"
  7. "path/filepath"
  8. "time"
  9. "github.com/pkg/errors"
  10. amino "github.com/tendermint/go-amino"
  11. "github.com/tendermint/tendermint/types"
  12. auto "github.com/tendermint/tmlibs/autofile"
  13. cmn "github.com/tendermint/tmlibs/common"
  14. )
const (
	// maxMsgSizeBytes is the largest payload length (in bytes) that
	// WALDecoder.Decode accepts for a single record; longer lengths are
	// rejected before any payload is read.
	// must be greater than params.BlockGossip.BlockPartSizeBytes + a few bytes
	maxMsgSizeBytes = 1024 * 1024 // 1MB
)
  19. //--------------------------------------------------------
  20. // types and functions for saving consensus messages
// TimedWALMessage is the unit that WALEncoder.Encode writes and
// WALDecoder.Decode returns: a WALMessage paired with a timestamp.
// baseWAL.Write fills Time with time.Now() at the moment of writing.
type TimedWALMessage struct {
	Time time.Time  `json:"time"` // for debugging purposes
	Msg  WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// baseWAL.OnStart writes EndHeightMessage{0} into an empty WAL, and
// SearchForEndHeight scans the log for the marker with a matching Height.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
	Height int64 `json:"height"`
}
// WALMessage is any value that can be stored in the WAL. The concrete types
// registered with the codec for it are listed in RegisterWALMessages.
type WALMessage interface{}
  31. func RegisterWALMessages(cdc *amino.Codec) {
  32. cdc.RegisterInterface((*WALMessage)(nil), nil)
  33. cdc.RegisterConcrete(types.EventDataRoundState{}, "tendermint/wal/EventDataRoundState", nil)
  34. cdc.RegisterConcrete(msgInfo{}, "tendermint/wal/MsgInfo", nil)
  35. cdc.RegisterConcrete(timeoutInfo{}, "tendermint/wal/TimeoutInfo", nil)
  36. cdc.RegisterConcrete(EndHeightMessage{}, "tendermint/wal/EndHeightMessage", nil)
  37. }
  38. //--------------------------------------------------------
  39. // Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
// In this file it is implemented by baseWAL (disk-backed) and nilWAL (no-op).
type WAL interface {
	// Write records a message in the log.
	Write(WALMessage)
	// WriteSync records a message and, in baseWAL, also flushes the
	// underlying group before returning.
	WriteSync(WALMessage)
	// Group returns the underlying autofile group (nil for nilWAL).
	Group() *auto.Group
	// SearchForEndHeight looks for the EndHeightMessage with the given height.
	// When found is true the caller owns gr and must close it.
	SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)

	// Service lifecycle.
	// NOTE(review): baseWAL presumably gets these from the embedded
	// cmn.BaseService — confirm.
	Start() error
	Stop() error
	Wait()
}
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type baseWAL struct {
	cmn.BaseService

	// group is the autofile group opened by NewWAL; it is flushed by
	// WriteSync and read back by SearchForEndHeight.
	group *auto.Group

	// enc encodes messages into group (see NewWAL).
	enc *WALEncoder
}
  59. func NewWAL(walFile string) (*baseWAL, error) {
  60. err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
  61. if err != nil {
  62. return nil, errors.Wrap(err, "failed to ensure WAL directory is in place")
  63. }
  64. group, err := auto.OpenGroup(walFile)
  65. if err != nil {
  66. return nil, err
  67. }
  68. wal := &baseWAL{
  69. group: group,
  70. enc: NewWALEncoder(group),
  71. }
  72. wal.BaseService = *cmn.NewBaseService(nil, "baseWAL", wal)
  73. return wal, nil
  74. }
// Group returns the autofile group this WAL writes to.
func (wal *baseWAL) Group() *auto.Group {
	return wal.group
}
  78. func (wal *baseWAL) OnStart() error {
  79. size, err := wal.group.Head.Size()
  80. if err != nil {
  81. return err
  82. } else if size == 0 {
  83. wal.WriteSync(EndHeightMessage{0})
  84. }
  85. err = wal.group.Start()
  86. return err
  87. }
// OnStop stops the underlying group and then closes it.
// NOTE(review): anything returned by Stop/Close is discarded here — confirm
// that is intended.
func (wal *baseWAL) OnStop() {
	wal.group.Stop()
	wal.group.Close()
}
  92. // Write is called in newStep and for each receive on the
  93. // peerMsgQueue and the timeoutTicker.
  94. // NOTE: does not call fsync()
  95. func (wal *baseWAL) Write(msg WALMessage) {
  96. if wal == nil {
  97. return
  98. }
  99. // Write the wal message
  100. if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil {
  101. panic(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
  102. }
  103. }
  104. // WriteSync is called when we receive a msg from ourselves
  105. // so that we write to disk before sending signed messages.
  106. // NOTE: calls fsync()
  107. func (wal *baseWAL) WriteSync(msg WALMessage) {
  108. if wal == nil {
  109. return
  110. }
  111. wal.Write(msg)
  112. if err := wal.group.Flush(); err != nil {
  113. panic(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
  114. }
  115. }
// WALSearchOptions are optional arguments to SearchForEndHeight.
type WALSearchOptions struct {
	// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors.
	// Only errors for which IsDataCorruptionError reports true are skipped;
	// any other decode error still aborts the search.
	IgnoreDataCorruptionErrors bool
}
  121. // SearchForEndHeight searches for the EndHeightMessage with the given height
  122. // and returns an auto.GroupReader, whenever it was found or not and an error.
  123. // Group reader will be nil if found equals false.
  124. //
  125. // CONTRACT: caller must close group reader.
  126. func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
  127. var msg *TimedWALMessage
  128. lastHeightFound := int64(-1)
  129. // NOTE: starting from the last file in the group because we're usually
  130. // searching for the last height. See replay.go
  131. min, max := wal.group.MinIndex(), wal.group.MaxIndex()
  132. wal.Logger.Debug("Searching for height", "height", height, "min", min, "max", max)
  133. for index := max; index >= min; index-- {
  134. gr, err = wal.group.NewReader(index)
  135. if err != nil {
  136. return nil, false, err
  137. }
  138. dec := NewWALDecoder(gr)
  139. for {
  140. msg, err = dec.Decode()
  141. if err == io.EOF {
  142. // OPTIMISATION: no need to look for height in older files if we've seen h < height
  143. if lastHeightFound > 0 && lastHeightFound < height {
  144. gr.Close()
  145. return nil, false, nil
  146. }
  147. // check next file
  148. break
  149. }
  150. if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) {
  151. wal.Logger.Debug("Corrupted entry. Skipping...", "err", err)
  152. // do nothing
  153. continue
  154. } else if err != nil {
  155. gr.Close()
  156. return nil, false, err
  157. }
  158. if m, ok := msg.Msg.(EndHeightMessage); ok {
  159. lastHeightFound = m.Height
  160. if m.Height == height { // found
  161. wal.Logger.Debug("Found", "height", height, "index", index)
  162. return gr, true, nil
  163. }
  164. }
  165. }
  166. gr.Close()
  167. }
  168. return nil, false, nil
  169. }
  170. ///////////////////////////////////////////////////////////////////////////////
// A WALEncoder writes custom-encoded WAL messages to an output stream.
//
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-amino encoded)
//
// Both header fields are written big-endian (see Encode).
type WALEncoder struct {
	wr io.Writer // destination of the encoded records
}
  177. // NewWALEncoder returns a new encoder that writes to wr.
  178. func NewWALEncoder(wr io.Writer) *WALEncoder {
  179. return &WALEncoder{wr}
  180. }
  181. // Encode writes the custom encoding of v to the stream.
  182. func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
  183. data := cdc.MustMarshalBinaryBare(v)
  184. crc := crc32.Checksum(data, crc32c)
  185. length := uint32(len(data))
  186. totalLength := 8 + int(length)
  187. msg := make([]byte, totalLength)
  188. binary.BigEndian.PutUint32(msg[0:4], crc)
  189. binary.BigEndian.PutUint32(msg[4:8], length)
  190. copy(msg[8:], data)
  191. _, err := enc.wr.Write(msg)
  192. return err
  193. }
  194. ///////////////////////////////////////////////////////////////////////////////
  195. // IsDataCorruptionError returns true if data has been corrupted inside WAL.
  196. func IsDataCorruptionError(err error) bool {
  197. _, ok := err.(DataCorruptionError)
  198. return ok
  199. }
  200. // DataCorruptionError is an error that occures if data on disk was corrupted.
  201. type DataCorruptionError struct {
  202. cause error
  203. }
  204. func (e DataCorruptionError) Error() string {
  205. return fmt.Sprintf("DataCorruptionError[%v]", e.cause)
  206. }
  207. func (e DataCorruptionError) Cause() error {
  208. return e.cause
  209. }
// A WALDecoder reads and decodes custom-encoded WAL messages from an input
// stream. See WALEncoder for the format used.
//
// It will also compare the checksums and make sure data size is equal to the
// length from the header. If that is not the case, error will be returned.
type WALDecoder struct {
	rd io.Reader // source of the encoded records
}
  218. // NewWALDecoder returns a new decoder that reads from rd.
  219. func NewWALDecoder(rd io.Reader) *WALDecoder {
  220. return &WALDecoder{rd}
  221. }
  222. // Decode reads the next custom-encoded value from its reader and returns it.
  223. func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
  224. b := make([]byte, 4)
  225. _, err := dec.rd.Read(b)
  226. if err == io.EOF {
  227. return nil, err
  228. }
  229. if err != nil {
  230. return nil, fmt.Errorf("failed to read checksum: %v", err)
  231. }
  232. crc := binary.BigEndian.Uint32(b)
  233. b = make([]byte, 4)
  234. _, err = dec.rd.Read(b)
  235. if err != nil {
  236. return nil, fmt.Errorf("failed to read length: %v", err)
  237. }
  238. length := binary.BigEndian.Uint32(b)
  239. if length > maxMsgSizeBytes {
  240. return nil, fmt.Errorf("length %d exceeded maximum possible value of %d bytes", length, maxMsgSizeBytes)
  241. }
  242. data := make([]byte, length)
  243. _, err = dec.rd.Read(data)
  244. if err != nil {
  245. return nil, fmt.Errorf("failed to read data: %v", err)
  246. }
  247. // check checksum before decoding data
  248. actualCRC := crc32.Checksum(data, crc32c)
  249. if actualCRC != crc {
  250. return nil, DataCorruptionError{fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)}
  251. }
  252. var res = new(TimedWALMessage) // nolint: gosimple
  253. err = cdc.UnmarshalBinaryBare(data, res)
  254. if err != nil {
  255. return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)}
  256. }
  257. return res, err
  258. }
// nilWAL is a WAL that does nothing: writes are dropped, searches never find
// anything, and the lifecycle methods return immediately.
type nilWAL struct{}

// Write discards m.
func (nilWAL) Write(m WALMessage) {}

// WriteSync discards m.
func (nilWAL) WriteSync(m WALMessage) {}

// Group always returns nil.
func (nilWAL) Group() *auto.Group { return nil }

// SearchForEndHeight always reports "not found" with a nil reader and no error.
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
	return nil, false, nil
}

// Start is a no-op and returns nil.
func (nilWAL) Start() error { return nil }

// Stop is a no-op and returns nil.
func (nilWAL) Stop() error { return nil }

// Wait returns immediately.
func (nilWAL) Wait() {}