package autofile

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)

const (
	defaultGroupCheckDuration = 5000 * time.Millisecond
	defaultHeadSizeLimit      = 10 * 1024 * 1024       // 10MB
	defaultTotalSizeLimit     = 1 * 1024 * 1024 * 1024 // 1GB
	maxFilesToRemove          = 4                      // needs to be greater than 1
)

/*
You can open a Group to keep restrictions on an AutoFile, like
the maximum size of each chunk, and/or the total amount of bytes
stored in the group.

The first file to be written in the Group.Dir is the head file.

	Dir/
	- <HeadPath>

Once the Head file reaches the size limit, it will be rotated.

	Dir/
	- <HeadPath>.000   // First rolled file
	- <HeadPath>       // New head path, starts empty.
	                   // The implicit index is 001.

As more files are written, the index numbers grow...

	Dir/
	- <HeadPath>.000   // First rolled file
	- <HeadPath>.001   // Second rolled file
	- ...
	- <HeadPath>       // New head path

The Group can also be used to binary-search for some line,
assuming that marker lines are written occasionally.
*/
type Group struct {
	service.BaseService
	logger log.Logger

	ID                 string
	Head               *AutoFile // The head AutoFile to write to
	headBuf            *bufio.Writer
	Dir                string // Directory that contains .Head
	ticker             *time.Ticker
	mtx                sync.Mutex
	headSizeLimit      int64
	totalSizeLimit     int64
	groupCheckDuration time.Duration
	minIndex           int // Includes head
	maxIndex           int // Includes head, where Head will move to

	// TODO: When we start deleting files, we need to start tracking GroupReaders
	// and their dependencies.
}

// OpenGroup creates a new Group with head at headPath. It returns an error if
// it fails to open the head file.
func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOptions ...func(*Group)) (*Group, error) {
	dir, err := filepath.Abs(filepath.Dir(headPath))
	if err != nil {
		return nil, err
	}
	head, err := OpenAutoFile(ctx, headPath)
	if err != nil {
		return nil, err
	}

	g := &Group{
		logger:             logger,
		ID:                 "group:" + head.ID,
		Head:               head,
		headBuf:            bufio.NewWriterSize(head, 4096*10),
		Dir:                dir,
		headSizeLimit:      defaultHeadSizeLimit,
		totalSizeLimit:     defaultTotalSizeLimit,
		groupCheckDuration: defaultGroupCheckDuration,
		minIndex:           0,
		maxIndex:           0,
	}

	for _, option := range groupOptions {
		option(g)
	}

	g.BaseService = *service.NewBaseService(logger, "Group", g)

	// Pick up any rolled files already present in the directory.
	gInfo := g.readGroupInfo()
	g.minIndex = gInfo.MinIndex
	g.maxIndex = gInfo.MaxIndex
	return g, nil
}

// GroupCheckDuration overrides the default groupCheckDuration.
func GroupCheckDuration(duration time.Duration) func(*Group) {
	return func(g *Group) {
		g.groupCheckDuration = duration
	}
}

// GroupHeadSizeLimit overrides the default head size limit (10MB).
func GroupHeadSizeLimit(limit int64) func(*Group) {
	return func(g *Group) {
		g.headSizeLimit = limit
	}
}

// GroupTotalSizeLimit overrides the default total size limit of the group (1GB).
func GroupTotalSizeLimit(limit int64) func(*Group) {
	return func(g *Group) {
		g.totalSizeLimit = limit
	}
}

// OnStart implements service.Service by starting the goroutine that checks file
// and group limits.
func (g *Group) OnStart(ctx context.Context) error {
	g.ticker = time.NewTicker(g.groupCheckDuration)
	go g.processTicks(ctx)
	return nil
}

// OnStop implements service.Service by stopping the goroutine described above.
// NOTE: g.Head must be closed separately using Close.
func (g *Group) OnStop() {
	g.ticker.Stop()
	if err := g.FlushAndSync(); err != nil {
		g.logger.Error("error flushing to disk", "err", err)
	}
}

// Close flushes buffered data and closes the head file. The group must
// already be stopped when Close is called.
func (g *Group) Close() {
	if err := g.FlushAndSync(); err != nil {
		g.logger.Error("error flushing to disk", "err", err)
	}

	g.mtx.Lock()
	_ = g.Head.Close()
	g.mtx.Unlock()
}

// HeadSizeLimit returns the current head size limit.
func (g *Group) HeadSizeLimit() int64 {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.headSizeLimit
}

// TotalSizeLimit returns the total size limit of the group.
func (g *Group) TotalSizeLimit() int64 {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.totalSizeLimit
}

// MaxIndex returns the index of the last file in the group.
func (g *Group) MaxIndex() int {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.maxIndex
}

// MinIndex returns the index of the first file in the group.
func (g *Group) MinIndex() int {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.minIndex
}

