You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

111 lines
2.9 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package consensus
  2. import (
  3. "hash/crc32"
  4. "time"
  5. wire "github.com/tendermint/go-wire"
  6. "github.com/tendermint/tendermint/types"
  7. auto "github.com/tendermint/tmlibs/autofile"
  8. cmn "github.com/tendermint/tmlibs/common"
  9. )
  10. //--------------------------------------------------------
  11. // types and functions for savings consensus messages
  12. type TimedWALMessage struct {
  13. Time time.Time `json:"time"` // for debugging purposes
  14. CRC uint32 `json:"crc"`
  15. MsgSize uint32 `json:"msg_size"`
  16. Msg WALMessage `json:"msg"`
  17. }
  18. type WALMessage interface{}
  19. var _ = wire.RegisterInterface(
  20. struct{ WALMessage }{},
  21. wire.ConcreteType{types.EventDataRoundState{}, 0x01},
  22. wire.ConcreteType{msgInfo{}, 0x02},
  23. wire.ConcreteType{timeoutInfo{}, 0x03},
  24. )
  25. //--------------------------------------------------------
  26. // Simple write-ahead logger
  27. // Write ahead logger writes msgs to disk before they are processed.
  28. // Can be used for crash-recovery and deterministic replay
  29. // TODO: currently the wal is overwritten during replay catchup
  30. // give it a mode so it's either reading or appending - must read to end to start appending again
  31. type WAL struct {
  32. cmn.BaseService
  33. group *auto.Group
  34. light bool // ignore block parts
  35. }
  36. func NewWAL(walFile string, light bool) (*WAL, error) {
  37. group, err := auto.OpenGroup(walFile)
  38. if err != nil {
  39. return nil, err
  40. }
  41. wal := &WAL{
  42. group: group,
  43. light: light,
  44. }
  45. wal.BaseService = *cmn.NewBaseService(nil, "WAL", wal)
  46. return wal, nil
  47. }
  48. func (wal *WAL) OnStart() error {
  49. size, err := wal.group.Head.Size()
  50. if err != nil {
  51. return err
  52. } else if size == 0 {
  53. wal.writeEndHeight(0)
  54. }
  55. _, err = wal.group.Start()
  56. return err
  57. }
  58. func (wal *WAL) OnStop() {
  59. wal.BaseService.OnStop()
  60. wal.group.Stop()
  61. }
  62. // called in newStep and for each pass in receiveRoutine
  63. func (wal *WAL) Save(wmsg WALMessage) {
  64. if wal == nil {
  65. return
  66. }
  67. if wal.light {
  68. // in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
  69. if mi, ok := wmsg.(msgInfo); ok {
  70. if mi.PeerKey != "" {
  71. return
  72. }
  73. }
  74. }
  75. // Write the wal message
  76. innerMsgBytes := wire.JSONBytes(wmsg)
  77. crc32c := crc32.MakeTable(crc32.Castagnoli)
  78. crc := crc32.Checksum(innerMsgBytes, crc32c)
  79. wmsgSize := uint32(len(innerMsgBytes))
  80. var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), crc, wmsgSize, wmsg})
  81. err := wal.group.WriteLine(string(wmsgBytes))
  82. if err != nil {
  83. cmn.PanicQ(cmn.Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg))
  84. }
  85. // TODO: only flush when necessary
  86. if err := wal.group.Flush(); err != nil {
  87. cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
  88. }
  89. }
  90. func (wal *WAL) writeEndHeight(height int) {
  91. wal.group.WriteLine(cmn.Fmt("#ENDHEIGHT: %v", height))
  92. // TODO: only flush when necessary
  93. if err := wal.group.Flush(); err != nil {
  94. cmn.PanicQ(cmn.Fmt("Error flushing consensus wal buf to file. Error: %v \n", err))
  95. }
  96. }