
Make it compile

Branch: pull/1842/head
Jae Kwon committed 7 years ago · commit 0d03cd9e31
8 changed files with 161 additions and 385 deletions
  1. db/c_level_db.go (+21, -16)
  2. db/cache_db.go (+0, -233)
  3. db/db.go (+3, -1)
  4. db/fsdb.go (+38, -23)
  5. db/go_level_db.go (+28, -14)
  6. db/mem_db.go (+31, -22)
  7. db/types.go (+15, -4)
  8. db/util.go (+25, -72)

db/c_level_db.go (+21, -16)

@ -22,8 +22,6 @@ type CLevelDB struct {
ro *levigo.ReadOptions
wo *levigo.WriteOptions
woSync *levigo.WriteOptions
cwwMutex
}
func NewCLevelDB(name string, dir string) (*CLevelDB, error) {
@ -45,8 +43,6 @@ func NewCLevelDB(name string, dir string) (*CLevelDB, error) {
ro: ro,
wo: wo,
woSync: woSync,
cwwMutex: NewCWWMutex(),
}
return database, nil
}
@ -59,6 +55,10 @@ func (db *CLevelDB) Get(key []byte) []byte {
return res
}
func (db *CLevelDB) Has(key []byte) bool {
panic("not implemented yet")
}
func (db *CLevelDB) Set(key []byte, value []byte) {
err := db.db.Put(db.wo, key, value)
if err != nil {
@ -99,9 +99,9 @@ func (db *CLevelDB) Close() {
}
func (db *CLevelDB) Print() {
-	itr := db.Iterator()
-	defer itr.Close()
-	for itr.Seek(nil); itr.Valid(); itr.Next() {
+	itr := db.Iterator(BeginningKey(), EndingKey())
+	defer itr.Release()
+	for ; itr.Valid(); itr.Next() {
key := itr.Key()
value := itr.Value()
fmt.Printf("[%X]:\t[%X]\n", key, value)
@ -120,10 +120,6 @@ func (db *CLevelDB) Stats() map[string]string {
return stats
}
func (db *CLevelDB) CacheDB() CacheDB {
return NewCacheDB(db, db.GetWriteLockVersion())
}
//----------------------------------------
// Batch
@ -155,10 +151,19 @@ func (mBatch *cLevelDBBatch) Write() {
//----------------------------------------
// Iterator
-func (db *CLevelDB) Iterator() Iterator {
-	itr := db.db.NewIterator(db.ro)
-	itr.Seek([]byte{0x00})
-	return cLevelDBIterator{itr}
-}
+func (db *CLevelDB) Iterator(start, end []byte) Iterator {
+	/*
+		XXX
+		itr := db.db.NewIterator(db.ro)
+		itr.Seek([]byte{0x00})
+		return cLevelDBIterator{itr}
+	*/
+	return nil
+}
+
+func (db *CLevelDB) ReverseIterator(start, end []byte) Iterator {
+	// XXX
+	return nil
+}
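Both Iterator(start, end) and ReverseIterator are left as XXX stubs in this commit. Purely as a sketch of where this could go (an assumption, not part of the change), the start bound can be honored with the same levigo calls the removed code already used; enforcing the end bound would still need a small wrapper around cLevelDBIterator:

    // Hypothetical sketch only: seek to `start`, or to the first key when
    // start is empty (mirroring the old Seek([]byte{0x00}) trick). The `end`
    // bound is deliberately not enforced here.
    func (db *CLevelDB) iteratorFrom(start, end []byte) Iterator {
        itr := db.db.NewIterator(db.ro)
        if len(start) == 0 {
            itr.Seek([]byte{0x00})
        } else {
            itr.Seek(start)
        }
        _ = end // a real implementation would report Valid() == false once Key() >= end
        return cLevelDBIterator{itr}
    }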
type cLevelDBIterator struct {
@ -204,7 +209,7 @@ func (c cLevelDBIterator) Prev() {
c.itr.Prev()
}
-func (c cLevelDBIterator) Close() {
+func (c cLevelDBIterator) Release() {
c.itr.Close()
}


db/cache_db.go (+0, -233)

@ -1,233 +0,0 @@
/*
package db
import (
"fmt"
"sort"
"sync"
"sync/atomic"
)
// If value is nil but deleted is false,
// it means the parent doesn't have the key.
// (No need to delete upon Write())
type cDBValue struct {
value []byte
deleted bool
dirty bool
}
// cacheDB wraps an in-memory cache around an underlying DB.
type cacheDB struct {
mtx sync.Mutex
cache map[string]cDBValue
parent DB
lockVersion interface{}
cwwMutex
}
// Needed by MultiStore.CacheWrap().
var _ atomicSetDeleter = (*cacheDB)(nil)
var _ CacheDB = (*cacheDB)(nil)
// Users should typically not be required to call NewCacheDB directly, as the
// DB implementations here provide a .CacheDB() function already.
// `lockVersion` is typically provided by parent.GetWriteLockVersion().
func NewCacheDB(parent DB, lockVersion interface{}) CacheDB {
db := &cacheDB{
cache: make(map[string]cDBValue),
parent: parent,
lockVersion: lockVersion,
cwwMutex: NewCWWMutex(),
}
return db
}
func (db *cacheDB) Get(key []byte) []byte {
db.mtx.Lock()
defer db.mtx.Unlock()
dbValue, ok := db.cache[string(key)]
if !ok {
data := db.parent.Get(key)
dbValue = cDBValue{value: data, deleted: false, dirty: false}
db.cache[string(key)] = dbValue
}
return dbValue.value
}
func (db *cacheDB) Set(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.SetNoLock(key, value)
}
func (db *cacheDB) SetSync(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.SetNoLock(key, value)
}
func (db *cacheDB) SetNoLock(key []byte, value []byte) {
db.cache[string(key)] = cDBValue{value: value, deleted: false, dirty: true}
}
func (db *cacheDB) Delete(key []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.DeleteNoLock(key)
}
func (db *cacheDB) DeleteSync(key []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.DeleteNoLock(key)
}
func (db *cacheDB) DeleteNoLock(key []byte) {
db.cache[string(key)] = cDBValue{value: nil, deleted: true, dirty: true}
}
func (db *cacheDB) Close() {
db.mtx.Lock()
defer db.mtx.Unlock()
db.parent.Close()
}
func (db *cacheDB) Print() {
db.mtx.Lock()
defer db.mtx.Unlock()
fmt.Println("cacheDB\ncache:")
for key, value := range db.cache {
fmt.Printf("[%X]:\t[%v]\n", []byte(key), value)
}
fmt.Println("\nparent:")
db.parent.Print()
}
func (db *cacheDB) Stats() map[string]string {
db.mtx.Lock()
defer db.mtx.Unlock()
stats := make(map[string]string)
stats["cache.size"] = fmt.Sprintf("%d", len(db.cache))
stats["cache.lock_version"] = fmt.Sprintf("%v", db.lockVersion)
mergeStats(db.parent.Stats(), stats, "parent.")
return stats
}
func (db *cacheDB) Iterator() Iterator {
panic("cacheDB.Iterator() not yet supported")
}
func (db *cacheDB) NewBatch() Batch {
return &memBatch{db, nil}
}
// Implements `atomicSetDeleter` for Batch support.
func (db *cacheDB) Mutex() *sync.Mutex {
return &(db.mtx)
}
// Write writes pending updates to the parent database and clears the cache.
func (db *cacheDB) Write() {
db.mtx.Lock()
defer db.mtx.Unlock()
// Optional sanity check to ensure that cacheDB is valid
if parent, ok := db.parent.(WriteLocker); ok {
if parent.TryWriteLock(db.lockVersion) {
// All good!
} else {
panic("cacheDB.Write() failed. Did this CacheDB expire?")
}
}
// We need a copy of all of the keys.
// Not the best, but probably not a bottleneck depending.
keys := make([]string, 0, len(db.cache))
for key, dbValue := range db.cache {
if dbValue.dirty {
keys = append(keys, key)
}
}
sort.Strings(keys)
batch := db.parent.NewBatch()
for _, key := range keys {
dbValue := db.cache[key]
if dbValue.deleted {
batch.Delete([]byte(key))
} else if dbValue.value == nil {
// Skip, it already doesn't exist in parent.
} else {
batch.Set([]byte(key), dbValue.value)
}
}
batch.Write()
// Clear the cache
db.cache = make(map[string]cDBValue)
}
//----------------------------------------
// To cache-wrap this cacheDB further.
func (db *cacheDB) CacheDB() CacheDB {
return NewCacheDB(db, db.GetWriteLockVersion())
}
// If the parent DB implements this (e.g. a cacheDB parent to a
// cacheDB child), cacheDB will call `parent.TryWriteLock()` before attempting
// to write.
type WriteLocker interface {
GetWriteLockVersion() (lockVersion interface{})
TryWriteLock(lockVersion interface{}) bool
}
// Implements TryWriteLocker. Embed this in DB structs if desired.
type cwwMutex struct {
mtx sync.Mutex
// CONTRACT: reading/writing to `*written` should use `atomic.*`.
// CONTRACT: replacing `written` with another *int32 should use `.mtx`.
written *int32
}
func NewCWWMutex() cwwMutex {
return cwwMutex{
written: new(int32),
}
}
func (cww *cwwMutex) GetWriteLockVersion() interface{} {
cww.mtx.Lock()
defer cww.mtx.Unlock()
// `written` works as a "version" object because it gets replaced upon
// successful TryWriteLock.
return cww.written
}
func (cww *cwwMutex) TryWriteLock(version interface{}) bool {
cww.mtx.Lock()
defer cww.mtx.Unlock()
if version != cww.written {
return false // wrong "WriteLockVersion"
}
if !atomic.CompareAndSwapInt32(cww.written, 0, 1) {
return false // already written
}
// New "WriteLockVersion"
cww.written = new(int32)
return true
}
*/

db/db.go (+3, -1)

@ -1,5 +1,7 @@
package db
import "fmt"
//-----------------------------------------------------------------------------
// Main entry
@ -26,7 +28,7 @@ func registerDBCreator(backend string, creator dbCreator, force bool) {
func NewDB(name string, backend string, dir string) DB {
db, err := backends[backend](name, dir)
if err != nil {
PanicSanity(Fmt("Error initializing DB: %v", err))
panic(fmt.Sprintf("Error initializing DB: %v", err))
}
return db
}
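A minimal usage sketch of the changed error path (this assumes the "memdb" backend registered by mem_db.go's init; a creator error now panics with a plain fmt.Sprintf message instead of PanicSanity):

    // Sketch, e.g. in a test file inside this package; fmt is from the standard library.
    func exampleNewDB() {
        db := NewDB("example", "memdb", "") // name and dir are ignored by the in-memory backend
        db.Set([]byte("k"), []byte("v"))
        fmt.Printf("%s\n", db.Get([]byte("k"))) // v
    }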

db/fsdb.go (+38, -23)

@ -7,7 +7,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"github.com/pkg/errors"
@ -29,8 +28,6 @@ func init() {
type FSDB struct {
mtx sync.Mutex
dir string
cwwMutex
}
func NewFSDB(dir string) *FSDB {
@ -39,8 +36,7 @@ func NewFSDB(dir string) *FSDB {
panic(errors.Wrap(err, "Creating FSDB dir "+dir))
}
database := &FSDB{
-		dir: dir,
-		cwwMutex: NewCWWMutex(),
+		dir: dir,
}
return database
}
@ -59,6 +55,20 @@ func (db *FSDB) Get(key []byte) []byte {
return value
}
func (db *FSDB) Has(key []byte) bool {
db.mtx.Lock()
defer db.mtx.Unlock()
path := db.nameToPath(key)
_, err := read(path)
if os.IsNotExist(err) {
return false
} else if err != nil {
panic(errors.Wrap(err, fmt.Sprintf("Getting key %s (0x%X)", string(key), key)))
}
return true
}
func (db *FSDB) Set(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
@ -140,27 +150,32 @@ func (db *FSDB) Mutex() *sync.Mutex {
return &(db.mtx)
}
func (db *FSDB) CacheDB() CacheDB {
return NewCacheDB(db, db.GetWriteLockVersion())
}
-func (db *FSDB) Iterator() Iterator {
-	it := newMemDBIterator()
-	it.db = db
-	it.cur = 0
-
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
-
-	// We need a copy of all of the keys.
-	// Not the best, but probably not a bottleneck depending.
-	keys, err := list(db.dir)
-	if err != nil {
-		panic(errors.Wrap(err, fmt.Sprintf("Listing keys in %s", db.dir)))
-	}
-	sort.Strings(keys)
-	it.keys = keys
-	return it
-}
+func (db *FSDB) Iterator(start, end []byte) Iterator {
+	/*
+		XXX
+		it := newMemDBIterator()
+		it.db = db
+		it.cur = 0
+
+		db.mtx.Lock()
+		defer db.mtx.Unlock()
+
+		// We need a copy of all of the keys.
+		// Not the best, but probably not a bottleneck depending.
+		keys, err := list(db.dir)
+		if err != nil {
+			panic(errors.Wrap(err, fmt.Sprintf("Listing keys in %s", db.dir)))
+		}
+		sort.Strings(keys)
+		it.keys = keys
+		return it
+	*/
+	return nil
+}
+
+func (db *FSDB) ReverseIterator(start, end []byte) Iterator {
+	// XXX
+	return nil
+}
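The new FSDB.Has treats a missing backing file as false and panics on any other read error. A tiny usage sketch (the directory path is made up for illustration):

    // Sketch: keys are stored as files, so Has reduces to a file-existence check.
    func exampleFSDB() {
        db := NewFSDB("/tmp/fsdb-example") // hypothetical directory
        db.Set([]byte("color"), []byte("blue"))
        fmt.Println(db.Has([]byte("color")))   // true
        fmt.Println(db.Has([]byte("missing"))) // false
    }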
func (db *FSDB) nameToPath(name []byte) string {


db/go_level_db.go (+28, -14)

@ -22,8 +22,6 @@ func init() {
type GoLevelDB struct {
db *leveldb.DB
cwwMutex
}
func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) {
@ -33,8 +31,7 @@ func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) {
return nil, err
}
database := &GoLevelDB{
-		db: db,
-		cwwMutex: NewCWWMutex(),
+		db: db,
}
return database, nil
}
@ -51,6 +48,18 @@ func (db *GoLevelDB) Get(key []byte) []byte {
return res
}
func (db *GoLevelDB) Has(key []byte) bool {
_, err := db.db.Get(key, nil)
if err != nil {
if err == errors.ErrNotFound {
return false
} else {
PanicCrisis(err)
}
}
return true
}
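The errors.ErrNotFound compared against above presumably comes from goleveldb's leveldb/errors package (the import block is not shown in this hunk). A standalone equivalent that returns the error instead of panicking might look like the sketch below; goleveldb also ships a native DB.Has that could serve the same purpose:

    package main

    import (
        "fmt"

        "github.com/syndtr/goleveldb/leveldb"
        "github.com/syndtr/goleveldb/leveldb/errors"
        "github.com/syndtr/goleveldb/leveldb/storage"
    )

    // has mirrors the Has method above: ErrNotFound means "absent",
    // any other error is returned to the caller.
    func has(db *leveldb.DB, key []byte) (bool, error) {
        _, err := db.Get(key, nil)
        if err == errors.ErrNotFound {
            return false, nil
        }
        if err != nil {
            return false, err
        }
        return true, nil
    }

    func main() {
        db, _ := leveldb.Open(storage.NewMemStorage(), nil)
        defer db.Close()
        db.Put([]byte("k"), []byte("v"), nil)
        ok, _ := has(db, []byte("k"))
        fmt.Println(ok) // true
    }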
func (db *GoLevelDB) Set(key []byte, value []byte) {
err := db.db.Put(key, value, nil)
if err != nil {
@ -121,10 +130,6 @@ func (db *GoLevelDB) Stats() map[string]string {
return stats
}
func (db *GoLevelDB) CacheDB() CacheDB {
return NewCacheDB(db, db.GetWriteLockVersion())
}
//----------------------------------------
// Batch
@ -156,12 +161,21 @@ func (mBatch *goLevelDBBatch) Write() {
//----------------------------------------
// Iterator
-func (db *GoLevelDB) Iterator() Iterator {
-	itr := &goLevelDBIterator{
-		source: db.db.NewIterator(nil, nil),
-	}
-	itr.Seek(nil)
-	return itr
-}
+func (db *GoLevelDB) Iterator(start, end []byte) Iterator {
+	/*
+		XXX
+		itr := &goLevelDBIterator{
+			source: db.db.NewIterator(nil, nil),
+		}
+		itr.Seek(nil)
+		return itr
+	*/
+	return nil
+}
+
+func (db *GoLevelDB) ReverseIterator(start, end []byte) Iterator {
+	// XXX
+	return nil
+}
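Here too the range iterators are stubbed out. goleveldb itself supports bounded iteration through util.Range, so one plausible shape for the stub (an assumption, not what this commit does) is:

    import "github.com/syndtr/goleveldb/leveldb/util"

    // Hypothetical sketch: an empty start and a nil end correspond to
    // BeginningKey()/EndingKey(), i.e. an unbounded range.
    func (db *GoLevelDB) iteratorSketch(start, end []byte) Iterator {
        itr := db.db.NewIterator(&util.Range{Start: start, Limit: end}, nil)
        return &goLevelDBIterator{source: itr}
    }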
type goLevelDBIterator struct {


db/mem_db.go (+31, -22)

@ -3,7 +3,6 @@ package db
import (
"bytes"
"fmt"
"sort"
"sync"
)
@ -16,14 +15,11 @@ func init() {
type MemDB struct {
mtx sync.Mutex
db map[string][]byte
cwwMutex
}
func NewMemDB() *MemDB {
database := &MemDB{
-		db: make(map[string][]byte),
-		cwwMutex: NewCWWMutex(),
+		db: make(map[string][]byte),
}
return database
}
@ -35,6 +31,14 @@ func (db *MemDB) Get(key []byte) []byte {
return db.db[string(key)]
}
func (db *MemDB) Has(key []byte) bool {
db.mtx.Lock()
defer db.mtx.Unlock()
_, ok := db.db[string(key)]
return ok
}
func (db *MemDB) Set(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
@ -114,27 +118,32 @@ func (db *MemDB) Mutex() *sync.Mutex {
return &(db.mtx)
}
func (db *MemDB) CacheDB() CacheDB {
return NewCacheDB(db, db.GetWriteLockVersion())
}
//----------------------------------------
-func (db *MemDB) Iterator() Iterator {
-	it := newMemDBIterator()
-	it.db = db
-	it.cur = 0
-
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
-
-	// We need a copy of all of the keys.
-	// Not the best, but probably not a bottleneck depending.
-	for key, _ := range db.db {
-		it.keys = append(it.keys, key)
-	}
-	sort.Strings(it.keys)
-	return it
-}
+func (db *MemDB) Iterator(start, end []byte) Iterator {
+	/*
+		XXX
+		it := newMemDBIterator()
+		it.db = db
+		it.cur = 0
+
+		db.mtx.Lock()
+		defer db.mtx.Unlock()
+
+		// We need a copy of all of the keys.
+		// Not the best, but probably not a bottleneck depending.
+		for key, _ := range db.db {
+			it.keys = append(it.keys, key)
+		}
+		sort.Strings(it.keys)
+		return it
+	*/
+	return nil
+}
+
+func (db *MemDB) ReverseIterator(start, end []byte) Iterator {
+	// XXX
+	return nil
+}
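Both the MemDB and FSDB iterators build a sorted slice of keys; when the commented-out bodies above are revived they will also need to respect the new [start, end) bounds. A purely illustrative filtering helper (not in the commit):

    // keysInRange keeps keys k with start <= k < end from an already-sorted
    // slice. An empty start means "from the beginning" and a nil end means
    // "until the very end", matching BeginningKey() and EndingKey().
    func keysInRange(sortedKeys []string, start, end []byte) []string {
        var out []string
        for _, k := range sortedKeys {
            if len(start) > 0 && k < string(start) {
                continue
            }
            if end != nil && k >= string(end) {
                break
            }
            out = append(out, k)
        }
        return out
    }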
type memDBIterator struct {


db/types.go (+15, -4)

@ -54,12 +54,23 @@ type SetDeleter interface {
//----------------------------------------
func BeginningKey() []byte {
return []byte{}
}
func EndingKey() []byte {
return nil
}
/*
Usage:
-	for itr.Seek(mykey); itr.Valid(); itr.Next() {
+	var itr Iterator = ...
+	defer itr.Release()
+	for ; itr.Valid(); itr.Next() {
 		k, v := itr.Key(); itr.Value()
-		....
+		// ...
}
*/
type Iterator interface {
@ -106,6 +117,6 @@ type Iterator interface {
// This method is safe to call when Valid returns false.
GetError() error
-	// Close deallocates the given Iterator.
-	Close()
+	// Release deallocates the given Iterator.
+	Release()
}
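Reading the types.go pieces together, the intended calling pattern after this change is roughly the sketch below (it assumes the backends' Iterator(start, end) stubs eventually get filled in; fmt is from the standard library):

    // printAll iterates a whole DB with the new range/Release conventions.
    func printAll(db DB) {
        itr := db.Iterator(BeginningKey(), EndingKey())
        defer itr.Release() // Release replaces the old Close
        for ; itr.Valid(); itr.Next() {
            k, v := itr.Key(), itr.Value()
            fmt.Printf("[%X]:\t[%X]\n", k, v)
        }
    }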

db/util.go (+25, -72)

@ -1,82 +1,35 @@
package db
-import "bytes"
-
-// A wrapper around itr that tries to keep the iterator
-// within the bounds as defined by `prefix`
-type prefixIterator struct {
-	itr Iterator
-	prefix []byte
-	invalid bool
-}
-
-func (pi *prefixIterator) Seek(key []byte) {
-	if !bytes.HasPrefix(key, pi.prefix) {
-		pi.invalid = true
-		return
-	}
-	pi.itr.Seek(key)
-	pi.checkInvalid()
-}
-
-func (pi *prefixIterator) checkInvalid() {
-	if !pi.itr.Valid() {
-		pi.invalid = true
-	}
-}
-
-func (pi *prefixIterator) Valid() bool {
-	if pi.invalid {
-		return false
-	}
-	key := pi.itr.Key()
-	ok := bytes.HasPrefix(key, pi.prefix)
-	if !ok {
-		pi.invalid = true
-		return false
-	}
-	return true
-}
-
-func (pi *prefixIterator) Next() {
-	if pi.invalid {
-		panic("prefixIterator Next() called when invalid")
-	}
-	pi.itr.Next()
-	pi.checkInvalid()
-}
-
-func (pi *prefixIterator) Prev() {
-	if pi.invalid {
-		panic("prefixIterator Prev() called when invalid")
-	}
-	pi.itr.Prev()
-	pi.checkInvalid()
-}
-
-func (pi *prefixIterator) Key() []byte {
-	if pi.invalid {
-		panic("prefixIterator Key() called when invalid")
-	}
-	return pi.itr.Key()
-}
-
-func (pi *prefixIterator) Value() []byte {
-	if pi.invalid {
-		panic("prefixIterator Value() called when invalid")
-	}
-	return pi.itr.Value()
-}
-
-func (pi *prefixIterator) Close() { pi.itr.Close() }
-func (pi *prefixIterator) GetError() error { return pi.itr.GetError() }
-
-func IteratePrefix(db DB, prefix []byte) Iterator {
-	itr := db.Iterator()
-	pi := &prefixIterator{
-		itr: itr,
-		prefix: prefix,
-	}
-	pi.Seek(prefix)
-	return pi
-}
+func IteratePrefix(db DB, prefix []byte) Iterator {
+	var start, end []byte
+	if len(prefix) == 0 {
+		start = BeginningKey()
+		end = EndingKey()
+	} else {
+		start = cp(prefix)
+		end = cpIncr(prefix)
+	}
+	return db.Iterator(start, end)
+}
+
+//----------------------------------------
+
+func cp(bz []byte) (ret []byte) {
+	ret = make([]byte, len(bz))
+	copy(ret, bz)
+	return ret
+}
+
+// CONTRACT: len(bz) > 0
+func cpIncr(bz []byte) (ret []byte) {
+	ret = cp(bz)
+	for i := len(bz) - 1; i >= 0; i-- {
+		if ret[i] < byte(0xFF) {
+			ret[i] += 1
+			return
+		} else {
+			ret[i] = byte(0x00)
+		}
+	}
+	return EndingKey()
+}
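To make the prefix-to-range translation concrete, the following standalone snippet (a local copy of the helpers above, for illustration only) walks cpIncr through two inputs:

    package main

    import "fmt"

    func cp(bz []byte) []byte {
        ret := make([]byte, len(bz))
        copy(ret, bz)
        return ret
    }

    // cpIncr returns a key strictly greater than every key that has bz as a
    // prefix (an exclusive upper bound for the prefix range); nil means
    // "no upper bound", i.e. EndingKey().
    func cpIncr(bz []byte) []byte {
        ret := cp(bz)
        for i := len(bz) - 1; i >= 0; i-- {
            if ret[i] < 0xFF {
                ret[i]++
                return ret
            }
            ret[i] = 0x00
        }
        return nil
    }

    func main() {
        fmt.Printf("%X -> %X\n", []byte{0x01, 0xFF}, cpIncr([]byte{0x01, 0xFF})) // 01FF -> 0200
        fmt.Printf("%X -> %v\n", []byte{0xFF, 0xFF}, cpIncr([]byte{0xFF, 0xFF})) // FFFF -> []
    }

IteratePrefix then iterates the half-open range [prefix, cpIncr(prefix)), which covers exactly the keys that start with the prefix.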
