You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
3.5 KiB

8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bufio"
  4. "os"
  5. "time"
  6. . "github.com/tendermint/go-common"
  7. "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tendermint/types"
  9. )
  10. //--------------------------------------------------------
// types and functions for saving consensus messages
  12. type ConsensusLogMessage struct {
  13. Time time.Time `json:"time"`
  14. Msg ConsensusLogMessageInterface `json:"msg"`
  15. }
  16. type ConsensusLogMessageInterface interface{}
  17. var _ = wire.RegisterInterface(
  18. struct{ ConsensusLogMessageInterface }{},
  19. wire.ConcreteType{types.EventDataRoundState{}, 0x01},
  20. wire.ConcreteType{msgInfo{}, 0x02},
  21. wire.ConcreteType{timeoutInfo{}, 0x03},
  22. )
  23. //--------------------------------------------------------
  24. // Simple write-ahead logger
  25. // Write ahead logger writes msgs to disk before they are processed.
  26. // Can be used for crash-recovery and deterministic replay
  27. // TODO: currently the wal is overwritten during replay catchup
  28. // give it a mode so it's either reading or appending - must read to end to start appending again
  29. type WAL struct {
  30. fp *os.File
  31. exists bool // if the file already existed (restarted process)
  32. done chan struct{}
  33. light bool // ignore block parts
  34. }
  35. func NewWAL(file string, light bool) (*WAL, error) {
  36. var walExists bool
  37. if _, err := os.Stat(file); err == nil {
  38. walExists = true
  39. }
  40. fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return &WAL{
  45. fp: fp,
  46. exists: walExists,
  47. done: make(chan struct{}),
  48. light: light,
  49. }, nil
  50. }
  51. func (wal *WAL) Exists() bool {
  52. if wal == nil {
  53. log.Warn("consensus msg log is nil")
  54. return false
  55. }
  56. return wal.exists
  57. }
  58. // called in newStep and for each pass in receiveRoutine
  59. func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
  60. if wal == nil {
  61. return
  62. }
  63. if wal.light {
  64. // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
  65. if mi, ok := clm.(msgInfo); ok {
  66. _ = mi
  67. if mi.PeerKey != "" {
  68. return
  69. }
  70. }
  71. }
  72. var clmBytes = wire.JSONBytes(ConsensusLogMessage{time.Now(), clm})
  73. var n int
  74. var err error
  75. wire.WriteTo(append(clmBytes, byte('\n')), wal.fp, &n, &err) // one message per line
  76. if err != nil {
  77. PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
  78. }
  79. }
  80. // Must not be called concurrently with a write.
  81. func (wal *WAL) Close() {
  82. if wal != nil {
  83. wal.fp.Close()
  84. }
  85. wal.done <- struct{}{}
  86. }
  87. func (wal *WAL) Wait() {
  88. <-wal.done
  89. }
  90. // TODO: remove once we stop supporting older golang version
  91. const (
  92. ioSeekStart = 0
  93. ioSeekCurrent = 1
  94. ioSeekEnd = 2
  95. )
  96. func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
  97. var current int64
  98. // start at the end
  99. current, err = wal.fp.Seek(0, ioSeekEnd)
  100. if err != nil {
  101. return
  102. }
  103. // backup until we find the the right line
  104. // current is how far we are from the beginning
  105. for {
  106. current -= 1
  107. if current < 0 {
  108. wal.fp.Seek(0, ioSeekStart) // back to beginning
  109. return
  110. }
  111. // backup one and read a new byte
  112. if _, err = wal.fp.Seek(current, ioSeekStart); err != nil {
  113. return
  114. }
  115. b := make([]byte, 1)
  116. if _, err = wal.fp.Read(b); err != nil {
  117. return
  118. }
  119. if b[0] == '\n' || len(b) == 0 {
  120. nLines += 1
  121. // read a full line
  122. reader := bufio.NewReader(wal.fp)
  123. lineBytes, _ := reader.ReadBytes('\n')
  124. if len(lineBytes) == 0 {
  125. continue
  126. }
  127. if found(lineBytes) {
  128. wal.fp.Seek(0, ioSeekCurrent) // (?)
  129. wal.fp.Seek(current, ioSeekStart)
  130. return
  131. }
  132. }
  133. }
  134. }