abci: Flush socket requests and responses immediately. (#6997)

The main effect of this change is to flush the socket client and server message encoding buffers immediately once the message is fully and correctly encoded. This allows us to remove the timer and some other special cases, without changing the observed behaviour of the system.

-- Background

The socket protocol client and server each use a buffered writer to encode request and response messages onto the underlying connection. This reduces the possibility of a single message being split across multiple writes, but has the side-effect that a request may remain buffered for some time. The implementation worked around this by keeping a ticker that occasionally triggers a flush, and by flushing the writer in response to an explicit request baked into the client/server protocol (see also #6994).

These workarounds are both unnecessary: Once a message has been dequeued for sending and fully encoded in wire format, there is no real use keeping all or part of it buffered locally. Moreover, using an asynchronous process to flush the buffer makes the round-trip performance of the request unpredictable.

-- Benchmarks

Code: https://play.golang.org/p/0ChUOxJOiHt

I found no pre-existing performance benchmarks to justify the flush pattern, but a natural question is whether this will significantly harm client/server performance. To test this, I implemented a simple benchmark that transfers randomly-sized byte buffers from a no-op "client" to a no-op "server" over a Unix-domain socket, using a buffered writer, both with and without explicit flushes after each write. As the following data show, flushing every time (FLUSH=true) does reduce raw throughput, but not by a significant amount except for very small request sizes, where the transfer time is already trivial (1.9µs). Given that the client is calibrated for 1MiB transactions, the overhead is not meaningful.
The percentage in each section is the speedup for flushing only when the buffer is full, relative to flushing every block. The benchmark uses the default buffer size (4096 bytes), which is the same value used by the socket client and server implementation:

FLUSH  NBLOCKS  MAX      AVG     TOTAL       ELAPSED       TIME/BLOCK
false  3957471  512      255     1011165416  2.00018873s   505ns
true   1068568  512      255     273064368   2.000217051s  1.871µs    (73%)
false  536096   4096     2048    1098066401  2.000229108s  3.731µs
true   477911   4096     2047    978746731   2.000177825s  4.185µs    (10.8%)
false  124595   16384    8181    1019340160  2.000235086s  16.053µs
true   120995   16384    8179    989703064   2.000329349s  16.532µs   (2.9%)
false  2114     1048576  525693  1111316541  2.000479928s  946.3µs
true   2083     1048576  526379  1096449173  2.001817137s  961.025µs  (1.5%)

Note also that the FLUSH=false baseline is actually faster than the production code, which flushes more often than is required by the buffer filling up. Moreover, the timer slows down the overall transaction rate of the client and server, independent of how fast the socket transfer is, so the loss on a real workload is probably much less.
  1. package abciclient
  2. import (
  3. "bufio"
  4. "container/list"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "reflect"
  11. "time"
  12. "github.com/tendermint/tendermint/abci/types"
  13. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  14. tmnet "github.com/tendermint/tendermint/libs/net"
  15. "github.com/tendermint/tendermint/libs/service"
  16. )
  17. const (
  18. // reqQueueSize is the max number of queued async requests.
  19. // (memory: 256MB max assuming 1MB transactions)
  20. reqQueueSize = 256
  21. )
  22. type reqResWithContext struct {
  23. R *ReqRes
  24. C context.Context // if context.Err is not nil, reqRes will be thrown away (ignored)
  25. }
  26. // This is goroutine-safe, but users should beware that the application in
  27. // general is not meant to be interfaced with concurrent callers.
  28. type socketClient struct {
  29. service.BaseService
  30. addr string
  31. mustConnect bool
  32. conn net.Conn
  33. reqQueue chan *reqResWithContext
  34. mtx tmsync.Mutex
  35. err error
  36. reqSent *list.List // list of requests sent, waiting for response
  37. resCb func(*types.Request, *types.Response) // called on all requests, if set.
  38. }
  39. var _ Client = (*socketClient)(nil)
  40. // NewSocketClient creates a new socket client, which connects to a given
  41. // address. If mustConnect is true, the client will return an error upon start
  42. // if it fails to connect.
  43. func NewSocketClient(addr string, mustConnect bool) Client {
  44. cli := &socketClient{
  45. reqQueue: make(chan *reqResWithContext, reqQueueSize),
  46. mustConnect: mustConnect,
  47. addr: addr,
  48. reqSent: list.New(),
  49. resCb: nil,
  50. }
  51. cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
  52. return cli
  53. }
  54. // OnStart implements Service by connecting to the server and spawning reading
  55. // and writing goroutines.
  56. func (cli *socketClient) OnStart() error {
  57. var (
  58. err error
  59. conn net.Conn
  60. )
  61. for {
  62. conn, err = tmnet.Connect(cli.addr)
  63. if err != nil {
  64. if cli.mustConnect {
  65. return err
  66. }
  67. cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...",
  68. cli.addr, dialRetryIntervalSeconds), "err", err)
  69. time.Sleep(time.Second * dialRetryIntervalSeconds)
  70. continue
  71. }
  72. cli.conn = conn
  73. go cli.sendRequestsRoutine(conn)
  74. go cli.recvResponseRoutine(conn)
  75. return nil
  76. }
  77. }
  78. // OnStop implements Service by closing connection and flushing all queues.
  79. func (cli *socketClient) OnStop() {
  80. if cli.conn != nil {
  81. cli.conn.Close()
  82. }
  83. cli.drainQueue()
  84. }
  85. // Error returns an error if the client was stopped abruptly.
  86. func (cli *socketClient) Error() error {
  87. cli.mtx.Lock()
  88. defer cli.mtx.Unlock()
  89. return cli.err
  90. }
  91. // SetResponseCallback sets a callback, which will be executed for each
  92. // non-error & non-empty response from the server.
  93. //
  94. // NOTE: callback may get internally generated flush responses.
  95. func (cli *socketClient) SetResponseCallback(resCb Callback) {
  96. cli.mtx.Lock()
  97. cli.resCb = resCb
  98. cli.mtx.Unlock()
  99. }
  100. //----------------------------------------
  101. func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
  102. bw := bufio.NewWriter(conn)
  103. for {
  104. select {
  105. case reqres := <-cli.reqQueue:
  106. if reqres.C.Err() != nil {
  107. cli.Logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err())
  108. continue
  109. }
  110. cli.willSendReq(reqres.R)
  111. if err := types.WriteMessage(reqres.R.Request, bw); err != nil {
  112. cli.stopForError(fmt.Errorf("write to buffer: %w", err))
  113. return
  114. }
  115. if err := bw.Flush(); err != nil {
  116. cli.stopForError(fmt.Errorf("flush buffer: %w", err))
  117. return
  118. }
  119. case <-cli.Quit():
  120. return
  121. }
  122. }
  123. }
