You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
2.8 KiB

  1. package consensus
  2. import (
  3. "bufio"
  4. "os"
  5. "time"
  6. . "github.com/tendermint/go-common"
  7. "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tendermint/types"
  9. )
  10. //--------------------------------------------------------
  11. // types and functions for saving consensus messages
  type (
  	// ConsensusLogMessage is one log entry: a message together with a
  	// timestamp. Its fields serialize under the JSON keys "time" and "msg".
  	ConsensusLogMessage struct {
  		Time time.Time                    `json:"time"`
  		Msg  ConsensusLogMessageInterface `json:"msg"`
  	}

  	// ConsensusLogMessageInterface is the empty interface used as the static
  	// type of a logged message, so any value can be stored in
  	// ConsensusLogMessage.Msg.
  	ConsensusLogMessageInterface interface{}
  )
  17. var _ = wire.RegisterInterface(
  18. struct{ ConsensusLogMessageInterface }{},
  19. wire.ConcreteType{types.EventDataRoundState{}, 0x01},
  20. wire.ConcreteType{msgInfo{}, 0x02},
  21. wire.ConcreteType{timeoutInfo{}, 0x03},
  22. )
  23. //--------------------------------------------------------
  // WAL is a simple write-ahead logger: messages are written to disk before
  // they are processed, so the log can be used for crash recovery and
  // deterministic replay.
  //
  // TODO: currently the WAL is overwritten during replay catchup. Give it a
  // mode so it is either reading or appending; it must read to the end before
  // it starts appending again.
  type WAL struct {
  	// fp is the open log file.
  	fp *os.File

  	// exists records whether the file already existed when the WAL was
  	// opened (restarted process).
  	exists bool
  }
  33. func NewWAL(file string) (*WAL, error) {
  34. var walExists bool
  35. if _, err := os.Stat(file); err == nil {
  36. walExists = true
  37. }
  38. fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &WAL{
  43. fp: fp,
  44. exists: walExists,
  45. }, nil
  46. }
  47. // called in newStep and for each pass in receiveRoutine
  48. func (wal *WAL) Save(msg ConsensusLogMessageInterface) {
  49. if wal != nil {
  50. var n int
  51. var err error
  52. wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err)
  53. wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
  54. if err != nil {
  55. PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg))
  56. }
  57. }
  58. }
  59. // Must not be called concurrently.
  60. func (wal *WAL) Close() {
  61. if wal != nil {
  62. wal.fp.Close()
  63. }
  64. }
  65. func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
  66. var current int64
  67. // start at the end
  68. current, err = wal.fp.Seek(0, 2)
  69. if err != nil {
  70. return
  71. }
  72. // backup until we find the the right line
  73. // current is how far we are from the beginning
  74. for {
  75. current -= 1
  76. if current < 0 {
  77. wal.fp.Seek(0, 0) // back to beginning
  78. return
  79. }
  80. // backup one and read a new byte
  81. if _, err = wal.fp.Seek(current, 0); err != nil {
  82. return
  83. }
  84. b := make([]byte, 1)
  85. if _, err = wal.fp.Read(b); err != nil {
  86. return
  87. }
  88. if b[0] == '\n' || len(b) == 0 {
  89. nLines += 1
  90. // read a full line
  91. reader := bufio.NewReader(wal.fp)
  92. lineBytes, _ := reader.ReadBytes('\n')
  93. if len(lineBytes) == 0 {
  94. continue
  95. }
  96. if found(lineBytes) {
  97. wal.fp.Seek(0, 1) // (?)
  98. wal.fp.Seek(current, 0)
  99. return
  100. }
  101. }
  102. }
  103. }