Browse Source

Merge pull request #3489 from tendermint/release/v0.31.1

Release/v0.31.1
pull/3501/head v0.31.1
Ethan Buchman 5 years ago
committed by GitHub
parent
commit
a0234affb6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1094 additions and 852 deletions
  1. +54
    -0
      CHANGELOG.md
  2. +1
    -0
      CHANGELOG_PENDING.md
  3. +41
    -11
      blockchain/pool.go
  4. +43
    -3
      blockchain/pool_test.go
  5. +29
    -0
      config/config.go
  6. +11
    -0
      config/toml.go
  7. +1
    -1
      consensus/replay.go
  8. +3
    -3
      consensus/state_test.go
  9. +0
    -3
      crypto/doc.go
  10. +0
    -7
      crypto/example_test.go
  11. +0
    -8
      crypto/hash.go
  12. +1
    -1
      docs/networks/docker-compose.md
  13. +6
    -4
      docs/spec/blockchain/blockchain.md
  14. +1
    -1
      docs/spec/blockchain/encoding.md
  15. +1
    -1
      docs/spec/consensus/abci.md
  16. +1
    -1
      docs/spec/consensus/wal.md
  17. +275
    -30
      docs/spec/reactors/consensus/proposer-selection.md
  18. +8
    -0
      docs/spec/reactors/mempool/reactor.md
  19. +11
    -0
      docs/tendermint-core/configuration.md
  20. +0
    -232
      libs/common/repeat_timer.go
  21. +0
    -136
      libs/common/repeat_timer_test.go
  22. +1
    -1
      libs/common/service.go
  23. +13
    -0
      mempool/bench_test.go
  24. +101
    -0
      mempool/cache_test.go
  25. +120
    -36
      mempool/mempool.go
  26. +11
    -31
      mempool/mempool_test.go
  27. +95
    -9
      mempool/reactor.go
  28. +60
    -5
      mempool/reactor_test.go
  29. +17
    -6
      node/node.go
  30. +6
    -6
      p2p/conn/connection.go
  31. +0
    -100
      p2p/dummy/peer.go
  32. +68
    -0
      p2p/mock/peer.go
  33. +43
    -82
      p2p/pex/addrbook.go
  34. +15
    -20
      p2p/pex/addrbook_test.go
  35. +5
    -48
      p2p/pex/pex_reactor_test.go
  36. +10
    -5
      p2p/switch.go
  37. +1
    -5
      rpc/client/httpclient.go
  38. +3
    -5
      rpc/client/interface.go
  39. +1
    -5
      rpc/client/localclient.go
  40. +12
    -0
      rpc/client/mock/client.go
  41. +2
    -0
      rpc/core/consensus.go
  42. +3
    -1
      rpc/lib/client/http_client.go
  43. +12
    -5
      state/services.go
  44. +4
    -4
      tools/tm-signer-harness/internal/test_harness.go
  45. +0
    -33
      types/protobuf.go
  46. +1
    -1
      types/protobuf_test.go
  47. +2
    -1
      types/validator_set.go
  48. +1
    -1
      version/version.go

+ 54
- 0
CHANGELOG.md View File

@ -1,5 +1,59 @@
# Changelog
## v0.31.1
*March 27th, 2019*
This release contains a major improvement for the mempool that reduce the amount of sent data by about 30%
(see some numbers below).
It also fixes a memory leak in the mempool and adds TLS support to the RPC server by providing a certificate and key in the config.
Special thanks to external contributors on this release:
@brapse, @guagualvcha, @HaoyangLiu, @needkane, @TraceBundy
### BREAKING CHANGES:
* CLI/RPC/Config
* Apps
* Go API
- [crypto] [\#3426](https://github.com/tendermint/tendermint/pull/3426) Remove `Ripemd160` helper method (@needkane)
- [libs/common] [\#3429](https://github.com/tendermint/tendermint/pull/3429) Remove `RepeatTimer` (also `TimerMaker` and `Ticker` interface)
- [rpc/client] [\#3458](https://github.com/tendermint/tendermint/issues/3458) Include `NetworkClient` interface into `Client` interface
- [types] [\#3448](https://github.com/tendermint/tendermint/issues/3448) Remove method `PB2TM.ConsensusParams`
* Blockchain Protocol
* P2P Protocol
### FEATURES:
- [rpc] [\#3419](https://github.com/tendermint/tendermint/issues/3419) Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha)
### IMPROVEMENTS:
- [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties
- [docs] [\#3482](https://github.com/tendermint/tendermint/issues/3482) Fix broken links (@brapse)
- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you.
Also, limit to 65536 active peers.
This vastly improves the bandwidth consumption of nodes.
For instance, for a 4 node localnet, in a test sending 250byte txs for 120 sec. at 500 txs/sec (total of 15MB):
- total bytes received from 1st node:
- before: 42793967 (43MB)
- after: 30003256 (30MB)
- total bytes sent to 1st node:
- before: 30569339 (30MB)
- after: 19304964 (19MB)
- [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook (@guagualvcha)
- [rpc/lib/client] [\#3430](https://github.com/tendermint/tendermint/issues/3430) Disable compression for HTTP client to prevent GZIP-bomb DoS attacks (@guagualvcha)
### BUG FIXES:
- [blockchain] [\#2699](https://github.com/tendermint/tendermint/issues/2699) Update the maxHeight when a peer is removed
- [mempool] [\#3478](https://github.com/tendermint/tendermint/issues/3478) Fix memory-leak related to `broadcastTxRoutine` (@HaoyangLiu)
## v0.31.0
*March 16th, 2019*


+ 1
- 0
CHANGELOG_PENDING.md View File

@ -19,3 +19,4 @@
### IMPROVEMENTS:
### BUG FIXES:

+ 41
- 11
blockchain/pool.go View File

@ -69,7 +69,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
maxPeerHeight int64
maxPeerHeight int64 // the biggest reported height
// atomic
numPending int32 // number of requests pending assignment or block response
@ -78,6 +78,8 @@ type BlockPool struct {
errorsCh chan<- peerError
}
// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
@ -93,15 +95,15 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
return bp
}
// OnStart implements cmn.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}
func (pool *BlockPool) OnStop() {}
// Run spawns requesters as needed.
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
@ -150,6 +152,8 @@ func (pool *BlockPool) removeTimedoutPeers() {
}
}
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@ -157,6 +161,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
@ -170,8 +175,9 @@ func (pool *BlockPool) IsCaughtUp() bool {
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height. Note we use maxPeerHeight - 1
// because to sync block H requires block H+1 to verify the LastCommit.
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
@ -260,14 +266,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
}
}
// MaxPeerHeight returns the highest height reported by a peer.
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
}
// Sets the peer's alleged blockchain height.
// SetPeerHeight sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@ -286,6 +292,8 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
}
}
// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
@ -299,10 +307,32 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
requester.redo(peerID)
}
}
if p, exist := pool.peers[peerID]; exist && p.timeout != nil {
p.timeout.Stop()
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
peer.timeout.Stop()
}
delete(pool.peers, peerID)
// Find a new peer with the biggest height and update maxPeerHeight if the
// peer's height was the biggest.
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}
// If no peers are left, maxPeerHeight is set to 0.
func (pool *BlockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
}
delete(pool.peers, peerID)
pool.maxPeerHeight = max
}
// Pick an available peer with at least the given minHeight.


+ 43
- 3
blockchain/pool_test.go View File

@ -1,12 +1,15 @@
package blockchain
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -66,7 +69,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
return peers
}
func TestBasic(t *testing.T) {
func TestBlockPoolBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
@ -122,7 +125,7 @@ func TestBasic(t *testing.T) {
}
}
func TestTimeout(t *testing.T) {
func TestBlockPoolTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
@ -180,3 +183,40 @@ func TestTimeout(t *testing.T) {
}
}
}
func TestBlockPoolRemovePeer(t *testing.T) {
peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
require.NoError(t, err)
defer pool.Stop()
// add peers
for peerID, peer := range peers {
pool.SetPeerHeight(peerID, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())
// remove not-existing peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })
// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())
// remove all peers
for peerID := range peers {
pool.RemovePeer(peerID)
}
assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

+ 29
- 0
config/config.go View File

@ -339,6 +339,20 @@ type RPCConfig struct {
// global HTTP write timeout, which applies to all connections and endpoints.
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
// The name of a file containing certificate that is used to create the HTTPS server.
//
// If the certificate is signed by a certificate authority,
// the certFile should be the concatenation of the server's certificate, any intermediates,
// and the CA's certificate.
//
// NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
TLSCertFile string `mapstructure:"tls_cert_file"`
// The name of a file containing matching private key that is used to create the HTTPS server.
//
// NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
TLSKeyFile string `mapstructure:"tls_key_file"`
}
// DefaultRPCConfig returns a default configuration for the RPC server
@ -357,6 +371,9 @@ func DefaultRPCConfig() *RPCConfig {
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
TimeoutBroadcastTxCommit: 10 * time.Second,
TLSCertFile: "",
TLSKeyFile: "",
}
}
@ -395,6 +412,18 @@ func (cfg *RPCConfig) IsCorsEnabled() bool {
return len(cfg.CORSAllowedOrigins) != 0
}
func (cfg RPCConfig) KeyFile() string {
return rootify(filepath.Join(defaultConfigDir, cfg.TLSKeyFile), cfg.RootDir)
}
func (cfg RPCConfig) CertFile() string {
return rootify(filepath.Join(defaultConfigDir, cfg.TLSCertFile), cfg.RootDir)
}
func (cfg RPCConfig) IsTLSEnabled() bool {
return cfg.TLSCertFile != "" && cfg.TLSKeyFile != ""
}
//-----------------------------------------------------------------------------
// P2PConfig


+ 11
- 0
config/toml.go View File

@ -181,6 +181,17 @@ max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
# The name of a file containing certificate that is used to create the HTTPS server.
# If the certificate is signed by a certificate authority,
# the certFile should be the concatenation of the server's certificate, any intermediates,
# and the CA's certificate.
# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
tls_cert_file = "{{ .RPC.TLSCertFile }}"
# The name of a file containing matching private key that is used to create the HTTPS server.
# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
tls_key_file = "{{ .RPC.TLSKeyFile }}"
##### peer to peer configuration options #####
[p2p]


+ 1
- 1
consensus/replay.go View File

@ -324,7 +324,7 @@ func (h *Handshaker) ReplayBlocks(
}
if res.ConsensusParams != nil {
state.ConsensusParams = types.PB2TM.ConsensusParams(res.ConsensusParams, state.ConsensusParams.Block.TimeIotaMs)
state.ConsensusParams = state.ConsensusParams.Update(res.ConsensusParams)
}
sm.SaveState(h.stateDB, state)
}


