
libs/db: bbolt (etcd's fork of bolt) (#3610)

Description:

    Add a BoltDB implementation of the DB interface.

    BoltDB is a reliable, high-performance embedded key-value store based on a B+ tree and mmap.
    It is now maintained by the etcd-io organization: https://github.com/etcd-io/bbolt
    For random and scan reads it performs much better than LSM-tree based stores such as LevelDB.

Replaced #3604
Refs #1835
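
For orientation, here is a minimal sketch of how the new backend is reached through the common DB interface once this lands. It assumes the existing dbm.NewDB(name, backend, dir) constructor in libs/db; the database name, directory, and key/value contents are made-up examples.

    package main

    import (
        "fmt"

        dbm "github.com/tendermint/tendermint/libs/db"
    )

    func main() {
        // "boltdb" is registered as BoltDBBackend in libs/db/db.go (see the diff below).
        db := dbm.NewDB("blockstore", dbm.BoltDBBackend, "/tmp/tm-data")
        defer db.Close()

        db.Set([]byte("k1"), []byte("v1")) // synchronous by default: bbolt fsyncs each write tx
        fmt.Printf("%s\n", db.Get([]byte("k1")))
    }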

Commits:

* add boltdb for storage

* update boltdb in gopkg.toml

* update bbolt in Gopkg.lock

* dep update  Gopkg.lock

* add boltdb bucket when creating BoltDB

* some modifications

* address my own comments

* fix most of the Iterator tests for boltdb

* bolt does not support concurrent writes while iterating

* add WARNING word

* note that ReadOnly option is not supported

* fixes after Ismail's review

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* panic if View returns error

plus specify bucket in the top comment
Branch: pull/3627/head
Author: Anton Kaliaev, 6 years ago (committed by GitHub)
Commit: debf8f70c9
10 changed files with 468 additions and 82 deletions:

  1. Gopkg.lock                    +15   -2
  2. Gopkg.toml                     +4   -0
  3. libs/db/boltdb.go            +323   -0
  4. libs/db/boltdb_test.go        +35   -0
  5. libs/db/common_test.go        +66   -0
  6. libs/db/db.go                  +1   -0
  7. libs/db/fsdb.go                +1   -1
  8. libs/db/go_level_db_test.go   +17  -78
  9. libs/db/mem_db.go              +1   -1
  10. libs/db/util_test.go           +5   -0

Gopkg.lock  (+15, -2)

@@ -34,6 +34,14 @@
   revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73"
   version = "v1.1.1"

+[[projects]]
+  digest = "1:5f7414cf41466d4b4dd7ec52b2cd3e481e08cfd11e7e24fef730c0e483e88bb1"
+  name = "github.com/etcd-io/bbolt"
+  packages = ["."]
+  pruneopts = "UT"
+  revision = "63597a96ec0ad9e6d43c3fc81e809909e0237461"
+  version = "v1.3.2"
+
 [[projects]]
   digest = "1:544229a3ca0fb2dd5ebc2896d3d2ff7ce096d9751635301e44e37e761349ee70"
   name = "github.com/fortytw2/leaktest"
@@ -170,9 +178,12 @@
   revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0"

 [[projects]]
-  digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7"
+  digest = "1:53e8c5c79716437e601696140e8b1801aae4204f4ec54a504333702a49572c4f"
   name = "github.com/magiconair/properties"
-  packages = ["."]
+  packages = [
+    ".",
+    "assert",
+  ]
   pruneopts = "UT"
   revision = "c2353362d570a7bfa228149c62842019201cfb71"
   version = "v1.8.0"
@@ -500,6 +511,7 @@
     "github.com/btcsuite/btcd/btcec",
     "github.com/btcsuite/btcutil/base58",
     "github.com/btcsuite/btcutil/bech32",
+    "github.com/etcd-io/bbolt",
     "github.com/fortytw2/leaktest",
     "github.com/go-kit/kit/log",
     "github.com/go-kit/kit/log/level",
@@ -516,6 +528,7 @@
     "github.com/golang/protobuf/ptypes/timestamp",
     "github.com/gorilla/websocket",
     "github.com/jmhodges/levigo",
+    "github.com/magiconair/properties/assert",
     "github.com/pkg/errors",
     "github.com/prometheus/client_golang/prometheus",
     "github.com/prometheus/client_golang/prometheus/promhttp",


Gopkg.toml  (+4, -0)

@@ -22,6 +22,10 @@
 ###########################################################
 # Allow only patch releases for serialization libraries

+[[constraint]]
+  name = "github.com/etcd-io/bbolt"
+  version = "v1.3.2"
+
 [[constraint]]
   name = "github.com/tendermint/go-amino"
   version = "~0.14.1"


libs/db/boltdb.go  (new file, +323, -0)

@@ -0,0 +1,323 @@

package db

import (
    "bytes"
    "errors"
    "fmt"
    "os"
    "path/filepath"
    "sync"

    "github.com/etcd-io/bbolt"
)

var bucket = []byte("tm")

func init() {
    registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) {
        return NewBoltDB(name, dir)
    }, false)
}

// BoltDB is a wrapper around etcd's fork of bolt
// (https://github.com/etcd-io/bbolt).
//
// NOTE: All operations (including Set, Delete) are synchronous by default. One
// can globally turn it off by using NoSync config option (not recommended).
//
// A single bucket ([]byte("tm")) is used per a database instance. This could
// lead to performance issues when/if there will be lots of keys.
type BoltDB struct {
    db *bbolt.DB
}

// NewBoltDB returns a BoltDB with default options.
func NewBoltDB(name, dir string) (DB, error) {
    return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions)
}

// NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not
// supported because NewBoltDBWithOpts creates a global bucket.
func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) {
    if opts.ReadOnly {
        return nil, errors.New("ReadOnly: true is not supported")
    }

    dbPath := filepath.Join(dir, name+".db")
    db, err := bbolt.Open(dbPath, os.ModePerm, opts)
    if err != nil {
        return nil, err
    }

    // create a global bucket
    err = db.Update(func(tx *bbolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists(bucket)
        return err
    })
    if err != nil {
        return nil, err
    }

    return &BoltDB{db: db}, nil
}

func (bdb *BoltDB) Get(key []byte) (value []byte) {
    key = nonEmptyKey(nonNilBytes(key))
    err := bdb.db.View(func(tx *bbolt.Tx) error {
        b := tx.Bucket(bucket)
        value = b.Get(key)
        return nil
    })
    if err != nil {
        panic(err)
    }
    return
}

func (bdb *BoltDB) Has(key []byte) bool {
    return bdb.Get(key) != nil
}

func (bdb *BoltDB) Set(key, value []byte) {
    key = nonEmptyKey(nonNilBytes(key))
    value = nonNilBytes(value)
    err := bdb.db.Update(func(tx *bbolt.Tx) error {
        b := tx.Bucket(bucket)
        return b.Put(key, value)
    })
    if err != nil {
        panic(err)
    }
}

func (bdb *BoltDB) SetSync(key, value []byte) {
    bdb.Set(key, value)
}

func (bdb *BoltDB) Delete(key []byte) {
    key = nonEmptyKey(nonNilBytes(key))
    err := bdb.db.Update(func(tx *bbolt.Tx) error {
        return tx.Bucket(bucket).Delete(key)
    })
    if err != nil {
        panic(err)
    }
}

func (bdb *BoltDB) DeleteSync(key []byte) {
    bdb.Delete(key)
}

func (bdb *BoltDB) Close() {
    bdb.db.Close()
}

func (bdb *BoltDB) Print() {
    stats := bdb.db.Stats()
    fmt.Printf("%v\n", stats)

    err := bdb.db.View(func(tx *bbolt.Tx) error {
        tx.Bucket(bucket).ForEach(func(k, v []byte) error {
            fmt.Printf("[%X]:\t[%X]\n", k, v)
            return nil
        })
        return nil
    })
    if err != nil {
        panic(err)
    }
}

func (bdb *BoltDB) Stats() map[string]string {
    stats := bdb.db.Stats()
    m := make(map[string]string)

    // Freelist stats
    m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN)
    m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN)
    m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc)
    m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse)

    // Transaction stats
    m["TxN"] = fmt.Sprintf("%v", stats.TxN)
    m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN)

    return m
}

// boltDBBatch stores key values in sync.Map and dumps them to the underlying
// DB upon Write call.
type boltDBBatch struct {
    buffer *sync.Map
    db     *BoltDB
}

// NewBatch returns a new batch.
func (bdb *BoltDB) NewBatch() Batch {
    return &boltDBBatch{
        buffer: &sync.Map{},
        db:     bdb,
    }
}

func (bdb *boltDBBatch) Set(key, value []byte) {
    bdb.buffer.Store(key, value)
}

func (bdb *boltDBBatch) Delete(key []byte) {
    bdb.buffer.Delete(key)
}

// NOTE: the operation is synchronous (see BoltDB for reasons)
func (bdb *boltDBBatch) Write() {
    err := bdb.db.db.Batch(func(tx *bbolt.Tx) error {
        b := tx.Bucket(bucket)
        var putErr error
        bdb.buffer.Range(func(key, value interface{}) bool {
            putErr = b.Put(key.([]byte), value.([]byte))
            return putErr == nil // stop if putErr is not nil
        })
        return putErr
    })
    if err != nil {
        panic(err)
    }
}

func (bdb *boltDBBatch) WriteSync() {
    bdb.Write()
}

func (bdb *boltDBBatch) Close() {}

// WARNING: Any concurrent writes (Set, SetSync) will block until the Iterator
// is closed.
func (bdb *BoltDB) Iterator(start, end []byte) Iterator {
    tx, err := bdb.db.Begin(false)
    if err != nil {
        panic(err)
    }
    c := tx.Bucket(bucket).Cursor()
    return newBoltDBIterator(c, start, end, false)
}

// WARNING: Any concurrent writes (Set, SetSync) will block until the Iterator
// is closed.
func (bdb *BoltDB) ReverseIterator(start, end []byte) Iterator {
    tx, err := bdb.db.Begin(false)
    if err != nil {
        panic(err)
    }
    c := tx.Bucket(bucket).Cursor()
    return newBoltDBIterator(c, start, end, true)
}

// boltDBIterator allows you to iterate on range of keys/values given some
// start / end keys (nil & nil will result in doing full scan).
type boltDBIterator struct {
    itr          *bbolt.Cursor
    start        []byte
    end          []byte
    currentKey   []byte
    currentValue []byte
    isInvalid    bool
    isReverse    bool
}

func newBoltDBIterator(itr *bbolt.Cursor, start, end []byte, isReverse bool) *boltDBIterator {
    var ck, cv []byte
    if isReverse {
        if end == nil {
            ck, cv = itr.Last()
        } else {
            _, _ = itr.Seek(end) // after key
            ck, cv = itr.Prev()  // return to end key
        }
    } else {
        if start == nil {
            ck, cv = itr.First()
        } else {
            ck, cv = itr.Seek(start)
        }
    }

    return &boltDBIterator{
        itr:          itr,
        start:        start,
        end:          end,
        currentKey:   ck,
        currentValue: cv,
        isReverse:    isReverse,
        isInvalid:    false,
    }
}

func (itr *boltDBIterator) Domain() ([]byte, []byte) {
    return itr.start, itr.end
}

func (itr *boltDBIterator) Valid() bool {
    if itr.isInvalid {
        return false
    }

    // iterated to the end of the cursor
    if len(itr.currentKey) == 0 {
        itr.isInvalid = true
        return false
    }

    if itr.isReverse {
        if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 {
            itr.isInvalid = true
            return false
        }
    } else {
        if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 {
            itr.isInvalid = true
            return false
        }
    }

    // Valid
    return true
}

func (itr *boltDBIterator) Next() {
    itr.assertIsValid()
    if itr.isReverse {
        itr.currentKey, itr.currentValue = itr.itr.Prev()
    } else {
        itr.currentKey, itr.currentValue = itr.itr.Next()
    }
}

func (itr *boltDBIterator) Key() []byte {
    itr.assertIsValid()
    return itr.currentKey
}

func (itr *boltDBIterator) Value() []byte {
    itr.assertIsValid()
    return itr.currentValue
}

// boltdb cursor has no close op.
func (itr *boltDBIterator) Close() {}

func (itr *boltDBIterator) assertIsValid() {
    if !itr.Valid() {
        panic("Boltdb-iterator is invalid")
    }
}

// nonEmptyKey returns a []byte("nil") if key is empty.
// WARNING: this may collude with "nil" user key!
func nonEmptyKey(key []byte) []byte {
    if len(key) == 0 {
        return []byte("nil")
    }
    return key
}
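
To make the iterator WARNINGs above concrete, here is a hedged sketch (not part of the diff, written as if it lived in package db alongside boltdb.go) of the usage pattern they imply: finish iterating and close the iterator before issuing writes, since the iterator sits on a bbolt read transaction. The deleteRange helper and its key handling are illustrative only.

    // Sketch only: collect keys while iterating, close the iterator, then write.
    // Per the WARNING on Iterator/ReverseIterator, calling db.Set or batch.Write
    // inside the loop could block behind the open read transaction.
    func deleteRange(db DB, start, end []byte) {
        var keys [][]byte

        itr := db.Iterator(start, end) // nil, nil would mean a full scan
        for ; itr.Valid(); itr.Next() {
            // Copy the key: bbolt-backed memory is only valid while the read tx lives.
            keys = append(keys, append([]byte(nil), itr.Key()...))
        }
        itr.Close()

        // boltDBBatch buffers entries in a sync.Map and flushes them in one bbolt.Batch call.
        batch := db.NewBatch()
        defer batch.Close()
        for _, k := range keys {
            batch.Delete(k)
        }
        batch.Write() // synchronous, like Set/Delete
    }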

libs/db/boltdb_test.go  (new file, +35, -0)

@@ -0,0 +1,35 @@

package db

import (
    "fmt"
    "os"
    "testing"

    "github.com/stretchr/testify/require"

    cmn "github.com/tendermint/tendermint/libs/common"
)

func TestBoltDBNewBoltDB(t *testing.T) {
    name := fmt.Sprintf("test_%x", cmn.RandStr(12))
    dir := os.TempDir()
    defer cleanupDBDir(dir, name)

    db, err := NewBoltDB(name, dir)
    require.NoError(t, err)
    db.Close()
}

func BenchmarkBoltDBRandomReadsWrites(b *testing.B) {
    name := fmt.Sprintf("test_%x", cmn.RandStr(12))
    db, err := NewBoltDB(name, "")
    if err != nil {
        b.Fatal(err)
    }
    defer func() {
        db.Close()
        cleanupDBDir("", name)
    }()

    benchmarkRandomReadsWrites(b, db)
}
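
As a companion sketch (not in this diff), NewBoltDBWithOpts could be exercised the same way. It assumes the cleanupDBDir helper used above plus "time" and "github.com/etcd-io/bbolt" imports; Timeout bounds how long bbolt waits for the file lock, and ReadOnly must be rejected because the constructor creates the "tm" bucket.

    func TestBoltDBNewBoltDBWithOpts(t *testing.T) {
        name := fmt.Sprintf("test_%x", cmn.RandStr(12))
        dir := os.TempDir()
        defer cleanupDBDir(dir, name)

        opts := *bbolt.DefaultOptions  // copy the defaults NewBoltDB uses
        opts.Timeout = 5 * time.Second // fail fast if another process holds the file lock
        db, err := NewBoltDBWithOpts(name, dir, &opts)
        require.NoError(t, err)
        db.Close()

        // ReadOnly is not supported: the constructor needs a write tx to create the bucket.
        _, err = NewBoltDBWithOpts(name, dir, &bbolt.Options{ReadOnly: true})
        require.Error(t, err)
    }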

libs/db/common_test.go  (+66, -0)

@@ -1,6 +1,8 @@
 package db

 import (
+    "bytes"
+    "encoding/binary"
     "fmt"
     "io/ioutil"
     "sync"
@@ -8,6 +10,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    cmn "github.com/tendermint/tendermint/libs/common"
 )

 //----------------------------------------
@@ -188,3 +191,66 @@ func (mockIterator) Value() []byte {
 func (mockIterator) Close() {
 }
+
+func benchmarkRandomReadsWrites(b *testing.B, db DB) {
+    b.StopTimer()
+
+    // create dummy data
+    const numItems = int64(1000000)
+    internal := map[int64]int64{}
+    for i := 0; i < int(numItems); i++ {
+        internal[int64(i)] = int64(0)
+    }
+
+    // fmt.Println("ok, starting")
+    b.StartTimer()
+
+    for i := 0; i < b.N; i++ {
+        // Write something
+        {
+            idx := int64(cmn.RandInt()) % numItems
+            internal[idx]++
+            val := internal[idx]
+            idxBytes := int642Bytes(int64(idx))
+            valBytes := int642Bytes(int64(val))
+            //fmt.Printf("Set %X -> %X\n", idxBytes, valBytes)
+            db.Set(idxBytes, valBytes)
+        }
+
+        // Read something
+        {
+            idx := int64(cmn.RandInt()) % numItems
+            valExp := internal[idx]
+            idxBytes := int642Bytes(int64(idx))
+            valBytes := db.Get(idxBytes)
+            //fmt.Printf("Get %X -> %X\n", idxBytes, valBytes)
+            if valExp == 0 {
+                if !bytes.Equal(valBytes, nil) {
+                    b.Errorf("Expected %v for %v, got %X", nil, idx, valBytes)
+                    break
+                }
+            } else {
+                if len(valBytes) != 8 {
+                    b.Errorf("Expected length 8 for %v, got %X", idx, valBytes)
+                    break
+                }
+                valGot := bytes2Int64(valBytes)
+                if valExp != valGot {
+                    b.Errorf("Expected %v for %v, got %v", valExp, idx, valGot)
+                    break
+                }
+            }
+        }
+    }
+}
+
+func int642Bytes(i int64) []byte {
+    buf := make([]byte, 8)
+    binary.BigEndian.PutUint64(buf, uint64(i))
+    return buf
+}
+
+func bytes2Int64(buf []byte) int64 {
+    return int64(binary.BigEndian.Uint64(buf))
+}

libs/db/db.go  (+1, -0)

@@ -16,6 +16,7 @@ const (
     GoLevelDBBackend DBBackendType = "goleveldb"
     MemDBBackend     DBBackendType = "memdb"
     FSDBBackend      DBBackendType = "fsdb" // using the filesystem naively
+    BoltDBBackend    DBBackendType = "boltdb"
 )

 type dbCreator func(name string, dir string) (DB, error)


libs/db/fsdb.go  (+1, -1)

@@ -20,7 +20,7 @@
 )

 func init() {
-    registerDBCreator(FSDBBackend, func(name string, dir string) (DB, error) {
+    registerDBCreator(FSDBBackend, func(name, dir string) (DB, error) {
         dbPath := filepath.Join(dir, name+".db")
         return NewFSDB(dbPath), nil
     }, false)


libs/db/go_level_db_test.go  (+17, -78)

@@ -1,29 +1,27 @@
 package db

 import (
-    "bytes"
-    "encoding/binary"
     "fmt"
-    "os"
     "testing"

+    "github.com/stretchr/testify/require"
     "github.com/syndtr/goleveldb/leveldb/opt"
-    "github.com/stretchr/testify/require"

     cmn "github.com/tendermint/tendermint/libs/common"
 )

-func TestNewGoLevelDB(t *testing.T) {
+func TestGoLevelDBNewGoLevelDB(t *testing.T) {
     name := fmt.Sprintf("test_%x", cmn.RandStr(12))
-    // Test write locks
-    db, err := NewGoLevelDB(name, "")
+    defer cleanupDBDir("", name)
+
+    // Test we can't open the db twice for writing
+    wr1, err := NewGoLevelDB(name, "")
     require.Nil(t, err)
-    defer os.RemoveAll("./" + name + ".db")
     _, err = NewGoLevelDB(name, "")
     require.NotNil(t, err)
-    db.Close() // Close the db to release the lock
+    wr1.Close() // Close the db to release the lock

-    // Open the db twice in a row to test read-only locks
+    // Test we can open the db twice for reading only
     ro1, err := NewGoLevelDBWithOpts(name, "", &opt.Options{ReadOnly: true})
     defer ro1.Close()
     require.Nil(t, err)
@@ -32,75 +30,16 @@ func TestNewGoLevelDB(t *testing.T) {
     require.Nil(t, err)
 }

-func BenchmarkRandomReadsWrites(b *testing.B) {
-    b.StopTimer()
-    numItems := int64(1000000)
-    internal := map[int64]int64{}
-    for i := 0; i < int(numItems); i++ {
-        internal[int64(i)] = int64(0)
-    }
-    db, err := NewGoLevelDB(fmt.Sprintf("test_%x", cmn.RandStr(12)), "")
+func BenchmarkGoLevelDBRandomReadsWrites(b *testing.B) {
+    name := fmt.Sprintf("test_%x", cmn.RandStr(12))
+    db, err := NewGoLevelDB(name, "")
     if err != nil {
-        b.Fatal(err.Error())
-        return
+        b.Fatal(err)
     }
+    defer func() {
+        db.Close()
+        cleanupDBDir("", name)
+    }()

-    fmt.Println("ok, starting")
-    b.StartTimer()
-    for i := 0; i < b.N; i++ {
-        // Write something
-        {
-            idx := (int64(cmn.RandInt()) % numItems)
-            internal[idx]++
-            val := internal[idx]
-            idxBytes := int642Bytes(int64(idx))
-            valBytes := int642Bytes(int64(val))
-            //fmt.Printf("Set %X -> %X\n", idxBytes, valBytes)
-            db.Set(
-                idxBytes,
-                valBytes,
-            )
-        }
-        // Read something
-        {
-            idx := (int64(cmn.RandInt()) % numItems)
-            val := internal[idx]
-            idxBytes := int642Bytes(int64(idx))
-            valBytes := db.Get(idxBytes)
-            //fmt.Printf("Get %X -> %X\n", idxBytes, valBytes)
-            if val == 0 {
-                if !bytes.Equal(valBytes, nil) {
-                    b.Errorf("Expected %v for %v, got %X",
-                        nil, idx, valBytes)
-                    break
-                }
-            } else {
-                if len(valBytes) != 8 {
-                    b.Errorf("Expected length 8 for %v, got %X",
-                        idx, valBytes)
-                    break
-                }
-                valGot := bytes2Int64(valBytes)
-                if val != valGot {
-                    b.Errorf("Expected %v for %v, got %v",
-                        val, idx, valGot)
-                    break
-                }
-            }
-        }
-    }
-    db.Close()
-}
-
-func int642Bytes(i int64) []byte {
-    buf := make([]byte, 8)
-    binary.BigEndian.PutUint64(buf, uint64(i))
-    return buf
-}
-
-func bytes2Int64(buf []byte) int64 {
-    return int64(binary.BigEndian.Uint64(buf))
+    benchmarkRandomReadsWrites(b, db)
 }

libs/db/mem_db.go  (+1, -1)

@@ -7,7 +7,7 @@ import (
 )

 func init() {
-    registerDBCreator(MemDBBackend, func(name string, dir string) (DB, error) {
+    registerDBCreator(MemDBBackend, func(name, dir string) (DB, error) {
         return NewMemDB(), nil
     }, false)
 }


libs/db/util_test.go  (+5, -0)

@@ -22,6 +22,11 @@ func TestPrefixIteratorNoMatchNil(t *testing.T) {
 // Empty iterator for db populated after iterator created.
 func TestPrefixIteratorNoMatch1(t *testing.T) {
     for backend := range backends {
+        if backend == BoltDBBackend {
+            t.Log("bolt does not support concurrent writes while iterating")
+            continue
+        }
+
         t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
             db, dir := newTempDB(t, backend)
             defer os.RemoveAll(dir)

