You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

747 lines
17 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. )
  18. const (
  19. groupCheckDuration = 5000 * time.Millisecond
  20. defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
  21. defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
  22. maxFilesToRemove = 4 // needs to be greater than 1
  23. )
  24. /*
  25. You can open a Group to keep restrictions on an AutoFile, like
  26. the maximum size of each chunk, and/or the total amount of bytes
  27. stored in the group.
  28. The first file to be written in the Group.Dir is the head file.
  29. Dir/
  30. - <HeadPath>
  31. Once the Head file reaches the size limit, it will be rotated.
  32. Dir/
  33. - <HeadPath>.000 // First rolled file
  34. - <HeadPath> // New head path, starts empty.
  35. // The implicit index is 001.
  36. As more files are written, the index numbers grow...
  37. Dir/
  38. - <HeadPath>.000 // First rolled file
  39. - <HeadPath>.001 // Second rolled file
  40. - ...
  41. - <HeadPath> // New head path
  42. The Group can also be used to binary-search for some line,
  43. assuming that marker lines are written occasionally.
  44. */
  45. type Group struct {
  46. cmn.BaseService
  47. ID string
  48. Head *AutoFile // The head AutoFile to write to
  49. headBuf *bufio.Writer
  50. Dir string // Directory that contains .Head
  51. ticker *time.Ticker
  52. mtx sync.Mutex
  53. headSizeLimit int64
  54. totalSizeLimit int64
  55. minIndex int // Includes head
  56. maxIndex int // Includes head, where Head will move to
  57. // TODO: When we start deleting files, we need to start tracking GroupReaders
  58. // and their dependencies.
  59. }
  60. // OpenGroup creates a new Group with head at headPath. It returns an error if
  61. // it fails to open head file.
  62. func OpenGroup(headPath string) (g *Group, err error) {
  63. dir := path.Dir(headPath)
  64. head, err := OpenAutoFile(headPath)
  65. if err != nil {
  66. return nil, err
  67. }
  68. g = &Group{
  69. ID: "group:" + head.ID,
  70. Head: head,
  71. headBuf: bufio.NewWriterSize(head, 4096*10),
  72. Dir: dir,
  73. ticker: time.NewTicker(groupCheckDuration),
  74. headSizeLimit: defaultHeadSizeLimit,
  75. totalSizeLimit: defaultTotalSizeLimit,
  76. minIndex: 0,
  77. maxIndex: 0,
  78. }
  79. g.BaseService = *cmn.NewBaseService(nil, "Group", g)
  80. gInfo := g.readGroupInfo()
  81. g.minIndex = gInfo.MinIndex
  82. g.maxIndex = gInfo.MaxIndex
  83. return
  84. }
  85. // OnStart implements Service by starting the goroutine that checks file and
  86. // group limits.
  87. func (g *Group) OnStart() error {
  88. go g.processTicks()
  89. return nil
  90. }
  91. // OnStop implements Service by stopping the goroutine described above.
  92. // NOTE: g.Head must be closed separately using Close.
  93. func (g *Group) OnStop() {
  94. g.ticker.Stop()
  95. g.Flush() // flush any uncommitted data
  96. }
  97. // Close closes the head file. The group must be stopped by this moment.
  98. func (g *Group) Close() {
  99. g.Flush() // flush any uncommitted data
  100. g.mtx.Lock()
  101. _ = g.Head.closeFile()
  102. g.mtx.Unlock()
  103. }
  104. // SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
  105. func (g *Group) SetHeadSizeLimit(limit int64) {
  106. g.mtx.Lock()
  107. g.headSizeLimit = limit
  108. g.mtx.Unlock()
  109. }
  110. // HeadSizeLimit returns the current head size limit.
  111. func (g *Group) HeadSizeLimit() int64 {
  112. g.mtx.Lock()
  113. defer g.mtx.Unlock()
  114. return g.headSizeLimit
  115. }
  116. // SetTotalSizeLimit allows you to overwrite default total size limit of the
  117. // group - 1GB.
  118. func (g *Group) SetTotalSizeLimit(limit int64) {
  119. g.mtx.Lock()
  120. g.totalSizeLimit = limit
  121. g.mtx.Unlock()
  122. }
  123. // TotalSizeLimit returns total size limit of the group.
  124. func (g *Group) TotalSizeLimit() int64 {
  125. g.mtx.Lock()
  126. defer g.mtx.Unlock()
  127. return g.totalSizeLimit
  128. }
  129. // MaxIndex returns index of the last file in the group.
  130. func (g *Group) MaxIndex() int {
  131. g.mtx.Lock()
  132. defer g.mtx.Unlock()
  133. return g.maxIndex
  134. }
  135. // MinIndex returns index of the first file in the group.
  136. func (g *Group) MinIndex() int {
  137. g.mtx.Lock()
  138. defer g.mtx.Unlock()
  139. return g.minIndex
  140. }
  141. // Write writes the contents of p into the current head of the group. It
  142. // returns the number of bytes written. If nn < len(p), it also returns an
  143. // error explaining why the write is short.
  144. // NOTE: Writes are buffered so they don't write synchronously
  145. // TODO: Make it halt if space is unavailable
  146. func (g *Group) Write(p []byte) (nn int, err error) {
  147. g.mtx.Lock()
  148. defer g.mtx.Unlock()
  149. return g.headBuf.Write(p)
  150. }
  151. // WriteLine writes line into the current head of the group. It also appends "\n".
  152. // NOTE: Writes are buffered so they don't write synchronously
  153. // TODO: Make it halt if space is unavailable
  154. func (g *Group) WriteLine(line string) error {
  155. g.mtx.Lock()
  156. defer g.mtx.Unlock()
  157. _, err := g.headBuf.Write([]byte(line + "\n"))
  158. return err
  159. }
  160. // Flush writes any buffered data to the underlying file and commits the
  161. // current content of the file to stable storage.
  162. func (g *Group) Flush() error {
  163. g.mtx.Lock()
  164. defer g.mtx.Unlock()
  165. err := g.headBuf.Flush()
  166. if err == nil {
  167. err = g.Head.Sync()
  168. }
  169. return err
  170. }
  171. func (g *Group) processTicks() {
  172. for {
  173. _, ok := <-g.ticker.C
  174. if !ok {
  175. return // Done.
  176. }
  177. g.checkHeadSizeLimit()
  178. g.checkTotalSizeLimit()
  179. }
  180. }
  181. // NOTE: for testing
  182. func (g *Group) stopTicker() {
  183. g.ticker.Stop()
  184. }
  185. // NOTE: this function is called manually in tests.
  186. func (g *Group) checkHeadSizeLimit() {
  187. limit := g.HeadSizeLimit()
  188. if limit == 0 {
  189. return
  190. }
  191. size, err := g.Head.Size()
  192. if err != nil {
  193. panic(err)
  194. }
  195. if size >= limit {
  196. g.RotateFile()
  197. }
  198. }
  199. func (g *Group) checkTotalSizeLimit() {
  200. limit := g.TotalSizeLimit()
  201. if limit == 0 {
  202. return
  203. }
  204. gInfo := g.readGroupInfo()
  205. totalSize := gInfo.TotalSize
  206. for i := 0; i < maxFilesToRemove; i++ {
  207. index := gInfo.MinIndex + i
  208. if totalSize < limit {
  209. return
  210. }
  211. if index == gInfo.MaxIndex {
  212. // Special degenerate case, just do nothing.
  213. log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
  214. return
  215. }
  216. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  217. fileInfo, err := os.Stat(pathToRemove)
  218. if err != nil {
  219. log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
  220. continue
  221. }
  222. err = os.Remove(pathToRemove)
  223. if err != nil {
  224. log.Println(err)
  225. return
  226. }
  227. totalSize -= fileInfo.Size()
  228. }
  229. }
  230. // RotateFile causes group to close the current head and assign it some index.
  231. // Note it does not create a new head.
  232. func (g *Group) RotateFile() {
  233. g.mtx.Lock()
  234. defer g.mtx.Unlock()
  235. headPath := g.Head.Path
  236. if err := g.Head.closeFile(); err != nil {
  237. panic(err)
  238. }
  239. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  240. if err := os.Rename(headPath, indexPath); err != nil {
  241. panic(err)
  242. }
  243. g.maxIndex++
  244. }
  245. // NewReader returns a new group reader.
  246. // CONTRACT: Caller must close the returned GroupReader.
  247. func (g *Group) NewReader(index int) (*GroupReader, error) {
  248. r := newGroupReader(g)
  249. err := r.SetIndex(index)
  250. if err != nil {
  251. return nil, err
  252. }
  253. return r, nil
  254. }
