You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

197 lines
5.0 KiB

  1. package grpcdb
  2. import (
  3. "context"
  4. "net"
  5. "sync"
  6. "time"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/credentials"
  9. "github.com/tendermint/tmlibs/db"
  10. protodb "github.com/tendermint/tmlibs/proto"
  11. )
  12. // ListenAndServe is a blocking function that sets up a gRPC based
  13. // server at the address supplied, with the gRPC options passed in.
  14. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe.
  15. func ListenAndServe(addr, cert, key string, opts ...grpc.ServerOption) error {
  16. ln, err := net.Listen("tcp", addr)
  17. if err != nil {
  18. return err
  19. }
  20. srv, err := NewServer(cert, key, opts...)
  21. if err != nil {
  22. return err
  23. }
  24. return srv.Serve(ln)
  25. }
  26. func NewServer(cert, key string, opts ...grpc.ServerOption) (*grpc.Server, error) {
  27. creds, err := credentials.NewServerTLSFromFile(cert, key)
  28. if err != nil {
  29. return nil, err
  30. }
  31. opts = append(opts, grpc.Creds(creds))
  32. srv := grpc.NewServer(opts...)
  33. protodb.RegisterDBServer(srv, new(server))
  34. return srv, nil
  35. }
  36. type server struct {
  37. mu sync.Mutex
  38. db db.DB
  39. }
  40. var _ protodb.DBServer = (*server)(nil)
  41. // Init initializes the server's database. Only one type of database
  42. // can be initialized per server.
  43. //
  44. // Dir is the directory on the file system in which the DB will be stored(if backed by disk) (TODO: remove)
  45. //
  46. // Name is representative filesystem entry's basepath
  47. //
  48. // Type can be either one of:
  49. // * cleveldb (if built with gcc enabled)
  50. // * fsdb
  51. // * memdB
  52. // * leveldb
  53. // See https://godoc.org/github.com/tendermint/tmlibs/db#DBBackendType
  54. func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) {
  55. s.mu.Lock()
  56. defer s.mu.Unlock()
  57. s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir)
  58. return &protodb.Entity{CreatedAt: time.Now().Unix()}, nil
  59. }
  60. func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  61. s.db.Delete(in.Key)
  62. return nothing, nil
  63. }
  64. var nothing = new(protodb.Nothing)
  65. func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  66. s.db.DeleteSync(in.Key)
  67. return nothing, nil
  68. }
  69. func (s *server) Get(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
  70. value := s.db.Get(in.Key)
  71. return &protodb.Entity{Value: value}, nil
  72. }
  73. func (s *server) GetStream(ds protodb.DB_GetStreamServer) error {
  74. // Receive routine
  75. responsesChan := make(chan *protodb.Entity)
  76. go func() {
  77. defer close(responsesChan)
  78. ctx := context.Background()
  79. for {
  80. in, err := ds.Recv()
  81. if err != nil {
  82. responsesChan <- &protodb.Entity{Err: err.Error()}
  83. return
  84. }
  85. out, err := s.Get(ctx, in)
  86. if err != nil {
  87. if out == nil {
  88. out = new(protodb.Entity)
  89. out.Key = in.Key
  90. }
  91. out.Err = err.Error()
  92. responsesChan <- out
  93. return
  94. }
  95. // Otherwise continue on
  96. responsesChan <- out
  97. }
  98. }()
  99. // Send routine, block until we return
  100. for out := range responsesChan {
  101. if err := ds.Send(out); err != nil {
  102. return err
  103. }
  104. }
  105. return nil
  106. }
  107. func (s *server) Has(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
  108. exists := s.db.Has(in.Key)
  109. return &protodb.Entity{Exists: exists}, nil
  110. }
  111. func (s *server) Set(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  112. s.db.Set(in.Key, in.Value)
  113. return nothing, nil
  114. }
  115. func (s *server) SetSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  116. s.db.SetSync(in.Key, in.Value)
  117. return nothing, nil
  118. }
  119. func (s *server) Iterator(query *protodb.Entity, dis protodb.DB_IteratorServer) error {
  120. it := s.db.Iterator(query.Start, query.End)
  121. return s.handleIterator(it, dis.Send)
  122. }
  123. func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) error) error {
  124. for it.Valid() {
  125. start, end := it.Domain()
  126. out := &protodb.Iterator{
  127. Domain: &protodb.Domain{Start: start, End: end},
  128. Valid: it.Valid(),
  129. Key: it.Key(),
  130. Value: it.Value(),
  131. }
  132. if err := sendFunc(out); err != nil {
  133. return err
  134. }
  135. // Finally move the iterator forward
  136. it.Next()
  137. }
  138. return nil
  139. }
  140. func (s *server) ReverseIterator(query *protodb.Entity, dis protodb.DB_ReverseIteratorServer) error {
  141. it := s.db.ReverseIterator(query.Start, query.End)
  142. return s.handleIterator(it, dis.Send)
  143. }
  144. func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error) {
  145. stats := s.db.Stats()
  146. return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil
  147. }
  148. func (s *server) BatchWrite(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
  149. return s.batchWrite(c, b, false)
  150. }
  151. func (s *server) BatchWriteSync(c context.Context, b *protodb.Batch) (*protodb.Nothing, error) {
  152. return s.batchWrite(c, b, true)
  153. }
  154. func (s *server) batchWrite(c context.Context, b *protodb.Batch, sync bool) (*protodb.Nothing, error) {
  155. bat := s.db.NewBatch()
  156. for _, op := range b.Ops {
  157. switch op.Type {
  158. case protodb.Operation_SET:
  159. bat.Set(op.Entity.Key, op.Entity.Value)
  160. case protodb.Operation_DELETE:
  161. bat.Delete(op.Entity.Key)
  162. }
  163. }
  164. if sync {
  165. bat.WriteSync()
  166. } else {
  167. bat.Write()
  168. }
  169. return nothing, nil
  170. }