// Write writes the contents of p into the current head of the group. It
// returns the number of bytes written. If nn < len(p), it also returns an
// error explaining why the write is short.
// NOTE: Writes are buffered, so data does not reach the file synchronously.
// TODO: Make it halt if space is unavailable
func (g *Group) Write(p []byte) (nn int, err error) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.headBuf.Write(p)
}

// WriteLine writes line into the current head of the group. It also appends "\n".
// NOTE: Writes are buffered, so data does not reach the file synchronously.
// TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	_, err := g.headBuf.Write([]byte(line + "\n"))
	return err
}

// Buffered returns the size of the currently buffered data, i.e. data that
// has not yet been flushed to the head file.
func (g *Group) Buffered() int {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.headBuf.Buffered()
}

// FlushAndSync writes any buffered data to the underlying file and commits the
// current content of the file to stable storage (fsync).
func (g *Group) FlushAndSync() error {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	err := g.headBuf.Flush()
	if err == nil {
		err = g.Head.Sync()
	}
	return err
}

// processTicks checks the head and total size limits on every tick until ctx
// is done.
func (g *Group) processTicks(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-g.ticker.C:
			g.checkHeadSizeLimit(ctx)
			g.checkTotalSizeLimit(ctx)
		}
	}
}

// checkHeadSizeLimit rotates the head file once it reaches the head size
// limit. A limit of 0 disables the check.
// NOTE: this function is called manually in tests.
func (g *Group) checkHeadSizeLimit(ctx context.Context) {
	limit := g.HeadSizeLimit()
	if limit == 0 {
		return
	}
	size, err := g.Head.Size()
	if err != nil {
		g.logger.Error("Group's head may grow without bound", "head", g.Head.Path, "err", err)
		return
	}
	if size >= limit {
		g.rotateFile(ctx)
	}
}

// checkTotalSizeLimit removes the oldest files, at most maxFilesToRemove per
// call, until the group is below the total size limit. A limit of 0 disables
// the check.
func (g *Group) checkTotalSizeLimit(ctx context.Context) {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	if err := ctx.Err(); err != nil {
		return
	}

	if g.totalSizeLimit == 0 {
		return
	}

	gInfo := g.readGroupInfo()
	totalSize := gInfo.TotalSize
	for i := 0; i < maxFilesToRemove; i++ {
		index := gInfo.MinIndex + i
		if totalSize < g.totalSizeLimit {
			return
		}
		if index == gInfo.MaxIndex {
			// Special degenerate case, just do nothing.
			g.logger.Error("Group's head may grow without bound", "head", g.Head.Path)
			return
		}

		if ctx.Err() != nil {
			return
		}

		pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
		fInfo, err := os.Stat(pathToRemove)
		if err != nil {
			g.logger.Error("Failed to fetch info for file", "file", pathToRemove, "err", err)
			continue
		}

		if ctx.Err() != nil {
			return
		}

		if err = os.Remove(pathToRemove); err != nil {
			g.logger.Error("Failed to remove path", "path", pathToRemove, "err", err)
			return
		}
		totalSize -= fInfo.Size()
	}
}

// rotateFile flushes and closes the current head, then renames it to the
// next index so that a new, empty head can take its place. It panics on any
// error other than context cancellation.
func (g *Group) rotateFile(ctx context.Context) {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	if err := ctx.Err(); err != nil {
		return
	}

	headPath := g.Head.Path

	if err := g.headBuf.Flush(); err != nil {
		panic(err)
	}
	if err := g.Head.Sync(); err != nil {
		panic(err)
	}

	err := g.Head.withLock(func() error {
		if err := ctx.Err(); err != nil {
			return err
		}

		if err := g.Head.unsyncCloseFile(); err != nil {
			return err
		}

		indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
		return os.Rename(headPath, indexPath)
	})
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return
	}
	if err != nil {
		panic(err)
	}

	g.maxIndex++
}

// NewReader returns a new group reader.
// CONTRACT: Caller must close the returned GroupReader.
func (g *Group) NewReader(index int) (*GroupReader, error) {
	r := newGroupReader(g)
	err := r.SetIndex(index)
	if err != nil {
		return nil, err
	}
	return r, nil
}

// GroupInfo holds information about the group.
type GroupInfo struct {
	MinIndex  int   // index of the first file in the group, including head
	MaxIndex  int   // index of the last file in the group, including head
	TotalSize int64 // total size of the group
	HeadSize  int64 // size of the head
}

// ReadGroupInfo returns info after scanning all files in g.Head's dir.
func (g *Group) ReadGroupInfo() GroupInfo {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.readGroupInfo()
}

// indexedFilePattern matches rolled file names such as "<HeadPath>.000".
var indexedFilePattern = regexp.MustCompile(`^.+\.([0-9]{3,})$`)

