You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

667 lines
14 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package autofile
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. . "github.com/tendermint/tmlibs/common"
  17. )
  18. /*
  19. You can open a Group to keep restrictions on an AutoFile, like
  20. the maximum size of each chunk, and/or the total amount of bytes
  21. stored in the group.
  22. The first file to be written in the Group.Dir is the head file.
  23. Dir/
  24. - <HeadPath>
  25. Once the Head file reaches the size limit, it will be rotated.
  26. Dir/
  27. - <HeadPath>.000 // First rolled file
  28. - <HeadPath> // New head path, starts empty.
  29. // The implicit index is 001.
  30. As more files are written, the index numbers grow...
  31. Dir/
  32. - <HeadPath>.000 // First rolled file
  33. - <HeadPath>.001 // Second rolled file
  34. - ...
  35. - <HeadPath> // New head path
  36. The Group can also be used to binary-search for some line,
  37. assuming that marker lines are written occasionally.
  38. */
  39. const groupCheckDuration = 5000 * time.Millisecond
  40. const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
  41. const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
  42. const maxFilesToRemove = 4 // needs to be greater than 1
  43. type Group struct {
  44. BaseService
  45. ID string
  46. Head *AutoFile // The head AutoFile to write to
  47. headBuf *bufio.Writer
  48. Dir string // Directory that contains .Head
  49. ticker *time.Ticker
  50. mtx sync.Mutex
  51. headSizeLimit int64
  52. totalSizeLimit int64
  53. minIndex int // Includes head
  54. maxIndex int // Includes head, where Head will move to
  55. // TODO: When we start deleting files, we need to start tracking GroupReaders
  56. // and their dependencies.
  57. }
  58. func OpenGroup(headPath string) (g *Group, err error) {
  59. dir := path.Dir(headPath)
  60. head, err := OpenAutoFile(headPath)
  61. if err != nil {
  62. return nil, err
  63. }
  64. g = &Group{
  65. ID: "group:" + head.ID,
  66. Head: head,
  67. headBuf: bufio.NewWriterSize(head, 4096*10),
  68. Dir: dir,
  69. ticker: time.NewTicker(groupCheckDuration),
  70. headSizeLimit: defaultHeadSizeLimit,
  71. totalSizeLimit: defaultTotalSizeLimit,
  72. minIndex: 0,
  73. maxIndex: 0,
  74. }
  75. g.BaseService = *NewBaseService(nil, "Group", g)
  76. gInfo := g.readGroupInfo()
  77. g.minIndex = gInfo.MinIndex
  78. g.maxIndex = gInfo.MaxIndex
  79. return
  80. }
  81. func (g *Group) OnStart() error {
  82. g.BaseService.OnStart()
  83. go g.processTicks()
  84. return nil
  85. }
  86. // NOTE: g.Head must be closed separately
  87. func (g *Group) OnStop() {
  88. g.BaseService.OnStop()
  89. g.ticker.Stop()
  90. }
  91. func (g *Group) SetHeadSizeLimit(limit int64) {
  92. g.mtx.Lock()
  93. g.headSizeLimit = limit
  94. g.mtx.Unlock()
  95. }
  96. func (g *Group) HeadSizeLimit() int64 {
  97. g.mtx.Lock()
  98. defer g.mtx.Unlock()
  99. return g.headSizeLimit
  100. }
  101. func (g *Group) SetTotalSizeLimit(limit int64) {
  102. g.mtx.Lock()
  103. g.totalSizeLimit = limit
  104. g.mtx.Unlock()
  105. }
  106. func (g *Group) TotalSizeLimit() int64 {
  107. g.mtx.Lock()
  108. defer g.mtx.Unlock()
  109. return g.totalSizeLimit
  110. }
  111. func (g *Group) MaxIndex() int {
  112. g.mtx.Lock()
  113. defer g.mtx.Unlock()
  114. return g.maxIndex
  115. }
  116. // Auto appends "\n"
  117. // NOTE: Writes are buffered so they don't write synchronously
  118. // TODO: Make it halt if space is unavailable
  119. func (g *Group) WriteLine(line string) error {
  120. g.mtx.Lock()
  121. defer g.mtx.Unlock()
  122. _, err := g.headBuf.Write([]byte(line + "\n"))
  123. return err
  124. }
  125. func (g *Group) Flush() error {
  126. g.mtx.Lock()
  127. defer g.mtx.Unlock()
  128. err := g.headBuf.Flush()
  129. if err == nil {
  130. err = g.Head.Sync()
  131. }
  132. return err
  133. }
  134. func (g *Group) processTicks() {
  135. for {
  136. _, ok := <-g.ticker.C
  137. if !ok {
  138. return // Done.
  139. }
  140. g.checkHeadSizeLimit()
  141. g.checkTotalSizeLimit()
  142. }
  143. }
  144. // NOTE: for testing
  145. func (g *Group) stopTicker() {
  146. g.ticker.Stop()
  147. }
  148. // NOTE: this function is called manually in tests.
  149. func (g *Group) checkHeadSizeLimit() {
  150. limit := g.HeadSizeLimit()
  151. if limit == 0 {
  152. return
  153. }
  154. size, err := g.Head.Size()
  155. if err != nil {
  156. panic(err)
  157. }
  158. if size >= limit {
  159. g.RotateFile()
  160. }
  161. }
  162. func (g *Group) checkTotalSizeLimit() {
  163. limit := g.TotalSizeLimit()
  164. if limit == 0 {
  165. return
  166. }
  167. gInfo := g.readGroupInfo()
  168. totalSize := gInfo.TotalSize
  169. for i := 0; i < maxFilesToRemove; i++ {
  170. index := gInfo.MinIndex + i
  171. if totalSize < limit {
  172. return
  173. }
  174. if index == gInfo.MaxIndex {
  175. // Special degenerate case, just do nothing.
  176. log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound")
  177. return
  178. }
  179. pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
  180. fileInfo, err := os.Stat(pathToRemove)
  181. if err != nil {
  182. log.Println("WARNING: Failed to fetch info for file @" + pathToRemove)
  183. continue
  184. }
  185. err = os.Remove(pathToRemove)
  186. if err != nil {
  187. log.Println(err)
  188. return
  189. }
  190. totalSize -= fileInfo.Size()
  191. }
  192. }
  193. func (g *Group) RotateFile() {
  194. g.mtx.Lock()
  195. defer g.mtx.Unlock()
  196. headPath := g.Head.Path
  197. if err := g.Head.closeFile(); err != nil {
  198. panic(err)
  199. }
  200. indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
  201. if err := os.Rename(headPath, indexPath); err != nil {
  202. panic(err)
  203. }
  204. g.maxIndex += 1
  205. }
  206. // NOTE: if error, returns no GroupReader.
  207. // CONTRACT: Caller must close the returned GroupReader
  208. func (g *Group) NewReader(index int) (*GroupReader, error) {
  209. r := newGroupReader(g)
  210. err := r.SetIndex(index)
  211. if err != nil {
  212. return nil, err
  213. } else {
  214. return r, nil
  215. }
  216. }
  217. // Returns -1 if line comes after, 0 if found, 1 if line comes before.
  218. type SearchFunc func(line string) (int, error)
  219. // Searches for the right file in Group, then returns a GroupReader to start
  220. // streaming lines.
  221. // Returns true if an exact match was found, otherwise returns the next greater
  222. // line that starts with prefix.
  223. // CONTRACT: Caller must close the returned GroupReader
  224. func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
  225. g.mtx.Lock()
  226. minIndex, maxIndex := g.minIndex, g.maxIndex
  227. g.mtx.Unlock()
  228. // Now minIndex/maxIndex may change meanwhile,
  229. // but it shouldn't be a big deal
  230. // (maybe we'll want to limit scanUntil though)
  231. for {
  232. curIndex := (minIndex + maxIndex + 1) / 2
  233. // Base case, when there's only 1 choice left.
  234. if minIndex == maxIndex {
  235. r, err := g.NewReader(maxIndex)
  236. if err != nil {
  237. return nil, false, err
  238. }
  239. match, err := scanUntil(r, prefix, cmp)
  240. if err != nil {
  241. r.Close()
  242. return nil, false, err
  243. } else {
  244. return r, match, err
  245. }
  246. }
  247. // Read starting roughly at the middle file,
  248. // until we find line that has prefix.
  249. r, err := g.NewReader(curIndex)
  250. if err != nil {
  251. return nil, false, err
  252. }
  253. foundIndex, line, err := scanNext(r, prefix)
  254. r.Close()
  255. if err != nil {
  256. return nil, false, err
  257. }
  258. // Compare this line to our search query.
  259. val, err := cmp(line)
  260. if err != nil {
  261. return nil, false, err
  262. }
  263. if val < 0 {
  264. // Line will come later
  265. minIndex = foundIndex
  266. } else if val == 0 {
  267. // Stroke of luck, found the line
  268. r, err := g.NewReader(foundIndex)
  269. if err != nil {
  270. return nil, false, err
  271. }
  272. match, err := scanUntil(r, prefix, cmp)
  273. if !match {
  274. panic("Expected match to be true")
  275. }
  276. if err != nil {
  277. r.Close()
  278. return nil, false, err
  279. } else {
  280. return r, true, err
  281. }
  282. } else {
  283. // We passed it
  284. maxIndex = curIndex - 1
  285. }
  286. }
  287. }
  288. // Scans and returns the first line that starts with 'prefix'
  289. // Consumes line and returns it.
  290. func scanNext(r *GroupReader, prefix string) (int, string, error) {
  291. for {
  292. line, err := r.ReadLine()
  293. if err != nil {
  294. return 0, "", err
  295. }
  296. if !strings.HasPrefix(line, prefix) {
  297. continue
  298. }
  299. index := r.CurIndex()
  300. return index, line, nil
  301. }
  302. }
  303. // Returns true iff an exact match was found.
  304. // Pushes line, does not consume it.
  305. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
  306. for {
  307. line, err := r.ReadLine()
  308. if err != nil {
  309. return false, err
  310. }
  311. if !strings.HasPrefix(line, prefix) {
  312. continue
  313. }
  314. val, err := cmp(line)
  315. if err != nil {
  316. return false, err
  317. }
  318. if val < 0 {
  319. continue
  320. } else if val == 0 {
  321. r.PushLine(line)
  322. return true, nil
  323. } else {
  324. r.PushLine(line)
  325. return false, nil
  326. }
  327. }
  328. }
  329. // Searches backwards for the last line in Group with prefix.
  330. // Scans each file forward until the end to find the last match.
  331. func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
  332. g.mtx.Lock()
  333. minIndex, maxIndex := g.minIndex, g.maxIndex
  334. g.mtx.Unlock()
  335. r, err := g.NewReader(maxIndex)
  336. if err != nil {
  337. return "", false, err
  338. }
  339. defer r.Close()
  340. // Open files from the back and read
  341. GROUP_LOOP:
  342. for i := maxIndex; i >= minIndex; i-- {
  343. err := r.SetIndex(i)
  344. if err != nil {
  345. return "", false, err
  346. }
  347. // Scan each line and test whether line matches
  348. for {
  349. line, err := r.ReadLine()
  350. if err == io.EOF {
  351. if found {
  352. return match, found, nil
  353. } else {
  354. continue GROUP_LOOP
  355. }
  356. } else if err != nil {
  357. return "", false, err
  358. }
  359. if strings.HasPrefix(line, prefix) {
  360. match = line
  361. found = true
  362. }
  363. if r.CurIndex() > i {
  364. if found {
  365. return match, found, nil
  366. } else {
  367. continue GROUP_LOOP
  368. }
  369. }
  370. }
  371. }
  372. return
  373. }
  374. type GroupInfo struct {
  375. MinIndex int
  376. MaxIndex int
  377. TotalSize int64
  378. HeadSize int64
  379. }
  380. // Returns info after scanning all files in g.Head's dir
  381. func (g *Group) ReadGroupInfo() GroupInfo {
  382. g.mtx.Lock()
  383. defer g.mtx.Unlock()
  384. return g.readGroupInfo()
  385. }
  386. // Index includes the head.
  387. // CONTRACT: caller should have called g.mtx.Lock
  388. func (g *Group) readGroupInfo() GroupInfo {
  389. groupDir := filepath.Dir(g.Head.Path)
  390. headBase := filepath.Base(g.Head.Path)
  391. var minIndex, maxIndex int = -1, -1
  392. var totalSize, headSize int64 = 0, 0
  393. dir, err := os.Open(groupDir)
  394. if err != nil {
  395. panic(err)
  396. }
  397. defer dir.Close()
  398. fiz, err := dir.Readdir(0)
  399. if err != nil {
  400. panic(err)
  401. }
  402. // For each file in the directory, filter by pattern
  403. for _, fileInfo := range fiz {
  404. if fileInfo.Name() == headBase {
  405. fileSize := fileInfo.Size()
  406. totalSize += fileSize
  407. headSize = fileSize
  408. continue
  409. } else if strings.HasPrefix(fileInfo.Name(), headBase) {
  410. fileSize := fileInfo.Size()
  411. totalSize += fileSize
  412. indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
  413. submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
  414. if len(submatch) != 0 {
  415. // Matches
  416. fileIndex, err := strconv.Atoi(string(submatch[1]))
  417. if err != nil {
  418. panic(err)
  419. }
  420. if maxIndex < fileIndex {
  421. maxIndex = fileIndex
  422. }
  423. if minIndex == -1 || fileIndex < minIndex {
  424. minIndex = fileIndex
  425. }
  426. }
  427. }
  428. }
  429. // Now account for the head.
  430. if minIndex == -1 {
  431. // If there were no numbered files,
  432. // then the head is index 0.
  433. minIndex, maxIndex = 0, 0
  434. } else {
  435. // Otherwise, the head file is 1 greater
  436. maxIndex += 1
  437. }
  438. return GroupInfo{minIndex, maxIndex, totalSize, headSize}
  439. }
  440. func filePathForIndex(headPath string, index int, maxIndex int) string {
  441. if index == maxIndex {
  442. return headPath
  443. } else {
  444. return fmt.Sprintf("%v.%03d", headPath, index)
  445. }
  446. }
  447. //--------------------------------------------------------------------------------
  448. type GroupReader struct {
  449. *Group
  450. mtx sync.Mutex
  451. curIndex int
  452. curFile *os.File
  453. curReader *bufio.Reader
  454. curLine []byte
  455. }
  456. func newGroupReader(g *Group) *GroupReader {
  457. return &GroupReader{
  458. Group: g,
  459. curIndex: 0,
  460. curFile: nil,
  461. curReader: nil,
  462. curLine: nil,
  463. }
  464. }
  465. func (gr *GroupReader) Close() error {
  466. gr.mtx.Lock()
  467. defer gr.mtx.Unlock()
  468. if gr.curReader != nil {
  469. err := gr.curFile.Close()
  470. gr.curIndex = 0
  471. gr.curReader = nil
  472. gr.curFile = nil
  473. gr.curLine = nil
  474. return err
  475. } else {
  476. return nil
  477. }
  478. }
  479. // Reads a line (without delimiter)
  480. // just return io.EOF if no new lines found.
  481. func (gr *GroupReader) ReadLine() (string, error) {
  482. gr.mtx.Lock()
  483. defer gr.mtx.Unlock()
  484. // From PushLine
  485. if gr.curLine != nil {
  486. line := string(gr.curLine)
  487. gr.curLine = nil
  488. return line, nil
  489. }
  490. // Open file if not open yet
  491. if gr.curReader == nil {
  492. err := gr.openFile(gr.curIndex)
  493. if err != nil {
  494. return "", err
  495. }
  496. }
  497. // Iterate over files until line is found
  498. var linePrefix string
  499. for {
  500. bytesRead, err := gr.curReader.ReadBytes('\n')
  501. if err == io.EOF {
  502. // Open the next file
  503. if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
  504. return "", err1
  505. }
  506. if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
  507. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  508. } else {
  509. linePrefix += string(bytesRead)
  510. continue
  511. }
  512. } else if err != nil {
  513. return "", err
  514. }
  515. return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
  516. }
  517. }
  518. // IF index > gr.Group.maxIndex, returns io.EOF
  519. // CONTRACT: caller should hold gr.mtx
  520. func (gr *GroupReader) openFile(index int) error {
  521. // Lock on Group to ensure that head doesn't move in the meanwhile.
  522. gr.Group.mtx.Lock()
  523. defer gr.Group.mtx.Unlock()
  524. if index > gr.Group.maxIndex {
  525. return io.EOF
  526. }
  527. curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
  528. curFile, err := os.Open(curFilePath)
  529. if err != nil {
  530. return err
  531. }
  532. curReader := bufio.NewReader(curFile)
  533. // Update gr.cur*
  534. if gr.curFile != nil {
  535. gr.curFile.Close() // TODO return error?
  536. }
  537. gr.curIndex = index
  538. gr.curFile = curFile
  539. gr.curReader = curReader
  540. gr.curLine = nil
  541. return nil
  542. }
  543. func (gr *GroupReader) PushLine(line string) {
  544. gr.mtx.Lock()
  545. defer gr.mtx.Unlock()
  546. if gr.curLine == nil {
  547. gr.curLine = []byte(line)
  548. } else {
  549. panic("PushLine failed, already have line")
  550. }
  551. }
  552. // Cursor's file index.
  553. func (gr *GroupReader) CurIndex() int {
  554. gr.mtx.Lock()
  555. defer gr.mtx.Unlock()
  556. return gr.curIndex
  557. }
  558. func (gr *GroupReader) SetIndex(index int) error {
  559. gr.mtx.Lock()
  560. defer gr.mtx.Unlock()
  561. return gr.openFile(index)
  562. }
  563. //--------------------------------------------------------------------------------
  564. // A simple SearchFunc that assumes that the marker is of form
  565. // <prefix><number>.
  566. // For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
  567. //
  568. // #HEIGHT:1
  569. // ...
  570. // #HEIGHT:2
  571. // ...
  572. func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
  573. return func(line string) (int, error) {
  574. if !strings.HasPrefix(line, prefix) {
  575. return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix))
  576. }
  577. i, err := strconv.Atoi(line[len(prefix):])
  578. if err != nil {
  579. return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error()))
  580. }
  581. if target < i {
  582. return 1, nil
  583. } else if target == i {
  584. return 0, nil
  585. } else {
  586. return -1, nil
  587. }
  588. }
  589. }