You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

246 lines
4.6 KiB

7 years ago
7 years ago
7 years ago
  1. package db
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync"
  10. "github.com/pkg/errors"
  11. )
  12. const (
  13. keyPerm = os.FileMode(0600)
  14. dirPerm = os.FileMode(0700)
  15. )
  16. func init() {
  17. registerDBCreator(FSDBBackendStr, func(name string, dir string) (DB, error) {
  18. dbPath := filepath.Join(dir, name+".db")
  19. return NewFSDB(dbPath), nil
  20. }, false)
  21. }
  22. // It's slow.
  23. type FSDB struct {
  24. mtx sync.Mutex
  25. dir string
  26. }
  27. func NewFSDB(dir string) *FSDB {
  28. err := os.MkdirAll(dir, dirPerm)
  29. if err != nil {
  30. panic(errors.Wrap(err, "Creating FSDB dir "+dir))
  31. }
  32. database := &FSDB{
  33. dir: dir,
  34. }
  35. return database
  36. }
  37. func (db *FSDB) Get(key []byte) []byte {
  38. db.mtx.Lock()
  39. defer db.mtx.Unlock()
  40. path := db.nameToPath(key)
  41. value, err := read(path)
  42. if os.IsNotExist(err) {
  43. return nil
  44. } else if err != nil {
  45. panic(errors.Wrap(err, fmt.Sprintf("Getting key %s (0x%X)", string(key), key)))
  46. }
  47. return value
  48. }
  49. func (db *FSDB) Has(key []byte) bool {
  50. db.mtx.Lock()
  51. defer db.mtx.Unlock()
  52. path := db.nameToPath(key)
  53. _, err := read(path)
  54. if os.IsNotExist(err) {
  55. return false
  56. } else if err != nil {
  57. panic(errors.Wrap(err, fmt.Sprintf("Getting key %s (0x%X)", string(key), key)))
  58. }
  59. return true
  60. }
  61. func (db *FSDB) Set(key []byte, value []byte) {
  62. db.mtx.Lock()
  63. defer db.mtx.Unlock()
  64. db.SetNoLock(key, value)
  65. }
  66. func (db *FSDB) SetSync(key []byte, value []byte) {
  67. db.mtx.Lock()
  68. defer db.mtx.Unlock()
  69. db.SetNoLock(key, value)
  70. }
  71. // NOTE: Implements atomicSetDeleter.
  72. func (db *FSDB) SetNoLock(key []byte, value []byte) {
  73. if value == nil {
  74. value = []byte{}
  75. }
  76. path := db.nameToPath(key)
  77. err := write(path, value)
  78. if err != nil {
  79. panic(errors.Wrap(err, fmt.Sprintf("Setting key %s (0x%X)", string(key), key)))
  80. }
  81. }
  82. func (db *FSDB) Delete(key []byte) {
  83. db.mtx.Lock()
  84. defer db.mtx.Unlock()
  85. db.DeleteNoLock(key)
  86. }
  87. func (db *FSDB) DeleteSync(key []byte) {
  88. db.mtx.Lock()
  89. defer db.mtx.Unlock()
  90. db.DeleteNoLock(key)
  91. }
  92. // NOTE: Implements atomicSetDeleter.
  93. func (db *FSDB) DeleteNoLock(key []byte) {
  94. err := remove(string(key))
  95. if os.IsNotExist(err) {
  96. return
  97. } else if err != nil {
  98. panic(errors.Wrap(err, fmt.Sprintf("Removing key %s (0x%X)", string(key), key)))
  99. }
  100. }
  101. func (db *FSDB) Close() {
  102. // Nothing to do.
  103. }
  104. func (db *FSDB) Print() {
  105. db.mtx.Lock()
  106. defer db.mtx.Unlock()
  107. panic("FSDB.Print not yet implemented")
  108. }
  109. func (db *FSDB) Stats() map[string]string {
  110. db.mtx.Lock()
  111. defer db.mtx.Unlock()
  112. panic("FSDB.Stats not yet implemented")
  113. }
  114. func (db *FSDB) NewBatch() Batch {
  115. db.mtx.Lock()
  116. defer db.mtx.Unlock()
  117. // Not sure we would ever want to try...
  118. // It doesn't seem easy for general filesystems.
  119. panic("FSDB.NewBatch not yet implemented")
  120. }
  121. func (db *FSDB) Mutex() *sync.Mutex {
  122. return &(db.mtx)
  123. }
  124. func (db *FSDB) Iterator(start, end []byte) Iterator {
  125. /*
  126. XXX
  127. it := newMemDBIterator()
  128. it.db = db
  129. it.cur = 0
  130. db.mtx.Lock()
  131. defer db.mtx.Unlock()
  132. // We need a copy of all of the keys.
  133. // Not the best, but probably not a bottleneck depending.
  134. keys, err := list(db.dir)
  135. if err != nil {
  136. panic(errors.Wrap(err, fmt.Sprintf("Listing keys in %s", db.dir)))
  137. }
  138. sort.Strings(keys)
  139. it.keys = keys
  140. return it
  141. */
  142. return nil
  143. }
  144. func (db *FSDB) ReverseIterator(start, end []byte) Iterator {
  145. // XXX
  146. return nil
  147. }
  148. func (db *FSDB) nameToPath(name []byte) string {
  149. n := url.PathEscape(string(name))
  150. return path.Join(db.dir, n)
  151. }
  152. // Read some bytes to a file.
  153. // CONTRACT: returns os errors directly without wrapping.
  154. func read(path string) ([]byte, error) {
  155. f, err := os.Open(path)
  156. if err != nil {
  157. return nil, err
  158. }
  159. defer f.Close()
  160. d, err := ioutil.ReadAll(f)
  161. if err != nil {
  162. return nil, err
  163. }
  164. return d, nil
  165. }
  166. // Write some bytes from a file.
  167. // CONTRACT: returns os errors directly without wrapping.
  168. func write(path string, d []byte) error {
  169. f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, keyPerm)
  170. if err != nil {
  171. return err
  172. }
  173. defer f.Close()
  174. _, err = f.Write(d)
  175. if err != nil {
  176. return err
  177. }
  178. err = f.Sync()
  179. return err
  180. }
  181. // Remove a file.
  182. // CONTRACT: returns os errors directly without wrapping.
  183. func remove(path string) error {
  184. return os.Remove(path)
  185. }
  186. // List files of a path.
  187. // Paths will NOT include dir as the prefix.
  188. // CONTRACT: returns os errors directly without wrapping.
  189. func list(dirPath string) (paths []string, err error) {
  190. dir, err := os.Open(dirPath)
  191. if err != nil {
  192. return nil, err
  193. }
  194. defer dir.Close()
  195. names, err := dir.Readdirnames(0)
  196. if err != nil {
  197. return nil, err
  198. }
  199. for i, name := range names {
  200. n, err := url.PathUnescape(name)
  201. if err != nil {
  202. return nil, fmt.Errorf("Failed to unescape %s while listing", name)
  203. }
  204. names[i] = n
  205. }
  206. return names, nil
  207. }