diff --git a/grpcdb/doc.go b/grpcdb/doc.go new file mode 100644 index 000000000..a54cab207 --- /dev/null +++ b/grpcdb/doc.go @@ -0,0 +1,30 @@ +/* +grpcdb is the distribution of Tendermint's db.DB instances using +the gRPC transport to decouple local db.DB usages from applications, +to using them over a network in a highly performant manner. + +grpcdb allows users to initialize a database's server like +they would locally and invoke the respective methods of db.DB. + +Most users shouldn't use this package, but should instead use +remotedb. Only the lower level users and database server deployers +should use it, for functionality such as: + + ln, err := net.Listen("tcp", "0.0.0.0:0") + srv := grpcdb.NewServer() + defer srv.Stop() + go func() { + if err := srv.Serve(ln); err != nil { + t.Fatalf("BindServer: %v", err) + } + }() + +or + addr := ":8998" + go func() { + if err := grpcdb.ListenAndServe(addr); err != nil { + log.Fatalf("BindServer: %v", err) + } + }() +*/ +package grpcdb diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go index 451428b97..cbe1abf92 100644 --- a/grpcdb/example_test.go +++ b/grpcdb/example_test.go @@ -12,7 +12,7 @@ import ( func Example() { addr := ":8998" go func() { - if err := grpcdb.BindServer(addr); err != nil { + if err := grpcdb.ListenAndServe(addr); err != nil { log.Fatalf("BindServer: %v", err) } }() diff --git a/grpcdb/server.go b/grpcdb/server.go index c4d115bd7..301f43f23 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -2,7 +2,6 @@ package grpcdb import ( "context" - "log" "net" "sync" "time" @@ -13,17 +12,22 @@ import ( protodb "github.com/tendermint/tmlibs/proto" ) -// BindServer is a blocking function that sets up a gRPC based +// ListenAndServe is a blocking function that sets up a gRPC based // server at the address supplied, with the gRPC options passed in. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe. -func BindServer(addr string, opts ...grpc.ServerOption) error { +func ListenAndServe(addr string, opts ...grpc.ServerOption) error { ln, err := net.Listen("tcp", addr) if err != nil { return err } + srv := NewServer(opts...) + return srv.Serve(ln) +} + +func NewServer(opts ...grpc.ServerOption) *grpc.Server { srv := grpc.NewServer(opts...) protodb.RegisterDBServer(srv, new(server)) - return srv.Serve(ln) + return srv } type server struct { @@ -50,7 +54,6 @@ func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, e s.mu.Lock() defer s.mu.Unlock() - log.Printf("in: %+v\n", in) s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) return &protodb.Entity{TimeAt: time.Now().Unix()}, nil } diff --git a/remotedb/doc.go b/remotedb/doc.go new file mode 100644 index 000000000..07c95a56a --- /dev/null +++ b/remotedb/doc.go @@ -0,0 +1,37 @@ +/* +remotedb is a package for connecting to distributed Tendermint db.DB +instances. The purpose is to detach difficult deployments such as +CLevelDB that requires gcc or perhaps for databases that require +custom configurations such as extra disk space. It also eases +the burden and cost of deployment of dependencies for databases +to be used by Tendermint developers. Most importantly it is built +over the high performant gRPC transport. + +remotedb's RemoteDB implements db.DB so can be used normally +like other databases. One just has to explicitly connect to the +remote database with a client setup such as: + + client, err := remotedb.NewInsecure(addr) + // Make sure to invoke InitRemote! + if err := client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"}); err != nil { + log.Fatalf("Failed to initialize the remote db") + } + + client.Set(key1, value) + gv1 := client.SetSync(k2, v2) + + client.Delete(k1) + gv2 := client.Get(k1) + + for itr := client.Iterator(k1, k9); itr.Valid(); itr.Next() { + ik, iv := itr.Key(), itr.Value() + ds, de := itr.Domain() + } + + stats := client.Stats() + + if !client.Has(dk1) { + client.SetSync(dk1, dv1) + } +*/ +package remotedb diff --git a/remotedb/remotedb.go b/remotedb/remotedb.go new file mode 100644 index 000000000..a110e816c --- /dev/null +++ b/remotedb/remotedb.go @@ -0,0 +1,226 @@ +package remotedb + +import ( + "context" + "fmt" + + "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/grpcdb" + protodb "github.com/tendermint/tmlibs/proto" +) + +type RemoteDB struct { + ctx context.Context + dc protodb.DBClient +} + +func NewSecure(serverAddr string) (*RemoteDB, error) { + return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Secure)) +} + +func NewInsecure(serverAddr string) (*RemoteDB, error) { + return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Insecure)) +} + +func newRemoteDB(gdc protodb.DBClient, err error) (*RemoteDB, error) { + if err != nil { + return nil, err + } + return &RemoteDB{dc: gdc, ctx: context.Background()}, nil +} + +type Init struct { + Dir string + Name string + Type string +} + +func (rd *RemoteDB) InitRemote(in *Init) error { + _, err := rd.dc.Init(rd.ctx, &protodb.Init{Dir: in.Dir, Type: in.Type, Name: in.Name}) + return err +} + +var _ db.DB = (*RemoteDB)(nil) + +// Close is a noop currently +func (rd *RemoteDB) Close() { +} + +func (rd *RemoteDB) Delete(key []byte) { + if _, err := rd.dc.Delete(rd.ctx, &protodb.Entity{Key: key}); err != nil { + panic(fmt.Sprintf("RemoteDB.Delete: %v", err)) + } +} + +func (rd *RemoteDB) DeleteSync(key []byte) { + if _, err := rd.dc.DeleteSync(rd.ctx, &protodb.Entity{Key: key}); err != nil { + panic(fmt.Sprintf("RemoteDB.DeleteSync: %v", err)) + } +} + +func (rd *RemoteDB) Set(key, value []byte) { + if _, err := rd.dc.Set(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil { + panic(fmt.Sprintf("RemoteDB.Set: %v", err)) + } +} + +func (rd *RemoteDB) SetSync(key, value []byte) { + if _, err := rd.dc.SetSync(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil { + panic(fmt.Sprintf("RemoteDB.SetSync: %v", err)) + } +} + +func (rd *RemoteDB) Get(key []byte) []byte { + res, err := rd.dc.Get(rd.ctx, &protodb.Entity{Key: key}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Get error: %v", err)) + } + return res.Value +} + +func (rd *RemoteDB) Has(key []byte) bool { + res, err := rd.dc.Has(rd.ctx, &protodb.Entity{Key: key}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Has error: %v", err)) + } + return res.Exists +} + +func (rd *RemoteDB) ReverseIterator(start, end []byte) db.Iterator { + dic, err := rd.dc.ReverseIterator(rd.ctx, &protodb.Entity{Start: start, End: end}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err)) + } + return makeReverseIterator(dic) +} + +// TODO: Implement NewBatch +func (rd *RemoteDB) NewBatch() db.Batch { + panic("Unimplemented") +} + +// TODO: Implement Print when db.DB implements a method +// to print to a string and not db.Print to stdout. +func (rd *RemoteDB) Print() { + panic("Unimplemented") +} + +func (rd *RemoteDB) Stats() map[string]string { + stats, err := rd.dc.Stats(rd.ctx, &protodb.Nothing{}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Stats error: %v", err)) + } + if stats == nil { + return nil + } + return stats.Data +} + +func (rd *RemoteDB) Iterator(start, end []byte) db.Iterator { + dic, err := rd.dc.Iterator(rd.ctx, &protodb.Entity{Start: start, End: end}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err)) + } + return makeIterator(dic) +} + +func makeIterator(dic protodb.DB_IteratorClient) db.Iterator { + return &iterator{dic: dic} +} + +func makeReverseIterator(dric protodb.DB_ReverseIteratorClient) db.Iterator { + return &reverseIterator{dric: dric} +} + +type reverseIterator struct { + dric protodb.DB_ReverseIteratorClient + cur *protodb.Iterator +} + +var _ db.Iterator = (*iterator)(nil) + +func (rItr *reverseIterator) Valid() bool { + return rItr.cur != nil && rItr.cur.Valid +} + +func (rItr *reverseIterator) Domain() (start, end []byte) { + if rItr.cur == nil || rItr.cur.Domain == nil { + return nil, nil + } + return rItr.cur.Domain.Start, rItr.cur.Domain.End +} + +// Next advances the current reverseIterator +func (rItr *reverseIterator) Next() { + var err error + rItr.cur, err = rItr.dric.Recv() + if err != nil { + panic(fmt.Sprintf("RemoteDB.ReverseIterator.Next error: %v", err)) + } +} + +func (rItr *reverseIterator) Key() []byte { + if rItr.cur == nil { + return nil + } + return rItr.cur.Key +} + +func (rItr *reverseIterator) Value() []byte { + if rItr.cur == nil { + return nil + } + return rItr.cur.Value +} + +func (rItr *reverseIterator) Close() { +} + +// iterator implements the db.Iterator by retrieving +// streamed iterators from the remote backend as +// needed. It is NOT safe for concurrent usage, +// matching the behavior of other iterators. +type iterator struct { + dic protodb.DB_IteratorClient + cur *protodb.Iterator +} + +var _ db.Iterator = (*iterator)(nil) + +func (itr *iterator) Valid() bool { + return itr.cur != nil && itr.cur.Valid +} + +func (itr *iterator) Domain() (start, end []byte) { + if itr.cur == nil || itr.cur.Domain == nil { + return nil, nil + } + return itr.cur.Domain.Start, itr.cur.Domain.End +} + +// Next advances the current iterator +func (itr *iterator) Next() { + var err error + itr.cur, err = itr.dic.Recv() + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator.Next error: %v", err)) + } +} + +func (itr *iterator) Key() []byte { + if itr.cur == nil { + return nil + } + return itr.cur.Key +} + +func (itr *iterator) Value() []byte { + if itr.cur == nil { + return nil + } + return itr.cur.Value +} + +func (itr *iterator) Close() { + // TODO: Shut down the iterator +} diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go new file mode 100644 index 000000000..37ce0c59a --- /dev/null +++ b/remotedb/remotedb_test.go @@ -0,0 +1,41 @@ +package remotedb_test + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/grpcdb" + "github.com/tendermint/tmlibs/remotedb" +) + +func TestRemoteDB(t *testing.T) { + ln, err := net.Listen("tcp", "0.0.0.0:0") + require.Nil(t, err, "expecting a port to have been assigned on which we can listen") + srv := grpcdb.NewServer() + defer srv.Stop() + go func() { + if err := srv.Serve(ln); err != nil { + t.Fatalf("BindServer: %v", err) + } + }() + + client, err := remotedb.NewInsecure(ln.Addr().String()) + require.Nil(t, err, "expecting a successful client creation") + require.Nil(t, client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"})) + + k1 := []byte("key-1") + v1 := client.Get(k1) + require.Equal(t, 0, len(v1), "expecting no key1 to have been stored") + vv1 := []byte("value-1") + client.Set(k1, vv1) + gv1 := client.Get(k1) + require.Equal(t, gv1, vv1) + + // Deletion + client.Delete(k1) + gv2 := client.Get(k1) + require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore") + require.NotEqual(t, len(gv1), len(gv2), "after deletion, not expecting the key to exist anymore") +}