From 11bee6194aeff40a7b3d62123db1a7cbca2e81e7 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 12 Mar 2018 00:39:34 -0700 Subject: [PATCH] DB as a service Fixes https://github.com/tendermint/tendermint/issues/1162 Databases as a service! Can now access Databases as a remote service via gRPC for performance and easy deployment. The caveat is that each service is stateful in regards to the DB i.e. each unique service uses only one unique DB but nonetheless multiple clients can access it. A full standalone example ```go package main import ( "bytes" "context" "log" grpcdb "github.com/tendermint/tmlibs/grpcdb" protodb "github.com/tendermint/tmlibs/proto" ) func main() { addr := ":8998" go func() { if err := grpcdb.BindRemoteDBServer(addr); err != nil { log.Fatalf("BindRemoteDBServer: %v", err) } }() client, err := grpcdb.NewClient(addr, false) if err != nil { log.Fatalf("Failed to create grpcDB client: %v", err) } ctx := context.Background() // 1. Initialize the DB in := &protodb.Init{ Type: "leveldb", Name: "grpc-uno-test", Dir: ".", } if _, err := client.Init(ctx, in); err != nil { log.Fatalf("Init error: %v", err) } // 2. Now it can be used! query1 := &protodb.Entity{Key: []byte("Project"), Value: []byte("Tmlibs-on-gRPC")} if _, err := client.SetSync(ctx, query1); err != nil { log.Fatalf("SetSync err: %v", err) } query2 := &protodb.Entity{Key: []byte("Project")} read, err := client.Get(ctx, query2) if err != nil { log.Fatalf("Get err: %v", err) } if g, w := read.Value, []byte("Tmlibs-on-gRPC"); !bytes.Equal(g, w) { log.Fatalf("got= (%q ==> % X)\nwant=(%q ==> % X)", g, g, w, w) } } ``` --- Makefile | 3 + grpcdb/client.go | 19 + grpcdb/example_test.go | 50 +++ grpcdb/server.go | 142 ++++++++ proto/defs.pb.go | 784 +++++++++++++++++++++++++++++++++++++++++ proto/defs.proto | 57 +++ 6 files changed, 1055 insertions(+) create mode 100644 grpcdb/client.go create mode 100644 grpcdb/example_test.go create mode 100644 grpcdb/server.go create mode 100644 proto/defs.pb.go create mode 100644 proto/defs.proto diff --git a/Makefile b/Makefile index 9e181f9f9..0236c480b 100644 --- a/Makefile +++ b/Makefile @@ -119,3 +119,6 @@ metalinter_all: # unless there is a reason not to. # https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html .PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all + +grpc_dbserver: + protoc -I proto/ proto/defs.proto --go_out=plugins=grpc:proto diff --git a/grpcdb/client.go b/grpcdb/client.go new file mode 100644 index 000000000..45409a1f9 --- /dev/null +++ b/grpcdb/client.go @@ -0,0 +1,19 @@ +package grpcdb + +import ( + "google.golang.org/grpc" + + protodb "github.com/tendermint/tmlibs/proto" +) + +func NewClient(serverAddr string, secure bool) (protodb.DBClient, error) { + var opts []grpc.DialOption + if !secure { + opts = append(opts, grpc.WithInsecure()) + } + cc, err := grpc.Dial(serverAddr, opts...) + if err != nil { + return nil, err + } + return protodb.NewDBClient(cc), nil +} diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go new file mode 100644 index 000000000..653180113 --- /dev/null +++ b/grpcdb/example_test.go @@ -0,0 +1,50 @@ +package grpcdb_test + +import ( + "bytes" + "context" + "log" + + grpcdb "github.com/tendermint/tmlibs/grpcdb" + protodb "github.com/tendermint/tmlibs/proto" +) + +func Example() { + addr := ":8998" + go func() { + if err := grpcdb.BindRemoteDBServer(addr); err != nil { + log.Fatalf("BindRemoteDBServer: %v", err) + } + }() + + client, err := grpcdb.NewClient(addr, false) + if err != nil { + log.Fatalf("Failed to create grpcDB client: %v", err) + } + + ctx := context.Background() + // 1. Initialize the DB + in := &protodb.Init{ + Type: "leveldb", + Name: "grpc-uno-test", + Dir: ".", + } + if _, err := client.Init(ctx, in); err != nil { + log.Fatalf("Init error: %v", err) + } + + // 2. Now it can be used! + query1 := &protodb.Entity{Key: []byte("Project"), Value: []byte("Tmlibs-on-gRPC")} + if _, err := client.SetSync(ctx, query1); err != nil { + log.Fatalf("SetSync err: %v", err) + } + + query2 := &protodb.Entity{Key: []byte("Project")} + read, err := client.Get(ctx, query2) + if err != nil { + log.Fatalf("Get err: %v", err) + } + if g, w := read.Value, []byte("Tmlibs-on-gRPC"); !bytes.Equal(g, w) { + log.Fatalf("got= (%q ==> % X)\nwant=(%q ==> % X)", g, g, w, w) + } +} diff --git a/grpcdb/server.go b/grpcdb/server.go new file mode 100644 index 000000000..26d0ffa9e --- /dev/null +++ b/grpcdb/server.go @@ -0,0 +1,142 @@ +package grpcdb + +import ( +"log" + "context" + "net" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/tendermint/tmlibs/db" + protodb "github.com/tendermint/tmlibs/proto" +) + +func BindRemoteDBServer(addr string, opts ...grpc.ServerOption) error { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + srv := grpc.NewServer(opts...) + protodb.RegisterDBServer(srv, new(server)) + return srv.Serve(ln) +} + +type server struct { + mu sync.Mutex + db db.DB +} + +var _ protodb.DBServer = (*server)(nil) + +func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) { + s.mu.Lock() + defer s.mu.Unlock() + +log.Printf("in: %+v\n", in) + s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) + return &protodb.Entity{TimeAt: time.Now().Unix()}, nil +} + +func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.Delete(in.Key) + return nothing, nil +} + +var nothing = new(protodb.Nothing) +func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.DeleteSync(in.Key) + return nothing, nil +} + +func (s *server) Get(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) { + value := s.db.Get(in.Key) + return &protodb.Entity{Value: value}, nil +} + +func (s *server) GetStream(ds protodb.DB_GetStreamServer) error { + // Receive routine + responsesChan := make(chan *protodb.Entity) + go func() { + defer close(responsesChan) + ctx := context.Background() + for { + in, err := ds.Recv() + if err != nil { + responsesChan <- &protodb.Entity{Err: err.Error()} + return + } + out, err := s.Get(ctx, in) + if err != nil { + if out == nil { + out = new(protodb.Entity) + out.Key = in.Key + } + out.Err = err.Error() + responsesChan <- out + return + } + + // Otherwise continue on + responsesChan <- out + } + }() + + // Send routine, block until we return + for out := range responsesChan { + if err := ds.Send(out); err != nil { + return err + } + } + return nil +} + +func (s *server) Has(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) { + exists := s.db.Has(in.Key) + return &protodb.Entity{Exists: exists}, nil +} + +func (s *server) Set(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.Set(in.Key, in.Value) + return nothing, nil +} + +func (s *server) SetSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.SetSync(in.Key, in.Value) + return nothing, nil +} + +func (s *server) Iterator(query *protodb.Entity, dis protodb.DB_IteratorServer) error { + it := s.db.Iterator(query.Start, query.End) + return s.handleIterator(it, dis.Send) +} + +func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) error) error { + for it.Valid() { + start, end := it.Domain() + out := &protodb.Iterator{ + Domain: &protodb.DDomain{Start: start, End: end}, + Valid: it.Valid(), + Key: it.Key(), + Value: it.Value(), + } + if err := sendFunc(out); err != nil { + return err + } + + // Finally move the iterator forward + it.Next() + } + return nil +} + +func (s *server) ReverseIterator(query *protodb.Entity, dis protodb.DB_ReverseIteratorServer) error { + it := s.db.ReverseIterator(query.Start, query.End) + return s.handleIterator(it, dis.Send) +} + +func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error) { + stats := s.db.Stats() + return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil +} diff --git a/proto/defs.pb.go b/proto/defs.pb.go new file mode 100644 index 000000000..61f687504 --- /dev/null +++ b/proto/defs.pb.go @@ -0,0 +1,784 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: defs.proto + +/* +Package protodb is a generated protocol buffer package. + +It is generated from these files: + defs.proto + +It has these top-level messages: + Entity + Nothing + DDomain + Iterator + Stats + Init +*/ +package protodb + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Entity struct { + Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Exists bool `protobuf:"varint,4,opt,name=exists" json:"exists,omitempty"` + Start []byte `protobuf:"bytes,5,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,6,opt,name=end,proto3" json:"end,omitempty"` + Err string `protobuf:"bytes,7,opt,name=err" json:"err,omitempty"` + Print string `protobuf:"bytes,8,opt,name=print" json:"print,omitempty"` + TimeAt int64 `protobuf:"varint,9,opt,name=time_at,json=timeAt" json:"time_at,omitempty"` +} + +func (m *Entity) Reset() { *m = Entity{} } +func (m *Entity) String() string { return proto.CompactTextString(m) } +func (*Entity) ProtoMessage() {} +func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Entity) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Entity) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *Entity) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Entity) GetExists() bool { + if m != nil { + return m.Exists + } + return false +} + +func (m *Entity) GetStart() []byte { + if m != nil { + return m.Start + } + return nil +} + +func (m *Entity) GetEnd() []byte { + if m != nil { + return m.End + } + return nil +} + +func (m *Entity) GetErr() string { + if m != nil { + return m.Err + } + return "" +} + +func (m *Entity) GetPrint() string { + if m != nil { + return m.Print + } + return "" +} + +func (m *Entity) GetTimeAt() int64 { + if m != nil { + return m.TimeAt + } + return 0 +} + +type Nothing struct { +} + +func (m *Nothing) Reset() { *m = Nothing{} } +func (m *Nothing) String() string { return proto.CompactTextString(m) } +func (*Nothing) ProtoMessage() {} +func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type DDomain struct { + Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` +} + +func (m *DDomain) Reset() { *m = DDomain{} } +func (m *DDomain) String() string { return proto.CompactTextString(m) } +func (*DDomain) ProtoMessage() {} +func (*DDomain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *DDomain) GetStart() []byte { + if m != nil { + return m.Start + } + return nil +} + +func (m *DDomain) GetEnd() []byte { + if m != nil { + return m.End + } + return nil +} + +type Iterator struct { + Domain *DDomain `protobuf:"bytes,1,opt,name=domain" json:"domain,omitempty"` + Valid bool `protobuf:"varint,2,opt,name=valid" json:"valid,omitempty"` + Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *Iterator) Reset() { *m = Iterator{} } +func (m *Iterator) String() string { return proto.CompactTextString(m) } +func (*Iterator) ProtoMessage() {} +func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *Iterator) GetDomain() *DDomain { + if m != nil { + return m.Domain + } + return nil +} + +func (m *Iterator) GetValid() bool { + if m != nil { + return m.Valid + } + return false +} + +func (m *Iterator) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *Iterator) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +type Stats struct { + Data map[string]string `protobuf:"bytes,1,rep,name=data" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + TimeAt int64 `protobuf:"varint,2,opt,name=time_at,json=timeAt" json:"time_at,omitempty"` +} + +func (m *Stats) Reset() { *m = Stats{} } +func (m *Stats) String() string { return proto.CompactTextString(m) } +func (*Stats) ProtoMessage() {} +func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *Stats) GetData() map[string]string { + if m != nil { + return m.Data + } + return nil +} + +func (m *Stats) GetTimeAt() int64 { + if m != nil { + return m.TimeAt + } + return 0 +} + +type Init struct { + Type string `protobuf:"bytes,1,opt,name=Type" json:"Type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=Name" json:"Name,omitempty"` + Dir string `protobuf:"bytes,3,opt,name=Dir" json:"Dir,omitempty"` +} + +func (m *Init) Reset() { *m = Init{} } +func (m *Init) String() string { return proto.CompactTextString(m) } +func (*Init) ProtoMessage() {} +func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *Init) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *Init) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Init) GetDir() string { + if m != nil { + return m.Dir + } + return "" +} + +func init() { + proto.RegisterType((*Entity)(nil), "protodb.Entity") + proto.RegisterType((*Nothing)(nil), "protodb.Nothing") + proto.RegisterType((*DDomain)(nil), "protodb.DDomain") + proto.RegisterType((*Iterator)(nil), "protodb.Iterator") + proto.RegisterType((*Stats)(nil), "protodb.Stats") + proto.RegisterType((*Init)(nil), "protodb.Init") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for DB service + +type DBClient interface { + Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error) + Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) + GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error) + Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) + Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error) + ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) + // rpc print(Nothing) returns (Entity) {} + Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) +} + +type dBClient struct { + cc *grpc.ClientConn +} + +func NewDBClient(cc *grpc.ClientConn) DBClient { + return &dBClient{cc} +} + +func (c *dBClient) Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/init", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/get", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[0], c.cc, "/protodb.DB/getStream", opts...) + if err != nil { + return nil, err + } + x := &dBGetStreamClient{stream} + return x, nil +} + +type DB_GetStreamClient interface { + Send(*Entity) error + Recv() (*Entity, error) + grpc.ClientStream +} + +type dBGetStreamClient struct { + grpc.ClientStream +} + +func (x *dBGetStreamClient) Send(m *Entity) error { + return x.ClientStream.SendMsg(m) +} + +func (x *dBGetStreamClient) Recv() (*Entity, error) { + m := new(Entity) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/has", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/set", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/setSync", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/delete", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/deleteSync", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[1], c.cc, "/protodb.DB/iterator", opts...) + if err != nil { + return nil, err + } + x := &dBIteratorClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DB_IteratorClient interface { + Recv() (*Iterator, error) + grpc.ClientStream +} + +type dBIteratorClient struct { + grpc.ClientStream +} + +func (x *dBIteratorClient) Recv() (*Iterator, error) { + m := new(Iterator) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[2], c.cc, "/protodb.DB/reverseIterator", opts...) + if err != nil { + return nil, err + } + x := &dBReverseIteratorClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DB_ReverseIteratorClient interface { + Recv() (*Iterator, error) + grpc.ClientStream +} + +type dBReverseIteratorClient struct { + grpc.ClientStream +} + +func (x *dBReverseIteratorClient) Recv() (*Iterator, error) { + m := new(Iterator) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) { + out := new(Stats) + err := grpc.Invoke(ctx, "/protodb.DB/stats", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for DB service + +type DBServer interface { + Init(context.Context, *Init) (*Entity, error) + Get(context.Context, *Entity) (*Entity, error) + GetStream(DB_GetStreamServer) error + Has(context.Context, *Entity) (*Entity, error) + Set(context.Context, *Entity) (*Nothing, error) + SetSync(context.Context, *Entity) (*Nothing, error) + Delete(context.Context, *Entity) (*Nothing, error) + DeleteSync(context.Context, *Entity) (*Nothing, error) + Iterator(*Entity, DB_IteratorServer) error + ReverseIterator(*Entity, DB_ReverseIteratorServer) error + // rpc print(Nothing) returns (Entity) {} + Stats(context.Context, *Nothing) (*Stats, error) +} + +func RegisterDBServer(s *grpc.Server, srv DBServer) { + s.RegisterService(&_DB_serviceDesc, srv) +} + +func _DB_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Init) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Init(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Init", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Init(ctx, req.(*Init)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Get", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Get(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(DBServer).GetStream(&dBGetStreamServer{stream}) +} + +type DB_GetStreamServer interface { + Send(*Entity) error + Recv() (*Entity, error) + grpc.ServerStream +} + +type dBGetStreamServer struct { + grpc.ServerStream +} + +func (x *dBGetStreamServer) Send(m *Entity) error { + return x.ServerStream.SendMsg(m) +} + +func (x *dBGetStreamServer) Recv() (*Entity, error) { + m := new(Entity) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _DB_Has_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Has(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Has", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Has(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Set_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Set(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Set", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Set(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_SetSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).SetSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/SetSync", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).SetSync(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Delete(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_DeleteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).DeleteSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/DeleteSync", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).DeleteSync(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Iterator_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Entity) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DBServer).Iterator(m, &dBIteratorServer{stream}) +} + +type DB_IteratorServer interface { + Send(*Iterator) error + grpc.ServerStream +} + +type dBIteratorServer struct { + grpc.ServerStream +} + +func (x *dBIteratorServer) Send(m *Iterator) error { + return x.ServerStream.SendMsg(m) +} + +func _DB_ReverseIterator_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Entity) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DBServer).ReverseIterator(m, &dBReverseIteratorServer{stream}) +} + +type DB_ReverseIteratorServer interface { + Send(*Iterator) error + grpc.ServerStream +} + +type dBReverseIteratorServer struct { + grpc.ServerStream +} + +func (x *dBReverseIteratorServer) Send(m *Iterator) error { + return x.ServerStream.SendMsg(m) +} + +func _DB_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Nothing) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Stats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Stats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Stats(ctx, req.(*Nothing)) + } + return interceptor(ctx, in, info, handler) +} + +var _DB_serviceDesc = grpc.ServiceDesc{ + ServiceName: "protodb.DB", + HandlerType: (*DBServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "init", + Handler: _DB_Init_Handler, + }, + { + MethodName: "get", + Handler: _DB_Get_Handler, + }, + { + MethodName: "has", + Handler: _DB_Has_Handler, + }, + { + MethodName: "set", + Handler: _DB_Set_Handler, + }, + { + MethodName: "setSync", + Handler: _DB_SetSync_Handler, + }, + { + MethodName: "delete", + Handler: _DB_Delete_Handler, + }, + { + MethodName: "deleteSync", + Handler: _DB_DeleteSync_Handler, + }, + { + MethodName: "stats", + Handler: _DB_Stats_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "getStream", + Handler: _DB_GetStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "iterator", + Handler: _DB_Iterator_Handler, + ServerStreams: true, + }, + { + StreamName: "reverseIterator", + Handler: _DB_ReverseIterator_Handler, + ServerStreams: true, + }, + }, + Metadata: "defs.proto", +} + +func init() { proto.RegisterFile("defs.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 498 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x72, 0xd3, 0x4c, + 0x10, 0xf4, 0x4a, 0xb2, 0x64, 0x4d, 0xbe, 0x2f, 0x09, 0x5b, 0x14, 0x6c, 0xf9, 0xa4, 0xd2, 0x49, + 0xfc, 0xb9, 0x12, 0xe7, 0xc0, 0xcf, 0x09, 0x28, 0xe7, 0xe0, 0x4b, 0x0e, 0x32, 0x77, 0x6a, 0x83, + 0x06, 0x67, 0x8b, 0x58, 0x72, 0xed, 0x0e, 0x29, 0xf4, 0x04, 0x3c, 0x00, 0x4f, 0xc4, 0x9b, 0x51, + 0xbb, 0xfa, 0xb1, 0x43, 0x7c, 0x10, 0x27, 0x4d, 0xef, 0x76, 0xf7, 0x8c, 0x5a, 0x23, 0x80, 0x02, + 0xbf, 0x9a, 0xd9, 0x56, 0x57, 0x54, 0xf1, 0xc8, 0x3d, 0x8a, 0xeb, 0xf4, 0x37, 0x83, 0xf0, 0xb2, + 0x24, 0x45, 0x35, 0x3f, 0x06, 0x4f, 0x15, 0x82, 0x25, 0x2c, 0x1b, 0xe7, 0x9e, 0x2a, 0xf8, 0x29, + 0xf8, 0xdf, 0xb0, 0x16, 0x5e, 0xc2, 0xb2, 0xff, 0x72, 0x5b, 0xf2, 0xc7, 0x30, 0xbe, 0x93, 0xb7, + 0xdf, 0x51, 0xf8, 0xee, 0xac, 0x01, 0xfc, 0x09, 0x84, 0xf8, 0x43, 0x19, 0x32, 0x22, 0x48, 0x58, + 0x36, 0xc9, 0x5b, 0x64, 0xd9, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd8, 0x0e, 0x58, 0x57, 0x2c, 0x0b, + 0x11, 0x36, 0xae, 0x58, 0xba, 0x3e, 0xa8, 0xb5, 0x88, 0x12, 0x96, 0xc5, 0xb9, 0x2d, 0xad, 0x72, + 0xab, 0x55, 0x49, 0x62, 0xe2, 0xce, 0x1a, 0xc0, 0x9f, 0x42, 0x44, 0x6a, 0x83, 0x9f, 0x25, 0x89, + 0x38, 0x61, 0x99, 0x9f, 0x87, 0x16, 0x7e, 0xa0, 0x34, 0x86, 0xe8, 0xaa, 0xa2, 0x1b, 0x55, 0xae, + 0xd3, 0x73, 0x88, 0x16, 0x8b, 0x6a, 0x23, 0x55, 0xb9, 0x6b, 0xcf, 0x0e, 0xb4, 0xf7, 0xfa, 0xf6, + 0xa9, 0x86, 0xc9, 0x92, 0x50, 0x4b, 0xaa, 0x34, 0xcf, 0x20, 0x2c, 0x9c, 0xda, 0x89, 0x8e, 0xe6, + 0xa7, 0xb3, 0x36, 0xa7, 0x59, 0xeb, 0x9a, 0xb7, 0xf7, 0x6d, 0x14, 0xaa, 0x71, 0x9a, 0xe4, 0x0d, + 0xe8, 0x22, 0xf3, 0x0f, 0x44, 0x16, 0xec, 0x45, 0x96, 0xfe, 0x64, 0x30, 0x5e, 0x91, 0x24, 0xc3, + 0x5f, 0x42, 0x50, 0x48, 0x92, 0x82, 0x25, 0x7e, 0x76, 0x34, 0x17, 0x7d, 0x3f, 0x77, 0x3b, 0x5b, + 0x48, 0x92, 0x97, 0x25, 0xe9, 0x3a, 0x77, 0xac, 0xfd, 0x08, 0xbc, 0xfd, 0x08, 0xa6, 0xaf, 0x21, + 0xee, 0xb9, 0xdd, 0x14, 0xac, 0x09, 0xf4, 0xde, 0x14, 0x5e, 0x13, 0xa8, 0x03, 0xef, 0xbc, 0x37, + 0x2c, 0x7d, 0x0f, 0xc1, 0xb2, 0x54, 0xc4, 0x39, 0x04, 0x9f, 0xea, 0x2d, 0xb6, 0x22, 0x57, 0xdb, + 0xb3, 0x2b, 0xb9, 0xe9, 0x44, 0xae, 0xb6, 0xde, 0x0b, 0xa5, 0xdd, 0x1b, 0xc6, 0xb9, 0x2d, 0xe7, + 0xbf, 0x02, 0xf0, 0x16, 0x1f, 0x79, 0x06, 0x81, 0xb2, 0x46, 0xff, 0xf7, 0xaf, 0x60, 0x7d, 0xa7, + 0x27, 0x3d, 0x6c, 0xb6, 0x2c, 0x1d, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xdf, 0x37, 0x87, 0xa8, + 0x17, 0x10, 0xaf, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x08, 0x32, 0x76, 0xc6, 0xac, 0xff, 0x8d, + 0x34, 0x83, 0xfc, 0x9f, 0x83, 0x6f, 0x0e, 0x8d, 0xb2, 0xfb, 0xee, 0xdd, 0x62, 0x8d, 0xf8, 0x0c, + 0x22, 0x83, 0xb4, 0xaa, 0xcb, 0x2f, 0xc3, 0xf8, 0xaf, 0x20, 0x2c, 0xf0, 0x16, 0x09, 0x87, 0xd1, + 0xcf, 0xed, 0xff, 0x69, 0xe9, 0xc3, 0x3b, 0xcc, 0x61, 0xa2, 0xba, 0xcd, 0x7d, 0x20, 0x78, 0xb4, + 0xfb, 0x0e, 0x2d, 0x27, 0x1d, 0x9d, 0x31, 0xfe, 0x16, 0x4e, 0x34, 0xde, 0xa1, 0x36, 0xb8, 0xfc, + 0x57, 0xe9, 0x0b, 0xf7, 0x43, 0x91, 0xe1, 0x0f, 0x66, 0x99, 0x1e, 0xdf, 0xdf, 0xdb, 0x74, 0x74, + 0x1d, 0xba, 0x83, 0x8b, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xf4, 0x2e, 0x77, 0x07, 0x75, 0x04, + 0x00, 0x00, +} diff --git a/proto/defs.proto b/proto/defs.proto new file mode 100644 index 000000000..c203fd1eb --- /dev/null +++ b/proto/defs.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package protodb; + +message Entity { + int32 id = 1; + bytes key = 2; + bytes value = 3; + bool exists = 4; + bytes start = 5; + bytes end = 6; + string err = 7; + string print = 8; + int64 time_at = 9; +} + +message Nothing { +} + +message DDomain { + bytes start = 1; + bytes end = 2; +} + +message Iterator { + DDomain domain = 1; + bool valid = 2; + bytes key = 3; + bytes value = 4; +} + +message Stats { + map data = 1; + int64 time_at = 2; +} + +message Init { + string Type = 1; + string Name = 2; + string Dir = 3; +} + +service DB { + rpc init(Init) returns (Entity) {} + rpc get(Entity) returns (Entity) {} + rpc getStream(stream Entity) returns (stream Entity) {} + + rpc has(Entity) returns (Entity) {} + rpc set(Entity) returns (Nothing) {} + rpc setSync(Entity) returns (Nothing) {} + rpc delete(Entity) returns (Nothing) {} + rpc deleteSync(Entity) returns (Nothing) {} + rpc iterator(Entity) returns (stream Iterator) {} + rpc reverseIterator(Entity) returns (stream Iterator) {} + // rpc print(Nothing) returns (Entity) {} + rpc stats(Nothing) returns (Stats) {} +}