You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
4.0 KiB

  1. package grpcdb
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "sync"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/tendermint/tmlibs/db"
  10. protodb "github.com/tendermint/tmlibs/proto"
  11. )
  12. // BindServer is a blocking function that sets up a gRPC based
  13. // server at the address supplied, with the gRPC options passed in.
  14. // Normally in usage, invoke it in a goroutine like you would for http.ListenAndServe.
  15. func BindServer(addr string, opts ...grpc.ServerOption) error {
  16. ln, err := net.Listen("tcp", addr)
  17. if err != nil {
  18. return err
  19. }
  20. srv := grpc.NewServer(opts...)
  21. protodb.RegisterDBServer(srv, new(server))
  22. return srv.Serve(ln)
  23. }
  24. type server struct {
  25. mu sync.Mutex
  26. db db.DB
  27. }
  28. var _ protodb.DBServer = (*server)(nil)
  29. // Init initializes the server's database. Only one type of database
  30. // can be initialized per server.
  31. //
  32. // Dir is the directory on the file system in which the DB will be stored(if backed by disk) (TODO: remove)
  33. //
  34. // Name is representative filesystem entry's basepath
  35. //
  36. // Type can be either one of:
  37. // * cleveldb (if built with gcc enabled)
  38. // * fsdb
  39. // * memdB
  40. // * leveldb
  41. // See https://godoc.org/github.com/tendermint/tmlibs/db#DBBackendType
  42. func (s *server) Init(ctx context.Context, in *protodb.Init) (*protodb.Entity, error) {
  43. s.mu.Lock()
  44. defer s.mu.Unlock()
  45. log.Printf("in: %+v\n", in)
  46. s.db = db.NewDB(in.Name, db.DBBackendType(in.Type), in.Dir)
  47. return &protodb.Entity{TimeAt: time.Now().Unix()}, nil
  48. }
  49. func (s *server) Delete(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  50. s.db.Delete(in.Key)
  51. return nothing, nil
  52. }
  53. var nothing = new(protodb.Nothing)
  54. func (s *server) DeleteSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  55. s.db.DeleteSync(in.Key)
  56. return nothing, nil
  57. }
  58. func (s *server) Get(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
  59. value := s.db.Get(in.Key)
  60. return &protodb.Entity{Value: value}, nil
  61. }
  62. func (s *server) GetStream(ds protodb.DB_GetStreamServer) error {
  63. // Receive routine
  64. responsesChan := make(chan *protodb.Entity)
  65. go func() {
  66. defer close(responsesChan)
  67. ctx := context.Background()
  68. for {
  69. in, err := ds.Recv()
  70. if err != nil {
  71. responsesChan <- &protodb.Entity{Err: err.Error()}
  72. return
  73. }
  74. out, err := s.Get(ctx, in)
  75. if err != nil {
  76. if out == nil {
  77. out = new(protodb.Entity)
  78. out.Key = in.Key
  79. }
  80. out.Err = err.Error()
  81. responsesChan <- out
  82. return
  83. }
  84. // Otherwise continue on
  85. responsesChan <- out
  86. }
  87. }()
  88. // Send routine, block until we return
  89. for out := range responsesChan {
  90. if err := ds.Send(out); err != nil {
  91. return err
  92. }
  93. }
  94. return nil
  95. }
  96. func (s *server) Has(ctx context.Context, in *protodb.Entity) (*protodb.Entity, error) {
  97. exists := s.db.Has(in.Key)
  98. return &protodb.Entity{Exists: exists}, nil
  99. }
  100. func (s *server) Set(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  101. s.db.Set(in.Key, in.Value)
  102. return nothing, nil
  103. }
  104. func (s *server) SetSync(ctx context.Context, in *protodb.Entity) (*protodb.Nothing, error) {
  105. s.db.SetSync(in.Key, in.Value)
  106. return nothing, nil
  107. }
  108. func (s *server) Iterator(query *protodb.Entity, dis protodb.DB_IteratorServer) error {
  109. it := s.db.Iterator(query.Start, query.End)
  110. return s.handleIterator(it, dis.Send)
  111. }
  112. func (s *server) handleIterator(it db.Iterator, sendFunc func(*protodb.Iterator) error) error {
  113. for it.Valid() {
  114. start, end := it.Domain()
  115. out := &protodb.Iterator{
  116. Domain: &protodb.DDomain{Start: start, End: end},
  117. Valid: it.Valid(),
  118. Key: it.Key(),
  119. Value: it.Value(),
  120. }
  121. if err := sendFunc(out); err != nil {
  122. return err
  123. }
  124. // Finally move the iterator forward
  125. it.Next()
  126. }
  127. return nil
  128. }
  129. func (s *server) ReverseIterator(query *protodb.Entity, dis protodb.DB_ReverseIteratorServer) error {
  130. it := s.db.ReverseIterator(query.Start, query.End)
  131. return s.handleIterator(it, dis.Send)
  132. }
  133. func (s *server) Stats(context.Context, *protodb.Nothing) (*protodb.Stats, error) {
  134. stats := s.db.Stats()
  135. return &protodb.Stats{Data: stats, TimeAt: time.Now().Unix()}, nil
  136. }