Browse Source

Merge pull request #162 from tendermint/db-gRPC

DB as a service: remove database deployment with API
pull/1780/head
Christopher Goes 7 years ago
committed by GitHub
parent
commit
67faf556ed
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1809 additions and 6 deletions
  1. +72
    -2
      Gopkg.lock
  2. +21
    -4
      Makefile
  3. +30
    -0
      grpcdb/client.go
  4. +32
    -0
      grpcdb/doc.go
  5. +52
    -0
      grpcdb/example_test.go
  6. +197
    -0
      grpcdb/server.go
  7. +914
    -0
      proto/defs.pb.go
  8. +71
    -0
      proto/defs.proto
  9. +37
    -0
      remotedb/doc.go
  10. +262
    -0
      remotedb/remotedb.go
  11. +115
    -0
      remotedb/remotedb_test.go
  12. +6
    -0
      test.sh

+ 72
- 2
Gopkg.lock View File

@ -51,6 +51,18 @@
revision = "1adfc126b41513cc696b209667c8656ea7aac67c"
version = "v1.0.0"
[[projects]]
name = "github.com/golang/protobuf"
packages = [
"proto",
"ptypes",
"ptypes/any",
"ptypes/duration",
"ptypes/timestamp"
]
revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/golang/snappy"
@ -189,6 +201,20 @@
packages = ["ripemd160"]
revision = "edd5e9b0879d13ee6970a50153d85b8fec9f7686"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = [
"context",
"http/httpguts",
"http2",
"http2/hpack",
"idna",
"internal/timeseries",
"trace"
]
revision = "d11bb6cd8e3c4e60239c9cb20ef68586d74500d0"
[[projects]]
name = "golang.org/x/sys"
packages = ["unix"]
@ -197,15 +223,59 @@
[[projects]]
name = "golang.org/x/text"
packages = [
"collate",
"collate/build",
"internal/colltab",
"internal/gen",
"internal/tag",
"internal/triegen",
"internal/ucd",
"language",
"secure/bidirule",
"transform",
"unicode/bidi",
"unicode/cldr",
"unicode/norm"
"unicode/norm",
"unicode/rangetable"
]
revision = "c01e4764d870b77f8abe5096ee19ad20d80e8075"
[[projects]]
branch = "master"
name = "google.golang.org/genproto"
packages = ["googleapis/rpc/status"]
revision = "86e600f69ee4704c6efbf6a2a40a5c10700e76c2"
[[projects]]
name = "google.golang.org/grpc"
packages = [
".",
"balancer",
"balancer/base",
"balancer/roundrobin",
"codes",
"connectivity",
"credentials",
"encoding",
"encoding/proto",
"grpclb/grpc_lb_v1/messages",
"grpclog",
"internal",
"keepalive",
"metadata",
"naming",
"peer",
"resolver",
"resolver/dns",
"resolver/passthrough",
"stats",
"status",
"tap",
"transport"
]
revision = "d11072e7ca9811b1100b80ca0269ac831f06d024"
version = "v1.11.3"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
@ -215,6 +285,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "c33ff784e40965e1cd0ec6232b43e379c6608cb41a9c5c707247742b68c906fb"
inputs-digest = "8aa4ea7ef6d0ff170127eb5bca89c6c37c767d58047159cfd26a431c5cd5e7ad"
solver-name = "gps-cdcl"
solver-version = 1

+ 21
- 4
Makefile View File

@ -1,13 +1,14 @@
GOTOOLS = \
github.com/golang/dep/cmd/dep \
github.com/gogo/protobuf/protoc-gen-gogo \
github.com/gogo/protobuf/gogoproto
github.com/gogo/protobuf/gogoproto \
github.com/square/certstrap
# github.com/alecthomas/gometalinter.v2 \
GOTOOLS_CHECK = dep gometalinter.v2 protoc protoc-gen-gogo
INCLUDE = -I=. -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf
all: check get_vendor_deps protoc build test install metalinter
all: check get_vendor_deps protoc grpc_dbserver build test install metalinter
check: check_tools
@ -66,8 +67,21 @@ get_vendor_deps:
########################################
### Testing
test:
gen_certs: clean_certs
## Generating certificates for TLS testing...
certstrap init --common-name "tendermint.com" --passphrase ""
certstrap request-cert -ip "::" --passphrase ""
certstrap sign "::" --CA "tendermint.com" --passphrase ""
mv out/::.crt out/::.key remotedb
clean_certs:
## Cleaning TLS testing certificates...
rm -rf out
rm -f remotedb/::.crt remotedb/::.key
test: gen_certs
go test -tags gcc $(shell go list ./... | grep -v vendor)
make clean_certs
test100:
@for i in {1..100}; do make test; done
@ -118,4 +132,7 @@ metalinter_all:
# To avoid unintended conflicts with file names, always add to .PHONY
# unless there is a reason not to.
# https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html
.PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all
.PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all gen_certs clean_certs
grpc_dbserver:
protoc -I proto/ proto/defs.proto --go_out=plugins=grpc:proto

+ 30
- 0
grpcdb/client.go View File

@ -0,0 +1,30 @@
package grpcdb
import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
protodb "github.com/tendermint/tmlibs/proto"
)
// Security defines how the client will talk to the gRPC server.
type Security uint
const (
Insecure Security = iota
Secure
)
// NewClient creates a gRPC client connected to the bound gRPC server at serverAddr.
// Use kind to set the level of security to either Secure or Insecure.
func NewClient(serverAddr, serverCert string) (protodb.DBClient, error) {
creds, err := credentials.NewClientTLSFromFile(serverCert, "")
if err != nil {
return nil, err
}
cc, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, err
}
return protodb.NewDBClient(cc), nil
}

+ 32
- 0
grpcdb/doc.go View File

