549 lines
15 KiB

rpc/lib/client & server: try to conform to JSON-RPC 2.0 spec (#4141)

https://www.jsonrpc.org/specification

What is done in this PR:

JSONRPCClient: validate that Response.ID matches Request.ID
I wanted to do the same for the WSClient, but since we're sending events as responses, not notifications, checking IDs would require storing them in memory indefinitely (and we won't be able to remove them upon client unsubscribing because ID is different then).

Request.ID is now optional. Notification is a Request without an ID. Previously "" or 0 were considered as notifications

Remove #event suffix from ID from an event response (partially fixes #2949)
ID must be either string, int or null AND must be equal to request's ID. Now, because we've implemented events as responses, WS clients are tripping when they see Response.ID("0#event") != Request.ID("0"). Implementing events as requests would require a lot of time (~ 2 days to completely rewrite WS client and server)

generate unique ID for each request

switch to integer IDs instead of "json-client-XYZ"

id=0 method=/subscribe
id=0 result=...

id=1 method=/abci_query
id=1 result=...

> send events (resulting from /subscribe) as requests+notifications (not responses)

this will require a lot of work. probably not worth it

* rpc: generate an unique ID for each request in conformance with JSON-RPC spec
* WSClient: check for unsolicited responses
* fix golangci warnings
* save commit
* fix errors
* remove ID from responses from subscribe
  Refs #2949
* clients are safe for concurrent access
* tm-bench: switch to int ID
* fixes after my own review
* comment out sentIDs in WSClient
  see commit body for the reason
* remove body.Close
  it will be closed automatically
* stop ws connection outside of write/read routines
  also, use t.Rate in tm-bench indexer when calculating ID
  fix gocritic issues
* update swagger.yaml
* Apply suggestions from code review
* fix stylecheck and golint linter warnings
* update changelog
* update changelog2
5 years ago
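
The ID rules described in the commit message above (a request without an ID is a notification, 0 is a valid ID, and a response ID must equal the request ID) can be sketched as follows. This is a minimal illustration, not the code from the PR: the type and function names (`rpcRequest`, `rpcResponse`, `isNotification`, `validateResponseID`) are hypothetical, and IDs are restricted to integers for brevity even though the spec also allows strings and null.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcRequest is a hypothetical, simplified JSON-RPC 2.0 request.
// ID is a pointer so that a missing ID (a notification) can be told
// apart from the valid ID 0.
type rpcRequest struct {
	JSONRPC string `json:"jsonrpc"`
	ID      *int   `json:"id,omitempty"`
	Method  string `json:"method"`
}

// rpcResponse is a hypothetical, simplified JSON-RPC 2.0 response.
type rpcResponse struct {
	JSONRPC string `json:"jsonrpc"`
	ID      *int   `json:"id"`
}

// isNotification reports whether the request carries no ID at all.
func isNotification(req rpcRequest) bool {
	return req.ID == nil
}

// validateResponseID returns an error unless the response ID equals the request ID.
func validateResponseID(req rpcRequest, res rpcResponse) error {
	if req.ID == nil {
		return fmt.Errorf("request is a notification, no response expected")
	}
	if res.ID == nil {
		return fmt.Errorf("response has no ID, expected %d", *req.ID)
	}
	if *res.ID != *req.ID {
		return fmt.Errorf("response ID %d does not match request ID %d", *res.ID, *req.ID)
	}
	return nil
}

func main() {
	var withID, withoutID rpcRequest
	// "id": 0 is a real request ID here, not a notification marker.
	if err := json.Unmarshal([]byte(`{"jsonrpc":"2.0","id":0,"method":"subscribe"}`), &withID); err != nil {
		panic(err)
	}
	// Omitting "id" entirely makes the request a notification.
	if err := json.Unmarshal([]byte(`{"jsonrpc":"2.0","method":"subscribe"}`), &withoutID); err != nil {
		panic(err)
	}
	fmt.Println(isNotification(withID), isNotification(withoutID))

	id := 0
	fmt.Println(validateResponseID(withID, rpcResponse{JSONRPC: "2.0", ID: &id}))
}
```

Using a pointer for the ID is one way to keep "absent" and "zero" distinct after decoding; a plain `int` field would collapse the two cases, which is the ambiguity the commit message says was removed.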
new pubsub package

comment out failing consensus tests for now
rewrite rpc httpclient to use new pubsub package
import pubsub as tmpubsub, query as tmquery
make event IDs constants
EventKey -> EventTypeKey
rename EventsPubsub to PubSub
mempool does not use pubsub
rename eventsSub to pubsub
new subscribe API
fix channel size issues and consensus tests bugs
refactor rpc client
add missing discardFromChan method
add mutex
rename pubsub to eventBus
remove IsRunning from WSRPCConnection interface (not needed)
add a comment in broadcastNewRoundStepsAndVotes
rename registerEventCallbacks to broadcastNewRoundStepsAndVotes
See https://dave.cheney.net/2014/03/19/channel-axioms
stop eventBuses after reactor tests
remove unnecessary Unsubscribe
return subscribe helper function
move discardFromChan to where it is used

subscribe now returns an err
this gives us ability to refuse to subscribe if pubsub is at its max capacity.

use context for control overflow
cache queries
handle err when subscribing in replay_test
rename testClientID to testSubscriber
extract var
set channel buffer capacity to 1 in replay_file
fix byzantine_test
unsubscribe from single event, not all events
refactor httpclient to return events to appropriate channels
return failing testReplayCrashBeforeWriteVote test
fix TestValidatorSetChanges
refactor code a bit
fix testReplayCrashBeforeWriteVote
add comment
fix TestValidatorSetChanges
fixes from Bucky's review
update comment [ci skip]
test TxEventBuffer
update changelog
fix TestValidatorSetChanges (2nd attempt)
only do wg.Done when no errors
benchmark event bus
create pubsub server inside NewEventBus
only expose config params (later if needed)
set buffer capacity to 0 so we are not testing cache

new tx event format: key = "Tx" plus a tag {"tx.hash": XYZ}
This should allow to subscribe to all transactions! or a specific one using a query: "tm.events.type = Tx and tx.hash = '013ABF99434...'"

use TimeoutCommit instead of afterPublishEventNewBlockTimeout
TimeoutCommit is the time a node waits after committing a block, before it goes into the next height. So it will finish everything from the last block, but then wait a bit. The idea is this gives it time to hear more votes from other validators, to strengthen the commit it includes in the next block. But it also gives it time to hear about new transactions.

waitForBlockWithUpdatedVals

rewrite WAL crash tests
Task: test that we can recover from any WAL crash.
Solution: the old tests were relying on event hub being run in the same thread (we were injecting the private validator's last signature).
when considering a rewrite, we considered two possible solutions: write a "fuzzy" testing system where WAL is crashing upon receiving a new message, or inject failures and trigger them in tests using something like https://github.com/coreos/gofail.

remove sleep
no cs.Lock around wal.Save
test different cases (empty block, non-empty block, ...)
comments
add comments
test 4 cases: empty block, non-empty block, non-empty block with smaller part size, many blocks
fixes as per Bucky's last review
reset subscriptions on UnsubscribeAll
use a simple counter to track message for which we panicked
also, set a smaller part size for all test cases
8 years ago
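
The tx event format in the commit message above (an event typed "Tx" carrying a "tx.hash" tag, selected by a query that ANDs equality conditions) can be illustrated with a small sketch. It assumes the query has already been parsed into key/value conditions and does not use the real tmquery package. The `matches` helper and the hash values `ABC123` and `DEF456` are hypothetical placeholders.

```go
package main

import "fmt"

// matches reports whether every equality condition of an already-parsed
// query is satisfied by the event's tags. It is a hypothetical stand-in
// for a real query matcher and supports only "key = value AND ..." queries.
func matches(tags, conditions map[string]string) bool {
	for key, want := range conditions {
		got, ok := tags[key]
		if !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	// Tags of one published tx event; the hash is a made-up placeholder.
	event := map[string]string{"tm.events.type": "Tx", "tx.hash": "ABC123"}

	// Subscribing to all transactions needs only the event type condition.
	allTxs := map[string]string{"tm.events.type": "Tx"}
	// Subscribing to one transaction adds a condition on its hash.
	oneTx := map[string]string{"tm.events.type": "Tx", "tx.hash": "ABC123"}
	otherTx := map[string]string{"tm.events.type": "Tx", "tx.hash": "DEF456"}

	fmt.Println(matches(event, allTxs), matches(event, oneTx), matches(event, otherTx))
}
```

Putting the hash in a tag rather than in the event key is what lets one subscription cover every transaction while another narrows to a single hash.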
package rpcclient

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pkg/errors"
	metrics "github.com/rcrowley/go-metrics"

	amino "github.com/tendermint/go-amino"

	tmrand "github.com/tendermint/tendermint/libs/rand"
	"github.com/tendermint/tendermint/libs/service"
	types "github.com/tendermint/tendermint/rpc/lib/types"
)

const (
	defaultMaxReconnectAttempts = 25
	defaultWriteWait            = 0
	defaultReadWait             = 0
	defaultPingPeriod           = 0
)
// WSClient is a JSON-RPC client, which uses WebSocket for communication with
// the remote server.
//
// WSClient is safe for concurrent use by multiple goroutines.
type WSClient struct { // nolint: maligned
	conn *websocket.Conn
	cdc  *amino.Codec

	Address  string // IP:PORT or /path/to/socket
	Endpoint string // /websocket/url/endpoint
	Dialer   func(string, string) (net.Conn, error)

	// Single user facing channel to read RPCResponses from, closed only when the
	// client is being stopped.
	ResponsesCh chan types.RPCResponse

	// Callback, which will be called each time after successful reconnect.
	onReconnect func()

	// internal channels
	send            chan types.RPCRequest // user requests
	backlog         chan types.RPCRequest // stores a single user request received during a conn failure
	reconnectAfter  chan error            // reconnect requests
	readRoutineQuit chan struct{}         // a way for readRoutine to close writeRoutine

	// Maximum reconnect attempts (0 or greater; default: 25).
	maxReconnectAttempts int

	// Support both ws and wss protocols
	protocol string

	wg sync.WaitGroup

	mtx            sync.RWMutex
	sentLastPingAt time.Time
	reconnecting   bool
	nextReqID      int
	// sentIDs        map[types.JSONRPCIntID]bool // IDs of the requests currently in flight

	// Time allowed to write a message to the server. 0 means block until operation succeeds.
	writeWait time.Duration

	// Time allowed to read the next message from the server. 0 means block until operation succeeds.
	readWait time.Duration

	// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
	pingPeriod time.Duration

	service.BaseService

	// Time between sending a ping and receiving a pong. See
	// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
	PingPongLatencyTimer metrics.Timer
}
// NewWSClient returns a new client. See the commentary on the func(*WSClient)
// functions for a detailed description of how to configure ping period and
// pong wait time. The endpoint argument must begin with a `/`.
// An error is returned if remoteAddr is invalid.
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient, error) {
	parsedURL, err := newParsedURL(remoteAddr)
	if err != nil {
		return nil, err
	}
	// default to ws protocol, unless wss is explicitly specified
	if parsedURL.Scheme != protoWSS {
		parsedURL.Scheme = protoWS
	}

	dialFn, err := makeHTTPDialer(remoteAddr)
	if err != nil {
		return nil, err
	}

	c := &WSClient{
		cdc:                  amino.NewCodec(),
		Address:              parsedURL.GetTrimmedHostWithPath(),
		Dialer:               dialFn,
		Endpoint:             endpoint,
		PingPongLatencyTimer: metrics.NewTimer(),

		maxReconnectAttempts: defaultMaxReconnectAttempts,
		readWait:             defaultReadWait,
		writeWait:            defaultWriteWait,
		pingPeriod:           defaultPingPeriod,
		protocol:             parsedURL.Scheme,

		// sentIDs: make(map[types.JSONRPCIntID]bool),
	}
	c.BaseService = *service.NewBaseService(nil, "WSClient", c)
	for _, option := range options {
		option(c)
	}
	return c, nil
}
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
// It should only be used in the constructor and is not Goroutine-safe.
func MaxReconnectAttempts(max int) func(*WSClient) {
	return func(c *WSClient) {
		c.maxReconnectAttempts = max
	}
}

// ReadWait sets the amount of time to wait before a websocket read times out.
// It should only be used in the constructor and is not Goroutine-safe.
func ReadWait(readWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.readWait = readWait
	}
}

// WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor and is not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.writeWait = writeWait
	}
}

// PingPeriod sets the duration for sending websocket pings.
// It should only be used in the constructor - not Goroutine-safe.
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.pingPeriod = pingPeriod
	}
}

// OnReconnect sets the callback, which will be called every time after
// successful reconnect.
func OnReconnect(cb func()) func(*WSClient) {
	return func(c *WSClient) {
		c.onReconnect = cb
	}
}
// String returns WS client full address.
func (c *WSClient) String() string {
	return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint)
}

// OnStart implements service.Service by dialing a server and creating read and
// write routines.
func (c *WSClient) OnStart() error {
	err := c.dial()
	if err != nil {
		return err
	}

	c.ResponsesCh = make(chan types.RPCResponse)

	c.send = make(chan types.RPCRequest)
	// 1 additional error may come from the read/write
	// goroutine depending on which failed first.
	c.reconnectAfter = make(chan error, 1)
	// capacity for 1 request. a user won't be able to send more because the send
	// channel is unbuffered.
	c.backlog = make(chan types.RPCRequest, 1)

	c.startReadWriteRoutines()
	go c.reconnectRoutine()

	return nil
}

// Stop overrides service.Service#Stop. There is no other way to wait until Quit
// channel is closed.
func (c *WSClient) Stop() error {
	if err := c.BaseService.Stop(); err != nil {
		return err
	}
	// only close user-facing channels when we can't write to them
	c.wg.Wait()
	close(c.ResponsesCh)

	return nil
}

// IsReconnecting returns true if the client is reconnecting right now.
func (c *WSClient) IsReconnecting() bool {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.reconnecting
}

// IsActive returns true if the client is running and not reconnecting.
func (c *WSClient) IsActive() bool {
	return c.IsRunning() && !c.IsReconnecting()
}
// Send the given RPC request to the server. Results will be available on
// ResponsesCh; errors, if any, are carried in the response's Error field.
// Will block until send succeeds or ctx.Done is closed.
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
	select {
	case c.send <- request:
		c.Logger.Info("sent a request", "req", request)
		// c.mtx.Lock()
		// c.sentIDs[request.ID.(types.JSONRPCIntID)] = true
		// c.mtx.Unlock()
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Call enqueues a call request onto the Send queue. Requests are JSON encoded.
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
	request, err := types.MapToRequest(c.cdc, c.nextRequestID(), method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

// CallWithArrayParams enqueues a call request onto the Send queue. Params are
// in a form of array (e.g. []interface{}{"abcd"}). Requests are JSON encoded.
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
	request, err := types.ArrayToRequest(c.cdc, c.nextRequestID(), method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

// Codec returns the amino codec used when building requests.
func (c *WSClient) Codec() *amino.Codec { return c.cdc }

// SetCodec replaces the amino codec used when building requests.
func (c *WSClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc }
///////////////////////////////////////////////////////////////////////////////
// Private methods

func (c *WSClient) nextRequestID() types.JSONRPCIntID {
	c.mtx.Lock()
	id := c.nextReqID
	c.nextReqID++
	c.mtx.Unlock()
	return types.JSONRPCIntID(id)
}

func (c *WSClient) dial() error {
	dialer := &websocket.Dialer{
		NetDial: c.Dialer,
		Proxy:   http.ProxyFromEnvironment,
	}
	rHeader := http.Header{}
	conn, _, err := dialer.Dial(c.protocol+"://"+c.Address+c.Endpoint, rHeader) // nolint:bodyclose
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}

// reconnect tries to redial up to maxReconnectAttempts with exponential
// backoff.
func (c *WSClient) reconnect() error {
	attempt := 0

	c.mtx.Lock()
	c.reconnecting = true
	c.mtx.Unlock()
	defer func() {
		c.mtx.Lock()
		c.reconnecting = false
		c.mtx.Unlock()
	}()

	for {
		jitterSeconds := time.Duration(tmrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
		backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)

		c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
		time.Sleep(backoffDuration)

		err := c.dial()
		if err != nil {
			c.Logger.Error("failed to redial", "err", err)
		} else {
			c.Logger.Info("reconnected")
			if c.onReconnect != nil {
				go c.onReconnect()
			}
			return nil
		}

		attempt++

		if attempt > c.maxReconnectAttempts {
			return errors.Wrap(err, "reached maximum reconnect attempts")
		}
	}
}
func (c *WSClient) startReadWriteRoutines() {
	c.wg.Add(2)
	c.readRoutineQuit = make(chan struct{})
	go c.readRoutine()
	go c.writeRoutine()
}

func (c *WSClient) processBacklog() error {
	select {
	case request := <-c.backlog:
		if c.writeWait > 0 {
			if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
				c.Logger.Error("failed to set write deadline", "err", err)
			}
		}
		if err := c.conn.WriteJSON(request); err != nil {
			c.Logger.Error("failed to resend request", "err", err)
			c.reconnectAfter <- err
			// requeue request
			c.backlog <- request
			return err
		}
		c.Logger.Info("resent a request", "req", request)
	default:
	}
	return nil
}

func (c *WSClient) reconnectRoutine() {
	for {
		select {
		case originalError := <-c.reconnectAfter:
			// wait until writeRoutine and readRoutine finish
			c.wg.Wait()
			if err := c.reconnect(); err != nil {
				c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
				c.Stop()
				return
			}
			// drain reconnectAfter
		LOOP:
			for {
				select {
				case <-c.reconnectAfter:
				default:
					break LOOP
				}
			}
			err := c.processBacklog()
			if err == nil {
				c.startReadWriteRoutines()
			}

		case <-c.Quit():
			return
		}
	}
}
// The client ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *WSClient) writeRoutine() {
	var ticker *time.Ticker
	if c.pingPeriod > 0 {
		// ticker with a predefined period
		ticker = time.NewTicker(c.pingPeriod)
	} else {
		// ticker that never fires
		ticker = &time.Ticker{C: make(<-chan time.Time)}
	}

	defer func() {
		ticker.Stop()
		// Ignore the error from Close: it triggers in tests, likely because
		// the connection is already closed.
		c.conn.Close()
		c.wg.Done()
	}()

	for {
		select {
		case request := <-c.send:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteJSON(request); err != nil {
				c.Logger.Error("failed to send request", "err", err)
				c.reconnectAfter <- err
				// add request to the backlog, so we don't lose it
				c.backlog <- request
				return
			}
		case <-ticker.C:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				c.Logger.Error("failed to write ping", "err", err)
				c.reconnectAfter <- err
				return
			}
			c.mtx.Lock()
			c.sentLastPingAt = time.Now()
			c.mtx.Unlock()
			c.Logger.Debug("sent ping")
		case <-c.readRoutineQuit:
			return
		case <-c.Quit():
			if err := c.conn.WriteMessage(
				websocket.CloseMessage,
				websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
			); err != nil {
				c.Logger.Error("failed to write message", "err", err)
			}
			return
		}
	}
}
// The client ensures that there is at most one reader to a connection by
// executing all reads from this goroutine.
func (c *WSClient) readRoutine() {
	defer func() {
		// Ignore the error from Close: it triggers in tests, likely because
		// the connection is already closed.
		c.conn.Close()
		c.wg.Done()
	}()

	c.conn.SetPongHandler(func(string) error {
		// gather latency stats
		c.mtx.RLock()
		t := c.sentLastPingAt
		c.mtx.RUnlock()
		c.PingPongLatencyTimer.UpdateSince(t)

		c.Logger.Debug("got pong")
		return nil
	})

	for {
		// reset deadline for every message type (control or data)
		if c.readWait > 0 {
			if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
				c.Logger.Error("failed to set read deadline", "err", err)
			}
		}
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
				return
			}

			c.Logger.Error("failed to read response", "err", err)
			close(c.readRoutineQuit)
			c.reconnectAfter <- err
			return
		}

		var response types.RPCResponse
		err = json.Unmarshal(data, &response)
		if err != nil {
			c.Logger.Error("failed to parse response", "err", err, "data", string(data))
			continue
		}

		if err = validateResponseID(response.ID); err != nil {
			c.Logger.Error("error in response ID", "id", response.ID, "err", err)
			continue
		}

		// TODO: events resulting from /subscribe do not work with the check
		// below because they are implemented as responses with the subscribe
		// request's ID. According to the spec, they should be notifications
		// (requests without IDs).
		// https://github.com/tendermint/tendermint/issues/2949
		// c.mtx.Lock()
		// if _, ok := c.sentIDs[response.ID.(types.JSONRPCIntID)]; !ok {
		// 	c.Logger.Error("unsolicited response ID", "id", response.ID, "expected", c.sentIDs)
		// 	c.mtx.Unlock()
		// 	continue
		// }
		// delete(c.sentIDs, response.ID.(types.JSONRPCIntID))
		// c.mtx.Unlock()

		c.Logger.Info("got response", "id", response.ID, "result", fmt.Sprintf("%X", response.Result))

		// Select on both BaseService.Quit and ResponsesCh so that a pending
		// write to ResponsesCh cannot stall c.wg.Wait() in c.Stop(). This
		// relies on Quit being closed, so that every receive on it succeeds
		// and both readRoutine and writeRoutine stop.
		select {
		case <-c.Quit():
		case c.ResponsesCh <- response:
		}
	}
}
///////////////////////////////////////////////////////////////////////////////
// Predefined methods

// Subscribe to a query. Note the server must have a "subscribe" route
// defined.
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "subscribe", params)
}

// Unsubscribe from a query. Note the server must have a "unsubscribe" route
// defined.
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "unsubscribe", params)
}

// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
// defined.
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
	params := map[string]interface{}{}
	return c.Call(ctx, "unsubscribe_all", params)
}
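The delay that `reconnect` sleeps before each redial is a random jitter of up to one second plus 2^attempt seconds. The sketch below isolates that arithmetic in a standalone program so the growth is easy to inspect. It is not part of this package: the names `backoffWithJitter`, `randomJitter` and `backoffSchedule` are hypothetical, and `math/rand` stands in for `tmrand.Float64` as an assumption to keep the example stdlib-only.

```go
package main

import (
	"math/rand"
	"time"
)

// backoffWithJitter mirrors the delay computed in reconnect:
// the given jitter plus 2^attempt seconds.
// Hypothetical helper: the jitter is passed in so that the
// calculation can be checked deterministically.
func backoffWithJitter(attempt int, jitter time.Duration) time.Duration {
	return jitter + (1<<uint(attempt))*time.Second
}

// randomJitter returns a duration in [0, 1s).
// Assumption: math/rand replaces tmrand.Float64 here.
func randomJitter() time.Duration {
	return time.Duration(rand.Float64() * float64(time.Second))
}

// backoffSchedule lists the jitter-free delays for attempts 0..n-1.
func backoffSchedule(n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	for attempt := 0; attempt < n; attempt++ {
		delays = append(delays, backoffWithJitter(attempt, 0))
	}
	return delays
}
```

Calling `backoffSchedule(4)` yields delays of 1s, 2s, 4s and 8s. Because the delay doubles on every attempt, the tenth retry (attempt index 10) already waits 1024 seconds before the jitter is added, which is worth keeping in mind when choosing a value for `MaxReconnectAttempts`.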