From 11bee6194aeff40a7b3d62123db1a7cbca2e81e7 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 12 Mar 2018 00:39:34 -0700 Subject: [PATCH 01/10] DB as a service Fixes https://github.com/tendermint/tendermint/issues/1162 Databases as a service! Can now access Databases as a remote service via gRPC for performance and easy deployment. The caveat is that each service is stateful in regards to the DB i.e. each unique service uses only one unique DB but nonetheless multiple clients can access it. A full standalone example ```go package main import ( "bytes" "context" "log" grpcdb "github.com/tendermint/tmlibs/grpcdb" protodb "github.com/tendermint/tmlibs/proto" ) func main() { addr := ":8998" go func() { if err := grpcdb.BindRemoteDBServer(addr); err != nil { log.Fatalf("BindRemoteDBServer: %v", err) } }() client, err := grpcdb.NewClient(addr, false) if err != nil { log.Fatalf("Failed to create grpcDB client: %v", err) } ctx := context.Background() // 1. Initialize the DB in := &protodb.Init{ Type: "leveldb", Name: "grpc-uno-test", Dir: ".", } if _, err := client.Init(ctx, in); err != nil { log.Fatalf("Init error: %v", err) } // 2. Now it can be used! query1 := &protodb.Entity{Key: []byte("Project"), Value: []byte("Tmlibs-on-gRPC")} if _, err := client.SetSync(ctx, query1); err != nil { log.Fatalf("SetSync err: %v", err) } query2 := &protodb.Entity{Key: []byte("Project")} read, err := client.Get(ctx, query2) if err != nil { log.Fatalf("Get err: %v", err) } if g, w := read.Value, []byte("Tmlibs-on-gRPC"); !bytes.Equal(g, w) { log.Fatalf("got= (%q ==> % X)\nwant=(%q ==> % X)", g, g, w, w) } } ``` --- Makefile | 3 + grpcdb/client.go | 19 + grpcdb/example_test.go | 50 +++ grpcdb/server.go | 142 ++++++++ proto/defs.pb.go | 784 +++++++++++++++++++++++++++++++++++++++++ proto/defs.proto | 57 +++ 6 files changed, 1055 insertions(+) create mode 100644 grpcdb/client.go create mode 100644 grpcdb/example_test.go create mode 100644 grpcdb/server.go create mode 100644 proto/defs.pb.go create mode 100644 proto/defs.proto diff --git a/Makefile b/Makefile index 9e181f9f9..0236c480b 100644 --- a/Makefile +++ b/Makefile @@ -119,3 +119,6 @@ metalinter_all: # unless there is a reason not to. # https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html .PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all + +grpc_dbserver: + protoc -I proto/ proto/defs.proto --go_out=plugins=grpc:proto diff --git a/grpcdb/client.go b/grpcdb/client.go new file mode 100644 index 000000000..45409a1f9 --- /dev/null +++ b/grpcdb/client.go @@ -0,0 +1,19 @@ +package grpcdb + +import ( + "google.golang.org/grpc" + + protodb "github.com/tendermint/tmlibs/proto" +) + +func NewClient(serverAddr string, secure bool) (protodb.DBClient, error) { + var opts []grpc.DialOption + if !secure { + opts = append(opts, grpc.WithInsecure()) + } + cc, err := grpc.Dial(serverAddr, opts...) + if err != nil { + return nil, err + } + return protodb.NewDBClient(cc), nil +} diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go new file mode 100644 index 000000000..653180113 --- /dev/null +++ b/grpcdb/example_test.go @@ -0,0 +1,50 @@ +package grpcdb_test + +import ( + "bytes" + "context" + "log" + + grpcdb "github.com/tendermint/tmlibs/grpcdb" + protodb "github.com/tendermint/tmlibs/proto" +) + +func Example() { + addr := ":8998" + go func() { + if err := grpcdb.BindRemoteDBServer(addr); err != nil { + log.Fatalf("BindRemoteDBServer: %v", err) + } + }() + + client, err := grpcdb.NewClient(addr, false) + if err != nil { + log.Fatalf("Failed to create grpcDB client: %v", err) + } + + ctx := context.Background() + // 1. Initialize the DB + in := &protodb.Init{ + Type: "leveldb", + Name: "grpc-uno-test", + Dir: ".", + } + if _, err := client.Init(ctx, in); err != nil { + log.Fatalf("Init error: %v", err) + } + + // 2. Now it can be used! + query1 := &protodb.Entity{Key: []byte("Project"), Value: []byte("Tmlibs-on-gRPC")} + if _, err := client.SetSync(ctx, query1); err != nil { + log.Fatalf("SetSync err: %v", err) + } + + query2 := &protodb.Entity{Key: []byte("Project")} + read, err := client.Get(ctx, query2) + if err != nil { + log.Fatalf("Get err: %v", err) + } + if g, w := read.Value, []byte("Tmlibs-on-gRPC"); !bytes.Equal(g, w) { + log.Fatalf("got= (%q ==> % X)\nwant=(%q ==> % X)", g, g, w, w) + } +} diff --git a/grpcdb/server.go b/grpcdb/server.go new file mode 100644 index 000000000..26d0ffa9e --- /dev/null +++ b/grpcdb/server.go @@ -0,0 +1,142 @@ +package grpcdb + +import ( +"log" + "context" + "net" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/tendermint/tmlibs/db" + protodb "github.com/tendermint/tmlibs/proto" +) + +func BindRemoteDBServer(addr string, opts ...grpc.ServerOption) error { + ln, err := net.Listen("tcp", addr) + if err != nil { + return err + } + srv := grpc.NewServer(opts...) + protodb.RegisterDBServer(srv, new(server)) + return srv.Serve(ln) +} + +type server struct { + mu sync.Mutex + db db.DB +} + +var _ protodb.DBServer = (*server)(nil) + +func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) { + s.mu.Lock() + defer s.mu.Unlock() + +log.Printf("in: %+v\n", in) + s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) + return &protodb.Entity{TimeAt: time.Now().Unix()}, nil +} + +func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.Delete(in.Key) + return nothing, nil +} + +var nothing = new(protodb.Nothing) +func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.DeleteSync(in.Key) + return nothing, nil +} + +func (s *server) Get(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) { + value := s.db.Get(in.Key) + return &protodb.Entity{Value: value}, nil +} + +func (s *server) GetStream(ds protodb.DB_GetStreamServer) error { + // Receive routine + responsesChan := make(chan *protodb.Entity) + go func() { + defer close(responsesChan) + ctx := context.Background() + for { + in, err := ds.Recv() + if err != nil { + responsesChan <- &protodb.Entity{Err: err.Error()} + return + } + out, err := s.Get(ctx, in) + if err != nil { + if out == nil { + out = new(protodb.Entity) + out.Key = in.Key + } + out.Err = err.Error() + responsesChan <- out + return + } + + // Otherwise continue on + responsesChan <- out + } + }() + + // Send routine, block until we return + for out := range responsesChan { + if err := ds.Send(out); err != nil { + return err + } + } + return nil +} + +func (s *server) Has(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) { + exists := s.db.Has(in.Key) + return &protodb.Entity{Exists: exists}, nil +} + +func (s *server) Set(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.Set(in.Key, in.Value) + return nothing, nil +} + +func (s *server) SetSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { + s.db.SetSync(in.Key, in.Value) + return nothing, nil +} + +func (s *server) Iterator(query *protodb.Entity, dis protodb.DB_IteratorServer) error { + it := s.db.Iterator(query.Start, query.End) + return s.handleIterator(it, dis.Send) +} + +func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) error) error { + for it.Valid() { + start, end := it.Domain() + out := &protodb.Iterator{ + Domain: &protodb.DDomain{Start: start, End: end}, + Valid: it.Valid(), + Key: it.Key(), + Value: it.Value(), + } + if err := sendFunc(out); err != nil { + return err + } + + // Finally move the iterator forward + it.Next() + } + return nil +} + +func (s *server) ReverseIterator(query *protodb.Entity, dis protodb.DB_ReverseIteratorServer) error { + it := s.db.ReverseIterator(query.Start, query.End) + return s.handleIterator(it, dis.Send) +} + +func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error) { + stats := s.db.Stats() + return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil +} diff --git a/proto/defs.pb.go b/proto/defs.pb.go new file mode 100644 index 000000000..61f687504 --- /dev/null +++ b/proto/defs.pb.go @@ -0,0 +1,784 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: defs.proto + +/* +Package protodb is a generated protocol buffer package. + +It is generated from these files: + defs.proto + +It has these top-level messages: + Entity + Nothing + DDomain + Iterator + Stats + Init +*/ +package protodb + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Entity struct { + Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Exists bool `protobuf:"varint,4,opt,name=exists" json:"exists,omitempty"` + Start []byte `protobuf:"bytes,5,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,6,opt,name=end,proto3" json:"end,omitempty"` + Err string `protobuf:"bytes,7,opt,name=err" json:"err,omitempty"` + Print string `protobuf:"bytes,8,opt,name=print" json:"print,omitempty"` + TimeAt int64 `protobuf:"varint,9,opt,name=time_at,json=timeAt" json:"time_at,omitempty"` +} + +func (m *Entity) Reset() { *m = Entity{} } +func (m *Entity) String() string { return proto.CompactTextString(m) } +func (*Entity) ProtoMessage() {} +func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Entity) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Entity) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *Entity) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Entity) GetExists() bool { + if m != nil { + return m.Exists + } + return false +} + +func (m *Entity) GetStart() []byte { + if m != nil { + return m.Start + } + return nil +} + +func (m *Entity) GetEnd() []byte { + if m != nil { + return m.End + } + return nil +} + +func (m *Entity) GetErr() string { + if m != nil { + return m.Err + } + return "" +} + +func (m *Entity) GetPrint() string { + if m != nil { + return m.Print + } + return "" +} + +func (m *Entity) GetTimeAt() int64 { + if m != nil { + return m.TimeAt + } + return 0 +} + +type Nothing struct { +} + +func (m *Nothing) Reset() { *m = Nothing{} } +func (m *Nothing) String() string { return proto.CompactTextString(m) } +func (*Nothing) ProtoMessage() {} +func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type DDomain struct { + Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` +} + +func (m *DDomain) Reset() { *m = DDomain{} } +func (m *DDomain) String() string { return proto.CompactTextString(m) } +func (*DDomain) ProtoMessage() {} +func (*DDomain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *DDomain) GetStart() []byte { + if m != nil { + return m.Start + } + return nil +} + +func (m *DDomain) GetEnd() []byte { + if m != nil { + return m.End + } + return nil +} + +type Iterator struct { + Domain *DDomain `protobuf:"bytes,1,opt,name=domain" json:"domain,omitempty"` + Valid bool `protobuf:"varint,2,opt,name=valid" json:"valid,omitempty"` + Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *Iterator) Reset() { *m = Iterator{} } +func (m *Iterator) String() string { return proto.CompactTextString(m) } +func (*Iterator) ProtoMessage() {} +func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *Iterator) GetDomain() *DDomain { + if m != nil { + return m.Domain + } + return nil +} + +func (m *Iterator) GetValid() bool { + if m != nil { + return m.Valid + } + return false +} + +func (m *Iterator) GetKey() []byte { + if m != nil { + return m.Key + } + return nil +} + +func (m *Iterator) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +type Stats struct { + Data map[string]string `protobuf:"bytes,1,rep,name=data" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + TimeAt int64 `protobuf:"varint,2,opt,name=time_at,json=timeAt" json:"time_at,omitempty"` +} + +func (m *Stats) Reset() { *m = Stats{} } +func (m *Stats) String() string { return proto.CompactTextString(m) } +func (*Stats) ProtoMessage() {} +func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *Stats) GetData() map[string]string { + if m != nil { + return m.Data + } + return nil +} + +func (m *Stats) GetTimeAt() int64 { + if m != nil { + return m.TimeAt + } + return 0 +} + +type Init struct { + Type string `protobuf:"bytes,1,opt,name=Type" json:"Type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=Name" json:"Name,omitempty"` + Dir string `protobuf:"bytes,3,opt,name=Dir" json:"Dir,omitempty"` +} + +func (m *Init) Reset() { *m = Init{} } +func (m *Init) String() string { return proto.CompactTextString(m) } +func (*Init) ProtoMessage() {} +func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *Init) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *Init) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Init) GetDir() string { + if m != nil { + return m.Dir + } + return "" +} + +func init() { + proto.RegisterType((*Entity)(nil), "protodb.Entity") + proto.RegisterType((*Nothing)(nil), "protodb.Nothing") + proto.RegisterType((*DDomain)(nil), "protodb.DDomain") + proto.RegisterType((*Iterator)(nil), "protodb.Iterator") + proto.RegisterType((*Stats)(nil), "protodb.Stats") + proto.RegisterType((*Init)(nil), "protodb.Init") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for DB service + +type DBClient interface { + Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error) + Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) + GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error) + Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) + Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) + Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error) + ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) + // rpc print(Nothing) returns (Entity) {} + Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) +} + +type dBClient struct { + cc *grpc.ClientConn +} + +func NewDBClient(cc *grpc.ClientConn) DBClient { + return &dBClient{cc} +} + +func (c *dBClient) Init(ctx context.Context, in *Init, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/init", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Get(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/get", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) GetStream(ctx context.Context, opts ...grpc.CallOption) (DB_GetStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[0], c.cc, "/protodb.DB/getStream", opts...) + if err != nil { + return nil, err + } + x := &dBGetStreamClient{stream} + return x, nil +} + +type DB_GetStreamClient interface { + Send(*Entity) error + Recv() (*Entity, error) + grpc.ClientStream +} + +type dBGetStreamClient struct { + grpc.ClientStream +} + +func (x *dBGetStreamClient) Send(m *Entity) error { + return x.ClientStream.SendMsg(m) +} + +func (x *dBGetStreamClient) Recv() (*Entity, error) { + m := new(Entity) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) Has(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Entity, error) { + out := new(Entity) + err := grpc.Invoke(ctx, "/protodb.DB/has", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Set(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/set", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) SetSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/setSync", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Delete(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/delete", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) DeleteSync(ctx context.Context, in *Entity, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/deleteSync", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) Iterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_IteratorClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[1], c.cc, "/protodb.DB/iterator", opts...) + if err != nil { + return nil, err + } + x := &dBIteratorClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DB_IteratorClient interface { + Recv() (*Iterator, error) + grpc.ClientStream +} + +type dBIteratorClient struct { + grpc.ClientStream +} + +func (x *dBIteratorClient) Recv() (*Iterator, error) { + m := new(Iterator) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) { + stream, err := grpc.NewClientStream(ctx, &_DB_serviceDesc.Streams[2], c.cc, "/protodb.DB/reverseIterator", opts...) + if err != nil { + return nil, err + } + x := &dBReverseIteratorClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DB_ReverseIteratorClient interface { + Recv() (*Iterator, error) + grpc.ClientStream +} + +type dBReverseIteratorClient struct { + grpc.ClientStream +} + +func (x *dBReverseIteratorClient) Recv() (*Iterator, error) { + m := new(Iterator) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *dBClient) Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) { + out := new(Stats) + err := grpc.Invoke(ctx, "/protodb.DB/stats", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for DB service + +type DBServer interface { + Init(context.Context, *Init) (*Entity, error) + Get(context.Context, *Entity) (*Entity, error) + GetStream(DB_GetStreamServer) error + Has(context.Context, *Entity) (*Entity, error) + Set(context.Context, *Entity) (*Nothing, error) + SetSync(context.Context, *Entity) (*Nothing, error) + Delete(context.Context, *Entity) (*Nothing, error) + DeleteSync(context.Context, *Entity) (*Nothing, error) + Iterator(*Entity, DB_IteratorServer) error + ReverseIterator(*Entity, DB_ReverseIteratorServer) error + // rpc print(Nothing) returns (Entity) {} + Stats(context.Context, *Nothing) (*Stats, error) +} + +func RegisterDBServer(s *grpc.Server, srv DBServer) { + s.RegisterService(&_DB_serviceDesc, srv) +} + +func _DB_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Init) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Init(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Init", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Init(ctx, req.(*Init)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Get(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Get", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Get(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_GetStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(DBServer).GetStream(&dBGetStreamServer{stream}) +} + +type DB_GetStreamServer interface { + Send(*Entity) error + Recv() (*Entity, error) + grpc.ServerStream +} + +type dBGetStreamServer struct { + grpc.ServerStream +} + +func (x *dBGetStreamServer) Send(m *Entity) error { + return x.ServerStream.SendMsg(m) +} + +func (x *dBGetStreamServer) Recv() (*Entity, error) { + m := new(Entity) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _DB_Has_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Has(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Has", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Has(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Set_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Set(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Set", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Set(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_SetSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).SetSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/SetSync", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).SetSync(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Delete(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_DeleteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Entity) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).DeleteSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/DeleteSync", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).DeleteSync(ctx, req.(*Entity)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_Iterator_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Entity) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DBServer).Iterator(m, &dBIteratorServer{stream}) +} + +type DB_IteratorServer interface { + Send(*Iterator) error + grpc.ServerStream +} + +type dBIteratorServer struct { + grpc.ServerStream +} + +func (x *dBIteratorServer) Send(m *Iterator) error { + return x.ServerStream.SendMsg(m) +} + +func _DB_ReverseIterator_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Entity) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DBServer).ReverseIterator(m, &dBReverseIteratorServer{stream}) +} + +type DB_ReverseIteratorServer interface { + Send(*Iterator) error + grpc.ServerStream +} + +type dBReverseIteratorServer struct { + grpc.ServerStream +} + +func (x *dBReverseIteratorServer) Send(m *Iterator) error { + return x.ServerStream.SendMsg(m) +} + +func _DB_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Nothing) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).Stats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/Stats", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).Stats(ctx, req.(*Nothing)) + } + return interceptor(ctx, in, info, handler) +} + +var _DB_serviceDesc = grpc.ServiceDesc{ + ServiceName: "protodb.DB", + HandlerType: (*DBServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "init", + Handler: _DB_Init_Handler, + }, + { + MethodName: "get", + Handler: _DB_Get_Handler, + }, + { + MethodName: "has", + Handler: _DB_Has_Handler, + }, + { + MethodName: "set", + Handler: _DB_Set_Handler, + }, + { + MethodName: "setSync", + Handler: _DB_SetSync_Handler, + }, + { + MethodName: "delete", + Handler: _DB_Delete_Handler, + }, + { + MethodName: "deleteSync", + Handler: _DB_DeleteSync_Handler, + }, + { + MethodName: "stats", + Handler: _DB_Stats_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "getStream", + Handler: _DB_GetStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "iterator", + Handler: _DB_Iterator_Handler, + ServerStreams: true, + }, + { + StreamName: "reverseIterator", + Handler: _DB_ReverseIterator_Handler, + ServerStreams: true, + }, + }, + Metadata: "defs.proto", +} + +func init() { proto.RegisterFile("defs.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 498 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x72, 0xd3, 0x4c, + 0x10, 0xf4, 0x4a, 0xb2, 0x64, 0x4d, 0xbe, 0x2f, 0x09, 0x5b, 0x14, 0x6c, 0xf9, 0xa4, 0xd2, 0x49, + 0xfc, 0xb9, 0x12, 0xe7, 0xc0, 0xcf, 0x09, 0x28, 0xe7, 0xe0, 0x4b, 0x0e, 0x32, 0x77, 0x6a, 0x83, + 0x06, 0x67, 0x8b, 0x58, 0x72, 0xed, 0x0e, 0x29, 0xf4, 0x04, 0x3c, 0x00, 0x4f, 0xc4, 0x9b, 0x51, + 0xbb, 0xfa, 0xb1, 0x43, 0x7c, 0x10, 0x27, 0x4d, 0xef, 0x76, 0xf7, 0x8c, 0x5a, 0x23, 0x80, 0x02, + 0xbf, 0x9a, 0xd9, 0x56, 0x57, 0x54, 0xf1, 0xc8, 0x3d, 0x8a, 0xeb, 0xf4, 0x37, 0x83, 0xf0, 0xb2, + 0x24, 0x45, 0x35, 0x3f, 0x06, 0x4f, 0x15, 0x82, 0x25, 0x2c, 0x1b, 0xe7, 0x9e, 0x2a, 0xf8, 0x29, + 0xf8, 0xdf, 0xb0, 0x16, 0x5e, 0xc2, 0xb2, 0xff, 0x72, 0x5b, 0xf2, 0xc7, 0x30, 0xbe, 0x93, 0xb7, + 0xdf, 0x51, 0xf8, 0xee, 0xac, 0x01, 0xfc, 0x09, 0x84, 0xf8, 0x43, 0x19, 0x32, 0x22, 0x48, 0x58, + 0x36, 0xc9, 0x5b, 0x64, 0xd9, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd8, 0x0e, 0x58, 0x57, 0x2c, 0x0b, + 0x11, 0x36, 0xae, 0x58, 0xba, 0x3e, 0xa8, 0xb5, 0x88, 0x12, 0x96, 0xc5, 0xb9, 0x2d, 0xad, 0x72, + 0xab, 0x55, 0x49, 0x62, 0xe2, 0xce, 0x1a, 0xc0, 0x9f, 0x42, 0x44, 0x6a, 0x83, 0x9f, 0x25, 0x89, + 0x38, 0x61, 0x99, 0x9f, 0x87, 0x16, 0x7e, 0xa0, 0x34, 0x86, 0xe8, 0xaa, 0xa2, 0x1b, 0x55, 0xae, + 0xd3, 0x73, 0x88, 0x16, 0x8b, 0x6a, 0x23, 0x55, 0xb9, 0x6b, 0xcf, 0x0e, 0xb4, 0xf7, 0xfa, 0xf6, + 0xa9, 0x86, 0xc9, 0x92, 0x50, 0x4b, 0xaa, 0x34, 0xcf, 0x20, 0x2c, 0x9c, 0xda, 0x89, 0x8e, 0xe6, + 0xa7, 0xb3, 0x36, 0xa7, 0x59, 0xeb, 0x9a, 0xb7, 0xf7, 0x6d, 0x14, 0xaa, 0x71, 0x9a, 0xe4, 0x0d, + 0xe8, 0x22, 0xf3, 0x0f, 0x44, 0x16, 0xec, 0x45, 0x96, 0xfe, 0x64, 0x30, 0x5e, 0x91, 0x24, 0xc3, + 0x5f, 0x42, 0x50, 0x48, 0x92, 0x82, 0x25, 0x7e, 0x76, 0x34, 0x17, 0x7d, 0x3f, 0x77, 0x3b, 0x5b, + 0x48, 0x92, 0x97, 0x25, 0xe9, 0x3a, 0x77, 0xac, 0xfd, 0x08, 0xbc, 0xfd, 0x08, 0xa6, 0xaf, 0x21, + 0xee, 0xb9, 0xdd, 0x14, 0xac, 0x09, 0xf4, 0xde, 0x14, 0x5e, 0x13, 0xa8, 0x03, 0xef, 0xbc, 0x37, + 0x2c, 0x7d, 0x0f, 0xc1, 0xb2, 0x54, 0xc4, 0x39, 0x04, 0x9f, 0xea, 0x2d, 0xb6, 0x22, 0x57, 0xdb, + 0xb3, 0x2b, 0xb9, 0xe9, 0x44, 0xae, 0xb6, 0xde, 0x0b, 0xa5, 0xdd, 0x1b, 0xc6, 0xb9, 0x2d, 0xe7, + 0xbf, 0x02, 0xf0, 0x16, 0x1f, 0x79, 0x06, 0x81, 0xb2, 0x46, 0xff, 0xf7, 0xaf, 0x60, 0x7d, 0xa7, + 0x27, 0x3d, 0x6c, 0xb6, 0x2c, 0x1d, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xdf, 0x37, 0x87, 0xa8, + 0x17, 0x10, 0xaf, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x08, 0x32, 0x76, 0xc6, 0xac, 0xff, 0x8d, + 0x34, 0x83, 0xfc, 0x9f, 0x83, 0x6f, 0x0e, 0x8d, 0xb2, 0xfb, 0xee, 0xdd, 0x62, 0x8d, 0xf8, 0x0c, + 0x22, 0x83, 0xb4, 0xaa, 0xcb, 0x2f, 0xc3, 0xf8, 0xaf, 0x20, 0x2c, 0xf0, 0x16, 0x09, 0x87, 0xd1, + 0xcf, 0xed, 0xff, 0x69, 0xe9, 0xc3, 0x3b, 0xcc, 0x61, 0xa2, 0xba, 0xcd, 0x7d, 0x20, 0x78, 0xb4, + 0xfb, 0x0e, 0x2d, 0x27, 0x1d, 0x9d, 0x31, 0xfe, 0x16, 0x4e, 0x34, 0xde, 0xa1, 0x36, 0xb8, 0xfc, + 0x57, 0xe9, 0x0b, 0xf7, 0x43, 0x91, 0xe1, 0x0f, 0x66, 0x99, 0x1e, 0xdf, 0xdf, 0xdb, 0x74, 0x74, + 0x1d, 0xba, 0x83, 0x8b, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xf4, 0x2e, 0x77, 0x07, 0x75, 0x04, + 0x00, 0x00, +} diff --git a/proto/defs.proto b/proto/defs.proto new file mode 100644 index 000000000..c203fd1eb --- /dev/null +++ b/proto/defs.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +package protodb; + +message Entity { + int32 id = 1; + bytes key = 2; + bytes value = 3; + bool exists = 4; + bytes start = 5; + bytes end = 6; + string err = 7; + string print = 8; + int64 time_at = 9; +} + +message Nothing { +} + +message DDomain { + bytes start = 1; + bytes end = 2; +} + +message Iterator { + DDomain domain = 1; + bool valid = 2; + bytes key = 3; + bytes value = 4; +} + +message Stats { + map data = 1; + int64 time_at = 2; +} + +message Init { + string Type = 1; + string Name = 2; + string Dir = 3; +} + +service DB { + rpc init(Init) returns (Entity) {} + rpc get(Entity) returns (Entity) {} + rpc getStream(stream Entity) returns (stream Entity) {} + + rpc has(Entity) returns (Entity) {} + rpc set(Entity) returns (Nothing) {} + rpc setSync(Entity) returns (Nothing) {} + rpc delete(Entity) returns (Nothing) {} + rpc deleteSync(Entity) returns (Nothing) {} + rpc iterator(Entity) returns (stream Iterator) {} + rpc reverseIterator(Entity) returns (stream Iterator) {} + // rpc print(Nothing) returns (Entity) {} + rpc stats(Nothing) returns (Stats) {} +} From 1260b75f6341088a1253c18059f0bb34527179dd Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 Mar 2018 14:54:15 -0700 Subject: [PATCH 02/10] grpcdb: Better readability for docs and constructor names * Added some docs for NewClient, BindServer, *server.Init * Security level clarified, whether "secure" for https or "insecure" for non-https gRPC connections. --- grpcdb/client.go | 14 ++++++++++++-- grpcdb/example_test.go | 6 +++--- grpcdb/server.go | 23 ++++++++++++++++++++--- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/grpcdb/client.go b/grpcdb/client.go index 45409a1f9..a09720abc 100644 --- a/grpcdb/client.go +++ b/grpcdb/client.go @@ -6,9 +6,19 @@ import ( protodb "github.com/tendermint/tmlibs/proto" ) -func NewClient(serverAddr string, secure bool) (protodb.DBClient, error) { +// Security defines how the client will talk to the gRPC server. +type Security uint + +const ( + Insecure Security = iota + Secure +) + +// NewClient creates a gRPC client connected to the bound gRPC server at serverAddr. +// Use kind to set the level of security to either Secure or Insecure. +func NewClient(serverAddr string, kind Security) (protodb.DBClient, error) { var opts []grpc.DialOption - if !secure { + if kind == Insecure { opts = append(opts, grpc.WithInsecure()) } cc, err := grpc.Dial(serverAddr, opts...) diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go index 653180113..451428b97 100644 --- a/grpcdb/example_test.go +++ b/grpcdb/example_test.go @@ -12,12 +12,12 @@ import ( func Example() { addr := ":8998" go func() { - if err := grpcdb.BindRemoteDBServer(addr); err != nil { - log.Fatalf("BindRemoteDBServer: %v", err) + if err := grpcdb.BindServer(addr); err != nil { + log.Fatalf("BindServer: %v", err) } }() - client, err := grpcdb.NewClient(addr, false) + client, err := grpcdb.NewClient(addr, grpcdb.Insecure) if err != nil { log.Fatalf("Failed to create grpcDB client: %v", err) } diff --git a/grpcdb/server.go b/grpcdb/server.go index 26d0ffa9e..c4d115bd7 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -1,8 +1,8 @@ package grpcdb import ( -"log" "context" + "log" "net" "sync" "time" @@ -13,7 +13,10 @@ import ( protodb "github.com/tendermint/tmlibs/proto" ) -func BindRemoteDBServer(addr string, opts ...grpc.ServerOption) error { +// BindServer is a blocking function that sets up a gRPC based +// server at the address supplied, with the gRPC options passed in. +// Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe. +func BindServer(addr string, opts ...grpc.ServerOption) error { ln, err := net.Listen("tcp", addr) if err != nil { return err @@ -30,11 +33,24 @@ type server struct { var _ protodb.DBServer = (*server)(nil) +// Init initializes the server's database. Only one type of database +// can be initialized per server. +// +// Dir is the directory on the file system in which the DB will be stored(if backed by disk) (TODO: remove) +// +// Name is representative filesystem entry's basepath +// +// Type can be either one of: +// * cleveldb (if built with gcc enabled) +// * fsdb +// * memdB +// * leveldb +// See https://godoc.org/github.com/tendermint/tmlibs/db#DBBackendType func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) { s.mu.Lock() defer s.mu.Unlock() -log.Printf("in: %+v\n", in) + log.Printf("in: %+v\n", in) s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) return &protodb.Entity{TimeAt: time.Now().Unix()}, nil } @@ -45,6 +61,7 @@ func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothi } var nothing = new(protodb.Nothing) + func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { s.db.DeleteSync(in.Key) return nothing, nil From 5d12e1eb46ef00868fbf74bc2e990d96592b5635 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 17 Mar 2018 16:58:49 -0700 Subject: [PATCH 03/10] remotedb: a client package implementing the db.DB interface Simplified the abstractions to remotedb, a package that allows clients to use whatever database they can in client code without having to switch out their code e.g ```go client, err := remotedb.NewInsecure(":9888") ... // Just like they'd initialize locally in := &remotedb.Init{ Name: "test-remote-db", Type: "leveldb", Dir: "/tmp/dbs", } if err := client.InitRemote(in); err != nil { log.Fatalf("Failed to initialize the database") } v1 := client.Get(k1) client.Set(k9, dog) for itr := client.Iterator(a1, z1); itr.Valid(); itr.Next() { k, v := itr.Key(), itr.Value() dom := itr.Domain() ... } ``` --- grpcdb/doc.go | 30 +++++ grpcdb/example_test.go | 2 +- grpcdb/server.go | 13 ++- remotedb/doc.go | 37 +++++++ remotedb/remotedb.go | 226 ++++++++++++++++++++++++++++++++++++++ remotedb/remotedb_test.go | 41 +++++++ 6 files changed, 343 insertions(+), 6 deletions(-) create mode 100644 grpcdb/doc.go create mode 100644 remotedb/doc.go create mode 100644 remotedb/remotedb.go create mode 100644 remotedb/remotedb_test.go diff --git a/grpcdb/doc.go b/grpcdb/doc.go new file mode 100644 index 000000000..a54cab207 --- /dev/null +++ b/grpcdb/doc.go @@ -0,0 +1,30 @@ +/* +grpcdb is the distribution of Tendermint's db.DB instances using +the gRPC transport to decouple local db.DB usages from applications, +to using them over a network in a highly performant manner. + +grpcdb allows users to initialize a database's server like +they would locally and invoke the respective methods of db.DB. + +Most users shouldn't use this package, but should instead use +remotedb. Only the lower level users and database server deployers +should use it, for functionality such as: + + ln, err := net.Listen("tcp", "0.0.0.0:0") + srv := grpcdb.NewServer() + defer srv.Stop() + go func() { + if err := srv.Serve(ln); err != nil { + t.Fatalf("BindServer: %v", err) + } + }() + +or + addr := ":8998" + go func() { + if err := grpcdb.ListenAndServe(addr); err != nil { + log.Fatalf("BindServer: %v", err) + } + }() +*/ +package grpcdb diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go index 451428b97..cbe1abf92 100644 --- a/grpcdb/example_test.go +++ b/grpcdb/example_test.go @@ -12,7 +12,7 @@ import ( func Example() { addr := ":8998" go func() { - if err := grpcdb.BindServer(addr); err != nil { + if err := grpcdb.ListenAndServe(addr); err != nil { log.Fatalf("BindServer: %v", err) } }() diff --git a/grpcdb/server.go b/grpcdb/server.go index c4d115bd7..301f43f23 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -2,7 +2,6 @@ package grpcdb import ( "context" - "log" "net" "sync" "time" @@ -13,17 +12,22 @@ import ( protodb "github.com/tendermint/tmlibs/proto" ) -// BindServer is a blocking function that sets up a gRPC based +// ListenAndServe is a blocking function that sets up a gRPC based // server at the address supplied, with the gRPC options passed in. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe. -func BindServer(addr string, opts ...grpc.ServerOption) error { +func ListenAndServe(addr string, opts ...grpc.ServerOption) error { ln, err := net.Listen("tcp", addr) if err != nil { return err } + srv := NewServer(opts...) + return srv.Serve(ln) +} + +func NewServer(opts ...grpc.ServerOption) *grpc.Server { srv := grpc.NewServer(opts...) protodb.RegisterDBServer(srv, new(server)) - return srv.Serve(ln) + return srv } type server struct { @@ -50,7 +54,6 @@ func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, e s.mu.Lock() defer s.mu.Unlock() - log.Printf("in: %+v\n", in) s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) return &protodb.Entity{TimeAt: time.Now().Unix()}, nil } diff --git a/remotedb/doc.go b/remotedb/doc.go new file mode 100644 index 000000000..07c95a56a --- /dev/null +++ b/remotedb/doc.go @@ -0,0 +1,37 @@ +/* +remotedb is a package for connecting to distributed Tendermint db.DB +instances. The purpose is to detach difficult deployments such as +CLevelDB that requires gcc or perhaps for databases that require +custom configurations such as extra disk space. It also eases +the burden and cost of deployment of dependencies for databases +to be used by Tendermint developers. Most importantly it is built +over the high performant gRPC transport. + +remotedb's RemoteDB implements db.DB so can be used normally +like other databases. One just has to explicitly connect to the +remote database with a client setup such as: + + client, err := remotedb.NewInsecure(addr) + // Make sure to invoke InitRemote! + if err := client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"}); err != nil { + log.Fatalf("Failed to initialize the remote db") + } + + client.Set(key1, value) + gv1 := client.SetSync(k2, v2) + + client.Delete(k1) + gv2 := client.Get(k1) + + for itr := client.Iterator(k1, k9); itr.Valid(); itr.Next() { + ik, iv := itr.Key(), itr.Value() + ds, de := itr.Domain() + } + + stats := client.Stats() + + if !client.Has(dk1) { + client.SetSync(dk1, dv1) + } +*/ +package remotedb diff --git a/remotedb/remotedb.go b/remotedb/remotedb.go new file mode 100644 index 000000000..a110e816c --- /dev/null +++ b/remotedb/remotedb.go @@ -0,0 +1,226 @@ +package remotedb + +import ( + "context" + "fmt" + + "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/grpcdb" + protodb "github.com/tendermint/tmlibs/proto" +) + +type RemoteDB struct { + ctx context.Context + dc protodb.DBClient +} + +func NewSecure(serverAddr string) (*RemoteDB, error) { + return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Secure)) +} + +func NewInsecure(serverAddr string) (*RemoteDB, error) { + return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Insecure)) +} + +func newRemoteDB(gdc protodb.DBClient, err error) (*RemoteDB, error) { + if err != nil { + return nil, err + } + return &RemoteDB{dc: gdc, ctx: context.Background()}, nil +} + +type Init struct { + Dir string + Name string + Type string +} + +func (rd *RemoteDB) InitRemote(in *Init) error { + _, err := rd.dc.Init(rd.ctx, &protodb.Init{Dir: in.Dir, Type: in.Type, Name: in.Name}) + return err +} + +var _ db.DB = (*RemoteDB)(nil) + +// Close is a noop currently +func (rd *RemoteDB) Close() { +} + +func (rd *RemoteDB) Delete(key []byte) { + if _, err := rd.dc.Delete(rd.ctx, &protodb.Entity{Key: key}); err != nil { + panic(fmt.Sprintf("RemoteDB.Delete: %v", err)) + } +} + +func (rd *RemoteDB) DeleteSync(key []byte) { + if _, err := rd.dc.DeleteSync(rd.ctx, &protodb.Entity{Key: key}); err != nil { + panic(fmt.Sprintf("RemoteDB.DeleteSync: %v", err)) + } +} + +func (rd *RemoteDB) Set(key, value []byte) { + if _, err := rd.dc.Set(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil { + panic(fmt.Sprintf("RemoteDB.Set: %v", err)) + } +} + +func (rd *RemoteDB) SetSync(key, value []byte) { + if _, err := rd.dc.SetSync(rd.ctx, &protodb.Entity{Key: key, Value: value}); err != nil { + panic(fmt.Sprintf("RemoteDB.SetSync: %v", err)) + } +} + +func (rd *RemoteDB) Get(key []byte) []byte { + res, err := rd.dc.Get(rd.ctx, &protodb.Entity{Key: key}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Get error: %v", err)) + } + return res.Value +} + +func (rd *RemoteDB) Has(key []byte) bool { + res, err := rd.dc.Has(rd.ctx, &protodb.Entity{Key: key}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Has error: %v", err)) + } + return res.Exists +} + +func (rd *RemoteDB) ReverseIterator(start, end []byte) db.Iterator { + dic, err := rd.dc.ReverseIterator(rd.ctx, &protodb.Entity{Start: start, End: end}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err)) + } + return makeReverseIterator(dic) +} + +// TODO: Implement NewBatch +func (rd *RemoteDB) NewBatch() db.Batch { + panic("Unimplemented") +} + +// TODO: Implement Print when db.DB implements a method +// to print to a string and not db.Print to stdout. +func (rd *RemoteDB) Print() { + panic("Unimplemented") +} + +func (rd *RemoteDB) Stats() map[string]string { + stats, err := rd.dc.Stats(rd.ctx, &protodb.Nothing{}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Stats error: %v", err)) + } + if stats == nil { + return nil + } + return stats.Data +} + +func (rd *RemoteDB) Iterator(start, end []byte) db.Iterator { + dic, err := rd.dc.Iterator(rd.ctx, &protodb.Entity{Start: start, End: end}) + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator error: %v", err)) + } + return makeIterator(dic) +} + +func makeIterator(dic protodb.DB_IteratorClient) db.Iterator { + return &iterator{dic: dic} +} + +func makeReverseIterator(dric protodb.DB_ReverseIteratorClient) db.Iterator { + return &reverseIterator{dric: dric} +} + +type reverseIterator struct { + dric protodb.DB_ReverseIteratorClient + cur *protodb.Iterator +} + +var _ db.Iterator = (*iterator)(nil) + +func (rItr *reverseIterator) Valid() bool { + return rItr.cur != nil && rItr.cur.Valid +} + +func (rItr *reverseIterator) Domain() (start, end []byte) { + if rItr.cur == nil || rItr.cur.Domain == nil { + return nil, nil + } + return rItr.cur.Domain.Start, rItr.cur.Domain.End +} + +// Next advances the current reverseIterator +func (rItr *reverseIterator) Next() { + var err error + rItr.cur, err = rItr.dric.Recv() + if err != nil { + panic(fmt.Sprintf("RemoteDB.ReverseIterator.Next error: %v", err)) + } +} + +func (rItr *reverseIterator) Key() []byte { + if rItr.cur == nil { + return nil + } + return rItr.cur.Key +} + +func (rItr *reverseIterator) Value() []byte { + if rItr.cur == nil { + return nil + } + return rItr.cur.Value +} + +func (rItr *reverseIterator) Close() { +} + +// iterator implements the db.Iterator by retrieving +// streamed iterators from the remote backend as +// needed. It is NOT safe for concurrent usage, +// matching the behavior of other iterators. +type iterator struct { + dic protodb.DB_IteratorClient + cur *protodb.Iterator +} + +var _ db.Iterator = (*iterator)(nil) + +func (itr *iterator) Valid() bool { + return itr.cur != nil && itr.cur.Valid +} + +func (itr *iterator) Domain() (start, end []byte) { + if itr.cur == nil || itr.cur.Domain == nil { + return nil, nil + } + return itr.cur.Domain.Start, itr.cur.Domain.End +} + +// Next advances the current iterator +func (itr *iterator) Next() { + var err error + itr.cur, err = itr.dic.Recv() + if err != nil { + panic(fmt.Sprintf("RemoteDB.Iterator.Next error: %v", err)) + } +} + +func (itr *iterator) Key() []byte { + if itr.cur == nil { + return nil + } + return itr.cur.Key +} + +func (itr *iterator) Value() []byte { + if itr.cur == nil { + return nil + } + return itr.cur.Value +} + +func (itr *iterator) Close() { + // TODO: Shut down the iterator +} diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go new file mode 100644 index 000000000..37ce0c59a --- /dev/null +++ b/remotedb/remotedb_test.go @@ -0,0 +1,41 @@ +package remotedb_test + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tmlibs/grpcdb" + "github.com/tendermint/tmlibs/remotedb" +) + +func TestRemoteDB(t *testing.T) { + ln, err := net.Listen("tcp", "0.0.0.0:0") + require.Nil(t, err, "expecting a port to have been assigned on which we can listen") + srv := grpcdb.NewServer() + defer srv.Stop() + go func() { + if err := srv.Serve(ln); err != nil { + t.Fatalf("BindServer: %v", err) + } + }() + + client, err := remotedb.NewInsecure(ln.Addr().String()) + require.Nil(t, err, "expecting a successful client creation") + require.Nil(t, client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"})) + + k1 := []byte("key-1") + v1 := client.Get(k1) + require.Equal(t, 0, len(v1), "expecting no key1 to have been stored") + vv1 := []byte("value-1") + client.Set(k1, vv1) + gv1 := client.Get(k1) + require.Equal(t, gv1, vv1) + + // Deletion + client.Delete(k1) + gv2 := client.Get(k1) + require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore") + require.NotEqual(t, len(gv1), len(gv2), "after deletion, not expecting the key to exist anymore") +} From bf16d6453c64c553c224d9334e53ef7a60d298b0 Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Mon, 7 May 2018 22:12:26 +0200 Subject: [PATCH 04/10] Address PR comments --- Gopkg.lock | 74 ++++++++++++++++++++++++++++- Makefile | 2 +- grpcdb/server.go | 4 +- proto/defs.pb.go | 118 ++++++++++++++++++++++------------------------- proto/defs.proto | 7 ++- 5 files changed, 133 insertions(+), 72 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 45b4d2887..32669c198 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -51,6 +51,18 @@ revision = "1adfc126b41513cc696b209667c8656ea7aac67c" version = "v1.0.0" +[[projects]] + name = "github.com/golang/protobuf" + packages = [ + "proto", + "ptypes", + "ptypes/any", + "ptypes/duration", + "ptypes/timestamp" + ] + revision = "b4deda0973fb4c70b50d226b1af49f3da59f5265" + version = "v1.1.0" + [[projects]] branch = "master" name = "github.com/golang/snappy" @@ -189,6 +201,20 @@ packages = ["ripemd160"] revision = "edd5e9b0879d13ee6970a50153d85b8fec9f7686" +[[projects]] + branch = "master" + name = "golang.org/x/net" + packages = [ + "context", + "http/httpguts", + "http2", + "http2/hpack", + "idna", + "internal/timeseries", + "trace" + ] + revision = "d11bb6cd8e3c4e60239c9cb20ef68586d74500d0" + [[projects]] name = "golang.org/x/sys" packages = ["unix"] @@ -197,15 +223,59 @@ [[projects]] name = "golang.org/x/text" packages = [ + "collate", + "collate/build", + "internal/colltab", "internal/gen", + "internal/tag", "internal/triegen", "internal/ucd", + "language", + "secure/bidirule", "transform", + "unicode/bidi", "unicode/cldr", - "unicode/norm" + "unicode/norm", + "unicode/rangetable" ] revision = "c01e4764d870b77f8abe5096ee19ad20d80e8075" +[[projects]] + branch = "master" + name = "google.golang.org/genproto" + packages = ["googleapis/rpc/status"] + revision = "86e600f69ee4704c6efbf6a2a40a5c10700e76c2" + +[[projects]] + name = "google.golang.org/grpc" + packages = [ + ".", + "balancer", + "balancer/base", + "balancer/roundrobin", + "codes", + "connectivity", + "credentials", + "encoding", + "encoding/proto", + "grpclb/grpc_lb_v1/messages", + "grpclog", + "internal", + "keepalive", + "metadata", + "naming", + "peer", + "resolver", + "resolver/dns", + "resolver/passthrough", + "stats", + "status", + "tap", + "transport" + ] + revision = "d11072e7ca9811b1100b80ca0269ac831f06d024" + version = "v1.11.3" + [[projects]] name = "gopkg.in/yaml.v2" packages = ["."] @@ -215,6 +285,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "c33ff784e40965e1cd0ec6232b43e379c6608cb41a9c5c707247742b68c906fb" + inputs-digest = "8b1ff7eb1a874905f0d7772407cfacd3fca77a2214530e633c0f4d7e468a6f92" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 0236c480b..0e715ef16 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ GOTOOLS = \ GOTOOLS_CHECK = dep gometalinter.v2 protoc protoc-gen-gogo INCLUDE = -I=. -I=${GOPATH}/src -I=${GOPATH}/src/github.com/gogo/protobuf/protobuf -all: check get_vendor_deps protoc build test install metalinter +all: check get_vendor_deps protoc grpc_dbserver build test install metalinter check: check_tools diff --git a/grpcdb/server.go b/grpcdb/server.go index 301f43f23..1e8495300 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -55,7 +55,7 @@ func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, e defer s.mu.Unlock() s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir) - return &protodb.Entity{TimeAt: time.Now().Unix()}, nil + return &protodb.Entity{CreatedAt: time.Now().Unix()}, nil } func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) { @@ -136,7 +136,7 @@ func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) for it.Valid() { start, end := it.Domain() out := &protodb.Iterator{ - Domain: &protodb.DDomain{Start: start, End: end}, + Domain: &protodb.Domain{Start: start, End: end}, Valid: it.Valid(), Key: it.Key(), Value: it.Value(), diff --git a/proto/defs.pb.go b/proto/defs.pb.go index 61f687504..c65b28e08 100644 --- a/proto/defs.pb.go +++ b/proto/defs.pb.go @@ -10,7 +10,7 @@ It is generated from these files: It has these top-level messages: Entity Nothing - DDomain + Domain Iterator Stats Init @@ -38,15 +38,14 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type Entity struct { - Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` - Exists bool `protobuf:"varint,4,opt,name=exists" json:"exists,omitempty"` - Start []byte `protobuf:"bytes,5,opt,name=start,proto3" json:"start,omitempty"` - End []byte `protobuf:"bytes,6,opt,name=end,proto3" json:"end,omitempty"` - Err string `protobuf:"bytes,7,opt,name=err" json:"err,omitempty"` - Print string `protobuf:"bytes,8,opt,name=print" json:"print,omitempty"` - TimeAt int64 `protobuf:"varint,9,opt,name=time_at,json=timeAt" json:"time_at,omitempty"` + Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Exists bool `protobuf:"varint,4,opt,name=exists" json:"exists,omitempty"` + Start []byte `protobuf:"bytes,5,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,6,opt,name=end,proto3" json:"end,omitempty"` + Err string `protobuf:"bytes,7,opt,name=err" json:"err,omitempty"` + CreatedAt int64 `protobuf:"varint,8,opt,name=created_at,json=createdAt" json:"created_at,omitempty"` } func (m *Entity) Reset() { *m = Entity{} } @@ -103,16 +102,9 @@ func (m *Entity) GetErr() string { return "" } -func (m *Entity) GetPrint() string { +func (m *Entity) GetCreatedAt() int64 { if m != nil { - return m.Print - } - return "" -} - -func (m *Entity) GetTimeAt() int64 { - if m != nil { - return m.TimeAt + return m.CreatedAt } return 0 } @@ -125,24 +117,24 @@ func (m *Nothing) String() string { return proto.CompactTextString(m) func (*Nothing) ProtoMessage() {} func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } -type DDomain struct { +type Domain struct { Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` } -func (m *DDomain) Reset() { *m = DDomain{} } -func (m *DDomain) String() string { return proto.CompactTextString(m) } -func (*DDomain) ProtoMessage() {} -func (*DDomain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (m *Domain) Reset() { *m = Domain{} } +func (m *Domain) String() string { return proto.CompactTextString(m) } +func (*Domain) ProtoMessage() {} +func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } -func (m *DDomain) GetStart() []byte { +func (m *Domain) GetStart() []byte { if m != nil { return m.Start } return nil } -func (m *DDomain) GetEnd() []byte { +func (m *Domain) GetEnd() []byte { if m != nil { return m.End } @@ -150,10 +142,10 @@ func (m *DDomain) GetEnd() []byte { } type Iterator struct { - Domain *DDomain `protobuf:"bytes,1,opt,name=domain" json:"domain,omitempty"` - Valid bool `protobuf:"varint,2,opt,name=valid" json:"valid,omitempty"` - Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` + Domain *Domain `protobuf:"bytes,1,opt,name=domain" json:"domain,omitempty"` + Valid bool `protobuf:"varint,2,opt,name=valid" json:"valid,omitempty"` + Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` } func (m *Iterator) Reset() { *m = Iterator{} } @@ -161,7 +153,7 @@ func (m *Iterator) String() string { return proto.CompactTextString(m func (*Iterator) ProtoMessage() {} func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } -func (m *Iterator) GetDomain() *DDomain { +func (m *Iterator) GetDomain() *Domain { if m != nil { return m.Domain } @@ -248,7 +240,7 @@ func (m *Init) GetDir() string { func init() { proto.RegisterType((*Entity)(nil), "protodb.Entity") proto.RegisterType((*Nothing)(nil), "protodb.Nothing") - proto.RegisterType((*DDomain)(nil), "protodb.DDomain") + proto.RegisterType((*Domain)(nil), "protodb.Domain") proto.RegisterType((*Iterator)(nil), "protodb.Iterator") proto.RegisterType((*Stats)(nil), "protodb.Stats") proto.RegisterType((*Init)(nil), "protodb.Init") @@ -749,36 +741,36 @@ func init() { proto.RegisterFile("defs.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ // 498 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x72, 0xd3, 0x4c, - 0x10, 0xf4, 0x4a, 0xb2, 0x64, 0x4d, 0xbe, 0x2f, 0x09, 0x5b, 0x14, 0x6c, 0xf9, 0xa4, 0xd2, 0x49, - 0xfc, 0xb9, 0x12, 0xe7, 0xc0, 0xcf, 0x09, 0x28, 0xe7, 0xe0, 0x4b, 0x0e, 0x32, 0x77, 0x6a, 0x83, - 0x06, 0x67, 0x8b, 0x58, 0x72, 0xed, 0x0e, 0x29, 0xf4, 0x04, 0x3c, 0x00, 0x4f, 0xc4, 0x9b, 0x51, - 0xbb, 0xfa, 0xb1, 0x43, 0x7c, 0x10, 0x27, 0x4d, 0xef, 0x76, 0xf7, 0x8c, 0x5a, 0x23, 0x80, 0x02, - 0xbf, 0x9a, 0xd9, 0x56, 0x57, 0x54, 0xf1, 0xc8, 0x3d, 0x8a, 0xeb, 0xf4, 0x37, 0x83, 0xf0, 0xb2, - 0x24, 0x45, 0x35, 0x3f, 0x06, 0x4f, 0x15, 0x82, 0x25, 0x2c, 0x1b, 0xe7, 0x9e, 0x2a, 0xf8, 0x29, - 0xf8, 0xdf, 0xb0, 0x16, 0x5e, 0xc2, 0xb2, 0xff, 0x72, 0x5b, 0xf2, 0xc7, 0x30, 0xbe, 0x93, 0xb7, - 0xdf, 0x51, 0xf8, 0xee, 0xac, 0x01, 0xfc, 0x09, 0x84, 0xf8, 0x43, 0x19, 0x32, 0x22, 0x48, 0x58, - 0x36, 0xc9, 0x5b, 0x64, 0xd9, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd8, 0x0e, 0x58, 0x57, 0x2c, 0x0b, - 0x11, 0x36, 0xae, 0x58, 0xba, 0x3e, 0xa8, 0xb5, 0x88, 0x12, 0x96, 0xc5, 0xb9, 0x2d, 0xad, 0x72, - 0xab, 0x55, 0x49, 0x62, 0xe2, 0xce, 0x1a, 0xc0, 0x9f, 0x42, 0x44, 0x6a, 0x83, 0x9f, 0x25, 0x89, - 0x38, 0x61, 0x99, 0x9f, 0x87, 0x16, 0x7e, 0xa0, 0x34, 0x86, 0xe8, 0xaa, 0xa2, 0x1b, 0x55, 0xae, - 0xd3, 0x73, 0x88, 0x16, 0x8b, 0x6a, 0x23, 0x55, 0xb9, 0x6b, 0xcf, 0x0e, 0xb4, 0xf7, 0xfa, 0xf6, - 0xa9, 0x86, 0xc9, 0x92, 0x50, 0x4b, 0xaa, 0x34, 0xcf, 0x20, 0x2c, 0x9c, 0xda, 0x89, 0x8e, 0xe6, - 0xa7, 0xb3, 0x36, 0xa7, 0x59, 0xeb, 0x9a, 0xb7, 0xf7, 0x6d, 0x14, 0xaa, 0x71, 0x9a, 0xe4, 0x0d, - 0xe8, 0x22, 0xf3, 0x0f, 0x44, 0x16, 0xec, 0x45, 0x96, 0xfe, 0x64, 0x30, 0x5e, 0x91, 0x24, 0xc3, - 0x5f, 0x42, 0x50, 0x48, 0x92, 0x82, 0x25, 0x7e, 0x76, 0x34, 0x17, 0x7d, 0x3f, 0x77, 0x3b, 0x5b, - 0x48, 0x92, 0x97, 0x25, 0xe9, 0x3a, 0x77, 0xac, 0xfd, 0x08, 0xbc, 0xfd, 0x08, 0xa6, 0xaf, 0x21, - 0xee, 0xb9, 0xdd, 0x14, 0xac, 0x09, 0xf4, 0xde, 0x14, 0x5e, 0x13, 0xa8, 0x03, 0xef, 0xbc, 0x37, - 0x2c, 0x7d, 0x0f, 0xc1, 0xb2, 0x54, 0xc4, 0x39, 0x04, 0x9f, 0xea, 0x2d, 0xb6, 0x22, 0x57, 0xdb, - 0xb3, 0x2b, 0xb9, 0xe9, 0x44, 0xae, 0xb6, 0xde, 0x0b, 0xa5, 0xdd, 0x1b, 0xc6, 0xb9, 0x2d, 0xe7, - 0xbf, 0x02, 0xf0, 0x16, 0x1f, 0x79, 0x06, 0x81, 0xb2, 0x46, 0xff, 0xf7, 0xaf, 0x60, 0x7d, 0xa7, - 0x27, 0x3d, 0x6c, 0xb6, 0x2c, 0x1d, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xdf, 0x37, 0x87, 0xa8, - 0x17, 0x10, 0xaf, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x08, 0x32, 0x76, 0xc6, 0xac, 0xff, 0x8d, - 0x34, 0x83, 0xfc, 0x9f, 0x83, 0x6f, 0x0e, 0x8d, 0xb2, 0xfb, 0xee, 0xdd, 0x62, 0x8d, 0xf8, 0x0c, - 0x22, 0x83, 0xb4, 0xaa, 0xcb, 0x2f, 0xc3, 0xf8, 0xaf, 0x20, 0x2c, 0xf0, 0x16, 0x09, 0x87, 0xd1, - 0xcf, 0xed, 0xff, 0x69, 0xe9, 0xc3, 0x3b, 0xcc, 0x61, 0xa2, 0xba, 0xcd, 0x7d, 0x20, 0x78, 0xb4, - 0xfb, 0x0e, 0x2d, 0x27, 0x1d, 0x9d, 0x31, 0xfe, 0x16, 0x4e, 0x34, 0xde, 0xa1, 0x36, 0xb8, 0xfc, - 0x57, 0xe9, 0x0b, 0xf7, 0x43, 0x91, 0xe1, 0x0f, 0x66, 0x99, 0x1e, 0xdf, 0xdf, 0xdb, 0x74, 0x74, - 0x1d, 0xba, 0x83, 0x8b, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xf4, 0x2e, 0x77, 0x07, 0x75, 0x04, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xce, 0xda, 0x8e, 0x13, 0x4f, 0xa1, 0x2d, 0x2b, 0x04, 0xab, 0x4a, 0x48, 0x96, 0x2f, 0x98, + 0x3f, 0x2b, 0xa4, 0x07, 0x7e, 0x4e, 0x14, 0xa5, 0x87, 0x5c, 0x7a, 0x70, 0xb8, 0xa3, 0x6d, 0x3d, + 0xa4, 0x2b, 0x1a, 0xbb, 0xec, 0x0e, 0x15, 0x7e, 0x02, 0x1e, 0x80, 0x27, 0xe1, 0x0d, 0xd1, 0xae, + 0x7f, 0x42, 0x69, 0x0e, 0xe6, 0xe4, 0x99, 0xd9, 0xef, 0xfb, 0x66, 0xf6, 0xf3, 0x2c, 0x40, 0x81, + 0x5f, 0x4c, 0x76, 0xad, 0x2b, 0xaa, 0xf8, 0xc4, 0x7d, 0x8a, 0xf3, 0xe4, 0x37, 0x83, 0xf0, 0xb4, + 0x24, 0x45, 0x35, 0xdf, 0x07, 0x4f, 0x15, 0x82, 0xc5, 0x2c, 0x1d, 0xe7, 0x9e, 0x2a, 0xf8, 0x21, + 0xf8, 0x5f, 0xb1, 0x16, 0x5e, 0xcc, 0xd2, 0x7b, 0xb9, 0x0d, 0xf9, 0x43, 0x18, 0xdf, 0xc8, 0xab, + 0xef, 0x28, 0x7c, 0x57, 0x6b, 0x12, 0xfe, 0x08, 0x42, 0xfc, 0xa1, 0x0c, 0x19, 0x11, 0xc4, 0x2c, + 0x9d, 0xe6, 0x6d, 0x66, 0xd1, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd0, 0x2e, 0xb1, 0xaa, 0x58, 0x16, + 0x22, 0x6c, 0x54, 0xb1, 0x74, 0x7d, 0x50, 0x6b, 0x31, 0x89, 0x59, 0x1a, 0xe5, 0x36, 0xe4, 0x4f, + 0x00, 0x2e, 0x34, 0x4a, 0xc2, 0xe2, 0xb3, 0x24, 0x31, 0x8d, 0x59, 0xea, 0xe7, 0x51, 0x5b, 0x39, + 0xa1, 0x24, 0x82, 0xc9, 0x59, 0x45, 0x97, 0xaa, 0x5c, 0x27, 0x33, 0x08, 0x17, 0xd5, 0x46, 0xaa, + 0x72, 0xdb, 0x8d, 0xed, 0xe8, 0xe6, 0xf5, 0xdd, 0x92, 0x6f, 0x30, 0x5d, 0x12, 0x6a, 0x49, 0x95, + 0xe6, 0x4f, 0x21, 0x2c, 0x1c, 0xdb, 0x91, 0xf6, 0xe6, 0x07, 0x59, 0x6b, 0x4b, 0xd6, 0x88, 0xe6, + 0xed, 0x71, 0x7b, 0x71, 0xd5, 0x08, 0x4d, 0xf3, 0x26, 0xe9, 0x0c, 0xf2, 0x77, 0x18, 0x14, 0xfc, + 0x65, 0x50, 0xf2, 0x93, 0xc1, 0x78, 0x45, 0x92, 0x0c, 0x7f, 0x09, 0x41, 0x21, 0x49, 0x0a, 0x16, + 0xfb, 0xe9, 0xde, 0x5c, 0xf4, 0xed, 0xdc, 0x69, 0xb6, 0x90, 0x24, 0x4f, 0x4b, 0xd2, 0x75, 0xee, + 0x50, 0xfc, 0x31, 0x4c, 0x48, 0x6d, 0xd0, 0x7a, 0xe0, 0x39, 0x0f, 0x42, 0x9b, 0x9e, 0xd0, 0xd1, + 0x1b, 0x88, 0x7a, 0x6c, 0x37, 0x05, 0x6b, 0xec, 0xbb, 0x35, 0x85, 0xe7, 0x6a, 0x4d, 0xf2, 0xde, + 0x7b, 0xcb, 0x92, 0x0f, 0x10, 0x2c, 0x4b, 0x45, 0x9c, 0x43, 0xf0, 0xa9, 0xbe, 0xc6, 0x96, 0xe4, + 0x62, 0x5b, 0x3b, 0x93, 0x9b, 0x8e, 0xe4, 0x62, 0xab, 0xbd, 0x50, 0xda, 0xdd, 0x30, 0xca, 0x6d, + 0x38, 0xff, 0x15, 0x80, 0xb7, 0xf8, 0xc8, 0x53, 0x08, 0x94, 0x15, 0xba, 0xdf, 0x5f, 0xc1, 0xea, + 0x1e, 0x6d, 0x0d, 0x6c, 0x76, 0x2a, 0x19, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xbf, 0x27, 0xbb, + 0xa0, 0xc7, 0x10, 0xad, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x10, 0x52, 0x36, 0x63, 0x56, 0xff, + 0x52, 0x9a, 0x41, 0xfa, 0xcf, 0xc1, 0x37, 0xbb, 0x46, 0x39, 0xec, 0x0b, 0xdd, 0x5a, 0x8d, 0x78, + 0x06, 0x13, 0x83, 0xb4, 0xaa, 0xcb, 0x8b, 0x61, 0xf8, 0x57, 0x10, 0x16, 0x78, 0x85, 0x84, 0xc3, + 0xe0, 0xaf, 0xed, 0x6b, 0xb4, 0xf0, 0xe1, 0x1d, 0xe6, 0x30, 0x55, 0xdd, 0xe2, 0xde, 0x21, 0x3c, + 0xd8, 0xfe, 0x87, 0x16, 0x93, 0x8c, 0x66, 0x8c, 0xbf, 0x83, 0x03, 0x8d, 0x37, 0xa8, 0x0d, 0x2e, + 0xff, 0x97, 0xfa, 0xc2, 0xbd, 0x27, 0x32, 0xfc, 0xce, 0x2c, 0x47, 0xfb, 0xb7, 0xf7, 0x36, 0x19, + 0x9d, 0x87, 0xae, 0x70, 0xfc, 0x27, 0x00, 0x00, 0xff, 0xff, 0x4d, 0xfe, 0x6a, 0xcc, 0x63, 0x04, 0x00, 0x00, } diff --git a/proto/defs.proto b/proto/defs.proto index c203fd1eb..4b52e9afa 100644 --- a/proto/defs.proto +++ b/proto/defs.proto @@ -10,20 +10,19 @@ message Entity { bytes start = 5; bytes end = 6; string err = 7; - string print = 8; - int64 time_at = 9; + int64 created_at = 8; } message Nothing { } -message DDomain { +message Domain { bytes start = 1; bytes end = 2; } message Iterator { - DDomain domain = 1; + Domain domain = 1; bool valid = 2; bytes key = 3; bytes value = 4; From 2cca5a7a4cb320c7ade7845af154b3116295124d Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Mon, 7 May 2018 23:16:06 +0200 Subject: [PATCH 05/10] Implement TLS/SSL --- Makefile | 20 +++++++++++++++++--- grpcdb/client.go | 11 ++++++----- grpcdb/doc.go | 4 +++- grpcdb/example_test.go | 6 ++++-- grpcdb/server.go | 17 +++++++++++++---- remotedb/remotedb.go | 8 ++------ remotedb/remotedb_test.go | 7 +++++-- 7 files changed, 50 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index 0e715ef16..93312024e 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ GOTOOLS = \ github.com/golang/dep/cmd/dep \ github.com/gogo/protobuf/protoc-gen-gogo \ - github.com/gogo/protobuf/gogoproto + github.com/gogo/protobuf/gogoproto \ + github.com/square/certstrap # github.com/alecthomas/gometalinter.v2 \ GOTOOLS_CHECK = dep gometalinter.v2 protoc protoc-gen-gogo @@ -66,8 +67,21 @@ get_vendor_deps: ######################################## ### Testing -test: +gen_certs: clean_certs + ## Generating certificates for TLS testing... + certstrap init --common-name "tendermint.com" --passphrase "" + certstrap request-cert -ip "::" --passphrase "" + certstrap sign "::" --CA "tendermint.com" --passphrase "" + mv out/{::.crt,::.key} remotedb + +clean_certs: + ## Cleaning TLS testing certificates... + rm -rf out + rm -f remotedb/{::.crt,::.key} + +test: gen_certs go test -tags gcc $(shell go list ./... | grep -v vendor) + make clean_certs test100: @for i in {1..100}; do make test; done @@ -118,7 +132,7 @@ metalinter_all: # To avoid unintended conflicts with file names, always add to .PHONY # unless there is a reason not to. # https://www.gnu.org/software/make/manual/html_node/Phony-Targets.html -.PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all +.PHONY: check protoc build check_tools get_tools get_protoc update_tools get_vendor_deps test fmt metalinter metalinter_all gen_certs clean_certs grpc_dbserver: protoc -I proto/ proto/defs.proto --go_out=plugins=grpc:proto diff --git a/grpcdb/client.go b/grpcdb/client.go index a09720abc..07fd461ec 100644 --- a/grpcdb/client.go +++ b/grpcdb/client.go @@ -2,6 +2,7 @@ package grpcdb import ( "google.golang.org/grpc" + "google.golang.org/grpc/credentials" protodb "github.com/tendermint/tmlibs/proto" ) @@ -16,12 +17,12 @@ const ( // NewClient creates a gRPC client connected to the bound gRPC server at serverAddr. // Use kind to set the level of security to either Secure or Insecure. -func NewClient(serverAddr string, kind Security) (protodb.DBClient, error) { - var opts []grpc.DialOption - if kind == Insecure { - opts = append(opts, grpc.WithInsecure()) +func NewClient(serverAddr string, serverCert string) (protodb.DBClient, error) { + creds, err := credentials.NewClientTLSFromFile(serverCert, "") + if err != nil { + return nil, err } - cc, err := grpc.Dial(serverAddr, opts...) + cc, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(creds)) if err != nil { return nil, err } diff --git a/grpcdb/doc.go b/grpcdb/doc.go index a54cab207..c92de82d3 100644 --- a/grpcdb/doc.go +++ b/grpcdb/doc.go @@ -21,8 +21,10 @@ should use it, for functionality such as: or addr := ":8998" + cert := "server.crt" + key := "server.key" go func() { - if err := grpcdb.ListenAndServe(addr); err != nil { + if err := grpcdb.ListenAndServe(addr, cert, key); err != nil { log.Fatalf("BindServer: %v", err) } }() diff --git a/grpcdb/example_test.go b/grpcdb/example_test.go index cbe1abf92..5a9c6eed9 100644 --- a/grpcdb/example_test.go +++ b/grpcdb/example_test.go @@ -11,13 +11,15 @@ import ( func Example() { addr := ":8998" + cert := "server.crt" + key := "server.key" go func() { - if err := grpcdb.ListenAndServe(addr); err != nil { + if err := grpcdb.ListenAndServe(addr, cert, key); err != nil { log.Fatalf("BindServer: %v", err) } }() - client, err := grpcdb.NewClient(addr, grpcdb.Insecure) + client, err := grpcdb.NewClient(addr, cert) if err != nil { log.Fatalf("Failed to create grpcDB client: %v", err) } diff --git a/grpcdb/server.go b/grpcdb/server.go index 1e8495300..d8dc1581f 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -7,6 +7,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "github.com/tendermint/tmlibs/db" protodb "github.com/tendermint/tmlibs/proto" @@ -15,19 +16,27 @@ import ( // ListenAndServe is a blocking function that sets up a gRPC based // server at the address supplied, with the gRPC options passed in. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe. -func ListenAndServe(addr string, opts ...grpc.ServerOption) error { +func ListenAndServe(addr string, cert string, key string, opts ...grpc.ServerOption) error { ln, err := net.Listen("tcp", addr) if err != nil { return err } - srv := NewServer(opts...) + srv, err := NewServer(cert, key, opts...) + if err != nil { + return err + } return srv.Serve(ln) } -func NewServer(opts ...grpc.ServerOption) *grpc.Server { +func NewServer(cert string, key string, opts ...grpc.ServerOption) (*grpc.Server, error) { + creds, err := credentials.NewServerTLSFromFile(cert, key) + if err != nil { + return nil, err + } + opts = append(opts, grpc.Creds(creds)) srv := grpc.NewServer(opts...) protodb.RegisterDBServer(srv, new(server)) - return srv + return srv, nil } type server struct { diff --git a/remotedb/remotedb.go b/remotedb/remotedb.go index a110e816c..b80cd3fdb 100644 --- a/remotedb/remotedb.go +++ b/remotedb/remotedb.go @@ -14,12 +14,8 @@ type RemoteDB struct { dc protodb.DBClient } -func NewSecure(serverAddr string) (*RemoteDB, error) { - return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Secure)) -} - -func NewInsecure(serverAddr string) (*RemoteDB, error) { - return newRemoteDB(grpcdb.NewClient(serverAddr, grpcdb.Insecure)) +func NewRemoteDB(serverAddr string, serverKey string) (*RemoteDB, error) { + return newRemoteDB(grpcdb.NewClient(serverAddr, serverKey)) } func newRemoteDB(gdc protodb.DBClient, err error) (*RemoteDB, error) { diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go index 37ce0c59a..a5b77cf5a 100644 --- a/remotedb/remotedb_test.go +++ b/remotedb/remotedb_test.go @@ -11,9 +11,12 @@ import ( ) func TestRemoteDB(t *testing.T) { + cert := "::.crt" + key := "::.key" ln, err := net.Listen("tcp", "0.0.0.0:0") require.Nil(t, err, "expecting a port to have been assigned on which we can listen") - srv := grpcdb.NewServer() + srv, err := grpcdb.NewServer(cert, key) + require.Nil(t, err) defer srv.Stop() go func() { if err := srv.Serve(ln); err != nil { @@ -21,7 +24,7 @@ func TestRemoteDB(t *testing.T) { } }() - client, err := remotedb.NewInsecure(ln.Addr().String()) + client, err := remotedb.NewRemoteDB(ln.Addr().String(), cert) require.Nil(t, err, "expecting a successful client creation") require.Nil(t, client.InitRemote(&remotedb.Init{Name: "test-remote-db", Type: "leveldb"})) From 55f4ccd4fcdc0359ea70c2c7978313c6a3a17d78 Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Mon, 7 May 2018 23:23:15 +0200 Subject: [PATCH 06/10] CI fix --- Gopkg.lock | 2 +- Makefile | 4 ++-- test.sh | 6 ++++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 32669c198..96df808a5 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -285,6 +285,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "8b1ff7eb1a874905f0d7772407cfacd3fca77a2214530e633c0f4d7e468a6f92" + inputs-digest = "8aa4ea7ef6d0ff170127eb5bca89c6c37c767d58047159cfd26a431c5cd5e7ad" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Makefile b/Makefile index 93312024e..41e5f1291 100644 --- a/Makefile +++ b/Makefile @@ -72,12 +72,12 @@ gen_certs: clean_certs certstrap init --common-name "tendermint.com" --passphrase "" certstrap request-cert -ip "::" --passphrase "" certstrap sign "::" --CA "tendermint.com" --passphrase "" - mv out/{::.crt,::.key} remotedb + mv out/::.crt out/::.key remotedb clean_certs: ## Cleaning TLS testing certificates... rm -rf out - rm -f remotedb/{::.crt,::.key} + rm -f remotedb/::.crt remotedb/::.key test: gen_certs go test -tags gcc $(shell go list ./... | grep -v vendor) diff --git a/test.sh b/test.sh index b3978d3fe..ecf17fc45 100755 --- a/test.sh +++ b/test.sh @@ -4,6 +4,9 @@ set -e # run the linter # make metalinter_test +# setup certs +make gen_certs + # run the unit tests with coverage echo "" > coverage.txt for d in $(go list ./... | grep -v vendor); do @@ -13,3 +16,6 @@ for d in $(go list ./... | grep -v vendor); do rm profile.out fi done + +# cleanup certs +make clean_certs From 39e1567d0ad3ef578bca645c914fc0c6b8f0fbed Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Tue, 8 May 2018 00:53:33 +0200 Subject: [PATCH 07/10] Add iterator tests --- remotedb/remotedb_test.go | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go index a5b77cf5a..c4014fe66 100644 --- a/remotedb/remotedb_test.go +++ b/remotedb/remotedb_test.go @@ -36,9 +36,38 @@ func TestRemoteDB(t *testing.T) { gv1 := client.Get(k1) require.Equal(t, gv1, vv1) + // Simple iteration + itr := client.Iterator(nil, nil) + itr.Next() + require.Equal(t, itr.Key(), []byte("key-1")) + require.Equal(t, itr.Value(), []byte("value-1")) + require.Panics(t, itr.Next) + itr.Close() + + // Set some more keys + k2 := []byte("key-2") + v2 := []byte("value-2") + client.Set(k2, v2) + gv2 := client.Get(k2) + require.Equal(t, gv2, v2) + + // More iteration + itr = client.Iterator(nil, nil) + itr.Next() + require.Equal(t, itr.Key(), []byte("key-1")) + require.Equal(t, itr.Value(), []byte("value-1")) + itr.Next() + require.Equal(t, itr.Key(), []byte("key-2")) + require.Equal(t, itr.Value(), []byte("value-2")) + require.Panics(t, itr.Next) + // Deletion client.Delete(k1) - gv2 := client.Get(k1) + client.Delete(k2) + gv1 = client.Get(k1) + gv2 = client.Get(k2) require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore") - require.NotEqual(t, len(gv1), len(gv2), "after deletion, not expecting the key to exist anymore") + require.Equal(t, len(gv1), 0, "after deletion, not expecting the key to exist anymore") + + // TODO Batch tests } From 45514a6013602659e2fa168440d10f46af1289fe Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Tue, 8 May 2018 15:45:49 +0200 Subject: [PATCH 08/10] Address PR comments --- grpcdb/client.go | 2 +- grpcdb/doc.go | 4 ++-- grpcdb/server.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/grpcdb/client.go b/grpcdb/client.go index 07fd461ec..bae38b1c5 100644 --- a/grpcdb/client.go +++ b/grpcdb/client.go @@ -17,7 +17,7 @@ const ( // NewClient creates a gRPC client connected to the bound gRPC server at serverAddr. // Use kind to set the level of security to either Secure or Insecure. -func NewClient(serverAddr string, serverCert string) (protodb.DBClient, error) { +func NewClient(serverAddr, serverCert string) (protodb.DBClient, error) { creds, err := credentials.NewClientTLSFromFile(serverCert, "") if err != nil { return nil, err diff --git a/grpcdb/doc.go b/grpcdb/doc.go index c92de82d3..0d8e380ce 100644 --- a/grpcdb/doc.go +++ b/grpcdb/doc.go @@ -21,8 +21,8 @@ should use it, for functionality such as: or addr := ":8998" - cert := "server.crt" - key := "server.key" + cert := "server.crt" + key := "server.key" go func() { if err := grpcdb.ListenAndServe(addr, cert, key); err != nil { log.Fatalf("BindServer: %v", err) diff --git a/grpcdb/server.go b/grpcdb/server.go index d8dc1581f..9b00be43f 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -16,7 +16,7 @@ import ( // ListenAndServe is a blocking function that sets up a gRPC based // server at the address supplied, with the gRPC options passed in. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe. -func ListenAndServe(addr string, cert string, key string, opts ...grpc.ServerOption) error { +func ListenAndServe(addr, cert, key string, opts ...grpc.ServerOption) error { ln, err := net.Listen("tcp", addr) if err != nil { return err @@ -28,7 +28,7 @@ func ListenAndServe(addr string, cert string, key string, opts ...grpc.ServerOpt return srv.Serve(ln) } -func NewServer(cert string, key string, opts ...grpc.ServerOption) (*grpc.Server, error) { +func NewServer(cert, key string, opts ...grpc.ServerOption) (*grpc.Server, error) { creds, err := credentials.NewServerTLSFromFile(cert, key) if err != nil { return nil, err From 0b6d101c772579bff6c75d4611b9f8c2eae41a1e Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Tue, 8 May 2018 16:38:39 +0200 Subject: [PATCH 09/10] Implement batch operations --- grpcdb/server.go | 26 +++++ proto/defs.pb.go | 216 +++++++++++++++++++++++++++++++------- proto/defs.proto | 15 +++ remotedb/remotedb.go | 46 +++++++- remotedb/remotedb_test.go | 34 +++++- 5 files changed, 294 insertions(+), 43 deletions(-) diff --git a/grpcdb/server.go b/grpcdb/server.go index 9b00be43f..d4cfe4433 100644 --- a/grpcdb/server.go +++ b/grpcdb/server.go @@ -169,3 +169,29 @@ func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error stats := s.db.Stats() return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil } + +func (s *server) BatchWrite(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) { + return s.batchWrite(c, b, false) +} + +func (s *server) BatchWriteSync(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) { + return s.batchWrite(c, b, true) +} + +func (s *server) batchWrite(c context.Context, b *protodb.Batch, sync bool) (*protodb.Nothing, error) { + bat := s.db.NewBatch() + for _, op := range b.Ops { + switch op.Type { + case protodb.Operation_SET: + bat.Set(op.Entity.Key, op.Entity.Value) + case protodb.Operation_DELETE: + bat.Delete(op.Entity.Key) + } + } + if sync { + bat.WriteSync() + } else { + bat.Write() + } + return nothing, nil +} diff --git a/proto/defs.pb.go b/proto/defs.pb.go index c65b28e08..4d9f0b272 100644 --- a/proto/defs.pb.go +++ b/proto/defs.pb.go @@ -8,6 +8,8 @@ It is generated from these files: defs.proto It has these top-level messages: + Batch + Operation Entity Nothing Domain @@ -37,6 +39,67 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +type Operation_Type int32 + +const ( + Operation_SET Operation_Type = 0 + Operation_DELETE Operation_Type = 1 +) + +var Operation_Type_name = map[int32]string{ + 0: "SET", + 1: "DELETE", +} +var Operation_Type_value = map[string]int32{ + "SET": 0, + "DELETE": 1, +} + +func (x Operation_Type) String() string { + return proto.EnumName(Operation_Type_name, int32(x)) +} +func (Operation_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{1, 0} } + +type Batch struct { + Ops []*Operation `protobuf:"bytes,1,rep,name=ops" json:"ops,omitempty"` +} + +func (m *Batch) Reset() { *m = Batch{} } +func (m *Batch) String() string { return proto.CompactTextString(m) } +func (*Batch) ProtoMessage() {} +func (*Batch) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Batch) GetOps() []*Operation { + if m != nil { + return m.Ops + } + return nil +} + +type Operation struct { + Entity *Entity `protobuf:"bytes,1,opt,name=entity" json:"entity,omitempty"` + Type Operation_Type `protobuf:"varint,2,opt,name=type,enum=protodb.Operation_Type" json:"type,omitempty"` +} + +func (m *Operation) Reset() { *m = Operation{} } +func (m *Operation) String() string { return proto.CompactTextString(m) } +func (*Operation) ProtoMessage() {} +func (*Operation) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Operation) GetEntity() *Entity { + if m != nil { + return m.Entity + } + return nil +} + +func (m *Operation) GetType() Operation_Type { + if m != nil { + return m.Type + } + return Operation_SET +} + type Entity struct { Id int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` @@ -51,7 +114,7 @@ type Entity struct { func (m *Entity) Reset() { *m = Entity{} } func (m *Entity) String() string { return proto.CompactTextString(m) } func (*Entity) ProtoMessage() {} -func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } func (m *Entity) GetId() int32 { if m != nil { @@ -115,7 +178,7 @@ type Nothing struct { func (m *Nothing) Reset() { *m = Nothing{} } func (m *Nothing) String() string { return proto.CompactTextString(m) } func (*Nothing) ProtoMessage() {} -func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (*Nothing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } type Domain struct { Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` @@ -125,7 +188,7 @@ type Domain struct { func (m *Domain) Reset() { *m = Domain{} } func (m *Domain) String() string { return proto.CompactTextString(m) } func (*Domain) ProtoMessage() {} -func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (*Domain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } func (m *Domain) GetStart() []byte { if m != nil { @@ -151,7 +214,7 @@ type Iterator struct { func (m *Iterator) Reset() { *m = Iterator{} } func (m *Iterator) String() string { return proto.CompactTextString(m) } func (*Iterator) ProtoMessage() {} -func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*Iterator) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } func (m *Iterator) GetDomain() *Domain { if m != nil { @@ -189,7 +252,7 @@ type Stats struct { func (m *Stats) Reset() { *m = Stats{} } func (m *Stats) String() string { return proto.CompactTextString(m) } func (*Stats) ProtoMessage() {} -func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } func (m *Stats) GetData() map[string]string { if m != nil { @@ -214,7 +277,7 @@ type Init struct { func (m *Init) Reset() { *m = Init{} } func (m *Init) String() string { return proto.CompactTextString(m) } func (*Init) ProtoMessage() {} -func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*Init) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } func (m *Init) GetType() string { if m != nil { @@ -238,12 +301,15 @@ func (m *Init) GetDir() string { } func init() { + proto.RegisterType((*Batch)(nil), "protodb.Batch") + proto.RegisterType((*Operation)(nil), "protodb.Operation") proto.RegisterType((*Entity)(nil), "protodb.Entity") proto.RegisterType((*Nothing)(nil), "protodb.Nothing") proto.RegisterType((*Domain)(nil), "protodb.Domain") proto.RegisterType((*Iterator)(nil), "protodb.Iterator") proto.RegisterType((*Stats)(nil), "protodb.Stats") proto.RegisterType((*Init)(nil), "protodb.Init") + proto.RegisterEnum("protodb.Operation_Type", Operation_Type_name, Operation_Type_value) } // Reference imports to suppress errors if they are not otherwise used. @@ -269,6 +335,8 @@ type DBClient interface { ReverseIterator(ctx context.Context, in *Entity, opts ...grpc.CallOption) (DB_ReverseIteratorClient, error) // rpc print(Nothing) returns (Entity) {} Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOption) (*Stats, error) + BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) + BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) } type dBClient struct { @@ -446,6 +514,24 @@ func (c *dBClient) Stats(ctx context.Context, in *Nothing, opts ...grpc.CallOpti return out, nil } +func (c *dBClient) BatchWrite(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/batchWrite", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dBClient) BatchWriteSync(ctx context.Context, in *Batch, opts ...grpc.CallOption) (*Nothing, error) { + out := new(Nothing) + err := grpc.Invoke(ctx, "/protodb.DB/batchWriteSync", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for DB service type DBServer interface { @@ -461,6 +547,8 @@ type DBServer interface { ReverseIterator(*Entity, DB_ReverseIteratorServer) error // rpc print(Nothing) returns (Entity) {} Stats(context.Context, *Nothing) (*Stats, error) + BatchWrite(context.Context, *Batch) (*Nothing, error) + BatchWriteSync(context.Context, *Batch) (*Nothing, error) } func RegisterDBServer(s *grpc.Server, srv DBServer) { @@ -679,6 +767,42 @@ func _DB_Stats_Handler(srv interface{}, ctx context.Context, dec func(interface{ return interceptor(ctx, in, info, handler) } +func _DB_BatchWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Batch) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).BatchWrite(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/BatchWrite", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).BatchWrite(ctx, req.(*Batch)) + } + return interceptor(ctx, in, info, handler) +} + +func _DB_BatchWriteSync_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Batch) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DBServer).BatchWriteSync(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protodb.DB/BatchWriteSync", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DBServer).BatchWriteSync(ctx, req.(*Batch)) + } + return interceptor(ctx, in, info, handler) +} + var _DB_serviceDesc = grpc.ServiceDesc{ ServiceName: "protodb.DB", HandlerType: (*DBServer)(nil), @@ -715,6 +839,14 @@ var _DB_serviceDesc = grpc.ServiceDesc{ MethodName: "stats", Handler: _DB_Stats_Handler, }, + { + MethodName: "batchWrite", + Handler: _DB_BatchWrite_Handler, + }, + { + MethodName: "batchWriteSync", + Handler: _DB_BatchWriteSync_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -740,37 +872,43 @@ var _DB_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("defs.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 498 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xce, 0xda, 0x8e, 0x13, 0x4f, 0xa1, 0x2d, 0x2b, 0x04, 0xab, 0x4a, 0x48, 0x96, 0x2f, 0x98, - 0x3f, 0x2b, 0xa4, 0x07, 0x7e, 0x4e, 0x14, 0xa5, 0x87, 0x5c, 0x7a, 0x70, 0xb8, 0xa3, 0x6d, 0x3d, - 0xa4, 0x2b, 0x1a, 0xbb, 0xec, 0x0e, 0x15, 0x7e, 0x02, 0x1e, 0x80, 0x27, 0xe1, 0x0d, 0xd1, 0xae, - 0x7f, 0x42, 0x69, 0x0e, 0xe6, 0xe4, 0x99, 0xd9, 0xef, 0xfb, 0x66, 0xf6, 0xf3, 0x2c, 0x40, 0x81, - 0x5f, 0x4c, 0x76, 0xad, 0x2b, 0xaa, 0xf8, 0xc4, 0x7d, 0x8a, 0xf3, 0xe4, 0x37, 0x83, 0xf0, 0xb4, - 0x24, 0x45, 0x35, 0xdf, 0x07, 0x4f, 0x15, 0x82, 0xc5, 0x2c, 0x1d, 0xe7, 0x9e, 0x2a, 0xf8, 0x21, - 0xf8, 0x5f, 0xb1, 0x16, 0x5e, 0xcc, 0xd2, 0x7b, 0xb9, 0x0d, 0xf9, 0x43, 0x18, 0xdf, 0xc8, 0xab, - 0xef, 0x28, 0x7c, 0x57, 0x6b, 0x12, 0xfe, 0x08, 0x42, 0xfc, 0xa1, 0x0c, 0x19, 0x11, 0xc4, 0x2c, - 0x9d, 0xe6, 0x6d, 0x66, 0xd1, 0x86, 0xa4, 0x26, 0x31, 0x6e, 0xd0, 0x2e, 0xb1, 0xaa, 0x58, 0x16, - 0x22, 0x6c, 0x54, 0xb1, 0x74, 0x7d, 0x50, 0x6b, 0x31, 0x89, 0x59, 0x1a, 0xe5, 0x36, 0xe4, 0x4f, - 0x00, 0x2e, 0x34, 0x4a, 0xc2, 0xe2, 0xb3, 0x24, 0x31, 0x8d, 0x59, 0xea, 0xe7, 0x51, 0x5b, 0x39, - 0xa1, 0x24, 0x82, 0xc9, 0x59, 0x45, 0x97, 0xaa, 0x5c, 0x27, 0x33, 0x08, 0x17, 0xd5, 0x46, 0xaa, - 0x72, 0xdb, 0x8d, 0xed, 0xe8, 0xe6, 0xf5, 0xdd, 0x92, 0x6f, 0x30, 0x5d, 0x12, 0x6a, 0x49, 0x95, - 0xe6, 0x4f, 0x21, 0x2c, 0x1c, 0xdb, 0x91, 0xf6, 0xe6, 0x07, 0x59, 0x6b, 0x4b, 0xd6, 0x88, 0xe6, - 0xed, 0x71, 0x7b, 0x71, 0xd5, 0x08, 0x4d, 0xf3, 0x26, 0xe9, 0x0c, 0xf2, 0x77, 0x18, 0x14, 0xfc, - 0x65, 0x50, 0xf2, 0x93, 0xc1, 0x78, 0x45, 0x92, 0x0c, 0x7f, 0x09, 0x41, 0x21, 0x49, 0x0a, 0x16, - 0xfb, 0xe9, 0xde, 0x5c, 0xf4, 0xed, 0xdc, 0x69, 0xb6, 0x90, 0x24, 0x4f, 0x4b, 0xd2, 0x75, 0xee, - 0x50, 0xfc, 0x31, 0x4c, 0x48, 0x6d, 0xd0, 0x7a, 0xe0, 0x39, 0x0f, 0x42, 0x9b, 0x9e, 0xd0, 0xd1, - 0x1b, 0x88, 0x7a, 0x6c, 0x37, 0x05, 0x6b, 0xec, 0xbb, 0x35, 0x85, 0xe7, 0x6a, 0x4d, 0xf2, 0xde, - 0x7b, 0xcb, 0x92, 0x0f, 0x10, 0x2c, 0x4b, 0x45, 0x9c, 0x43, 0xf0, 0xa9, 0xbe, 0xc6, 0x96, 0xe4, - 0x62, 0x5b, 0x3b, 0x93, 0x9b, 0x8e, 0xe4, 0x62, 0xab, 0xbd, 0x50, 0xda, 0xdd, 0x30, 0xca, 0x6d, - 0x38, 0xff, 0x15, 0x80, 0xb7, 0xf8, 0xc8, 0x53, 0x08, 0x94, 0x15, 0xba, 0xdf, 0x5f, 0xc1, 0xea, - 0x1e, 0x6d, 0x0d, 0x6c, 0x76, 0x2a, 0x19, 0xf1, 0x67, 0xe0, 0xaf, 0x91, 0xf8, 0xbf, 0x27, 0xbb, - 0xa0, 0xc7, 0x10, 0xad, 0x91, 0x56, 0xa4, 0x51, 0x6e, 0x86, 0x10, 0x52, 0x36, 0x63, 0x56, 0xff, - 0x52, 0x9a, 0x41, 0xfa, 0xcf, 0xc1, 0x37, 0xbb, 0x46, 0x39, 0xec, 0x0b, 0xdd, 0x5a, 0x8d, 0x78, - 0x06, 0x13, 0x83, 0xb4, 0xaa, 0xcb, 0x8b, 0x61, 0xf8, 0x57, 0x10, 0x16, 0x78, 0x85, 0x84, 0xc3, - 0xe0, 0xaf, 0xed, 0x6b, 0xb4, 0xf0, 0xe1, 0x1d, 0xe6, 0x30, 0x55, 0xdd, 0xe2, 0xde, 0x21, 0x3c, - 0xd8, 0xfe, 0x87, 0x16, 0x93, 0x8c, 0x66, 0x8c, 0xbf, 0x83, 0x03, 0x8d, 0x37, 0xa8, 0x0d, 0x2e, - 0xff, 0x97, 0xfa, 0xc2, 0xbd, 0x27, 0x32, 0xfc, 0xce, 0x2c, 0x47, 0xfb, 0xb7, 0xf7, 0x36, 0x19, - 0x9d, 0x87, 0xae, 0x70, 0xfc, 0x27, 0x00, 0x00, 0xff, 0xff, 0x4d, 0xfe, 0x6a, 0xcc, 0x63, 0x04, - 0x00, 0x00, + // 606 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4f, 0x6f, 0xd3, 0x4e, + 0x10, 0xcd, 0xda, 0x8e, 0x13, 0x4f, 0x7f, 0xbf, 0x34, 0x8c, 0x10, 0xb5, 0x8a, 0x90, 0x22, 0x0b, + 0x09, 0x43, 0x69, 0x14, 0x52, 0x24, 0xfe, 0x9c, 0x68, 0x95, 0x1c, 0x2a, 0xa1, 0x22, 0x39, 0x95, + 0x38, 0xa2, 0x6d, 0x3d, 0x34, 0x2b, 0x1a, 0x3b, 0xac, 0x87, 0x8a, 0x5c, 0xb8, 0xf2, 0x79, 0xf8, + 0x7c, 0x5c, 0xd0, 0xae, 0x1d, 0x87, 0x36, 0x39, 0x84, 0x53, 0x76, 0x66, 0xde, 0x7b, 0xb3, 0xf3, + 0x32, 0x5e, 0x80, 0x94, 0x3e, 0x17, 0xfd, 0xb9, 0xce, 0x39, 0xc7, 0x96, 0xfd, 0x49, 0x2f, 0xa2, + 0x43, 0x68, 0x9e, 0x48, 0xbe, 0x9c, 0xe2, 0x63, 0x70, 0xf3, 0x79, 0x11, 0x8a, 0x9e, 0x1b, 0xef, + 0x0c, 0xb1, 0x5f, 0xd5, 0xfb, 0x1f, 0xe6, 0xa4, 0x25, 0xab, 0x3c, 0x4b, 0x4c, 0x39, 0xfa, 0x01, + 0x41, 0x9d, 0xc1, 0x27, 0xe0, 0x53, 0xc6, 0x8a, 0x17, 0xa1, 0xe8, 0x89, 0x78, 0x67, 0xb8, 0x5b, + 0xb3, 0xc6, 0x36, 0x9d, 0x54, 0x65, 0x3c, 0x00, 0x8f, 0x17, 0x73, 0x0a, 0x9d, 0x9e, 0x88, 0x3b, + 0xc3, 0xbd, 0x75, 0xf1, 0xfe, 0xf9, 0x62, 0x4e, 0x89, 0x05, 0x45, 0x0f, 0xc1, 0x33, 0x11, 0xb6, + 0xc0, 0x9d, 0x8c, 0xcf, 0xbb, 0x0d, 0x04, 0xf0, 0x47, 0xe3, 0xf7, 0xe3, 0xf3, 0x71, 0x57, 0x44, + 0xbf, 0x04, 0xf8, 0xa5, 0x38, 0x76, 0xc0, 0x51, 0xa9, 0xed, 0xdc, 0x4c, 0x1c, 0x95, 0x62, 0x17, + 0xdc, 0x2f, 0xb4, 0xb0, 0x3d, 0xfe, 0x4b, 0xcc, 0x11, 0xef, 0x43, 0xf3, 0x46, 0x5e, 0x7f, 0xa3, + 0xd0, 0xb5, 0xb9, 0x32, 0xc0, 0x07, 0xe0, 0xd3, 0x77, 0x55, 0x70, 0x11, 0x7a, 0x3d, 0x11, 0xb7, + 0x93, 0x2a, 0x32, 0xe8, 0x82, 0xa5, 0xe6, 0xb0, 0x59, 0xa2, 0x6d, 0x60, 0x54, 0x29, 0x4b, 0x43, + 0xbf, 0x54, 0xa5, 0xcc, 0xf6, 0x21, 0xad, 0xc3, 0x56, 0x4f, 0xc4, 0x41, 0x62, 0x8e, 0xf8, 0x08, + 0xe0, 0x52, 0x93, 0x64, 0x4a, 0x3f, 0x49, 0x0e, 0xdb, 0x3d, 0x11, 0xbb, 0x49, 0x50, 0x65, 0x8e, + 0x39, 0x0a, 0xa0, 0x75, 0x96, 0xf3, 0x54, 0x65, 0x57, 0xd1, 0x00, 0xfc, 0x51, 0x3e, 0x93, 0x2a, + 0x5b, 0x75, 0x13, 0x1b, 0xba, 0x39, 0x75, 0xb7, 0xe8, 0x2b, 0xb4, 0x4f, 0xd9, 0xb8, 0x94, 0x6b, + 0xe3, 0x77, 0x6a, 0xd9, 0x6b, 0x7e, 0x97, 0xa2, 0x49, 0x55, 0xae, 0x06, 0x57, 0xa5, 0x50, 0x3b, + 0x29, 0x83, 0xa5, 0x41, 0xee, 0x06, 0x83, 0xbc, 0xbf, 0x0c, 0x8a, 0x7e, 0x0a, 0x68, 0x4e, 0x58, + 0x72, 0x81, 0xcf, 0xc1, 0x4b, 0x25, 0xcb, 0x6a, 0x29, 0xc2, 0xba, 0x9d, 0xad, 0xf6, 0x47, 0x92, + 0xe5, 0x38, 0x63, 0xbd, 0x48, 0x2c, 0x0a, 0xf7, 0xa0, 0xc5, 0x6a, 0x46, 0xc6, 0x03, 0xc7, 0x7a, + 0xe0, 0x9b, 0xf0, 0x98, 0xf7, 0x5f, 0x41, 0x50, 0x63, 0x97, 0xb7, 0x10, 0xa5, 0x7d, 0xb7, 0x6e, + 0xe1, 0xd8, 0x5c, 0x19, 0xbc, 0x75, 0x5e, 0x8b, 0xe8, 0x1d, 0x78, 0xa7, 0x99, 0x62, 0xc4, 0x72, + 0x25, 0x2a, 0x52, 0xb9, 0x1e, 0x08, 0xde, 0x99, 0x9c, 0x2d, 0x49, 0xf6, 0x6c, 0xb4, 0x47, 0x4a, + 0xdb, 0x09, 0x83, 0xc4, 0x1c, 0x87, 0xbf, 0x3d, 0x70, 0x46, 0x27, 0x18, 0x83, 0xa7, 0x8c, 0xd0, + 0xff, 0xf5, 0x08, 0x46, 0x77, 0xff, 0xee, 0xc2, 0x46, 0x0d, 0x7c, 0x0a, 0xee, 0x15, 0x31, 0xde, + 0xad, 0x6c, 0x82, 0x1e, 0x41, 0x70, 0x45, 0x3c, 0x61, 0x4d, 0x72, 0xb6, 0x0d, 0x21, 0x16, 0x03, + 0x61, 0xf4, 0xa7, 0xb2, 0xd8, 0x4a, 0xff, 0x19, 0xb8, 0xc5, 0xa6, 0xab, 0x74, 0xeb, 0xc4, 0x72, + 0xad, 0x1a, 0xd8, 0x87, 0x56, 0x41, 0x3c, 0x59, 0x64, 0x97, 0xdb, 0xe1, 0x0f, 0xc1, 0x4f, 0xe9, + 0x9a, 0x98, 0xb6, 0x83, 0xbf, 0x30, 0x8f, 0x87, 0x81, 0x6f, 0xdf, 0x61, 0x08, 0x6d, 0xb5, 0x5c, + 0xdc, 0x35, 0xc2, 0xbd, 0xd5, 0xff, 0x50, 0x61, 0xa2, 0xc6, 0x40, 0xe0, 0x1b, 0xd8, 0xd5, 0x74, + 0x43, 0xba, 0xa0, 0xd3, 0x7f, 0xa5, 0x1e, 0xd8, 0xef, 0x89, 0x0b, 0x5c, 0xbb, 0xcb, 0x7e, 0xe7, + 0xf6, 0xde, 0x46, 0x0d, 0x1c, 0x00, 0x5c, 0x98, 0x47, 0xef, 0xa3, 0x56, 0x4c, 0xb8, 0xaa, 0xdb, + 0x97, 0x70, 0xe3, 0x34, 0x2f, 0xa1, 0xb3, 0x62, 0x58, 0x13, 0xb6, 0x60, 0x5d, 0xf8, 0x36, 0x75, + 0xf4, 0x27, 0x00, 0x00, 0xff, 0xff, 0x95, 0xf4, 0xe3, 0x82, 0x7a, 0x05, 0x00, 0x00, } diff --git a/proto/defs.proto b/proto/defs.proto index 4b52e9afa..70471f234 100644 --- a/proto/defs.proto +++ b/proto/defs.proto @@ -2,6 +2,19 @@ syntax = "proto3"; package protodb; +message Batch { + repeated Operation ops = 1; +} + +message Operation { + Entity entity = 1; + enum Type { + SET = 0; + DELETE = 1; + } + Type type = 2; +} + message Entity { int32 id = 1; bytes key = 2; @@ -53,4 +66,6 @@ service DB { rpc reverseIterator(Entity) returns (stream Iterator) {} // rpc print(Nothing) returns (Entity) {} rpc stats(Nothing) returns (Stats) {} + rpc batchWrite(Batch) returns (Nothing) {} + rpc batchWriteSync(Batch) returns (Nothing) {} } diff --git a/remotedb/remotedb.go b/remotedb/remotedb.go index b80cd3fdb..f6e4d9c17 100644 --- a/remotedb/remotedb.go +++ b/remotedb/remotedb.go @@ -90,9 +90,11 @@ func (rd *RemoteDB) ReverseIterator(start, end []byte) db.Iterator { return makeReverseIterator(dic) } -// TODO: Implement NewBatch func (rd *RemoteDB) NewBatch() db.Batch { - panic("Unimplemented") + return &batch{ + db: rd, + ops: nil, + } } // TODO: Implement Print when db.DB implements a method @@ -218,5 +220,43 @@ func (itr *iterator) Value() []byte { } func (itr *iterator) Close() { - // TODO: Shut down the iterator + err := itr.dic.CloseSend() + if err != nil { + panic(fmt.Sprintf("Error closing iterator: %v", err)) + } +} + +type batch struct { + db *RemoteDB + ops []*protodb.Operation +} + +var _ db.Batch = (*batch)(nil) + +func (bat *batch) Set(key, value []byte) { + op := &protodb.Operation{ + Entity: &protodb.Entity{Key: key, Value: value}, + Type: protodb.Operation_SET, + } + bat.ops = append(bat.ops, op) +} + +func (bat *batch) Delete(key []byte) { + op := &protodb.Operation{ + Entity: &protodb.Entity{Key: key}, + Type: protodb.Operation_DELETE, + } + bat.ops = append(bat.ops, op) +} + +func (bat *batch) Write() { + if _, err := bat.db.dc.BatchWrite(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil { + panic(fmt.Sprintf("RemoteDB.BatchWrite: %v", err)) + } +} + +func (bat *batch) WriteSync() { + if _, err := bat.db.dc.BatchWriteSync(bat.db.ctx, &protodb.Batch{Ops: bat.ops}); err != nil { + panic(fmt.Sprintf("RemoteDB.BatchWriteSync: %v", err)) + } } diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go index c4014fe66..6bc0c77bd 100644 --- a/remotedb/remotedb_test.go +++ b/remotedb/remotedb_test.go @@ -60,6 +60,7 @@ func TestRemoteDB(t *testing.T) { require.Equal(t, itr.Key(), []byte("key-2")) require.Equal(t, itr.Value(), []byte("value-2")) require.Panics(t, itr.Next) + itr.Close() // Deletion client.Delete(k1) @@ -69,5 +70,36 @@ func TestRemoteDB(t *testing.T) { require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore") require.Equal(t, len(gv1), 0, "after deletion, not expecting the key to exist anymore") - // TODO Batch tests + // Batch tests - set + k3 := []byte("key-3") + k4 := []byte("key-4") + k5 := []byte("key-5") + v3 := []byte("value-3") + v4 := []byte("value-4") + v5 := []byte("value-5") + bat := client.NewBatch() + bat.Set(k3, v3) + bat.Set(k4, v4) + rv3 := client.Get(k3) + require.Equal(t, 0, len(rv3), "expecting no k3 to have been stored") + rv4 := client.Get(k4) + require.Equal(t, 0, len(rv4), "expecting no k4 to have been stored") + bat.Write() + rv3 = client.Get(k3) + require.Equal(t, rv3, v3, "expecting k3 to have been stored") + rv4 = client.Get(k4) + require.Equal(t, rv4, v4, "expecting k4 to have been stored") + + // Batch tests - set and delete + bat = client.NewBatch() + bat.Delete(k4) + bat.Set(k5, v5) + bat.Delete(k3) + bat.WriteSync() + rv3 = client.Get(k3) + require.Equal(t, 0, len(rv3), "expecting k3 to have been deleted") + rv4 = client.Get(k4) + require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted") + rv5 := client.Get(k5) + require.Equal(t, rv5, v5, "expecting k5 to have been stored") } From 20be8c75e53523b022f22e4e306ba8a4da034e80 Mon Sep 17 00:00:00 2001 From: Christopher Goes Date: Tue, 8 May 2018 17:13:13 +0200 Subject: [PATCH 10/10] Tweak testcases --- remotedb/remotedb_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/remotedb/remotedb_test.go b/remotedb/remotedb_test.go index 6bc0c77bd..cbe9d9095 100644 --- a/remotedb/remotedb_test.go +++ b/remotedb/remotedb_test.go @@ -47,7 +47,9 @@ func TestRemoteDB(t *testing.T) { // Set some more keys k2 := []byte("key-2") v2 := []byte("value-2") - client.Set(k2, v2) + client.SetSync(k2, v2) + has := client.Has(k2) + require.True(t, has) gv2 := client.Get(k2) require.Equal(t, gv2, v2) @@ -64,7 +66,7 @@ func TestRemoteDB(t *testing.T) { // Deletion client.Delete(k1) - client.Delete(k2) + client.DeleteSync(k2) gv1 = client.Get(k1) gv2 = client.Get(k2) require.Equal(t, len(gv2), 0, "after deletion, not expecting the key to exist anymore") @@ -90,16 +92,24 @@ func TestRemoteDB(t *testing.T) { rv4 = client.Get(k4) require.Equal(t, rv4, v4, "expecting k4 to have been stored") - // Batch tests - set and delete + // Batch tests - deletion bat = client.NewBatch() bat.Delete(k4) - bat.Set(k5, v5) bat.Delete(k3) bat.WriteSync() rv3 = client.Get(k3) require.Equal(t, 0, len(rv3), "expecting k3 to have been deleted") rv4 = client.Get(k4) require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted") + + // Batch tests - set and delete + bat = client.NewBatch() + bat.Set(k4, v4) + bat.Set(k5, v5) + bat.Delete(k4) + bat.WriteSync() + rv4 = client.Get(k4) + require.Equal(t, 0, len(rv4), "expecting k4 to have been deleted") rv5 := client.Get(k5) require.Equal(t, rv5, v5, "expecting k5 to have been stored") }