@ -0,0 +1,32 @@
/*
grpcdb is the distribution of Tendermint's db.DB instances using
the gRPC transport to decouple local db.DB usages from applications,
to using them over a network in a highly performant manner.
grpcdb allows users to initialize a database's server like
they would locally and invoke the respective methods of db.DB.
Most users shouldn't use this package, but should instead use
remotedb. Only the lower level users and database server deployers
should use it, for functionality such as:
ln, err := net.Listen("tcp", "0.0.0.0:0")
srv := grpcdb.NewServer()
defer srv.Stop()
go func() {
if err := srv.Serve(ln); err != nil {
t.Fatalf("BindServer: %v", err)
}
}()
or
addr := ":8998"
cert := "server.crt"
key := "server.key"
go func() {
if err := grpcdb.ListenAndServe(addr, cert, key); err != nil {
log.Fatalf("BindServer: %v", err)
}
}()
*/
package grpcdb

+ 52
- 0
grpcdb/example_test.go View File

@ -0,0 +1,52 @@
package grpcdb_test
import (
"bytes"
"context"
"log"
grpcdb "github.com/tendermint/tmlibs/grpcdb"
protodb "github.com/tendermint/tmlibs/proto"
)
func Example() {
addr := ":8998"
cert := "server.crt"
key := "server.key"
go func() {
if err := grpcdb.ListenAndServe(addr, cert, key); err != nil {
log.Fatalf("BindServer: %v", err)
}
}()
client, err := grpcdb.NewClient(addr, cert)
if err != nil {
log.Fatalf("Failed to create grpcDB client: %v", err)
}
ctx := context.Background()
// 1. Initialize the DB
in := &protodb.Init{
Type: "leveldb",
Name: "grpc-uno-test",
Dir: ".",
}
if _, err := client.Init(ctx, in); err != nil {
log.Fatalf("Init error: %v", err)
}
// 2. Now it can be used!
query1 := &protodb.Entity{Key: []byte("Project"), Value: []byte("Tmlibs-on-gRPC")}
if _, err := client.SetSync(ctx, query1); err != nil {
log.Fatalf("SetSync err: %v", err)
}
query2 := &protodb.Entity{Key: []byte("Project")}
read, err := client.Get(ctx, query2)
if err != nil {
log.Fatalf("Get err: %v", err)
}
if g, w := read.Value, []byte("Tmlibs-on-gRPC"); !bytes.Equal(g, w) {
log.Fatalf("got= (%q ==> % X)\nwant=(%q ==> % X)", g, g, w, w)
}
}

+ 197
- 0
grpcdb/server.go View File

@ -0,0 +1,197 @@
package grpcdb
import (
"context"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/tendermint/tmlibs/db"
protodb "github.com/tendermint/tmlibs/proto"
)
// ListenAndServe is a blocking function that sets up a gRPC based
// server at the address supplied, with the gRPC options passed in.
// Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe.
func ListenAndServe(addr, cert, key string, opts ...grpc.ServerOption) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
srv, err := NewServer(cert, key, opts...)
if err != nil {
return err
}
return srv.Serve(ln)
}
func NewServer(cert, key string, opts ...grpc.ServerOption) (*grpc.Server, error) {
creds, err := credentials.NewServerTLSFromFile(cert, key)
if err != nil {
return nil, err
}
opts = append(opts, grpc.Creds(creds))
srv := grpc.NewServer(opts...)
protodb.RegisterDBServer(srv, new(server))
return srv, nil
}
type server struct {
mu sync.Mutex
db db.DB
}
var _ protodb.DBServer = (*server)(nil)
// Init initializes the server's database. Only one type of database
// can be initialized per server.
//
// Dir is the directory on the file system in which the DB will be stored(if backed by disk) (TODO: remove)
//
// Name is representative filesystem entry's basepath
//
// Type can be either one of:
// * cleveldb (if built with gcc enabled)
// * fsdb
// * memdB
// * leveldb
// See https://godoc.org/github.com/tendermint/tmlibs/db#DBBackendType
func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir)
return &protodb.Entity{CreatedAt: time.Now().Unix()}, nil
}
func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
s.db.Delete(in.Key)
return nothing, nil
}
var nothing = new(protodb.Nothing)
func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
s.db.DeleteSync(in.Key)
return nothing, nil
}
func (s *server) Get(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
value := s.db.Get(in.Key)
return &protodb.Entity{Value: value}, nil
}
func (s *server) GetStream(ds protodb.DB_GetStreamServer) error {
// Receive routine
responsesChan := make(chan *protodb.Entity)
go func() {
defer close(responsesChan)
ctx := context.Background()
for {
in, err := ds.Recv()
if err != nil {
responsesChan <- &protodb.Entity{Err: err.Error()}
return
}
out, err := s.Get(ctx, in)
if err != nil {
if out == nil {
out = new(protodb.Entity)
out.Key = in.Key
}
out.Err = err.Error()
responsesChan <- out
return
}
// Otherwise continue on
responsesChan <- out
}
}()
// Send routine, block until we return
for out := range responsesChan {
if err := ds.Send(out); err != nil {
return err
}
}
return nil
}
func (s *server) Has(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
exists := s.db.Has(in.Key)
return &protodb.Entity{Exists: exists}, nil
}
func (s *server) Set(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
s.db.Set(in.Key, in.Value)
return nothing, nil
}
func (s *server) SetSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
s.db.SetSync(in.Key, in.Value)
return nothing, nil
}
func (s *server) Iterator(query *protodb.Entity, dis protodb.DB_IteratorServer) error {
it := s.db.Iterator(query.Start, query.End)
return s.handleIterator(it, dis.Send)
}
func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) error) error {
for it.Valid() {
start, end := it.Domain()
out := &protodb.Iterator{
Domain: &protodb.Domain{Start: start, End: end},
Valid: it.Valid(),
Key: it.Key(),
Value: it.Value(),
}
if err := sendFunc(out); err != nil {
return err
}
// Finally move the iterator forward
it.Next()
}
return nil
}
func (s *server) ReverseIterator(query *protodb.Entity, dis protodb.DB_ReverseIteratorServer) error {
it := s.db.ReverseIterator(query.Start, query.End)
return s.handleIterator(it, dis.Send)
}
func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error) {
stats := s.db.Stats()
return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil
}
func (s *server) BatchWrite(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
return s.batchWrite(c, b, false)
}
func (s *server) BatchWriteSync(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
return s.batchWrite(c, b, true)
}
func (s *server) batchWrite(c context.Context, b *protodb.Batch, sync bool) (*protodb.Nothing, error) {
bat := s.db.NewBatch()
for _, op := range b.Ops {
switch op.Type {
case protodb.Operation_SET:
bat.Set(op.Entity.Key, op.Entity.Value)
case protodb.Operation_DELETE:
bat.Delete(op.Entity.Key)
}
}
if sync {
bat.WriteSync()
} else {
bat.Write()
}
return nothing, nil
}

