You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

669 lines
14 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. . "github.com/tendermint/tmlibs/common"
  17. )
  18. /*
  19. You can open a Group to keep restrictions on an AutoFile, like
  20. the maximum size of each chunk, and/or the total amount of bytes
  21. stored in the group.
  22. The first file to be written in the Group.Dir is the head file.
  23. Dir/
  24. - <HeadPath>
  25. Once the Head file reaches the size limit, it will be rotated.
  26. Dir/
  27. - <HeadPath>.000 // First rolled file
  28. - <HeadPath> // New head path, starts empty.
  29. // The implicit index is 001.
  30. As more files are written, the index numbers grow...
  31. Dir/
  32. - <HeadPath>.000 // First rolled file
  33. - <HeadPath>.001 // Second rolled file
  34. - ...
  35. - <HeadPath> // New head path
  36. The Group can also be used to binary-search for some line,
  37. assuming that marker lines are written occasionally.
  38. */
  39. const groupCheckDuration = 5000 * time.Millisecond
  40. const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
  41. const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
  42. const maxFilesToRemove = 4 // needs to be greater than 1
  43. type Group struct {
  44. BaseService
  45. ID string
  46. Head *AutoFile // The head AutoFile to write to
  47. headBuf *bufio.Writer
  48. Dir string // Directory that contains .Head
  49. ticker *time.Ticker
  50. mtx sync.Mutex
  51. headSizeLimit int64
  52. totalSizeLimit int64
  53. minIndex int // Includes head
  54. maxIndex int // Includes head, where Head will move to
  55. // TODO: When we start deleting files, we need to start tracking GroupReaders
  56. // and their dependencies.
  57. }
  58. func OpenGroup(headPath string) (g *Group, err error) {
  59. dir := path.Dir(headPath)
  60. head, err := OpenAutoFile(headPath)
  61. if err != nil {
  62. return nil, err
  63. }
  64. g = &Group{
  65. ID: "group:" + head.ID,
  66. Head: head,
  67. headBuf: bufio.NewWriterSize(head, 4096*10),
  68. Dir: dir,
  69. ticker: time.NewTicker(groupCheckDuration),
  70. headSizeLimit: defaultHeadSizeLimit,
  71. totalSizeLimit: defaultTotalSizeLimit,
  72. minIndex: 0,
  73. maxIndex: 0,
  74. }
  75. g.BaseService = *NewBaseService(nil, "Group", g)
  76. gInfo := g.readGroupInfo()
  77. g.minIndex = gInfo.MinIndex
  78. g.maxIndex = gInfo.MaxIndex
  79. return
  80. }
  81. func (g *Group) OnStart() error {
  82. g.BaseService.OnStart()
  83. go g.processTicks()
  84. return nil
  85. }
  86. // NOTE: g.Head must be closed separately
  87. func (g *Group) OnStop() {
  88. g.BaseService.OnStop()
  89. g.ticker.Stop()
  90. return
  91. }
  92. func (g *Group) SetHeadSizeLimit(limit int64) {
  93. g.mtx.Lock()
  94. g.headSizeLimit = limit
  95. g.mtx.Unlock()
  96. }
  97. func (g *Group) HeadSizeLimit() int64 {
  98. g.mtx.Lock()
  99. defer g.mtx.Unlock()
  100. return g.headSizeLimit
  101. }
  102. func (g *Group) SetTotalSizeLimit(limit int64) {
  103. g.mtx.Lock()
  104. g.totalSizeLimit = limit
  105. g.mtx.Unlock()
  106. }
  107. func (g *Group) TotalSizeLimit() int64 {
  108. g.mtx.Lock()
  109. defer g.mtx.Unlock()
  110. return g.totalSizeLimit
  111. }
  112. func (g *Group) MaxIndex() int {
  113. g.mtx.Lock()
  114. defer g.mtx.Unlock()
  115. return g.maxIndex
  116. }
  117. // Auto appends "\n"
  118. // NOTE: Writes are buffered so they don't write synchronously
  119. // TODO: Make it halt if space is unavailable
  120. func (g *Group) WriteLine(line string) error {
  121. g.mtx.Lock()
  122. defer g.mtx.Unlock()
  123. _, err := g.headBuf.Write([]byte(line + "\n"))
  124. return err
  125. }
  126. func (g *Group) Flush() error {
  127. g.mtx.Lock()
  128. defer g.mtx.Unlock()
  129. err := g.headBuf.Flush()
  130. if err == nil {
  131. err = g.Head.Sync()
  132. }
  133. return err
  134. }
  135. func (g *Group) processTicks() {
  136. for {
  137. _, ok := <-g.ticker.C
  138. if !ok {
  139. return // Done.
  140. }
  141. g.checkHeadSizeLimit()
  142. g.checkTotalSizeLimit()
  143. }
  144. }
  145. // NOTE: for testing
  146. func (g *Group) stopTicker() {
  147. g.ticker.Stop()
  148. }
  149. // NOTE: this function is called manually in tests.
  150. func (g *Group) checkHeadSizeLimit() {
  151. limit := g.HeadSizeLimit()
  152. if limit == 0 {
  153. return
  154. }
  155. size, err := g.Head.Size()
  156. if err != nil {
  157. panic(err)
  158. }
  159. if size >= limit {
  160. g.RotateFile()
  161. }
  162. }
  163. func (g *Group) checkTotalSizeLimit() {
  164. limit := g.TotalSizeLimit()
  165. if limit == 0 {
  166. return
  167. }
  168. gInfo := g.readGroupInfo()
  169. totalSize := gInfo.TotalSize
  170. for i := 0; i < maxFilesToRemove; i++ {
  171. index := gInfo.MinIndex + i
  172. if totalSize < limit {
  173. return
  174. }
  175. if index == gInfo.MaxIndex {
  176. // Special degenerate case, just do nothing.
  177. log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
  178. return
  179. }
  180. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  181. fileInfo, err := os.Stat(pathToRemove)
  182. if err != nil {
  183. log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
  184. continue
  185. }
  186. err = os.Remove(pathToRemove)
  187. if err != nil {
  188. log.Println(err)
  189. return
  190. }
  191. totalSize -= fileInfo.Size()
  192. }
  193. }
  194. func (g *Group) RotateFile() {
  195. g.mtx.Lock()
  196. defer g.mtx.Unlock()
  197. headPath := g.Head.Path
  198. if err := g.Head.closeFile(); err != nil {
  199. panic(err)
  200. }
  201. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  202. if err := os.Rename(headPath, indexPath); err != nil {
  203. panic(err)
  204. }
  205. g.maxIndex += 1
  206. }
  207. // NOTE: if error, returns no GroupReader.
  208. // CONTRACT: Caller must close the returned GroupReader
  209. func (g *Group) NewReader(index int) (*GroupReader, error) {
  210. r := newGroupReader(g)
  211. err := r.SetIndex(index)
  212. if err != nil {
  213. return nil, err
  214. } else {
  215. return r, nil
  216. }
  217. }
  218. // Returns -1 if line comes after, 0 if found, 1 if line comes before.
  219. type SearchFunc func(line string) (int, error)
  220. // Searches for the right file in Group, then returns a GroupReader to start
  221. // streaming lines.
  222. // Returns true if an exact match was found, otherwise returns the next greater
  223. // line that starts with prefix.
  224. // CONTRACT: Caller must close the returned GroupReader
  225. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  226. g.mtx.Lock()
  227. minIndex, maxIndex := g.minIndex, g.maxIndex
  228. g.mtx.Unlock()
  229. // Now minIndex/maxIndex may change meanwhile,
  230. // but it shouldn't be a big deal
  231. // (maybe we'll want to limit scanUntil though)
  232. for {
  233. curIndex := (minIndex + maxIndex + 1) / 2
  234. // Base case, when there's only 1 choice left.
  235. if minIndex == maxIndex {
  236. r, err := g.NewReader(maxIndex)
  237. if err != nil {
  238. return nil, false, err
  239. }
  240. match, err := scanUntil(r, prefix, cmp)
  241. if err != nil {
  242. r.Close()
  243. return nil, false, err
  244. } else {
  245. return r, match, err
  246. }
  247. }
  248. // Read starting roughly at the middle file,
  249. // until we find line that has prefix.
  250. r, err := g.NewReader(curIndex)
  251. if err != nil {
  252. return nil, false, err
  253. }
  254. foundIndex, line, err := scanNext(r, prefix)
  255. r.Close()
  256. if err != nil {
  257. return nil, false, err
  258. }
  259. // Compare this line to our search query.
  260. val, err := cmp(line)
  261. if err != nil {
  262. return nil, false, err
  263. }
  264. if val < 0 {
  265. // Line will come later
  266. minIndex = foundIndex
  267. } else if val == 0 {
  268. // Stroke of luck, found the line
  269. r, err := g.NewReader(foundIndex)
  270. if err != nil {
  271. return nil, false, err
  272. }
  273. match, err := scanUntil(r, prefix, cmp)
  274. if !match {
  275. panic("Expected match to be true")
  276. }
  277. if err != nil {
  278. r.Close()
  279. return nil, false, err
  280. } else {
  281. return r, true, err
  282. }
  283. } else {
  284. // We passed it
  285. maxIndex = curIndex - 1
  286. }
  287. }
  288. }
  289. // Scans and returns the first line that starts with 'prefix'
  290. // Consumes line and returns it.
  291. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  292. for {
  293. line, err := r.ReadLine()
  294. if err != nil {
  295. return 0, "", err
  296. }
  297. if !strings.HasPrefix(line, prefix) {
  298. continue
  299. }
  300. index := r.CurIndex()
  301. return index, line, nil
  302. }
  303. }
  304. // Returns true iff an exact match was found.
  305. // Pushes line, does not consume it.
  306. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  307. for {
  308. line, err := r.ReadLine()
  309. if err != nil {
  310. return false, err
  311. }
  312. if !strings.HasPrefix(line, prefix) {
  313. continue
  314. }
  315. val, err := cmp(line)
  316. if err != nil {
  317. return false, err
  318. }
  319. if val < 0 {
  320. continue
  321. } else if val == 0 {
  322. r.PushLine(line)
  323. return true, nil
  324. } else {
  325. r.PushLine(line)
  326. return false, nil
  327. }
  328. }
  329. }
  330. // Searches backwards for the last line in Group with prefix.
  331. // Scans each file forward until the end to find the last match.
  332. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  333. g.mtx.Lock()
  334. minIndex, maxIndex := g.minIndex, g.maxIndex
  335. g.mtx.Unlock()
  336. r, err := g.NewReader(maxIndex)
  337. if err != nil {
  338. return "", false, err
  339. }
  340. defer r.Close()
  341. // Open files from the back and read
  342. GROUP_LOOP:
  343. for i := maxIndex; i >= minIndex; i-- {
  344. err := r.SetIndex(i)
  345. if err != nil {
  346. return "", false, err
  347. }
  348. // Scan each line and test whether line matches
  349. for {
  350. line, err := r.ReadLine()
  351. if err == io.EOF {
  352. if found {
  353. return match, found, nil
  354. } else {
  355. continue GROUP_LOOP
  356. }
  357. } else if err != nil {
  358. return "", false, err
  359. }
  360. if strings.HasPrefix(line, prefix) {
  361. match = line
  362. found = true
  363. }
  364. if r.CurIndex() > i {
  365. if found {
  366. return match, found, nil
  367. } else {
  368. continue GROUP_LOOP
  369. }
  370. }
  371. }
  372. }
  373. return
  374. }
  375. type GroupInfo struct {
  376. MinIndex int
  377. MaxIndex int
  378. TotalSize int64
  379. HeadSize int64
  380. }
  381. // Returns info after scanning all files in g.Head's dir
  382. func (g *Group) ReadGroupInfo() GroupInfo {
  383. g.mtx.Lock()
  384. defer g.mtx.Unlock()
  385. return g.readGroupInfo()
  386. }
  387. // Index includes the head.
  388. // CONTRACT: caller should have called g.mtx.Lock
  389. func (g *Group) readGroupInfo() GroupInfo {
  390. groupDir := filepath.Dir(g.Head.Path)
  391. headBase := filepath.Base(g.Head.Path)
  392. var minIndex, maxIndex int = -1, -1
  393. var totalSize, headSize int64 = 0, 0
  394. dir, err := os.Open(groupDir)
  395. if err != nil {
  396. panic(err)
  397. }
  398. defer dir.Close()
  399. fiz, err := dir.Readdir(0)
  400. if err != nil {
  401. panic(err)
  402. }
  403. // For each file in the directory, filter by pattern
  404. for _, fileInfo := range fiz {
  405. if fileInfo.Name() == headBase {
  406. fileSize := fileInfo.Size()
  407. totalSize += fileSize
  408. headSize = fileSize
  409. continue
  410. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  411. fileSize := fileInfo.Size()
  412. totalSize += fileSize
  413. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  414. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  415. if len(submatch) != 0 {
  416. // Matches
  417. fileIndex, err := strconv.Atoi(string(submatch[1]))
  418. if err != nil {
  419. panic(err)
  420. }
  421. if maxIndex < fileIndex {
  422. maxIndex = fileIndex
  423. }
  424. if minIndex == -1 || fileIndex < minIndex {
  425. minIndex = fileIndex
  426. }
  427. }
  428. }
  429. }
  430. // Now account for the head.
  431. if minIndex == -1 {
  432. // If there were no numbered files,
  433. // then the head is index 0.
  434. minIndex, maxIndex = 0, 0
  435. } else {
  436. // Otherwise, the head file is 1 greater
  437. maxIndex += 1
  438. }
  439. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  440. }
  441. func filePathForIndex(headPath string, index int, maxIndex int) string {
  442. if index == maxIndex {
  443. return headPath
  444. } else {
  445. return fmt.Sprintf("%v.%03d", headPath, index)
  446. }
  447. }
  448. //--------------------------------------------------------------------------------
  449. type GroupReader struct {
  450. *Group
  451. mtx sync.Mutex
  452. curIndex int
  453. curFile *os.File
  454. curReader *bufio.Reader
  455. curLine []byte
  456. }
  457. func newGroupReader(g *Group) *GroupReader {
  458. return &GroupReader{
  459. Group: g,
  460. curIndex: 0,
  461. curFile: nil,
  462. curReader: nil,
  463. curLine: nil,
  464. }
  465. }
  466. func (gr *GroupReader) Close() error {
  467. gr.mtx.Lock()
  468. defer gr.mtx.Unlock()
  469. if gr.curReader != nil {
  470. err := gr.curFile.Close()
  471. gr.curIndex = 0
  472. gr.curReader = nil
  473. gr.curFile = nil
  474. gr.curLine = nil
  475. return err
  476. } else {
  477. return nil
  478. }
  479. }
  480. // Reads a line (without delimiter)
  481. // just return io.EOF if no new lines found.
  482. func (gr *GroupReader) ReadLine() (string, error) {
  483. gr.mtx.Lock()
  484. defer gr.mtx.Unlock()
  485. // From PushLine
  486. if gr.curLine != nil {
  487. line := string(gr.curLine)
  488. gr.curLine = nil
  489. return line, nil
  490. }
  491. // Open file if not open yet
  492. if gr.curReader == nil {
  493. err := gr.openFile(gr.curIndex)
  494. if err != nil {
  495. return "", err
  496. }
  497. }
  498. // Iterate over files until line is found
  499. var linePrefix string
  500. for {
  501. bytesRead, err := gr.curReader.ReadBytes('\n')
  502. if err == io.EOF {
  503. // Open the next file
  504. err := gr.openFile(gr.curIndex + 1)
  505. if err != nil {
  506. return "", err
  507. }
  508. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  509. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  510. } else {
  511. linePrefix += string(bytesRead)
  512. continue
  513. }
  514. } else if err != nil {
  515. return "", err
  516. }
  517. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  518. }
  519. }
  520. // IF index > gr.Group.maxIndex, returns io.EOF
  521. // CONTRACT: caller should hold gr.mtx
  522. func (gr *GroupReader) openFile(index int) error {
  523. // Lock on Group to ensure that head doesn't move in the meanwhile.
  524. gr.Group.mtx.Lock()
  525. defer gr.Group.mtx.Unlock()
  526. if index > gr.Group.maxIndex {
  527. return io.EOF
  528. }
  529. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  530. curFile, err := os.Open(curFilePath)
  531. if err != nil {
  532. return err
  533. }
  534. curReader := bufio.NewReader(curFile)
  535. // Update gr.cur*
  536. if gr.curFile != nil {
  537. gr.curFile.Close() // TODO return error?
  538. }
  539. gr.curIndex = index
  540. gr.curFile = curFile
  541. gr.curReader = curReader
  542. gr.curLine = nil
  543. return nil
  544. }
  545. func (gr *GroupReader) PushLine(line string) {
  546. gr.mtx.Lock()
  547. defer gr.mtx.Unlock()
  548. if gr.curLine == nil {
  549. gr.curLine = []byte(line)
  550. } else {
  551. panic("PushLine failed, already have line")
  552. }
  553. }
  554. // Cursor's file index.
  555. func (gr *GroupReader) CurIndex() int {
  556. gr.mtx.Lock()
  557. defer gr.mtx.Unlock()
  558. return gr.curIndex
  559. }
  560. func (gr *GroupReader) SetIndex(index int) error {
  561. gr.mtx.Lock()
  562. defer gr.mtx.Unlock()
  563. return gr.openFile(index)
  564. }
  565. //--------------------------------------------------------------------------------
  566. // A simple SearchFunc that assumes that the marker is of form
  567. // <prefix><number>.
  568. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  569. //
  570. // #HEIGHT:1
  571. // ...
  572. // #HEIGHT:2
  573. // ...
  574. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  575. return func(line string) (int, error) {
  576. if !strings.HasPrefix(line, prefix) {
  577. return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix))
  578. }
  579. i, err := strconv.Atoi(line[len(prefix):])
  580. if err != nil {
  581. return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error()))
  582. }
  583. if target < i {
  584. return 1, nil
  585. } else if target == i {
  586. return 0, nil
  587. } else {
  588. return -1, nil
  589. }
  590. }
  591. }