You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

735 lines
17 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. cmn "github.com/tendermint/tmlibs/common"
  17. )
  // Housekeeping parameters for Group.
  const (
  	// groupCheckDuration is the period of the ticker created by OpenGroup;
  	// the head-size and total-size checks run on every tick.
  	groupCheckDuration = 5 * time.Second

  	// defaultHeadSizeLimit is the initial head size limit (10MB).
  	defaultHeadSizeLimit = 10 << 20

  	// defaultTotalSizeLimit is the initial total size limit (1GB).
  	defaultTotalSizeLimit = 1 << 30

  	// maxFilesToRemove bounds how many files a single total-size check may
  	// delete. It needs to be greater than 1.
  	maxFilesToRemove = 4
  )
  24. /*
  25. You can open a Group to keep restrictions on an AutoFile, like
  26. the maximum size of each chunk, and/or the total amount of bytes
  27. stored in the group.
  28. The first file to be written in the Group.Dir is the head file.
  29. Dir/
  30. - <HeadPath>
  31. Once the Head file reaches the size limit, it will be rotated.
  32. Dir/
  33. - <HeadPath>.000 // First rolled file
  34. - <HeadPath> // New head path, starts empty.
  35. // The implicit index is 001.
  36. As more files are written, the index numbers grow...
  37. Dir/
  38. - <HeadPath>.000 // First rolled file
  39. - <HeadPath>.001 // Second rolled file
  40. - ...
  41. - <HeadPath> // New head path
  42. The Group can also be used to binary-search for some line,
  43. assuming that marker lines are written occasionally.
  44. */
  45. type Group struct {
  46. cmn.BaseService
  47. ID string
  48. Head *AutoFile // The head AutoFile to write to
  49. headBuf *bufio.Writer
  50. Dir string // Directory that contains .Head
  51. ticker *time.Ticker
  52. mtx sync.Mutex
  53. headSizeLimit int64
  54. totalSizeLimit int64
  55. minIndex int // Includes head
  56. maxIndex int // Includes head, where Head will move to
  57. // TODO: When we start deleting files, we need to start tracking GroupReaders
  58. // and their dependencies.
  59. }
  60. func OpenGroup(headPath string) (g *Group, err error) {
  61. dir := path.Dir(headPath)
  62. head, err := OpenAutoFile(headPath)
  63. if err != nil {
  64. return nil, err
  65. }
  66. g = &Group{
  67. ID: "group:" + head.ID,
  68. Head: head,
  69. headBuf: bufio.NewWriterSize(head, 4096*10),
  70. Dir: dir,
  71. ticker: time.NewTicker(groupCheckDuration),
  72. headSizeLimit: defaultHeadSizeLimit,
  73. totalSizeLimit: defaultTotalSizeLimit,
  74. minIndex: 0,
  75. maxIndex: 0,
  76. }
  77. g.BaseService = *cmn.NewBaseService(nil, "Group", g)
  78. gInfo := g.readGroupInfo()
  79. g.minIndex = gInfo.MinIndex
  80. g.maxIndex = gInfo.MaxIndex
  81. return
  82. }
  83. func (g *Group) OnStart() error {
  84. g.BaseService.OnStart()
  85. go g.processTicks()
  86. return nil
  87. }
  88. // NOTE: g.Head must be closed separately
  89. func (g *Group) OnStop() {
  90. g.BaseService.OnStop()
  91. g.ticker.Stop()
  92. }
  93. // SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
  94. func (g *Group) SetHeadSizeLimit(limit int64) {
  95. g.mtx.Lock()
  96. g.headSizeLimit = limit
  97. g.mtx.Unlock()
  98. }
  99. // HeadSizeLimit returns the current head size limit.
  100. func (g *Group) HeadSizeLimit() int64 {
  101. g.mtx.Lock()
  102. defer g.mtx.Unlock()
  103. return g.headSizeLimit
  104. }
  105. // SetTotalSizeLimit allows you to overwrite default total size limit of the
  106. // group - 1GB.
  107. func (g *Group) SetTotalSizeLimit(limit int64) {
  108. g.mtx.Lock()
  109. g.totalSizeLimit = limit
  110. g.mtx.Unlock()
  111. }
  112. // TotalSizeLimit returns total size limit of the group.
  113. func (g *Group) TotalSizeLimit() int64 {
  114. g.mtx.Lock()
  115. defer g.mtx.Unlock()
  116. return g.totalSizeLimit
  117. }
  118. // MaxIndex returns index of the last file in the group.
  119. func (g *Group) MaxIndex() int {
  120. g.mtx.Lock()
  121. defer g.mtx.Unlock()
  122. return g.maxIndex
  123. }
  124. // MinIndex returns index of the first file in the group.
  125. func (g *Group) MinIndex() int {
  126. g.mtx.Lock()
  127. defer g.mtx.Unlock()
  128. return g.minIndex
  129. }
  130. // Write writes the contents of p into the current head of the group. It
  131. // returns the number of bytes written. If nn < len(p), it also returns an
  132. // error explaining why the write is short.
  133. // NOTE: Writes are buffered so they don't write synchronously
  134. // TODO: Make it halt if space is unavailable
  135. func (g *Group) Write(p []byte) (nn int, err error) {
  136. g.mtx.Lock()
  137. defer g.mtx.Unlock()
  138. return g.headBuf.Write(p)
  139. }
  140. // WriteLine writes line into the current head of the group. It also appends "\n".
  141. // NOTE: Writes are buffered so they don't write synchronously
  142. // TODO: Make it halt if space is unavailable
  143. func (g *Group) WriteLine(line string) error {
  144. g.mtx.Lock()
  145. defer g.mtx.Unlock()
  146. _, err := g.headBuf.Write([]byte(line + "\n"))
  147. return err
  148. }
  149. // Flush writes any buffered data to the underlying file and commits the
  150. // current content of the file to stable storage.
  151. func (g *Group) Flush() error {
  152. g.mtx.Lock()
  153. defer g.mtx.Unlock()
  154. err := g.headBuf.Flush()
  155. if err == nil {
  156. err = g.Head.Sync()
  157. }
  158. return err
  159. }
  160. func (g *Group) processTicks() {
  161. for {
  162. _, ok := <-g.ticker.C
  163. if !ok {
  164. return // Done.
  165. }
  166. g.checkHeadSizeLimit()
  167. g.checkTotalSizeLimit()
  168. }
  169. }
  170. // NOTE: for testing
  171. func (g *Group) stopTicker() {
  172. g.ticker.Stop()
  173. }
  174. // NOTE: this function is called manually in tests.
  175. func (g *Group) checkHeadSizeLimit() {
  176. limit := g.HeadSizeLimit()
  177. if limit == 0 {
  178. return
  179. }
  180. size, err := g.Head.Size()
  181. if err != nil {
  182. panic(err)
  183. }
  184. if size >= limit {
  185. g.RotateFile()
  186. }
  187. }
  188. func (g *Group) checkTotalSizeLimit() {
  189. limit := g.TotalSizeLimit()
  190. if limit == 0 {
  191. return
  192. }
  193. gInfo := g.readGroupInfo()
  194. totalSize := gInfo.TotalSize
  195. for i := 0; i < maxFilesToRemove; i++ {
  196. index := gInfo.MinIndex + i
  197. if totalSize < limit {
  198. return
  199. }
  200. if index == gInfo.MaxIndex {
  201. // Special degenerate case, just do nothing.
  202. log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
  203. return
  204. }
  205. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  206. fileInfo, err := os.Stat(pathToRemove)
  207. if err != nil {
  208. log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
  209. continue
  210. }
  211. err = os.Remove(pathToRemove)
  212. if err != nil {
  213. log.Println(err)
  214. return
  215. }
  216. totalSize -= fileInfo.Size()
  217. }
  218. }
  219. // RotateFile causes group to close the current head and assign it some index.
  220. // Note it does not create a new head.
  221. func (g *Group) RotateFile() {
  222. g.mtx.Lock()
  223. defer g.mtx.Unlock()
  224. headPath := g.Head.Path
  225. if err := g.Head.closeFile(); err != nil {
  226. panic(err)
  227. }
  228. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  229. if err := os.Rename(headPath, indexPath); err != nil {
  230. panic(err)
  231. }
  232. g.maxIndex++
  233. }
  234. // NewReader returns a new group reader.
  235. // CONTRACT: Caller must close the returned GroupReader.
  236. func (g *Group) NewReader(index int) (*GroupReader, error) {
  237. r := newGroupReader(g)
  238. err := r.SetIndex(index)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return r, nil
  243. }
  244. // Returns -1 if line comes after, 0 if found, 1 if line comes before.
  245. type SearchFunc func(line string) (int, error)
  246. // Searches for the right file in Group, then returns a GroupReader to start
  247. // streaming lines.
  248. // Returns true if an exact match was found, otherwise returns the next greater
  249. // line that starts with prefix.
  250. // CONTRACT: Caller must close the returned GroupReader
  251. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  252. g.mtx.Lock()
  253. minIndex, maxIndex := g.minIndex, g.maxIndex
  254. g.mtx.Unlock()
  255. // Now minIndex/maxIndex may change meanwhile,
  256. // but it shouldn't be a big deal
  257. // (maybe we'll want to limit scanUntil though)
  258. for {
  259. curIndex := (minIndex + maxIndex + 1) / 2
  260. // Base case, when there's only 1 choice left.
  261. if minIndex == maxIndex {
  262. r, err := g.NewReader(maxIndex)
  263. if err != nil {
  264. return nil, false, err
  265. }
  266. match, err := scanUntil(r, prefix, cmp)
  267. if err != nil {
  268. r.Close()
  269. return nil, false, err
  270. }
  271. return r, match, err
  272. }
  273. // Read starting roughly at the middle file,
  274. // until we find line that has prefix.
  275. r, err := g.NewReader(curIndex)
  276. if err != nil {
  277. return nil, false, err
  278. }
  279. foundIndex, line, err := scanNext(r, prefix)
  280. r.Close()
  281. if err != nil {
  282. return nil, false, err
  283. }
  284. // Compare this line to our search query.
  285. val, err := cmp(line)
  286. if err != nil {
  287. return nil, false, err
  288. }
  289. if val < 0 {
  290. // Line will come later
  291. minIndex = foundIndex
  292. } else if val == 0 {
  293. // Stroke of luck, found the line
  294. r, err := g.NewReader(foundIndex)
  295. if err != nil {
  296. return nil, false, err
  297. }
  298. match, err := scanUntil(r, prefix, cmp)
  299. if !match {
  300. panic("Expected match to be true")
  301. }
  302. if err != nil {
  303. r.Close()
  304. return nil, false, err
  305. }
  306. return r, true, err
  307. } else {
  308. // We passed it
  309. maxIndex = curIndex - 1
  310. }
  311. }
  312. }
  313. // Scans and returns the first line that starts with 'prefix'
  314. // Consumes line and returns it.
  315. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  316. for {
  317. line, err := r.ReadLine()
  318. if err != nil {
  319. return 0, "", err
  320. }
  321. if !strings.HasPrefix(line, prefix) {
  322. continue
  323. }
  324. index := r.CurIndex()
  325. return index, line, nil
  326. }
  327. }
  328. // Returns true iff an exact match was found.
  329. // Pushes line, does not consume it.
  330. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  331. for {
  332. line, err := r.ReadLine()
  333. if err != nil {
  334. return false, err
  335. }
  336. if !strings.HasPrefix(line, prefix) {
  337. continue
  338. }
  339. val, err := cmp(line)
  340. if err != nil {
  341. return false, err
  342. }
  343. if val < 0 {
  344. continue
  345. } else if val == 0 {
  346. r.PushLine(line)
  347. return true, nil
  348. } else {
  349. r.PushLine(line)
  350. return false, nil
  351. }
  352. }
  353. }
  354. // Searches backwards for the last line in Group with prefix.
  355. // Scans each file forward until the end to find the last match.
  356. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  357. g.mtx.Lock()
  358. minIndex, maxIndex := g.minIndex, g.maxIndex
  359. g.mtx.Unlock()
  360. r, err := g.NewReader(maxIndex)
  361. if err != nil {
  362. return "", false, err
  363. }
  364. defer r.Close()
  365. // Open files from the back and read
  366. GROUP_LOOP:
  367. for i := maxIndex; i >= minIndex; i-- {
  368. err := r.SetIndex(i)
  369. if err != nil {
  370. return "", false, err
  371. }
  372. // Scan each line and test whether line matches
  373. for {
  374. line, err := r.ReadLine()
  375. if err == io.EOF {
  376. if found {
  377. return match, found, nil
  378. }
  379. continue GROUP_LOOP
  380. } else if err != nil {
  381. return "", false, err
  382. }
  383. if strings.HasPrefix(line, prefix) {
  384. match = line
  385. found = true
  386. }
  387. if r.CurIndex() > i {
  388. if found {
  389. return match, found, nil
  390. }
  391. continue GROUP_LOOP
  392. }
  393. }
  394. }
  395. return
  396. }
  // GroupInfo is a snapshot of a group's on-disk state.
  type GroupInfo struct {
  	// MinIndex is the index of the first file in the group, including head.
  	MinIndex int
  	// MaxIndex is the index of the last file in the group, including head.
  	MaxIndex int
  	// TotalSize is the total size of the group.
  	TotalSize int64
  	// HeadSize is the size of the head.
  	HeadSize int64
  }
  404. // Returns info after scanning all files in g.Head's dir.
  405. func (g *Group) ReadGroupInfo() GroupInfo {
  406. g.mtx.Lock()
  407. defer g.mtx.Unlock()
  408. return g.readGroupInfo()
  409. }
  410. // Index includes the head.
  411. // CONTRACT: caller should have called g.mtx.Lock
  412. func (g *Group) readGroupInfo() GroupInfo {
  413. groupDir := filepath.Dir(g.Head.Path)
  414. headBase := filepath.Base(g.Head.Path)
  415. var minIndex, maxIndex int = -1, -1
  416. var totalSize, headSize int64 = 0, 0
  417. dir, err := os.Open(groupDir)
  418. if err != nil {
  419. panic(err)
  420. }
  421. defer dir.Close()
  422. fiz, err := dir.Readdir(0)
  423. if err != nil {
  424. panic(err)
  425. }
  426. // For each file in the directory, filter by pattern
  427. for _, fileInfo := range fiz {
  428. if fileInfo.Name() == headBase {
  429. fileSize := fileInfo.Size()
  430. totalSize += fileSize
  431. headSize = fileSize
  432. continue
  433. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  434. fileSize := fileInfo.Size()
  435. totalSize += fileSize
  436. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  437. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  438. if len(submatch) != 0 {
  439. // Matches
  440. fileIndex, err := strconv.Atoi(string(submatch[1]))
  441. if err != nil {
  442. panic(err)
  443. }
  444. if maxIndex < fileIndex {
  445. maxIndex = fileIndex
  446. }
  447. if minIndex == -1 || fileIndex < minIndex {
  448. minIndex = fileIndex
  449. }
  450. }
  451. }
  452. }
  453. // Now account for the head.
  454. if minIndex == -1 {
  455. // If there were no numbered files,
  456. // then the head is index 0.
  457. minIndex, maxIndex = 0, 0
  458. } else {
  459. // Otherwise, the head file is 1 greater
  460. maxIndex++
  461. }
  462. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  463. }
  // filePathForIndex maps a file index to its path on disk. The maximum index
  // denotes the head and maps to headPath itself; every other index maps to
  // headPath followed by "." and the index zero-padded to at least 3 digits.
  func filePathForIndex(headPath string, index int, maxIndex int) string {
  	if index != maxIndex {
  		return fmt.Sprintf("%v.%03d", headPath, index)
  	}
  	return headPath
  }
  470. //--------------------------------------------------------------------------------
  471. // GroupReader provides an interface for reading from a Group.
  472. type GroupReader struct {
  473. *Group
  474. mtx sync.Mutex
  475. curIndex int
  476. curFile *os.File
  477. curReader *bufio.Reader
  478. curLine []byte
  479. }
  480. func newGroupReader(g *Group) *GroupReader {
  481. return &GroupReader{
  482. Group: g,
  483. curIndex: 0,
  484. curFile: nil,
  485. curReader: nil,
  486. curLine: nil,
  487. }
  488. }
  489. // Close closes the GroupReader by closing the cursor file.
  490. func (gr *GroupReader) Close() error {
  491. gr.mtx.Lock()
  492. defer gr.mtx.Unlock()
  493. if gr.curReader != nil {
  494. err := gr.curFile.Close()
  495. gr.curIndex = 0
  496. gr.curReader = nil
  497. gr.curFile = nil
  498. gr.curLine = nil
  499. return err
  500. }
  501. return nil
  502. }
  503. // Read implements io.Reader, reading bytes from the current Reader
  504. // incrementing index until enough bytes are read.
  505. func (gr *GroupReader) Read(p []byte) (n int, err error) {
  506. lenP := len(p)
  507. if lenP == 0 {
  508. return 0, errors.New("given empty slice")
  509. }
  510. gr.mtx.Lock()
  511. defer gr.mtx.Unlock()
  512. // Open file if not open yet
  513. if gr.curReader == nil {
  514. if err = gr.openFile(gr.curIndex); err != nil {
  515. return 0, err
  516. }
  517. }
  518. // Iterate over files until enough bytes are read
  519. var nn int
  520. for {
  521. nn, err = gr.curReader.Read(p[n:])
  522. n += nn
  523. if err == io.EOF {
  524. if n >= lenP {
  525. return n, nil
  526. }
  527. // Open the next file
  528. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  529. return n, err1
  530. }
  531. } else if err != nil {
  532. return n, err
  533. } else if nn == 0 { // empty file
  534. return n, err
  535. }
  536. }
  537. }
  538. // ReadLine reads a line (without delimiter).
  539. // just return io.EOF if no new lines found.
  540. func (gr *GroupReader) ReadLine() (string, error) {
  541. gr.mtx.Lock()
  542. defer gr.mtx.Unlock()
  543. // From PushLine
  544. if gr.curLine != nil {
  545. line := string(gr.curLine)
  546. gr.curLine = nil
  547. return line, nil
  548. }
  549. // Open file if not open yet
  550. if gr.curReader == nil {
  551. err := gr.openFile(gr.curIndex)
  552. if err != nil {
  553. return "", err
  554. }
  555. }
  556. // Iterate over files until line is found
  557. var linePrefix string
  558. for {
  559. bytesRead, err := gr.curReader.ReadBytes('\n')
  560. if err == io.EOF {
  561. // Open the next file
  562. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  563. return "", err1
  564. }
  565. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  566. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  567. }
  568. linePrefix += string(bytesRead)
  569. continue
  570. } else if err != nil {
  571. return "", err
  572. }
  573. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  574. }
  575. }
  576. // IF index > gr.Group.maxIndex, returns io.EOF
  577. // CONTRACT: caller should hold gr.mtx
  578. func (gr *GroupReader) openFile(index int) error {
  579. // Lock on Group to ensure that head doesn't move in the meanwhile.
  580. gr.Group.mtx.Lock()
  581. defer gr.Group.mtx.Unlock()
  582. if index > gr.Group.maxIndex {
  583. return io.EOF
  584. }
  585. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  586. curFile, err := os.Open(curFilePath)
  587. if err != nil {
  588. return err
  589. }
  590. curReader := bufio.NewReader(curFile)
  591. // Update gr.cur*
  592. if gr.curFile != nil {
  593. gr.curFile.Close() // TODO return error?
  594. }
  595. gr.curIndex = index
  596. gr.curFile = curFile
  597. gr.curReader = curReader
  598. gr.curLine = nil
  599. return nil
  600. }
  601. // PushLine makes the given line the current one, so the next time somebody
  602. // calls ReadLine, this line will be returned.
  603. // panics if called twice without calling ReadLine.
  604. func (gr *GroupReader) PushLine(line string) {
  605. gr.mtx.Lock()
  606. defer gr.mtx.Unlock()
  607. if gr.curLine == nil {
  608. gr.curLine = []byte(line)
  609. } else {
  610. panic("PushLine failed, already have line")
  611. }
  612. }
  613. // CurIndex returns cursor's file index.
  614. func (gr *GroupReader) CurIndex() int {
  615. gr.mtx.Lock()
  616. defer gr.mtx.Unlock()
  617. return gr.curIndex
  618. }
  619. // SetIndex sets the cursor's file index to index by opening a file at this
  620. // position.
  621. func (gr *GroupReader) SetIndex(index int) error {
  622. gr.mtx.Lock()
  623. defer gr.mtx.Unlock()
  624. return gr.openFile(index)
  625. }
  626. //--------------------------------------------------------------------------------
  627. // A simple SearchFunc that assumes that the marker is of form
  628. // <prefix><number>.
  629. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  630. //
  631. // #HEIGHT:1
  632. // ...
  633. // #HEIGHT:2
  634. // ...
  635. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  636. return func(line string) (int, error) {
  637. if !strings.HasPrefix(line, prefix) {
  638. return -1, errors.New(cmn.Fmt("Marker line did not have prefix: %v", prefix))
  639. }
  640. i, err := strconv.Atoi(line[len(prefix):])
  641. if err != nil {
  642. return -1, errors.New(cmn.Fmt("Failed to parse marker line: %v", err.Error()))
  643. }
  644. if target < i {
  645. return 1, nil
  646. } else if target == i {
  647. return 0, nil
  648. } else {
  649. return -1, nil
  650. }
  651. }
  652. }