// SearchFunc compares a marker line against the line being searched for.
// It returns a negative value (conventionally -1) if the sought line comes
// after line, 0 if line is the sought line, and a positive value
// (conventionally 1) if the sought line comes before line. A non-nil error
// aborts the search.
type SearchFunc func(line string) (int, error)
  257. // Searches for the right file in Group, then returns a GroupReader to start
  258. // streaming lines.
  259. // Returns true if an exact match was found, otherwise returns the next greater
  260. // line that starts with prefix.
  261. // CONTRACT: Caller must close the returned GroupReader
  262. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  263. g.mtx.Lock()
  264. minIndex, maxIndex := g.minIndex, g.maxIndex
  265. g.mtx.Unlock()
  266. // Now minIndex/maxIndex may change meanwhile,
  267. // but it shouldn't be a big deal
  268. // (maybe we'll want to limit scanUntil though)
  269. for {
  270. curIndex := (minIndex + maxIndex + 1) / 2
  271. // Base case, when there's only 1 choice left.
  272. if minIndex == maxIndex {
  273. r, err := g.NewReader(maxIndex)
  274. if err != nil {
  275. return nil, false, err
  276. }
  277. match, err := scanUntil(r, prefix, cmp)
  278. if err != nil {
  279. r.Close()
  280. return nil, false, err
  281. }
  282. return r, match, err
  283. }
  284. // Read starting roughly at the middle file,
  285. // until we find line that has prefix.
  286. r, err := g.NewReader(curIndex)
  287. if err != nil {
  288. return nil, false, err
  289. }
  290. foundIndex, line, err := scanNext(r, prefix)
  291. r.Close()
  292. if err != nil {
  293. return nil, false, err
  294. }
  295. // Compare this line to our search query.
  296. val, err := cmp(line)
  297. if err != nil {
  298. return nil, false, err
  299. }
  300. if val < 0 {
  301. // Line will come later
  302. minIndex = foundIndex
  303. } else if val == 0 {
  304. // Stroke of luck, found the line
  305. r, err := g.NewReader(foundIndex)
  306. if err != nil {
  307. return nil, false, err
  308. }
  309. match, err := scanUntil(r, prefix, cmp)
  310. if !match {
  311. panic("Expected match to be true")
  312. }
  313. if err != nil {
  314. r.Close()
  315. return nil, false, err
  316. }
  317. return r, true, err
  318. } else {
  319. // We passed it
  320. maxIndex = curIndex - 1
  321. }
  322. }
  323. }
  324. // Scans and returns the first line that starts with 'prefix'
  325. // Consumes line and returns it.
  326. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  327. for {
  328. line, err := r.ReadLine()
  329. if err != nil {
  330. return 0, "", err
  331. }
  332. if !strings.HasPrefix(line, prefix) {
  333. continue
  334. }
  335. index := r.CurIndex()
  336. return index, line, nil
  337. }
  338. }
  339. // Returns true iff an exact match was found.
  340. // Pushes line, does not consume it.
  341. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  342. for {
  343. line, err := r.ReadLine()
  344. if err != nil {
  345. return false, err
  346. }
  347. if !strings.HasPrefix(line, prefix) {
  348. continue
  349. }
  350. val, err := cmp(line)
  351. if err != nil {
  352. return false, err
  353. }
  354. if val < 0 {
  355. continue
  356. } else if val == 0 {
  357. r.PushLine(line)
  358. return true, nil
  359. } else {
  360. r.PushLine(line)
  361. return false, nil
  362. }
  363. }
  364. }
  365. // Searches backwards for the last line in Group with prefix.
  366. // Scans each file forward until the end to find the last match.
  367. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  368. g.mtx.Lock()
  369. minIndex, maxIndex := g.minIndex, g.maxIndex
  370. g.mtx.Unlock()
  371. r, err := g.NewReader(maxIndex)
  372. if err != nil {
  373. return "", false, err
  374. }
  375. defer r.Close()
  376. // Open files from the back and read
  377. GROUP_LOOP:
  378. for i := maxIndex; i >= minIndex; i-- {
  379. err := r.SetIndex(i)
  380. if err != nil {
  381. return "", false, err
  382. }
  383. // Scan each line and test whether line matches
  384. for {
  385. line, err := r.ReadLine()
  386. if err == io.EOF {
  387. if found {
  388. return match, found, nil
  389. }
  390. continue GROUP_LOOP
  391. } else if err != nil {
  392. return "", false, err
  393. }
  394. if strings.HasPrefix(line, prefix) {
  395. match = line
  396. found = true
  397. }
  398. if r.CurIndex() > i {
  399. if found {
  400. return match, found, nil
  401. }
  402. continue GROUP_LOOP
  403. }
  404. }
  405. }
  406. return
  407. }
  408. // GroupInfo holds information about the group.
  409. type GroupInfo struct {
  410. MinIndex int // index of the first file in the group, including head
  411. MaxIndex int // index of the last file in the group, including head
  412. TotalSize int64 // total size of the group
  413. HeadSize int64 // size of the head
  414. }
  415. // Returns info after scanning all files in g.Head's dir.
  416. func (g *Group) ReadGroupInfo() GroupInfo {
  417. g.mtx.Lock()
  418. defer g.mtx.Unlock()
  419. return g.readGroupInfo()
  420. }
  421. // Index includes the head.
  422. // CONTRACT: caller should have called g.mtx.Lock
  423. func (g *Group) readGroupInfo() GroupInfo {
  424. groupDir := filepath.Dir(g.Head.Path)
  425. headBase := filepath.Base(g.Head.Path)
  426. var minIndex, maxIndex int = -1, -1
  427. var totalSize, headSize int64 = 0, 0
  428. dir, err := os.Open(groupDir)
  429. if err != nil {
  430. panic(err)
  431. }
  432. defer dir.Close()
  433. fiz, err := dir.Readdir(0)
  434. if err != nil {
  435. panic(err)
  436. }
  437. // For each file in the directory, filter by pattern
  438. for _, fileInfo := range fiz {
  439. if fileInfo.Name() == headBase {
  440. fileSize := fileInfo.Size()
  441. totalSize += fileSize
  442. headSize = fileSize
  443. continue
  444. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  445. fileSize := fileInfo.Size()
  446. totalSize += fileSize
  447. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  448. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  449. if len(submatch) != 0 {
  450. // Matches
  451. fileIndex, err := strconv.Atoi(string(submatch[1]))
  452. if err != nil {
  453. panic(err)
  454. }
  455. if maxIndex < fileIndex {
  456. maxIndex = fileIndex
  457. }
  458. if minIndex == -1 || fileIndex < minIndex {
  459. minIndex = fileIndex
  460. }
  461. }
  462. }
  463. }
  464. // Now account for the head.
  465. if minIndex == -1 {
  466. // If there were no numbered files,
  467. // then the head is index 0.
  468. minIndex, maxIndex = 0, 0
  469. } else {
  470. // Otherwise, the head file is 1 greater
  471. maxIndex++
  472. }
  473. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  474. }
  475. func filePathForIndex(headPath string, index int, maxIndex int) string {
  476. if index == maxIndex {
  477. return headPath
  478. }
  479. return fmt.Sprintf("%v.%03d", headPath, index)
  480. }
