From 9f81134388b0ddceeb0384f57c4f210018f27297 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 29 Nov 2016 16:06:36 -0800 Subject: [PATCH] Add support for levigo bindings --- db.go | 14 ++++++ level_db.go | 27 ++++++++++ level_db2.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++ level_db2_test.go | 84 +++++++++++++++++++++++++++++++ mem_db.go | 64 +++++++++++++++++++++++- 5 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 level_db2.go create mode 100644 level_db2_test.go diff --git a/db.go b/db.go index 6bb1efae7..ab38501e2 100644 --- a/db.go +++ b/db.go @@ -13,16 +13,24 @@ type DB interface { Delete([]byte) DeleteSync([]byte) Close() + NewBatch() Batch // For debugging Print() } +type Batch interface { + Set(key, value []byte) + Delete(key []byte) + Write() +} + //----------------------------------------------------------------------------- // Database types const DBBackendMemDB = "memdb" const DBBackendLevelDB = "leveldb" +const DBBackendLevelDB2 = "leveldb2" func NewDB(name string, backend string, dir string) DB { switch backend { @@ -35,6 +43,12 @@ func NewDB(name string, backend string, dir string) DB { PanicCrisis(err) } return db + case DBBackendLevelDB2: + db, err := NewLevelDB2(path.Join(dir, name+".db")) + if err != nil { + PanicCrisis(err) + } + return db default: PanicSanity(Fmt("Unknown DB backend: %v", backend)) } diff --git a/level_db.go b/level_db.go index dee57a321..360362285 100644 --- a/level_db.go +++ b/level_db.go @@ -81,3 +81,30 @@ func (db *LevelDB) Print() { fmt.Printf("[%X]:\t[%X]\n", key, value) } } + +func (db *LevelDB) NewBatch() Batch { + batch := new(leveldb.Batch) + return &levelDBBatch{db, batch} +} + +//-------------------------------------------------------------------------------- + +type levelDBBatch struct { + db *LevelDB + batch *leveldb.Batch +} + +func (mBatch *levelDBBatch) Set(key, value []byte) { + mBatch.batch.Put(key, value) +} + +func (mBatch *levelDBBatch) Delete(key []byte) { + mBatch.batch.Delete(key) +} + +func (mBatch *levelDBBatch) Write() { + err := mBatch.db.db.Write(mBatch.batch, nil) + if err != nil { + PanicCrisis(err) + } +} diff --git a/level_db2.go b/level_db2.go new file mode 100644 index 000000000..e3049aae3 --- /dev/null +++ b/level_db2.go @@ -0,0 +1,124 @@ +package db + +import ( + "fmt" + "path" + + "github.com/jmhodges/levigo" + + . "github.com/tendermint/go-common" +) + +type LevelDB2 struct { + db *levigo.DB + ro *levigo.ReadOptions + wo *levigo.WriteOptions + woSync *levigo.WriteOptions +} + +func NewLevelDB2(name string) (*LevelDB2, error) { + dbPath := path.Join(name) + + opts := levigo.NewOptions() + opts.SetCache(levigo.NewLRUCache(1 << 30)) + opts.SetCreateIfMissing(true) + db, err := levigo.Open(dbPath, opts) + if err != nil { + return nil, err + } + ro := levigo.NewReadOptions() + wo := levigo.NewWriteOptions() + woSync := levigo.NewWriteOptions() + woSync.SetSync(true) + database := &LevelDB2{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } + return database, nil +} + +func (db *LevelDB2) Get(key []byte) []byte { + res, err := db.db.Get(db.ro, key) + if err != nil { + PanicCrisis(err) + } + return res +} + +func (db *LevelDB2) Set(key []byte, value []byte) { + err := db.db.Put(db.wo, key, value) + if err != nil { + PanicCrisis(err) + } +} + +func (db *LevelDB2) SetSync(key []byte, value []byte) { + err := db.db.Put(db.woSync, key, value) + if err != nil { + PanicCrisis(err) + } +} + +func (db *LevelDB2) Delete(key []byte) { + err := db.db.Delete(db.wo, key) + if err != nil { + PanicCrisis(err) + } +} + +func (db *LevelDB2) DeleteSync(key []byte) { + err := db.db.Delete(db.woSync, key) + if err != nil { + PanicCrisis(err) + } +} + +func (db *LevelDB2) DB() *levigo.DB { + return db.db +} + +func (db *LevelDB2) Close() { + db.db.Close() + db.ro.Close() + db.wo.Close() + db.woSync.Close() +} + +func (db *LevelDB2) Print() { + iter := db.db.NewIterator(db.ro) + defer iter.Close() + for iter.Seek(nil); iter.Valid(); iter.Next() { + key := iter.Key() + value := iter.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } +} + +func (db *LevelDB2) NewBatch() Batch { + batch := levigo.NewWriteBatch() + return &levelDB2Batch{db, batch} +} + +//-------------------------------------------------------------------------------- + +type levelDB2Batch struct { + db *LevelDB2 + batch *levigo.WriteBatch +} + +func (mBatch *levelDB2Batch) Set(key, value []byte) { + mBatch.batch.Put(key, value) +} + +func (mBatch *levelDB2Batch) Delete(key []byte) { + mBatch.batch.Delete(key) +} + +func (mBatch *levelDB2Batch) Write() { + err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch) + if err != nil { + PanicCrisis(err) + } +} diff --git a/level_db2_test.go b/level_db2_test.go new file mode 100644 index 000000000..27a558407 --- /dev/null +++ b/level_db2_test.go @@ -0,0 +1,84 @@ +package db + +import ( + "bytes" + "fmt" + "testing" + + . "github.com/tendermint/go-common" +) + +func BenchmarkRandomReadsWrites2(b *testing.B) { + b.StopTimer() + + numItems := int64(1000000) + internal := map[int64]int64{} + for i := 0; i < int(numItems); i++ { + internal[int64(i)] = int64(0) + } + db, err := NewLevelDB2(Fmt("test_%x", RandStr(12))) + if err != nil { + b.Fatal(err.Error()) + return + } + + fmt.Println("ok, starting") + b.StartTimer() + + for i := 0; i < b.N; i++ { + // Write something + { + idx := (int64(RandInt()) % numItems) + internal[idx] += 1 + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := int642Bytes(int64(val)) + //fmt.Printf("Set %X -> %X\n", idxBytes, valBytes) + db.Set( + idxBytes, + valBytes, + ) + } + // Read something + { + idx := (int64(RandInt()) % numItems) + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := db.Get(idxBytes) + //fmt.Printf("Get %X -> %X\n", idxBytes, valBytes) + if val == 0 { + if !bytes.Equal(valBytes, nil) { + b.Errorf("Expected %X for %v, got %X", + nil, idx, valBytes) + break + } + } else { + if len(valBytes) != 8 { + b.Errorf("Expected length 8 for %v, got %X", + idx, valBytes) + break + } + valGot := bytes2Int64(valBytes) + if val != valGot { + b.Errorf("Expected %v for %v, got %v", + val, idx, valGot) + break + } + } + } + } + + db.Close() +} + +/* +func int642Bytes(i int64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(i)) + return buf +} + +func bytes2Int64(buf []byte) int64 { + return int64(binary.BigEndian.Uint64(buf)) +} +*/ diff --git a/mem_db.go b/mem_db.go index b7d8918d4..d27159dab 100644 --- a/mem_db.go +++ b/mem_db.go @@ -2,10 +2,12 @@ package db import ( "fmt" + "sync" ) type MemDB struct { - db map[string][]byte + mtx sync.Mutex + db map[string][]byte } func NewMemDB() *MemDB { @@ -14,31 +16,91 @@ func NewMemDB() *MemDB { } func (db *MemDB) Get(key []byte) []byte { + db.mtx.Lock() + defer db.mtx.Unlock() return db.db[string(key)] } func (db *MemDB) Set(key []byte, value []byte) { + db.mtx.Lock() + defer db.mtx.Unlock() db.db[string(key)] = value } func (db *MemDB) SetSync(key []byte, value []byte) { + db.mtx.Lock() + defer db.mtx.Unlock() db.db[string(key)] = value } func (db *MemDB) Delete(key []byte) { + db.mtx.Lock() + defer db.mtx.Unlock() delete(db.db, string(key)) } func (db *MemDB) DeleteSync(key []byte) { + db.mtx.Lock() + defer db.mtx.Unlock() delete(db.db, string(key)) } func (db *MemDB) Close() { + db.mtx.Lock() + defer db.mtx.Unlock() db = nil } func (db *MemDB) Print() { + db.mtx.Lock() + defer db.mtx.Unlock() for key, value := range db.db { fmt.Printf("[%X]:\t[%X]\n", []byte(key), value) } } + +func (db *MemDB) NewBatch() Batch { + return &memDBBatch{db, nil} +} + +//-------------------------------------------------------------------------------- + +type memDBBatch struct { + db *MemDB + ops []operation +} + +type opType int + +const ( + opTypeSet = 1 + opTypeDelete = 2 +) + +type operation struct { + opType + key []byte + value []byte +} + +func (mBatch *memDBBatch) Set(key, value []byte) { + mBatch.ops = append(mBatch.ops, operation{opTypeSet, key, value}) +} + +func (mBatch *memDBBatch) Delete(key []byte) { + mBatch.ops = append(mBatch.ops, operation{opTypeDelete, key, nil}) +} + +func (mBatch *memDBBatch) Write() { + mBatch.db.mtx.Lock() + defer mBatch.db.mtx.Unlock() + + for _, op := range mBatch.ops { + if op.opType == opTypeSet { + mBatch.db.db[string(op.key)] = op.value + } else if op.opType == opTypeDelete { + delete(mBatch.db.db, string(op.key)) + } + } + +}