You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

437 lines
12 KiB

  1. package consensus
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "path/filepath"
  9. "time"
  10. "github.com/gogo/protobuf/proto"
  11. auto "github.com/tendermint/tendermint/libs/autofile"
  12. // tmjson "github.com/tendermint/tendermint/libs/json"
  13. "github.com/tendermint/tendermint/libs/log"
  14. tmos "github.com/tendermint/tendermint/libs/os"
  15. "github.com/tendermint/tendermint/libs/service"
  16. tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
  17. tmtime "github.com/tendermint/tendermint/types/time"
  18. )
const (
	// maxMsgSizeBytes is the upper bound on the encoded size of a single WAL
	// entry: room for a time.Time plus the max consensus msg size.
	maxMsgSizeBytes = maxMsgSize + 24

	// walDefaultFlushInterval is how often the WAL is flushed and sync'd to
	// disk during periodic syncing.
	walDefaultFlushInterval = 2 * time.Second
)
  25. //--------------------------------------------------------
  26. // types and functions for saving consensus messages
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
	// Time is the timestamp stored alongside the message.
	Time time.Time `json:"time"`
	// Msg is the wrapped WAL message.
	Msg WALMessage `json:"msg"`
}
// EndHeightMessage marks the end of the given height inside WAL.
// @internal used by scripts/wal2json util.
type EndHeightMessage struct {
	// Height is the height whose end this marker records.
	Height int64 `json:"height"`
}
// WALMessage is any message that can be handed to the WAL. It is an empty
// interface, so the type system places no restriction on the concrete type.
type WALMessage interface{}
  38. // func init() {
  39. // tmjson.RegisterType(msgInfo{}, "tendermint/wal/MsgInfo")
  40. // tmjson.RegisterType(timeoutInfo{}, "tendermint/wal/TimeoutInfo")
  41. // tmjson.RegisterType(EndHeightMessage{}, "tendermint/wal/EndHeightMessage")
  42. // }
  43. //--------------------------------------------------------
  44. // Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
type WAL interface {
	// Write records a message in the log.
	Write(WALMessage) error
	// WriteSync records a message in the log; see the implementation for
	// its durability guarantees.
	WriteSync(WALMessage) error
	// FlushAndSync pushes any pending data out to stable storage.
	FlushAndSync() error

	// SearchForEndHeight looks for the end-of-height marker for height and,
	// when found, returns a reader the caller is responsible for closing.
	SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)

	// service methods
	Start() error
	Stop() error
	Wait()
}
// BaseWAL is a write-ahead logger that writes msgs to disk before they are
// processed. Can be used for crash-recovery and deterministic replay.
// TODO: currently the wal is overwritten during replay catchup, give it a mode
// so it's either reading or appending - must read to end to start appending
// again.
type BaseWAL struct {
	service.BaseService

	// group is the underlying autofile group the log is written to.
	group *auto.Group

	// enc frames messages and writes them into group.
	enc *WALEncoder

	// flushTicker is created in OnStart and drives the periodic flush.
	flushTicker *time.Ticker
	// flushInterval is the period used when flushTicker is created.
	flushInterval time.Duration
}

// Compile-time check that *BaseWAL satisfies the WAL interface.
var _ WAL = &BaseWAL{}
  69. // NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
  70. // WAL. It's flushed and synced to disk every 2s and once when stopped.
  71. func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*BaseWAL, error) {
  72. err := tmos.EnsureDir(filepath.Dir(walFile), 0700)
  73. if err != nil {
  74. return nil, fmt.Errorf("failed to ensure WAL directory is in place: %w", err)
  75. }
  76. group, err := auto.OpenGroup(walFile, groupOptions...)
  77. if err != nil {
  78. return nil, err
  79. }
  80. wal := &BaseWAL{
  81. group: group,
  82. enc: NewWALEncoder(group),
  83. flushInterval: walDefaultFlushInterval,
  84. }
  85. wal.BaseService = *service.NewBaseService(nil, "baseWAL", wal)
  86. return wal, nil
  87. }
// SetFlushInterval allows us to override the periodic flush interval for the WAL.
// The value is read when OnStart creates the flush ticker, so it must be set
// before the WAL is started to have any effect on that ticker.
func (wal *BaseWAL) SetFlushInterval(i time.Duration) {
	wal.flushInterval = i
}
// Group returns the autofile group this WAL writes to.
func (wal *BaseWAL) Group() *auto.Group {
	return wal.group
}
  95. func (wal *BaseWAL) SetLogger(l log.Logger) {
  96. wal.BaseService.Logger = l
  97. wal.group.SetLogger(l)
  98. }
  99. func (wal *BaseWAL) OnStart() error {
  100. size, err := wal.group.Head.Size()
  101. if err != nil {
  102. return err
  103. } else if size == 0 {
  104. if err := wal.WriteSync(EndHeightMessage{0}); err != nil {
  105. return err
  106. }
  107. }
  108. err = wal.group.Start()
  109. if err != nil {
  110. return err
  111. }
  112. wal.flushTicker = time.NewTicker(wal.flushInterval)
  113. go wal.processFlushTicks()
  114. return nil
  115. }
  116. func (wal *BaseWAL) processFlushTicks() {
  117. for {
  118. select {
  119. case <-wal.flushTicker.C:
  120. if err := wal.FlushAndSync(); err != nil {
  121. wal.Logger.Error("Periodic WAL flush failed", "err", err)
  122. }
  123. case <-wal.Quit():
  124. return
  125. }
  126. }
  127. }