func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
	r := bufio.NewReader(conn)
	for {
		var res = &types.Response{}
		err := types.ReadMessage(r, res)
		if err != nil {
			cli.stopForError(fmt.Errorf("read message: %w", err))
			return
		}

		// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)

		switch r := res.Value.(type) {
		case *types.Response_Exception: // app responded with error
			// XXX After setting cli.err, release waiters (e.g. reqres.Done())
			cli.stopForError(errors.New(r.Exception.Error))
			return
		default:
			err := cli.didRecvResponse(res)
			if err != nil {
				cli.stopForError(err)
				return
			}
		}
	}
}
func (cli *socketClient) willSendReq(reqres *ReqRes) {
	cli.mtx.Lock()
	defer cli.mtx.Unlock()
	cli.reqSent.PushBack(reqres)
}

func (cli *socketClient) didRecvResponse(res *types.Response) error {
	cli.mtx.Lock()
	defer cli.mtx.Unlock()

	// Get the first ReqRes.
	next := cli.reqSent.Front()
	if next == nil {
		return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
	}

	reqres := next.Value.(*ReqRes)
	if !resMatchesReq(reqres.Request, res) {
		return fmt.Errorf("unexpected %v when response to %v expected",
			reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
	}

	reqres.Response = res
	reqres.Done()            // release waiters
	cli.reqSent.Remove(next) // pop first item from linked list

	// Notify client listener if set (global callback).
	if cli.resCb != nil {
		cli.resCb(reqres.Request, res)
	}

	// Notify reqRes listener if set (request-specific callback).
	//
	// NOTE: It is possible this callback isn't set on the reqres object at
	// this point, in which case it will be called later, when it is set.
	reqres.InvokeCallback()

	return nil
}
//----------------------------------------

func (cli *socketClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestEcho(msg))
}