// readGroupInfo scans all files in g.Head's dir and returns the resulting
// GroupInfo. Index includes the head.
// CONTRACT: caller should have called g.mtx.Lock
func (g *Group) readGroupInfo() GroupInfo {
	groupDir := filepath.Dir(g.Head.Path)
	headBase := filepath.Base(g.Head.Path)
	var minIndex, maxIndex int = -1, -1
	var totalSize, headSize int64 = 0, 0

	dir, err := os.Open(groupDir)
	if err != nil {
		panic(err)
	}
	defer dir.Close()
	fiz, err := dir.Readdir(0)
	if err != nil {
		panic(err)
	}

	// For each file in the directory, filter by pattern
	for _, fileInfo := range fiz {
		if fileInfo.Name() == headBase {
			fileSize := fileInfo.Size()
			totalSize += fileSize
			headSize = fileSize
			continue
		} else if strings.HasPrefix(fileInfo.Name(), headBase) {
			fileSize := fileInfo.Size()
			totalSize += fileSize
			submatch := indexedFilePattern.FindStringSubmatch(fileInfo.Name())
			if len(submatch) != 0 {
				// Matches
				fileIndex, err := strconv.Atoi(submatch[1])
				if err != nil {
					panic(err)
				}
				if maxIndex < fileIndex {
					maxIndex = fileIndex
				}
				if minIndex == -1 || fileIndex < minIndex {
					minIndex = fileIndex
				}
			}
		}
	}

	// Now account for the head.
	if minIndex == -1 {
		// If there were no numbered files,
		// then the head is index 0.
		minIndex, maxIndex = 0, 0
	} else {
		// Otherwise, the head file is 1 greater
		maxIndex++
	}
	return GroupInfo{minIndex, maxIndex, totalSize, headSize}
}

// filePathForIndex returns the path of the file with the given index. The
// head (index == maxIndex) has no numeric suffix.
func filePathForIndex(headPath string, index int, maxIndex int) string {
	if index == maxIndex {
		return headPath
	}
	return fmt.Sprintf("%v.%03d", headPath, index)
}

//--------------------------------------------------------------------------------

// GroupReader provides an interface for reading from a Group.
type GroupReader struct {
	*Group
	mtx       sync.Mutex
	curIndex  int
	curFile   *os.File
	curReader *bufio.Reader
	curLine   []byte
}

func newGroupReader(g *Group) *GroupReader {
	return &GroupReader{
		Group:     g,
		curIndex:  0,
		curFile:   nil,
		curReader: nil,
		curLine:   nil,
	}
}

// Close closes the GroupReader by closing the cursor file.
func (gr *GroupReader) Close() error {
	gr.mtx.Lock()
	defer gr.mtx.Unlock()

	if gr.curReader != nil {
		err := gr.curFile.Close()
		gr.curIndex = 0
		gr.curReader = nil
		gr.curFile = nil
		gr.curLine = nil
		return err
	}
	return nil
}

// Read implements io.Reader, reading bytes from the current reader and
// advancing to the next file index until enough bytes are read.
func (gr *GroupReader) Read(p []byte) (n int, err error) {
	lenP := len(p)
	if lenP == 0 {
		return 0, errors.New("given empty slice")
	}

	gr.mtx.Lock()
	defer gr.mtx.Unlock()

	// Open file if not open yet
	if gr.curReader == nil {
		if err = gr.openFile(gr.curIndex); err != nil {
			return 0, err
		}
	}

	// Iterate over files until enough bytes are read
	var nn int
	for {
		nn, err = gr.curReader.Read(p[n:])
		n += nn
		switch {
		case err == io.EOF:
			if n >= lenP {
				return n, nil
			}
			// Open the next file
			if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
				return n, err1
			}
		case err != nil:
			return n, err
		case nn == 0: // empty file
			return n, err
		}
	}
}

// openFile opens the file at index and makes it the current file.
// If index > gr.Group.maxIndex, it returns io.EOF.
// CONTRACT: caller should hold gr.mtx
func (gr *GroupReader) openFile(index int) error {
	// Lock on Group to ensure that head doesn't move in the meanwhile.
	gr.Group.mtx.Lock()
	defer gr.Group.mtx.Unlock()

	if index > gr.Group.maxIndex {
		return io.EOF
	}

	curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
	curFile, err := os.OpenFile(curFilePath, os.O_RDONLY|os.O_CREATE, autoFilePerms)
	if err != nil {
		return err
	}
	curReader := bufio.NewReader(curFile)

	// Update gr.cur*
	if gr.curFile != nil {
		gr.curFile.Close() // TODO return error?
	}
	gr.curIndex = index
	gr.curFile = curFile
	gr.curReader = curReader
	gr.curLine = nil
	return nil
}

// CurIndex returns cursor's file index.
func (gr *GroupReader) CurIndex() int {
	gr.mtx.Lock()
	defer gr.mtx.Unlock()
	return gr.curIndex
}

// SetIndex sets the cursor's file index to index by opening a file at this
// position.
func (gr *GroupReader) SetIndex(index int) error {
	gr.mtx.Lock()
	defer gr.mtx.Unlock()
	return gr.openFile(index)
}