//--------------------------------------------------------------------------------

// GroupReader provides an interface for reading from a Group.
// It reads the group's files in index order: Read and ReadLine move on to the
// next file when the current one is exhausted.
type GroupReader struct {
	*Group
	mtx       sync.Mutex    // guards the cursor fields below
	curIndex  int           // index of the file the cursor is on
	curFile   *os.File      // open file at curIndex; nil until first opened
	curReader *bufio.Reader // buffered reader over curFile
	curLine   []byte        // line set by PushLine, returned by the next ReadLine
}
  491. func newGroupReader(g *Group) *GroupReader {
  492. return &GroupReader{
  493. Group: g,
  494. curIndex: 0,
  495. curFile: nil,
  496. curReader: nil,
  497. curLine: nil,
  498. }
  499. }
  500. // Close closes the GroupReader by closing the cursor file.
  501. func (gr *GroupReader) Close() error {
  502. gr.mtx.Lock()
  503. defer gr.mtx.Unlock()
  504. if gr.curReader != nil {
  505. err := gr.curFile.Close()
  506. gr.curIndex = 0
  507. gr.curReader = nil
  508. gr.curFile = nil
  509. gr.curLine = nil
  510. return err
  511. }
  512. return nil
  513. }
  514. // Read implements io.Reader, reading bytes from the current Reader
  515. // incrementing index until enough bytes are read.
  516. func (gr *GroupReader) Read(p []byte) (n int, err error) {
  517. lenP := len(p)
  518. if lenP == 0 {
  519. return 0, errors.New("given empty slice")
  520. }
  521. gr.mtx.Lock()
  522. defer gr.mtx.Unlock()
  523. // Open file if not open yet
  524. if gr.curReader == nil {
  525. if err = gr.openFile(gr.curIndex); err != nil {
  526. return 0, err
  527. }
  528. }
  529. // Iterate over files until enough bytes are read
  530. var nn int
  531. for {
  532. nn, err = gr.curReader.Read(p[n:])
  533. n += nn
  534. if err == io.EOF {
  535. if n >= lenP {
  536. return n, nil
  537. }
  538. // Open the next file
  539. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  540. return n, err1
  541. }
  542. } else if err != nil {
  543. return n, err
  544. } else if nn == 0 { // empty file
  545. return n, err
  546. }
  547. }
  548. }
  549. // ReadLine reads a line (without delimiter).
  550. // just return io.EOF if no new lines found.
  551. func (gr *GroupReader) ReadLine() (string, error) {
  552. gr.mtx.Lock()
  553. defer gr.mtx.Unlock()
  554. // From PushLine
  555. if gr.curLine != nil {
  556. line := string(gr.curLine)
  557. gr.curLine = nil
  558. return line, nil
  559. }
  560. // Open file if not open yet
  561. if gr.curReader == nil {
  562. err := gr.openFile(gr.curIndex)
  563. if err != nil {
  564. return "", err
  565. }
  566. }
  567. // Iterate over files until line is found
  568. var linePrefix string
  569. for {
  570. bytesRead, err := gr.curReader.ReadBytes('\n')
  571. if err == io.EOF {
  572. // Open the next file
  573. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  574. return "", err1
  575. }
  576. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  577. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  578. }
  579. linePrefix += string(bytesRead)
  580. continue
  581. } else if err != nil {
  582. return "", err
  583. }
  584. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  585. }
  586. }
  587. // IF index > gr.Group.maxIndex, returns io.EOF
  588. // CONTRACT: caller should hold gr.mtx
  589. func (gr *GroupReader) openFile(index int) error {
  590. // Lock on Group to ensure that head doesn't move in the meanwhile.
  591. gr.Group.mtx.Lock()
  592. defer gr.Group.mtx.Unlock()
  593. if index > gr.Group.maxIndex {
  594. return io.EOF
  595. }
  596. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  597. curFile, err := os.Open(curFilePath)
  598. if err != nil {
  599. return err
  600. }
  601. curReader := bufio.NewReader(curFile)
  602. // Update gr.cur*
  603. if gr.curFile != nil {
  604. gr.curFile.Close() // TODO return error?
  605. }
  606. gr.curIndex = index
  607. gr.curFile = curFile
  608. gr.curReader = curReader
  609. gr.curLine = nil
  610. return nil
  611. }
  612. // PushLine makes the given line the current one, so the next time somebody
  613. // calls ReadLine, this line will be returned.
  614. // panics if called twice without calling ReadLine.
  615. func (gr *GroupReader) PushLine(line string) {
  616. gr.mtx.Lock()
  617. defer gr.mtx.Unlock()
  618. if gr.curLine == nil {
  619. gr.curLine = []byte(line)
  620. } else {
  621. panic("PushLine failed, already have line")
  622. }
  623. }
  624. // CurIndex returns cursor's file index.
  625. func (gr *GroupReader) CurIndex() int {
  626. gr.mtx.Lock()
  627. defer gr.mtx.Unlock()
  628. return gr.curIndex
  629. }
  630. // SetIndex sets the cursor's file index to index by opening a file at this
  631. // position.
  632. func (gr *GroupReader) SetIndex(index int) error {
  633. gr.mtx.Lock()
  634. defer gr.mtx.Unlock()
  635. return gr.openFile(index)
  636. }
  637. //--------------------------------------------------------------------------------
  638. // A simple SearchFunc that assumes that the marker is of form
  639. // <prefix><number>.
  640. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  641. //
  642. // #HEIGHT:1
  643. // ...
  644. // #HEIGHT:2
  645. // ...
  646. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  647. return func(line string) (int, error) {
  648. if !strings.HasPrefix(line, prefix) {
  649. return -1, errors.New(cmn.Fmt("Marker line did not have prefix: %v", prefix))
  650. }
  651. i, err := strconv.Atoi(line[len(prefix):])
  652. if err != nil {
  653. return -1, errors.New(cmn.Fmt("Failed to parse marker line: %v", err.Error()))
  654. }
  655. if target < i {
  656. return 1, nil
  657. } else if target == i {
  658. return 0, nil
  659. } else {
  660. return -1, nil
  661. }
  662. }
  663. }