You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

775 lines
18 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
fix non deterministic test failures and race in privval socket (#3258) * node: decrease retry conn timeout in test Should fix #3256 The retry timeout was set to the default, which is the same as the accept timeout, so it's no wonder this would fail. Here we decrease the retry timeout so we can try many times before the accept timeout. * p2p: increase handshake timeout in test This fails sometimes, presumably because the handshake timeout is so low (only 50ms). So increase it to 1s. Should fix #3187 * privval: fix race with ping. closes #3237 Pings happen in a go-routine and can happen concurrently with other messages. Since we use a request/response protocol, we expect to send a request and get back the corresponding response. But with pings happening concurrently, this assumption could be violated. We were using a mutex, but only a RWMutex, where the RLock was being held for sending messages - this was to allow the underlying connection to be replaced if it fails. Turns out we actually need to use a full lock (not just a read lock) to prevent multiple requests from happening concurrently. * node: fix test name. DelayedStop -> DelayedStart * autofile: Wait() method In the TestWALTruncate in consensus/wal_test.go we remove the WAL directory at the end of the test. However the wal.Stop() does not properly wait for the autofile group to finish shutting down. Hence it was possible that the group's go-routine is still running when the cleanup happens, which causes a panic since the directory disappeared. Here we add a Wait() method to properly wait until the go-routine exits so we can safely clean up. This fixes #2852.
6 years ago
8 years ago
8 years ago
fix non deterministic test failures and race in privval socket (#3258) * node: decrease retry conn timeout in test Should fix #3256 The retry timeout was set to the default, which is the same as the accept timeout, so it's no wonder this would fail. Here we decrease the retry timeout so we can try many times before the accept timeout. * p2p: increase handshake timeout in test This fails sometimes, presumably because the handshake timeout is so low (only 50ms). So increase it to 1s. Should fix #3187 * privval: fix race with ping. closes #3237 Pings happen in a go-routine and can happen concurrently with other messages. Since we use a request/response protocol, we expect to send a request and get back the corresponding response. But with pings happening concurrently, this assumption could be violated. We were using a mutex, but only a RWMutex, where the RLock was being held for sending messages - this was to allow the underlying connection to be replaced if it fails. Turns out we actually need to use a full lock (not just a read lock) to prevent multiple requests from happening concurrently. * node: fix test name. DelayedStop -> DelayedStart * autofile: Wait() method In the TestWALTruncate in consensus/wal_test.go we remove the WAL directory at the end of the test. However the wal.Stop() does not properly wait for the autofile group to finish shutting down. Hence it was possible that the group's go-routine is still running when the cleanup happens, which causes a panic since the directory disappeared. Here we add a Wait() method to properly wait until the go-routine exits so we can safely clean up. This fixes #2852.
6 years ago
8 years ago
8 years ago
fix non deterministic test failures and race in privval socket (#3258) * node: decrease retry conn timeout in test Should fix #3256 The retry timeout was set to the default, which is the same as the accept timeout, so it's no wonder this would fail. Here we decrease the retry timeout so we can try many times before the accept timeout. * p2p: increase handshake timeout in test This fails sometimes, presumably because the handshake timeout is so low (only 50ms). So increase it to 1s. Should fix #3187 * privval: fix race with ping. closes #3237 Pings happen in a go-routine and can happen concurrently with other messages. Since we use a request/response protocol, we expect to send a request and get back the corresponding response. But with pings happening concurrently, this assumption could be violated. We were using a mutex, but only a RWMutex, where the RLock was being held for sending messages - this was to allow the underlying connection to be replaced if it fails. Turns out we actually need to use a full lock (not just a read lock) to prevent multiple requests from happening concurrently. * node: fix test name. DelayedStop -> DelayedStart * autofile: Wait() method In the TestWALTruncate in consensus/wal_test.go we remove the WAL directory at the end of the test. However the wal.Stop() does not properly wait for the autofile group to finish shutting down. Hence it was possible that the group's go-routine is still running when the cleanup happens, which causes a panic since the directory disappeared. Here we add a Wait() method to properly wait until the go-routine exits so we can safely clean up. This fixes #2852.
6 years ago
8 years ago
8 years ago
8 years ago
8 years ago
fix non deterministic test failures and race in privval socket (#3258) * node: decrease retry conn timeout in test Should fix #3256 The retry timeout was set to the default, which is the same as the accept timeout, so it's no wonder this would fail. Here we decrease the retry timeout so we can try many times before the accept timeout. * p2p: increase handshake timeout in test This fails sometimes, presumably because the handshake timeout is so low (only 50ms). So increase it to 1s. Should fix #3187 * privval: fix race with ping. closes #3237 Pings happen in a go-routine and can happen concurrently with other messages. Since we use a request/response protocol, we expect to send a request and get back the corresponding response. But with pings happening concurrently, this assumption could be violated. We were using a mutex, but only a RWMutex, where the RLock was being held for sending messages - this was to allow the underlying connection to be replaced if it fails. Turns out we actually need to use a full lock (not just a read lock) to prevent multiple requests from happening concurrently. * node: fix test name. DelayedStop -> DelayedStart * autofile: Wait() method In the TestWALTruncate in consensus/wal_test.go we remove the WAL directory at the end of the test. However the wal.Stop() does not properly wait for the autofile group to finish shutting down. Hence it was possible that the group's go-routine is still running when the cleanup happens, which causes a panic since the directory disappeared. Here we add a Wait() method to properly wait until the go-routine exits so we can safely clean up. This fixes #2852.
6 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "path"
  9. "path/filepath"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. cmn "github.com/tendermint/tendermint/libs/common"
  16. )
  17. const (
  18. defaultGroupCheckDuration = 5000 * time.Millisecond
  19. defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
  20. defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
  21. maxFilesToRemove = 4 // needs to be greater than 1
  22. )
  23. /*
  24. You can open a Group to keep restrictions on an AutoFile, like
  25. the maximum size of each chunk, and/or the total amount of bytes
  26. stored in the group.
  27. The first file to be written in the Group.Dir is the head file.
  28. Dir/
  29. - <HeadPath>
  30. Once the Head file reaches the size limit, it will be rotated.
  31. Dir/
  32. - <HeadPath>.000 // First rolled file
  33. - <HeadPath> // New head path, starts empty.
  34. // The implicit index is 001.
  35. As more files are written, the index numbers grow...
  36. Dir/
  37. - <HeadPath>.000 // First rolled file
  38. - <HeadPath>.001 // Second rolled file
  39. - ...
  40. - <HeadPath> // New head path
  41. The Group can also be used to binary-search for some line,
  42. assuming that marker lines are written occasionally.
  43. */
  44. type Group struct {
  45. cmn.BaseService
  46. ID string
  47. Head *AutoFile // The head AutoFile to write to
  48. headBuf *bufio.Writer
  49. Dir string // Directory that contains .Head
  50. ticker *time.Ticker
  51. mtx sync.Mutex
  52. headSizeLimit int64
  53. totalSizeLimit int64
  54. groupCheckDuration time.Duration
  55. minIndex int // Includes head
  56. maxIndex int // Includes head, where Head will move to
  57. // close this when the processTicks routine is done.
  58. // this ensures we can cleanup the dir after calling Stop
  59. // and the routine won't be trying to access it anymore
  60. doneProcessTicks chan struct{}
  61. // TODO: When we start deleting files, we need to start tracking GroupReaders
  62. // and their dependencies.
  63. }
  64. // OpenGroup creates a new Group with head at headPath. It returns an error if
  65. // it fails to open head file.
  66. func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
  67. dir := path.Dir(headPath)
  68. head, err := OpenAutoFile(headPath)
  69. if err != nil {
  70. return nil, err
  71. }
  72. g = &Group{
  73. ID: "group:" + head.ID,
  74. Head: head,
  75. headBuf: bufio.NewWriterSize(head, 4096*10),
  76. Dir: dir,
  77. headSizeLimit: defaultHeadSizeLimit,
  78. totalSizeLimit: defaultTotalSizeLimit,
  79. groupCheckDuration: defaultGroupCheckDuration,
  80. minIndex: 0,
  81. maxIndex: 0,
  82. doneProcessTicks: make(chan struct{}),
  83. }
  84. for _, option := range groupOptions {
  85. option(g)
  86. }
  87. g.BaseService = *cmn.NewBaseService(nil, "Group", g)
  88. gInfo := g.readGroupInfo()
  89. g.minIndex = gInfo.MinIndex
  90. g.maxIndex = gInfo.MaxIndex
  91. return
  92. }
  93. // GroupCheckDuration allows you to overwrite default groupCheckDuration.
  94. func GroupCheckDuration(duration time.Duration) func(*Group) {
  95. return func(g *Group) {
  96. g.groupCheckDuration = duration
  97. }
  98. }
  99. // GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB.
  100. func GroupHeadSizeLimit(limit int64) func(*Group) {
  101. return func(g *Group) {
  102. g.headSizeLimit = limit
  103. }
  104. }
  105. // GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB.
  106. func GroupTotalSizeLimit(limit int64) func(*Group) {
  107. return func(g *Group) {
  108. g.totalSizeLimit = limit
  109. }
  110. }
  111. // OnStart implements Service by starting the goroutine that checks file and
  112. // group limits.
  113. func (g *Group) OnStart() error {
  114. g.ticker = time.NewTicker(g.groupCheckDuration)
  115. go g.processTicks()
  116. return nil
  117. }
  118. // OnStop implements Service by stopping the goroutine described above.
  119. // NOTE: g.Head must be closed separately using Close.
  120. func (g *Group) OnStop() {
  121. g.ticker.Stop()
  122. g.Flush() // flush any uncommitted data
  123. }
  124. func (g *Group) Wait() {
  125. // wait for processTicks routine to finish
  126. <-g.doneProcessTicks
  127. }
  128. // Close closes the head file. The group must be stopped by this moment.
  129. func (g *Group) Close() {
  130. g.Flush() // flush any uncommitted data
  131. g.mtx.Lock()
  132. _ = g.Head.closeFile()
  133. g.mtx.Unlock()
  134. }
  135. // HeadSizeLimit returns the current head size limit.
  136. func (g *Group) HeadSizeLimit() int64 {
  137. g.mtx.Lock()
  138. defer g.mtx.Unlock()
  139. return g.headSizeLimit
  140. }
  141. // TotalSizeLimit returns total size limit of the group.
  142. func (g *Group) TotalSizeLimit() int64 {
  143. g.mtx.Lock()
  144. defer g.mtx.Unlock()
  145. return g.totalSizeLimit
  146. }
  147. // MaxIndex returns index of the last file in the group.
  148. func (g *Group) MaxIndex() int {
  149. g.mtx.Lock()
  150. defer g.mtx.Unlock()
  151. return g.maxIndex
  152. }
  153. // MinIndex returns index of the first file in the group.
  154. func (g *Group) MinIndex() int {
  155. g.mtx.Lock()
  156. defer g.mtx.Unlock()
  157. return g.minIndex
  158. }
  159. // Write writes the contents of p into the current head of the group. It
  160. // returns the number of bytes written. If nn < len(p), it also returns an
  161. // error explaining why the write is short.
  162. // NOTE: Writes are buffered so they don't write synchronously
  163. // TODO: Make it halt if space is unavailable
  164. func (g *Group) Write(p []byte) (nn int, err error) {
  165. g.mtx.Lock()
  166. defer g.mtx.Unlock()
  167. return g.headBuf.Write(p)
  168. }
  169. // WriteLine writes line into the current head of the group. It also appends "\n".
  170. // NOTE: Writes are buffered so they don't write synchronously
  171. // TODO: Make it halt if space is unavailable
  172. func (g *Group) WriteLine(line string) error {
  173. g.mtx.Lock()
  174. defer g.mtx.Unlock()
  175. _, err := g.headBuf.Write([]byte(line + "\n"))
  176. return err
  177. }
  178. // Flush writes any buffered data to the underlying file and commits the
  179. // current content of the file to stable storage.
  180. func (g *Group) Flush() error {
  181. g.mtx.Lock()
  182. defer g.mtx.Unlock()
  183. err := g.headBuf.Flush()
  184. if err == nil {
  185. err = g.Head.Sync()
  186. }
  187. return err
  188. }
  189. func (g *Group) processTicks() {
  190. defer close(g.doneProcessTicks)
  191. for {
  192. select {
  193. case <-g.ticker.C:
  194. g.checkHeadSizeLimit()
  195. g.checkTotalSizeLimit()
  196. case <-g.Quit():
  197. return
  198. }
  199. }
  200. }
  201. // NOTE: this function is called manually in tests.
  202. func (g *Group) checkHeadSizeLimit() {
  203. limit := g.HeadSizeLimit()
  204. if limit == 0 {
  205. return
  206. }
  207. size, err := g.Head.Size()
  208. if err != nil {
  209. g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path, "err", err)
  210. return
  211. }
  212. if size >= limit {
  213. g.RotateFile()
  214. }
  215. }
  216. func (g *Group) checkTotalSizeLimit() {
  217. limit := g.TotalSizeLimit()
  218. if limit == 0 {
  219. return
  220. }
  221. gInfo := g.readGroupInfo()
  222. totalSize := gInfo.TotalSize
  223. for i := 0; i < maxFilesToRemove; i++ {
  224. index := gInfo.MinIndex + i
  225. if totalSize < limit {
  226. return
  227. }
  228. if index == gInfo.MaxIndex {
  229. // Special degenerate case, just do nothing.
  230. g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path)
  231. return
  232. }
  233. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  234. fInfo, err := os.Stat(pathToRemove)
  235. if err != nil {
  236. g.Logger.Error("Failed to fetch info for file", "file", pathToRemove)
  237. continue
  238. }
  239. err = os.Remove(pathToRemove)
  240. if err != nil {
  241. g.Logger.Error("Failed to remove path", "path", pathToRemove)
  242. return
  243. }
  244. totalSize -= fInfo.Size()
  245. }
  246. }
  247. // RotateFile causes group to close the current head and assign it some index.
  248. // Note it does not create a new head.
  249. func (g *Group) RotateFile() {
  250. g.mtx.Lock()
  251. defer g.mtx.Unlock()
  252. headPath := g.Head.Path
  253. if err := g.headBuf.Flush(); err != nil {
  254. panic(err)
  255. }
  256. if err := g.Head.Sync(); err != nil {
  257. panic(err)
  258. }
  259. if err := g.Head.closeFile(); err != nil {
  260. panic(err)
  261. }
  262. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  263. if err := os.Rename(headPath, indexPath); err != nil {
  264. panic(err)
  265. }
  266. g.maxIndex++
  267. }
  268. // NewReader returns a new group reader.
  269. // CONTRACT: Caller must close the returned GroupReader.
  270. func (g *Group) NewReader(index int) (*GroupReader, error) {
  271. r := newGroupReader(g)
  272. err := r.SetIndex(index)
  273. if err != nil {
  274. return nil, err
  275. }
  276. return r, nil
  277. }
  278. // Returns -1 if line comes after, 0 if found, 1 if line comes before.
  279. type SearchFunc func(line string) (int, error)
  280. // Searches for the right file in Group, then returns a GroupReader to start
  281. // streaming lines.
  282. // Returns true if an exact match was found, otherwise returns the next greater
  283. // line that starts with prefix.
  284. // CONTRACT: Caller must close the returned GroupReader
  285. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  286. g.mtx.Lock()
  287. minIndex, maxIndex := g.minIndex, g.maxIndex
  288. g.mtx.Unlock()
  289. // Now minIndex/maxIndex may change meanwhile,
  290. // but it shouldn't be a big deal
  291. // (maybe we'll want to limit scanUntil though)
  292. for {
  293. curIndex := (minIndex + maxIndex + 1) / 2
  294. // Base case, when there's only 1 choice left.
  295. if minIndex == maxIndex {
  296. r, err := g.NewReader(maxIndex)
  297. if err != nil {
  298. return nil, false, err
  299. }
  300. match, err := scanUntil(r, prefix, cmp)
  301. if err != nil {
  302. r.Close()
  303. return nil, false, err
  304. }
  305. return r, match, err
  306. }
  307. // Read starting roughly at the middle file,
  308. // until we find line that has prefix.
  309. r, err := g.NewReader(curIndex)
  310. if err != nil {
  311. return nil, false, err
  312. }
  313. foundIndex, line, err := scanNext(r, prefix)
  314. r.Close()
  315. if err != nil {
  316. return nil, false, err
  317. }
  318. // Compare this line to our search query.
  319. val, err := cmp(line)
  320. if err != nil {
  321. return nil, false, err
  322. }
  323. if val < 0 {
  324. // Line will come later
  325. minIndex = foundIndex
  326. } else if val == 0 {
  327. // Stroke of luck, found the line
  328. r, err := g.NewReader(foundIndex)
  329. if err != nil {
  330. return nil, false, err
  331. }
  332. match, err := scanUntil(r, prefix, cmp)
  333. if !match {
  334. panic("Expected match to be true")
  335. }
  336. if err != nil {
  337. r.Close()
  338. return nil, false, err
  339. }
  340. return r, true, err
  341. } else {
  342. // We passed it
  343. maxIndex = curIndex - 1
  344. }
  345. }
  346. }
  347. // Scans and returns the first line that starts with 'prefix'
  348. // Consumes line and returns it.
  349. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  350. for {
  351. line, err := r.ReadLine()
  352. if err != nil {
  353. return 0, "", err
  354. }
  355. if !strings.HasPrefix(line, prefix) {
  356. continue
  357. }
  358. index := r.CurIndex()
  359. return index, line, nil
  360. }
  361. }
  362. // Returns true iff an exact match was found.
  363. // Pushes line, does not consume it.
  364. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  365. for {
  366. line, err := r.ReadLine()
  367. if err != nil {
  368. return false, err
  369. }
  370. if !strings.HasPrefix(line, prefix) {
  371. continue
  372. }
  373. val, err := cmp(line)
  374. if err != nil {
  375. return false, err
  376. }
  377. if val < 0 {
  378. continue
  379. } else if val == 0 {
  380. r.PushLine(line)
  381. return true, nil
  382. } else {
  383. r.PushLine(line)
  384. return false, nil
  385. }
  386. }
  387. }
  388. // Searches backwards for the last line in Group with prefix.
  389. // Scans each file forward until the end to find the last match.
  390. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  391. g.mtx.Lock()
  392. minIndex, maxIndex := g.minIndex, g.maxIndex
  393. g.mtx.Unlock()
  394. r, err := g.NewReader(maxIndex)
  395. if err != nil {
  396. return "", false, err
  397. }
  398. defer r.Close()
  399. // Open files from the back and read
  400. GROUP_LOOP:
  401. for i := maxIndex; i >= minIndex; i-- {
  402. err := r.SetIndex(i)
  403. if err != nil {
  404. return "", false, err
  405. }
  406. // Scan each line and test whether line matches
  407. for {
  408. line, err := r.ReadLine()
  409. if err == io.EOF {
  410. if found {
  411. return match, found, nil
  412. }
  413. continue GROUP_LOOP
  414. } else if err != nil {
  415. return "", false, err
  416. }
  417. if strings.HasPrefix(line, prefix) {
  418. match = line
  419. found = true
  420. }
  421. if r.CurIndex() > i {
  422. if found {
  423. return match, found, nil
  424. }
  425. continue GROUP_LOOP
  426. }
  427. }
  428. }
  429. return
  430. }
  431. // GroupInfo holds information about the group.
  432. type GroupInfo struct {
  433. MinIndex int // index of the first file in the group, including head
  434. MaxIndex int // index of the last file in the group, including head
  435. TotalSize int64 // total size of the group
  436. HeadSize int64 // size of the head
  437. }
  438. // Returns info after scanning all files in g.Head's dir.
  439. func (g *Group) ReadGroupInfo() GroupInfo {
  440. g.mtx.Lock()
  441. defer g.mtx.Unlock()
  442. return g.readGroupInfo()
  443. }
  444. // Index includes the head.
  445. // CONTRACT: caller should have called g.mtx.Lock
  446. func (g *Group) readGroupInfo() GroupInfo {
  447. groupDir := filepath.Dir(g.Head.Path)
  448. headBase := filepath.Base(g.Head.Path)
  449. var minIndex, maxIndex int = -1, -1
  450. var totalSize, headSize int64 = 0, 0
  451. dir, err := os.Open(groupDir)
  452. if err != nil {
  453. panic(err)
  454. }
  455. defer dir.Close()
  456. fiz, err := dir.Readdir(0)
  457. if err != nil {
  458. panic(err)
  459. }
  460. // For each file in the directory, filter by pattern
  461. for _, fileInfo := range fiz {
  462. if fileInfo.Name() == headBase {
  463. fileSize := fileInfo.Size()
  464. totalSize += fileSize
  465. headSize = fileSize
  466. continue
  467. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  468. fileSize := fileInfo.Size()
  469. totalSize += fileSize
  470. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  471. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  472. if len(submatch) != 0 {
  473. // Matches
  474. fileIndex, err := strconv.Atoi(string(submatch[1]))
  475. if err != nil {
  476. panic(err)
  477. }
  478. if maxIndex < fileIndex {
  479. maxIndex = fileIndex
  480. }
  481. if minIndex == -1 || fileIndex < minIndex {
  482. minIndex = fileIndex
  483. }
  484. }
  485. }
  486. }
  487. // Now account for the head.
  488. if minIndex == -1 {
  489. // If there were no numbered files,
  490. // then the head is index 0.
  491. minIndex, maxIndex = 0, 0
  492. } else {
  493. // Otherwise, the head file is 1 greater
  494. maxIndex++
  495. }
  496. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  497. }
  498. func filePathForIndex(headPath string, index int, maxIndex int) string {
  499. if index == maxIndex {
  500. return headPath
  501. }
  502. return fmt.Sprintf("%v.%03d", headPath, index)
  503. }
  504. //--------------------------------------------------------------------------------
  505. // GroupReader provides an interface for reading from a Group.
  506. type GroupReader struct {
  507. *Group
  508. mtx sync.Mutex
  509. curIndex int
  510. curFile *os.File
  511. curReader *bufio.Reader
  512. curLine []byte
  513. }
  514. func newGroupReader(g *Group) *GroupReader {
  515. return &GroupReader{
  516. Group: g,
  517. curIndex: 0,
  518. curFile: nil,
  519. curReader: nil,
  520. curLine: nil,
  521. }
  522. }
  523. // Close closes the GroupReader by closing the cursor file.
  524. func (gr *GroupReader) Close() error {
  525. gr.mtx.Lock()
  526. defer gr.mtx.Unlock()
  527. if gr.curReader != nil {
  528. err := gr.curFile.Close()
  529. gr.curIndex = 0
  530. gr.curReader = nil
  531. gr.curFile = nil
  532. gr.curLine = nil
  533. return err
  534. }
  535. return nil
  536. }
  537. // Read implements io.Reader, reading bytes from the current Reader
  538. // incrementing index until enough bytes are read.
  539. func (gr *GroupReader) Read(p []byte) (n int, err error) {
  540. lenP := len(p)
  541. if lenP == 0 {
  542. return 0, errors.New("given empty slice")
  543. }
  544. gr.mtx.Lock()
  545. defer gr.mtx.Unlock()
  546. // Open file if not open yet
  547. if gr.curReader == nil {
  548. if err = gr.openFile(gr.curIndex); err != nil {
  549. return 0, err
  550. }
  551. }
  552. // Iterate over files until enough bytes are read
  553. var nn int
  554. for {
  555. nn, err = gr.curReader.Read(p[n:])
  556. n += nn
  557. if err == io.EOF {
  558. if n >= lenP {
  559. return n, nil
  560. }
  561. // Open the next file
  562. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  563. return n, err1
  564. }
  565. } else if err != nil {
  566. return n, err
  567. } else if nn == 0 { // empty file
  568. return n, err
  569. }
  570. }
  571. }
  572. // ReadLine reads a line (without delimiter).
  573. // just return io.EOF if no new lines found.
  574. func (gr *GroupReader) ReadLine() (string, error) {
  575. gr.mtx.Lock()
  576. defer gr.mtx.Unlock()
  577. // From PushLine
  578. if gr.curLine != nil {
  579. line := string(gr.curLine)
  580. gr.curLine = nil
  581. return line, nil
  582. }
  583. // Open file if not open yet
  584. if gr.curReader == nil {
  585. err := gr.openFile(gr.curIndex)
  586. if err != nil {
  587. return "", err
  588. }
  589. }
  590. // Iterate over files until line is found
  591. var linePrefix string
  592. for {
  593. bytesRead, err := gr.curReader.ReadBytes('\n')
  594. if err == io.EOF {
  595. // Open the next file
  596. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  597. return "", err1
  598. }
  599. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  600. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  601. }
  602. linePrefix += string(bytesRead)
  603. continue
  604. } else if err != nil {
  605. return "", err
  606. }
  607. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  608. }
  609. }
  610. // IF index > gr.Group.maxIndex, returns io.EOF
  611. // CONTRACT: caller should hold gr.mtx
  612. func (gr *GroupReader) openFile(index int) error {
  613. // Lock on Group to ensure that head doesn't move in the meanwhile.
  614. gr.Group.mtx.Lock()
  615. defer gr.Group.mtx.Unlock()
  616. if index > gr.Group.maxIndex {
  617. return io.EOF
  618. }
  619. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  620. curFile, err := os.OpenFile(curFilePath, os.O_RDONLY|os.O_CREATE, autoFilePerms)
  621. if err != nil {
  622. return err
  623. }
  624. curReader := bufio.NewReader(curFile)
  625. // Update gr.cur*
  626. if gr.curFile != nil {
  627. gr.curFile.Close() // TODO return error?
  628. }
  629. gr.curIndex = index
  630. gr.curFile = curFile
  631. gr.curReader = curReader
  632. gr.curLine = nil
  633. return nil
  634. }
  635. // PushLine makes the given line the current one, so the next time somebody
  636. // calls ReadLine, this line will be returned.
  637. // panics if called twice without calling ReadLine.
  638. func (gr *GroupReader) PushLine(line string) {
  639. gr.mtx.Lock()
  640. defer gr.mtx.Unlock()
  641. if gr.curLine == nil {
  642. gr.curLine = []byte(line)
  643. } else {
  644. panic("PushLine failed, already have line")
  645. }
  646. }
  647. // CurIndex returns cursor's file index.
  648. func (gr *GroupReader) CurIndex() int {
  649. gr.mtx.Lock()
  650. defer gr.mtx.Unlock()
  651. return gr.curIndex
  652. }
  653. // SetIndex sets the cursor's file index to index by opening a file at this
  654. // position.
  655. func (gr *GroupReader) SetIndex(index int) error {
  656. gr.mtx.Lock()
  657. defer gr.mtx.Unlock()
  658. return gr.openFile(index)
  659. }
  660. //--------------------------------------------------------------------------------
  661. // A simple SearchFunc that assumes that the marker is of form
  662. // <prefix><number>.
  663. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  664. //
  665. // #HEIGHT:1
  666. // ...
  667. // #HEIGHT:2
  668. // ...
  669. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  670. return func(line string) (int, error) {
  671. if !strings.HasPrefix(line, prefix) {
  672. return -1, fmt.Errorf("Marker line did not have prefix: %v", prefix)
  673. }
  674. i, err := strconv.Atoi(line[len(prefix):])
  675. if err != nil {
  676. return -1, fmt.Errorf("Failed to parse marker line: %v", err.Error())
  677. }
  678. if target < i {
  679. return 1, nil
  680. } else if target == i {
  681. return 0, nil
  682. } else {
  683. return -1, nil
  684. }
  685. }
  686. }