Release/v0.31.6
@@ -1,5 +1,12 @@
<!-- Thanks for filing a PR! Before hitting the button, please check the following items.-->
<!--
Thanks for filing a PR! Before hitting the button, please check the following items.
Please note that every non-trivial PR must reference an issue that explains the
changes in the PR.
-->

* [ ] Referenced an issue explaining the need for the change
* [ ] Updated all relevant documentation in docs
* [ ] Updated all code comments where relevant
* [ ] Wrote tests
@@ -0,0 +1,100 @@
# ADR 037: Deliver Block

Author: Daniil Lashin (@danil-lashin)

## Changelog

13-03-2019: Initial draft

## Context

Initial conversation: https://github.com/tendermint/tendermint/issues/2901
Some applications can handle transactions in parallel, or at least some
part of tx processing can be parallelized. Currently it is not possible for developers
to execute txs in parallel because Tendermint delivers them sequentially.
## Decision

Tendermint currently has `BeginBlock`, `EndBlock`, `Commit` and `DeliverTx` steps
while executing a block. This doc proposes merging these steps into one `DeliverBlock`
step. It will allow developers of applications to decide how they want to
execute transactions (in parallel or sequentially). Also it will simplify and
speed up communication between the application and Tendermint.

As @jaekwon [mentioned](https://github.com/tendermint/tendermint/issues/2901#issuecomment-477746128)
in the discussion, not all applications will benefit from this solution. In some cases,
when an application handles transactions sequentially, it may slow down the blockchain,
because it needs to wait until the full block is transmitted to the application before it
can start processing it. Also, completely changing ABCI would force all existing apps
to change their implementations completely. That's why I propose to introduce one more ABCI
type.
## Implementation Changes

In addition to the default application interface, which currently has this structure:
```go
type Application interface {
	// Info and Mempool methods...

	// Consensus Connection
	InitChain(RequestInitChain) ResponseInitChain    // Initialize blockchain with validators and other info from TendermintCore
	BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
	DeliverTx(tx []byte) ResponseDeliverTx           // Deliver a tx for full processing
	EndBlock(RequestEndBlock) ResponseEndBlock       // Signals the end of a block, returns changes to the validator set
	Commit() ResponseCommit                          // Commit the state and return the application Merkle root hash
}
```

this doc proposes to add one more:

```go
type Application interface {
	// Info and Mempool methods...

	// Consensus Connection
	InitChain(RequestInitChain) ResponseInitChain          // Initialize blockchain with validators and other info from TendermintCore
	DeliverBlock(RequestDeliverBlock) ResponseDeliverBlock // Deliver full block
	Commit() ResponseCommit                                // Commit the state and return the application Merkle root hash
}

type RequestDeliverBlock struct {
	Hash                []byte
	Header              Header
	Txs                 Txs
	LastCommitInfo      LastCommitInfo
	ByzantineValidators []Evidence
}

type ResponseDeliverBlock struct {
	ValidatorUpdates      []ValidatorUpdate
	ConsensusParamUpdates *ConsensusParams
	Tags                  []common.KVPair
	TxResults             []ResponseDeliverTx
}
```
Also, we will need to add a new config param which will specify which kind of ABCI the
application uses. For example, it can be `abci_type`. Then we will have 2 types
(a config sketch follows the list):

- `advanced` - current ABCI
- `simple` - proposed implementation
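
For illustration, here is a minimal sketch of how such a parameter might surface in the
node configuration. The `ABCIType` field, its `abci_type` key, and its exact placement are
this proposal's assumptions, not an existing Tendermint option:

```go
// Hypothetical addition to the node config (sketch only).
type BaseConfig struct {
	// ... existing fields ...

	// ABCIType selects which kind of ABCI the application implements:
	// "advanced" (current BeginBlock/DeliverTx/EndBlock/Commit flow) or
	// "simple" (proposed DeliverBlock flow).
	ABCIType string `mapstructure:"abci_type"`
}
```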
## Status

In review

## Consequences

### Positive

- much simpler introduction and tutorials for new developers (instead of implementing 5 methods they
will need to implement only 3)
- txs can be handled in parallel
- simpler interface
- faster communication between Tendermint and the application

### Negative

- Tendermint will now have to support 2 kinds of ABCI
@@ -0,0 +1,534 @@
# ADR 040: Blockchain Reactor Refactor

## Changelog

19-03-2019: Initial draft

## Context
The Blockchain Reactor's high level responsibility is to enable peers who are far behind the current state of the
blockchain to quickly catch up by downloading many blocks in parallel from its peers, verifying block correctness, and
executing them against the ABCI application. We call the protocol executed by the Blockchain Reactor `fast-sync`.
The current architecture diagram of the blockchain reactor can be found here:

![Blockchain Reactor Architecture Diagram](img/bc-reactor.png)

The current architecture consists of dozens of routines and is tightly dependent on the `Switch`, making writing
unit tests almost impossible. Current tests require setting up complex dependency graphs and dealing with concurrency.
Note that having dozens of routines is overkill in this case, as most of the time routines sit idle waiting for
something to happen (a message to arrive or a timeout to expire). Due to the dependency on the `Switch`, testing relatively
complex network scenarios and failures (for example adding and removing peers) is a very complex task and frequently leads
to complex tests with non-deterministic behavior ([#3400]). The inability to write proper tests reduces confidence in
the code, and this has resulted in several issues (some have been fixed in the meantime and some are still open):
[#3400], [#2897], [#2896], [#2699], [#2888], [#2457], [#2622], [#2026].
## Decision

To remedy these issues we plan a major refactor of the blockchain reactor. The proposed architecture is largely inspired
by ADR-30 and is presented in the following diagram:

![Blockchain Reactor Refactor Diagram](img/bc-reactor-refactor.png)

We suggest a concurrency architecture where the core algorithm (we call it `Controller`) is extracted into a finite
state machine. The active routine of the reactor is called `Executor` and is responsible for receiving and sending
messages from/to peers and triggering timeouts. Which messages should be sent and which timeouts triggered is determined
mostly by the `Controller`. The exception is the `Peer Heartbeat` mechanism, which is the `Executor`'s responsibility. The
heartbeat mechanism is used to remove slow and unresponsive peers from the peer list. Writing unit tests is simpler with
this architecture, as most of the critical logic is part of the `Controller` function. We expect that the simpler concurrency
architecture will not have a significant negative effect on the performance of this reactor (to be confirmed by
experimental evaluation).
### Implementation changes

We assume the following system model for the "fast sync" protocol:

* a node is connected to a random subset of all nodes that represents its peer set. Some nodes are correct and some
might be faulty. We don't make assumptions about the ratio of faulty nodes, i.e., it is possible that all nodes in some
peer set are faulty.
* we assume that communication between correct nodes is synchronous, i.e., if a correct node `p` sends a message `m` to
a correct node `q` at time `t`, then `q` will receive the message at the latest at time `t+Delta`, where `Delta` is a system
parameter that is known to all network participants. `Delta` is normally chosen to be an order of magnitude higher than
the real (maximum) communication delay between correct nodes. Therefore, if a correct node `p` sends a request message
to a correct node `q` at time `t` and there is no corresponding reply by time `t + 2*Delta`, then `p` can assume
that `q` is faulty (see the sketch after this list). Note that the network assumptions for the consensus reactor are
different (we assume a partially synchronous model there).
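
As a minimal sketch (assuming a configurable `Delta`; these names are illustrative and not
part of the actual codebase), the failure-detection rule amounts to:

``` go
package main

import "time"

// replyDeadline returns the point in time after which a missing reply lets a
// correct node declare the peer faulty: a request sent at time t must be
// answered by t + 2*Delta under the synchrony assumption above.
func replyDeadline(sentAt time.Time, delta time.Duration) time.Time {
	return sentAt.Add(2 * delta)
}
```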
The requirements for the "fast sync" protocol are formally specified as follows:

- `Correctness`: If a correct node `p` is connected to a correct node `q` for a long enough period of time, then `p`
will eventually download all requested blocks from `q`.
- `Termination`: If the set of peers of a correct node `p` is stable (no new nodes are added to the peer set of `p`) for
a long enough period of time, then the protocol eventually terminates.
- `Fairness`: A correct node `p` sends requests for blocks to all peers from its peer set.

As explained above, the `Executor` is responsible for sending and receiving messages that are part of the `fast-sync`
protocol. The following messages are exchanged as part of the `fast-sync` protocol:
``` go
type Message int

const (
	MessageUnknown Message = iota
	MessageStatusRequest
	MessageStatusResponse
	MessageBlockRequest
	MessageBlockResponse
)
```
`MessageStatusRequest` is sent periodically to all peers as a request for a peer to provide its current height. It is
part of the `Peer Heartbeat` mechanism, and a failure to respond to this message in a timely manner results in the peer
being removed from the peer set. Note that the `Peer Heartbeat` mechanism is used only while a peer is in `fast-sync`
mode. We assume here the existence of a mechanism that allows a node to inform its peers that it is in `fast-sync` mode.

``` go
type MessageStatusRequest struct {
	SeqNum int64 // sequence number of the request
}
```
`MessageStatusResponse` is sent as a response to `MessageStatusRequest` to inform the requester about the peer's current
height.

``` go
type MessageStatusResponse struct {
	SeqNum int64 // sequence number of the corresponding request
	Height int64 // current peer height
}
```
`MessageBlockRequest` is used to make a request for a block and the corresponding commit certificate at a given height.

``` go
type MessageBlockRequest struct {
	Height int64
}
```
`MessageBlockResponse` is a response to the corresponding block request. In addition to providing the block and the
corresponding commit certificate, it also contains the current peer height.

``` go
type MessageBlockResponse struct {
	Height     int64
	Block      Block
	Commit     Commit
	PeerHeight int64
}
```
In addition to sending and receiving messages and running the `Peer Heartbeat` mechanism, the `Executor` also manages
timeouts that are triggered upon `Controller` request. The `Controller` is then informed once a timeout expires.

``` go
type TimeoutTrigger int

const (
	TimeoutUnknown TimeoutTrigger = iota
	TimeoutResponseTrigger
	TimeoutTerminationTrigger
)
```
The `Controller` can be modelled as a function with clearly defined inputs:

* `State` - the current state of the node. Contains data about connected peers and their behavior, pending requests,
received blocks, etc.
* `Event` - significant events in the network.

producing clear outputs:

* `State` - the updated state of the node,
* `MessageToSend` - a signal of what message to send and to which peer,
* `TimeoutTrigger` - a signal that a timeout should be triggered.
We consider the following `Event` types:

``` go
type Event int

const (
	EventUnknown Event = iota
	EventStatusResponse
	EventBlockRequest
	EventBlockResponse
	EventRemovePeer
	EventTimeoutResponse
	EventTimeoutTermination
)
```

`EventStatusResponse` event is generated once `MessageStatusResponse` is received by the `Executor`.

``` go
type EventStatusResponse struct {
	PeerID ID
	Height int64
}
```
`EventBlockRequest` event is generated once `MessageBlockRequest` is received by the `Executor`.

``` go
type EventBlockRequest struct {
	Height int64
	PeerID p2p.ID
}
```
`EventBlockResponse` event is generated upon reception of `MessageBlockResponse` message by the `Executor`.

``` go
type EventBlockResponse struct {
	Height     int64
	Block      Block
	Commit     Commit
	PeerID     ID
	PeerHeight int64
}
```
`EventRemovePeer` is generated by `Executor` to signal that the connection to a peer is closed due to peer misbehavior.

``` go
type EventRemovePeer struct {
	PeerID ID
}
```
`EventTimeoutResponse` is generated by `Executor` to signal that a timeout triggered by `TimeoutResponseTrigger` has
expired.

``` go
type EventTimeoutResponse struct {
	PeerID ID
	Height int64
}
```
`EventTimeoutTermination` is generated by `Executor` to signal that a timeout triggered by `TimeoutTerminationTrigger`
has expired.

``` go
type EventTimeoutTermination struct {
	Height int64
}
```
`MessageToSend` is just a wrapper around the `Message` type that contains the ID of the peer to which the message
should be sent.

``` go
type MessageToSend struct {
	PeerID  ID
	Message Message
}
```
The Controller state machine can be in two modes: `ModeFastSync` when
a node is trying to catch up with the network by downloading committed blocks,
and `ModeConsensus` in which it executes the Tendermint consensus protocol. We
consider that `fast sync` mode terminates once the Controller switches to
`ModeConsensus`.

``` go
type Mode int

const (
	ModeUnknown Mode = iota
	ModeFastSync
	ModeConsensus
)
```
The `Controller` manages the following state:

``` go
type ControllerState struct {
	Height             int64            // the first block that is not committed
	Mode               Mode             // mode of operation
	PeerMap            map[ID]PeerStats // map of peer IDs to peer statistics
	MaxRequestPending  int64            // maximum height of the pending requests
	FailedRequests     []int64          // list of failed block requests
	PendingRequestsNum int              // total number of pending requests
	Store              []BlockInfo      // contains list of downloaded blocks
	Executor           BlockExecutor    // stores, verifies and executes blocks
}
```
The `PeerStats` data structure keeps, for every peer, its current height and the pending request for a block (at most
one request is outstanding per peer).

``` go
type PeerStats struct {
	Height         int64
	PendingRequest int64 // a request sent to this peer
}
```
The `BlockInfo` data structure is used to store information (as part of the block store) about downloaded blocks: from
which peer the block and the corresponding commit certificate were received.

``` go
type BlockInfo struct {
	Block  Block
	Commit Commit
	PeerID ID // the peer from which we received the corresponding Block and Commit
}
```
The `Controller` is initialized by providing an initial height (`startHeight`) from which it will start downloading
blocks from peers, and the current state of the `BlockExecutor`.

``` go
func NewControllerState(startHeight int64, executor BlockExecutor) ControllerState {
	state := ControllerState{}
	state.Height = startHeight
	state.Mode = ModeFastSync
	state.MaxRequestPending = startHeight - 1
	state.PendingRequestsNum = 0
	state.Executor = executor
	// initialize state.PeerMap, state.FailedRequests and state.Store to empty data structures
	state.PeerMap = make(map[ID]PeerStats)
	state.FailedRequests = []int64{}
	state.Store = []BlockInfo{}
	return state
}
```
The core protocol logic is given by the following function:

``` go
func handleEvent(state ControllerState, event Event) (ControllerState, MessageToSend, TimeoutTrigger, Error) {
	msg = nil
	timeout = nil
	error = nil

	switch state.Mode {
	case ModeConsensus:
		switch event := event.(type) {
		case EventBlockRequest:
			msg = createBlockResponseMessage(state, event)
			return state, msg, timeout, error
		default:
			error = "Only respond to BlockRequests while in ModeConsensus!"
			return state, msg, timeout, error
		}

	case ModeFastSync:
		switch event := event.(type) {
		case EventBlockRequest:
			msg = createBlockResponseMessage(state, event)
			return state, msg, timeout, error
		case EventStatusResponse:
			return handleEventStatusResponse(event, state)
		case EventRemovePeer:
			return handleEventRemovePeer(event, state)
		case EventBlockResponse:
			return handleEventBlockResponse(event, state)
		case EventTimeoutResponse:
			return handleEventResponseTimeout(event, state)
		case EventTimeoutTermination:
			// The termination timeout is triggered in case of an empty peer set and in case there are no pending requests.
			// If this timeout expires and in the meantime no new peers are added and no new pending requests are made,
			// then `fast-sync` mode terminates by switching to `ModeConsensus`.
			// Note that the termination timeout should be higher than the response timeout.
			if state.Height == event.Height && state.PendingRequestsNum == 0 { state.Mode = ModeConsensus }
			return state, msg, timeout, error
		default:
			error = "Received unknown event type!"
			return state, msg, timeout, error
		}
	}
}
```
``` go
func createBlockResponseMessage(state ControllerState, event BlockRequest) MessageToSend {
	msgToSend = nil
	if _, ok := state.PeerMap[event.PeerID]; !ok {
		peerStats = PeerStats{-1, -1}
	} else {
		peerStats = state.PeerMap[event.PeerID]
	}
	if state.Executor.ContainsBlockWithHeight(event.Height) && event.Height > peerStats.Height {
		peerStats.Height = event.Height
		msg = MessageBlockResponse{
			Height:     event.Height,
			Block:      state.Executor.getBlock(event.Height),
			Commit:     state.Executor.getCommit(event.Height),
			PeerHeight: state.Height - 1,
		}
		msgToSend = MessageToSend{ event.PeerID, msg }
	}
	state.PeerMap[event.PeerID] = peerStats
	return msgToSend
}
```
``` go
func handleEventStatusResponse(event EventStatusResponse, state ControllerState) (ControllerState, MessageToSend, TimeoutTrigger, Error) {
	if _, ok := state.PeerMap[event.PeerID]; !ok {
		peerStats = PeerStats{ -1, -1 }
	} else {
		peerStats = state.PeerMap[event.PeerID]
	}
	if event.Height > peerStats.Height { peerStats.Height = event.Height }

	// if there is no pending request for this peer, try to send it a request for a block
	if peerStats.PendingRequest == -1 {
		msg = createBlockRequestMessage(state, event.PeerID, peerStats.Height)
		// msg is nil if no request for a block can be made to this peer at this point in time
		if msg != nil {
			peerStats.PendingRequest = msg.Height
			state.PendingRequestsNum++
			// when a request for a block is sent to a peer, a response timeout is triggered. If no corresponding block is sent by the peer
			// during the response timeout period, then the peer is considered faulty and is removed from the peer set.
			timeout = ResponseTimeoutTrigger{ msg.PeerID, msg.Height, PeerTimeout }
		} else if state.PendingRequestsNum == 0 {
			// if there are no pending requests and no new request can be placed to the peer, the termination timeout is triggered.
			// If the termination timeout expires and we are still at the same height and there are no pending requests, the "fast-sync"
			// mode is finished and we switch to `ModeConsensus`.
			timeout = TerminationTimeoutTrigger{ state.Height, TerminationTimeout }
		}
	}
	state.PeerMap[event.PeerID] = peerStats
	return state, msg, timeout, error
}
```
``` go
func handleEventRemovePeer(event EventRemovePeer, state ControllerState) (ControllerState, MessageToSend, TimeoutTrigger, Error) {
	if _, ok := state.PeerMap[event.PeerID]; ok {
		pendingRequest = state.PeerMap[event.PeerID].PendingRequest
		// if a peer is removed from the peer set, its pending request is declared failed and added to the `FailedRequests` list
		// so it can be retried.
		if pendingRequest != -1 {
			add(state.FailedRequests, pendingRequest)
			state.PendingRequestsNum--
		}
		delete(state.PeerMap, event.PeerID)
		// if the peer set is empty after removal of this peer, then the termination timeout is triggered.
		if state.PeerMap.isEmpty() {
			timeout = TerminationTimeoutTrigger{ state.Height, TerminationTimeout }
		}
	} else { error = "Removing unknown peer!" }
	return state, msg, timeout, error
}
```
``` go
func handleEventBlockResponse(event EventBlockResponse, state ControllerState) (ControllerState, MessageToSend, TimeoutTrigger, Error) {
	if _, ok := state.PeerMap[event.PeerID]; ok {
		peerStats = state.PeerMap[event.PeerID]
		// when an expected block arrives from a peer, it is added to the store so it can be verified
		// and, if correct, executed afterwards.
		if peerStats.PendingRequest == event.Height {
			peerStats.PendingRequest = -1
			state.PendingRequestsNum--
			if event.PeerHeight > peerStats.Height { peerStats.Height = event.PeerHeight }
			state.Store[event.Height] = BlockInfo{ event.Block, event.Commit, event.PeerID }
			// blocks are verified sequentially, so adding a block to the store does not mean that it will be
			// immediately verified, as some of the previous blocks might be missing.
			state = verifyBlocks(state) // this can lead to event.PeerID being removed from the peer set
			if _, ok := state.PeerMap[event.PeerID]; ok {
				// try to identify a new block request that can be sent to this peer
				msg = createBlockRequestMessage(state, event.PeerID, peerStats.Height)
				if msg != nil {
					peerStats.PendingRequest = msg.Height
					state.PendingRequestsNum++
					// if a request for a block is made, a response timeout is triggered
					timeout = ResponseTimeoutTrigger{ msg.PeerID, msg.Height, PeerTimeout }
				} else if state.PeerMap.isEmpty() || state.PendingRequestsNum == 0 {
					// if the peer map is empty (the peer might have been removed because block verification failed)
					// or there are no pending requests, the termination timeout is triggered.
					timeout = TerminationTimeoutTrigger{ state.Height, TerminationTimeout }
				}
				state.PeerMap[event.PeerID] = peerStats
			}
		} else { error = "Received Block from wrong peer!" }
	} else { error = "Received Block from unknown peer!" }
	return state, msg, timeout, error
}
```
``` go
func handleEventResponseTimeout(event EventTimeoutResponse, state ControllerState) (ControllerState, MessageToSend, TimeoutTrigger, Error) {
	if _, ok := state.PeerMap[event.PeerID]; ok {
		peerStats = state.PeerMap[event.PeerID]
		// if a response timeout expires and the peer hasn't delivered the block, the peer is removed from the peer set and
		// the request is added to `FailedRequests` so the block can be downloaded from another peer
		if peerStats.PendingRequest == event.Height {
			add(state.FailedRequests, peerStats.PendingRequest)
			delete(state.PeerMap, event.PeerID)
			state.PendingRequestsNum--
			// if the peer set is empty, then the termination timeout is triggered
			if state.PeerMap.isEmpty() {
				timeout = TerminationTimeoutTrigger{ state.Height, TerminationTimeout }
			}
		}
	}
	return state, msg, timeout, error
}
```
``` go
func createBlockRequestMessage(state ControllerState, peerID ID, peerHeight int64) MessageToSend {
	msg = nil
	blockHeight = -1
	r = find request in state.FailedRequests such that r <= peerHeight // returns `nil` if there is no such request
	// if there is a failed request for a height that can be downloaded from this peer, retry it
	if r != nil {
		blockHeight = r
		delete(state.FailedRequests, r)
	} else if state.MaxRequestPending < peerHeight {
		// if the height of the maximum pending request is smaller than the peer height, then ask the peer for the next block
		state.MaxRequestPending++
		blockHeight = state.MaxRequestPending // increment state.MaxRequestPending and then return the new value
	}

	if blockHeight > -1 { msg = MessageToSend{ peerID, MessageBlockRequest{ blockHeight } } }
	return msg
}
```
``` go
func verifyBlocks(state State) State {
	done = false
	for !done {
		block = state.Store[state.Height]
		if block != nil {
			verified = verify block.Block using block.Commit // returns `true` if verification succeeds, `false` otherwise
			if verified {
				block.Execute() // executing a block is a costly operation, so it might make sense to execute it asynchronously
				state.Height++
			} else {
				// if block verification failed, then the height is added to `FailedRequests` and the peer is removed from the peer set
				add(state.FailedRequests, state.Height)
				state.Store[state.Height] = nil
				if _, ok := state.PeerMap[block.PeerID]; ok {
					pendingRequest = state.PeerMap[block.PeerID].PendingRequest
					// if there is a pending request sent to the peer that is about to be removed from the peer set, add it to `FailedRequests`
					if pendingRequest != -1 {
						add(state.FailedRequests, pendingRequest)
						state.PendingRequestsNum--
					}
					delete(state.PeerMap, block.PeerID)
				}
				done = true
			}
		} else { done = true }
	}
	return state
}
```
In the proposed architecture, the `Controller` is not an active task, i.e., it is called by the `Executor`. Depending on
the values returned by the `Controller`, the `Executor` will send a message to some peer (`msg != nil`), trigger a
timeout (`timeout != nil`), or deal with an error (`error != nil`).
In case a timeout is triggered, the `Executor` will provide the corresponding timeout event as an input to the
`Controller` once the timeout expires.
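
For illustration, here is a minimal sketch of what the `Executor` event loop might look like under
this design (the channel plumbing and the `sendToPeer`/`scheduleTimeout` helpers are
hypothetical, not part of this proposal's API):

``` go
// Hypothetical Executor loop: it owns all I/O and drives the Controller
// state machine; peer messages and expired timeouts arrive as Events.
func runExecutor(state ControllerState, events chan Event) {
	for event := range events {
		newState, msg, timeout, err := handleEvent(state, event)
		if err != nil {
			// handle the error, e.g. log it or disconnect the offending peer
			continue
		}
		if msg != nil {
			sendToPeer(msg.PeerID, msg.Message) // hypothetical helper
		}
		if timeout != nil {
			scheduleTimeout(timeout, events) // feeds a timeout Event back later
		}
		state = newState
	}
}
```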
## Status

Draft.

## Consequences

### Positive

- isolated implementation of the algorithm
- improved testability - simpler to prove correctness
- clearer separation of concerns - easier to reason about

### Negative

### Neutral
@@ -0,0 +1,29 @@
# ADR 041: Application should be in charge of validator set

## Changelog

## Context

Currently Tendermint is in charge of the validator set and proposer selection. The application can only update the
validator set at EndBlock time.
To support the Light Client, the application should make sure that at least 2/3 of the validators remain the same
in each round.

The application should have full control over validator set changes and proposer selection. In each round, the
application can provide the ordered list of validators for the next rounds together with their voting power. The
proposer is the first in the list; in case the proposer is offline, the next one can propose instead, and so on.
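
A minimal sketch of what such a per-round answer from the application might look like (the
type below is purely illustrative; defining the actual ABCI message is this ADR's open
design question):

```go
// Hypothetical ABCI response: the application returns an ordered validator
// list; the first entry is the proposer, the second is the fallback if the
// proposer is offline, and so on.
type ResponseNextValidators struct {
	Validators []ValidatorUpdate // ordered by proposer priority, with voting power
}
```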
## Decision

## Status

## Consequences

Tendermint is no longer in charge of the validator set and its changes. The application should provide the correct
information.
However, Tendermint can provide a pseudo-randomness algorithm to help the application select the proposer in each
round.

### Positive

### Negative

### Neutral

## References
@@ -0,0 +1,353 @@
// +build boltdb

package db

import (
	"bytes"
	"errors"
	"fmt"
	"os"
	"path/filepath"

	"github.com/etcd-io/bbolt"
)

var bucket = []byte("tm")
func init() {
	registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) {
		return NewBoltDB(name, dir)
	}, false)
}

// BoltDB is a wrapper around etcd's fork of bolt
// (https://github.com/etcd-io/bbolt).
//
// NOTE: All operations (including Set, Delete) are synchronous by default. One
// can globally turn this off by using the NoSync config option (not recommended).
//
// A single bucket ([]byte("tm")) is used per database instance. This could
// lead to performance issues when/if there are lots of keys.
type BoltDB struct {
	db *bbolt.DB
}

// NewBoltDB returns a BoltDB with default options.
func NewBoltDB(name, dir string) (DB, error) {
	return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions)
}

// NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not
// supported because NewBoltDBWithOpts creates a global bucket.
func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) {
	if opts.ReadOnly {
		return nil, errors.New("ReadOnly: true is not supported")
	}

	dbPath := filepath.Join(dir, name+".db")
	db, err := bbolt.Open(dbPath, os.ModePerm, opts)
	if err != nil {
		return nil, err
	}

	// create a global bucket
	err = db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(bucket)
		return err
	})
	if err != nil {
		return nil, err
	}

	return &BoltDB{db: db}, nil
}

func (bdb *BoltDB) Get(key []byte) (value []byte) {
	key = nonEmptyKey(nonNilBytes(key))
	err := bdb.db.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucket)
		value = b.Get(key)
		return nil
	})
	if err != nil {
		panic(err)
	}
	return
}

func (bdb *BoltDB) Has(key []byte) bool {
	return bdb.Get(key) != nil
}

func (bdb *BoltDB) Set(key, value []byte) {
	key = nonEmptyKey(nonNilBytes(key))
	value = nonNilBytes(value)
	err := bdb.db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucket)
		return b.Put(key, value)
	})
	if err != nil {
		panic(err)
	}
}

func (bdb *BoltDB) SetSync(key, value []byte) {
	bdb.Set(key, value)
}

func (bdb *BoltDB) Delete(key []byte) {
	key = nonEmptyKey(nonNilBytes(key))
	err := bdb.db.Update(func(tx *bbolt.Tx) error {
		return tx.Bucket(bucket).Delete(key)
	})
	if err != nil {
		panic(err)
	}
}

func (bdb *BoltDB) DeleteSync(key []byte) {
	bdb.Delete(key)
}

func (bdb *BoltDB) Close() {
	bdb.db.Close()
}

func (bdb *BoltDB) Print() {
	stats := bdb.db.Stats()
	fmt.Printf("%v\n", stats)

	err := bdb.db.View(func(tx *bbolt.Tx) error {
		tx.Bucket(bucket).ForEach(func(k, v []byte) error {
			fmt.Printf("[%X]:\t[%X]\n", k, v)
			return nil
		})
		return nil
	})
	if err != nil {
		panic(err)
	}
}

func (bdb *BoltDB) Stats() map[string]string {
	stats := bdb.db.Stats()
	m := make(map[string]string)

	// Freelist stats
	m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN)
	m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN)
	m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc)
	m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse)

	// Transaction stats
	m["TxN"] = fmt.Sprintf("%v", stats.TxN)
	m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN)

	return m
}
// boltDBBatch stores key/value pairs in a buffer and dumps them to the
// underlying DB upon a Write call.
type boltDBBatch struct {
	buffer []struct {
		k []byte
		v []byte
	}
	db *BoltDB
}
// NewBatch returns a new batch.
func (bdb *BoltDB) NewBatch() Batch {
	return &boltDBBatch{
		buffer: make([]struct {
			k []byte
			v []byte
		}, 0),
		db: bdb,
	}
}

// It is safe to modify the contents of the argument after Set returns but not
// before.
func (bdb *boltDBBatch) Set(key, value []byte) {
	bdb.buffer = append(bdb.buffer, struct {
		k []byte
		v []byte
	}{
		key, value,
	})
}

// It is safe to modify the contents of the argument after Delete returns but
// not before.
func (bdb *boltDBBatch) Delete(key []byte) {
	for i, elem := range bdb.buffer {
		if bytes.Equal(elem.k, key) {
			// delete without preserving order
			bdb.buffer[i] = bdb.buffer[len(bdb.buffer)-1]
			bdb.buffer = bdb.buffer[:len(bdb.buffer)-1]
			return
		}
	}
}

// NOTE: the operation is synchronous (see BoltDB for reasons)
func (bdb *boltDBBatch) Write() {
	err := bdb.db.db.Batch(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucket)
		for _, elem := range bdb.buffer {
			if putErr := b.Put(elem.k, elem.v); putErr != nil {
				return putErr
			}
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}

func (bdb *boltDBBatch) WriteSync() {
	bdb.Write()
}

func (bdb *boltDBBatch) Close() {}

// WARNING: Any concurrent writes or reads will block until the iterator is
// closed.
func (bdb *BoltDB) Iterator(start, end []byte) Iterator {
	tx, err := bdb.db.Begin(false)
	if err != nil {
		panic(err)
	}
	return newBoltDBIterator(tx, start, end, false)
}

// WARNING: Any concurrent writes or reads will block until the iterator is
// closed.
func (bdb *BoltDB) ReverseIterator(start, end []byte) Iterator {
	tx, err := bdb.db.Begin(false)
	if err != nil {
		panic(err)
	}
	return newBoltDBIterator(tx, start, end, true)
}
// boltDBIterator allows you to iterate over a range of keys/values given some
// start and end keys (nil & nil will result in a full scan).
type boltDBIterator struct {
	tx *bbolt.Tx

	itr   *bbolt.Cursor
	start []byte
	end   []byte

	currentKey   []byte
	currentValue []byte

	isInvalid bool
	isReverse bool
}
func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator {
	itr := tx.Bucket(bucket).Cursor()

	var ck, cv []byte
	if isReverse {
		if end == nil {
			ck, cv = itr.Last()
		} else {
			_, _ = itr.Seek(end) // after key
			ck, cv = itr.Prev()  // return to end key
		}
	} else {
		if start == nil {
			ck, cv = itr.First()
		} else {
			ck, cv = itr.Seek(start)
		}
	}

	return &boltDBIterator{
		tx:           tx,
		itr:          itr,
		start:        start,
		end:          end,
		currentKey:   ck,
		currentValue: cv,
		isReverse:    isReverse,
		isInvalid:    false,
	}
}

func (itr *boltDBIterator) Domain() ([]byte, []byte) {
	return itr.start, itr.end
}

func (itr *boltDBIterator) Valid() bool {
	if itr.isInvalid {
		return false
	}

	// iterated to the end of the cursor
	if len(itr.currentKey) == 0 {
		itr.isInvalid = true
		return false
	}

	if itr.isReverse {
		if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 {
			itr.isInvalid = true
			return false
		}
	} else {
		if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 {
			itr.isInvalid = true
			return false
		}
	}

	// Valid
	return true
}

func (itr *boltDBIterator) Next() {
	itr.assertIsValid()
	if itr.isReverse {
		itr.currentKey, itr.currentValue = itr.itr.Prev()
	} else {
		itr.currentKey, itr.currentValue = itr.itr.Next()
	}
}

func (itr *boltDBIterator) Key() []byte {
	itr.assertIsValid()
	return itr.currentKey
}

func (itr *boltDBIterator) Value() []byte {
	itr.assertIsValid()
	return itr.currentValue
}

func (itr *boltDBIterator) Close() {
	err := itr.tx.Rollback()
	if err != nil {
		panic(err)
	}
}

func (itr *boltDBIterator) assertIsValid() {
	if !itr.Valid() {
		panic("Boltdb-iterator is invalid")
	}
}
// nonEmptyKey returns []byte("nil") if key is empty.
// WARNING: this may collide with a "nil" user key!
func nonEmptyKey(key []byte) []byte {
	if len(key) == 0 {
		return []byte("nil")
	}
	return key
}
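
// Illustrative sketch only (not part of the original file): how this backend
// might be used via the db package API defined above. The key/value data is
// hypothetical.
func ExampleBoltDB() {
	db, err := NewBoltDB("example", os.TempDir())
	if err != nil {
		panic(err)
	}
	defer db.Close()

	db.Set([]byte("k"), []byte("v"))        // synchronous by default (see NOTE on BoltDB)
	fmt.Printf("%s\n", db.Get([]byte("k"))) // prints "v"
}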
@@ -0,0 +1,37 @@
// +build boltdb

package db

import (
	"fmt"
	"os"
	"testing"

	"github.com/stretchr/testify/require"
	cmn "github.com/tendermint/tendermint/libs/common"
)

func TestBoltDBNewBoltDB(t *testing.T) {
	name := fmt.Sprintf("test_%x", cmn.RandStr(12))
	dir := os.TempDir()
	defer cleanupDBDir(dir, name)

	db, err := NewBoltDB(name, dir)
	require.NoError(t, err)
	db.Close()
}

func BenchmarkBoltDBRandomReadsWrites(b *testing.B) {
	name := fmt.Sprintf("test_%x", cmn.RandStr(12))
	db, err := NewBoltDB(name, "")
	if err != nil {
		b.Fatal(err)
	}
	defer func() {
		db.Close()
		cleanupDBDir("", name)
	}()

	benchmarkRandomReadsWrites(b, db)
}
@@ -0,0 +1,712 @@
package mempool

import (
	"bytes"
	"container/list"
	"crypto/sha256"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"

	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	auto "github.com/tendermint/tendermint/libs/autofile"
	"github.com/tendermint/tendermint/libs/clist"
	cmn "github.com/tendermint/tendermint/libs/common"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------------------------------------

// CListMempool is an ordered in-memory pool for transactions before they are
// proposed in a consensus round. Transaction validity is checked using the
// CheckTx abci message before the transaction is added to the pool. The
// mempool uses a concurrent list structure for storing transactions that can
// be efficiently accessed by multiple concurrent readers.
type CListMempool struct {
	config *cfg.MempoolConfig

	proxyMtx     sync.Mutex
	proxyAppConn proxy.AppConnMempool
	txs          *clist.CList // concurrent linked-list of good txs
	preCheck     PreCheckFunc
	postCheck    PostCheckFunc

	// Track whether we're rechecking txs.
	// These are not protected by a mutex and are expected to be mutated
	// in serial (ie. by abci responses which are called in serial).
	recheckCursor *clist.CElement // next expected response
	recheckEnd    *clist.CElement // re-checking stops here

	// notify listeners (ie. consensus) when txs are available
	notifiedTxsAvailable bool
	txsAvailable         chan struct{} // fires once for each height, when the mempool is not empty

	// Map for quick access to txs to record sender in CheckTx.
	// txsMap: txKey -> CElement
	txsMap sync.Map

	// Atomic integers
	height     int64 // the last block Update()'d to
	rechecking int32 // for re-checking filtered txs on Update()
	txsBytes   int64 // total size of mempool, in bytes

	// Keep a cache of already-seen txs.
	// This reduces the pressure on the proxyApp.
	cache txCache

	// A log of mempool txs
	wal *auto.AutoFile

	logger log.Logger

	metrics *Metrics
}
var _ Mempool = &CListMempool{}

// CListMempoolOption sets an optional parameter on the mempool.
type CListMempoolOption func(*CListMempool)

// NewCListMempool returns a new mempool with the given configuration and connection to an application.
func NewCListMempool(
	config *cfg.MempoolConfig,
	proxyAppConn proxy.AppConnMempool,
	height int64,
	options ...CListMempoolOption,
) *CListMempool {
	mempool := &CListMempool{
		config:        config,
		proxyAppConn:  proxyAppConn,
		txs:           clist.New(),
		height:        height,
		rechecking:    0,
		recheckCursor: nil,
		recheckEnd:    nil,
		logger:        log.NewNopLogger(),
		metrics:       NopMetrics(),
	}
	if config.CacheSize > 0 {
		mempool.cache = newMapTxCache(config.CacheSize)
	} else {
		mempool.cache = nopTxCache{}
	}
	proxyAppConn.SetResponseCallback(mempool.globalCb)
	for _, option := range options {
		option(mempool)
	}
	return mempool
}

// NOTE: not thread safe - should only be called once, on startup
func (mem *CListMempool) EnableTxsAvailable() {
	mem.txsAvailable = make(chan struct{}, 1)
}

// SetLogger sets the Logger.
func (mem *CListMempool) SetLogger(l log.Logger) {
	mem.logger = l
}
// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run before CheckTx.
func WithPreCheck(f PreCheckFunc) CListMempoolOption {
	return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is run after CheckTx.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
	return func(mem *CListMempool) { mem.postCheck = f }
}
// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
	return func(mem *CListMempool) { mem.metrics = metrics }
}

// *panics* if can't create directory or open file.
// *not thread safe*
func (mem *CListMempool) InitWAL() {
	walDir := mem.config.WalDir()
	err := cmn.EnsureDir(walDir, 0700)
	if err != nil {
		panic(errors.Wrap(err, "Error ensuring WAL dir"))
	}
	af, err := auto.OpenAutoFile(walDir + "/wal")
	if err != nil {
		panic(errors.Wrap(err, "Error opening WAL file"))
	}
	mem.wal = af
}

func (mem *CListMempool) CloseWAL() {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	if err := mem.wal.Close(); err != nil {
		mem.logger.Error("Error closing WAL", "err", err)
	}
	mem.wal = nil
}

func (mem *CListMempool) Lock() {
	mem.proxyMtx.Lock()
}

func (mem *CListMempool) Unlock() {
	mem.proxyMtx.Unlock()
}

func (mem *CListMempool) Size() int {
	return mem.txs.Len()
}

func (mem *CListMempool) TxsBytes() int64 {
	return atomic.LoadInt64(&mem.txsBytes)
}

func (mem *CListMempool) FlushAppConn() error {
	return mem.proxyAppConn.FlushSync()
}

func (mem *CListMempool) Flush() {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	mem.cache.Reset()

	for e := mem.txs.Front(); e != nil; e = e.Next() {
		mem.txs.Remove(e)
		e.DetachPrev()
	}

	mem.txsMap = sync.Map{}
	_ = atomic.SwapInt64(&mem.txsBytes, 0)
}

// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
// FIXME: leaking implementation details!
func (mem *CListMempool) TxsFront() *clist.CElement {
	return mem.txs.Front()
}

// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
	return mem.txs.WaitChan()
}

// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
//     It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
	return mem.CheckTxWithInfo(tx, cb, TxInfo{SenderID: UnknownPeerID})
}

func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
	mem.proxyMtx.Lock()
	// use defer to unlock mutex because application (*local client*) might panic
	defer mem.proxyMtx.Unlock()

	var (
		memSize  = mem.Size()
		txsBytes = mem.TxsBytes()
	)
	if memSize >= mem.config.Size ||
		int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
		return ErrMempoolIsFull{
			memSize, mem.config.Size,
			txsBytes, mem.config.MaxTxsBytes}
	}

	// The size of the corresponding amino-encoded TxMessage
	// can't be larger than the maxMsgSize, otherwise we can't
	// relay it to peers.
	if len(tx) > maxTxSize {
		return ErrTxTooLarge
	}

	if mem.preCheck != nil {
		if err := mem.preCheck(tx); err != nil {
			return ErrPreCheck{err}
		}
	}

	// CACHE
	if !mem.cache.Push(tx) {
		// Record a new sender for a tx we've already seen.
		// Note it's possible a tx is still in the cache but no longer in the mempool
		// (eg. after committing a block, txs are removed from mempool but not cache),
		// so we only record the sender for txs still in the mempool.
		if e, ok := mem.txsMap.Load(txKey(tx)); ok {
			memTx := e.(*clist.CElement).Value.(*mempoolTx)
			if _, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true); loaded {
				// TODO: consider punishing peer for dups,
				// its non-trivial since invalid txs can become valid,
				// but they can spam the same tx with little cost to them atm.
			}
		}

		return ErrTxInCache
	}
	// END CACHE

	// WAL
	if mem.wal != nil {
		// TODO: Notify administrators when WAL fails
		_, err := mem.wal.Write([]byte(tx))
		if err != nil {
			mem.logger.Error("Error writing to WAL", "err", err)
		}
		_, err = mem.wal.Write([]byte("\n"))
		if err != nil {
			mem.logger.Error("Error writing to WAL", "err", err)
		}
	}
	// END WAL

	// NOTE: proxyAppConn may error if tx buffer is full
	if err = mem.proxyAppConn.Error(); err != nil {
		return err
	}

	reqRes := mem.proxyAppConn.CheckTxAsync(tx)
	reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, cb))

	return nil
}
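
// Illustrative sketch only (not part of the original file): a hypothetical
// caller-side use of the CheckTx contract documented above. Either cb is
// eventually invoked with the ABCI response, or an error (e.g.
// ErrMempoolIsFull, ErrTxTooLarge, ErrTxInCache, ErrPreCheck) is returned
// synchronously:
//
//	err := mem.CheckTx(tx, func(res *abci.Response) {
//		if r, ok := res.Value.(*abci.Response_CheckTx); ok && r.CheckTx.Code == abci.CodeTypeOK {
//			// tx was admitted to the mempool
//		}
//	})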
// Global callback that will be called after every ABCI response.
// Having a single global callback avoids needing to set a callback for each request.
// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
// include this information. If we're not in the midst of a recheck, this function will just return,
// so the request specific callback can do the work.
// When rechecking, we don't need the peerID, so the recheck callback happens here.
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
	if mem.recheckCursor == nil {
		return
	}

	mem.metrics.RecheckTimes.Add(1)
	mem.resCbRecheck(req, res)

	// update metrics
	mem.metrics.Size.Set(float64(mem.Size()))
}

// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func (mem *CListMempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
	return func(res *abci.Response) {
		if mem.recheckCursor != nil {
			// this should never happen
			panic("recheck cursor is not nil in reqResCb")
		}

		mem.resCbFirstTime(tx, peerID, res)

		// update metrics
		mem.metrics.Size.Set(float64(mem.Size()))

		// passed in by the caller of CheckTx, eg. the RPC
		if externalCb != nil {
			externalCb(res)
		}
	}
}

// Called from:
//  - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addTx(memTx *mempoolTx) {
	e := mem.txs.PushBack(memTx)
	mem.txsMap.Store(txKey(memTx.tx), e)
	atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
	mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}

// Called from:
//  - Update (lock held) if tx was committed
//  - resCbRecheck (lock not held) if tx was invalidated
func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
	mem.txs.Remove(elem)
	elem.DetachPrev()
	mem.txsMap.Delete(txKey(tx))
	atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

	if removeFromCache {
		mem.cache.Remove(tx)
	}
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) {
	switch r := res.Value.(type) {
	case *abci.Response_CheckTx:
		var postCheckErr error
		if mem.postCheck != nil {
			postCheckErr = mem.postCheck(tx, r.CheckTx)
		}
		if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
			memTx := &mempoolTx{
				height:    mem.height,
				gasWanted: r.CheckTx.GasWanted,
				tx:        tx,
			}
			memTx.senders.Store(peerID, true)
			mem.addTx(memTx)
			mem.logger.Info("Added good transaction",
				"tx", txID(tx),
				"res", r,
				"height", memTx.height,
				"total", mem.Size(),
			)
			mem.notifyTxsAvailable()
		} else {
			// ignore bad transaction
			mem.logger.Info("Rejected bad transaction", "tx", txID(tx), "res", r, "err", postCheckErr)
			mem.metrics.FailedTxs.Add(1)
			// remove from cache (it might be good later)
			mem.cache.Remove(tx)
		}
	default:
		// ignore other messages
	}
}

// callback, which is called after the app rechecked the tx.
//
// The case where the app checks the tx for the first time is handled by the
// resCbFirstTime callback.
func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
	switch r := res.Value.(type) {
	case *abci.Response_CheckTx:
		tx := req.GetCheckTx().Tx
		memTx := mem.recheckCursor.Value.(*mempoolTx)
		if !bytes.Equal(tx, memTx.tx) {
			panic(fmt.Sprintf(
				"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
				memTx.tx,
				tx))
		}
		var postCheckErr error
		if mem.postCheck != nil {
			postCheckErr = mem.postCheck(tx, r.CheckTx)
		}
		if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
			// Good, nothing to do.
		} else {
			// Tx became invalidated due to newly committed block.
			mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
			// NOTE: we remove tx from the cache because it might be good later
			mem.removeTx(tx, mem.recheckCursor, true)
		}
		if mem.recheckCursor == mem.recheckEnd {
			mem.recheckCursor = nil
		} else {
			mem.recheckCursor = mem.recheckCursor.Next()
		}
		if mem.recheckCursor == nil {
			// Done!
			atomic.StoreInt32(&mem.rechecking, 0)
			mem.logger.Info("Done rechecking txs")

			// in case the recheck removed all txs
			if mem.Size() > 0 {
				mem.notifyTxsAvailable()
			}
		}
	default:
		// ignore other messages
	}
}
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
	return mem.txsAvailable
}

func (mem *CListMempool) notifyTxsAvailable() {
	if mem.Size() == 0 {
		panic("notified txs available but mempool is empty!")
	}
	if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
		// channel cap is 1, so this will send once
		mem.notifiedTxsAvailable = true
		select {
		case mem.txsAvailable <- struct{}{}:
		default:
		}
	}
}

func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	for atomic.LoadInt32(&mem.rechecking) > 0 {
		// TODO: Something better?
		time.Sleep(time.Millisecond * 10)
	}

	var totalBytes int64
	var totalGas int64
	// TODO: we will get a performance boost if we have a good estimate of avg
	// size per tx, and set the initial capacity based off of that.
	// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
	txs := make([]types.Tx, 0, mem.txs.Len())
	for e := mem.txs.Front(); e != nil; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		// Check total size requirement
		aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1)
		if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes {
			return txs
		}
		totalBytes += int64(len(memTx.tx)) + aminoOverhead
		// Check total gas requirement.
		// If maxGas is negative, skip this check.
		// Since newTotalGas < maxGas, which
		// must be non-negative, it follows that this won't overflow.
		newTotalGas := totalGas + memTx.gasWanted
		if maxGas > -1 && newTotalGas > maxGas {
			return txs
		}
		totalGas = newTotalGas
		txs = append(txs, memTx.tx)
	}
	return txs
}
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()

	if max < 0 {
		max = mem.txs.Len()
	}

	for atomic.LoadInt32(&mem.rechecking) > 0 {
		// TODO: Something better?
		time.Sleep(time.Millisecond * 10)
	}

	txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
	for e := mem.txs.Front(); e != nil && len(txs) < max; e = e.Next() {
		memTx := e.Value.(*mempoolTx)
		txs = append(txs, memTx.tx)
	}
	return txs
}
func (mem *CListMempool) Update(
	height int64,
	txs types.Txs,
	deliverTxResponses []*abci.ResponseDeliverTx,
	preCheck PreCheckFunc,
	postCheck PostCheckFunc,
) error {
	// Set height
	mem.height = height
	mem.notifiedTxsAvailable = false

	if preCheck != nil {
		mem.preCheck = preCheck
	}
	if postCheck != nil {
		mem.postCheck = postCheck
	}

	for i, tx := range txs {
		if deliverTxResponses[i].Code == abci.CodeTypeOK {
			// Add valid committed tx to the cache (if missing).
			_ = mem.cache.Push(tx)

			// Remove valid committed tx from the mempool.
			if e, ok := mem.txsMap.Load(txKey(tx)); ok {
				mem.removeTx(tx, e.(*clist.CElement), false)
			}
		} else {
			// Allow invalid transactions to be resubmitted.
			mem.cache.Remove(tx)

			// Don't remove invalid tx from the mempool.
			// Otherwise evil proposer can drop valid txs.
			// Example:
			//   100 -> 101 -> 102
			// Block, proposed by evil proposer:
			//   101 -> 102
			// Mempool (if you remove txs):
			//   100
			// https://github.com/tendermint/tendermint/issues/3322.
		}
	}

	// Either recheck non-committed txs to see if they became invalid
	// or just notify there are some txs left.
	if mem.Size() > 0 {
		if mem.config.Recheck {
			mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height)
			mem.recheckTxs()
			// At this point, mem.txs are being rechecked.
			// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
			// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
		} else {
			mem.notifyTxsAvailable()
		}
	}

	// Update metrics
	mem.metrics.Size.Set(float64(mem.Size()))

	return nil
}
func (mem *CListMempool) recheckTxs() { | |||
if mem.Size() == 0 { | |||
panic("recheckTxs is called, but the mempool is empty") | |||
} | |||
atomic.StoreInt32(&mem.rechecking, 1) | |||
mem.recheckCursor = mem.txs.Front() | |||
mem.recheckEnd = mem.txs.Back() | |||
// Push txs to proxyAppConn | |||
// NOTE: globalCb may be called concurrently. | |||
for e := mem.txs.Front(); e != nil; e = e.Next() { | |||
memTx := e.Value.(*mempoolTx) | |||
mem.proxyAppConn.CheckTxAsync(memTx.tx) | |||
} | |||
mem.proxyAppConn.FlushAsync() | |||
} | |||
//-------------------------------------------------------------------------------- | |||
// mempoolTx is a transaction that successfully ran CheckTx and is waiting
// in the mempool.
type mempoolTx struct {
	height    int64    // height at which this tx was validated
	gasWanted int64    // amount of gas this tx states it will require
	tx        types.Tx // the raw transaction bytes
	// ids of peers who've sent us this tx (as a map for quick lookups).
	// senders: PeerID -> bool
	senders sync.Map
}
} | |||
// Height returns the height for this transaction | |||
func (memTx *mempoolTx) Height() int64 { | |||
return atomic.LoadInt64(&memTx.height) | |||
} | |||
//-------------------------------------------------------------------------------- | |||
type txCache interface { | |||
Reset() | |||
Push(tx types.Tx) bool | |||
Remove(tx types.Tx) | |||
} | |||
// mapTxCache maintains an LRU cache of transactions. To limit memory use,
// it stores only the sha256 hash of each tx.
type mapTxCache struct { | |||
mtx sync.Mutex | |||
size int | |||
map_ map[[sha256.Size]byte]*list.Element | |||
list *list.List | |||
} | |||
var _ txCache = (*mapTxCache)(nil) | |||
// newMapTxCache returns a new mapTxCache. | |||
func newMapTxCache(cacheSize int) *mapTxCache { | |||
return &mapTxCache{ | |||
size: cacheSize, | |||
map_: make(map[[sha256.Size]byte]*list.Element, cacheSize), | |||
list: list.New(), | |||
} | |||
} | |||
// Reset resets the cache to an empty state. | |||
func (cache *mapTxCache) Reset() { | |||
cache.mtx.Lock() | |||
cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size) | |||
cache.list.Init() | |||
cache.mtx.Unlock() | |||
} | |||
// Push adds the given tx to the cache and returns true. It returns | |||
// false if tx is already in the cache. | |||
func (cache *mapTxCache) Push(tx types.Tx) bool { | |||
cache.mtx.Lock() | |||
defer cache.mtx.Unlock() | |||
// Use the tx hash in the cache | |||
txHash := txKey(tx) | |||
if moved, exists := cache.map_[txHash]; exists { | |||
cache.list.MoveToBack(moved) | |||
return false | |||
} | |||
	if cache.list.Len() >= cache.size {
		popped := cache.list.Front()
		if popped != nil {
			poppedTxHash := popped.Value.([sha256.Size]byte)
			delete(cache.map_, poppedTxHash)
			cache.list.Remove(popped)
		}
	}
} | |||
e := cache.list.PushBack(txHash) | |||
cache.map_[txHash] = e | |||
return true | |||
} | |||
// Remove removes the given tx from the cache. | |||
func (cache *mapTxCache) Remove(tx types.Tx) { | |||
cache.mtx.Lock() | |||
txHash := txKey(tx) | |||
popped := cache.map_[txHash] | |||
delete(cache.map_, txHash) | |||
if popped != nil { | |||
cache.list.Remove(popped) | |||
} | |||
cache.mtx.Unlock() | |||
} | |||
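To make the eviction order concrete, here is a stand-alone sketch that mirrors mapTxCache, with the mutex omitted and `lruHashes` and all other names hypothetical, followed by a short usage trace:

```go
package main

import (
	"container/list"
	"crypto/sha256"
	"fmt"
)

// lruHashes is a stripped-down stand-in for mapTxCache: it keys on tx
// hashes and evicts the least recently pushed hash when full.
type lruHashes struct {
	size int
	m    map[[sha256.Size]byte]*list.Element
	l    *list.List
}

func newLRU(size int) *lruHashes {
	return &lruHashes{size: size, m: make(map[[sha256.Size]byte]*list.Element), l: list.New()}
}

func (c *lruHashes) Push(tx []byte) bool {
	h := sha256.Sum256(tx)
	if e, ok := c.m[h]; ok {
		c.l.MoveToBack(e) // seen again: refresh its position
		return false
	}
	if c.l.Len() >= c.size { // full: evict from the front of the list
		if front := c.l.Front(); front != nil {
			delete(c.m, front.Value.([sha256.Size]byte))
			c.l.Remove(front)
		}
	}
	c.m[h] = c.l.PushBack(h)
	return true
}

func main() {
	c := newLRU(2)
	fmt.Println(c.Push([]byte("a"))) // true
	fmt.Println(c.Push([]byte("b"))) // true
	fmt.Println(c.Push([]byte("a"))) // false: already cached, "a" moves to back
	c.Push([]byte("c"))              // full, so "b" (now at the front) is evicted
	fmt.Println(c.Push([]byte("b"))) // true: "b" was evicted, so it is new again
}
```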
type nopTxCache struct{} | |||
var _ txCache = (*nopTxCache)(nil) | |||
func (nopTxCache) Reset() {} | |||
func (nopTxCache) Push(types.Tx) bool { return true } | |||
func (nopTxCache) Remove(types.Tx) {} | |||
//-------------------------------------------------------------------------------- | |||
// txKey returns the fixed-length sha256 hash of the tx, used as a map key.
func txKey(tx types.Tx) [sha256.Size]byte { | |||
return sha256.Sum256(tx) | |||
} | |||
// txID returns the hex-encoded hash of the given tx bytes.
func txID(tx []byte) string { | |||
return fmt.Sprintf("%X", types.Tx(tx).Hash()) | |||
} |
@ -0,0 +1,24 @@ | |||
// The mempool pushes new txs onto the proxyAppConn. | |||
// It gets a stream of (req, res) tuples from the proxy. | |||
// The mempool stores good txs in a concurrent linked-list. | |||
// Multiple concurrent go-routines can traverse this linked-list | |||
// safely by calling .NextWait() on each element. | |||
// So we have several go-routines: | |||
// 1. Consensus calling Update() and Reap() synchronously | |||
// 2. Many of the mempool reactor's peer routines calling CheckTx()
// 3. Many of the mempool reactor's peer routines traversing the txs linked list
// 4. Another goroutine calling GarbageCollectTxs() periodically
// To manage these goroutines, there are three methods of locking.
// 1. Mutations to the linked-list are protected by an internal mtx (CList is goroutine-safe)
// 2. Mutations to the linked-list elements are atomic
// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
// Garbage collection of old elements from mempool.txs is handled via
// the DetachPrev() call, which makes old elements unreachable from the
// peers' broadcastTxRoutine(), so they are garbage collected automatically.
// TODO: Better handle abci client errors. (make it automatically handle connection errors) | |||
package mempool |
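As a rough sketch of goroutine class 3 above, traversal of the concurrent tx list might look like the following. This is a simplified, hypothetical stand-in for the reactor's broadcastTxRoutine, not the actual implementation: the real routine also selects on peer quit and checks peer height, and `send` here is an assumed callback.

```go
package mempool

import "github.com/tendermint/tendermint/libs/clist"

// traverseTxs walks mem.txs for as long as the routine lives, blocking
// until elements become available instead of busy-polling.
func traverseTxs(mem Mempool, send func(tx interface{})) {
	var next *clist.CElement
	for {
		if next == nil {
			<-mem.TxsWaitChan() // block until the list is non-empty
			if next = mem.TxsFront(); next == nil {
				continue
			}
		}
		send(next.Value)      // e.g. gossip the tx to a peer
		<-next.NextWaitChan() // block until a next element exists or this one is removed
		next = next.Next()    // nil if this element was removed; the loop re-syncs
	}
}
```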
@ -0,0 +1,46 @@ | |||
package mempool | |||
import ( | |||
"fmt" | |||
"github.com/pkg/errors" | |||
) | |||
var ( | |||
	// ErrTxInCache is returned to the client if we have already seen the tx
ErrTxInCache = errors.New("Tx already exists in cache") | |||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers | |||
ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize) | |||
) | |||
// ErrMempoolIsFull means the mempool has reached its capacity, either in
// number of txs or in total tx bytes, and can't accept more load.
type ErrMempoolIsFull struct { | |||
numTxs int | |||
maxTxs int | |||
txsBytes int64 | |||
maxTxsBytes int64 | |||
} | |||
func (e ErrMempoolIsFull) Error() string { | |||
return fmt.Sprintf( | |||
"mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", | |||
e.numTxs, e.maxTxs, | |||
e.txsBytes, e.maxTxsBytes) | |||
} | |||
// ErrPreCheck is returned when a tx fails the configured pre-check (for
// example, when it exceeds the maximum allowed size).
type ErrPreCheck struct { | |||
Reason error | |||
} | |||
func (e ErrPreCheck) Error() string { | |||
return e.Reason.Error() | |||
} | |||
// IsPreCheckError returns true if err is due to pre check failure. | |||
func IsPreCheckError(err error) bool { | |||
_, ok := err.(ErrPreCheck) | |||
return ok | |||
} |
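A caller submitting txs via CheckTx might branch on these error types roughly as follows. This is a hedged sketch: `handleCheckTxErr` is hypothetical, but the sentinel, the type assertion, and `IsPreCheckError` match the definitions above.

```go
package mempool

import "fmt"

// handleCheckTxErr classifies a CheckTx error into a short, user-facing
// description of why the tx was rejected.
func handleCheckTxErr(err error) string {
	switch {
	case err == nil:
		return "accepted"
	case err == ErrTxInCache:
		return "duplicate: this tx was already seen"
	case IsPreCheckError(err):
		return fmt.Sprintf("rejected before CheckTx: %v", err)
	default:
		if _, ok := err.(ErrMempoolIsFull); ok {
			return "mempool is full: retry later"
		}
		return fmt.Sprintf("rejected: %v", err)
	}
}
```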
@ -0,0 +1,46 @@ | |||
package mock | |||
import ( | |||
abci "github.com/tendermint/tendermint/abci/types" | |||
"github.com/tendermint/tendermint/libs/clist" | |||
mempl "github.com/tendermint/tendermint/mempool" | |||
"github.com/tendermint/tendermint/types" | |||
) | |||
// Mempool is an empty implementation of a Mempool, useful for testing. | |||
type Mempool struct{} | |||
var _ mempl.Mempool = Mempool{} | |||
func (Mempool) Lock() {} | |||
func (Mempool) Unlock() {} | |||
func (Mempool) Size() int { return 0 } | |||
func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { | |||
return nil | |||
} | |||
func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response), | |||
_ mempl.TxInfo) error { | |||
return nil | |||
} | |||
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } | |||
func (Mempool) ReapMaxTxs(_ int) types.Txs { return types.Txs{} }
func (Mempool) Update( | |||
_ int64, | |||
_ types.Txs, | |||
_ []*abci.ResponseDeliverTx, | |||
_ mempl.PreCheckFunc, | |||
_ mempl.PostCheckFunc, | |||
) error { | |||
return nil | |||
} | |||
func (Mempool) Flush() {} | |||
func (Mempool) FlushAppConn() error { return nil } | |||
func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) } | |||
func (Mempool) EnableTxsAvailable() {} | |||
func (Mempool) TxsBytes() int64 { return 0 } | |||
func (Mempool) TxsFront() *clist.CElement { return nil } | |||
func (Mempool) TxsWaitChan() <-chan struct{} { return nil } | |||
func (Mempool) InitWAL() {} | |||
func (Mempool) CloseWAL() {} |
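The point of this stub is dependency injection in tests: any component written against the mempl.Mempool interface can be exercised without a real mempool. A small hypothetical example (`countPending` and the test are illustrative only, and the import path assumes this file lives at mempool/mock):

```go
package mock_test

import (
	"testing"

	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/mempool/mock"
)

// countPending is a hypothetical function under test that only depends on
// the Mempool interface, not on a concrete mempool.
func countPending(m mempl.Mempool) int { return m.Size() }

func TestCountPendingWithMock(t *testing.T) {
	if n := countPending(mock.Mempool{}); n != 0 {
		t.Fatalf("expected empty mock mempool, got %d txs", n)
	}
}
```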
@ -0,0 +1,92 @@ | |||
package p2p | |||
import ( | |||
"errors" | |||
"sync" | |||
) | |||
// PeerBehaviour are types of reportable behaviours about peers. | |||
type PeerBehaviour int | |||
const (
	PeerBehaviourBadMessage PeerBehaviour = iota
	PeerBehaviourMessageOutOfOrder
	PeerBehaviourVote
	PeerBehaviourBlockPart
)
// PeerBehaviourReporter provides an interface for reactors to report the behaviour | |||
// of peers synchronously to other components. | |||
type PeerBehaviourReporter interface { | |||
Report(peerID ID, behaviour PeerBehaviour) error | |||
} | |||
// SwitchPeerBehaviourReporter reports peer behaviour to an internal Switch
type SwitchPeerBehaviourReporter struct { | |||
sw *Switch | |||
} | |||
// NewSwitchPeerBehaviourReporter returns a new SwitchPeerBehaviourReporter
// which wraps the given Switch.
func NewSwitchPeerBehaviourReporter(sw *Switch) *SwitchPeerBehaviourReporter { | |||
return &SwitchPeerBehaviourReporter{ | |||
sw: sw, | |||
} | |||
} | |||
// Report reports the behaviour of a peer to the Switch | |||
func (spbr *SwitchPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) error { | |||
peer := spbr.sw.Peers().Get(peerID) | |||
if peer == nil { | |||
return errors.New("Peer not found") | |||
} | |||
switch behaviour { | |||
case PeerBehaviourVote, PeerBehaviourBlockPart: | |||
spbr.sw.MarkPeerAsGood(peer) | |||
case PeerBehaviourBadMessage: | |||
spbr.sw.StopPeerForError(peer, "Bad message") | |||
case PeerBehaviourMessageOutOfOrder: | |||
spbr.sw.StopPeerForError(peer, "Message out of order") | |||
default: | |||
return errors.New("Unknown behaviour") | |||
} | |||
return nil | |||
} | |||
// MockPeerBehaviourReporter is a mock implementation of the
// PeerBehaviourReporter interface, used in reactor tests to ensure reactors
// report the correct behaviour in manufactured scenarios.
type MockPeerBehaviourReporter struct { | |||
mtx sync.RWMutex | |||
pb map[ID][]PeerBehaviour | |||
} | |||
// NewMockPeerBehaviourReporter returns a PeerBehaviourReporter which records all reported | |||
// behaviours in memory. | |||
func NewMockPeerBehaviourReporter() *MockPeerBehaviourReporter { | |||
return &MockPeerBehaviourReporter{ | |||
pb: map[ID][]PeerBehaviour{}, | |||
} | |||
} | |||
// Report stores the PeerBehaviour produced by the peer identified by peerID.
func (mpbr *MockPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) error {
	mpbr.mtx.Lock()
	defer mpbr.mtx.Unlock()
	mpbr.pb[peerID] = append(mpbr.pb[peerID], behaviour)
	return nil
}
// GetBehaviours returns all behaviours reported on the peer identified by peerID. | |||
func (mpbr *MockPeerBehaviourReporter) GetBehaviours(peerID ID) []PeerBehaviour { | |||
mpbr.mtx.RLock() | |||
defer mpbr.mtx.RUnlock() | |||
	if items, ok := mpbr.pb[peerID]; ok {
		result := make([]PeerBehaviour, len(items))
		copy(result, items)
		return result
	}
	return []PeerBehaviour{}
} |
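To illustrate how the two implementations are meant to interact: a reactor accepts the reporter as an interface, so tests can substitute the mock and assert on what was reported. In this hedged sketch, `processVote`, the test, and the peer ID "peer1" are hypothetical, and the mock is assumed to satisfy the interface (its Report returns an error, as above).

```go
package p2p_test

import (
	"testing"

	"github.com/tendermint/tendermint/p2p"
)

// processVote is a hypothetical reactor-side helper: after validating a
// vote, it credits the sending peer via the reporter interface.
func processVote(r p2p.PeerBehaviourReporter, peerID p2p.ID) error {
	return r.Report(peerID, p2p.PeerBehaviourVote)
}

func TestProcessVoteReports(t *testing.T) {
	pr := p2p.NewMockPeerBehaviourReporter()
	if err := processVote(pr, "peer1"); err != nil {
		t.Fatal(err)
	}
	got := pr.GetBehaviours("peer1")
	if len(got) != 1 || got[0] != p2p.PeerBehaviourVote {
		t.Errorf("unexpected behaviours: %#v", got)
	}
}
```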
@ -0,0 +1,180 @@ | |||
package p2p_test | |||
import ( | |||
"sync" | |||
"testing" | |||
"github.com/tendermint/tendermint/p2p" | |||
) | |||
// TestMockPeerBehaviourReporter tests the MockPeerBehaviourReporter's ability
// to store reported peer behaviours in memory, indexed by peerID.
func TestMockPeerBehaviourReporter(t *testing.T) { | |||
var peerID p2p.ID = "MockPeer" | |||
pr := p2p.NewMockPeerBehaviourReporter() | |||
behaviours := pr.GetBehaviours(peerID) | |||
if len(behaviours) != 0 { | |||
t.Error("Expected to have no behaviours reported") | |||
} | |||
pr.Report(peerID, p2p.PeerBehaviourBadMessage) | |||
behaviours = pr.GetBehaviours(peerID) | |||
if len(behaviours) != 1 { | |||
		t.Error("Expected the peer to have one reported behaviour")
} | |||
if behaviours[0] != p2p.PeerBehaviourBadMessage { | |||
t.Error("Expected PeerBehaviourBadMessage to have been reported") | |||
} | |||
} | |||
type scriptedBehaviours struct { | |||
PeerID p2p.ID | |||
Behaviours []p2p.PeerBehaviour | |||
} | |||
type scriptItem struct { | |||
PeerID p2p.ID | |||
Behaviour p2p.PeerBehaviour | |||
} | |||
// equalBehaviours returns true if a and b contain the same PeerBehaviours
// with the same frequency, and false otherwise.
func equalBehaviours(a []p2p.PeerBehaviour, b []p2p.PeerBehaviour) bool { | |||
aHistogram := map[p2p.PeerBehaviour]int{} | |||
bHistogram := map[p2p.PeerBehaviour]int{} | |||
	for _, behaviour := range a {
		aHistogram[behaviour]++
	}
	for _, behaviour := range b {
		bHistogram[behaviour]++
	}
if len(aHistogram) != len(bHistogram) { | |||
return false | |||
} | |||
for _, behaviour := range a { | |||
if aHistogram[behaviour] != bHistogram[behaviour] { | |||
return false | |||
} | |||
} | |||
for _, behaviour := range b { | |||
if bHistogram[behaviour] != aHistogram[behaviour] { | |||
return false | |||
} | |||
} | |||
return true | |||
} | |||
// TestEqualPeerBehaviours tests that equalBehaviours compares two slices of
// peer behaviours by the behaviours they contain and the frequencies with
// which those behaviours occur.
func TestEqualPeerBehaviours(t *testing.T) { | |||
equals := []struct { | |||
left []p2p.PeerBehaviour | |||
right []p2p.PeerBehaviour | |||
}{ | |||
// Empty sets | |||
{[]p2p.PeerBehaviour{}, []p2p.PeerBehaviour{}}, | |||
// Single behaviours | |||
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote}, []p2p.PeerBehaviour{p2p.PeerBehaviourVote}}, | |||
// Equal Frequencies | |||
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}, | |||
[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}}, | |||
// Equal frequencies different orders | |||
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart}, | |||
[]p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}}, | |||
} | |||
for _, test := range equals { | |||
if !equalBehaviours(test.left, test.right) { | |||
t.Errorf("Expected %#v and %#v to be equal", test.left, test.right) | |||
} | |||
} | |||
unequals := []struct { | |||
left []p2p.PeerBehaviour | |||
right []p2p.PeerBehaviour | |||
}{ | |||
// Comparing empty sets to non empty sets | |||
{[]p2p.PeerBehaviour{}, []p2p.PeerBehaviour{p2p.PeerBehaviourVote}}, | |||
// Different behaviours | |||
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote}, []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart}}, | |||
// Same behaviour with different frequencies | |||
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote}, | |||
[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}}, | |||
} | |||
for _, test := range unequals { | |||
if equalBehaviours(test.left, test.right) { | |||
t.Errorf("Expected %#v and %#v to be unequal", test.left, test.right) | |||
} | |||
} | |||
} | |||
// TestMockPeerBehaviourReporterConcurrency constructs a scenario in which
// multiple goroutines use the same MockPeerBehaviourReporter instance.
// This test reproduces the conditions under which MockPeerBehaviourReporter
// will be used within reactor Receive method tests, to ensure thread safety.
func TestMockPeerBehaviourReporterConcurrency(t *testing.T) { | |||
behaviourScript := []scriptedBehaviours{ | |||
{"1", []p2p.PeerBehaviour{p2p.PeerBehaviourVote}}, | |||
{"2", []p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}}, | |||
{"3", []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}}, | |||
{"4", []p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}}, | |||
{"5", []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}}, | |||
} | |||
var receiveWg sync.WaitGroup | |||
pr := p2p.NewMockPeerBehaviourReporter() | |||
scriptItems := make(chan scriptItem) | |||
done := make(chan int) | |||
numConsumers := 3 | |||
for i := 0; i < numConsumers; i++ { | |||
receiveWg.Add(1) | |||
go func() { | |||
defer receiveWg.Done() | |||
for { | |||
select { | |||
case pb := <-scriptItems: | |||
pr.Report(pb.PeerID, pb.Behaviour) | |||
case <-done: | |||
return | |||
} | |||
} | |||
}() | |||
} | |||
var sendingWg sync.WaitGroup | |||
sendingWg.Add(1) | |||
go func() { | |||
defer sendingWg.Done() | |||
for _, item := range behaviourScript { | |||
for _, reason := range item.Behaviours { | |||
scriptItems <- scriptItem{item.PeerID, reason} | |||
} | |||
} | |||
}() | |||
sendingWg.Wait() | |||
for i := 0; i < numConsumers; i++ { | |||
done <- 1 | |||
} | |||
receiveWg.Wait() | |||
for _, items := range behaviourScript { | |||
reported := pr.GetBehaviours(items.PeerID) | |||
if !equalBehaviours(reported, items.Behaviours) { | |||
			t.Errorf("Unexpected behaviours for peer %s\nExpected: %#v\nGot: %#v\n",
				items.PeerID, items.Behaviours, reported)
} | |||
} | |||
} |
@ -0,0 +1,126 @@ | |||
package client_test | |||
import ( | |||
"bytes" | |||
"fmt" | |||
"github.com/tendermint/tendermint/abci/example/kvstore" | |||
"github.com/tendermint/tendermint/rpc/client" | |||
ctypes "github.com/tendermint/tendermint/rpc/core/types" | |||
rpctest "github.com/tendermint/tendermint/rpc/test" | |||
) | |||
func ExampleHTTP_simple() { | |||
// Start a tendermint node (and kvstore) in the background to test against | |||
app := kvstore.NewKVStoreApplication() | |||
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig) | |||
defer rpctest.StopTendermint(node) | |||
// Create our RPC client | |||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress | |||
c := client.NewHTTP(rpcAddr, "/websocket") | |||
// Create a transaction | |||
k := []byte("name") | |||
v := []byte("satoshi") | |||
tx := append(k, append([]byte("="), v...)...) | |||
	// Broadcast the transaction and wait for it to commit (in production
	// you would rather use c.BroadcastTxSync)
bres, err := c.BroadcastTxCommit(tx) | |||
if err != nil { | |||
panic(err) | |||
} | |||
if bres.CheckTx.IsErr() || bres.DeliverTx.IsErr() { | |||
panic("BroadcastTxCommit transaction failed") | |||
} | |||
// Now try to fetch the value for the key | |||
qres, err := c.ABCIQuery("/key", k) | |||
if err != nil { | |||
panic(err) | |||
} | |||
if qres.Response.IsErr() { | |||
panic("ABCIQuery failed") | |||
} | |||
if !bytes.Equal(qres.Response.Key, k) { | |||
panic("returned key does not match queried key") | |||
} | |||
if !bytes.Equal(qres.Response.Value, v) { | |||
panic("returned value does not match sent value") | |||
} | |||
fmt.Println("Sent tx :", string(tx)) | |||
fmt.Println("Queried for :", string(qres.Response.Key)) | |||
fmt.Println("Got value :", string(qres.Response.Value)) | |||
// Output: | |||
// Sent tx : name=satoshi | |||
// Queried for : name | |||
// Got value : satoshi | |||
} | |||
func ExampleHTTP_batching() { | |||
// Start a tendermint node (and kvstore) in the background to test against | |||
app := kvstore.NewKVStoreApplication() | |||
node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig) | |||
defer rpctest.StopTendermint(node) | |||
// Create our RPC client | |||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress | |||
c := client.NewHTTP(rpcAddr, "/websocket") | |||
// Create our two transactions | |||
k1 := []byte("firstName") | |||
v1 := []byte("satoshi") | |||
tx1 := append(k1, append([]byte("="), v1...)...) | |||
k2 := []byte("lastName") | |||
v2 := []byte("nakamoto") | |||
tx2 := append(k2, append([]byte("="), v2...)...) | |||
txs := [][]byte{tx1, tx2} | |||
// Create a new batch | |||
batch := c.NewBatch() | |||
// Queue up our transactions | |||
for _, tx := range txs { | |||
if _, err := batch.BroadcastTxCommit(tx); err != nil { | |||
panic(err) | |||
} | |||
} | |||
// Send the batch of 2 transactions | |||
if _, err := batch.Send(); err != nil { | |||
panic(err) | |||
} | |||
// Now let's query for the original results as a batch | |||
keys := [][]byte{k1, k2} | |||
for _, key := range keys { | |||
if _, err := batch.ABCIQuery("/key", key); err != nil { | |||
panic(err) | |||
} | |||
} | |||
// Send the 2 queries and keep the results | |||
results, err := batch.Send() | |||
if err != nil { | |||
panic(err) | |||
} | |||
// Each result in the returned list is the deserialized result of each | |||
// respective ABCIQuery response | |||
for _, result := range results { | |||
qr, ok := result.(*ctypes.ResultABCIQuery) | |||
if !ok { | |||
panic("invalid result type from ABCIQuery request") | |||
} | |||
fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value)) | |||
} | |||
// Output: | |||
// firstName = satoshi | |||
// lastName = nakamoto | |||
} |