You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
3.9 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/go-wire"
  11. "github.com/tendermint/tmsp/types"
  12. )
  13. // var maxNumberConnections = 2
  14. func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
  15. var mtx sync.Mutex // global mutex
  16. parts := strings.SplitN(protoAddr, "://", 2)
  17. proto, addr := parts[0], parts[1]
  18. ln, err := net.Listen(proto, addr)
  19. if err != nil {
  20. return nil, err
  21. }
  22. // A goroutine to accept a connection.
  23. go func() {
  24. // semaphore := make(chan struct{}, maxNumberConnections)
  25. for {
  26. // semaphore <- struct{}{}
  27. // Accept a connection
  28. fmt.Println("Waiting for new connection...")
  29. conn, err := ln.Accept()
  30. if err != nil {
  31. Exit("Failed to accept connection")
  32. } else {
  33. fmt.Println("Accepted a new connection")
  34. }
  35. closeConn := make(chan error, 2) // Push to signal connection closed
  36. responses := make(chan types.Response, 1000) // A channel to buffer responses
  37. // Read requests from conn and deal with them
  38. go handleRequests(&mtx, app, closeConn, conn, responses)
  39. // Pull responses from 'responses' and write them to conn.
  40. go handleResponses(closeConn, responses, conn)
  41. go func() {
  42. // Wait until signal to close connection
  43. errClose := <-closeConn
  44. if errClose != nil {
  45. fmt.Printf("Connection error: %v\n", errClose)
  46. } else {
  47. fmt.Println("Connection was closed.")
  48. }
  49. // Close the connection
  50. err := conn.Close()
  51. if err != nil {
  52. fmt.Printf("Error in closing connection: %v\n", err)
  53. }
  54. // <-semaphore
  55. }()
  56. }
  57. }()
  58. return ln, nil
  59. }
  60. // Read requests from conn and deal with them
  61. func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
  62. var count int
  63. var bufReader = bufio.NewReader(conn)
  64. for {
  65. var n int
  66. var err error
  67. var req types.Request
  68. wire.ReadBinaryPtrLengthPrefixed(&req, bufReader, 0, &n, &err)
  69. if err != nil {
  70. if err == io.EOF {
  71. closeConn <- fmt.Errorf("Connection closed by client")
  72. } else {
  73. closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
  74. }
  75. return
  76. }
  77. mtx.Lock()
  78. count++
  79. handleRequest(app, req, responses)
  80. mtx.Unlock()
  81. }
  82. }
  83. func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) {
  84. switch req := req.(type) {
  85. case types.RequestEcho:
  86. responses <- types.ResponseEcho{req.Message}
  87. case types.RequestFlush:
  88. responses <- types.ResponseFlush{}
  89. case types.RequestInfo:
  90. data := app.Info()
  91. responses <- types.ResponseInfo{data}
  92. case types.RequestSetOption:
  93. logStr := app.SetOption(req.Key, req.Value)
  94. responses <- types.ResponseSetOption{logStr}
  95. case types.RequestAppendTx:
  96. code, result, logStr := app.AppendTx(req.TxBytes)
  97. responses <- types.ResponseAppendTx{code, result, logStr}
  98. case types.RequestCheckTx:
  99. code, result, logStr := app.CheckTx(req.TxBytes)
  100. responses <- types.ResponseCheckTx{code, result, logStr}
  101. case types.RequestGetHash:
  102. hash, logStr := app.GetHash()
  103. responses <- types.ResponseGetHash{hash, logStr}
  104. case types.RequestQuery:
  105. result, logStr := app.Query(req.QueryBytes)
  106. responses <- types.ResponseQuery{result, logStr}
  107. default:
  108. responses <- types.ResponseException{"Unknown request"}
  109. }
  110. }
  111. // Pull responses from 'responses' and write them to conn.
  112. func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) {
  113. var count int
  114. var bufWriter = bufio.NewWriter(conn)
  115. for {
  116. var res = <-responses
  117. var n int
  118. var err error
  119. wire.WriteBinaryLengthPrefixed(struct{ types.Response }{res}, bufWriter, &n, &err)
  120. if err != nil {
  121. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  122. return
  123. }
  124. if _, ok := res.(types.ResponseFlush); ok {
  125. err = bufWriter.Flush()
  126. if err != nil {
  127. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  128. return
  129. }
  130. }
  131. count++
  132. }
  133. }