
1386 lines
41 KiB

new pubsub package
comment out failing consensus tests for now
rewrite rpc httpclient to use new pubsub package
import pubsub as tmpubsub, query as tmquery
make event IDs constants
EventKey -> EventTypeKey
rename EventsPubsub to PubSub
mempool does not use pubsub
rename eventsSub to pubsub
new subscribe API
fix channel size issues and consensus tests bugs
refactor rpc client
add missing discardFromChan method
add mutex
rename pubsub to eventBus
remove IsRunning from WSRPCConnection interface (not needed)
add a comment in broadcastNewRoundStepsAndVotes
rename registerEventCallbacks to broadcastNewRoundStepsAndVotes
  See https://dave.cheney.net/2014/03/19/channel-axioms
stop eventBuses after reactor tests
remove unnecessary Unsubscribe
return subscribe helper function
move discardFromChan to where it is used
subscribe now returns an err
  this gives us ability to refuse to subscribe if pubsub is at its max capacity.
use context for control overflow
cache queries
handle err when subscribing in replay_test
rename testClientID to testSubscriber
extract var
set channel buffer capacity to 1 in replay_file
fix byzantine_test
unsubscribe from single event, not all events
refactor httpclient to return events to appropriate channels
return failing testReplayCrashBeforeWriteVote test
fix TestValidatorSetChanges
refactor code a bit
fix testReplayCrashBeforeWriteVote
add comment
fix TestValidatorSetChanges
fixes from Bucky's review
update comment [ci skip]
test TxEventBuffer
update changelog
fix TestValidatorSetChanges (2nd attempt)
only do wg.Done when no errors
benchmark event bus
create pubsub server inside NewEventBus
only expose config params (later if needed)
set buffer capacity to 0 so we are not testing cache
new tx event format: key = "Tx" plus a tag {"tx.hash": XYZ}
  This should allow to subscribe to all transactions! or a specific one using a query: "tm.events.type = Tx and tx.hash = '013ABF99434...'"
use TimeoutCommit instead of afterPublishEventNewBlockTimeout
  TimeoutCommit is the time a node waits after committing a block, before it goes into the next height. So it will finish everything from the last block, but then wait a bit. The idea is this gives it time to hear more votes from other validators, to strengthen the commit it includes in the next block. But it also gives it time to hear about new transactions.
waitForBlockWithUpdatedVals
rewrite WAL crash tests
  Task: test that we can recover from any WAL crash.
  Solution: the old tests were relying on event hub being run in the same thread (we were injecting the private validator's last signature).
  when considering a rewrite, we considered two possible solutions: write a "fuzzy" testing system where WAL is crashing upon receiving a new message, or inject failures and trigger them in tests using something like https://github.com/coreos/gofail.
remove sleep
no cs.Lock around wal.Save
test different cases (empty block, non-empty block, ...)
comments
add comments
test 4 cases: empty block, non-empty block, non-empty block with smaller part size, many blocks
fixes as per Bucky's last review
reset subscriptions on UnsubscribeAll
use a simple counter to track message for which we panicked
also, set a smaller part size for all test cases
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
9 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
7 years ago
10 years ago
7 years ago
10 years ago
10 years ago
7 years ago
9 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
8 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
10 years ago
7 years ago
7 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
7 years ago
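The commit log notes a new tx event format (key "Tx" plus a tag such as {"tx.hash": XYZ}) that lets a client subscribe either to all transactions or to one specific transaction with a query like "tm.events.type = Tx and tx.hash = '...'". The sketch below illustrates that idea only. It is not the tmpubsub or tmquery API: `Event`, `Query`, `Matches`, and the hash values are hypothetical names invented for this illustration, and the query is modelled as a plain conjunction of key = value conditions.

```go
package main

import "fmt"

// Event is a hypothetical, simplified published event: a bag of string tags.
type Event struct {
	Tags map[string]string
}

// Query is a hypothetical conjunction of "key = value" conditions.
type Query map[string]string

// Matches reports whether every condition in q is satisfied by e's tags.
func (q Query) Matches(e Event) bool {
	for key, want := range q {
		if got, ok := e.Tags[key]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	// Subscribe to every transaction: only the event type is constrained.
	allTxs := Query{"tm.events.type": "Tx"}
	// Subscribe to one transaction: the hash tag is constrained as well.
	oneTx := Query{"tm.events.type": "Tx", "tx.hash": "ABC123"}

	ev := Event{Tags: map[string]string{"tm.events.type": "Tx", "tx.hash": "ABC123"}}
	other := Event{Tags: map[string]string{"tm.events.type": "Tx", "tx.hash": "DEF456"}}

	fmt.Println(allTxs.Matches(ev), allTxs.Matches(other))
	fmt.Println(oneTx.Matches(ev), oneTx.Matches(other))
}
```

Because the hash is an ordinary tag rather than part of the event key, the broader query needs no special case: it simply omits the `tx.hash` condition.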
package consensus

import (
	"bytes"
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/pkg/errors"

	wire "github.com/tendermint/go-wire"
	cmn "github.com/tendermint/tmlibs/common"
	"github.com/tendermint/tmlibs/log"

	cstypes "github.com/tendermint/tendermint/consensus/types"
	"github.com/tendermint/tendermint/p2p"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

const (
	StateChannel       = byte(0x20)
	DataChannel        = byte(0x21)
	VoteChannel        = byte(0x22)
	VoteSetBitsChannel = byte(0x23)

	maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
)

//-----------------------------------------------------------------------------

// ConsensusReactor defines a reactor for the consensus service.
type ConsensusReactor struct {
	p2p.BaseReactor // BaseService + p2p.Switch

	conS *ConsensusState

	mtx      sync.RWMutex
	fastSync bool
	eventBus *types.EventBus
}

// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
	conR := &ConsensusReactor{
		conS:     consensusState,
		fastSync: fastSync,
	}
	conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
	return conR
}

// OnStart implements BaseService.
func (conR *ConsensusReactor) OnStart() error {
	conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
	// Propagate a failure from the embedded reactor instead of dropping it.
	if err := conR.BaseReactor.OnStart(); err != nil {
		return err
	}

	err := conR.startBroadcastRoutine()
	if err != nil {
		return err
	}

	if !conR.FastSync() {
		_, err := conR.conS.Start()
		if err != nil {
			return err
		}
	}

	return nil
}

// OnStop implements BaseService.
func (conR *ConsensusReactor) OnStop() {
	conR.BaseReactor.OnStop()
	conR.conS.Stop()
}

// SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state machine.
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
	conR.Logger.Info("SwitchToConsensus")
	conR.conS.reconstructLastCommit(state)
	// NOTE: The line below causes broadcastNewRoundStepRoutine() to
	// broadcast a NewRoundStepMessage.
	conR.conS.updateToState(state)

	conR.mtx.Lock()
	conR.fastSync = false
	conR.mtx.Unlock()

	if blocksSynced > 0 {
		// Don't bother with the WAL if we fast synced.
		conR.conS.doWALCatchup = false
	}
	// Log a start failure rather than silently discarding it.
	if _, err := conR.conS.Start(); err != nil {
		conR.Logger.Error("Error starting consensus state", "err", err)
	}
}

// GetChannels implements Reactor.
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
	// TODO optimize
	return []*p2p.ChannelDescriptor{
		&p2p.ChannelDescriptor{
			ID:                StateChannel,
			Priority:          5,
			SendQueueCapacity: 100,
		},
		&p2p.ChannelDescriptor{
			ID:                 DataChannel, // maybe split between gossiping current block and catchup stuff
			Priority:           10,          // once we gossip the whole block there's nothing left to send until next height or round
			SendQueueCapacity:  100,
			RecvBufferCapacity: 50 * 4096,
		},
		&p2p.ChannelDescriptor{
			ID:                 VoteChannel,
			Priority:           5,
			SendQueueCapacity:  100,
			RecvBufferCapacity: 100 * 100,
		},
		&p2p.ChannelDescriptor{
			ID:                 VoteSetBitsChannel,
			Priority:           1,
			SendQueueCapacity:  2,
			RecvBufferCapacity: 1024,
		},
	}
}

// AddPeer implements Reactor.
func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
	if !conR.IsRunning() {
		return
	}

	// Create peerState for peer.
	peerState := NewPeerState(peer).SetLogger(conR.Logger)
	peer.Set(types.PeerStateKey, peerState)

	// Begin routines for this peer.
	go conR.gossipDataRoutine(peer, peerState)
	go conR.gossipVotesRoutine(peer, peerState)
	go conR.queryMaj23Routine(peer, peerState)

	// Send our state to peer.
	// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
	if !conR.FastSync() {
		conR.sendNewRoundStepMessages(peer)
	}
}

// RemovePeer implements Reactor.
func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	if !conR.IsRunning() {
		return
	}
	// TODO
	//peer.Get(PeerStateKey).(*PeerState).Disconnect()
}

// Receive implements Reactor.
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes is ordered by the receiveRoutine.
// NOTE: blocks on consensus state for proposals, block parts, and votes.
func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
	if !conR.IsRunning() {
		conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
		return
	}

	_, msg, err := DecodeMessage(msgBytes)
	if err != nil {
		conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
		// TODO punish peer?
		return
	}
	conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)

	// Get peer states
	ps := src.Get(types.PeerStateKey).(*PeerState)

	switch chID {
	case StateChannel:
		switch msg := msg.(type) {
		case *NewRoundStepMessage:
			ps.ApplyNewRoundStepMessage(msg)
		case *CommitStepMessage:
			ps.ApplyCommitStepMessage(msg)
		case *HasVoteMessage:
			ps.ApplyHasVoteMessage(msg)
		case *VoteSetMaj23Message:
			cs := conR.conS
			cs.mtx.Lock()
			height, votes := cs.Height, cs.Votes
			cs.mtx.Unlock()
			if height != msg.Height {
				return
			}
			// Peer claims to have a maj23 for some BlockID at H,R,S,
			votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID)
			// Respond with a VoteSetBitsMessage showing which votes we have.
			// (and consequently shows which we don't have)
			var ourVotes *cmn.BitArray
			switch msg.Type {
			case types.VoteTypePrevote:
				ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
			case types.VoteTypePrecommit:
				ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
			default:
				// The message being validated here is a VoteSetMaj23Message.
				conR.Logger.Error("Bad VoteSetMaj23Message field Type")
				return
			}
			src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
				Height:  msg.Height,
				Round:   msg.Round,
				Type:    msg.Type,
				BlockID: msg.BlockID,
				Votes:   ourVotes,
			}})
		case *ProposalHeartbeatMessage:
			hb := msg.Heartbeat
			conR.Logger.Debug("Received proposal heartbeat message",
				"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence,
				"valIdx", hb.ValidatorIndex, "valAddr", hb.ValidatorAddress)
		default:
			conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case DataChannel:
		if conR.FastSync() {
			conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *ProposalMessage:
			ps.SetHasProposal(msg.Proposal)
			conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
		case *ProposalPOLMessage:
			ps.ApplyProposalPOLMessage(msg)
		case *BlockPartMessage:
			ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
			conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
		default:
			conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteChannel:
		if conR.FastSync() {
			conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *VoteMessage:
			cs := conR.conS
			cs.mtx.Lock()
			height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
			cs.mtx.Unlock()
			ps.EnsureVoteBitArrays(height, valSize)
			ps.EnsureVoteBitArrays(height-1, lastCommitSize)
			ps.SetHasVote(msg.Vote)

			cs.peerMsgQueue <- msgInfo{msg, src.Key()}

		default:
			// don't punish (leave room for soft upgrades)
			conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	case VoteSetBitsChannel:
		if conR.FastSync() {
			conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
			return
		}
		switch msg := msg.(type) {
		case *VoteSetBitsMessage:
			cs := conR.conS
			cs.mtx.Lock()
			height, votes := cs.Height, cs.Votes
			cs.mtx.Unlock()

			if height == msg.Height {
				var ourVotes *cmn.BitArray
				switch msg.Type {
				case types.VoteTypePrevote:
					ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
				case types.VoteTypePrecommit:
					ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
				default:
					conR.Logger.Error("Bad VoteSetBitsMessage field Type")
					return
				}
				ps.ApplyVoteSetBitsMessage(msg, ourVotes)
			} else {
				ps.ApplyVoteSetBitsMessage(msg, nil)
			}
		default:
			// don't punish (leave room for soft upgrades)
			conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
		}

	default:
		conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
	}

	if err != nil {
		conR.Logger.Error("Error in Receive()", "err", err)
	}
}

// SetEventBus sets event bus.
func (conR *ConsensusReactor) SetEventBus(b *types.EventBus) {
	conR.eventBus = b
	conR.conS.SetEventBus(b)
}

// FastSync returns whether the consensus reactor is in fast-sync mode.
func (conR *ConsensusReactor) FastSync() bool {
	conR.mtx.RLock()
	defer conR.mtx.RUnlock()
	return conR.fastSync
}

//--------------------------------------

// startBroadcastRoutine subscribes to new round steps, votes and proposal
// heartbeats using the event bus and starts a goroutine that broadcasts
// these events to peers upon receiving them.
func (conR *ConsensusReactor) startBroadcastRoutine() error {
	const subscriber = "consensus-reactor"
	ctx := context.Background()

	// new round steps
	stepsCh := make(chan interface{})
	err := conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, stepsCh)
	if err != nil {
		return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryNewRoundStep)
	}

	// votes
	votesCh := make(chan interface{})
	err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryVote, votesCh)
	if err != nil {
		return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote)
	}

	// proposal heartbeats
	heartbeatsCh := make(chan interface{})
	err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh)
	if err != nil {
		return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat)
	}

	go func() {
		for {
			select {
			case data, ok := <-stepsCh:
				if ok { // a receive from a closed channel returns the zero value immediately
					edrs := data.(types.TMEventData).Unwrap().(types.EventDataRoundState)
					conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState))
				}
			case data, ok := <-votesCh:
				if ok {
					edv := data.(types.TMEventData).Unwrap().(types.EventDataVote)
					conR.broadcastHasVoteMessage(edv.Vote)
				}
			case data, ok := <-heartbeatsCh:
				if ok {
					edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
					conR.broadcastProposalHeartbeatMessage(edph)
				}
			case <-conR.Quit:
				conR.eventBus.UnsubscribeAll(ctx, subscriber)
				return
			}
		}
	}()

	return nil
}

func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
	hb := heartbeat.Heartbeat
	conR.Logger.Debug("Broadcasting proposal heartbeat message",
		"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
	msg := &ProposalHeartbeatMessage{hb}
	conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
}

func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) {
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg})
	}
	if csMsg != nil {
		conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg})
	}
}

// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
	msg := &HasVoteMessage{
		Height: vote.Height,
		Round:  vote.Round,
		Type:   vote.Type,
		Index:  vote.ValidatorIndex,
	}
	conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
	/*
		// TODO: Make this broadcast more selective.
		for _, peer := range conR.Switch.Peers().List() {
			ps := peer.Get(PeerStateKey).(*PeerState)
			prs := ps.GetRoundState()
			if prs.Height == vote.Height {
				// TODO: Also filter on round?
				peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
			} else {
				// Height doesn't match
				// TODO: check a field, maybe CatchupCommitRound?
				// TODO: But that requires changing the struct field comment.
			}
		}
	*/
}

func makeRoundStepMessages(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
	nrsMsg = &NewRoundStepMessage{
		Height: rs.Height,
		Round:  rs.Round,
		Step:   rs.Step,
		SecondsSinceStartTime: int(time.Since(rs.StartTime).Seconds()),
		LastCommitRound:       rs.LastCommit.Round(),
	}
	if rs.Step == cstypes.RoundStepCommit {
		csMsg = &CommitStepMessage{
			Height:           rs.Height,
			BlockPartsHeader: rs.ProposalBlockParts.Header(),
			BlockParts:       rs.ProposalBlockParts.BitArray(),
		}
	}
	return
}

func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
	rs := conR.conS.GetRoundState()
	nrsMsg, csMsg := makeRoundStepMessages(rs)
	if nrsMsg != nil {
		peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg})
	}
	if csMsg != nil {
		peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg})
	}
}

func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
	logger := conR.Logger.With("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			logger.Info("Stopping gossipDataRoutine for peer")
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		// Send proposal Block parts?
		if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
			if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
				part := rs.ProposalBlockParts.GetPart(index)
				msg := &BlockPartMessage{
					Height: rs.Height, // This tells peer that this part applies to us.
					Round:  rs.Round,  // This tells peer that this part applies to us.
					Part:   part,
				}
				logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
				if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
					ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
				}
				continue OUTER_LOOP
			}
		}

		// If the peer is on a previous height, help catch up.
		if (0 < prs.Height) && (prs.Height < rs.Height) {
			heightLogger := logger.With("height", prs.Height)

			// If we never received the commit message from the peer, the block parts won't be initialized.
			if prs.ProposalBlockParts == nil {
				blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
				if blockMeta == nil {
					cmn.PanicCrisis(cmn.Fmt("Failed to load block %d when blockStore is at %d",
						prs.Height, conR.conS.blockStore.Height()))
				}
				ps.InitProposalBlockParts(blockMeta.BlockID.PartsHeader)
				// Continue the loop since prs is a copy and is not affected by this initialization.
				continue OUTER_LOOP
			}
			conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
			continue OUTER_LOOP
		}

		// If height and round don't match, sleep.
		if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
			//logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
			time.Sleep(conR.conS.config.PeerGossipSleep())
			continue OUTER_LOOP
		}

		// By here, height and round match.
		// Proposal block parts were already matched and sent if any were wanted.
		// (These can match on hash so the round doesn't matter)
		// Now consider sending other things, like the Proposal itself.

		// Send Proposal && ProposalPOL BitArray?
		if rs.Proposal != nil && !prs.Proposal {
			// Proposal: share the proposal metadata with peer.
			{
				msg := &ProposalMessage{Proposal: rs.Proposal}
				logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
				if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
					ps.SetHasProposal(rs.Proposal)
				}
			}
			// ProposalPOL: lets peer know which POL votes we have so far.
			// Peer must receive ProposalMessage first.
			// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
			// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
			if 0 <= rs.Proposal.POLRound {
				msg := &ProposalPOLMessage{
					Height:           rs.Height,
					ProposalPOLRound: rs.Proposal.POLRound,
					ProposalPOL:      rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
				}
				logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
				peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
			}
			continue OUTER_LOOP
		}

		// Nothing to do. Sleep.
		time.Sleep(conR.conS.config.PeerGossipSleep())
		continue OUTER_LOOP
	}
}

func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
	prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer) {

	if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
		// Ensure that the peer's PartSetHeader is correct
		blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
		if blockMeta == nil {
			logger.Error("Failed to load block meta",
				"ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
			time.Sleep(conR.conS.config.PeerGossipSleep())
			return
		} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
			logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
				"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
			time.Sleep(conR.conS.config.PeerGossipSleep())
			return
		}
		// Load the part
		part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
		if part == nil {
			logger.Error("Could not load part", "index", index,
				"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
			time.Sleep(conR.conS.config.PeerGossipSleep())
			return
		}
		// Send the part
		msg := &BlockPartMessage{
			Height: prs.Height, // Not our height, so it doesn't matter.
			Round:  prs.Round,  // Not our height, so it doesn't matter.
			Part:   part,
		}
		logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
		if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
			ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
		} else {
			logger.Debug("Sending block part for catchup failed")
		}
		return
	} else {
		//logger.Info("No parts to send in catch-up, sleeping")
		time.Sleep(conR.conS.config.PeerGossipSleep())
		return
	}
}

func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
	logger := conR.Logger.With("peer", peer)

	// Simple hack to throttle logs upon sleep.
	var sleeping = 0

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			logger.Info("Stopping gossipVotesRoutine for peer")
			return
		}
		rs := conR.conS.GetRoundState()
		prs := ps.GetRoundState()

		switch sleeping {
		case 1: // First sleep
			sleeping = 2
		case 2: // No more sleep
			sleeping = 0
		}

		//logger.Debug("gossipVotesRoutine", "rsHeight", rs.Height, "rsRound", rs.Round,
		//	"prsHeight", prs.Height, "prsRound", prs.Round, "prsStep", prs.Step)

		// If height matches, then send LastCommit, Prevotes, Precommits.
		if rs.Height == prs.Height {
			heightLogger := logger.With("height", prs.Height)
			if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
				continue OUTER_LOOP
			}
		}

		// Special catchup logic.
		// If peer is lagging by height 1, send LastCommit.
		if prs.Height != 0 && rs.Height == prs.Height+1 {
			if ps.PickSendVote(rs.LastCommit) {
				logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
				continue OUTER_LOOP
			}
		}

		// Catchup logic
		// If peer is lagging by more than 1, send Commit.
		if prs.Height != 0 && rs.Height >= prs.Height+2 {
			// Load the block commit for prs.Height,
			// which contains precommit signatures for prs.Height.
			commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
			logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
			if ps.PickSendVote(commit) {
				logger.Debug("Picked Catchup commit to send", "height", prs.Height)
				continue OUTER_LOOP
			}
		}

		if sleeping == 0 {
			// We sent nothing. Sleep...
			sleeping = 1
			logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
				"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
				"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
		} else if sleeping == 2 {
			// Continued sleep...
			sleeping = 1
		}

		time.Sleep(conR.conS.config.PeerGossipSleep())
		continue OUTER_LOOP
	}
}

func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) bool {
	// If there are lastCommits to send...
	if prs.Step == cstypes.RoundStepNewHeight {
		if ps.PickSendVote(rs.LastCommit) {
			logger.Debug("Picked rs.LastCommit to send")
			return true
		}
	}
	// If there are prevotes to send...
	if prs.Step <= cstypes.RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
		if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
			logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
			return true
		}
	}
	// If there are precommits to send...
	if prs.Step <= cstypes.RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
		if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
			logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
			return true
		}
	}
	// If there are POLPrevotes to send...
	if prs.ProposalPOLRound != -1 {
		if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
			if ps.PickSendVote(polPrevotes) {
				logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
					"round", prs.ProposalPOLRound)
				return true
			}
		}
	}
	return false
}

// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (conR *ConsensusReactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
	logger := conR.Logger.With("peer", peer)

OUTER_LOOP:
	for {
		// Manage disconnects from self or peer.
		if !peer.IsRunning() || !conR.IsRunning() {
			logger.Info("Stopping queryMaj23Routine for peer")
			return
		}

		// Maybe send Height/Round/Prevotes
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height {
				if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.Round,
						Type:    types.VoteTypePrevote,
						BlockID: maj23,
					}})
					time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
				}
			}
		}

		// Maybe send Height/Round/Precommits
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height {
				if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.Round,
						Type:    types.VoteTypePrecommit,
						BlockID: maj23,
					}})
					time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
				}
			}
		}

		// Maybe send Height/Round/ProposalPOL
		{
			rs := conR.conS.GetRoundState()
			prs := ps.GetRoundState()
			if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
				if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
					peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
						Height:  prs.Height,
						Round:   prs.ProposalPOLRound,
						Type:    types.VoteTypePrevote,
						BlockID: maj23,
					}})
					time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
				}
			}
		}

		// Little point sending LastCommitRound/LastCommit,
		// These are fleeting and non-blocking.

		// Maybe send Height/CatchupCommitRound/CatchupCommit.
		{
			prs := ps.GetRoundState()
			if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
				commit := conR.conS.LoadCommit(prs.Height)
				peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
					Height:  prs.Height,
					Round:   commit.Round(),
					Type:    types.VoteTypePrecommit,
					BlockID: commit.BlockID,
				}})
				time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
			}
		}

		time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())

		continue OUTER_LOOP
	}
}

// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// TODO: improve!
func (conR *ConsensusReactor) String() string {
	// better not to access shared variables
	return "ConsensusReactor" // conR.StringIndented("")
}

// StringIndented returns an indented string representation of the ConsensusReactor.
func (conR *ConsensusReactor) StringIndented(indent string) string {
	s := "ConsensusReactor{\n"
	s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
	for _, peer := range conR.Switch.Peers().List() {
		ps := peer.Get(types.PeerStateKey).(*PeerState)
		s += indent + " " + ps.StringIndented(indent+" ") + "\n"
	}
	s += indent + "}"
	return s
}


//-----------------------------------------------------------------------------

var (
	ErrPeerStateHeightRegression = errors.New("Error peer state height regression")
	ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
)

// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct {
	Peer   p2p.Peer
	logger log.Logger

	mtx sync.Mutex
	cstypes.PeerRoundState
}

// NewPeerState returns a new PeerState for the given Peer
func NewPeerState(peer p2p.Peer) *PeerState {
	return &PeerState{
		Peer:   peer,
		logger: log.NewNopLogger(),
		PeerRoundState: cstypes.PeerRoundState{
			Round:              -1,
			ProposalPOLRound:   -1,
			LastCommitRound:    -1,
			CatchupCommitRound: -1,
		},
	}
}

// SetLogger sets the logger on the peer state and returns the peer state
// itself so the call can be chained.
func (ps *PeerState) SetLogger(logger log.Logger) *PeerState {
	ps.logger = logger
	return ps
}

// GetRoundState returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	prs := ps.PeerRoundState // copy
	return &prs
}

// GetHeight returns an atomic snapshot of the PeerRoundState's height
// used by the mempool to ensure peers are caught up before broadcasting new txs
func (ps *PeerState) GetHeight() int {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	return ps.PeerRoundState.Height
}

// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != proposal.Height || ps.Round != proposal.Round {
		return
	}
	if ps.Proposal {
		return
	}

	ps.Proposal = true
	ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
	ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
	ps.ProposalPOLRound = proposal.POLRound
	ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
}

// InitProposalBlockParts initializes the peer's proposal block parts header and bit array.
func (ps *PeerState) InitProposalBlockParts(partsHeader types.PartSetHeader) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.ProposalBlockParts != nil {
		return
	}

	ps.ProposalBlockPartsHeader = partsHeader
	ps.ProposalBlockParts = cmn.NewBitArray(partsHeader.Total)
}

// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != height || ps.Round != round {
		return
	}

	ps.ProposalBlockParts.SetIndex(index, true)
}

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
	if vote, ok := ps.PickVoteToSend(votes); ok {
		msg := &VoteMessage{vote}
		return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
	}
	return false
}

// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if votes.Size() == 0 {
		return nil, false
	}

	height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()

	// Lazily set data using 'votes'.
	if votes.IsCommit() {
		ps.ensureCatchupCommitRound(height, round, size)
	}
	ps.ensureVoteBitArrays(height, size)

	psVotes := ps.getVoteBitArray(height, round, type_)
	if psVotes == nil {
		return nil, false // Not something worth sending
	}
	if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
		ps.setHasVote(height, round, type_, index)
		return votes.GetByIndex(index), true
	}
	return nil, false
}
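
/*
Illustrative sketch, not part of this package: PickVoteToSend above chooses a
vote by subtracting the peer's bit array from ours and picking a random set bit
from the difference. The standalone program below mimics that selection with
plain []bool slices. pickMissing is a hypothetical helper, and the slices are
only a stand-in for cmn.BitArray, whose exact behavior is assumed here.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickMissing returns a random index i such that ours[i] is true and peer[i]
// is false, which is the idea behind ours.Sub(peer).PickRandom().
// Indices beyond len(peer) are treated as unknown to the peer.
func pickMissing(ours, peer []bool) (int, bool) {
	var candidates []int
	for i, have := range ours {
		peerHas := i < len(peer) && peer[i]
		if have && !peerHas {
			candidates = append(candidates, i)
		}
	}
	if len(candidates) == 0 {
		return 0, false // nothing worth sending
	}
	return candidates[rand.Intn(len(candidates))], true
}

func main() {
	ours := []bool{true, true, false, true}
	peer := []bool{true, false, false, false}
	if i, ok := pickMissing(ours, peer); ok {
		peer[i] = true // mark as sent, as setHasVote does for the real peer state
		fmt.Println("send vote at validator index", i)
	}
}
```

Marking the bit before the send completes means a given vote is offered to a
peer at most once by this path, even if the send later fails.
*/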

func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
	if !types.IsVoteTypeValid(type_) {
		return nil
	}

	if ps.Height == height {
		if ps.Round == round {
			switch type_ {
			case types.VoteTypePrevote:
				return ps.Prevotes
			case types.VoteTypePrecommit:
				return ps.Precommits
			}
		}
		if ps.CatchupCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.CatchupCommit
			}
		}
		if ps.ProposalPOLRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return ps.ProposalPOL
			case types.VoteTypePrecommit:
				return nil
			}
		}
		return nil
	}
	if ps.Height == height+1 {
		if ps.LastCommitRound == round {
			switch type_ {
			case types.VoteTypePrevote:
				return nil
			case types.VoteTypePrecommit:
				return ps.LastCommit
			}
		}
		return nil
	}
	return nil
}

// 'round': A round for which we have a +2/3 commit.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
	if ps.Height != height {
		return
	}
	/*
		NOTE: This is wrong, 'round' could change.
		e.g. if orig round is not the same as block LastCommit round.
		if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
			cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
		}
	*/
	if ps.CatchupCommitRound == round {
		return // Nothing to do!
	}
	ps.CatchupCommitRound = round
	if round == ps.Round {
		ps.CatchupCommit = ps.Precommits
	} else {
		ps.CatchupCommit = cmn.NewBitArray(numValidators)
	}
}

// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	ps.ensureVoteBitArrays(height, numValidators)
}

func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
	if ps.Height == height {
		if ps.Prevotes == nil {
			ps.Prevotes = cmn.NewBitArray(numValidators)
		}
		if ps.Precommits == nil {
			ps.Precommits = cmn.NewBitArray(numValidators)
		}
		if ps.CatchupCommit == nil {
			ps.CatchupCommit = cmn.NewBitArray(numValidators)
		}
		if ps.ProposalPOL == nil {
			ps.ProposalPOL = cmn.NewBitArray(numValidators)
		}
	} else if ps.Height == height+1 {
		if ps.LastCommit == nil {
			ps.LastCommit = cmn.NewBitArray(numValidators)
		}
	}
}

// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
	logger := ps.logger.With("peerH/R", cmn.Fmt("%d/%d", ps.Height, ps.Round), "H/R", cmn.Fmt("%d/%d", height, round))
	logger.Debug("setHasVote", "type", type_, "index", index)

	// NOTE: some may be nil BitArrays -> no side effects.
	psVotes := ps.getVoteBitArray(height, round, type_)
	if psVotes != nil {
		psVotes.SetIndex(index, true)
	}
}

// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Ignore duplicates or decreases
	if CompareHRS(msg.Height, msg.Round, msg.Step, ps.Height, ps.Round, ps.Step) <= 0 {
		return
	}

	// Just remember these values.
	psHeight := ps.Height
	psRound := ps.Round
	//psStep := ps.Step
	psCatchupCommitRound := ps.CatchupCommitRound
	psCatchupCommit := ps.CatchupCommit

	startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
	ps.Height = msg.Height
	ps.Round = msg.Round
	ps.Step = msg.Step
	ps.StartTime = startTime
	if psHeight != msg.Height || psRound != msg.Round {
		ps.Proposal = false
		ps.ProposalBlockPartsHeader = types.PartSetHeader{}
		ps.ProposalBlockParts = nil
		ps.ProposalPOLRound = -1
		ps.ProposalPOL = nil
		// We'll update the BitArray capacity later.
		ps.Prevotes = nil
		ps.Precommits = nil
	}
	if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
		// Peer caught up to CatchupCommitRound.
		// Preserve psCatchupCommit!
		// NOTE: We prefer to use prs.Precommits if
		// prs.Round matches prs.CatchupCommitRound.
		ps.Precommits = psCatchupCommit
	}
	if psHeight != msg.Height {
		// Shift Precommits to LastCommit.
		if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = ps.Precommits
		} else {
			ps.LastCommitRound = msg.LastCommitRound
			ps.LastCommit = nil
		}
		// We'll update the BitArray capacity later.
		ps.CatchupCommitRound = -1
		ps.CatchupCommit = nil
	}
}
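
/*
Illustrative sketch, not part of this package: ApplyNewRoundStepMessage drops
any message whose (height, round, step) is not strictly ahead of the peer's
current position. CompareHRS is defined outside this section, so the ordering
below (height first, then round, then step) is an assumption about its
semantics. compareHRS and cmpInt are hypothetical names, and plain ints stand
in for cstypes.RoundStepType.

```go
package main

import "fmt"

// cmpInt returns -1, 0 or 1 for a < b, a == b, a > b.
func cmpInt(a, b int) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	}
	return 0
}

// compareHRS orders two (height, round, step) triples lexicographically:
// height decides first, then round, then step.
func compareHRS(h1, r1, s1, h2, r2, s2 int) int {
	if h1 != h2 {
		return cmpInt(h1, h2)
	}
	if r1 != r2 {
		return cmpInt(r1, r2)
	}
	return cmpInt(s1, s2)
}

func main() {
	// A message is applied only when the comparison is strictly positive.
	fmt.Println("higher height wins:", compareHRS(5, 0, 1, 4, 9, 9) > 0)
	fmt.Println("duplicate ignored: ", compareHRS(5, 1, 2, 5, 1, 2) <= 0)
	fmt.Println("older step ignored:", compareHRS(5, 1, 1, 5, 1, 2) <= 0)
}
```
*/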

// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.ProposalBlockPartsHeader = msg.BlockPartsHeader
	ps.ProposalBlockParts = msg.BlockParts
}

// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}
	if ps.ProposalPOLRound != msg.ProposalPOLRound {
		return
	}

	// TODO: Merge onto existing ps.ProposalPOL?
	// We might have sent some prevotes in the meantime.
	ps.ProposalPOL = msg.ProposalPOL
}

// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.Height != msg.Height {
		return
	}

	ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}

// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
	if votes != nil {
		if ourVotes == nil {
			votes.Update(msg.Votes)
		} else {
			otherVotes := votes.Sub(ourVotes)
			hasVotes := otherVotes.Or(msg.Votes)
			votes.Update(hasVotes)
		}
	}
}
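
/*
Illustrative sketch, not part of this package: when ourVotes is non-nil,
ApplyVoteSetBitsMessage computes (votes AND NOT ourVotes) OR msg.Votes. For
every index where we hold a vote for the BlockID, the peer's own claim replaces
what we previously recorded; all other indices keep their recorded value. The
program below reproduces that merge on []bool slices. mergeClaimed is a
hypothetical helper, the slices stand in for cmn.BitArray, and equal lengths
are assumed.

```go
package main

import "fmt"

// mergeClaimed returns (known AND NOT ours) OR claimed, bit by bit.
// known:   what we had recorded the peer as having
// ours:    votes we hold for the BlockID in question
// claimed: votes the peer says it has for that BlockID
// All three slices are assumed to have the same length.
func mergeClaimed(known, ours, claimed []bool) []bool {
	out := make([]bool, len(known))
	for i := range known {
		out[i] = (known[i] && !ours[i]) || claimed[i]
	}
	return out
}

func main() {
	known := []bool{true, true, false, false}
	ours := []bool{true, false, true, false}
	claimed := []bool{false, false, true, false}
	fmt.Println(mergeClaimed(known, ours, claimed))
}
```

In this example index 0 is cleared because we hold that vote for the BlockID
and the peer does not claim it, while index 1 survives because it lies outside
the set of votes we hold for the BlockID.
*/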

// String returns a string representation of the PeerState
func (ps *PeerState) String() string {
	return ps.StringIndented("")
}

// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string {
	return fmt.Sprintf(`PeerState{
%s Key %v
%s PRS %v
%s}`,
		indent, ps.Peer.Key(),
		indent, ps.PeerRoundState.StringIndented(indent+" "),
		indent)
}

//-----------------------------------------------------------------------------
// Messages

const (
	msgTypeNewRoundStep = byte(0x01)
	msgTypeCommitStep   = byte(0x02)
	msgTypeProposal     = byte(0x11)
	msgTypeProposalPOL  = byte(0x12)
	msgTypeBlockPart    = byte(0x13) // both block & POL
	msgTypeVote         = byte(0x14)
	msgTypeHasVote      = byte(0x15)
	msgTypeVoteSetMaj23 = byte(0x16)
	msgTypeVoteSetBits  = byte(0x17)

	msgTypeProposalHeartbeat = byte(0x20)
)

// ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{}

var _ = wire.RegisterInterface(
	struct{ ConsensusMessage }{},
	wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
	wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
	wire.ConcreteType{&ProposalMessage{}, msgTypeProposal},
	wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL},
	wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
	wire.ConcreteType{&VoteMessage{}, msgTypeVote},
	wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
	wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
	wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
	wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat},
)

// DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
	msgType = bz[0]
	n := new(int)
	r := bytes.NewReader(bz)
	msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
	msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
	return
}

//-------------------------------------

// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition
type NewRoundStepMessage struct {
	Height                int
	Round                 int
	Step                  cstypes.RoundStepType
	SecondsSinceStartTime int
	LastCommitRound       int
}

// String returns a string representation.
func (m *NewRoundStepMessage) String() string {
	return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
		m.Height, m.Round, m.Step, m.LastCommitRound)
}

//-------------------------------------

// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct {
	Height           int
	BlockPartsHeader types.PartSetHeader
	BlockParts       *cmn.BitArray
}

// String returns a string representation.
func (m *CommitStepMessage) String() string {
	return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
}

//-------------------------------------

// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct {
	Proposal *types.Proposal
}

// String returns a string representation.
func (m *ProposalMessage) String() string {
	return fmt.Sprintf("[Proposal %v]", m.Proposal)
}

//-------------------------------------

// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct {
	Height           int
	ProposalPOLRound int
	ProposalPOL      *cmn.BitArray
}

// String returns a string representation.
func (m *ProposalPOLMessage) String() string {
	return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
}

//-------------------------------------

// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct {
	Height int
	Round  int
	Part   *types.Part
}

// String returns a string representation.
func (m *BlockPartMessage) String() string {
	return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
}

//-------------------------------------

// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct {
	Vote *types.Vote
}

// String returns a string representation.
func (m *VoteMessage) String() string {
	return fmt.Sprintf("[Vote %v]", m.Vote)
}

//-------------------------------------

// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct {
	Height int
	Round  int
	Type   byte
	Index  int
}

// String returns a string representation.
// The validator index is printed once, ahead of the height/round/type triple.
func (m *HasVoteMessage) String() string {
	return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v}]", m.Index, m.Height, m.Round, m.Type)
}

//-------------------------------------

// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct {
	Height  int
	Round   int
	Type    byte
	BlockID types.BlockID
}

// String returns a string representation.
func (m *VoteSetMaj23Message) String() string {
	return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}

//-------------------------------------

// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct {
	Height  int
	Round   int
	Type    byte
	BlockID types.BlockID
	Votes   *cmn.BitArray
}

// String returns a string representation.
func (m *VoteSetBitsMessage) String() string {
	return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}

//-------------------------------------

// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
type ProposalHeartbeatMessage struct {
	Heartbeat *types.Heartbeat
}

// String returns a string representation.
func (m *ProposalHeartbeatMessage) String() string {
	return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
}