You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

353 lines
7.2 KiB

  1. // +build boltdb
  2. package db
  3. import (
  4. "bytes"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "path/filepath"
  9. "github.com/etcd-io/bbolt"
  10. )
  // bucket is the name of the single bolt bucket in which a database instance
  // keeps all of its key/value pairs.
  var bucket = []byte("tm")
  12. func init() {
  13. registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) {
  14. return NewBoltDB(name, dir)
  15. }, false)
  16. }
  17. // BoltDB is a wrapper around etcd's fork of bolt
  18. // (https://github.com/etcd-io/bbolt).
  19. //
  20. // NOTE: All operations (including Set, Delete) are synchronous by default. One
  21. // can globally turn it off by using NoSync config option (not recommended).
  22. //
  23. // A single bucket ([]byte("tm")) is used per a database instance. This could
  24. // lead to performance issues when/if there will be lots of keys.
  25. type BoltDB struct {
  26. db *bbolt.DB
  27. }
  28. // NewBoltDB returns a BoltDB with default options.
  29. func NewBoltDB(name, dir string) (DB, error) {
  30. return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions)
  31. }
  32. // NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not
  33. // supported because NewBoltDBWithOpts creates a global bucket.
  34. func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) {
  35. if opts.ReadOnly {
  36. return nil, errors.New("ReadOnly: true is not supported")
  37. }
  38. dbPath := filepath.Join(dir, name+".db")
  39. db, err := bbolt.Open(dbPath, os.ModePerm, opts)
  40. if err != nil {
  41. return nil, err
  42. }
  43. // create a global bucket
  44. err = db.Update(func(tx *bbolt.Tx) error {
  45. _, err := tx.CreateBucketIfNotExists(bucket)
  46. return err
  47. })
  48. if err != nil {
  49. return nil, err
  50. }
  51. return &BoltDB{db: db}, nil
  52. }
  53. func (bdb *BoltDB) Get(key []byte) (value []byte) {
  54. key = nonEmptyKey(nonNilBytes(key))
  55. err := bdb.db.View(func(tx *bbolt.Tx) error {
  56. b := tx.Bucket(bucket)
  57. value = b.Get(key)
  58. return nil
  59. })
  60. if err != nil {
  61. panic(err)
  62. }
  63. return
  64. }
  65. func (bdb *BoltDB) Has(key []byte) bool {
  66. return bdb.Get(key) != nil
  67. }
  68. func (bdb *BoltDB) Set(key, value []byte) {
  69. key = nonEmptyKey(nonNilBytes(key))
  70. value = nonNilBytes(value)
  71. err := bdb.db.Update(func(tx *bbolt.Tx) error {
  72. b := tx.Bucket(bucket)
  73. return b.Put(key, value)
  74. })
  75. if err != nil {
  76. panic(err)
  77. }
  78. }
  79. func (bdb *BoltDB) SetSync(key, value []byte) {
  80. bdb.Set(key, value)
  81. }
  82. func (bdb *BoltDB) Delete(key []byte) {
  83. key = nonEmptyKey(nonNilBytes(key))
  84. err := bdb.db.Update(func(tx *bbolt.Tx) error {
  85. return tx.Bucket(bucket).Delete(key)
  86. })
  87. if err != nil {
  88. panic(err)
  89. }
  90. }
  91. func (bdb *BoltDB) DeleteSync(key []byte) {
  92. bdb.Delete(key)
  93. }
  94. func (bdb *BoltDB) Close() {
  95. bdb.db.Close()
  96. }
  97. func (bdb *BoltDB) Print() {
  98. stats := bdb.db.Stats()
  99. fmt.Printf("%v\n", stats)
  100. err := bdb.db.View(func(tx *bbolt.Tx) error {
  101. tx.Bucket(bucket).ForEach(func(k, v []byte) error {
  102. fmt.Printf("[%X]:\t[%X]\n", k, v)
  103. return nil
  104. })
  105. return nil
  106. })
  107. if err != nil {
  108. panic(err)
  109. }
  110. }
  111. func (bdb *BoltDB) Stats() map[string]string {
  112. stats := bdb.db.Stats()
  113. m := make(map[string]string)
  114. // Freelist stats
  115. m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN)
  116. m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN)
  117. m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc)
  118. m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse)
  119. // Transaction stats
  120. m["TxN"] = fmt.Sprintf("%v", stats.TxN)
  121. m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN)
  122. return m
  123. }
  124. // boltDBBatch stores key values in sync.Map and dumps them to the underlying
  125. // DB upon Write call.
  126. type boltDBBatch struct {
  127. buffer []struct {
  128. k []byte
  129. v []byte
  130. }
  131. db *BoltDB
  132. }
  133. // NewBatch returns a new batch.
  134. func (bdb *BoltDB) NewBatch() Batch {
  135. return &boltDBBatch{
  136. buffer: make([]struct {
  137. k []byte
  138. v []byte
  139. }, 0),
  140. db: bdb,
  141. }
  142. }
  143. // It is safe to modify the contents of the argument after Set returns but not
  144. // before.
  145. func (bdb *boltDBBatch) Set(key, value []byte) {
  146. bdb.buffer = append(bdb.buffer, struct {
  147. k []byte
  148. v []byte
  149. }{
  150. key, value,
  151. })
  152. }
  153. // It is safe to modify the contents of the argument after Delete returns but
  154. // not before.
  155. func (bdb *boltDBBatch) Delete(key []byte) {
  156. for i, elem := range bdb.buffer {
  157. if bytes.Equal(elem.k, key) {
  158. // delete without preserving order
  159. bdb.buffer[i] = bdb.buffer[len(bdb.buffer)-1]
  160. bdb.buffer = bdb.buffer[:len(bdb.buffer)-1]
  161. return
  162. }
  163. }
  164. }
  165. // NOTE: the operation is synchronous (see BoltDB for reasons)
  166. func (bdb *boltDBBatch) Write() {
  167. err := bdb.db.db.Batch(func(tx *bbolt.Tx) error {
  168. b := tx.Bucket(bucket)
  169. for _, elem := range bdb.buffer {
  170. if putErr := b.Put(elem.k, elem.v); putErr != nil {
  171. return putErr
  172. }
  173. }
  174. return nil
  175. })
  176. if err != nil {
  177. panic(err)
  178. }
  179. }
  180. func (bdb *boltDBBatch) WriteSync() {
  181. bdb.Write()
  182. }
  183. func (bdb *boltDBBatch) Close() {}
  184. // WARNING: Any concurrent writes or reads will block until the iterator is
  185. // closed.
  186. func (bdb *BoltDB) Iterator(start, end []byte) Iterator {
  187. tx, err := bdb.db.Begin(false)
  188. if err != nil {
  189. panic(err)
  190. }
  191. return newBoltDBIterator(tx, start, end, false)
  192. }
  193. // WARNING: Any concurrent writes or reads will block until the iterator is
  194. // closed.
  195. func (bdb *BoltDB) ReverseIterator(start, end []byte) Iterator {
  196. tx, err := bdb.db.Begin(false)
  197. if err != nil {
  198. panic(err)
  199. }
  200. return newBoltDBIterator(tx, start, end, true)
  201. }
  202. // boltDBIterator allows you to iterate on range of keys/values given some
  203. // start / end keys (nil & nil will result in doing full scan).
  204. type boltDBIterator struct {
  205. tx *bbolt.Tx
  206. itr *bbolt.Cursor
  207. start []byte
  208. end []byte
  209. currentKey []byte
  210. currentValue []byte
  211. isInvalid bool
  212. isReverse bool
  213. }
  214. func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator {
  215. itr := tx.Bucket(bucket).Cursor()
  216. var ck, cv []byte
  217. if isReverse {
  218. if end == nil {
  219. ck, cv = itr.Last()
  220. } else {
  221. _, _ = itr.Seek(end) // after key
  222. ck, cv = itr.Prev() // return to end key
  223. }
  224. } else {
  225. if start == nil {
  226. ck, cv = itr.First()
  227. } else {
  228. ck, cv = itr.Seek(start)
  229. }
  230. }
  231. return &boltDBIterator{
  232. tx: tx,
  233. itr: itr,
  234. start: start,
  235. end: end,
  236. currentKey: ck,
  237. currentValue: cv,
  238. isReverse: isReverse,
  239. isInvalid: false,
  240. }
  241. }
  242. func (itr *boltDBIterator) Domain() ([]byte, []byte) {
  243. return itr.start, itr.end
  244. }
  245. func (itr *boltDBIterator) Valid() bool {
  246. if itr.isInvalid {
  247. return false
  248. }
  249. // iterated to the end of the cursor
  250. if len(itr.currentKey) == 0 {
  251. itr.isInvalid = true
  252. return false
  253. }
  254. if itr.isReverse {
  255. if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 {
  256. itr.isInvalid = true
  257. return false
  258. }
  259. } else {
  260. if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 {
  261. itr.isInvalid = true
  262. return false
  263. }
  264. }
  265. // Valid
  266. return true
  267. }
  268. func (itr *boltDBIterator) Next() {
  269. itr.assertIsValid()
  270. if itr.isReverse {
  271. itr.currentKey, itr.currentValue = itr.itr.Prev()
  272. } else {
  273. itr.currentKey, itr.currentValue = itr.itr.Next()
  274. }
  275. }
  276. func (itr *boltDBIterator) Key() []byte {
  277. itr.assertIsValid()
  278. return itr.currentKey
  279. }
  280. func (itr *boltDBIterator) Value() []byte {
  281. itr.assertIsValid()
  282. return itr.currentValue
  283. }
  284. func (itr *boltDBIterator) Close() {
  285. err := itr.tx.Rollback()
  286. if err != nil {
  287. panic(err)
  288. }
  289. }
  290. func (itr *boltDBIterator) assertIsValid() {
  291. if !itr.Valid() {
  292. panic("Boltdb-iterator is invalid")
  293. }
  294. }
  // nonEmptyKey returns key unchanged when it holds at least one byte and the
  // placeholder []byte("nil") otherwise.
  // WARNING: the placeholder may collide with a user key that is literally
  // "nil"!
  func nonEmptyKey(key []byte) []byte {
  	if len(key) > 0 {
  		return key
  	}
  	return []byte("nil")
  }