You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

152 lines
3.8 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package example
  2. import (
  3. "fmt"
  4. "net"
  5. "reflect"
  6. "testing"
  7. "time"
  8. "google.golang.org/grpc"
  9. "golang.org/x/net/context"
  10. abcicli "github.com/tendermint/abci/client"
  11. "github.com/tendermint/abci/example/dummy"
  12. "github.com/tendermint/abci/server"
  13. "github.com/tendermint/abci/types"
  14. cmn "github.com/tendermint/tmlibs/common"
  15. "github.com/tendermint/tmlibs/log"
  16. )
  17. func TestDummy(t *testing.T) {
  18. fmt.Println("### Testing Dummy")
  19. testStream(t, dummy.NewDummyApplication())
  20. }
  21. func TestBaseApp(t *testing.T) {
  22. fmt.Println("### Testing BaseApp")
  23. testStream(t, types.NewBaseApplication())
  24. }
  25. func TestGRPC(t *testing.T) {
  26. fmt.Println("### Testing GRPC")
  27. testGRPCSync(t, types.NewGRPCApplication(types.NewBaseApplication()))
  28. }
  29. func testStream(t *testing.T, app types.Application) {
  30. numDeliverTxs := 200000
  31. // Start the listener
  32. server := server.NewSocketServer("unix://test.sock", app)
  33. server.SetLogger(log.TestingLogger().With("module", "abci-server"))
  34. if _, err := server.Start(); err != nil {
  35. t.Fatalf("Error starting socket server: %v", err.Error())
  36. }
  37. defer server.Stop()
  38. // Connect to the socket
  39. client := abcicli.NewSocketClient("unix://test.sock", false)
  40. client.SetLogger(log.TestingLogger().With("module", "abci-client"))
  41. if _, err := client.Start(); err != nil {
  42. t.Fatalf("Error starting socket client: %v", err.Error())
  43. }
  44. defer client.Stop()
  45. done := make(chan struct{})
  46. counter := 0
  47. client.SetResponseCallback(func(req *types.Request, res *types.Response) {
  48. // Process response
  49. switch r := res.Value.(type) {
  50. case *types.Response_DeliverTx:
  51. counter++
  52. if r.DeliverTx.Code != types.CodeType_OK {
  53. t.Error("DeliverTx failed with ret_code", r.DeliverTx.Code)
  54. }
  55. if counter > numDeliverTxs {
  56. t.Fatalf("Too many DeliverTx responses. Got %d, expected %d", counter, numDeliverTxs)
  57. }
  58. if counter == numDeliverTxs {
  59. go func() {
  60. time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
  61. close(done)
  62. }()
  63. return
  64. }
  65. case *types.Response_Flush:
  66. // ignore
  67. default:
  68. t.Error("Unexpected response type", reflect.TypeOf(res.Value))
  69. }
  70. })
  71. // Write requests
  72. for counter := 0; counter < numDeliverTxs; counter++ {
  73. // Send request
  74. reqRes := client.DeliverTxAsync([]byte("test"))
  75. _ = reqRes
  76. // check err ?
  77. // Sometimes send flush messages
  78. if counter%123 == 0 {
  79. client.FlushAsync()
  80. // check err ?
  81. }
  82. }
  83. // Send final flush message
  84. client.FlushAsync()
  85. <-done
  86. }
  87. //-------------------------
  88. // test grpc
  89. func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
  90. return cmn.Connect(addr)
  91. }
  92. func testGRPCSync(t *testing.T, app *types.GRPCApplication) {
  93. numDeliverTxs := 2000
  94. // Start the listener
  95. server := server.NewGRPCServer("unix://test.sock", app)
  96. server.SetLogger(log.TestingLogger().With("module", "abci-server"))
  97. if _, err := server.Start(); err != nil {
  98. t.Fatalf("Error starting GRPC server: %v", err.Error())
  99. }
  100. defer server.Stop()
  101. // Connect to the socket
  102. conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
  103. if err != nil {
  104. t.Fatalf("Error dialing GRPC server: %v", err.Error())
  105. }
  106. defer conn.Close()
  107. client := types.NewABCIApplicationClient(conn)
  108. // Write requests
  109. for counter := 0; counter < numDeliverTxs; counter++ {
  110. // Send request
  111. response, err := client.DeliverTx(context.Background(), &types.RequestDeliverTx{[]byte("test")})
  112. if err != nil {
  113. t.Fatalf("Error in GRPC DeliverTx: %v", err.Error())
  114. }
  115. counter++
  116. if response.Code != types.CodeType_OK {
  117. t.Error("DeliverTx failed with ret_code", response.Code)
  118. }
  119. if counter > numDeliverTxs {
  120. t.Fatal("Too many DeliverTx responses")
  121. }
  122. t.Log("response", counter)
  123. if counter == numDeliverTxs {
  124. go func() {
  125. time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
  126. }()
  127. }
  128. }
  129. }