+ 3
- 3
consensus/state_test.go View File

@ -14,7 +14,7 @@ import (
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
p2pdummy "github.com/tendermint/tendermint/p2p/dummy"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/types"
)
@ -1548,7 +1548,7 @@ func TestStateHalt1(t *testing.T) {
func TestStateOutputsBlockPartsStats(t *testing.T) {
// create dummy peer
cs, _ := randConsensusState(1)
peer := p2pdummy.NewPeer()
peer := p2pmock.NewPeer(nil)
// 1) new block part
parts := types.NewPartSetFromData(cmn.RandBytes(100), 10)
@ -1591,7 +1591,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) {
func TestStateOutputVoteStats(t *testing.T) {
cs, vss := randConsensusState(2)
// create dummy peer
peer := p2pdummy.NewPeer()
peer := p2pmock.NewPeer(nil)
vote := signVote(vss[1], types.PrecommitType, []byte("test"), types.PartSetHeader{})


+ 0
- 3
crypto/doc.go View File

@ -37,9 +37,6 @@
// sum := crypto.Sha256([]byte("This is Tendermint"))
// fmt.Printf("%x\n", sum)
// Ripemd160
// sum := crypto.Ripemd160([]byte("This is consensus"))
// fmt.Printf("%x\n", sum)
package crypto
// TODO: Add more docs in here

+ 0
- 7
crypto/example_test.go View File

@ -26,10 +26,3 @@ func ExampleSha256() {
// Output:
// f91afb642f3d1c87c17eb01aae5cb65c242dfdbe7cf1066cc260f4ce5d33b94e
}
func ExampleRipemd160() {
sum := crypto.Ripemd160([]byte("This is Tendermint"))
fmt.Printf("%x\n", sum)
// Output:
// 051e22663e8f0fd2f2302f1210f954adff009005
}

+ 0
- 8
crypto/hash.go View File

@ -2,8 +2,6 @@ package crypto
import (
"crypto/sha256"
"golang.org/x/crypto/ripemd160"
)
func Sha256(bytes []byte) []byte {
@ -11,9 +9,3 @@ func Sha256(bytes []byte) []byte {
hasher.Write(bytes)
return hasher.Sum(nil)
}
func Ripemd160(bytes []byte) []byte {
hasher := ripemd160.New()
hasher.Write(bytes)
return hasher.Sum(nil)
}

+ 1
- 1
docs/networks/docker-compose.md View File

@ -4,7 +4,7 @@ With Docker Compose, you can spin up local testnets with a single command.
## Requirements
1. [Install tendermint](/docs/introduction/install.md)
1. [Install tendermint](../introduction/install.md)
2. [Install docker](https://docs.docker.com/engine/installation/)
3. [Install docker-compose](https://docs.docker.com/compose/install/)


+ 6
- 4
docs/spec/blockchain/blockchain.md View File

@ -103,7 +103,7 @@ type PartSetHeader struct {
}
```
See [MerkleRoot](/docs/spec/blockchain/encoding.md#MerkleRoot) for details.
See [MerkleRoot](./encoding.md#MerkleRoot) for details.
## Time
@ -163,7 +163,7 @@ a _precommit_ has `vote.Type == 2`.
Signatures in Tendermint are raw bytes representing the underlying signature.
See the [signature spec](/docs/spec/blockchain/encoding.md#key-types) for more.
See the [signature spec](./encoding.md#key-types) for more.
## EvidenceData
@ -190,7 +190,7 @@ type DuplicateVoteEvidence struct {
}
```
See the [pubkey spec](/docs/spec/blockchain/encoding.md#key-types) for more.
See the [pubkey spec](./encoding.md#key-types) for more.
## Validation
@ -209,7 +209,7 @@ the current version of the `state` corresponds to the state
after executing transactions from the `prevBlock`.
Elements of an object are accessed as expected,
ie. `block.Header`.
See the [definition of `State`](/docs/spec/blockchain/state.md).
See the [definition of `State`](./state.md).
### Header
@ -332,6 +332,7 @@ block.ValidatorsHash == MerkleRoot(state.Validators)
MerkleRoot of the current validator set that is committing the block.
This can be used to validate the `LastCommit` included in the next block.
Note the validators are sorted by their address before computing the MerkleRoot.
### NextValidatorsHash
@ -342,6 +343,7 @@ block.NextValidatorsHash == MerkleRoot(state.NextValidators)
MerkleRoot of the next validator set that will be the validator set that commits the next block.
This is included so that the current validator set gets a chance to sign the
next validator sets Merkle root.
Note the validators are sorted by their address before computing the MerkleRoot.
### ConsensusHash


+ 1
- 1
docs/spec/blockchain/encoding.md View File

@ -339,6 +339,6 @@ type CanonicalVote struct {
The field ordering and the fixed sized encoding for the first three fields is optimized to ease parsing of SignBytes
in HSMs. It creates fixed offsets for relevant fields that need to be read in this context.
For more details, see the [signing spec](/docs/spec/consensus/signing.md).
For more details, see the [signing spec](../consensus/signing.md).
Also, see the motivating discussion in
[#1622](https://github.com/tendermint/tendermint/issues/1622).

+ 1
- 1
docs/spec/consensus/abci.md View File

@ -1 +1 @@
[Moved](/docs/spec/software/abci.md)
[Moved](../software/abci.md)

+ 1
- 1
docs/spec/consensus/wal.md View File

@ -1 +1 @@
[Moved](/docs/spec/software/wal.md)
[Moved](../software/wal.md)

+ 275
- 30
docs/spec/reactors/consensus/proposer-selection.md View File

@ -2,45 +2,290 @@
This document specifies the Proposer Selection Procedure that is used in Tendermint to choose a round proposer.
As Tendermint is “leader-based protocol”, the proposer selection is critical for its correct functioning.
Let denote with `proposer_p(h,r)` a process returned by the Proposer Selection Procedure at the process p, at height h
and round r. Then the Proposer Selection procedure should fulfill the following properties:
`Agreement`: Given a validator set V, and two honest validators,
p and q, for each height h, and each round r,
proposer_p(h,r) = proposer_q(h,r)
At a given block height, the proposer selection algorithm runs with the same validator set at each round .
Between heights, an updated validator set may be specified by the application as part of the ABCIResponses' EndBlock.
`Liveness`: In every consecutive sequence of rounds of size K (K is system parameter), at least a
single round has an honest proposer.
## Requirements for Proposer Selection
`Fairness`: The proposer selection is proportional to the validator voting power, i.e., a validator with more
voting power is selected more frequently, proportional to its power. More precisely, given a set of processes
with the total voting power N, during a sequence of rounds of size N, every process is proposer in a number of rounds
equal to its voting power.
This sections covers the requirements with Rx being mandatory and Ox optional requirements.
The following requirements must be met by the Proposer Selection procedure:
We now look at a few particular cases to understand better how fairness should be implemented.
If we have 4 processes with the following voting power distribution (p0,4), (p1, 2), (p2, 2), (p3, 2) at some round r,
we have the following sequence of proposer selections in the following rounds:
#### R1: Determinism
Given a validator set `V`, and two honest validators `p` and `q`, for each height `h` and each round `r` the following must hold:
`p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, etc`
`proposer_p(h,r) = proposer_q(h,r)`
Let consider now the following scenario where a total voting power of faulty processes is aggregated in a single process
p0: (p0,3), (p1, 1), (p2, 1), (p3, 1), (p4, 1), (p5, 1), (p6, 1), (p7, 1).
In this case the sequence of proposer selections looks like this:
where `proposer_p(h,r)` is the proposer returned by the Proposer Selection Procedure at process `p`, at height `h` and round `r`.
`p0, p1, p2, p3, p0, p4, p5, p6, p7, p0, p0, p1, p2, p3, p0, p4, p5, p6, p7, p0, etc`
#### R2: Fairness
Given a validator set with total voting power P and a sequence S of elections. In any sub-sequence of S with length C*P, a validator v must be elected as proposer P/VP(v) times, i.e. with frequency:
In this case, we see that a number of rounds coordinated by a faulty process is proportional to its voting power.
We consider also the case where we have voting power uniformly distributed among processes, i.e., we have 10 processes
each with voting power of 1. And let consider that there are 3 faulty processes with consecutive addresses,
for example the first 3 processes are faulty. Then the sequence looks like this:
f(v) ~ VP(v) / P
`p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, etc`
where C is a tolerance factor for validator set changes with following values:
- C == 1 if there are no validator set changes
- C ~ k when there are validator changes
In this case, we have 3 consecutive rounds with a faulty proposer.
One special case we consider is the case where a single honest process p0 has most of the voting power, for example:
(p0,100), (p1, 2), (p2, 3), (p3, 4). Then the sequence of proposer selection looks like this:
*[this needs more work]*
p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p1, p0, p0, p0, p0, p0, etc
### Basic Algorithm
This basically means that almost all rounds have the same proposer. But in this case, the process p0 has anyway enough
voting power to decide whatever he wants, so the fact that he coordinates almost all rounds seems correct.
At its core, the proposer selection procedure uses a weighted round-robin algorithm.
A model that gives a good intuition on how/ why the selection algorithm works and it is fair is that of a priority queue. The validators move ahead in this queue according to their voting power (the higher the voting power the faster a validator moves towards the head of the queue). When the algorithm runs the following happens:
- all validators move "ahead" according to their powers: for each validator, increase the priority by the voting power
- first in the queue becomes the proposer: select the validator with highest priority
- move the proposer back in the queue: decrease the proposer's priority by the total voting power
Notation:
- vset - the validator set
- n - the number of validators
- VP(i) - voting power of validator i
- A(i) - accumulated priority for validator i
- P - total voting power of set
- avg - average of all validator priorities
- prop - proposer
Simple view at the Selection Algorithm:
```
def ProposerSelection (vset):
// compute priorities and elect proposer
for each validator i in vset:
A(i) += VP(i)
prop = max(A)
A(prop) -= P
```
### Stable Set
Consider the validator set:
Validator | p1| p2
----------|---|---
VP | 1 | 3
Assuming no validator changes, the following table shows the proposer priority computation over a few runs. Four runs of the selection procedure are shown, starting with the 5th the same values are computed.
Each row shows the priority queue and the process place in it. The proposer is the closest to the head, the rightmost validator. As priorities are updated, the validators move right in the queue. The proposer moves left as its priority is reduced after election.
|Priority Run | -2| -1| 0 | 1| 2 | 3 | 4 | 5 | Alg step
|--------------- |---|---|---- |---|---- |---|---|---|--------
| | | |p1,p2| | | | | |Initialized to 0
|run 1 | | | | p1| | p2| | |A(i)+=VP(i)
| | | p2| | p1| | | | |A(p2)-= P
|run 2 | | | | |p1,p2| | | |A(i)+=VP(i)
| | p1| | | | p2| | | |A(p1)-= P
|run 3 | | p1| | | | | | p2|A(i)+=VP(i)
| | | p1| | p2| | | | |A(p2)-= P
|run 4 | | | p1| | | | p2| |A(i)+=VP(i)
| | | |p1,p2| | | | | |A(p2)-= P
It can be shown that:
- At the end of each run k+1 the sum of the priorities is the same as at end of run k. If a new set's priorities are initialized to 0 then the sum of priorities will be 0 at each run while there are no changes.
- The max distance between priorites is (n-1) * P. *[formal proof not finished]*
### Validator Set Changes
Between proposer selection runs the validator set may change. Some changes have implications on the proposer election.
#### Voting Power Change
Consider again the earlier example and assume that the voting power of p1 is changed to 4:
Validator | p1| p2
----------|---| ---
VP | 4 | 3
Let's also assume that before this change the proposer priorites were as shown in first row (last run). As it can be seen, the selection could run again, without changes, as before.
|Priority Run| -2 | -1 | 0 | 1 | 2 | Comment
|--------------| ---|--- |------|--- |--- |--------
| last run | | p2 | | p1 | |__update VP(p1)__
| next run | | | | | p2 |A(i)+=VP(i)
| | p1 | | | | p2 |A(p1)-= P
However, when a validator changes power from a high to a low value, some other validator remain far back in the queue for a long time. This scenario is considered again in the Proposer Priority Range section.
As before:
- At the end of each run k+1 the sum of the priorities is the same as at run k.
- The max distance between priorites is (n-1) * P.
#### Validator Removal
Consider a new example with set:
Validator | p1 | p2 | p3 |
--------- |--- |--- |--- |
VP | 1 | 2 | 3 |
Let's assume that after the last run the proposer priorities were as shown in first row with their sum being 0. After p2 is removed, at the end of next proposer selection run (penultimate row) the sum of priorities is -2 (minus the priority of the removed process).
The procedure could continue without modifications. However, after a sufficiently large number of modifications in validator set, the priority values would migrate towards maximum or minimum allowed values causing truncations due to overflow detection.
For this reason, the selection procedure adds another __new step__ that centers the current priority values such that the priority sum remains close to 0.
|Priority Run |-3 | -2 | -1 | 0 | 1 | 2 | 4 |Comment
|--------------- |--- | ---|--- |--- |--- |--- |---|--------
| last run |p3 | | | | p1 | p2 | |__remove p2__
| nextrun | | | | | | | |
| __new step__ | | p3 | | | | p1 | |A(i) -= avg, avg = -1
| | | | | | p3 | p1 | |A(i)+=VP(i)
| | | | p1 | | p3 | | |A(p1)-= P
The modified selection algorithm is:
def ProposerSelection (vset):
// center priorities around zero
avg = sum(A(i) for i in vset)/len(vset)
for each validator i in vset:
A(i) -= avg
// compute priorities and elect proposer
for each validator i in vset:
A(i) += VP(i)
prop = max(A)
A(prop) -= P
Observations:
- The sum of priorities is now close to 0. Due to integer division the sum is an integer in (-n, n), where n is the number of validators.
#### New Validator
When a new validator is added, same problem as the one described for removal appears, the sum of priorities in the new set is not zero. This is fixed with the centering step introduced above.
One other issue that needs to be addressed is the following. A validator V that has just been elected is moved to the end of the queue. If the validator set is large and/ or other validators have significantly higher power, V will have to wait many runs to be elected. If V removes and re-adds itself to the set, it would make a significant (albeit unfair) "jump" ahead in the queue.
In order to prevent this, when a new validator is added, its initial priority is set to:
A(V) = -1.125 * P
where P is the total voting power of the set including V.
Curent implementation uses the penalty factor of 1.125 because it provides a small punishment that is efficient to calculate. See [here](https://github.com/tendermint/tendermint/pull/2785#discussion_r235038971) for more details.
If we consider the validator set where p3 has just been added:
Validator | p1 | p2 | p3
----------|--- |--- |---
VP | 1 | 3 | 8
then p3 will start with proposer priority:
A(p3) = -1.125 * (1 + 3 + 8) ~ -13
Note that since current computation uses integer division there is penalty loss when sum of the voting power is less than 8.
In the next run, p3 will still be ahead in the queue, elected as proposer and moved back in the queue.
|Priority Run |-13 | -9 | -5 | -2 | -1 | 0 | 1 | 2 | 5 | 6 | 7 |Alg step
|---------------|--- |--- |--- |----|--- |--- |---|---|---|---|---|--------
|last run | | | | p2 | | | | p1| | | |__add p3__
| | p3 | | | p2 | | | | p1| | | |A(p3) = -4
|next run | | p3 | | | | | | p2| | p1| |A(i) -= avg, avg = -4
| | | | | | p3 | | | | p2| | p1|A(i)+=VP(i)
| | | | p1 | | p3 | | | | p2| | |A(p1)-=P
### Proposer Priority Range
With the introduction of centering, some interesting cases occur. Low power validators that bind early in a set that includes high power validator(s) benefit from subsequent additions to the set. This is because these early validators run through more right shift operations during centering, operations that increase their priority.
As an example, consider the set where p2 is added after p1, with priority -1.125 * 80k = -90k. After the selection procedure runs once:
Validator | p1 | p2 | Comment
----------|-----|---- |---
VP | 80k | 10 |
A | 0 |-90k | __added p2__
A |-45k | 45k | __run selection__
Then execute the following steps:
1. Add a new validator p3:
Validator | p1 | p2 | p3
----------|-----|--- |----
VP | 80k | 10 | 10
2. Run selection once. The notation '..p'/'p..' means very small deviations compared to column priority.
|Priority Run | -90k..| -60k | -45k | -15k| 0 | 45k | 75k | 155k | Comment
|--------------|------ |----- |------- |---- |---|---- |----- |------- |---------
| last run | p3 | | p2 | | | p1 | | | __added p3__
| next run
| *right_shift*| | p3 | | p2 | | | p1 | | A(i) -= avg,avg=-30k
| | | ..p3| | ..p2| | | | p1 | A(i)+=VP(i)
| | | ..p3| | ..p2| | | p1.. | | A(p1)-=P, P=80k+20
3. Remove p1 and run selection once:
Validator | p3 | p2 | Comment
----------|----- |---- |--------
VP | 10 | 10 |
A |-60k |-15k |
A |-22.5k|22.5k| __run selection__
At this point, while the total voting power is 20, the distance between priorities is 45k. It will take 4500 runs for p3 to catch up with p2.
In order to prevent these types of scenarios, the selection algorithm performs scaling of priorities such that the difference between min and max values is smaller than two times the total voting power.
The modified selection algorithm is:
def ProposerSelection (vset):
// scale the priority values
diff = max(A)-min(A)
threshold = 2 * P
if diff > threshold:
scale = diff/threshold
for each validator i in vset:
A(i) = A(i)/scale
// center priorities around zero
avg = sum(A(i) for i in vset)/len(vset)
for each validator i in vset:
A(i) -= avg
// compute priorities and elect proposer
for each validator i in vset:
A(i) += VP(i)
prop = max(A)
A(prop) -= P
Observations:
- With this modification, the maximum distance between priorites becomes 2 * P.
Note also that even during steady state the priority range may increase beyond 2 * P. The scaling introduced here helps to keep the range bounded.
### Wrinkles
#### Validator Power Overflow Conditions
The validator voting power is a positive number stored as an int64. When a validator is added the `1.125 * P` computation must not overflow. As a consequence the code handling validator updates (add and update) checks for overflow conditions making sure the total voting power is never larger than the largest int64 `MAX`, with the property that `1.125 * MAX` is still in the bounds of int64. Fatal error is return when overflow condition is detected.
#### Proposer Priority Overflow/ Underflow Handling
The proposer priority is stored as an int64. The selection algorithm performs additions and subtractions to these values and in the case of overflows and underflows it limits the values to:
MaxInt64 = 1 << 63 - 1
MinInt64 = -1 << 63
### Requirement Fulfillment Claims
__[R1]__
The proposer algorithm is deterministic giving consistent results across executions with same transactions and validator set modifications.
[WIP - needs more detail]
__[R2]__
Given a set of processes with the total voting power P, during a sequence of elections of length P, the number of times any process is selected as proposer is equal to its voting power. The sequence of the P proposers then repeats. If we consider the validator set:
Validator | p1| p2
----------|---|---
VP | 1 | 3
With no other changes to the validator set, the current implementation of proposer selection generates the sequence:
`p2, p1, p2, p2, p2, p1, p2, p2,...` or [`p2, p1, p2, p2`]*
A sequence that starts with any circular permutation of the [`p2, p1, p2, p2`] sub-sequence would also provide the same degree of fairness. In fact these circular permutations show in the sliding window (over the generated sequence) of size equal to the length of the sub-sequence.
Assigning priorities to each validator based on the voting power and updating them at each run ensures the fairness of the proposer selection. In addition, every time a validator is elected as proposer its priority is decreased with the total voting power.
Intuitively, a process v jumps ahead in the queue at most (max(A) - min(A))/VP(v) times until it reaches the head and is elected. The frequency is then:
f(v) ~ VP(v)/(max(A)-min(A)) = 1/k * VP(v)/P
For current implementation, this means v should be proposer at least VP(v) times out of k * P runs, with scaling factor k=2.

+ 8
- 0
docs/spec/reactors/mempool/reactor.md View File

@ -12,3 +12,11 @@ for details.
Sending incorrectly encoded data or data exceeding `maxMsgSize` will result
in stopping the peer.
The mempool will not send a tx back to any peer which it received it from.
The reactor assigns an `uint16` number for each peer and maintains a map from
p2p.ID to `uint16`. Each mempool transaction carries a list of all the senders
(`[]uint16`). The list is updated every time mempool receives a transaction it
is already seen. `uint16` assumes that a node will never have over 65535 active
peers (0 is reserved for unknown source - e.g. RPC).

+ 11
- 0
docs/tendermint-core/configuration.md View File

@ -127,6 +127,17 @@ max_subscriptions_per_client = 5
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "10s"
# The name of a file containing certificate that is used to create the HTTPS server.
# If the certificate is signed by a certificate authority,
# the certFile should be the concatenation of the server's certificate, any intermediates,
# and the CA's certificate.
# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
tls_cert_file = ""
# The name of a file containing matching private key that is used to create the HTTPS server.
# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run.
tls_key_file = ""
##### peer to peer configuration options #####
[p2p]


+ 0
- 232
libs/common/repeat_timer.go View File

@ -1,232 +0,0 @@
package common
import (
"sync"
"time"
)
// Used by RepeatTimer the first time,
// and every time it's Reset() after Stop().
type TickerMaker func(dur time.Duration) Ticker
// Ticker is a basic ticker interface.
type Ticker interface {
// Never changes, never closes.
Chan() <-chan time.Time
// Stopping a stopped Ticker will panic.
Stop()
}
//----------------------------------------
// defaultTicker
var _ Ticker = (*defaultTicker)(nil)
type defaultTicker time.Ticker
func defaultTickerMaker(dur time.Duration) Ticker {
ticker := time.NewTicker(dur)
return (*defaultTicker)(ticker)
}
// Implements Ticker
func (t *defaultTicker) Chan() <-chan time.Time {
return t.C
}
// Implements Ticker
func (t *defaultTicker) Stop() {
((*time.Ticker)(t)).Stop()
}
//----------------------------------------
// LogicalTickerMaker
// Construct a TickerMaker that always uses `source`.
// It's useful for simulating a deterministic clock.
func NewLogicalTickerMaker(source chan time.Time) TickerMaker {
return func(dur time.Duration) Ticker {
return newLogicalTicker(source, dur)
}
}
type logicalTicker struct {
source <-chan time.Time
ch chan time.Time
quit chan struct{}
}
func newLogicalTicker(source <-chan time.Time, interval time.Duration) Ticker {
lt := &logicalTicker{
source: source,
ch: make(chan time.Time),
quit: make(chan struct{}),
}
go lt.fireRoutine(interval)
return lt
}
// We need a goroutine to read times from t.source
// and fire on t.Chan() when `interval` has passed.
func (t *logicalTicker) fireRoutine(interval time.Duration) {
source := t.source
// Init `lasttime`
lasttime := time.Time{}
select {
case lasttime = <-source:
case <-t.quit:
return
}
// Init `lasttime` end
for {
select {
case newtime := <-source:
elapsed := newtime.Sub(lasttime)
if interval <= elapsed {
// Block for determinism until the ticker is stopped.
select {
case t.ch <- newtime:
case <-t.quit:
return
}
// Reset timeleft.
// Don't try to "catch up" by sending more.
// "Ticker adjusts the intervals or drops ticks to make up for
// slow receivers" - https://golang.org/pkg/time/#Ticker
lasttime = newtime
}
case <-t.quit:
return // done
}
}
}
// Implements Ticker
func (t *logicalTicker) Chan() <-chan time.Time {
return t.ch // immutable
}
// Implements Ticker
func (t *logicalTicker) Stop() {
close(t.quit) // it *should* panic when stopped twice.
}
//---------------------------------------------------------------------
/*
RepeatTimer repeatedly sends a struct{}{} to `.Chan()` after each `dur`
period. (It's good for keeping connections alive.)
A RepeatTimer must be stopped, or it will keep a goroutine alive.
*/
type RepeatTimer struct {
name string
ch chan time.Time
tm TickerMaker
mtx sync.Mutex
dur time.Duration
ticker Ticker
quit chan struct{}
}
// NewRepeatTimer returns a RepeatTimer with a defaultTicker.
func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer {
return NewRepeatTimerWithTickerMaker(name, dur, defaultTickerMaker)
}
// NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker
// maker.
func NewRepeatTimerWithTickerMaker(name string, dur time.Duration, tm TickerMaker) *RepeatTimer {
var t = &RepeatTimer{
name: name,
ch: make(chan time.Time),
tm: tm,
dur: dur,
ticker: nil,
quit: nil,
}
t.reset()
return t
}
// receive ticks on ch, send out on t.ch
func (t *RepeatTimer) fireRoutine(ch <-chan time.Time, quit <-chan struct{}) {
for {
select {
case tick := <-ch:
select {
case t.ch <- tick:
case <-quit:
return
}
case <-quit: // NOTE: `t.quit` races.
return
}
}
}
func (t *RepeatTimer) Chan() <-chan time.Time {
return t.ch
}
func (t *RepeatTimer) Stop() {
t.mtx.Lock()
defer t.mtx.Unlock()
t.stop()
}
// Wait the duration again before firing.
func (t *RepeatTimer) Reset() {
t.mtx.Lock()
defer t.mtx.Unlock()
t.reset()
}
//----------------------------------------
// Misc.
// CONTRACT: (non-constructor) caller should hold t.mtx.
func (t *RepeatTimer) reset() {
if t.ticker != nil {
t.stop()
}
t.ticker = t.tm(t.dur)
t.quit = make(chan struct{})
go t.fireRoutine(t.ticker.Chan(), t.quit)
}
// CONTRACT: caller should hold t.mtx.
func (t *RepeatTimer) stop() {
if t.ticker == nil {
/*
Similar to the case of closing channels twice:
https://groups.google.com/forum/#!topic/golang-nuts/rhxMiNmRAPk
Stopping a RepeatTimer twice implies that you do
not know whether you are done or not.
If you're calling stop on a stopped RepeatTimer,
you probably have race conditions.
*/
panic("Tried to stop a stopped RepeatTimer")
}
t.ticker.Stop()
t.ticker = nil
/*
From https://golang.org/pkg/time/#Ticker:
"Stop the ticker to release associated resources"
"After Stop, no more ticks will be sent"
So we shouldn't have to do the below.
select {
case <-t.ch:
// read off channel if there's anything there
default:
}
*/
close(t.quit)
}

+ 0
- 136
libs/common/repeat_timer_test.go View File

@ -1,136 +0,0 @@
package common
import (
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
func TestDefaultTicker(t *testing.T) {
ticker := defaultTickerMaker(time.Millisecond * 10)
<-ticker.Chan()
ticker.Stop()
}
func TestRepeatTimer(t *testing.T) {
ch := make(chan time.Time, 100)
mtx := new(sync.Mutex)
// tick() fires from start to end
// (exclusive) in milliseconds with incr.
// It locks on mtx, so subsequent calls
// run in series.
tick := func(startMs, endMs, incrMs time.Duration) {
mtx.Lock()
go func() {
for tMs := startMs; tMs < endMs; tMs += incrMs {
lt := time.Time{}
lt = lt.Add(tMs * time.Millisecond)
ch <- lt
}
mtx.Unlock()
}()
}
// tock consumes Ticker.Chan() events and checks them against the ms in "timesMs".
tock := func(t *testing.T, rt *RepeatTimer, timesMs []int64) {
// Check against timesMs.
for _, timeMs := range timesMs {
tyme := <-rt.Chan()
sinceMs := tyme.Sub(time.Time{}) / time.Millisecond
assert.Equal(t, timeMs, int64(sinceMs))
}
// TODO detect number of running
// goroutines to ensure that
// no other times will fire.
// See https://github.com/tendermint/tendermint/libs/issues/120.
time.Sleep(time.Millisecond * 100)
done := true
select {
case <-rt.Chan():
done = false
default:
}
assert.True(t, done)
}
tm := NewLogicalTickerMaker(ch)
rt := NewRepeatTimerWithTickerMaker("bar", time.Second, tm)
/* NOTE: Useful for debugging deadlocks...
go func() {
time.Sleep(time.Second * 3)
trace := make([]byte, 102400)
count := runtime.Stack(trace, true)
fmt.Printf("Stack of %d bytes: %s\n", count, trace)
}()
*/
tick(0, 1000, 10)
tock(t, rt, []int64{})
tick(1000, 2000, 10)
tock(t, rt, []int64{1000})
tick(2005, 5000, 10)
tock(t, rt, []int64{2005, 3005, 4005})
tick(5001, 5999, 1)
// Read 5005 instead of 5001 because
// it's 1 second greater than 4005.
tock(t, rt, []int64{5005})
tick(6000, 7005, 1)
tock(t, rt, []int64{6005})
tick(7033, 8032, 1)
tock(t, rt, []int64{7033})
// After a reset, nothing happens
// until two ticks are received.
rt.Reset()
tock(t, rt, []int64{})
tick(8040, 8041, 1)
tock(t, rt, []int64{})
tick(9555, 9556, 1)
tock(t, rt, []int64{9555})
// After a stop, nothing more is sent.
rt.Stop()
tock(t, rt, []int64{})
// Another stop panics.
assert.Panics(t, func() { rt.Stop() })
}
func TestRepeatTimerReset(t *testing.T) {
// check that we are not leaking any go-routines
defer leaktest.Check(t)()
timer := NewRepeatTimer("test", 20*time.Millisecond)
defer timer.Stop()
// test we don't receive tick before duration ms.
select {
case <-timer.Chan():
t.Fatal("did not expect to receive tick")
default:
}
timer.Reset()
// test we receive tick after Reset is called
select {
case <-timer.Chan():
// all good
case <-time.After(40 * time.Millisecond):
t.Fatal("expected to receive tick after reset")
}
// just random calls
for i := 0; i < 100; i++ {
time.Sleep(time.Duration(RandIntn(40)) * time.Millisecond)
timer.Reset()
}
}

+ 1
- 1
libs/common/service.go View File

@ -209,7 +209,7 @@ func (bs *BaseService) Wait() {
<-bs.quit
}
// String implements Servce by returning a string representation of the service.
// String implements Service by returning a string representation of the service.
func (bs *BaseService) String() string {
return bs.name
}


+ 13
- 0
mempool/bench_test.go View File

@ -26,6 +26,19 @@ func BenchmarkReap(b *testing.B) {
}
}
func BenchmarkCheckTx(b *testing.B) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil)
}
}
func BenchmarkCacheInsertTime(b *testing.B) {
cache := newMapTxCache(b.N)
txs := make([][]byte, b.N)


+ 101
- 0
mempool/cache_test.go View File

@ -0,0 +1,101 @@
package mempool
import (
"crypto/rand"
"crypto/sha256"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
func TestCacheRemove(t *testing.T) {
cache := newMapTxCache(100)
numTxs := 10
txs := make([][]byte, numTxs)
for i := 0; i < numTxs; i++ {
// probability of collision is 2**-256
txBytes := make([]byte, 32)
rand.Read(txBytes)
txs[i] = txBytes
cache.Push(txBytes)
// make sure its added to both the linked list and the map
require.Equal(t, i+1, len(cache.map_))
require.Equal(t, i+1, cache.list.Len())
}
for i := 0; i < numTxs; i++ {
cache.Remove(txs[i])
// make sure its removed from both the map and the linked list
require.Equal(t, numTxs-(i+1), len(cache.map_))
require.Equal(t, numTxs-(i+1), cache.list.Len())
}
}
func TestCacheAfterUpdate(t *testing.T) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()
// reAddIndices & txsInCache can have elements > numTxsToCreate
// also assumes max index is 255 for convenience
// txs in cache also checks order of elements
tests := []struct {
numTxsToCreate int
updateIndices []int
reAddIndices []int
txsInCache []int
}{
{1, []int{}, []int{1}, []int{1, 0}}, // adding new txs works
{2, []int{1}, []int{}, []int{1, 0}}, // update doesn't remove tx from cache
{2, []int{2}, []int{}, []int{2, 1, 0}}, // update adds new tx to cache
{2, []int{1}, []int{1}, []int{1, 0}}, // re-adding after update doesn't make dupe
}
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil)
require.NoError(t, err)
}
updateTxs := []types.Tx{}
for _, v := range tc.updateIndices {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
mempool.Update(int64(tcIndex), updateTxs, nil, nil)
for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil)
}
cache := mempool.cache.(*mapTxCache)
node := cache.list.Front()
counter := 0
for node != nil {
require.NotEqual(t, len(tc.txsInCache), counter,
"cache larger than expected on testcase %d", tcIndex)
nodeVal := node.Value.([sha256.Size]byte)
expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])})
// Reference for reading the errors:
// >>> sha256('\x00').hexdigest()
// '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d'
// >>> sha256('\x01').hexdigest()
// '4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a'
// >>> sha256('\x02').hexdigest()
// 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986'
require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex)
counter++
node = node.Next()
}
require.Equal(t, len(tc.txsInCache), counter,
"cache smaller than expected on testcase %d", tcIndex)
mempool.Flush()
}
}