func (cli *socketClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestFlush())
}

func (cli *socketClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestInfo(req))
}

func (cli *socketClient) DeliverTxAsync(ctx context.Context, req types.RequestDeliverTx) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestDeliverTx(req))
}

func (cli *socketClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestCheckTx(req))
}

func (cli *socketClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestQuery(req))
}

func (cli *socketClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestCommit())
}

func (cli *socketClient) InitChainAsync(ctx context.Context, req types.RequestInitChain) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestInitChain(req))
}

func (cli *socketClient) BeginBlockAsync(ctx context.Context, req types.RequestBeginBlock) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestBeginBlock(req))
}

func (cli *socketClient) EndBlockAsync(ctx context.Context, req types.RequestEndBlock) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestEndBlock(req))
}

func (cli *socketClient) ListSnapshotsAsync(ctx context.Context, req types.RequestListSnapshots) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestListSnapshots(req))
}

func (cli *socketClient) OfferSnapshotAsync(ctx context.Context, req types.RequestOfferSnapshot) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestOfferSnapshot(req))
}

func (cli *socketClient) LoadSnapshotChunkAsync(
	ctx context.Context,
	req types.RequestLoadSnapshotChunk,
) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestLoadSnapshotChunk(req))
}

func (cli *socketClient) ApplySnapshotChunkAsync(
	ctx context.Context,
	req types.RequestApplySnapshotChunk,
) (*ReqRes, error) {
	return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
}
//----------------------------------------

func (cli *socketClient) FlushSync(ctx context.Context) error {
	reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
	if err != nil {
		return queueErr(err)
	}

	if err := cli.Error(); err != nil {
		return err
	}

	gotResp := make(chan struct{})
	go func() {
		// NOTE: if we don't flush the queue, it's possible to get stuck here
		reqRes.Wait()
		close(gotResp)
	}()

	select {
	case <-gotResp:
		return cli.Error()
	case <-ctx.Done():
		return ctx.Err()
	}
}
func (cli *socketClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEcho(msg))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetEcho(), nil
}

