You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

154 lines
3.9 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package example
  2. import (
  3. "fmt"
  4. "net"
  5. "reflect"
  6. "testing"
  7. "time"
  8. "google.golang.org/grpc"
  9. "golang.org/x/net/context"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tmlibs/log"
  12. abcicli "github.com/tendermint/tendermint/abci/client"
  13. "github.com/tendermint/tendermint/abci/example/code"
  14. "github.com/tendermint/tendermint/abci/example/kvstore"
  15. abciserver "github.com/tendermint/tendermint/abci/server"
  16. "github.com/tendermint/tendermint/abci/types"
  17. )
  18. func TestKVStore(t *testing.T) {
  19. fmt.Println("### Testing KVStore")
  20. testStream(t, kvstore.NewKVStoreApplication())
  21. }
  22. func TestBaseApp(t *testing.T) {
  23. fmt.Println("### Testing BaseApp")
  24. testStream(t, types.NewBaseApplication())
  25. }
  26. func TestGRPC(t *testing.T) {
  27. fmt.Println("### Testing GRPC")
  28. testGRPCSync(t, types.NewGRPCApplication(types.NewBaseApplication()))
  29. }
  30. func testStream(t *testing.T, app types.Application) {
  31. numDeliverTxs := 200000
  32. // Start the listener
  33. server := abciserver.NewSocketServer("unix://test.sock", app)
  34. server.SetLogger(log.TestingLogger().With("module", "abci-server"))
  35. if err := server.Start(); err != nil {
  36. t.Fatalf("Error starting socket server: %v", err.Error())
  37. }
  38. defer server.Stop()
  39. // Connect to the socket
  40. client := abcicli.NewSocketClient("unix://test.sock", false)
  41. client.SetLogger(log.TestingLogger().With("module", "abci-client"))
  42. if err := client.Start(); err != nil {
  43. t.Fatalf("Error starting socket client: %v", err.Error())
  44. }
  45. defer client.Stop()
  46. done := make(chan struct{})
  47. counter := 0
  48. client.SetResponseCallback(func(req *types.Request, res *types.Response) {
  49. // Process response
  50. switch r := res.Value.(type) {
  51. case *types.Response_DeliverTx:
  52. counter++
  53. if r.DeliverTx.Code != code.CodeTypeOK {
  54. t.Error("DeliverTx failed with ret_code", r.DeliverTx.Code)
  55. }
  56. if counter > numDeliverTxs {
  57. t.Fatalf("Too many DeliverTx responses. Got %d, expected %d", counter, numDeliverTxs)
  58. }
  59. if counter == numDeliverTxs {
  60. go func() {
  61. time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
  62. close(done)
  63. }()
  64. return
  65. }
  66. case *types.Response_Flush:
  67. // ignore
  68. default:
  69. t.Error("Unexpected response type", reflect.TypeOf(res.Value))
  70. }
  71. })
  72. // Write requests
  73. for counter := 0; counter < numDeliverTxs; counter++ {
  74. // Send request
  75. reqRes := client.DeliverTxAsync([]byte("test"))
  76. _ = reqRes
  77. // check err ?
  78. // Sometimes send flush messages
  79. if counter%123 == 0 {
  80. client.FlushAsync()
  81. // check err ?
  82. }
  83. }
  84. // Send final flush message
  85. client.FlushAsync()
  86. <-done
  87. }
  88. //-------------------------
  89. // test grpc
  90. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  91. return cmn.Connect(addr)
  92. }
  93. func testGRPCSync(t *testing.T, app *types.GRPCApplication) {
  94. numDeliverTxs := 2000
  95. // Start the listener
  96. server := abciserver.NewGRPCServer("unix://test.sock", app)
  97. server.SetLogger(log.TestingLogger().With("module", "abci-server"))
  98. if err := server.Start(); err != nil {
  99. t.Fatalf("Error starting GRPC server: %v", err.Error())
  100. }
  101. defer server.Stop()
  102. // Connect to the socket
  103. conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  104. if err != nil {
  105. t.Fatalf("Error dialing GRPC server: %v", err.Error())
  106. }
  107. defer conn.Close()
  108. client := types.NewABCIApplicationClient(conn)
  109. // Write requests
  110. for counter := 0; counter < numDeliverTxs; counter++ {
  111. // Send request
  112. response, err := client.DeliverTx(context.Background(), &types.RequestDeliverTx{[]byte("test")})
  113. if err != nil {
  114. t.Fatalf("Error in GRPC DeliverTx: %v", err.Error())
  115. }
  116. counter++
  117. if response.Code != code.CodeTypeOK {
  118. t.Error("DeliverTx failed with ret_code", response.Code)
  119. }
  120. if counter > numDeliverTxs {
  121. t.Fatal("Too many DeliverTx responses")
  122. }
  123. t.Log("response", counter)
  124. if counter == numDeliverTxs {
  125. go func() {
  126. time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
  127. }()
  128. }
  129. }
  130. }