You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
7.0 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "time"
  9. wire "github.com/tendermint/go-wire"
  10. "github.com/tendermint/tendermint/types"
  11. auto "github.com/tendermint/tmlibs/autofile"
  12. cmn "github.com/tendermint/tmlibs/common"
  13. )
  14. //--------------------------------------------------------
  15. // types and functions for saving consensus messages
  16. var (
  17. walSeparator = []byte{55, 127, 6, 130} // 0x377f0682 - magic number
  18. )
  19. type TimedWALMessage struct {
  20. Time time.Time `json:"time"` // for debugging purposes
  21. Msg WALMessage `json:"msg"`
  22. }
  23. // EndHeightMessage marks the end of the given height inside WAL.
  24. // @internal used by scripts/cutWALUntil util.
  25. type EndHeightMessage struct {
  26. Height uint64 `json:"height"`
  27. }
  28. type WALMessage interface{}
  29. var _ = wire.RegisterInterface(
  30. struct{ WALMessage }{},
  31. wire.ConcreteType{types.EventDataRoundState{}, 0x01},
  32. wire.ConcreteType{msgInfo{}, 0x02},
  33. wire.ConcreteType{timeoutInfo{}, 0x03},
  34. wire.ConcreteType{EndHeightMessage{}, 0x04},
  35. )
  36. //--------------------------------------------------------
  37. // Simple write-ahead logger
  38. // Write ahead logger writes msgs to disk before they are processed.
  39. // Can be used for crash-recovery and deterministic replay
  40. // TODO: currently the wal is overwritten during replay catchup
  41. // give it a mode so it's either reading or appending - must read to end to start appending again
  42. type WAL struct {
  43. cmn.BaseService
  44. group *auto.Group
  45. light bool // ignore block parts
  46. enc *WALEncoder
  47. }
  48. func NewWAL(walFile string, light bool) (*WAL, error) {
  49. group, err := auto.OpenGroup(walFile)
  50. if err != nil {
  51. return nil, err
  52. }
  53. wal := &WAL{
  54. group: group,
  55. light: light,
  56. enc: NewWALEncoder(group),
  57. }
  58. wal.BaseService = *cmn.NewBaseService(nil, "WAL", wal)
  59. return wal, nil
  60. }
  61. func (wal *WAL) OnStart() error {
  62. size, err := wal.group.Head.Size()
  63. if err != nil {
  64. return err
  65. } else if size == 0 {
  66. wal.Save(EndHeightMessage{0})
  67. }
  68. _, err = wal.group.Start()
  69. return err
  70. }
  71. func (wal *WAL) OnStop() {
  72. wal.BaseService.OnStop()
  73. wal.group.Stop()
  74. }
  75. // called in newStep and for each pass in receiveRoutine
  76. func (wal *WAL) Save(msg WALMessage) {
  77. if wal == nil {
  78. return
  79. }
  80. if wal.light {
  81. // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
  82. if mi, ok := msg.(msgInfo); ok {
  83. if mi.PeerKey != "" {
  84. return
  85. }
  86. }
  87. }
  88. // Write the wal message
  89. if err := wal.enc.Encode(&TimedWALMessage{time.Now(), msg}); err != nil {
  90. cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal: %v \n\nMessage: %v", err, msg))
  91. }
  92. // TODO: only flush when necessary
  93. if err := wal.group.Flush(); err != nil {
  94. cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
  95. }
  96. }
  97. // SearchForEndHeight searches for the EndHeightMessage with the height and
  98. // returns an auto.GroupReader, whenever it was found or not and an error.
  99. // Group reader will be nil if found equals false.
  100. //
  101. // CONTRACT: caller must close group reader.
  102. func (wal *WAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) {
  103. var msg *TimedWALMessage
  104. // NOTE: starting from the last file in the group because we're usually
  105. // searching for the last height. See replay.go
  106. min, max := wal.group.MinIndex(), wal.group.MaxIndex()
  107. wal.Logger.Debug("Searching for height", "height", height, "min", min, "max", max)
  108. for index := max; index >= min; index-- {
  109. gr, err = wal.group.NewReader(index)
  110. if err != nil {
  111. return nil, false, err
  112. }
  113. dec := NewWALDecoder(gr)
  114. for {
  115. msg, err = dec.Decode()
  116. if err == io.EOF {
  117. // check next file
  118. break
  119. }
  120. if err != nil {
  121. gr.Close()
  122. return nil, false, err
  123. }
  124. if m, ok := msg.Msg.(EndHeightMessage); ok {
  125. if m.Height == height { // found
  126. wal.Logger.Debug("Found", "height", height, "index", index)
  127. return gr, true, nil
  128. }
  129. }
  130. }
  131. gr.Close()
  132. }
  133. return nil, false, nil
  134. }
  135. ///////////////////////////////////////////////////////////////////////////////
  136. // A WALEncoder writes custom-encoded WAL messages to an output stream.
  137. //
  138. // Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-wire encoded)
  139. type WALEncoder struct {
  140. wr io.Writer
  141. }
  142. // NewWALEncoder returns a new encoder that writes to wr.
  143. func NewWALEncoder(wr io.Writer) *WALEncoder {
  144. return &WALEncoder{wr}
  145. }
  146. // Encode writes the custom encoding of v to the stream.
  147. func (enc *WALEncoder) Encode(v interface{}) error {
  148. data := wire.BinaryBytes(v)
  149. crc := crc32.Checksum(data, crc32c)
  150. length := uint32(len(data))
  151. totalLength := 8 + int(length)
  152. msg := make([]byte, totalLength)
  153. binary.BigEndian.PutUint32(msg[0:4], crc)
  154. binary.BigEndian.PutUint32(msg[4:8], length)
  155. copy(msg[8:], data)
  156. _, err := enc.wr.Write(msg)
  157. if err == nil {
  158. // TODO [Anton Kaliaev 23 Oct 2017]: remove separator
  159. _, err = enc.wr.Write(walSeparator)
  160. }
  161. return err
  162. }
  163. ///////////////////////////////////////////////////////////////////////////////
  164. // A WALDecoder reads and decodes custom-encoded WAL messages from an input
  165. // stream. See WALEncoder for the format used.
  166. //
  167. // It will also compare the checksums and make sure data size is equal to the
  168. // length from the header. If that is not the case, error will be returned.
  169. type WALDecoder struct {
  170. rd io.Reader
  171. }
  172. // NewWALDecoder returns a new decoder that reads from rd.
  173. func NewWALDecoder(rd io.Reader) *WALDecoder {
  174. return &WALDecoder{rd}
  175. }
  176. // Decode reads the next custom-encoded value from its reader and returns it.
  177. func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
  178. b := make([]byte, 4)
  179. n, err := dec.rd.Read(b)
  180. if err == io.EOF {
  181. return nil, err
  182. }
  183. if err != nil {
  184. return nil, fmt.Errorf("failed to read checksum: %v", err)
  185. }
  186. crc := binary.BigEndian.Uint32(b)
  187. b = make([]byte, 4)
  188. n, err = dec.rd.Read(b)
  189. if err == io.EOF {
  190. return nil, err
  191. }
  192. if err != nil {
  193. return nil, fmt.Errorf("failed to read length: %v", err)
  194. }
  195. length := binary.BigEndian.Uint32(b)
  196. data := make([]byte, length)
  197. n, err = dec.rd.Read(data)
  198. if err == io.EOF {
  199. return nil, err
  200. }
  201. if err != nil {
  202. return nil, fmt.Errorf("not enough bytes for data: %v (want: %d, read: %v)", err, length, n)
  203. }
  204. // check checksum before decoding data
  205. actualCRC := crc32.Checksum(data, crc32c)
  206. if actualCRC != crc {
  207. return nil, fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)
  208. }
  209. var nn int
  210. var res *TimedWALMessage
  211. res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage)
  212. if err != nil {
  213. return nil, fmt.Errorf("failed to decode data: %v", err)
  214. }
  215. // TODO [Anton Kaliaev 23 Oct 2017]: remove separator
  216. if err = readSeparator(dec.rd); err != nil {
  217. return nil, err
  218. }
  219. return res, err
  220. }
  221. // readSeparator reads a separator from r. It returns any error from underlying
  222. // reader or if it's not a separator.
  223. func readSeparator(r io.Reader) error {
  224. b := make([]byte, len(walSeparator))
  225. _, err := r.Read(b)
  226. if err != nil {
  227. return fmt.Errorf("failed to read separator: %v", err)
  228. }
  229. if !bytes.Equal(b, walSeparator) {
  230. return fmt.Errorf("not a separator: %v", b)
  231. }
  232. return nil
  233. }