// FlushAndSync flushes and fsync's the underlying group's data to disk.
// It simply delegates to the group and returns its error unchanged.
// See auto#FlushAndSync
func (wal *BaseWAL) FlushAndSync() error {
	return wal.group.FlushAndSync()
}
  133. // Stop the underlying autofile group.
  134. // Use Wait() to ensure it's finished shutting down
  135. // before cleaning up files.
  136. func (wal *BaseWAL) OnStop() {
  137. wal.flushTicker.Stop()
  138. if err := wal.FlushAndSync(); err != nil {
  139. wal.Logger.Error("error on flush data to disk", "error", err)
  140. }
  141. if err := wal.group.Stop(); err != nil {
  142. wal.Logger.Error("error trying to stop wal", "error", err)
  143. }
  144. wal.group.Close()
  145. }
// Wait for the underlying autofile group to finish shutting down
// so it's safe to cleanup files. It delegates to the group's Wait.
func (wal *BaseWAL) Wait() {
	wal.group.Wait()
}
  151. // Write is called in newStep and for each receive on the
  152. // peerMsgQueue and the timeoutTicker.
  153. // NOTE: does not call fsync()
  154. func (wal *BaseWAL) Write(msg WALMessage) error {
  155. if wal == nil {
  156. return nil
  157. }
  158. if err := wal.enc.Encode(&TimedWALMessage{tmtime.Now(), msg}); err != nil {
  159. wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
  160. "err", err, "msg", msg)
  161. return err
  162. }
  163. return nil
  164. }
  165. // WriteSync is called when we receive a msg from ourselves
  166. // so that we write to disk before sending signed messages.
  167. // NOTE: calls fsync()
  168. func (wal *BaseWAL) WriteSync(msg WALMessage) error {
  169. if wal == nil {
  170. return nil
  171. }
  172. if err := wal.Write(msg); err != nil {
  173. return err
  174. }
  175. if err := wal.FlushAndSync(); err != nil {
  176. wal.Logger.Error(`WriteSync failed to flush consensus wal.
  177. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted`,
  178. "err", err)
  179. return err
  180. }
  181. return nil
  182. }
// WALSearchOptions are optional arguments to SearchForEndHeight.
type WALSearchOptions struct {
	// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors.
	// When false, the first corrupted entry aborts the search with that error.
	IgnoreDataCorruptionErrors bool
}
  188. // SearchForEndHeight searches for the EndHeightMessage with the given height
  189. // and returns an auto.GroupReader, whenever it was found or not and an error.
  190. // Group reader will be nil if found equals false.
  191. //
  192. // CONTRACT: caller must close group reader.
  193. func (wal *BaseWAL) SearchForEndHeight(
  194. height int64,
  195. options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
  196. var (
  197. msg *TimedWALMessage
  198. gr *auto.GroupReader
  199. )
  200. lastHeightFound := int64(-1)
  201. // NOTE: starting from the last file in the group because we're usually
  202. // searching for the last height. See replay.go
  203. min, max := wal.group.MinIndex(), wal.group.MaxIndex()
  204. wal.Logger.Info("Searching for height", "height", height, "min", min, "max", max)
  205. for index := max; index >= min; index-- {
  206. gr, err = wal.group.NewReader(index)
  207. if err != nil {
  208. return nil, false, err
  209. }
  210. dec := NewWALDecoder(gr)
  211. for {
  212. msg, err = dec.Decode()
  213. if err == io.EOF {
  214. // OPTIMISATION: no need to look for height in older files if we've seen h < height
  215. if lastHeightFound > 0 && lastHeightFound < height {
  216. gr.Close()
  217. return nil, false, nil
  218. }
  219. // check next file
  220. break
  221. }
  222. if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) {
  223. wal.Logger.Error("Corrupted entry. Skipping...", "err", err)
  224. // do nothing
  225. continue
  226. } else if err != nil {
  227. gr.Close()
  228. return nil, false, err
  229. }
  230. if m, ok := msg.Msg.(EndHeightMessage); ok {
  231. lastHeightFound = m.Height
  232. if m.Height == height { // found
  233. wal.Logger.Info("Found", "height", height, "index", index)
  234. return gr, true, nil
  235. }
  236. }
  237. }
  238. gr.Close()
  239. }
  240. return nil, false, nil
  241. }
  242. // /////////////////////////////////////////////////////////////////////////////
