diff --git a/Gopkg.lock b/Gopkg.lock index 530cd89dd..6d9236559 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -34,6 +34,14 @@ revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" version = "v1.1.1" +[[projects]] + digest = "1:5f7414cf41466d4b4dd7ec52b2cd3e481e08cfd11e7e24fef730c0e483e88bb1" + name = "github.com/etcd-io/bbolt" + packages = ["."] + pruneopts = "UT" + revision = "63597a96ec0ad9e6d43c3fc81e809909e0237461" + version = "v1.3.2" + [[projects]] digest = "1:544229a3ca0fb2dd5ebc2896d3d2ff7ce096d9751635301e44e37e761349ee70" name = "github.com/fortytw2/leaktest" @@ -170,9 +178,12 @@ revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" [[projects]] - digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7" + digest = "1:53e8c5c79716437e601696140e8b1801aae4204f4ec54a504333702a49572c4f" name = "github.com/magiconair/properties" - packages = ["."] + packages = [ + ".", + "assert", + ] pruneopts = "UT" revision = "c2353362d570a7bfa228149c62842019201cfb71" version = "v1.8.0" @@ -500,6 +511,7 @@ "github.com/btcsuite/btcd/btcec", "github.com/btcsuite/btcutil/base58", "github.com/btcsuite/btcutil/bech32", + "github.com/etcd-io/bbolt", "github.com/fortytw2/leaktest", "github.com/go-kit/kit/log", "github.com/go-kit/kit/log/level", @@ -516,6 +528,7 @@ "github.com/golang/protobuf/ptypes/timestamp", "github.com/gorilla/websocket", "github.com/jmhodges/levigo", + "github.com/magiconair/properties/assert", "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", diff --git a/Gopkg.toml b/Gopkg.toml index 505f0da4a..e5abf051c 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -22,6 +22,10 @@ ########################################################### # Allow only patch releases for serialization libraries +[[constraint]] + name = "github.com/etcd-io/bbolt" + version = "v1.3.2" + [[constraint]] name = "github.com/tendermint/go-amino" version = "~0.14.1" diff --git a/libs/db/boltdb.go 
b/libs/db/boltdb.go new file mode 100644 index 000000000..6a31f11a9 --- /dev/null +++ b/libs/db/boltdb.go @@ -0,0 +1,323 @@ +package db + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/etcd-io/bbolt" +) + +var bucket = []byte("tm") + +func init() { + registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) { + return NewBoltDB(name, dir) + }, false) +} + +// BoltDB is a wrapper around etcd's fork of bolt +// (https://github.com/etcd-io/bbolt). +// +// NOTE: All operations (including Set, Delete) are synchronous by default. One +// can globally turn it off by using NoSync config option (not recommended). +// +// A single bucket ([]byte("tm")) is used per database instance. This could +// lead to performance issues if there are many keys. +type BoltDB struct { + db *bbolt.DB +} + +// NewBoltDB returns a BoltDB with default options. +func NewBoltDB(name, dir string) (DB, error) { + return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions) +} + +// NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not +// supported because NewBoltDBWithOpts creates a global bucket. 
+func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) { + if opts.ReadOnly { + return nil, errors.New("ReadOnly: true is not supported") + } + + dbPath := filepath.Join(dir, name+".db") + db, err := bbolt.Open(dbPath, os.ModePerm, opts) + if err != nil { + return nil, err + } + + // create a global bucket + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + return nil, err + } + + return &BoltDB{db: db}, nil +} + +func (bdb *BoltDB) Get(key []byte) (value []byte) { + key = nonEmptyKey(nonNilBytes(key)) + err := bdb.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + value = b.Get(key) + return nil + }) + if err != nil { + panic(err) + } + return +} + +func (bdb *BoltDB) Has(key []byte) bool { + return bdb.Get(key) != nil +} + +func (bdb *BoltDB) Set(key, value []byte) { + key = nonEmptyKey(nonNilBytes(key)) + value = nonNilBytes(value) + err := bdb.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + return b.Put(key, value) + }) + if err != nil { + panic(err) + } +} + +func (bdb *BoltDB) SetSync(key, value []byte) { + bdb.Set(key, value) +} + +func (bdb *BoltDB) Delete(key []byte) { + key = nonEmptyKey(nonNilBytes(key)) + err := bdb.db.Update(func(tx *bbolt.Tx) error { + return tx.Bucket(bucket).Delete(key) + }) + if err != nil { + panic(err) + } +} + +func (bdb *BoltDB) DeleteSync(key []byte) { + bdb.Delete(key) +} + +func (bdb *BoltDB) Close() { + bdb.db.Close() +} + +func (bdb *BoltDB) Print() { + stats := bdb.db.Stats() + fmt.Printf("%v\n", stats) + + err := bdb.db.View(func(tx *bbolt.Tx) error { + tx.Bucket(bucket).ForEach(func(k, v []byte) error { + fmt.Printf("[%X]:\t[%X]\n", k, v) + return nil + }) + return nil + }) + if err != nil { + panic(err) + } +} + +func (bdb *BoltDB) Stats() map[string]string { + stats := bdb.db.Stats() + m := make(map[string]string) + + // Freelist stats + m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN) 
+ m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN) + m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc) + m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse) + + // Transaction stats + m["TxN"] = fmt.Sprintf("%v", stats.TxN) + m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN) + + return m +} + +// boltDBBatch stores key values in sync.Map and dumps them to the underlying +// DB upon Write call. +type boltDBBatch struct { + buffer *sync.Map + db *BoltDB +} + +// NewBatch returns a new batch. +func (bdb *BoltDB) NewBatch() Batch { + return &boltDBBatch{ + buffer: &sync.Map{}, + db: bdb, + } +} + +func (bdb *boltDBBatch) Set(key, value []byte) { + bdb.buffer.Store(key, value) +} + +func (bdb *boltDBBatch) Delete(key []byte) { + bdb.buffer.Delete(key) +} + +// NOTE: the operation is synchronous (see BoltDB for reasons) +func (bdb *boltDBBatch) Write() { + err := bdb.db.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + var putErr error + bdb.buffer.Range(func(key, value interface{}) bool { + putErr = b.Put(key.([]byte), value.([]byte)) + return putErr == nil // stop if putErr is not nil + }) + return putErr + }) + if err != nil { + panic(err) + } +} + +func (bdb *boltDBBatch) WriteSync() { + bdb.Write() +} + +func (bdb *boltDBBatch) Close() {} + +// WARNING: Any concurrent writes (Set, SetSync) will block until the Iterator +// is closed. +func (bdb *BoltDB) Iterator(start, end []byte) Iterator { + tx, err := bdb.db.Begin(false) + if err != nil { + panic(err) + } + c := tx.Bucket(bucket).Cursor() + return newBoltDBIterator(c, start, end, false) +} + +// WARNING: Any concurrent writes (Set, SetSync) will block until the Iterator +// is closed. 
+func (bdb *BoltDB) ReverseIterator(start, end []byte) Iterator { + tx, err := bdb.db.Begin(false) + if err != nil { + panic(err) + } + c := tx.Bucket(bucket).Cursor() + return newBoltDBIterator(c, start, end, true) +} + +// boltDBIterator allows you to iterate on range of keys/values given some +// start / end keys (nil & nil will result in doing full scan). +type boltDBIterator struct { + itr *bbolt.Cursor + start []byte + end []byte + + currentKey []byte + currentValue []byte + + isInvalid bool + isReverse bool +} + +func newBoltDBIterator(itr *bbolt.Cursor, start, end []byte, isReverse bool) *boltDBIterator { + var ck, cv []byte + if isReverse { + if end == nil { + ck, cv = itr.Last() + } else { + _, _ = itr.Seek(end) // after key + ck, cv = itr.Prev() // return to end key + } + } else { + if start == nil { + ck, cv = itr.First() + } else { + ck, cv = itr.Seek(start) + } + } + + return &boltDBIterator{ + itr: itr, + start: start, + end: end, + currentKey: ck, + currentValue: cv, + isReverse: isReverse, + isInvalid: false, + } +} + +func (itr *boltDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +func (itr *boltDBIterator) Valid() bool { + if itr.isInvalid { + return false + } + + // iterated to the end of the cursor + if len(itr.currentKey) == 0 { + itr.isInvalid = true + return false + } + + if itr.isReverse { + if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 { + itr.isInvalid = true + return false + } + } else { + if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +func (itr *boltDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.currentKey, itr.currentValue = itr.itr.Prev() + } else { + itr.currentKey, itr.currentValue = itr.itr.Next() + } +} + +func (itr *boltDBIterator) Key() []byte { + itr.assertIsValid() + return itr.currentKey +} + +func (itr *boltDBIterator) Value() []byte { + itr.assertIsValid() + 
return itr.currentValue +} + +// boltdb cursor has no close op. +func (itr *boltDBIterator) Close() {} + +func (itr *boltDBIterator) assertIsValid() { + if !itr.Valid() { + panic("Boltdb-iterator is invalid") + } +} + +// nonEmptyKey returns a []byte("nil") if key is empty. +// WARNING: this may collide with a user-supplied "nil" key! +func nonEmptyKey(key []byte) []byte { + if len(key) == 0 { + return []byte("nil") + } + return key +} diff --git a/libs/db/boltdb_test.go b/libs/db/boltdb_test.go new file mode 100644 index 000000000..ced57d2e0 --- /dev/null +++ b/libs/db/boltdb_test.go @@ -0,0 +1,35 @@ +package db + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + cmn "github.com/tendermint/tendermint/libs/common" +) + +func TestBoltDBNewBoltDB(t *testing.T) { + name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + dir := os.TempDir() + defer cleanupDBDir(dir, name) + + db, err := NewBoltDB(name, dir) + require.NoError(t, err) + db.Close() +} + +func BenchmarkBoltDBRandomReadsWrites(b *testing.B) { + name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + db, err := NewBoltDB(name, "") + if err != nil { + b.Fatal(err) + } + defer func() { + db.Close() + cleanupDBDir("", name) + }() + + benchmarkRandomReadsWrites(b, db) +} diff --git a/libs/db/common_test.go b/libs/db/common_test.go index 1e27a7cac..64a86979c 100644 --- a/libs/db/common_test.go +++ b/libs/db/common_test.go @@ -1,6 +1,8 @@ package db import ( + "bytes" + "encoding/binary" "fmt" "io/ioutil" "sync" @@ -8,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cmn "github.com/tendermint/tendermint/libs/common" ) //---------------------------------------- @@ -188,3 +191,66 @@ func (mockIterator) Value() []byte { func (mockIterator) Close() { } + +func benchmarkRandomReadsWrites(b *testing.B, db DB) { + b.StopTimer() + + // create dummy data + const numItems = int64(1000000) + internal := map[int64]int64{} + for i := 0; i < int(numItems); i++ { + 
internal[int64(i)] = int64(0) + } + + // fmt.Println("ok, starting") + b.StartTimer() + + for i := 0; i < b.N; i++ { + // Write something + { + idx := int64(cmn.RandInt()) % numItems + internal[idx]++ + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := int642Bytes(int64(val)) + //fmt.Printf("Set %X -> %X\n", idxBytes, valBytes) + db.Set(idxBytes, valBytes) + } + + // Read something + { + idx := int64(cmn.RandInt()) % numItems + valExp := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := db.Get(idxBytes) + //fmt.Printf("Get %X -> %X\n", idxBytes, valBytes) + if valExp == 0 { + if !bytes.Equal(valBytes, nil) { + b.Errorf("Expected %v for %v, got %X", nil, idx, valBytes) + break + } + } else { + if len(valBytes) != 8 { + b.Errorf("Expected length 8 for %v, got %X", idx, valBytes) + break + } + valGot := bytes2Int64(valBytes) + if valExp != valGot { + b.Errorf("Expected %v for %v, got %v", valExp, idx, valGot) + break + } + } + } + + } +} + +func int642Bytes(i int64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(i)) + return buf +} + +func bytes2Int64(buf []byte) int64 { + return int64(binary.BigEndian.Uint64(buf)) +} diff --git a/libs/db/db.go b/libs/db/db.go index 8a3975a84..43b788281 100644 --- a/libs/db/db.go +++ b/libs/db/db.go @@ -16,6 +16,7 @@ const ( GoLevelDBBackend DBBackendType = "goleveldb" MemDBBackend DBBackendType = "memdb" FSDBBackend DBBackendType = "fsdb" // using the filesystem naively + BoltDBBackend DBBackendType = "boltdb" ) type dbCreator func(name string, dir string) (DB, error) diff --git a/libs/db/fsdb.go b/libs/db/fsdb.go index 2d82e7749..ca8eefe94 100644 --- a/libs/db/fsdb.go +++ b/libs/db/fsdb.go @@ -20,7 +20,7 @@ const ( ) func init() { - registerDBCreator(FSDBBackend, func(name string, dir string) (DB, error) { + registerDBCreator(FSDBBackend, func(name, dir string) (DB, error) { dbPath := filepath.Join(dir, name+".db") return NewFSDB(dbPath), nil }, false) diff --git 
a/libs/db/go_level_db_test.go b/libs/db/go_level_db_test.go index c24eec3c8..f781a2b3d 100644 --- a/libs/db/go_level_db_test.go +++ b/libs/db/go_level_db_test.go @@ -1,29 +1,27 @@ package db import ( - "bytes" - "encoding/binary" "fmt" - "os" "testing" + "github.com/stretchr/testify/require" "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/stretchr/testify/require" cmn "github.com/tendermint/tendermint/libs/common" ) -func TestNewGoLevelDB(t *testing.T) { +func TestGoLevelDBNewGoLevelDB(t *testing.T) { name := fmt.Sprintf("test_%x", cmn.RandStr(12)) - // Test write locks - db, err := NewGoLevelDB(name, "") + defer cleanupDBDir("", name) + + // Test we can't open the db twice for writing + wr1, err := NewGoLevelDB(name, "") require.Nil(t, err) - defer os.RemoveAll("./" + name + ".db") _, err = NewGoLevelDB(name, "") require.NotNil(t, err) - db.Close() // Close the db to release the lock + wr1.Close() // Close the db to release the lock - // Open the db twice in a row to test read-only locks + // Test we can open the db twice for reading only ro1, err := NewGoLevelDBWithOpts(name, "", &opt.Options{ReadOnly: true}) defer ro1.Close() require.Nil(t, err) @@ -32,75 +30,16 @@ func TestNewGoLevelDB(t *testing.T) { require.Nil(t, err) } -func BenchmarkRandomReadsWrites(b *testing.B) { - b.StopTimer() - - numItems := int64(1000000) - internal := map[int64]int64{} - for i := 0; i < int(numItems); i++ { - internal[int64(i)] = int64(0) - } - db, err := NewGoLevelDB(fmt.Sprintf("test_%x", cmn.RandStr(12)), "") +func BenchmarkGoLevelDBRandomReadsWrites(b *testing.B) { + name := fmt.Sprintf("test_%x", cmn.RandStr(12)) + db, err := NewGoLevelDB(name, "") if err != nil { - b.Fatal(err.Error()) - return + b.Fatal(err) } + defer func() { + db.Close() + cleanupDBDir("", name) + }() - fmt.Println("ok, starting") - b.StartTimer() - - for i := 0; i < b.N; i++ { - // Write something - { - idx := (int64(cmn.RandInt()) % numItems) - internal[idx]++ - val := internal[idx] - idxBytes := 
int642Bytes(int64(idx)) - valBytes := int642Bytes(int64(val)) - //fmt.Printf("Set %X -> %X\n", idxBytes, valBytes) - db.Set( - idxBytes, - valBytes, - ) - } - // Read something - { - idx := (int64(cmn.RandInt()) % numItems) - val := internal[idx] - idxBytes := int642Bytes(int64(idx)) - valBytes := db.Get(idxBytes) - //fmt.Printf("Get %X -> %X\n", idxBytes, valBytes) - if val == 0 { - if !bytes.Equal(valBytes, nil) { - b.Errorf("Expected %v for %v, got %X", - nil, idx, valBytes) - break - } - } else { - if len(valBytes) != 8 { - b.Errorf("Expected length 8 for %v, got %X", - idx, valBytes) - break - } - valGot := bytes2Int64(valBytes) - if val != valGot { - b.Errorf("Expected %v for %v, got %v", - val, idx, valGot) - break - } - } - } - } - - db.Close() -} - -func int642Bytes(i int64) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(i)) - return buf -} - -func bytes2Int64(buf []byte) int64 { - return int64(binary.BigEndian.Uint64(buf)) + benchmarkRandomReadsWrites(b, db) } diff --git a/libs/db/mem_db.go b/libs/db/mem_db.go index ff516bc7d..fc567577d 100644 --- a/libs/db/mem_db.go +++ b/libs/db/mem_db.go @@ -7,7 +7,7 @@ import ( ) func init() { - registerDBCreator(MemDBBackend, func(name string, dir string) (DB, error) { + registerDBCreator(MemDBBackend, func(name, dir string) (DB, error) { return NewMemDB(), nil }, false) } diff --git a/libs/db/util_test.go b/libs/db/util_test.go index 07f9dd23e..39a02160c 100644 --- a/libs/db/util_test.go +++ b/libs/db/util_test.go @@ -22,6 +22,11 @@ func TestPrefixIteratorNoMatchNil(t *testing.T) { // Empty iterator for db populated after iterator created. func TestPrefixIteratorNoMatch1(t *testing.T) { for backend := range backends { + if backend == BoltDBBackend { + t.Log("bolt does not support concurrent writes while iterating") + continue + } + t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { db, dir := newTempDB(t, backend) defer os.RemoveAll(dir)