func (cli *socketClient) InfoSync(
	ctx context.Context,
	req types.RequestInfo,
) (*types.ResponseInfo, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInfo(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetInfo(), nil
}

func (cli *socketClient) DeliverTxSync(
	ctx context.Context,
	req types.RequestDeliverTx,
) (*types.ResponseDeliverTx, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestDeliverTx(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetDeliverTx(), nil
}

func (cli *socketClient) CheckTxSync(
	ctx context.Context,
	req types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCheckTx(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetCheckTx(), nil
}

func (cli *socketClient) QuerySync(
	ctx context.Context,
	req types.RequestQuery,
) (*types.ResponseQuery, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestQuery(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetQuery(), nil
}

func (cli *socketClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCommit())
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetCommit(), nil
}

func (cli *socketClient) InitChainSync(
	ctx context.Context,
	req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInitChain(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetInitChain(), nil
}

func (cli *socketClient) BeginBlockSync(
	ctx context.Context,
	req types.RequestBeginBlock,
) (*types.ResponseBeginBlock, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestBeginBlock(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetBeginBlock(), nil
}

func (cli *socketClient) EndBlockSync(
	ctx context.Context,
	req types.RequestEndBlock,
) (*types.ResponseEndBlock, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEndBlock(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetEndBlock(), nil
}

func (cli *socketClient) ListSnapshotsSync(
	ctx context.Context,
	req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestListSnapshots(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetListSnapshots(), nil
}

func (cli *socketClient) OfferSnapshotSync(
	ctx context.Context,
	req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestOfferSnapshot(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetOfferSnapshot(), nil
}

func (cli *socketClient) LoadSnapshotChunkSync(
	ctx context.Context,
	req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestLoadSnapshotChunk(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetLoadSnapshotChunk(), nil
}

func (cli *socketClient) ApplySnapshotChunkSync(
	ctx context.Context,
	req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
	reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestApplySnapshotChunk(req))
	if err != nil {
		return nil, err
	}
	return reqres.Response.GetApplySnapshotChunk(), nil
}
//----------------------------------------

// queueRequest enqueues req onto the queue. If the queue is full, it either
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine whether the request should be dropped (if ctx.Err is
// non-nil).
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
	reqres := NewReqRes(req)

	if sync {
		select {
		case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	} else {
		select {
		case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}:
		default:
			return nil, errors.New("buffer is full")
		}
	}

	return reqres, nil
}
func (cli *socketClient) queueRequestAsync(
	ctx context.Context,
	req *types.Request,
) (*ReqRes, error) {
	reqres, err := cli.queueRequest(ctx, req, false)
	if err != nil {
		return nil, queueErr(err)
	}

	return reqres, cli.Error()
}

func (cli *socketClient) queueRequestAndFlushSync(
	ctx context.Context,
	req *types.Request,
) (*ReqRes, error) {
	reqres, err := cli.queueRequest(ctx, req, true)
	if err != nil {
		return nil, queueErr(err)
	}

	if err := cli.FlushSync(ctx); err != nil {
		return nil, err
	}

	return reqres, cli.Error()
}

func queueErr(e error) error {
	return fmt.Errorf("can't queue req: %w", e)
}
// drainQueue marks as complete and discards all remaining pending requests
// from the queue.
func (cli *socketClient) drainQueue() {
	cli.mtx.Lock()
	defer cli.mtx.Unlock()

	// mark all in-flight messages as resolved (they will get cli.Error())
	for req := cli.reqSent.Front(); req != nil; req = req.Next() {
		reqres := req.Value.(*ReqRes)
		reqres.Done()
	}

	// Mark all queued messages as resolved.
	//
	// TODO(creachadair): We can't simply range the channel, because it is never
	// closed, and the writer continues to add work.
	// See https://github.com/tendermint/tendermint/issues/6996.
	for {
		select {
		case reqres := <-cli.reqQueue:
			reqres.R.Done()
		default:
			return
		}
	}
}
//----------------------------------------

func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
	switch req.Value.(type) {
	case *types.Request_Echo:
		_, ok = res.Value.(*types.Response_Echo)
	case *types.Request_Flush:
		_, ok = res.Value.(*types.Response_Flush)
	case *types.Request_Info:
		_, ok = res.Value.(*types.Response_Info)
	case *types.Request_DeliverTx:
		_, ok = res.Value.(*types.Response_DeliverTx)
	case *types.Request_CheckTx:
		_, ok = res.Value.(*types.Response_CheckTx)
	case *types.Request_Commit:
		_, ok = res.Value.(*types.Response_Commit)
	case *types.Request_Query:
		_, ok = res.Value.(*types.Response_Query)
	case *types.Request_InitChain:
		_, ok = res.Value.(*types.Response_InitChain)
	case *types.Request_BeginBlock:
		_, ok = res.Value.(*types.Response_BeginBlock)
	case *types.Request_EndBlock:
		_, ok = res.Value.(*types.Response_EndBlock)
	case *types.Request_ApplySnapshotChunk:
		_, ok = res.Value.(*types.Response_ApplySnapshotChunk)
	case *types.Request_LoadSnapshotChunk:
		_, ok = res.Value.(*types.Response_LoadSnapshotChunk)
	case *types.Request_ListSnapshots:
		_, ok = res.Value.(*types.Response_ListSnapshots)
	case *types.Request_OfferSnapshot:
		_, ok = res.Value.(*types.Response_OfferSnapshot)
	}

	return ok
}
func (cli *socketClient) stopForError(err error) {
	if !cli.IsRunning() {
		return
	}

	cli.mtx.Lock()
	cli.err = err
	cli.mtx.Unlock()

	cli.Logger.Info("Stopping abci.socketClient", "reason", err)
	if err := cli.Stop(); err != nil {
		cli.Logger.Error("Error stopping abci.socketClient", "err", err)
	}
}