// A WALEncoder writes custom-encoded WAL messages to an output stream.
//
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value
type WALEncoder struct {
	// wr is the destination every encoded frame is written to.
	wr io.Writer
}
  249. // NewWALEncoder returns a new encoder that writes to wr.
  250. func NewWALEncoder(wr io.Writer) *WALEncoder {
  251. return &WALEncoder{wr}
  252. }
  253. // Encode writes the custom encoding of v to the stream. It returns an error if
  254. // the encoded size of v is greater than 1MB. Any error encountered
  255. // during the write is also returned.
  256. func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
  257. pbMsg, err := WALToProto(v.Msg)
  258. if err != nil {
  259. return err
  260. }
  261. pv := tmcons.TimedWALMessage{
  262. Time: v.Time,
  263. Msg: pbMsg,
  264. }
  265. data, err := proto.Marshal(&pv)
  266. if err != nil {
  267. panic(fmt.Errorf("encode timed wall message failure: %w", err))
  268. }
  269. crc := crc32.Checksum(data, crc32c)
  270. length := uint32(len(data))
  271. if length > maxMsgSizeBytes {
  272. return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, maxMsgSizeBytes)
  273. }
  274. totalLength := 8 + int(length)
  275. msg := make([]byte, totalLength)
  276. binary.BigEndian.PutUint32(msg[0:4], crc)
  277. binary.BigEndian.PutUint32(msg[4:8], length)
  278. copy(msg[8:], data)
  279. _, err = enc.wr.Write(msg)
  280. return err
  281. }
  282. // /////////////////////////////////////////////////////////////////////////////
  283. // IsDataCorruptionError returns true if data has been corrupted inside WAL.
  284. func IsDataCorruptionError(err error) bool {
  285. _, ok := err.(DataCorruptionError)
  286. return ok
  287. }
  288. // DataCorruptionError is an error that occures if data on disk was corrupted.
  289. type DataCorruptionError struct {
  290. cause error
  291. }
  292. func (e DataCorruptionError) Error() string {
  293. return fmt.Sprintf("DataCorruptionError[%v]", e.cause)
  294. }
  295. func (e DataCorruptionError) Cause() error {
  296. return e.cause
  297. }
// A WALDecoder reads and decodes custom-encoded WAL messages from an input
// stream. See WALEncoder for the format used.
//
// It will also compare the checksums and make sure data size is equal to the
// length from the header. If that is not the case, error will be returned.
type WALDecoder struct {
	// rd is the source the encoded frames are read from.
	rd io.Reader
}
  306. // NewWALDecoder returns a new decoder that reads from rd.
  307. func NewWALDecoder(rd io.Reader) *WALDecoder {
  308. return &WALDecoder{rd}
  309. }
  310. // Decode reads the next custom-encoded value from its reader and returns it.
  311. func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
  312. b := make([]byte, 4)
  313. _, err := dec.rd.Read(b)
  314. if errors.Is(err, io.EOF) {
  315. return nil, err
  316. }
  317. if err != nil {
  318. return nil, DataCorruptionError{fmt.Errorf("failed to read checksum: %v", err)}
  319. }
  320. crc := binary.BigEndian.Uint32(b)
  321. b = make([]byte, 4)
  322. _, err = dec.rd.Read(b)
  323. if err != nil {
  324. return nil, DataCorruptionError{fmt.Errorf("failed to read length: %v", err)}
  325. }
  326. length := binary.BigEndian.Uint32(b)
  327. if length > maxMsgSizeBytes {
  328. return nil, DataCorruptionError{fmt.Errorf(
  329. "length %d exceeded maximum possible value of %d bytes",
  330. length,
  331. maxMsgSizeBytes)}
  332. }
  333. data := make([]byte, length)
  334. n, err := dec.rd.Read(data)
  335. if err != nil {
  336. return nil, DataCorruptionError{fmt.Errorf("failed to read data: %v (read: %d, wanted: %d)", err, n, length)}
  337. }
  338. // check checksum before decoding data
  339. actualCRC := crc32.Checksum(data, crc32c)
  340. if actualCRC != crc {
  341. return nil, DataCorruptionError{fmt.Errorf("checksums do not match: read: %v, actual: %v", crc, actualCRC)}
  342. }
  343. var res = new(tmcons.TimedWALMessage)
  344. err = proto.Unmarshal(data, res)
  345. if err != nil {
  346. return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)}
  347. }
  348. walMsg, err := WALFromProto(res.Msg)
  349. if err != nil {
  350. return nil, DataCorruptionError{fmt.Errorf("failed to convert from proto: %w", err)}
  351. }
  352. tMsgWal := &TimedWALMessage{
  353. Time: res.Time,
  354. Msg: walMsg,
  355. }
  356. return tMsgWal, err
  357. }
  358. type nilWAL struct{}
  359. var _ WAL = nilWAL{}
  360. func (nilWAL) Write(m WALMessage) error { return nil }
  361. func (nilWAL) WriteSync(m WALMessage) error { return nil }
  362. func (nilWAL) FlushAndSync() error { return nil }
  363. func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
  364. return nil, false, nil
  365. }
  366. func (nilWAL) Start() error { return nil }
  367. func (nilWAL) Stop() error { return nil }
  368. func (nilWAL) Wait() {}