You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.3 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-wire"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. // var maxNumberConnections = 2
  13. func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
  14. parts := strings.SplitN(protoAddr, "://", 2)
  15. proto, addr := parts[0], parts[1]
  16. ln, err := net.Listen(proto, addr)
  17. if err != nil {
  18. return nil, err
  19. }
  20. // A goroutine to accept a connection.
  21. go func() {
  22. // semaphore := make(chan struct{}, maxNumberConnections)
  23. for {
  24. // semaphore <- struct{}{}
  25. // Accept a connection
  26. fmt.Println("Waiting for new connection...")
  27. conn, err := ln.Accept()
  28. if err != nil {
  29. Exit("Failed to accept connection")
  30. } else {
  31. fmt.Println("Accepted a new connection")
  32. }
  33. appContext := app.Open()
  34. closeConn := make(chan error, 2) // Push to signal connection closed
  35. responses := make(chan types.Response, 1000) // A channel to buffer responses
  36. // Read requests from conn and deal with them
  37. go handleRequests(appContext, closeConn, conn, responses)
  38. // Pull responses from 'responses' and write them to conn.
  39. go handleResponses(closeConn, responses, conn)
  40. go func() {
  41. // Wait until signal to close connection
  42. errClose := <-closeConn
  43. if errClose != nil {
  44. fmt.Printf("Connection error: %v\n", errClose)
  45. } else {
  46. fmt.Println("Connection was closed.")
  47. }
  48. // Close the connection
  49. err := conn.Close()
  50. if err != nil {
  51. fmt.Printf("Error in closing connection: %v\n", err)
  52. }
  53. // Close the AppContext
  54. err = appContext.Close()
  55. if err != nil {
  56. fmt.Printf("Error in closing app context: %v\n", err)
  57. }
  58. // <-semaphore
  59. }()
  60. }
  61. }()
  62. return ln, nil
  63. }
  64. // Read requests from conn and deal with them
  65. func handleRequests(appC types.AppContext, closeConn chan error, conn net.Conn, responses chan<- types.Response) {
  66. var count int
  67. var bufReader = bufio.NewReader(conn)
  68. for {
  69. var n int
  70. var err error
  71. var req types.Request
  72. wire.ReadBinaryPtrLengthPrefixed(&req, bufReader, 0, &n, &err)
  73. if err != nil {
  74. if err == io.EOF {
  75. closeConn <- fmt.Errorf("Connection closed by client")
  76. } else {
  77. closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
  78. }
  79. return
  80. }
  81. count++
  82. handleRequest(appC, req, responses)
  83. }
  84. }
  85. func handleRequest(appC types.AppContext, req types.Request, responses chan<- types.Response) {
  86. switch req := req.(type) {
  87. case types.RequestEcho:
  88. msg := appC.Echo(req.Message)
  89. responses <- types.ResponseEcho{msg}
  90. case types.RequestFlush:
  91. responses <- types.ResponseFlush{}
  92. case types.RequestInfo:
  93. data := appC.Info()
  94. responses <- types.ResponseInfo{data}
  95. case types.RequestSetOption:
  96. retCode := appC.SetOption(req.Key, req.Value)
  97. responses <- types.ResponseSetOption{retCode}
  98. case types.RequestAppendTx:
  99. events, retCode := appC.AppendTx(req.TxBytes)
  100. responses <- types.ResponseAppendTx{retCode}
  101. for _, event := range events {
  102. responses <- types.ResponseEvent{event}
  103. }
  104. case types.RequestGetHash:
  105. hash, retCode := appC.GetHash()
  106. responses <- types.ResponseGetHash{retCode, hash}
  107. case types.RequestCommit:
  108. retCode := appC.Commit()
  109. responses <- types.ResponseCommit{retCode}
  110. case types.RequestRollback:
  111. retCode := appC.Rollback()
  112. responses <- types.ResponseRollback{retCode}
  113. case types.RequestAddListener:
  114. retCode := appC.AddListener(req.EventKey)
  115. responses <- types.ResponseAddListener{retCode}
  116. case types.RequestRemListener:
  117. retCode := appC.RemListener(req.EventKey)
  118. responses <- types.ResponseRemListener{retCode}
  119. default:
  120. responses <- types.ResponseException{"Unknown request"}
  121. }
  122. }
  123. // Pull responses from 'responses' and write them to conn.
  124. func handleResponses(closeConn chan error, responses <-chan types.Response, conn net.Conn) {
  125. var count int
  126. var bufWriter = bufio.NewWriter(conn)
  127. for {
  128. var res = <-responses
  129. var n int
  130. var err error
  131. wire.WriteBinaryLengthPrefixed(struct{ types.Response }{res}, bufWriter, &n, &err)
  132. if err != nil {
  133. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  134. return
  135. }
  136. if _, ok := res.(types.ResponseFlush); ok {
  137. err = bufWriter.Flush()
  138. if err != nil {
  139. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  140. return
  141. }
  142. }
  143. count++
  144. }
  145. }