You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

770 lines
18 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. cmn "github.com/tendermint/tendermint/libs/common"
  17. )
  // Defaults applied by OpenGroup, plus the bound on how many files a single
  // total-size check may delete.
  const (
  	// defaultGroupCheckDuration is the default period of the limit-check ticker.
  	defaultGroupCheckDuration = 5 * time.Second
  	// defaultHeadSizeLimit is the default head size limit (10MB).
  	defaultHeadSizeLimit = 10 << 20
  	// defaultTotalSizeLimit is the default total size limit of a group (1GB).
  	defaultTotalSizeLimit = 1 << 30
  	// maxFilesToRemove caps the number of files removed per total-size check;
  	// it needs to be greater than 1.
  	maxFilesToRemove = 4
  )
  /*
  Group keeps restrictions on an AutoFile, like the maximum size of each chunk,
  and/or the total amount of bytes stored in the group.

  The first file to be written in the Group.Dir is the head file.

  	Dir/
  	- <HeadPath>

  Once the Head file reaches the size limit, it will be rotated.

  	Dir/
  	- <HeadPath>.000   // First rolled file
  	- <HeadPath>       // New head path, starts empty.
  	                   // The implicit index is 001.

  As more files are written, the index numbers grow...

  	Dir/
  	- <HeadPath>.000   // First rolled file
  	- <HeadPath>.001   // Second rolled file
  	- ...
  	- <HeadPath>       // New head path

  The Group can also be used to binary-search for some line,
  assuming that marker lines are written occasionally.
  */
  type Group struct {
  	cmn.BaseService

  	ID      string    // Set by OpenGroup to "group:" + the head's ID
  	Head    *AutoFile // The head AutoFile to write to
  	headBuf *bufio.Writer // Buffered writer in front of Head; Write/WriteLine go through it
  	Dir     string    // Directory that contains .Head
  	ticker  *time.Ticker // Created in OnStart; drives processTicks
  	mtx     sync.Mutex // Taken by the exported methods before touching the fields below and the head
  	headSizeLimit      int64 // Head size at which checkHeadSizeLimit rotates; 0 disables the check
  	totalSizeLimit     int64 // Group size above which checkTotalSizeLimit removes files; 0 disables the check
  	groupCheckDuration time.Duration // Period of the limit-check ticker
  	minIndex           int // Includes head
  	maxIndex           int // Includes head, where Head will move to

  	// TODO: When we start deleting files, we need to start tracking GroupReaders
  	// and their dependencies.
  }
  61. // OpenGroup creates a new Group with head at headPath. It returns an error if
  62. // it fails to open head file.
  63. func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
  64. dir := path.Dir(headPath)
  65. head, err := OpenAutoFile(headPath)
  66. if err != nil {
  67. return nil, err
  68. }
  69. g = &Group{
  70. ID: "group:" + head.ID,
  71. Head: head,
  72. headBuf: bufio.NewWriterSize(head, 4096*10),
  73. Dir: dir,
  74. headSizeLimit: defaultHeadSizeLimit,
  75. totalSizeLimit: defaultTotalSizeLimit,
  76. groupCheckDuration: defaultGroupCheckDuration,
  77. minIndex: 0,
  78. maxIndex: 0,
  79. }
  80. for _, option := range groupOptions {
  81. option(g)
  82. }
  83. g.BaseService = *cmn.NewBaseService(nil, "Group", g)
  84. gInfo := g.readGroupInfo()
  85. g.minIndex = gInfo.MinIndex
  86. g.maxIndex = gInfo.MaxIndex
  87. return
  88. }
  89. // GroupCheckDuration allows you to overwrite default groupCheckDuration.
  90. func GroupCheckDuration(duration time.Duration) func(*Group) {
  91. return func(g *Group) {
  92. g.groupCheckDuration = duration
  93. }
  94. }
  95. // GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB.
  96. func GroupHeadSizeLimit(limit int64) func(*Group) {
  97. return func(g *Group) {
  98. g.headSizeLimit = limit
  99. }
  100. }
  101. // GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB.
  102. func GroupTotalSizeLimit(limit int64) func(*Group) {
  103. return func(g *Group) {
  104. g.totalSizeLimit = limit
  105. }
  106. }
  107. // OnStart implements Service by starting the goroutine that checks file and
  108. // group limits.
  109. func (g *Group) OnStart() error {
  110. g.ticker = time.NewTicker(g.groupCheckDuration)
  111. go g.processTicks()
  112. return nil
  113. }
  114. // OnStop implements Service by stopping the goroutine described above.
  115. // NOTE: g.Head must be closed separately using Close.
  116. func (g *Group) OnStop() {
  117. g.ticker.Stop()
  118. g.Flush() // flush any uncommitted data
  119. }
  120. // Close closes the head file. The group must be stopped by this moment.
  121. func (g *Group) Close() {
  122. g.Flush() // flush any uncommitted data
  123. g.mtx.Lock()
  124. _ = g.Head.closeFile()
  125. g.mtx.Unlock()
  126. }
  127. // HeadSizeLimit returns the current head size limit.
  128. func (g *Group) HeadSizeLimit() int64 {
  129. g.mtx.Lock()
  130. defer g.mtx.Unlock()
  131. return g.headSizeLimit
  132. }
  133. // TotalSizeLimit returns total size limit of the group.
  134. func (g *Group) TotalSizeLimit() int64 {
  135. g.mtx.Lock()
  136. defer g.mtx.Unlock()
  137. return g.totalSizeLimit
  138. }
  139. // MaxIndex returns index of the last file in the group.
  140. func (g *Group) MaxIndex() int {
  141. g.mtx.Lock()
  142. defer g.mtx.Unlock()
  143. return g.maxIndex
  144. }
  145. // MinIndex returns index of the first file in the group.
  146. func (g *Group) MinIndex() int {
  147. g.mtx.Lock()
  148. defer g.mtx.Unlock()
  149. return g.minIndex
  150. }
  151. // Write writes the contents of p into the current head of the group. It
  152. // returns the number of bytes written. If nn < len(p), it also returns an
  153. // error explaining why the write is short.
  154. // NOTE: Writes are buffered so they don't write synchronously
  155. // TODO: Make it halt if space is unavailable
  156. func (g *Group) Write(p []byte) (nn int, err error) {
  157. g.mtx.Lock()
  158. defer g.mtx.Unlock()
  159. return g.headBuf.Write(p)
  160. }
  161. // WriteLine writes line into the current head of the group. It also appends "\n".
  162. // NOTE: Writes are buffered so they don't write synchronously
  163. // TODO: Make it halt if space is unavailable
  164. func (g *Group) WriteLine(line string) error {
  165. g.mtx.Lock()
  166. defer g.mtx.Unlock()
  167. _, err := g.headBuf.Write([]byte(line + "\n"))
  168. return err
  169. }
  170. // Flush writes any buffered data to the underlying file and commits the
  171. // current content of the file to stable storage.
  172. func (g *Group) Flush() error {
  173. g.mtx.Lock()
  174. defer g.mtx.Unlock()
  175. err := g.headBuf.Flush()
  176. if err == nil {
  177. err = g.Head.Sync()
  178. }
  179. return err
  180. }
  181. func (g *Group) processTicks() {
  182. for {
  183. select {
  184. case <-g.ticker.C:
  185. g.checkHeadSizeLimit()
  186. g.checkTotalSizeLimit()
  187. case <-g.Quit():
  188. return
  189. }
  190. }
  191. }
  192. // NOTE: this function is called manually in tests.
  193. func (g *Group) checkHeadSizeLimit() {
  194. limit := g.HeadSizeLimit()
  195. if limit == 0 {
  196. return
  197. }
  198. size, err := g.Head.Size()
  199. if err != nil {
  200. panic(err)
  201. }
  202. if size >= limit {
  203. g.RotateFile()
  204. }
  205. }
  206. func (g *Group) checkTotalSizeLimit() {
  207. limit := g.TotalSizeLimit()
  208. if limit == 0 {
  209. return
  210. }
  211. gInfo := g.readGroupInfo()
  212. totalSize := gInfo.TotalSize
  213. for i := 0; i < maxFilesToRemove; i++ {
  214. index := gInfo.MinIndex + i
  215. if totalSize < limit {
  216. return
  217. }
  218. if index == gInfo.MaxIndex {
  219. // Special degenerate case, just do nothing.
  220. log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
  221. return
  222. }
  223. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  224. fileInfo, err := os.Stat(pathToRemove)
  225. if err != nil {
  226. log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
  227. continue
  228. }
  229. err = os.Remove(pathToRemove)
  230. if err != nil {
  231. log.Println(err)
  232. return
  233. }
  234. totalSize -= fileInfo.Size()
  235. }
  236. }
  237. // RotateFile causes group to close the current head and assign it some index.
  238. // Note it does not create a new head.
  239. func (g *Group) RotateFile() {
  240. g.mtx.Lock()
  241. defer g.mtx.Unlock()
  242. headPath := g.Head.Path
  243. if err := g.headBuf.Flush(); err != nil {
  244. panic(err) //panic is used for consistent with below
  245. }
  246. if err := g.Head.Sync(); err != nil {
  247. panic(err)
  248. }
  249. if err := g.Head.closeFile(); err != nil {
  250. panic(err)
  251. }
  252. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  253. if err := os.Rename(headPath, indexPath); err != nil {
  254. panic(err)
  255. }
  256. //make sure head file exist, there is a window time between rename and next write
  257. //when NewReader(maxIndex), lead to "open /tmp/wal058868562/wal: no such file or directory"
  258. if err := g.Head.openFile(); err != nil {
  259. panic(err)
  260. }
  261. g.maxIndex++
  262. }
  263. // NewReader returns a new group reader.
  264. // CONTRACT: Caller must close the returned GroupReader.
  265. func (g *Group) NewReader(index int) (*GroupReader, error) {
  266. r := newGroupReader(g)
  267. err := r.SetIndex(index)
  268. if err != nil {
  269. return nil, err
  270. }
  271. return r, nil
  272. }
  // SearchFunc compares a marker line against the position being searched for.
  // It returns -1 if the searched position comes after line, 0 if line is the
  // one being searched for, and 1 if the searched position comes before line.
  // A non-nil error aborts the search.
  type SearchFunc func(line string) (int, error)
  275. // Searches for the right file in Group, then returns a GroupReader to start
  276. // streaming lines.
  277. // Returns true if an exact match was found, otherwise returns the next greater
  278. // line that starts with prefix.
  279. // CONTRACT: Caller must close the returned GroupReader
  280. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  281. g.mtx.Lock()
  282. minIndex, maxIndex := g.minIndex, g.maxIndex
  283. g.mtx.Unlock()
  284. // Now minIndex/maxIndex may change meanwhile,
  285. // but it shouldn't be a big deal
  286. // (maybe we'll want to limit scanUntil though)
  287. for {
  288. curIndex := (minIndex + maxIndex + 1) / 2
  289. // Base case, when there's only 1 choice left.
  290. if minIndex == maxIndex {
  291. r, err := g.NewReader(maxIndex)
  292. if err != nil {
  293. return nil, false, err
  294. }
  295. match, err := scanUntil(r, prefix, cmp)
  296. if err != nil {
  297. r.Close()
  298. return nil, false, err
  299. }
  300. return r, match, err
  301. }
  302. // Read starting roughly at the middle file,
  303. // until we find line that has prefix.
  304. r, err := g.NewReader(curIndex)
  305. if err != nil {
  306. return nil, false, err
  307. }
  308. foundIndex, line, err := scanNext(r, prefix)
  309. r.Close()
  310. if err != nil {
  311. return nil, false, err
  312. }
  313. // Compare this line to our search query.
  314. val, err := cmp(line)
  315. if err != nil {
  316. return nil, false, err
  317. }
  318. if val < 0 {
  319. // Line will come later
  320. minIndex = foundIndex
  321. } else if val == 0 {
  322. // Stroke of luck, found the line
  323. r, err := g.NewReader(foundIndex)
  324. if err != nil {
  325. return nil, false, err
  326. }
  327. match, err := scanUntil(r, prefix, cmp)
  328. if !match {
  329. panic("Expected match to be true")
  330. }
  331. if err != nil {
  332. r.Close()
  333. return nil, false, err
  334. }
  335. return r, true, err
  336. } else {
  337. // We passed it
  338. maxIndex = curIndex - 1
  339. }
  340. }
  341. }
  342. // Scans and returns the first line that starts with 'prefix'
  343. // Consumes line and returns it.
  344. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  345. for {
  346. line, err := r.ReadLine()
  347. if err != nil {
  348. return 0, "", err
  349. }
  350. if !strings.HasPrefix(line, prefix) {
  351. continue
  352. }
  353. index := r.CurIndex()
  354. return index, line, nil
  355. }
  356. }
  357. // Returns true iff an exact match was found.
  358. // Pushes line, does not consume it.
  359. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  360. for {
  361. line, err := r.ReadLine()
  362. if err != nil {
  363. return false, err
  364. }
  365. if !strings.HasPrefix(line, prefix) {
  366. continue
  367. }
  368. val, err := cmp(line)
  369. if err != nil {
  370. return false, err
  371. }
  372. if val < 0 {
  373. continue
  374. } else if val == 0 {
  375. r.PushLine(line)
  376. return true, nil
  377. } else {
  378. r.PushLine(line)
  379. return false, nil
  380. }
  381. }
  382. }
  383. // Searches backwards for the last line in Group with prefix.
  384. // Scans each file forward until the end to find the last match.
  385. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  386. g.mtx.Lock()
  387. minIndex, maxIndex := g.minIndex, g.maxIndex
  388. g.mtx.Unlock()
  389. r, err := g.NewReader(maxIndex)
  390. if err != nil {
  391. return "", false, err
  392. }
  393. defer r.Close()
  394. // Open files from the back and read
  395. GROUP_LOOP:
  396. for i := maxIndex; i >= minIndex; i-- {
  397. err := r.SetIndex(i)
  398. if err != nil {
  399. return "", false, err
  400. }
  401. // Scan each line and test whether line matches
  402. for {
  403. line, err := r.ReadLine()
  404. if err == io.EOF {
  405. if found {
  406. return match, found, nil
  407. }
  408. continue GROUP_LOOP
  409. } else if err != nil {
  410. return "", false, err
  411. }
  412. if strings.HasPrefix(line, prefix) {
  413. match = line
  414. found = true
  415. }
  416. if r.CurIndex() > i {
  417. if found {
  418. return match, found, nil
  419. }
  420. continue GROUP_LOOP
  421. }
  422. }
  423. }
  424. return
  425. }
  // GroupInfo holds information about the group, as collected by scanning the
  // directory of the head file.
  type GroupInfo struct {
  	MinIndex  int   // index of the first file in the group, including head
  	MaxIndex  int   // index of the last file in the group, including head
  	TotalSize int64 // total size of the group
  	HeadSize  int64 // size of the head
  }
  433. // Returns info after scanning all files in g.Head's dir.
  434. func (g *Group) ReadGroupInfo() GroupInfo {
  435. g.mtx.Lock()
  436. defer g.mtx.Unlock()
  437. return g.readGroupInfo()
  438. }
  439. // Index includes the head.
  440. // CONTRACT: caller should have called g.mtx.Lock
  441. func (g *Group) readGroupInfo() GroupInfo {
  442. groupDir := filepath.Dir(g.Head.Path)
  443. headBase := filepath.Base(g.Head.Path)
  444. var minIndex, maxIndex int = -1, -1
  445. var totalSize, headSize int64 = 0, 0
  446. dir, err := os.Open(groupDir)
  447. if err != nil {
  448. panic(err)
  449. }
  450. defer dir.Close()
  451. fiz, err := dir.Readdir(0)
  452. if err != nil {
  453. panic(err)
  454. }
  455. // For each file in the directory, filter by pattern
  456. for _, fileInfo := range fiz {
  457. if fileInfo.Name() == headBase {
  458. fileSize := fileInfo.Size()
  459. totalSize += fileSize
  460. headSize = fileSize
  461. continue
  462. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  463. fileSize := fileInfo.Size()
  464. totalSize += fileSize
  465. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  466. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  467. if len(submatch) != 0 {
  468. // Matches
  469. fileIndex, err := strconv.Atoi(string(submatch[1]))
  470. if err != nil {
  471. panic(err)
  472. }
  473. if maxIndex < fileIndex {
  474. maxIndex = fileIndex
  475. }
  476. if minIndex == -1 || fileIndex < minIndex {
  477. minIndex = fileIndex
  478. }
  479. }
  480. }
  481. }
  482. // Now account for the head.
  483. if minIndex == -1 {
  484. // If there were no numbered files,
  485. // then the head is index 0.
  486. minIndex, maxIndex = 0, 0
  487. } else {
  488. // Otherwise, the head file is 1 greater
  489. maxIndex++
  490. }
  491. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  492. }
  // filePathForIndex maps a file index to its path on disk. The file at
  // maxIndex is the head and lives at headPath itself; every other index maps
  // to headPath followed by a dot and the index, zero-padded to at least three
  // digits.
  func filePathForIndex(headPath string, index int, maxIndex int) string {
  	if index != maxIndex {
  		return headPath + fmt.Sprintf(".%03d", index)
  	}
  	return headPath
  }
  //--------------------------------------------------------------------------------

  // GroupReader provides an interface for reading from a Group. It keeps a
  // cursor consisting of a file index, the open file at that index and a
  // buffered reader over it.
  type GroupReader struct {
  	*Group
  	mtx       sync.Mutex    // Taken by the exported methods; distinct from the embedded Group's mtx
  	curIndex  int           // Index of the file the cursor is on
  	curFile   *os.File      // File at curIndex, nil until a file has been opened
  	curReader *bufio.Reader // Buffered reader over curFile, nil until a file has been opened
  	curLine   []byte        // Line stored by PushLine, returned by the next ReadLine
  }
  509. func newGroupReader(g *Group) *GroupReader {
  510. return &GroupReader{
  511. Group: g,
  512. curIndex: 0,
  513. curFile: nil,
  514. curReader: nil,
  515. curLine: nil,
  516. }
  517. }
  518. // Close closes the GroupReader by closing the cursor file.
  519. func (gr *GroupReader) Close() error {
  520. gr.mtx.Lock()
  521. defer gr.mtx.Unlock()
  522. if gr.curReader != nil {
  523. err := gr.curFile.Close()
  524. gr.curIndex = 0
  525. gr.curReader = nil
  526. gr.curFile = nil
  527. gr.curLine = nil
  528. return err
  529. }
  530. return nil
  531. }
  532. // Read implements io.Reader, reading bytes from the current Reader
  533. // incrementing index until enough bytes are read.
  534. func (gr *GroupReader) Read(p []byte) (n int, err error) {
  535. lenP := len(p)
  536. if lenP == 0 {
  537. return 0, errors.New("given empty slice")
  538. }
  539. gr.mtx.Lock()
  540. defer gr.mtx.Unlock()
  541. // Open file if not open yet
  542. if gr.curReader == nil {
  543. if err = gr.openFile(gr.curIndex); err != nil {
  544. return 0, err
  545. }
  546. }
  547. // Iterate over files until enough bytes are read
  548. var nn int
  549. for {
  550. nn, err = gr.curReader.Read(p[n:])
  551. n += nn
  552. if err == io.EOF {
  553. if n >= lenP {
  554. return n, nil
  555. }
  556. // Open the next file
  557. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  558. return n, err1
  559. }
  560. } else if err != nil {
  561. return n, err
  562. } else if nn == 0 { // empty file
  563. return n, err
  564. }
  565. }
  566. }
  567. // ReadLine reads a line (without delimiter).
  568. // just return io.EOF if no new lines found.
  569. func (gr *GroupReader) ReadLine() (string, error) {
  570. gr.mtx.Lock()
  571. defer gr.mtx.Unlock()
  572. // From PushLine
  573. if gr.curLine != nil {
  574. line := string(gr.curLine)
  575. gr.curLine = nil
  576. return line, nil
  577. }
  578. // Open file if not open yet
  579. if gr.curReader == nil {
  580. err := gr.openFile(gr.curIndex)
  581. if err != nil {
  582. return "", err
  583. }
  584. }
  585. // Iterate over files until line is found
  586. var linePrefix string
  587. for {
  588. bytesRead, err := gr.curReader.ReadBytes('\n')
  589. if err == io.EOF {
  590. // Open the next file
  591. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  592. return "", err1
  593. }
  594. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  595. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  596. }
  597. linePrefix += string(bytesRead)
  598. continue
  599. } else if err != nil {
  600. return "", err
  601. }
  602. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  603. }
  604. }
  605. // IF index > gr.Group.maxIndex, returns io.EOF
  606. // CONTRACT: caller should hold gr.mtx
  607. func (gr *GroupReader) openFile(index int) error {
  608. // Lock on Group to ensure that head doesn't move in the meanwhile.
  609. gr.Group.mtx.Lock()
  610. defer gr.Group.mtx.Unlock()
  611. if index > gr.Group.maxIndex {
  612. return io.EOF
  613. }
  614. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  615. curFile, err := os.Open(curFilePath)
  616. if err != nil {
  617. return err
  618. }
  619. curReader := bufio.NewReader(curFile)
  620. // Update gr.cur*
  621. if gr.curFile != nil {
  622. gr.curFile.Close() // TODO return error?
  623. }
  624. gr.curIndex = index
  625. gr.curFile = curFile
  626. gr.curReader = curReader
  627. gr.curLine = nil
  628. return nil
  629. }
  630. // PushLine makes the given line the current one, so the next time somebody
  631. // calls ReadLine, this line will be returned.
  632. // panics if called twice without calling ReadLine.
  633. func (gr *GroupReader) PushLine(line string) {
  634. gr.mtx.Lock()
  635. defer gr.mtx.Unlock()
  636. if gr.curLine == nil {
  637. gr.curLine = []byte(line)
  638. } else {
  639. panic("PushLine failed, already have line")
  640. }
  641. }
  642. // CurIndex returns cursor's file index.
  643. func (gr *GroupReader) CurIndex() int {
  644. gr.mtx.Lock()
  645. defer gr.mtx.Unlock()
  646. return gr.curIndex
  647. }
  648. // SetIndex sets the cursor's file index to index by opening a file at this
  649. // position.
  650. func (gr *GroupReader) SetIndex(index int) error {
  651. gr.mtx.Lock()
  652. defer gr.mtx.Unlock()
  653. return gr.openFile(index)
  654. }
  655. //--------------------------------------------------------------------------------
  656. // A simple SearchFunc that assumes that the marker is of form
  657. // <prefix><number>.
  658. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  659. //
  660. // #HEIGHT:1
  661. // ...
  662. // #HEIGHT:2
  663. // ...
  664. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  665. return func(line string) (int, error) {
  666. if !strings.HasPrefix(line, prefix) {
  667. return -1, fmt.Errorf("Marker line did not have prefix: %v", prefix)
  668. }
  669. i, err := strconv.Atoi(line[len(prefix):])
  670. if err != nil {
  671. return -1, fmt.Errorf("Failed to parse marker line: %v", err.Error())
  672. }
  673. if target < i {
  674. return 1, nil
  675. } else if target == i {
  676. return 0, nil
  677. } else {
  678. return -1, nil
  679. }
  680. }
  681. }