+ 914
- 0
proto/defs.pb.go View File

@ -0,0 +1,914 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: defs.proto
/*
Package protodb is a generated protocol buffer package.
It is generated from these files:
defs.proto
It has these top-level messages:
Batch
Operation
Entity
Nothing
Domain
Iterator
Stats
Init
*/
package protodb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Operation_Type int32
const (
Operation_SET Operation_Type = 0
Operation_DELETE Operation_Type = 1
)
var Operation_Type_name = map[int32]string{
0: "SET",
1: "DELETE",
}
var Operation_Type_value = map[string]int32{
"SET": 0,
"DELETE": 1,
}
func (x Operation_Type) String() string {
return proto.EnumName(Operation_Type_name, int32(x))
}
func (Operation_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} }
type Batch struct {
Ops []*Operation `protobuf:"bytes,1,rep,name=ops" json:"ops,omitempty"`
}
func (m *Batch) Reset() { *m = Batch{} }
func (m *Batch) String() string { return proto.CompactTextString(m) }
func (*Batch) ProtoMessage() {}
func (*Batch) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Batch) GetOps() []*Operation {
if m != nil {
return m.Ops
}
return nil
}
type Operation struct {
Entity *Entity `protobuf:"bytes,1,opt,name=entity" json:"entity,omitempty"`
Type Operation_Type `protobuf:"varint,2,opt,name=type,enum=protodb.Operation_Type" json:"type,omitempty"`
}
func (m *Operation) Reset() { *m = Operation{} }
func (m *Operation) String() string { return proto.CompactTextString(m) }
func (*Operation) ProtoMessage() {}
func (*Operation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Operation) GetEntity() *Entity {
if m != nil {
return m.Entity
}
return nil
}
func (m *Operation) GetType() Operation_Type {
if m != nil {
return m.Type
}
return Operation_SET
}
type Entity struct {
Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
Exists bool `protobuf:"varint,4,opt,name=exists" json:"exists,omitempty"`
Start []byte `protobuf:"bytes,5,opt,name=start,proto3" json:"start,omitempty"`
End []byte `protobuf:"bytes,6,opt,name=end,proto3" json:"end,omitempty"`
Err string `protobuf:"bytes,7,opt,name=err" json:"err,omitempty"`
CreatedAt int64 `protobuf:"varint,8,opt,name=created_at,json=createdAt" json:"created_at,omitempty"`
}
func (m *Entity) Reset() { *m = Entity{} }
func (m *Entity) String() string { return proto.CompactTextString(m) }
func (*Entity) ProtoMessage() {}
func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *Entity) GetId() int32 {
if m != nil {
return m.Id
}
return 0
}
func (m *Entity) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *Entity) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *Entity) GetExists() bool {
if m != nil {
return m.Exists
}
return false
}
func (m *Entity) GetStart() []byte {
if m != nil {
return m.Start
}
return nil
}
func (m *Entity) GetEnd() []byte {
if m != nil {
return m.End
}
return nil
}
func (m *Entity) GetErr() string {
if m != nil {
return m.Err
}
return ""
}
func (m *Entity) GetCreatedAt() int64 {
if m != nil {
return m.CreatedAt
}
return 0
}
type Nothing struct {
}
func (m *Nothing) Reset() { *m = Nothing{} }
func (m *Nothing) String() string { return proto.CompactTextString(m) }
func (*Nothing) ProtoMessage() {}
func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
type Domain struct {
Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"`
}
func (m *Domain) Reset() { *m = Domain{} }
func (m *Domain) String() string { return proto.CompactTextString(m) }
func (*Domain) ProtoMessage() {}
func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *Domain) GetStart() []byte {
if m != nil {
return m.Start
}
return nil
}
func (m *Domain) GetEnd() []byte {
if m != nil {
return m.End
}
return nil
}
type Iterator struct {
Domain *Domain `protobuf:"bytes,1,opt,name=domain" json:"domain,omitempty"`
Valid bool `protobuf:"varint,2,opt,name=valid" json:"valid,omitempty"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *Iterator) Reset() { *m = Iterator{} }
func (m *Iterator) String() string { return proto.CompactTextString(m) }
func (*Iterator) ProtoMessage() {}
func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *Iterator) GetDomain() *Domain {
if m != nil {
return m.Domain
}
return nil
}
func (m *Iterator) GetValid() bool {
if m != nil {
return m.Valid
}
return false
}
func (m *Iterator) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *Iterator) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
type Stats struct {
Data map[string]string `protobuf:"bytes,1,rep,name=data" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
TimeAt int64 `protobuf:"varint,2,opt,name=time_at,json=timeAt" json:"time_at,omitempty"`
}
func (m *Stats) Reset() { *m = Stats{} }
func (m *Stats) String() string { return proto.CompactTextString(m) }
func (*Stats) ProtoMessage() {}
func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *Stats) GetData() map[string]string {
if m != nil {
return m.Data
}
return nil
}
func (m *Stats) GetTimeAt() int64 {
if m != nil {
return m.TimeAt
}
return 0
}
type Init struct {
Type string `protobuf:"bytes,1,opt,name=Type" json:"Type,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name" json:"Name,omitempty"`
Dir string `protobuf:"bytes,3,opt,name=Dir" json:"Dir,omitempty"`
}
func (m *Init) Reset() { *m = Init{} }
func (m *Init) String() string { return proto.CompactTextString(m) }
func (*Init) ProtoMessage() {}
func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *Init) GetType() string {
if m != nil {
return m.Type
}
return ""
}
func (m *Init) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *Init) GetDir() string {
if m != nil {
return m.Dir
}
return ""
}
func init() {
proto.RegisterType((*Batch)(nil), "protodb.Batch")
proto.RegisterType((*Operation)(nil), "protodb.Operation")
proto.RegisterType((*Entity)(nil), "protodb.Entity")
proto.RegisterType((*Nothing)(nil), "protodb.Nothing")
proto.RegisterType((*Domain)(nil), "protodb.Domain")
proto.RegisterType((*Iterator)(nil), "protodb.Iterator")
proto.RegisterType((*Stats)(nil), "protodb.Stats")
proto.RegisterType((*Init)(nil), "protodb.Init")
proto.RegisterEnum("protodb.Operation_Type", Operation_Type_name, Operation_Type_value)
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for DB service
type DBClient interface {
Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error)
Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error)
GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error)
Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error)
Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error)
SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error)
Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error)
DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error)
Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error)
ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error)
// rpc print(Nothing) returns (Entity) {}
Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error)
BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error)
BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error)
}
type dBClient struct {
cc *grpc.ClientConn
}
func NewDBClient(cc *grpc.ClientConn) DBClient {
return &dBClient{cc}
}
func (c *dBClient) Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error) {
out := new(Entity)
err := grpc.Invoke(ctx, "/protodb.DB/init", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) {
out := new(Entity)
err := grpc.Invoke(ctx, "/protodb.DB/get", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[0], c.cc, "/protodb.DB/getStream", opts...)
if err != nil {
return nil, err
}
x := &dBGetStreamClient{stream}
return x, nil
}
type DB_GetStreamClient interface {
Send(*Entity) error
Recv() (*Entity, error)
grpc.ClientStream
}
type dBGetStreamClient struct {
grpc.ClientStream
}
func (x *dBGetStreamClient) Send(m *Entity) error {
return x.ClientStream.SendMsg(m)
}
func (x *dBGetStreamClient) Recv() (*Entity, error) {
m := new(Entity)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *dBClient) Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) {
out := new(Entity)
err := grpc.Invoke(ctx, "/protodb.DB/has", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/set", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/setSync", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/delete", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/deleteSync", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error) {
stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[1], c.cc, "/protodb.DB/iterator", opts...)
if err != nil {
return nil, err
}
x := &dBIteratorClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type DB_IteratorClient interface {
Recv() (*Iterator, error)
grpc.ClientStream
}
type dBIteratorClient struct {
grpc.ClientStream
}
func (x *dBIteratorClient) Recv() (*Iterator, error) {
m := new(Iterator)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *dBClient) ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) {
stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[2], c.cc, "/protodb.DB/reverseIterator", opts...)
if err != nil {
return nil, err
}
x := &dBReverseIteratorClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type DB_ReverseIteratorClient interface {
Recv() (*Iterator, error)
grpc.ClientStream
}
type dBReverseIteratorClient struct {
grpc.ClientStream
}
func (x *dBReverseIteratorClient) Recv() (*Iterator, error) {
m := new(Iterator)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *dBClient) Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) {
out := new(Stats)
err := grpc.Invoke(ctx, "/protodb.DB/stats", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/batchWrite", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dBClient) BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) {
out := new(Nothing)
err := grpc.Invoke(ctx, "/protodb.DB/batchWriteSync", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for DB service
type DBServer interface {
Init(context.Context, *Init) (*Entity, error)
Get(context.Context, *Entity) (*Entity, error)
GetStream(DB_GetStreamServer) error
Has(context.Context, *Entity) (*Entity, error)
Set(context.Context, *Entity) (*Nothing, error)
SetSync(context.Context, *Entity) (*Nothing, error)
Delete(context.Context, *Entity) (*Nothing, error)
DeleteSync(context.Context, *Entity) (*Nothing, error)
Iterator(*Entity, DB_IteratorServer) error
ReverseIterator(*Entity, DB_ReverseIteratorServer) error
// rpc print(Nothing) returns (Entity) {}
Stats(context.Context, *Nothing) (*Stats, error)
BatchWrite(context.Context, *Batch) (*Nothing, error)
BatchWriteSync(context.Context, *Batch) (*Nothing, error)
}
func RegisterDBServer(s *grpc.Server, srv DBServer) {
s.RegisterService(&_DB_serviceDesc, srv)
}
func _DB_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Init)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Init(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Init",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Init(ctx, req.(*Init))
}
return interceptor(ctx, in, info, handler)
}
func _DB_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Get(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Get",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Get(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(DBServer).GetStream(&dBGetStreamServer{stream})
}
type DB_GetStreamServer interface {
Send(*Entity) error
Recv() (*Entity, error)
grpc.ServerStream
}
type dBGetStreamServer struct {
grpc.ServerStream
}
func (x *dBGetStreamServer) Send(m *Entity) error {
return x.ServerStream.SendMsg(m)
}
func (x *dBGetStreamServer) Recv() (*Entity, error) {
m := new(Entity)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _DB_Has_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Has(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Has",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Has(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_Set_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Set(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Set",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Set(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_SetSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).SetSync(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/SetSync",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).SetSync(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Delete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Delete(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_DeleteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Entity)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).DeleteSync(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/DeleteSync",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).DeleteSync(ctx, req.(*Entity))
}
return interceptor(ctx, in, info, handler)
}
func _DB_Iterator_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Entity)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DBServer).Iterator(m, &dBIteratorServer{stream})
}
type DB_IteratorServer interface {
Send(*Iterator) error
grpc.ServerStream
}
type dBIteratorServer struct {
grpc.ServerStream
}
func (x *dBIteratorServer) Send(m *Iterator) error {
return x.ServerStream.SendMsg(m)
}
func _DB_ReverseIterator_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Entity)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DBServer).ReverseIterator(m, &dBReverseIteratorServer{stream})
}
type DB_ReverseIteratorServer interface {
Send(*Iterator) error
grpc.ServerStream
}
type dBReverseIteratorServer struct {
grpc.ServerStream
}
func (x *dBReverseIteratorServer) Send(m *Iterator) error {
return x.ServerStream.SendMsg(m)
}
func _DB_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Nothing)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).Stats(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/Stats",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).Stats(ctx, req.(*Nothing))
}
return interceptor(ctx, in, info, handler)
}
func _DB_BatchWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Batch)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).BatchWrite(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/BatchWrite",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).BatchWrite(ctx, req.(*Batch))
}
return interceptor(ctx, in, info, handler)
}
func _DB_BatchWriteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Batch)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DBServer).BatchWriteSync(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/protodb.DB/BatchWriteSync",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DBServer).BatchWriteSync(ctx, req.(*Batch))
}
return interceptor(ctx, in, info, handler)
}
var _DB_serviceDesc = grpc.ServiceDesc{
ServiceName: "protodb.DB",
HandlerType: (*DBServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "init",
Handler: _DB_Init_Handler,
},
{
MethodName: "get",
Handler: _DB_Get_Handler,
},
{
MethodName: "has",
Handler: _DB_Has_Handler,
},
{
MethodName: "set",
Handler: _DB_Set_Handler,
},
{
MethodName: "setSync",
Handler: _DB_SetSync_Handler,
},
{
MethodName: "delete",
Handler: _DB_Delete_Handler,
},
{
MethodName: "deleteSync",
Handler: _DB_DeleteSync_Handler,
},
{
MethodName: "stats",
Handler: _DB_Stats_Handler,
},
{
MethodName: "batchWrite",
Handler: _DB_BatchWrite_Handler,
},
{
MethodName: "batchWriteSync",
Handler: _DB_BatchWriteSync_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "getStream",
Handler: _DB_GetStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
{
StreamName: "iterator",
Handler: _DB_Iterator_Handler,
ServerStreams: true,
},
{
StreamName: "reverseIterator",
Handler: _DB_ReverseIterator_Handler,
ServerStreams: true,
},
},
Metadata: "defs.proto",
}
func init() { proto.RegisterFile("defs.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 606 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4f, 0x6f, 0xd3, 0x4e,
0x10, 0xcd, 0xda, 0x8e, 0x13, 0x4f, 0x7f, 0xbf, 0x34, 0x8c, 0x10, 0xb5, 0x8a, 0x90, 0x22, 0x0b,
0x09, 0x43, 0x69, 0x14, 0x52, 0x24, 0xfe, 0x9c, 0x68, 0x95, 0x1c, 0x2a, 0xa1, 0x22, 0x39, 0x95,
0x38, 0xa2, 0x6d, 0x3d, 0x34, 0x2b, 0x1a, 0x3b, 0xac, 0x87, 0x8a, 0x5c, 0xb8, 0xf2, 0x79, 0xf8,
0x7c, 0x5c, 0xd0, 0xae, 0x1d, 0x87, 0x36, 0x39, 0x84, 0x53, 0x76, 0x66, 0xde, 0x7b, 0xb3, 0xf3,
0x32, 0x5e, 0x80, 0x94, 0x3e, 0x17, 0xfd, 0xb9, 0xce, 0x39, 0xc7, 0x96, 0xfd, 0x49, 0x2f, 0xa2,
0x43, 0x68, 0x9e, 0x48, 0xbe, 0x9c, 0xe2, 0x63, 0x70, 0xf3, 0x79, 0x11, 0x8a, 0x9e, 0x1b, 0xef,
0x0c, 0xb1, 0x5f, 0xd5, 0xfb, 0x1f, 0xe6, 0xa4, 0x25, 0xab, 0x3c, 0x4b, 0x4c, 0x39, 0xfa, 0x01,
0x41, 0x9d, 0xc1, 0x27, 0xe0, 0x53, 0xc6, 0x8a, 0x17, 0xa1, 0xe8, 0x89, 0x78, 0x67, 0xb8, 0x5b,
0xb3, 0xc6, 0x36, 0x9d, 0x54, 0x65, 0x3c, 0x00, 0x8f, 0x17, 0x73, 0x0a, 0x9d, 0x9e, 0x88, 0x3b,
0xc3, 0xbd, 0x75, 0xf1, 0xfe, 0xf9, 0x62, 0x4e, 0x89, 0x05, 0x45, 0x0f, 0xc1, 0x33, 0x11, 0xb6,
0xc0, 0x9d, 0x8c, 0xcf, 0xbb, 0x0d, 0x04, 0xf0, 0x47, 0xe3, 0xf7, 0xe3, 0xf3, 0x71, 0x57, 0x44,
0xbf, 0x04, 0xf8, 0xa5, 0x38, 0x76, 0xc0, 0x51, 0xa9, 0xed, 0xdc, 0x4c, 0x1c, 0x95, 0x62, 0x17,
0xdc, 0x2f, 0xb4, 0xb0, 0x3d, 0xfe, 0x4b, 0xcc, 0x11, 0xef, 0x43, 0xf3, 0x46, 0x5e, 0x7f, 0xa3,
0xd0, 0xb5, 0xb9, 0x32, 0xc0, 0x07, 0xe0, 0xd3, 0x77, 0x55, 0x70, 0x11, 0x7a, 0x3d, 0x11, 0xb7,
0x93, 0x2a, 0x32, 0xe8, 0x82, 0xa5, 0xe6, 0xb0, 0x59, 0xa2, 0x6d, 0x60, 0x54, 0x29, 0x4b, 0x43,
0xbf, 0x54, 0xa5, 0xcc, 0xf6, 0x21, 0xad, 0xc3, 0x56, 0x4f, 0xc4, 0x41, 0x62, 0x8e, 0xf8, 0x08,
0xe0, 0x52, 0x93, 0x64, 0x4a, 0x3f, 0x49, 0x0e, 0xdb, 0x3d, 0x11, 0xbb, 0x49, 0x50, 0x65, 0x8e,
0x39, 0x0a, 0xa0, 0x75, 0x96, 0xf3, 0x54, 0x65, 0x57, 0xd1, 0x00, 0xfc, 0x51, 0x3e, 0x93, 0x2a,
0x5b, 0x75, 0x13, 0x1b, 0xba, 0x39, 0x75, 0xb7, 0xe8, 0x2b, 0xb4, 0x4f, 0xd9, 0xb8, 0x94, 0x6b,
0xe3, 0x77, 0x6a, 0xd9, 0x6b, 0x7e, 0x97, 0xa2, 0x49, 0x55, 0xae, 0x06, 0x57, 0xa5, 0x50, 0x3b,
0x29, 0x83, 0xa5, 0x41, 0xee, 0x06, 0x83, 0xbc, 0xbf, 0x0c, 0x8a, 0x7e, 0x0a, 0x68, 0x4e, 0x58,
0x72, 0x81, 0xcf, 0xc1, 0x4b, 0x25, 0xcb, 0x6a, 0x29, 0xc2, 0xba, 0x9d, 0xad, 0xf6, 0x47, 0x92,
0xe5, 0x38, 0x63, 0xbd, 0x48, 0x2c, 0x0a, 0xf7, 0xa0, 0xc5, 0x6a, 0x46, 0xc6, 0x03, 0xc7, 0x7a,
0xe0, 0x9b, 0xf0, 0x98, 0xf7, 0x5f, 0x41, 0x50, 0x63, 0x97, 0xb7, 0x10, 0xa5, 0x7d, 0xb7, 0x6e,
0xe1, 0xd8, 0x5c, 0x19, 0xbc, 0x75, 0x5e, 0x8b, 0xe8, 0x1d, 0x78, 0xa7, 0x99, 0x62, 0xc4, 0x72,
0x25, 0x2a, 0x52, 0xb9, 0x1e, 0x08, 0xde, 0x99, 0x9c, 0x2d, 0x49, 0xf6, 0x6c, 0xb4, 0x47, 0x4a,
0xdb, 0x09, 0x83, 0xc4, 0x1c, 0x87, 0xbf, 0x3d, 0x70, 0x46, 0x27, 0x18, 0x83, 0xa7, 0x8c, 0xd0,
0xff, 0xf5, 0x08, 0x46, 0x77, 0xff, 0xee, 0xc2, 0x46, 0x0d, 0x7c, 0x0a, 0xee, 0x15, 0x31, 0xde,
0xad, 0x6c, 0x82, 0x1e, 0x41, 0x70, 0x45, 0x3c, 0x61, 0x4d, 0x72, 0xb6, 0x0d, 0x21, 0x16, 0x03,
0x61, 0xf4, 0xa7, 0xb2, 0xd8, 0x4a, 0xff, 0x19, 0xb8, 0xc5, 0xa6, 0xab, 0x74, 0xeb, 0xc4, 0x72,
0xad, 0x1a, 0xd8, 0x87, 0x56, 0x41, 0x3c, 0x59, 0x64, 0x97, 0xdb, 0xe1, 0x0f, 0xc1, 0x4f, 0xe9,
0x9a, 0x98, 0xb6, 0x83, 0xbf, 0x30, 0x8f, 0x87, 0x81, 0x6f, 0xdf, 0x61, 0x08, 0x6d, 0xb5, 0x5c,
0xdc, 0x35, 0xc2, 0xbd, 0xd5, 0xff, 0x50, 0x61, 0xa2, 0xc6, 0x40, 0xe0, 0x1b, 0xd8, 0xd5, 0x74,
0x43, 0xba, 0xa0, 0xd3, 0x7f, 0xa5, 0x1e, 0xd8, 0xef, 0x89, 0x0b, 0x5c, 0xbb, 0xcb, 0x7e, 0xe7,
0xf6, 0xde, 0x46, 0x0d, 0x1c, 0x00, 0x5c, 0x98, 0x47, 0xef, 0xa3, 0x56, 0x4c, 0xb8, 0xaa, 0xdb,
0x97, 0x70, 0xe3, 0x34, 0x2f, 0xa1, 0xb3, 0x62, 0x58, 0x13, 0xb6, 0x60, 0x5d, 0xf8, 0x36, 0x75,
0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x95, 0xf4, 0xe3, 0x82, 0x7a, 0x05, 0x00, 0x00,
}

+ 71
- 0
proto/defs.proto View File

@ -0,0 +1,71 @@
syntax = "proto3";
package protodb;
message Batch {
repeated Operation ops = 1;
}
message Operation {
Entity entity = 1;
enum Type {
SET = 0;
DELETE = 1;
}
Type type = 2;
}
message Entity {
int32 id = 1;
bytes key = 2;
bytes value = 3;
bool exists = 4;
bytes start = 5;
bytes end = 6;
string err = 7;
int64 created_at = 8;
}
message Nothing {
}
message Domain {
bytes start = 1;
bytes end = 2;
}
message Iterator {
Domain domain = 1;
bool valid = 2;
bytes key = 3;
bytes value = 4;
}
message Stats {
map<string, string> data = 1;
int64 time_at = 2;
}
message Init {
string Type = 1;
string Name = 2;
string Dir = 3;
}
service DB {
rpc init(Init) returns (Entity) {}
rpc get(Entity) returns (Entity) {}
rpc getStream(stream Entity) returns (stream Entity) {}
rpc has(Entity) returns (Entity) {}
rpc set(Entity) returns (Nothing) {}
rpc setSync(Entity) returns (Nothing) {}
rpc delete(Entity) returns (Nothing) {}
rpc deleteSync(Entity) returns (Nothing) {}
rpc iterator(Entity) returns (stream Iterator) {}
rpc reverseIterator(Entity) returns (stream Iterator) {}
// rpc print(Nothing) returns (Entity) {}
rpc stats(Nothing) returns (Stats) {}
rpc batchWrite(Batch) returns (Nothing) {}
rpc batchWriteSync(Batch) returns (Nothing) {}
}

+ 37
- 0
remotedb/doc.go View File

@ -0,0 +1,37 @@
/*
remotedb is a package for connecting to distributed Tendermint db.DB
instances. The purpose is to detach difficult deployments such as
CLevelDB that requires gcc or perhaps for databases that require
custom configurations such as extra disk space. It also eases
the burden and cost of deployment of dependencies for databases
to be used by Tendermint developers. Most importantly it is built
over the high performant gRPC transport.
remotedb's RemoteDB implements db.DB so can be used normally
like other databases. One just has to explicitly connect to the
remote database with a client setup such as:
client, err := remotedb.NewInsecure(addr)
// Make sure to invoke InitRemote!
if err := client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"}); err != nil {
log.Fatalf("Failed to initialize the remote db")
}
client.Set(key1, value)
gv1 := client.SetSync(k2, v2)
client.Delete(k1)
gv2 := client.Get(k1)
for itr := client.Iterator(k1, k9); itr.Valid(); itr.Next() {
ik, iv := itr.Key(), itr.Value()
ds, de := itr.Domain()
}
stats := client.Stats()
if !client.Has(dk1) {
client.SetSync(dk1, dv1)
}
*/
package remotedb

+ 262
- 0
remotedb/remotedb.go View File

@ -0,0 +1,262 @@
package remotedb
import (
"context"
"fmt"
"github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/grpcdb"
protodb "github.com/tendermint/tmlibs/proto"
)
type RemoteDB struct {
ctx context.Context
dc protodb.DBClient
}
func NewRemoteDB(serverAddr string, serverKey string) (*RemoteDB, error) {
return newRemoteDB(grpcdb.NewClient(serverAddr, serverKey))
}
func newRemoteDB(gdc protodb.DBClient, err error) (*RemoteDB, error) {
if err != nil {
return nil, err
}
return &RemoteDB{dc: gdc, ctx: context.Background()}, nil
}
type Init struct {
Dir string
Name string
Type string
}
func (rd *RemoteDB) InitRemote(in *Init) error {
_, err := rd.dc.Init(rd.ctx, &protodb.Init{Dir: in.Dir, Type: in.Type, Name: in.Name})
return err
}
var _ db.DB = (*RemoteDB)(nil)
// Close is a noop currently
func (rd *RemoteDB) Close() {
}
func (rd *RemoteDB) Delete(key []byte) {
if _, err := rd.dc.Delete(rd.ctx, &protodb.Entity{Key: key}); err != nil {
panic(fmt.Sprintf("RemoteDB.Delete: %v", err))
}
}
func (rd *RemoteDB) DeleteSync(key []byte) {
if _, err := rd.dc.DeleteSync(rd.ctx, &protodb.Entity{Key: key}); err != nil {
panic(fmt.Sprintf("RemoteDB.DeleteSync: %v", err))
}
}
func (rd *RemoteDB) Set(key, value []byte) {
if _, err := rd.dc.Set(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil {
panic(fmt.Sprintf("RemoteDB.Set: %v", err))
}
}
func (rd *RemoteDB) SetSync(key, value []byte) {
if _, err := rd.dc.SetSync(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil {
panic(fmt.Sprintf("RemoteDB.SetSync: %v", err))
}
}
func (rd *RemoteDB) Get(key []byte) []byte {
res, err := rd.dc.Get(rd.ctx, &protodb.Entity{Key: key})
if err != nil {
panic(fmt.Sprintf("RemoteDB.Get error: %v", err))
}
return res.Value
}
func (rd *RemoteDB) Has(key []byte) bool {
res, err := rd.dc.Has(rd.ctx, &protodb.Entity{Key: key})
if err != nil {
panic(fmt.Sprintf("RemoteDB.Has error: %v", err))
}
return res.Exists
}
func (rd *RemoteDB) ReverseIterator(start, end []byte) db.Iterator {
dic, err := rd.dc.ReverseIterator(rd.ctx, &protodb.Entity{Start: start, End: end})
if err != nil {
panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err))
}
return makeReverseIterator(dic)
}
func (rd *RemoteDB) NewBatch() db.Batch {
return &batch{
db: rd,
ops: nil,
}
}
// TODO: Implement Print when db.DB implements a method
// to print to a string and not db.Print to stdout.
func (rd *RemoteDB) Print() {
panic("Unimplemented")
}
func (rd *RemoteDB) Stats() map[string]string {
stats, err := rd.dc.Stats(rd.ctx, &protodb.Nothing{})
if err != nil {
panic(fmt.Sprintf("RemoteDB.Stats error: %v", err))
}
if stats == nil {
return nil
}
return stats.Data
}
func (rd *RemoteDB) Iterator(start, end []byte) db.Iterator {
dic, err := rd.dc.Iterator(rd.ctx, &protodb.Entity{Start: start, End: end})
if err != nil {
panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err))
}
return makeIterator(dic)
}
func makeIterator(dic protodb.DB_IteratorClient) db.Iterator {
return &iterator{dic: dic}
}
func makeReverseIterator(dric protodb.DB_ReverseIteratorClient) db.Iterator {
return &reverseIterator{dric: dric}
}
type reverseIterator struct {
dric protodb.DB_ReverseIteratorClient
cur *protodb.Iterator
}
var _ db.Iterator = (*iterator)(nil)
func (rItr *reverseIterator) Valid() bool {
return rItr.cur != nil && rItr.cur.Valid
}
func (rItr *reverseIterator) Domain() (start, end []byte) {
if rItr.cur == nil || rItr.cur.Domain == nil {
return nil, nil
}
return rItr.cur.Domain.Start, rItr.cur.Domain.End
}
// Next advances the current reverseIterator
func (rItr *reverseIterator) Next() {
var err error
rItr.cur, err = rItr.dric.Recv()
if err != nil {
panic(fmt.Sprintf("RemoteDB.ReverseIterator.Next error: %v", err))
}
}
func (rItr *reverseIterator) Key() []byte {
if rItr.cur == nil {
return nil
}
return rItr.cur.Key
}
func (rItr *reverseIterator) Value() []byte {
if rItr.cur == nil {
return nil
}
return rItr.cur.Value
}
func (rItr *reverseIterator) Close() {
}
// iterator implements the db.Iterator by retrieving
// streamed iterators from the remote backend as
// needed. It is NOT safe for concurrent usage,
// matching the behavior of other iterators.
type iterator struct {
dic protodb.DB_IteratorClient
cur *protodb.Iterator
}
var _ db.Iterator = (*iterator)(nil)
func (itr *iterator) Valid() bool {
return itr.cur != nil && itr.cur.Valid
}
func (itr *iterator) Domain() (start, end []byte) {
if itr.cur == nil || itr.cur.Domain == nil {
return nil, nil
}
return itr.cur.Domain.Start, itr.cur.Domain.End
}
// Next advances the current iterator
func (itr *iterator) Next() {
var err error
itr.cur, err = itr.dic.Recv()
if err != nil {
panic(fmt.Sprintf("RemoteDB.Iterator.Next error: %v", err))
}
}
func (itr *iterator) Key() []byte {
if itr.cur == nil {
return nil
}
return itr.cur.Key
}
func (itr *iterator) Value() []byte {
if itr.cur == nil {
return nil
}
return itr.cur.Value
}
func (itr *iterator) Close() {
err := itr.dic.CloseSend()
if err != nil {
panic(fmt.Sprintf("Error closing iterator: %v", err))
}
}
type batch struct {
db *RemoteDB
ops []*protodb.Operation
}
var _ db.Batch = (*batch)(nil)
func (bat *batch) Set(key, value []byte) {
op := &protodb.Operation{
Entity: &protodb.Entity{Key: key, Value: value},
Type: protodb.Operation_SET,
}
bat.ops = append(bat.ops, op)
}
func (bat *batch) Delete(key []byte) {
op := &protodb.Operation{
Entity: &protodb.Entity{Key: key},
Type: protodb.Operation_DELETE,
}
bat.ops = append(bat.ops, op)
}
func (bat *batch) Write() {
if _, err := bat.db.dc.BatchWrite(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil {
panic(fmt.Sprintf("RemoteDB.BatchWrite: %v", err))
}
}
func (bat *batch) WriteSync() {
if _, err := bat.db.dc.BatchWriteSync(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil {
panic(fmt.Sprintf("RemoteDB.BatchWriteSync: %v", err))
}
}

+ 115
- 0
remotedb/remotedb_test.go View File

@ -0,0 +1,115 @@
package remotedb_test
import (
"net"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tmlibs/grpcdb"
"github.com/tendermint/tmlibs/remotedb"
)
func TestRemoteDB(t *testing.T) {
cert := "::.crt"
key := "::.key"
ln, err := net.Listen("tcp", "0.0.0.0:0")
require.Nil(t, err, "expecting a port to have been assigned on which we can listen")
srv, err := grpcdb.NewServer(cert, key)
require.Nil(t, err)
defer srv.Stop()
go func() {
if err := srv.Serve(ln); err != nil {
t.Fatalf("BindServer: %v", err)
}
}()
client, err := remotedb.NewRemoteDB(ln.Addr().String(), cert)
require.Nil(t, err, "expecting a successful client creation")
require.Nil(t, client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"}))
k1 := []byte("key-1")
v1 := client.Get(k1)
require.Equal(t, 0, len(v1), "expecting no key1 to have been stored")
vv1 := []byte("value-1")
client.Set(k1, vv1)
gv1 := client.Get(k1)
require.Equal(t, gv1, vv1)
// Simple iteration
itr := client.Iterator(nil, nil)
itr.Next()
require.Equal(t, itr.Key(), []byte("key-1"))
require.Equal(t, itr.Value(), []byte("value-1"))
require.Panics(t, itr.Next)
itr.Close()
// Set some more keys
k2 := []byte("key-2")
v2 := []byte("value-2")
client.SetSync(k2, v2)
has := client.Has(k2)
require.True(t, has)
gv2 := client.Get(k2)
require.Equal(t, gv2, v2)
// More iteration
itr = client.Iterator(nil, nil)
itr.Next()
require.Equal(t, itr.Key(), []byte("key-1"))
require.Equal(t, itr.Value(), []byte("value-1"))
itr.Next()
require.Equal(t, itr.Key(), []byte("key-2"))
require.Equal(t, itr.Value(), []byte("value-2"))
require.Panics(t, itr.Next)
itr.Close()
// Deletion
client.Delete(k1)
client.DeleteSync(k2)
gv1 = client.Get(k1)
gv2 = client.Get(k2)
require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore")
require.Equal(t, len(gv1), 0, "after deletion, not expecting the key to exist anymore")
// Batch tests - set
k3 := []byte("key-3")
k4 := []byte("key-4")
k5 := []byte("key-5")
v3 := []byte("value-3")
v4 := []byte("value-4")
v5 := []byte("value-5")
bat := client.NewBatch()
bat.Set(k3, v3)
bat.Set(k4, v4)
rv3 := client.Get(k3)
require.Equal(t, 0, len(rv3), "expecting no k3 to have been stored")
rv4 := client.Get(k4)
require.Equal(t, 0, len(rv4), "expecting no k4 to have been stored")
bat.Write()
rv3 = client.Get(k3)
require.Equal(t, rv3, v3, "expecting k3 to have been stored")
rv4 = client.Get(k4)
require.Equal(t, rv4, v4, "expecting k4 to have been stored")
// Batch tests - deletion
bat = client.NewBatch()
bat.Delete(k4)
bat.Delete(k3)
bat.WriteSync()
rv3 = client.Get(k3)
require.Equal(t, 0, len(rv3), "expecting k3 to have been deleted")
rv4 = client.Get(k4)
require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted")
// Batch tests - set and delete
bat = client.NewBatch()
bat.Set(k4, v4)
bat.Set(k5, v5)
bat.Delete(k4)
bat.WriteSync()
rv4 = client.Get(k4)
require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted")
rv5 := client.Get(k5)
require.Equal(t, rv5, v5, "expecting k5 to have been stored")
}

+ 6
- 0
test.sh View File

@ -4,6 +4,9 @@ set -e
# run the linter
# make metalinter_test
# setup certs
make gen_certs
# run the unit tests with coverage
echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor); do
@ -13,3 +16,6 @@ for d in $(go list ./... | grep -v vendor); do
rm profile.out
fi
done
# cleanup certs
make clean_certs

Loading…
Cancel
Save