+ 120
- 36
mempool/mempool.go View File

@ -31,6 +31,14 @@ type PreCheckFunc func(types.Tx) error
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error
// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
// We don't use p2p.ID here because it's too big. The gain is to store max 2
// bytes with each tx to identify the sender rather than 20 bytes.
PeerID uint16
}
/*
The mempool pushes new txs onto the proxyAppConn.
@ -148,9 +156,12 @@ func TxID(tx []byte) string {
type Mempool struct {
config *cfg.MempoolConfig
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
proxyMtx sync.Mutex
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
// map for quick access to txs
// Used in CheckTx to record the tx sender.
txsMap map[[sha256.Size]byte]*clist.CElement
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
recheckCursor *clist.CElement // next expected response
@ -161,7 +172,10 @@ type Mempool struct {
postCheck PostCheckFunc
// Atomic integers
txsBytes int64 // see TxsBytes
// Used to check if the mempool size is bigger than the allowed limit.
// See TxsBytes
txsBytes int64
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
@ -189,6 +203,7 @@ func NewMempool(
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
txsMap: make(map[[sha256.Size]byte]*clist.CElement),
height: height,
rechecking: 0,
recheckCursor: nil,
@ -286,8 +301,8 @@ func (mem *Mempool) TxsBytes() int64 {
return atomic.LoadInt64(&mem.txsBytes)
}
// FlushAppConn flushes the mempool connection to ensure async resCb calls are
// done e.g. from CheckTx.
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
func (mem *Mempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
@ -304,6 +319,7 @@ func (mem *Mempool) Flush() {
e.DetachPrev()
}
mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement)
_ = atomic.SwapInt64(&mem.txsBytes, 0)
}
@ -327,6 +343,13 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} {
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID})
}
// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx.
// Currently this metadata is the peer who sent it,
// used to prevent the tx from being gossiped back to them.
func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
mem.proxyMtx.Lock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
@ -357,6 +380,20 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
// CACHE
if !mem.cache.Push(tx) {
// record the sender
e, ok := mem.txsMap[sha256.Sum256(tx)]
// The check is needed because tx may be in cache, but not in the mempool.
// E.g. after we've committed a block, txs are removed from the mempool,
// but not from the cache.
if ok {
memTx := e.Value.(*mempoolTx)
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
}
}
return ErrTxInCache
}
// END CACHE
@ -381,27 +418,77 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
}
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
if cb != nil {
reqRes.SetCallback(cb)
composedCallback := func(res *abci.Response) {
mem.reqResCb(tx, txInfo.PeerID)(res)
cb(res)
}
reqRes.SetCallback(composedCallback)
} else {
reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID))
}
return nil
}
// ABCI callback function
// Global callback, which is called in the absence of the specific callback.
//
// In recheckTxs because no reqResCb (specific) callback is set, this callback
// will be called.
func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
mem.resCbNormal(req, res)
} else {
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
return
}
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
// Specific callback, which allows us to incorporate local information, like
// the peer that sent us this tx, so we can avoid sending it back to the same
// peer.
//
// Used in CheckTxWithInfo to record PeerID who sent us the tx.
func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
return
}
mem.resCbFirstTime(tx, peerID, res)
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
}
}
func (mem *Mempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap[sha256.Sum256(memTx.tx)] = e
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}
func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()
delete(mem.txsMap, sha256.Sum256(tx))
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
if removeFromCache {
mem.cache.Remove(tx)
}
}
// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
func (mem *Mempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
@ -412,15 +499,14 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
gasWanted: r.CheckTx.GasWanted,
tx: tx,
}
mem.txs.PushBack(memTx)
atomic.AddInt64(&mem.txsBytes, int64(len(tx)))
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
mem.logger.Info("Added good transaction",
"tx", TxID(tx),
"res", r,
"height", memTx.height,
"total", mem.Size(),
)
mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
@ -434,6 +520,10 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
}
}
// callback, which is called after the app rechecked the tx.
//
// The case where the app checks the tx for the first time is handled by the
// resCbFirstTime callback.
func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
@ -454,12 +544,8 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr)
mem.txs.Remove(mem.recheckCursor)
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
mem.recheckCursor.DetachPrev()
// remove from cache (it might be good later)
mem.cache.Remove(tx)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, true)
}
if mem.recheckCursor == mem.recheckEnd {
mem.recheckCursor = nil
@ -627,12 +713,9 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
memTx := e.Value.(*mempoolTx)
// Remove the tx if it's already in a block.
if _, ok := txsMap[string(memTx.tx)]; ok {
// remove from clist
mem.txs.Remove(e)
atomic.AddInt64(&mem.txsBytes, int64(-len(memTx.tx)))
e.DetachPrev()
// NOTE: we don't remove committed txs from the cache.
mem.removeTx(memTx.tx, e, false)
continue
}
txsLeft = append(txsLeft, memTx.tx)
@ -650,7 +733,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
mem.recheckEnd = mem.txs.Back()
// Push txs to proxyAppConn
// NOTE: resCb() may be called concurrently.
// NOTE: reqResCb may be called concurrently.
for _, tx := range txs {
mem.proxyAppConn.CheckTxAsync(tx)
}
@ -663,6 +746,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups)
tx types.Tx //
}
@ -679,13 +763,13 @@ type txCache interface {
Remove(tx types.Tx)
}
// mapTxCache maintains a cache of transactions. This only stores
// the hash of the tx, due to memory concerns.
// mapTxCache maintains a LRU cache of transactions. This only stores the hash
// of the tx, due to memory concerns.
type mapTxCache struct {
mtx sync.Mutex
size int
map_ map[[sha256.Size]byte]*list.Element
list *list.List // to remove oldest tx when cache gets too big
list *list.List
}
var _ txCache = (*mapTxCache)(nil)
@ -707,8 +791,8 @@ func (cache *mapTxCache) Reset() {
cache.mtx.Unlock()
}
// Push adds the given tx to the cache and returns true. It returns false if tx
// is already in the cache.
// Push adds the given tx to the cache and returns true. It returns
// false if tx is already in the cache.
func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.mtx.Lock()
defer cache.mtx.Unlock()
@ -728,8 +812,8 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.list.Remove(popped)
}
}
cache.list.PushBack(txHash)
cache.map_[txHash] = cache.list.Back()
e := cache.list.PushBack(txHash)
cache.map_[txHash] = e
return true
}


+ 11
- 31
mempool/mempool_test.go View File

@ -12,9 +12,10 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
@ -63,8 +64,9 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
}
}
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs {
txs := make(types.Txs, count)
txInfo := TxInfo{PeerID: peerID}
for i := 0; i < count; i++ {
txBytes := make([]byte, 20)
txs[i] = txBytes
@ -72,7 +74,7 @@ func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
if err != nil {
t.Error(err)
}
if err := mempool.CheckTx(txBytes, nil); err != nil {
if err := mempool.CheckTxWithInfo(txBytes, nil, txInfo); err != nil {
// Skip invalid txs.
// TestMempoolFilters will fail otherwise. It asserts a number of txs
// returned.
@ -92,7 +94,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
defer cleanup()
// Ensure gas calculation behaves as expected
checkTxs(t, mempool, 1)
checkTxs(t, mempool, 1, UnknownPeerID)
tx0 := mempool.TxsFront().Value.(*mempoolTx)
// assert that kv store has gas wanted = 1.
require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1")
@ -126,7 +128,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) {
{20, 20000, 30, 20},
}
for tcIndex, tt := range tests {
checkTxs(t, mempool, tt.numTxsToCreate)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
len(got), tt.expectedNumTxs, tcIndex)
@ -167,7 +169,7 @@ func TestMempoolFilters(t *testing.T) {
}
for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
checkTxs(t, mempool, tt.numTxsToCreate)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush()
}
@ -198,7 +200,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch of txs, it should only fire once
txs := checkTxs(t, mempool, 100)
txs := checkTxs(t, mempool, 100, UnknownPeerID)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
@ -213,7 +215,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch more txs. we already fired for this height so it shouldnt fire again
moreTxs := checkTxs(t, mempool, 50)
moreTxs := checkTxs(t, mempool, 50, UnknownPeerID)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// now call update with all the txs. it should not fire as there are no txs left
@ -224,7 +226,7 @@ func TestTxsAvailable(t *testing.T) {
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
// send a bunch more txs, it should only fire once
checkTxs(t, mempool, 100)
checkTxs(t, mempool, 100, UnknownPeerID)
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}
@ -340,28 +342,6 @@ func TestSerialReap(t *testing.T) {
reapCheck(600)
}
func TestCacheRemove(t *testing.T) {
cache := newMapTxCache(100)
numTxs := 10
txs := make([][]byte, numTxs)
for i := 0; i < numTxs; i++ {
// probability of collision is 2**-256
txBytes := make([]byte, 32)
rand.Read(txBytes)
txs[i] = txBytes
cache.Push(txBytes)
// make sure its added to both the linked list and the map
require.Equal(t, i+1, len(cache.map_))
require.Equal(t, i+1, cache.list.Len())
}
for i := 0; i < numTxs; i++ {
cache.Remove(txs[i])
// make sure its removed from both the map and the linked list
require.Equal(t, numTxs-(i+1), len(cache.map_))
require.Equal(t, numTxs-(i+1), cache.list.Len())
}
}
func TestMempoolCloseWAL(t *testing.T) {
// 1. Create the temporary directory for mempool and WAL testing.
rootDir, err := ioutil.TempDir("", "mempool-test")


+ 95
- 9
mempool/reactor.go View File

@ -2,14 +2,16 @@ package mempool
import (
"fmt"
"math"
"reflect"
"sync"
"time"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -21,13 +23,85 @@ const (
maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
// UnknownPeerID is the peer ID to use when running CheckTx when there is
// no peer (e.g. RPC)
UnknownPeerID uint16 = 0
maxActiveIDs = math.MaxUint16
)
// MempoolReactor handles mempool tx broadcasting amongst peers.
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
// peers you received it from.
type MempoolReactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
Mempool *Mempool
ids *mempoolIDs
}
type mempoolIDs struct {
mtx sync.RWMutex
peerMap map[p2p.ID]uint16
nextID uint16 // assumes that a node will never have over 65536 active peers
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}
// Reserve searches for the next unused ID and assignes it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
defer ids.mtx.Unlock()
curID := ids.nextPeerID()
ids.peerMap[peer.ID()] = curID
ids.activeIDs[curID] = struct{}{}
}
// nextPeerID returns the next unused peer ID to use.
// This assumes that ids's mutex is already locked.
func (ids *mempoolIDs) nextPeerID() uint16 {
if len(ids.activeIDs) == maxActiveIDs {
panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs))
}
_, idExists := ids.activeIDs[ids.nextID]
for idExists {
ids.nextID++
_, idExists = ids.activeIDs[ids.nextID]
}
curID := ids.nextID
ids.nextID++
return curID
}
// Reclaim returns the ID reserved for the peer back to unused pool.
func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
ids.mtx.Lock()
defer ids.mtx.Unlock()
removedID, ok := ids.peerMap[peer.ID()]
if ok {
delete(ids.activeIDs, removedID)
delete(ids.peerMap, peer.ID())
}
}
// GetForPeer returns an ID reserved for the peer.
func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
ids.mtx.RLock()
defer ids.mtx.RUnlock()
return ids.peerMap[peer.ID()]
}
func newMempoolIDs() *mempoolIDs {
return &mempoolIDs{
peerMap: make(map[p2p.ID]uint16),
activeIDs: map[uint16]struct{}{0: {}},
nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
}
}
// NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
@ -35,6 +109,7 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
memR := &MempoolReactor{
config: config,
Mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
return memR
@ -68,11 +143,13 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
memR.ids.ReserveForPeer(peer)
go memR.broadcastTxRoutine(peer)
}
// RemovePeer implements Reactor.
func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
memR.ids.Reclaim(peer)
// broadcast routine checks if peer is gone and returns
}
@ -89,7 +166,8 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *TxMessage:
err := memR.Mempool.CheckTx(msg.Tx, nil)
peerID := memR.ids.GetForPeer(src)
err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID})
if err != nil {
memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
}
@ -110,8 +188,13 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
return
}
peerID := memR.ids.GetForPeer(peer)
var next *clist.CElement
for {
// In case of both next.NextWaitChan() and peer.Quit() are variable at the same time
if !memR.IsRunning() || !peer.IsRunning() {
return
}
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
@ -146,12 +229,15 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
continue
}
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
// ensure peer hasn't already sent us this tx
if _, ok := memTx.senders.Load(peerID); !ok {
// send memTx
msg := &TxMessage{Tx: memTx.tx}
success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
}
select {


+ 60
- 5
mempool/reactor_test.go View File

@ -2,21 +2,21 @@ package mempool
import (
"fmt"
"net"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log/term"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/go-kit/kit/log/term"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/libs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@ -102,6 +102,12 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int
wg.Done()
}
// ensure no txs on reactor after some timeout
func ensureNoTxs(t *testing.T, reactor *MempoolReactor, timeout time.Duration) {
time.Sleep(timeout) // wait for the txs in all mempools
assert.Zero(t, reactor.Mempool.Size())
}
const (
NUM_TXS = 1000
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
@ -124,10 +130,26 @@ func TestReactorBroadcastTxMessage(t *testing.T) {
// send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS)
txs := checkTxs(t, reactors[0].Mempool, NUM_TXS, UnknownPeerID)
waitForTxs(t, txs, reactors)
}
func TestReactorNoBroadcastToSender(t *testing.T) {
config := cfg.TestConfig()
const N = 2
reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
// send a bunch of txs to the first reactor's mempool, claiming it came from peer
// ensure peer gets no txs
checkTxs(t, reactors[0].Mempool, NUM_TXS, 1)
ensureNoTxs(t, reactors[1], 100*time.Millisecond)
}
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
@ -169,3 +191,36 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
// i.e. broadcastTxRoutine finishes when reactor is stopped
leaktest.CheckTimeout(t, 10*time.Second)()
}
func TestMempoolIDsBasic(t *testing.T) {
ids := newMempoolIDs()
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
ids.ReserveForPeer(peer)
assert.EqualValues(t, 1, ids.GetForPeer(peer))
ids.Reclaim(peer)
ids.ReserveForPeer(peer)
assert.EqualValues(t, 2, ids.GetForPeer(peer))
ids.Reclaim(peer)
}
func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
if testing.Short() {
return
}
// 0 is already reserved for UnknownPeerID
ids := newMempoolIDs()
for i := 0; i < maxActiveIDs-1; i++ {
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
ids.ReserveForPeer(peer)
}
assert.Panics(t, func() {
peer := mock.NewPeer(net.IP{127, 0, 0, 1})
ids.ReserveForPeer(peer)
})
}

+ 17
- 6
node/node.go View File

@ -715,13 +715,24 @@ func (n *Node) startRPC() ([]net.Listener, error) {
})
rootHandler = corsMiddleware.Handler(mux)
}
if n.config.RPC.IsTLSEnabled() {
go rpcserver.StartHTTPAndTLSServer(
listener,
rootHandler,
n.config.RPC.CertFile(),
n.config.RPC.KeyFile(),
rpcLogger,
config,
)
} else {
go rpcserver.StartHTTPServer(
listener,
rootHandler,
rpcLogger,
config,
)
}
go rpcserver.StartHTTPServer(
listener,
rootHandler,
rpcLogger,
config,
)
listeners[i] = listener
}


+ 6
- 6
p2p/conn/connection.go View File

@ -95,13 +95,13 @@ type MConnection struct {
stopMtx sync.Mutex
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically
pingTimer *time.Ticker // send pings periodically
// close conn if pong is not received in pongTimeout
pongTimer *time.Timer
pongTimeoutCh chan bool // true - timeout, false - peer sent pong
chStatsTimer *cmn.RepeatTimer // update channel stats periodically
chStatsTimer *time.Ticker // update channel stats periodically
created time.Time // time of creation
@ -201,9 +201,9 @@ func (c *MConnection) OnStart() error {
return err
}
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
c.pingTimer = time.NewTicker(c.config.PingInterval)
c.pongTimeoutCh = make(chan bool, 1)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
c.chStatsTimer = time.NewTicker(updateStats)
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
go c.sendRoutine()
@ -401,11 +401,11 @@ FOR_LOOP:
// NOTE: flushTimer.Set() must be called every time
// something is written to .bufConnWriter.
c.flush()
case <-c.chStatsTimer.Chan():
case <-c.chStatsTimer.C:
for _, channel := range c.channels {
channel.updateStats()
}
case <-c.pingTimer.Chan():
case <-c.pingTimer.C:
c.Logger.Debug("Send Ping")
_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
if err != nil {


+ 0
- 100
p2p/dummy/peer.go View File

@ -1,100 +0,0 @@
package dummy
import (
"net"
cmn "github.com/tendermint/tendermint/libs/common"
p2p "github.com/tendermint/tendermint/p2p"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
type peer struct {
cmn.BaseService
kv map[string]interface{}
}
var _ p2p.Peer = (*peer)(nil)
// NewPeer creates new dummy peer.
func NewPeer() *peer {
p := &peer{
kv: make(map[string]interface{}),
}
p.BaseService = *cmn.NewBaseService(nil, "peer", p)
return p
}
// FlushStop just calls Stop.
func (p *peer) FlushStop() {
p.Stop()
}
// ID always returns dummy.
func (p *peer) ID() p2p.ID {
return p2p.ID("dummy")
}
// IsOutbound always returns false.
func (p *peer) IsOutbound() bool {
return false
}
// IsPersistent always returns false.
func (p *peer) IsPersistent() bool {
return false
}
// NodeInfo always returns empty node info.
func (p *peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{}
}
// RemoteIP always returns localhost.
func (p *peer) RemoteIP() net.IP {
return net.ParseIP("127.0.0.1")
}
// Addr always returns tcp://localhost:8800.
func (p *peer) RemoteAddr() net.Addr {
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800}
}
// CloseConn always returns nil.
func (p *peer) CloseConn() error {
return nil
}
// Status always returns empry connection status.
func (p *peer) Status() tmconn.ConnectionStatus {
return tmconn.ConnectionStatus{}
}
// Send does not do anything and just returns true.
func (p *peer) Send(byte, []byte) bool {
return true
}
// TrySend does not do anything and just returns true.
func (p *peer) TrySend(byte, []byte) bool {
return true
}
// Set records value under key specified in the map.
func (p *peer) Set(key string, value interface{}) {
p.kv[key] = value
}
// Get returns a value associated with the key. Nil is returned if no value
// found.
func (p *peer) Get(key string) interface{} {
if value, ok := p.kv[key]; ok {
return value
}
return nil
}
// OriginalAddr always returns nil.
func (p *peer) OriginalAddr() *p2p.NetAddress {
return nil
}

+ 68
- 0
p2p/mock/peer.go View File

@ -0,0 +1,68 @@
package mock
import (
"net"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
)
type Peer struct {
*cmn.BaseService
ip net.IP
id p2p.ID
addr *p2p.NetAddress
kv map[string]interface{}
Outbound, Persistent bool
}
// NewPeer creates and starts a new mock peer. If the ip
// is nil, random routable address is used.
func NewPeer(ip net.IP) *Peer {
var netAddr *p2p.NetAddress
if ip == nil {
_, netAddr = p2p.CreateRoutableAddr()
} else {
netAddr = p2p.NewNetAddressIPPort(ip, 26656)
}
nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()}
netAddr.ID = nodeKey.ID()
mp := &Peer{
ip: ip,
id: nodeKey.ID(),
addr: netAddr,
kv: make(map[string]interface{}),
}
mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
mp.Start()
return mp
}
func (mp *Peer) FlushStop() { mp.Stop() }
func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
ID_: mp.addr.ID,
ListenAddr: mp.addr.DialString(),
}
}
func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp *Peer) ID() p2p.ID { return mp.id }
func (mp *Peer) IsOutbound() bool { return mp.Outbound }
func (mp *Peer) IsPersistent() bool { return mp.Persistent }
func (mp *Peer) Get(key string) interface{} {
if value, ok := mp.kv[key]; ok {
return value
}
return nil
}
func (mp *Peer) Set(key string, value interface{}) {
mp.kv[key] = value
}
func (mp *Peer) RemoteIP() net.IP { return mp.ip }
func (mp *Peer) OriginalAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error { return nil }

+ 43
- 82
p2p/pex/addrbook.go View File

@ -9,6 +9,7 @@ import (
"encoding/binary"
"fmt"
"math"
"math/rand"
"net"
"sync"
"time"
@ -405,89 +406,11 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
bookSize*getSelectionPercent/100)
numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
selection := make([]*p2p.NetAddress, numAddresses)
oldBucketToAddrsMap := make(map[int]map[string]struct{})
var oldIndex int
newBucketToAddrsMap := make(map[int]map[string]struct{})
var newIndex int
// initialize counters used to count old and new added addresses.
// len(oldBucketToAddrsMap) cannot be used as multiple addresses can endup in the same bucket.
var oldAddressesAdded int
var newAddressesAdded int
// number of new addresses that, if possible, should be in the beginning of the selection
numRequiredNewAdd := percentageOfNum(biasTowardsNewAddrs, numAddresses)
selectionIndex := 0
ADDRS_LOOP:
for selectionIndex < numAddresses {
// biasedTowardsOldAddrs indicates if the selection can switch to old addresses
biasedTowardsOldAddrs := selectionIndex >= numRequiredNewAdd
// An old addresses is selected if:
// - the bias is for old and old addressees are still available or,
// - there are no new addresses or all new addresses have been selected.
// numAddresses <= a.nOld + a.nNew therefore it is guaranteed that there are enough
// addresses to fill the selection
pickFromOldBucket :=
(biasedTowardsOldAddrs && oldAddressesAdded < a.nOld) ||
a.nNew == 0 || newAddressesAdded >= a.nNew
bucket := make(map[string]*knownAddress)
// loop until we pick a random non-empty bucket
for len(bucket) == 0 {
if pickFromOldBucket {
oldIndex = a.rand.Intn(len(a.bucketsOld))
bucket = a.bucketsOld[oldIndex]
} else {
newIndex = a.rand.Intn(len(a.bucketsNew))
bucket = a.bucketsNew[newIndex]
}
}
// pick a random index
randIndex := a.rand.Intn(len(bucket))
// loop over the map to return that index
var selectedAddr *p2p.NetAddress
for _, ka := range bucket {
if randIndex == 0 {
selectedAddr = ka.Addr
break
}
randIndex--
}
// if we have selected the address before, restart the loop
// otherwise, record it and continue
if pickFromOldBucket {
if addrsMap, ok := oldBucketToAddrsMap[oldIndex]; ok {
if _, ok = addrsMap[selectedAddr.String()]; ok {
continue ADDRS_LOOP
}
} else {
oldBucketToAddrsMap[oldIndex] = make(map[string]struct{})
}
oldBucketToAddrsMap[oldIndex][selectedAddr.String()] = struct{}{}
oldAddressesAdded++
} else {
if addrsMap, ok := newBucketToAddrsMap[newIndex]; ok {
if _, ok = addrsMap[selectedAddr.String()]; ok {
continue ADDRS_LOOP
}
} else {
newBucketToAddrsMap[newIndex] = make(map[string]struct{})
}
newBucketToAddrsMap[newIndex][selectedAddr.String()] = struct{}{}
newAddressesAdded++
}
selection[selectionIndex] = selectedAddr
selectionIndex++
}
// if there are no enough old addrs, will choose new addr instead.
numRequiredNewAdd := cmn.MaxInt(percentageOfNum(biasTowardsNewAddrs, numAddresses), numAddresses-a.nOld)
selection := a.randomPickAddresses(bucketTypeNew, numRequiredNewAdd)
selection = append(selection, a.randomPickAddresses(bucketTypeOld, numAddresses-len(selection))...)
return selection
}
@ -726,6 +649,44 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
return nil
}
func (a *addrBook) randomPickAddresses(bucketType byte, num int) []*p2p.NetAddress {
var buckets []map[string]*knownAddress
switch bucketType {
case bucketTypeNew:
buckets = a.bucketsNew
case bucketTypeOld:
buckets = a.bucketsOld
default:
panic("unexpected bucketType")
}
total := 0
for _, bucket := range buckets {
total = total + len(bucket)
}
addresses := make([]*knownAddress, 0, total)
for _, bucket := range buckets {
for _, ka := range bucket {
addresses = append(addresses, ka)
}
}
selection := make([]*p2p.NetAddress, 0, num)
chosenSet := make(map[string]bool, num)
rand.Shuffle(total, func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
for _, addr := range addresses {
if chosenSet[addr.Addr.String()] {
continue
}
chosenSet[addr.Addr.String()] = true
selection = append(selection, addr.Addr)
if len(selection) >= num {
return selection
}
}
return selection
}
// Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we remove the oldest.
func (a *addrBook) expireNew(bucketIdx int) {


+ 15
- 20
p2p/pex/addrbook_test.go View File

@ -435,12 +435,12 @@ func TestPrivatePeers(t *testing.T) {
func testAddrBookAddressSelection(t *testing.T, bookSize int) {
// generate all combinations of old (m) and new addresses
for nOld := 0; nOld <= bookSize; nOld++ {
nNew := bookSize - nOld
dbgStr := fmt.Sprintf("book of size %d (new %d, old %d)", bookSize, nNew, nOld)
for nBookOld := 0; nBookOld <= bookSize; nBookOld++ {
nBookNew := bookSize - nBookOld
dbgStr := fmt.Sprintf("book of size %d (new %d, old %d)", bookSize, nBookNew, nBookOld)
// create book and get selection
book, fname := createAddrBookWithMOldAndNNewAddrs(t, nOld, nNew)
book, fname := createAddrBookWithMOldAndNNewAddrs(t, nBookOld, nBookNew)
defer deleteTempFile(fname)
addrs := book.GetSelectionWithBias(biasToSelectNewPeers)
assert.NotNil(t, addrs, "%s - expected a non-nil selection", dbgStr)
@ -460,27 +460,25 @@ func testAddrBookAddressSelection(t *testing.T, bookSize int) {
// Given:
// n - num new addrs, m - num old addrs
// k - num new addrs expected in the beginning (based on bias %)
// i=min(n, k), aka expFirstNew
// i=min(n, max(k,r-m)), aka expNew
// j=min(m, r-i), aka expOld
//
// We expect this layout:
// indices: 0...i-1 i...i+j-1 i+j...r
// addresses: N0..Ni-1 O0..Oj-1 Ni...
// indices: 0...i-1 i...i+j-1
// addresses: N0..Ni-1 O0..Oj-1
//
// There is at least one partition and at most three.
var (
k = percentageOfNum(biasToSelectNewPeers, nAddrs)
expFirstNew = cmn.MinInt(nNew, k)
expOld = cmn.MinInt(nOld, nAddrs-expFirstNew)
expNew = nAddrs - expOld
expLastNew = expNew - expFirstNew
k = percentageOfNum(biasToSelectNewPeers, nAddrs)
expNew = cmn.MinInt(nNew, cmn.MaxInt(k, nAddrs-nBookOld))
expOld = cmn.MinInt(nOld, nAddrs-expNew)
)
// Verify that the number of old and new addresses are as expected
if nNew < expNew || nNew > expNew {
if nNew != expNew {
t.Fatalf("%s - expected new addrs %d, got %d", dbgStr, expNew, nNew)
}
if nOld < expOld || nOld > expOld {
if nOld != expOld {
t.Fatalf("%s - expected old addrs %d, got %d", dbgStr, expOld, nOld)
}
@ -499,15 +497,12 @@ func testAddrBookAddressSelection(t *testing.T, bookSize int) {
case expOld == 0: // all new addresses
expSeqLens = []int{nAddrs}
expSeqTypes = []int{1}
case expFirstNew == 0: // all old addresses
case expNew == 0: // all old addresses
expSeqLens = []int{nAddrs}
expSeqTypes = []int{2}
case nAddrs-expFirstNew-expOld == 0: // new addresses, old addresses
expSeqLens = []int{expFirstNew, expOld}
case nAddrs-expNew-expOld == 0: // new addresses, old addresses
expSeqLens = []int{expNew, expOld}
expSeqTypes = []int{1, 2}
default: // new addresses, old addresses, new addresses
expSeqLens = []int{expFirstNew, expOld, expLastNew}
expSeqTypes = []int{1, 2, 1}
}
assert.Equal(t, expSeqLens, seqLens,


+ 5
- 48
p2p/pex/pex_reactor_test.go View File

@ -3,7 +3,6 @@ package pex
import (
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"testing"
@ -12,14 +11,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/p2p/mock"
)
var (
@ -148,7 +143,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
sw := createSwitchAndAddReactors(r)
sw.SetAddrBook(book)
peer := newMockPeer()
peer := mock.NewPeer(nil)
p2p.AddPeerToSwitch(sw, peer)
assert.True(t, sw.Peers().Has(peer.ID()))
@ -178,7 +173,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
sw := createSwitchAndAddReactors(r)
sw.SetAddrBook(book)
peer := newMockPeer()
peer := mock.NewPeer(nil)
p2p.AddPeerToSwitch(sw, peer)
assert.True(t, sw.Peers().Has(peer.ID()))
@ -418,7 +413,7 @@ func TestPEXReactorDialPeer(t *testing.T) {
sw := createSwitchAndAddReactors(pexR)
sw.SetAddrBook(book)
peer := newMockPeer()
peer := mock.NewPeer(nil)
addr := peer.NodeInfo().NetAddress()
assert.Equal(t, 0, pexR.AttemptsToDial(addr))
@ -444,44 +439,6 @@ func TestPEXReactorDialPeer(t *testing.T) {
}
}
type mockPeer struct {
*cmn.BaseService
pubKey crypto.PubKey
addr *p2p.NetAddress
outbound, persistent bool
}
func newMockPeer() mockPeer {
_, netAddr := p2p.CreateRoutableAddr()
mp := mockPeer{
addr: netAddr,
pubKey: ed25519.GenPrivKey().PubKey(),
}
mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
mp.Start()
return mp
}
func (mp mockPeer) FlushStop() { mp.Stop() }
func (mp mockPeer) ID() p2p.ID { return mp.addr.ID }
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }
func (mp mockPeer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
ID_: mp.addr.ID,
ListenAddr: mp.addr.DialString(),
}
}
func (mockPeer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") }
func (mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mockPeer) Send(byte, []byte) bool { return false }
func (mockPeer) TrySend(byte, []byte) bool { return false }
func (mockPeer) Set(string, interface{}) {}
func (mockPeer) Get(string) interface{} { return nil }
func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil }
func (mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} }
func (mockPeer) CloseConn() error { return nil }
func assertPeersWithTimeout(
t *testing.T,
switches []*p2p.Switch,


+ 10
- 5
p2p/switch.go View File

@ -234,21 +234,26 @@ func (sw *Switch) OnStop() {
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
successChan := make(chan bool, len(sw.peers.List()))
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
peers := sw.peers.List()
var wg sync.WaitGroup
for _, peer := range sw.peers.List() {
wg.Add(1)
go func(peer Peer) {
wg.Add(len(peers))
successChan := make(chan bool, len(peers))
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := peer.Send(chID, msgBytes)
success := p.Send(chID, msgBytes)
successChan <- success
}(peer)
}
go func() {
wg.Wait()
close(successChan)
}()
return successChan
}


+ 1
- 5
rpc/client/httpclient.go View File

@ -52,11 +52,7 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
}
}
var (
_ Client = (*HTTP)(nil)
_ NetworkClient = (*HTTP)(nil)
_ EventsClient = (*HTTP)(nil)
)
var _ Client = (*HTTP)(nil)
func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
result := new(ctypes.ResultStatus)


+ 3
- 5
rpc/client/interface.go View File

@ -72,17 +72,15 @@ type StatusClient interface {
type Client interface {
cmn.Service
ABCIClient
SignClient
EventsClient
HistoryClient
NetworkClient
SignClient
StatusClient
EventsClient
}
// NetworkClient is general info about the network state. May not
// be needed usually.
//
// Not included in the Client interface, but generally implemented
// by concrete implementations.
type NetworkClient interface {
NetInfo() (*ctypes.ResultNetInfo, error)
DumpConsensusState() (*ctypes.ResultDumpConsensusState, error)


+ 1
- 5
rpc/client/localclient.go View File

@ -58,11 +58,7 @@ func NewLocal(node *nm.Node) *Local {
}
}
var (
_ Client = (*Local)(nil)
_ NetworkClient = (*Local)(nil)
_ EventsClient = (*Local)(nil)
)
var _ Client = (*Local)(nil)
// SetLogger allows to set a logger on the client.
func (c *Local) SetLogger(l log.Logger) {


+ 12
- 0
rpc/client/mock/client.go View File

@ -108,6 +108,18 @@ func (c Client) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo(&rpctypes.Context{})
}
func (c Client) ConsensusState() (*ctypes.ResultConsensusState, error) {
return core.ConsensusState(&rpctypes.Context{})
}
func (c Client) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState(&rpctypes.Context{})
}
func (c Client) Health() (*ctypes.ResultHealth, error) {
return core.Health(&rpctypes.Context{})
}
func (c Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds)
}


+ 2
- 0
rpc/core/consensus.go View File

@ -10,6 +10,8 @@ import (
// Get the validator set at the given block height.
// If no height is provided, it will fetch the current validator set.
// Note the validators are sorted by their address - this is the canonical
// order for the validators in the set as used in computing their Merkle root.
//
// ```shell
// curl 'localhost:26657/validators'


+ 3
- 1
rpc/lib/client/http_client.go View File

@ -74,7 +74,9 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) {
protocol, address, dialer := makeHTTPDialer(remoteAddr)
return protocol + "://" + address, &http.Client{
Transport: &http.Transport{
Dial: dialer,
// Set to true to prevent GZIP-bomb DoS attacks
DisableCompression: true,
Dial: dialer,
},
}
}


+ 12
- 5
state/services.go View File

@ -23,6 +23,7 @@ type Mempool interface {
Size() int
CheckTx(types.Tx, func(*abci.Response)) error
CheckTxWithInfo(types.Tx, func(*abci.Response), mempool.TxInfo) error
ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error
Flush()
@ -37,11 +38,17 @@ type MockMempool struct{}
var _ Mempool = MockMempool{}
func (MockMempool) Lock() {}
func (MockMempool) Unlock() {}
func (MockMempool) Size() int { return 0 }
func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { return nil }
func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (MockMempool) Lock() {}
func (MockMempool) Unlock() {}
func (MockMempool) Size() int { return 0 }
func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
return nil
}
func (MockMempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
_ mempool.TxInfo) error {
return nil
}
func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (MockMempool) Update(
_ int64,
_ types.Txs,


+ 4
- 4
tools/tm-signer-harness/internal/test_harness.go View File

@ -198,8 +198,8 @@ func (th *TestHarness) TestSignProposal() error {
hash := tmhash.Sum([]byte("hash"))
prop := &types.Proposal{
Type: types.ProposalType,
Height: 12345,
Round: 23456,
Height: 100,
Round: 0,
POLRound: -1,
BlockID: types.BlockID{
Hash: hash,
@ -240,8 +240,8 @@ func (th *TestHarness) TestSignVote() error {
hash := tmhash.Sum([]byte("hash"))
vote := &types.Vote{
Type: voteType,
Height: 12345,
Round: 23456,
Height: 101,
Round: 0,
BlockID: types.BlockID{
Hash: hash,
PartsHeader: types.PartSetHeader{


+ 0
- 33
types/protobuf.go View File

@ -220,36 +220,3 @@ func (pb2tm) ValidatorUpdates(vals []abci.ValidatorUpdate) ([]*Validator, error)
}
return tmVals, nil
}
// BlockParams.TimeIotaMs is not exposed to the application. Therefore a caller
// must provide it.
func (pb2tm) ConsensusParams(csp *abci.ConsensusParams, blockTimeIotaMs int64) ConsensusParams {
params := ConsensusParams{
Block: BlockParams{},
Evidence: EvidenceParams{},
Validator: ValidatorParams{},
}
// we must defensively consider any structs may be nil
if csp.Block != nil {
params.Block = BlockParams{
MaxBytes: csp.Block.MaxBytes,
MaxGas: csp.Block.MaxGas,
TimeIotaMs: blockTimeIotaMs,
}
}
if csp.Evidence != nil {
params.Evidence = EvidenceParams{
MaxAge: csp.Evidence.MaxAge,
}
}
if csp.Validator != nil {
params.Validator = ValidatorParams{
PubKeyTypes: csp.Validator.PubKeyTypes,
}
}
return params
}

+ 1
- 1
types/protobuf_test.go View File

@ -64,7 +64,7 @@ func TestABCIValidators(t *testing.T) {
func TestABCIConsensusParams(t *testing.T) {
cp := DefaultConsensusParams()
abciCP := TM2PB.ConsensusParams(cp)
cp2 := PB2TM.ConsensusParams(abciCP, cp.Block.TimeIotaMs)
cp2 := cp.Update(abciCP)
assert.Equal(t, *cp, cp2)
}


+ 2
- 1
types/validator_set.go View File

@ -31,7 +31,8 @@ const (
// ValidatorSet represent a set of *Validator at a given height.
// The validators can be fetched by address or index.
// The index is in order of .Address, so the indices are fixed
// for all rounds of a given blockchain height.
// for all rounds of a given blockchain height - ie. the validators
// are sorted by their address.
// On the other hand, the .ProposerPriority of each validator and
// the designated .GetProposer() of a set changes every round,
// upon calling .IncrementProposerPriority().


+ 1
- 1
version/version.go View File

@ -20,7 +20,7 @@ const (
// Must be a string because scripts like dist.sh read this file.
// XXX: Don't change the name of this variable or you will break
// automation :)
TMCoreSemVer = "0.31.0"
TMCoreSemVer = "0.31.1"
// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.16.0"


Loading…
Cancel
Save