You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
3.7 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "strings"
  7. . "github.com/tendermint/go-common"
  8. "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmsp/types"
  10. )
  11. func StartListener(protoAddr string, app types.Application) (net.Listener, error) {
  12. parts := strings.SplitN(protoAddr, "://", 2)
  13. proto, addr := parts[0], parts[1]
  14. ln, err := net.Listen(proto, addr)
  15. if err != nil {
  16. return nil, err
  17. }
  18. // A goroutine to accept a connection.
  19. go func() {
  20. for {
  21. // Accept a connection
  22. conn, err := ln.Accept()
  23. if err != nil {
  24. Exit("Failed to accept connection")
  25. } else {
  26. fmt.Println("Accepted a new connection")
  27. }
  28. connClosed := make(chan struct{}, 2) // Push to signal connection closed
  29. responses := make(chan types.Response, 1000) // A channel to buffer responses
  30. // Read requests from conn and deal with them
  31. go handleRequests(app, connClosed, conn, responses)
  32. // Pull responses from 'responses' and write them to conn.
  33. go handleResponses(connClosed, responses, conn)
  34. // Wait until connection is closed
  35. <-connClosed
  36. fmt.Println("Connection was closed. Waiting for new connection...")
  37. }
  38. }()
  39. return ln, nil
  40. }
  41. // Read requests from conn and deal with them
  42. func handleRequests(app types.Application, connClosed chan struct{}, conn net.Conn, responses chan<- types.Response) {
  43. var count int
  44. var bufReader = bufio.NewReader(conn)
  45. for {
  46. var n int64
  47. var err error
  48. var req types.Request
  49. wire.ReadBinaryPtr(&req, bufReader, &n, &err)
  50. if err != nil {
  51. fmt.Println("Error in handleRequests:", err.Error())
  52. connClosed <- struct{}{}
  53. return
  54. }
  55. count++
  56. handleRequest(app, req, responses)
  57. }
  58. }
  59. func handleRequest(app types.Application, req types.Request, responses chan<- types.Response) {
  60. switch req := req.(type) {
  61. case types.RequestEcho:
  62. msg := app.Echo(req.Message)
  63. responses <- types.ResponseEcho{msg}
  64. case types.RequestFlush:
  65. responses <- types.ResponseFlush{}
  66. case types.RequestInfo:
  67. data := app.Info()
  68. responses <- types.ResponseInfo{data}
  69. case types.RequestAppendTx:
  70. retCode := app.AppendTx(req.TxBytes)
  71. responses <- types.ResponseAppendTx{retCode}
  72. events := app.GetEvents()
  73. for _, event := range events {
  74. responses <- types.ResponseEvent{event}
  75. }
  76. case types.RequestGetHash:
  77. hash, retCode := app.GetHash()
  78. responses <- types.ResponseGetHash{retCode, hash}
  79. case types.RequestCommit:
  80. retCode := app.Commit()
  81. responses <- types.ResponseCommit{retCode}
  82. case types.RequestRollback:
  83. retCode := app.Rollback()
  84. responses <- types.ResponseRollback{retCode}
  85. case types.RequestSetEventsMode:
  86. retCode := app.SetEventsMode(req.EventsMode)
  87. responses <- types.ResponseSetEventsMode{retCode}
  88. if req.EventsMode == types.EventsModeOn {
  89. events := app.GetEvents()
  90. for _, event := range events {
  91. responses <- types.ResponseEvent{event}
  92. }
  93. }
  94. case types.RequestAddListener:
  95. retCode := app.AddListener(req.EventKey)
  96. responses <- types.ResponseAddListener{retCode}
  97. case types.RequestRemListener:
  98. retCode := app.RemListener(req.EventKey)
  99. responses <- types.ResponseRemListener{retCode}
  100. default:
  101. responses <- types.ResponseException{"Unknown request"}
  102. }
  103. }
  104. // Pull responses from 'responses' and write them to conn.
  105. func handleResponses(connClosed chan struct{}, responses <-chan types.Response, conn net.Conn) {
  106. var count int
  107. var bufWriter = bufio.NewWriter(conn)
  108. for {
  109. var res = <-responses
  110. var n int64
  111. var err error
  112. wire.WriteBinary(res, bufWriter, &n, &err)
  113. if err != nil {
  114. fmt.Println(err.Error())
  115. connClosed <- struct{}{}
  116. return
  117. }
  118. if _, ok := res.(types.ResponseFlush); ok {
  119. err = bufWriter.Flush()
  120. if err != nil {
  121. fmt.Println(err.Error())
  122. connClosed <- struct{}{}
  123. return
  124. }
  125. }
  126. count++
  127. }
  128. }