From a6349f50633a45755041b8230a0f7eb990383cfd Mon Sep 17 00:00:00 2001
From: Anca Zamfir
Date: Wed, 20 Mar 2019 01:56:13 +0200
Subject: [PATCH 01/25] Formalize proposer election algorithm properties (#3140)

* Update proposer-selection.md
* Fixed typos
* fixed typos
* Attempt to address some comments
* Update proposer-selection.md
* Update proposer-selection.md
* Update proposer-selection.md
  Added the normalization step.
* Addressed review comments
* New example for normalization section
  Added a new example to better show the need for normalization.
  Added requirement for changing validator set.
  Addressed review comments.
* Fixed problem with R2
* fixed the math for new validator
* test
* more small updates
* Moved the centering above the round-robin election
  - the centering is now done before the actual round-robin block
  - updated examples
  - cleanup
* change to reflect new implementation for new validator
---
 .../reactors/consensus/proposer-selection.md  | 305 ++++++++++++++++--
 1 file changed, 275 insertions(+), 30 deletions(-)

diff --git a/docs/spec/reactors/consensus/proposer-selection.md b/docs/spec/reactors/consensus/proposer-selection.md
index b5e0b35af..6cb596ec0 100644
--- a/docs/spec/reactors/consensus/proposer-selection.md
+++ b/docs/spec/reactors/consensus/proposer-selection.md
@@ -2,45 +2,290 @@

This document specifies the Proposer Selection Procedure that is used in Tendermint to choose a round proposer. As Tendermint is a “leader-based” protocol, the proposer selection is critical for its correct functioning.

-Let denote with `proposer_p(h,r)` a process returned by the Proposer Selection Procedure at the process p, at height h
-and round r. Then the Proposer Selection procedure should fulfill the following properties:

-`Agreement`: Given a validator set V, and two honest validators,
-p and q, for each height h, and each round r,
-proposer_p(h,r) = proposer_q(h,r)

At a given block height, the proposer selection algorithm runs with the same validator set at each round.
Between heights, an updated validator set may be specified by the application as part of the ABCIResponses' EndBlock.

-`Liveness`: In every consecutive sequence of rounds of size K (K is system parameter), at least a
-single round has an honest proposer.

## Requirements for Proposer Selection

-`Fairness`: The proposer selection is proportional to the validator voting power, i.e., a validator with more
-voting power is selected more frequently, proportional to its power. More precisely, given a set of processes
-with the total voting power N, during a sequence of rounds of size N, every process is proposer in a number of rounds
-equal to its voting power.

This section covers the requirements, with Rx denoting mandatory requirements and Ox optional ones.
The following requirements must be met by the Proposer Selection procedure:

-We now look at a few particular cases to understand better how fairness should be implemented.
-If we have 4 processes with the following voting power distribution (p0,4), (p1, 2), (p2, 2), (p3, 2) at some round r,
-we have the following sequence of proposer selections in the following rounds:

#### R1: Determinism
Given a validator set `V`, and two honest validators `p` and `q`, for each height `h` and each round `r` the following must hold:

-`p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, p0, p1, p2, p3, p0, etc`

    proposer_p(h,r) = proposer_q(h,r)

-Let consider now the following scenario where a total voting power of faulty processes is aggregated in a single process
-p0: (p0,3), (p1, 1), (p2, 1), (p3, 1), (p4, 1), (p5, 1), (p6, 1), (p7, 1).
-In this case the sequence of proposer selections looks like this:

where `proposer_p(h,r)` is the proposer returned by the Proposer Selection Procedure at process `p`, at height `h` and round `r`.

-`p0, p1, p2, p3, p0, p4, p5, p6, p7, p0, p0, p1, p2, p3, p0, p4, p5, p6, p7, p0, etc`

#### R2: Fairness
Given a validator set with total voting power P and a sequence S of elections, in any sub-sequence of S of length C*P a validator v must be elected as proposer C*VP(v) times, i.e. with frequency:

-In this case, we see that a number of rounds coordinated by a faulty process is proportional to its voting power.
-We consider also the case where we have voting power uniformly distributed among processes, i.e., we have 10 processes
-each with voting power of 1. And let consider that there are 3 faulty processes with consecutive addresses,
-for example the first 3 processes are faulty. Then the sequence looks like this:

    f(v) ~ VP(v) / P

-`p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, etc`

where C is a tolerance factor for validator set changes, with the following values:
- C == 1 if there are no validator set changes
- C ~ k when there are validator changes

-In this case, we have 3 consecutive rounds with a faulty proposer.
-One special case we consider is the case where a single honest process p0 has most of the voting power, for example:
-(p0,100), (p1, 2), (p2, 3), (p3, 4). Then the sequence of proposer selection looks like this:

*[this needs more work]*

-p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p0, p1, p0, p0, p0, p0, p0, etc

### Basic Algorithm

-This basically means that almost all rounds have the same proposer. But in this case, the process p0 has anyway enough
-voting power to decide whatever he wants, so the fact that he coordinates almost all rounds seems correct.

At its core, the proposer selection procedure uses a weighted round-robin algorithm.

A model that gives a good intuition on how and why the selection algorithm works, and why it is fair, is that of a priority queue. The validators move ahead in this queue according to their voting power (the higher the voting power, the faster a validator moves towards the head of the queue).
When the algorithm runs, the following happens:
- all validators move "ahead" according to their powers: for each validator, increase the priority by the voting power
- the first in the queue becomes the proposer: select the validator with the highest priority
- move the proposer back in the queue: decrease the proposer's priority by the total voting power

Notation:
- vset - the validator set
- n - the number of validators
- VP(i) - voting power of validator i
- A(i) - accumulated priority for validator i
- P - total voting power of the set
- avg - average of all validator priorities
- prop - proposer

A simplified view of the Selection Algorithm:

```
    def ProposerSelection (vset):

        // compute priorities and elect proposer
        for each validator i in vset:
            A(i) += VP(i)
        prop = max(A)
        A(prop) -= P
```

### Stable Set

Consider the validator set:

Validator | p1| p2
----------|---|---
VP        | 1 | 3

Assuming no validator changes, the following table shows the proposer priority computation over a few runs. Four runs of the selection procedure are shown; starting with the 5th run, the same values are computed.
Each row shows the priority queue and each process's place in it. The proposer is the closest to the head, i.e. the rightmost validator. As priorities are updated, the validators move right in the queue. The proposer moves left as its priority is reduced after election.

|Priority Run | -2| -1| 0   | 1 | 2   | 3 | 4 | 5 | Alg step
|-------------|---|---|-----|---|-----|---|---|---|--------
|             |   |   |p1,p2|   |     |   |   |   |Initialized to 0
|run 1        |   |   |     | p1|     | p2|   |   |A(i)+=VP(i)
|             |   | p2|     | p1|     |   |   |   |A(p2)-= P
|run 2        |   |   |     |   |p1,p2|   |   |   |A(i)+=VP(i)
|             | p1|   |     |   | p2  |   |   |   |A(p1)-= P
|run 3        |   | p1|     |   |     |   |   | p2|A(i)+=VP(i)
|             |   | p1|     | p2|     |   |   |   |A(p2)-= P
|run 4        |   |   | p1  |   |     |   | p2|   |A(i)+=VP(i)
|             |   |   |p1,p2|   |     |   |   |   |A(p2)-= P

It can be shown that:
- At the end of run k+1 the sum of the priorities is the same as at the end of run k. If a new set's priorities are initialized to 0, then the sum of priorities will be 0 at each run while there are no changes.
- The max distance between priorities is (n-1) * P. *[formal proof not finished]*

### Validator Set Changes
Between proposer selection runs the validator set may change. Some changes have implications for the proposer election.

#### Voting Power Change
Consider again the earlier example and assume that the voting power of p1 is changed to 4:

Validator | p1| p2
----------|---|---
VP        | 4 | 3

Let's also assume that before this change the proposer priorities were as shown in the first row (last run). As can be seen, the selection could run again, without changes, as before.

|Priority Run| -2 | -1 | 0 | 1 | 2  | 5  | Comment
|------------|----|----|---|---|----|----|--------
| last run   |    | p2 |   | p1|    |    |__update VP(p1)__
| next run   |    |    |   |   | p2 | p1 |A(i)+=VP(i)
|            | p1 |    |   |   | p2 |    |A(p1)-= P

However, when a validator changes power from a high to a low value, other validators may remain far back in the queue for a long time. This scenario is considered again in the Proposer Priority Range section.

As before:
- At the end of run k+1 the sum of the priorities is the same as at run k.
- The max distance between priorities is (n-1) * P.

#### Validator Removal
Consider a new example with the set:

Validator | p1 | p2 | p3 |
----------|----|----|----|
VP        | 1  | 2  | 3  |

Let's assume that after the last run the proposer priorities were as shown in the first row, with their sum being 0.
After p2 is removed, at the end of the next proposer selection run (penultimate row) the sum of priorities is -2 (minus the priority of the removed process).

The procedure could continue without modifications. However, after a sufficiently large number of modifications to the validator set, the priority values would migrate towards the maximum or minimum allowed values, causing truncation due to overflow detection.
For this reason, the selection procedure adds another __new step__ that centers the current priority values such that the priority sum remains close to 0.

|Priority Run  |-3  | -2 | -1 | 0 | 1  | 2  | 3 |Comment
|--------------|----|----|----|---|----|----|---|--------
| last run     |p3  |    |    |   | p1 | p2 |   |__remove p2__
| next run     |    |    |    |   |    |    |   |
| __new step__ |    | p3 |    |   |    | p1 |   |A(i) -= avg, avg = -1
|              |    |    |    |   | p3 |    | p1|A(i)+=VP(i)
|              |    |    | p1 |   | p3 |    |   |A(p1)-= P

The modified selection algorithm is:

    def ProposerSelection (vset):

        // center priorities around zero
        avg = sum(A(i) for i in vset)/len(vset)
        for each validator i in vset:
            A(i) -= avg

        // compute priorities and elect proposer
        for each validator i in vset:
            A(i) += VP(i)
        prop = max(A)
        A(prop) -= P

Observations:
- The sum of priorities is now close to 0. Due to integer division, the sum is an integer in (-n, n), where n is the number of validators.

#### New Validator
When a new validator is added, the same problem as the one described for removal appears: the sum of priorities in the new set is not zero. This is fixed with the centering step introduced above.

One other issue that needs to be addressed is the following. A validator V that has just been elected is moved to the end of the queue. If the validator set is large and/or other validators have significantly higher power, V will have to wait many runs to be elected. If V removes and re-adds itself to the set, it would make a significant (albeit unfair) "jump" ahead in the queue.

In order to prevent this, when a new validator is added, its initial priority is set to:

    A(V) = -1.125 * P

where P is the total voting power of the set including V.

The current implementation uses the penalty factor of 1.125 because it provides a small punishment that is efficient to calculate. See [here](https://github.com/tendermint/tendermint/pull/2785#discussion_r235038971) for more details.

If we consider the validator set where p3 has just been added:

Validator | p1 | p2 | p3
----------|----|----|---
VP        | 1  | 3  | 8

then p3 will start with proposer priority:

    A(p3) = -1.125 * (1 + 3 + 8) ~ -13

Note that since the current computation uses integer division, part of the penalty is lost when the sum of the voting power is less than 8.

In the next run, p3 will still be behind in the queue: p1, holding the highest priority, is elected as proposer and moved back in the queue.

|Priority Run |-13 | -9 | -5 | -2 | -1 | 0 | 1 | 2 | 5 | 6 | 7 |Alg step
|-------------|----|----|----|----|----|---|---|---|---|---|---|--------
|last run     |    |    |    | p2 |    |   |   | p1|   |   |   |__add p3__
|             | p3 |    |    | p2 |    |   |   | p1|   |   |   |A(p3) = -13
|next run     |    | p3 |    |    |    |   |   | p2|   | p1|   |A(i) -= avg, avg = -4
|             |    |    |    |    | p3 |   |   |   | p2|   | p1|A(i)+=VP(i)
|             |    |    | p1 |    | p3 |   |   |   | p2|   |   |A(p1)-=P

### Proposer Priority Range
With the introduction of centering, some interesting cases occur. Low power validators that join early in a set that includes high power validator(s) benefit from subsequent additions to the set.
This is because these early validators go through more right-shift operations during centering, operations that increase their priority.

As an example, consider the set where p2 is added after p1, with priority -1.125 * 80k = -90k. After the selection procedure runs once:

Validator | p1  | p2  | Comment
----------|-----|-----|--------
VP        | 80k | 10  |
A         | 0   |-90k | __added p2__
A         | 45k |-45k | __run selection__

Then execute the following steps:

1. Add a new validator p3:

Validator | p1  | p2 | p3
----------|-----|----|----
VP        | 80k | 10 | 10

2. Run selection once. The notation '..p'/'p..' means very small deviations compared to the column priority.

|Priority Run  | -90k..| -60k | -45k | -15k| 0 | 45k | 75k  | 155k | Comment
|--------------|-------|------|------|-----|---|-----|------|------|---------
| last run     | p3    |      | p2   |     |   | p1  |      |      | __added p3__
| next run     |       |      |      |     |   |     |      |      |
| *right_shift*|       | p3   |      | p2  |   |     | p1   |      | A(i) -= avg, avg=-30k
|              |       | ..p3 |      | ..p2|   |     |      | p1   | A(i)+=VP(i)
|              |       | ..p3 |      | ..p2|   |     | p1.. |      | A(p1)-=P, P=80k+20

3. Remove p1 and run selection once:

Validator | p3    | p2   | Comment
----------|-------|------|--------
VP        | 10    | 10   |
A         |-60k   |-15k  |
A         |-22.5k |22.5k | __run selection__

At this point, while the total voting power is 20, the distance between priorities is 45k. It will take 4500 runs for p3 to catch up with p2.

In order to prevent these types of scenarios, the selection algorithm performs scaling of priorities such that the difference between min and max values is smaller than two times the total voting power.

The modified selection algorithm is:

    def ProposerSelection (vset):

        // scale the priority values
        diff = max(A)-min(A)
        threshold = 2 * P
        if diff > threshold:
            scale = diff/threshold
            for each validator i in vset:
                A(i) = A(i)/scale

        // center priorities around zero
        avg = sum(A(i) for i in vset)/len(vset)
        for each validator i in vset:
            A(i) -= avg

        // compute priorities and elect proposer
        for each validator i in vset:
            A(i) += VP(i)
        prop = max(A)
        A(prop) -= P

Observations:
- With this modification, the maximum distance between priorities becomes 2 * P.

Note also that even during steady state the priority range may increase beyond 2 * P. The scaling introduced here helps to keep the range bounded.

### Wrinkles

#### Validator Power Overflow Conditions
The validator voting power is a positive number stored as an int64. When a validator is added, the `1.125 * P` computation must not overflow. As a consequence, the code handling validator updates (add and update) checks for overflow conditions, making sure the total voting power is never larger than the largest int64 `MAX` with the property that `1.125 * MAX` is still within the bounds of int64. A fatal error is returned when an overflow condition is detected.

#### Proposer Priority Overflow/Underflow Handling
The proposer priority is stored as an int64. The selection algorithm performs additions and subtractions on these values, and in the case of overflows and underflows it limits the values to:

    MaxInt64 = 1 << 63 - 1
    MinInt64 = -1 << 63

### Requirement Fulfillment Claims
__[R1]__

The proposer algorithm is deterministic, giving consistent results across executions with the same transactions and validator set modifications. A minimal simulation sketch of the full procedure is given below.
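The following is a self-contained Go sketch of the full selection step described above (scaling, centering, election). The `Validator` struct and the tie-breaking by set order are illustrative assumptions for this sketch only; the actual implementation lives in `types/validator_set.go` and breaks priority ties by validator address.

```go
package main

import "fmt"

// Validator is an illustrative stand-in for the spec's notation:
// VP(i) is Power, A(i) is Priority.
type Validator struct {
	Name     string
	Power    int64 // VP(i)
	Priority int64 // A(i)
}

// proposerSelection runs one election as specified above: scale priorities
// so that max(A)-min(A) <= 2*P, center them around zero, add VP(i) to each
// A(i), elect the max, and subtract P from the proposer.
func proposerSelection(vset []*Validator) *Validator {
	var total int64 // P
	for _, v := range vset {
		total += v.Power
	}

	// scale the priority values
	min, max := vset[0].Priority, vset[0].Priority
	for _, v := range vset {
		if v.Priority < min {
			min = v.Priority
		}
		if v.Priority > max {
			max = v.Priority
		}
	}
	if diff, threshold := max-min, 2*total; diff > threshold {
		scale := diff / threshold
		for _, v := range vset {
			v.Priority /= scale
		}
	}

	// center priorities around zero (integer division, as in the spec)
	var sum int64
	for _, v := range vset {
		sum += v.Priority
	}
	avg := sum / int64(len(vset))
	for _, v := range vset {
		v.Priority -= avg
	}

	// compute priorities and elect the proposer;
	// ties fall to set order here (assumption; see lead-in note)
	var prop *Validator
	for _, v := range vset {
		v.Priority += v.Power
		if prop == nil || v.Priority > prop.Priority {
			prop = v
		}
	}
	prop.Priority -= total
	return prop
}

func main() {
	vset := []*Validator{{"p1", 1, 0}, {"p2", 3, 0}}
	for i := 0; i < 8; i++ {
		fmt.Print(proposerSelection(vset).Name, " ")
	}
	// Output: p2 p1 p2 p2 p2 p1 p2 p2
}
```

Running it reproduces the `p2, p1, p2, p2` sequence claimed under R2 below for the (1, 3) voting power example.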
[WIP - needs more detail]

__[R2]__

Given a set of processes with the total voting power P, during a sequence of elections of length P, the number of times any process is selected as proposer is equal to its voting power. The sequence of the P proposers then repeats. If we consider the validator set:

Validator | p1| p2
----------|---|---
VP        | 1 | 3

With no other changes to the validator set, the current implementation of proposer selection generates the sequence:
`p2, p1, p2, p2, p2, p1, p2, p2,...` or [`p2, p1, p2, p2`]*

A sequence that starts with any circular permutation of the [`p2, p1, p2, p2`] sub-sequence would also provide the same degree of fairness. In fact, these circular permutations show up in a sliding window (over the generated sequence) of size equal to the length of the sub-sequence.

Assigning priorities to each validator based on the voting power, and updating them at each run, ensures the fairness of the proposer selection. In addition, every time a validator is elected as proposer, its priority is decreased by the total voting power.

Intuitively, a process v jumps ahead in the queue at most (max(A) - min(A))/VP(v) times until it reaches the head and is elected. The frequency is then:

    f(v) ~ VP(v)/(max(A)-min(A)) = 1/k * VP(v)/P

For the current implementation, this means v should be proposer at least VP(v) times out of k * P runs, with scaling factor k=2.

From 60b2ae5f5a3e16625af1342e012462448d565394 Mon Sep 17 00:00:00 2001
From: needkane <604476380@qq.com>
Date: Wed, 20 Mar 2019 08:00:53 +0800
Subject: [PATCH 02/25] crypto: delete unused code (#3426)

---
 crypto/doc.go          | 3 ---
 crypto/example_test.go | 7 -------
 crypto/hash.go         | 8 --------
 3 files changed, 18 deletions(-)

diff --git a/crypto/doc.go b/crypto/doc.go
index 41b3f3021..95ae0af18 100644
--- a/crypto/doc.go
+++ b/crypto/doc.go
@@ -37,9 +37,6 @@
 // sum := crypto.Sha256([]byte("This is Tendermint"))
 // fmt.Printf("%x\n", sum)

-// Ripemd160
-// sum := crypto.Ripemd160([]byte("This is consensus"))
-// fmt.Printf("%x\n", sum)
 package crypto

 // TODO: Add more docs in here
diff --git a/crypto/example_test.go b/crypto/example_test.go
index 904e1c610..f1d0013d4 100644
--- a/crypto/example_test.go
+++ b/crypto/example_test.go
@@ -26,10 +26,3 @@ func ExampleSha256() {
 	// Output:
 	// f91afb642f3d1c87c17eb01aae5cb65c242dfdbe7cf1066cc260f4ce5d33b94e
 }
-
-func ExampleRipemd160() {
-	sum := crypto.Ripemd160([]byte("This is Tendermint"))
-	fmt.Printf("%x\n", sum)
-	// Output:
-	// 051e22663e8f0fd2f2302f1210f954adff009005
-}
diff --git a/crypto/hash.go b/crypto/hash.go
index c1fb41f7a..e1d22523f 100644
--- a/crypto/hash.go
+++ b/crypto/hash.go
@@ -2,8 +2,6 @@ package crypto

 import (
 	"crypto/sha256"
-
-	"golang.org/x/crypto/ripemd160"
 )

 func Sha256(bytes []byte) []byte {
@@ -11,9 +9,3 @@ func Sha256(bytes []byte) []byte {
 	hasher.Write(bytes)
 	return hasher.Sum(nil)
 }
-
-func Ripemd160(bytes []byte) []byte {
-	hasher := ripemd160.New()
-	hasher.Write(bytes)
-	return hasher.Sum(nil)
-}

From 7af4b5086af9268f7cc8b41f5a174ade675d8ab4 Mon Sep 17 00:00:00 2001
From: Anton Kaliaev
Date: Wed, 20 Mar 2019 03:10:54 +0300
Subject: [PATCH 03/25] Remove RepeatTimer and refactor Switch#Broadcast (#3429)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* p2p: refactor Switch#Broadcast func

  - call wg.Add only once
  - do not call peers.List twice!
    * bad for performance
    * peers list can change in between calls!
Refs #3306 * p2p: use time.Ticker instead of RepeatTimer no need in RepeatTimer since we don't Reset them Refs #3306 * libs/common: remove RepeatTimer (also TimerMaker and Ticker interface) "ancient code that’s caused no end of trouble" Ethan I believe there's much simplier way to write a ticker than can be reset https://medium.com/@arpith/resetting-a-ticker-in-go-63858a2c17ec --- CHANGELOG_PENDING.md | 1 + libs/common/repeat_timer.go | 232 ------------------------------- libs/common/repeat_timer_test.go | 136 ------------------ p2p/conn/connection.go | 12 +- p2p/switch.go | 15 +- 5 files changed, 17 insertions(+), 379 deletions(-) delete mode 100644 libs/common/repeat_timer.go delete mode 100644 libs/common/repeat_timer_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 37ae3a510..3cbc63b7b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -9,6 +9,7 @@ * Apps * Go API +- [libs/common] Remove RepeatTimer (also TimerMaker and Ticker interface) * Blockchain Protocol diff --git a/libs/common/repeat_timer.go b/libs/common/repeat_timer.go deleted file mode 100644 index 5d049738d..000000000 --- a/libs/common/repeat_timer.go +++ /dev/null @@ -1,232 +0,0 @@ -package common - -import ( - "sync" - "time" -) - -// Used by RepeatTimer the first time, -// and every time it's Reset() after Stop(). -type TickerMaker func(dur time.Duration) Ticker - -// Ticker is a basic ticker interface. -type Ticker interface { - - // Never changes, never closes. - Chan() <-chan time.Time - - // Stopping a stopped Ticker will panic. - Stop() -} - -//---------------------------------------- -// defaultTicker - -var _ Ticker = (*defaultTicker)(nil) - -type defaultTicker time.Ticker - -func defaultTickerMaker(dur time.Duration) Ticker { - ticker := time.NewTicker(dur) - return (*defaultTicker)(ticker) -} - -// Implements Ticker -func (t *defaultTicker) Chan() <-chan time.Time { - return t.C -} - -// Implements Ticker -func (t *defaultTicker) Stop() { - ((*time.Ticker)(t)).Stop() -} - -//---------------------------------------- -// LogicalTickerMaker - -// Construct a TickerMaker that always uses `source`. -// It's useful for simulating a deterministic clock. -func NewLogicalTickerMaker(source chan time.Time) TickerMaker { - return func(dur time.Duration) Ticker { - return newLogicalTicker(source, dur) - } -} - -type logicalTicker struct { - source <-chan time.Time - ch chan time.Time - quit chan struct{} -} - -func newLogicalTicker(source <-chan time.Time, interval time.Duration) Ticker { - lt := &logicalTicker{ - source: source, - ch: make(chan time.Time), - quit: make(chan struct{}), - } - go lt.fireRoutine(interval) - return lt -} - -// We need a goroutine to read times from t.source -// and fire on t.Chan() when `interval` has passed. -func (t *logicalTicker) fireRoutine(interval time.Duration) { - source := t.source - - // Init `lasttime` - lasttime := time.Time{} - select { - case lasttime = <-source: - case <-t.quit: - return - } - // Init `lasttime` end - - for { - select { - case newtime := <-source: - elapsed := newtime.Sub(lasttime) - if interval <= elapsed { - // Block for determinism until the ticker is stopped. - select { - case t.ch <- newtime: - case <-t.quit: - return - } - // Reset timeleft. - // Don't try to "catch up" by sending more. 
- // "Ticker adjusts the intervals or drops ticks to make up for - // slow receivers" - https://golang.org/pkg/time/#Ticker - lasttime = newtime - } - case <-t.quit: - return // done - } - } -} - -// Implements Ticker -func (t *logicalTicker) Chan() <-chan time.Time { - return t.ch // immutable -} - -// Implements Ticker -func (t *logicalTicker) Stop() { - close(t.quit) // it *should* panic when stopped twice. -} - -//--------------------------------------------------------------------- - -/* - RepeatTimer repeatedly sends a struct{}{} to `.Chan()` after each `dur` - period. (It's good for keeping connections alive.) - A RepeatTimer must be stopped, or it will keep a goroutine alive. -*/ -type RepeatTimer struct { - name string - ch chan time.Time - tm TickerMaker - - mtx sync.Mutex - dur time.Duration - ticker Ticker - quit chan struct{} -} - -// NewRepeatTimer returns a RepeatTimer with a defaultTicker. -func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { - return NewRepeatTimerWithTickerMaker(name, dur, defaultTickerMaker) -} - -// NewRepeatTimerWithTicker returns a RepeatTimer with the given ticker -// maker. -func NewRepeatTimerWithTickerMaker(name string, dur time.Duration, tm TickerMaker) *RepeatTimer { - var t = &RepeatTimer{ - name: name, - ch: make(chan time.Time), - tm: tm, - dur: dur, - ticker: nil, - quit: nil, - } - t.reset() - return t -} - -// receive ticks on ch, send out on t.ch -func (t *RepeatTimer) fireRoutine(ch <-chan time.Time, quit <-chan struct{}) { - for { - select { - case tick := <-ch: - select { - case t.ch <- tick: - case <-quit: - return - } - case <-quit: // NOTE: `t.quit` races. - return - } - } -} - -func (t *RepeatTimer) Chan() <-chan time.Time { - return t.ch -} - -func (t *RepeatTimer) Stop() { - t.mtx.Lock() - defer t.mtx.Unlock() - - t.stop() -} - -// Wait the duration again before firing. -func (t *RepeatTimer) Reset() { - t.mtx.Lock() - defer t.mtx.Unlock() - - t.reset() -} - -//---------------------------------------- -// Misc. - -// CONTRACT: (non-constructor) caller should hold t.mtx. -func (t *RepeatTimer) reset() { - if t.ticker != nil { - t.stop() - } - t.ticker = t.tm(t.dur) - t.quit = make(chan struct{}) - go t.fireRoutine(t.ticker.Chan(), t.quit) -} - -// CONTRACT: caller should hold t.mtx. -func (t *RepeatTimer) stop() { - if t.ticker == nil { - /* - Similar to the case of closing channels twice: - https://groups.google.com/forum/#!topic/golang-nuts/rhxMiNmRAPk - Stopping a RepeatTimer twice implies that you do - not know whether you are done or not. - If you're calling stop on a stopped RepeatTimer, - you probably have race conditions. - */ - panic("Tried to stop a stopped RepeatTimer") - } - t.ticker.Stop() - t.ticker = nil - /* - From https://golang.org/pkg/time/#Ticker: - "Stop the ticker to release associated resources" - "After Stop, no more ticks will be sent" - So we shouldn't have to do the below. 
- - select { - case <-t.ch: - // read off channel if there's anything there - default: - } - */ - close(t.quit) -} diff --git a/libs/common/repeat_timer_test.go b/libs/common/repeat_timer_test.go deleted file mode 100644 index f2a7b16c3..000000000 --- a/libs/common/repeat_timer_test.go +++ /dev/null @@ -1,136 +0,0 @@ -package common - -import ( - "sync" - "testing" - "time" - - "github.com/fortytw2/leaktest" - "github.com/stretchr/testify/assert" -) - -func TestDefaultTicker(t *testing.T) { - ticker := defaultTickerMaker(time.Millisecond * 10) - <-ticker.Chan() - ticker.Stop() -} - -func TestRepeatTimer(t *testing.T) { - - ch := make(chan time.Time, 100) - mtx := new(sync.Mutex) - - // tick() fires from start to end - // (exclusive) in milliseconds with incr. - // It locks on mtx, so subsequent calls - // run in series. - tick := func(startMs, endMs, incrMs time.Duration) { - mtx.Lock() - go func() { - for tMs := startMs; tMs < endMs; tMs += incrMs { - lt := time.Time{} - lt = lt.Add(tMs * time.Millisecond) - ch <- lt - } - mtx.Unlock() - }() - } - - // tock consumes Ticker.Chan() events and checks them against the ms in "timesMs". - tock := func(t *testing.T, rt *RepeatTimer, timesMs []int64) { - - // Check against timesMs. - for _, timeMs := range timesMs { - tyme := <-rt.Chan() - sinceMs := tyme.Sub(time.Time{}) / time.Millisecond - assert.Equal(t, timeMs, int64(sinceMs)) - } - - // TODO detect number of running - // goroutines to ensure that - // no other times will fire. - // See https://github.com/tendermint/tendermint/libs/issues/120. - time.Sleep(time.Millisecond * 100) - done := true - select { - case <-rt.Chan(): - done = false - default: - } - assert.True(t, done) - } - - tm := NewLogicalTickerMaker(ch) - rt := NewRepeatTimerWithTickerMaker("bar", time.Second, tm) - - /* NOTE: Useful for debugging deadlocks... - go func() { - time.Sleep(time.Second * 3) - trace := make([]byte, 102400) - count := runtime.Stack(trace, true) - fmt.Printf("Stack of %d bytes: %s\n", count, trace) - }() - */ - - tick(0, 1000, 10) - tock(t, rt, []int64{}) - tick(1000, 2000, 10) - tock(t, rt, []int64{1000}) - tick(2005, 5000, 10) - tock(t, rt, []int64{2005, 3005, 4005}) - tick(5001, 5999, 1) - // Read 5005 instead of 5001 because - // it's 1 second greater than 4005. - tock(t, rt, []int64{5005}) - tick(6000, 7005, 1) - tock(t, rt, []int64{6005}) - tick(7033, 8032, 1) - tock(t, rt, []int64{7033}) - - // After a reset, nothing happens - // until two ticks are received. - rt.Reset() - tock(t, rt, []int64{}) - tick(8040, 8041, 1) - tock(t, rt, []int64{}) - tick(9555, 9556, 1) - tock(t, rt, []int64{9555}) - - // After a stop, nothing more is sent. - rt.Stop() - tock(t, rt, []int64{}) - - // Another stop panics. - assert.Panics(t, func() { rt.Stop() }) -} - -func TestRepeatTimerReset(t *testing.T) { - // check that we are not leaking any go-routines - defer leaktest.Check(t)() - - timer := NewRepeatTimer("test", 20*time.Millisecond) - defer timer.Stop() - - // test we don't receive tick before duration ms. 
- select { - case <-timer.Chan(): - t.Fatal("did not expect to receive tick") - default: - } - - timer.Reset() - - // test we receive tick after Reset is called - select { - case <-timer.Chan(): - // all good - case <-time.After(40 * time.Millisecond): - t.Fatal("expected to receive tick after reset") - } - - // just random calls - for i := 0; i < 100; i++ { - time.Sleep(time.Duration(RandIntn(40)) * time.Millisecond) - timer.Reset() - } -} diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index c1e90ab76..e0ce062ab 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -95,13 +95,13 @@ type MConnection struct { stopMtx sync.Mutex flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. - pingTimer *cmn.RepeatTimer // send pings periodically + pingTimer *time.Ticker // send pings periodically // close conn if pong is not received in pongTimeout pongTimer *time.Timer pongTimeoutCh chan bool // true - timeout, false - peer sent pong - chStatsTimer *cmn.RepeatTimer // update channel stats periodically + chStatsTimer *time.Ticker // update channel stats periodically created time.Time // time of creation @@ -201,9 +201,9 @@ func (c *MConnection) OnStart() error { return err } c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) - c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) + c.pingTimer = time.NewTicker(c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) - c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) + c.chStatsTimer = time.NewTicker(updateStats) c.quitSendRoutine = make(chan struct{}) c.doneSendRoutine = make(chan struct{}) go c.sendRoutine() @@ -401,11 +401,11 @@ FOR_LOOP: // NOTE: flushTimer.Set() must be called every time // something is written to .bufConnWriter. c.flush() - case <-c.chStatsTimer.Chan(): + case <-c.chStatsTimer.C: for _, channel := range c.channels { channel.updateStats() } - case <-c.pingTimer.Chan(): + case <-c.pingTimer.C: c.Logger.Debug("Send Ping") _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{}) if err != nil { diff --git a/p2p/switch.go b/p2p/switch.go index a07f70ce9..9e04fe7ce 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -234,21 +234,26 @@ func (sw *Switch) OnStop() { // // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. 
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { - successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes)) + + peers := sw.peers.List() var wg sync.WaitGroup - for _, peer := range sw.peers.List() { - wg.Add(1) - go func(peer Peer) { + wg.Add(len(peers)) + successChan := make(chan bool, len(peers)) + + for _, peer := range peers { + go func(p Peer) { defer wg.Done() - success := peer.Send(chID, msgBytes) + success := p.Send(chID, msgBytes) successChan <- success }(peer) } + go func() { wg.Wait() close(successChan) }() + return successChan } From 03085c2da23b179c4a51f59a03cb40aa4e85a613 Mon Sep 17 00:00:00 2001 From: zjubfd Date: Wed, 20 Mar 2019 08:18:18 +0800 Subject: [PATCH 04/25] rpc: client disable compression (#3430) --- rpc/lib/client/http_client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index 97b8dfe7b..cfa26e89c 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -74,7 +74,9 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) { protocol, address, dialer := makeHTTPDialer(remoteAddr) return protocol + "://" + address, &http.Client{ Transport: &http.Transport{ - Dial: dialer, + // Set to true to prevent GZIP-bomb DoS attacks + DisableCompression: true, + Dial: dialer, }, } } From 926127c774a2c9110c4284938411818918ffecac Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 20 Mar 2019 03:59:33 +0300 Subject: [PATCH 05/25] blockchain: update the maxHeight when a peer is removed (#3350) * blockchain: update the maxHeight when a peer is removed Refs #2699 * add a changelog entry * make linter pass --- CHANGELOG_PENDING.md | 2 ++ blockchain/pool.go | 52 ++++++++++++++++++++++++++++++++--------- blockchain/pool_test.go | 46 +++++++++++++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3cbc63b7b..de16fcc26 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,3 +20,5 @@ ### IMPROVEMENTS: ### BUG FIXES: + +- [blockchain] \#2699 update the maxHeight when a peer is removed diff --git a/blockchain/pool.go b/blockchain/pool.go index 2cb7dda96..c842c0d13 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -69,7 +69,7 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 + maxPeerHeight int64 // the biggest reported height // atomic numPending int32 // number of requests pending assignment or block response @@ -78,6 +78,8 @@ type BlockPool struct { errorsCh chan<- peerError } +// NewBlockPool returns a new BlockPool with the height equal to start. Block +// requests and errors will be sent to requestsCh and errorsCh accordingly. func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), @@ -93,15 +95,15 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p return bp } +// OnStart implements cmn.Service by spawning requesters routine and recording +// pool's start time. func (pool *BlockPool) OnStart() error { go pool.makeRequestersRoutine() pool.startTime = time.Now() return nil } -func (pool *BlockPool) OnStop() {} - -// Run spawns requesters as needed. 
+// spawns requesters as needed func (pool *BlockPool) makeRequestersRoutine() { for { if !pool.IsRunning() { @@ -150,6 +152,8 @@ func (pool *BlockPool) removeTimedoutPeers() { } } +// GetStatus returns pool's height, numPending requests and the number of +// requesters. func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -157,6 +161,7 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) } +// IsCaughtUp returns true if this node is caught up, false - otherwise. // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { pool.mtx.Lock() @@ -170,8 +175,9 @@ func (pool *BlockPool) IsCaughtUp() bool { // Some conditions to determine if we're caught up. // Ensures we've either received a block or waited some amount of time, - // and that we're synced to the highest known height. Note we use maxPeerHeight - 1 - // because to sync block H requires block H+1 to verify the LastCommit. + // and that we're synced to the highest known height. + // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 + // to verify the LastCommit. receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers @@ -260,14 +266,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int } } -// MaxPeerHeight returns the highest height reported by a peer. +// MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { pool.mtx.Lock() defer pool.mtx.Unlock() return pool.maxPeerHeight } -// Sets the peer's alleged blockchain height. +// SetPeerHeight sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -286,6 +292,8 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { } } +// RemovePeer removes the peer with peerID from the pool. If there's no peer +// with peerID, function is a no-op. func (pool *BlockPool) RemovePeer(peerID p2p.ID) { pool.mtx.Lock() defer pool.mtx.Unlock() @@ -299,10 +307,32 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { requester.redo(peerID) } } - if p, exist := pool.peers[peerID]; exist && p.timeout != nil { - p.timeout.Stop() + + peer, ok := pool.peers[peerID] + if ok { + if peer.timeout != nil { + peer.timeout.Stop() + } + + delete(pool.peers, peerID) + + // Find a new peer with the biggest height and update maxPeerHeight if the + // peer's height was the biggest. + if peer.height == pool.maxPeerHeight { + pool.updateMaxPeerHeight() + } + } +} + +// If no peers are left, maxPeerHeight is set to 0. +func (pool *BlockPool) updateMaxPeerHeight() { + var max int64 + for _, peer := range pool.peers { + if peer.height > max { + max = peer.height + } } - delete(pool.peers, peerID) + pool.maxPeerHeight = max } // Pick an available peer with at least the given minHeight. 
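The removal logic added above only rescans for a new maximum when the removed peer actually held it. That pattern can be sketched standalone; the following is a minimal Go illustration with simplified types (`peerHeights`, `set`, and `remove` are hypothetical names for this sketch, not the actual `BlockPool` API):

```go
package main

import "fmt"

// peerHeights tracks reported heights and keeps maxHeight consistent,
// mirroring the BlockPool logic above with simplified types.
type peerHeights struct {
	heights   map[string]int64
	maxHeight int64
}

// set records a peer's reported height and raises maxHeight if needed.
func (p *peerHeights) set(id string, h int64) {
	p.heights[id] = h
	if h > p.maxHeight {
		p.maxHeight = h
	}
}

// remove drops a peer; it rescans for the maximum only when the removed
// peer defined it, and is a no-op for unknown peers, as in RemovePeer.
func (p *peerHeights) remove(id string) {
	h, ok := p.heights[id]
	if !ok {
		return
	}
	delete(p.heights, id)
	if h == p.maxHeight {
		var max int64 // resets to 0 when no peers are left
		for _, hh := range p.heights {
			if hh > max {
				max = hh
			}
		}
		p.maxHeight = max
	}
}

func main() {
	p := &peerHeights{heights: map[string]int64{}}
	p.set("a", 5)
	p.set("b", 10)
	p.remove("b")
	fmt.Println(p.maxHeight) // 5: the max is recomputed, not left stale
}
```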
diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 75a03f631..e24f6131e 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -1,12 +1,15 @@ package blockchain import ( + "fmt" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -66,7 +69,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { return peers } -func TestBasic(t *testing.T) { +func TestBlockPoolBasic(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) @@ -122,7 +125,7 @@ func TestBasic(t *testing.T) { } } -func TestTimeout(t *testing.T) { +func TestBlockPoolTimeout(t *testing.T) { start := int64(42) peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) @@ -180,3 +183,40 @@ func TestTimeout(t *testing.T) { } } } + +func TestBlockPoolRemovePeer(t *testing.T) { + peers := make(testPeers, 10) + for i := 0; i < 10; i++ { + peerID := p2p.ID(fmt.Sprintf("%d", i+1)) + height := int64(i + 1) + peers[peerID] = testPeer{peerID, height, make(chan inputData)} + } + requestsCh := make(chan BlockRequest) + errorsCh := make(chan peerError) + + pool := NewBlockPool(1, requestsCh, errorsCh) + pool.SetLogger(log.TestingLogger()) + err := pool.Start() + require.NoError(t, err) + defer pool.Stop() + + // add peers + for peerID, peer := range peers { + pool.SetPeerHeight(peerID, peer.height) + } + assert.EqualValues(t, 10, pool.MaxPeerHeight()) + + // remove not-existing peer + assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) }) + + // remove peer with biggest height + pool.RemovePeer(p2p.ID("10")) + assert.EqualValues(t, 9, pool.MaxPeerHeight()) + + // remove all peers + for peerID := range peers { + pool.RemovePeer(peerID) + } + + assert.EqualValues(t, 0, pool.MaxPeerHeight()) +} From 81b9bdf40010c6a4e336133ec53fe6e4e6089911 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 20 Mar 2019 08:29:40 -0400 Subject: [PATCH 06/25] comments on validator ordering (#3452) * comments on validator ordering * NextValidatorsHash --- docs/spec/blockchain/blockchain.md | 2 ++ rpc/core/consensus.go | 2 ++ types/validator_set.go | 3 ++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/spec/blockchain/blockchain.md b/docs/spec/blockchain/blockchain.md index 00cccfc2e..60a07d42a 100644 --- a/docs/spec/blockchain/blockchain.md +++ b/docs/spec/blockchain/blockchain.md @@ -332,6 +332,7 @@ block.ValidatorsHash == MerkleRoot(state.Validators) MerkleRoot of the current validator set that is committing the block. This can be used to validate the `LastCommit` included in the next block. +Note the validators are sorted by their address before computing the MerkleRoot. ### NextValidatorsHash @@ -342,6 +343,7 @@ block.NextValidatorsHash == MerkleRoot(state.NextValidators) MerkleRoot of the next validator set that will be the validator set that commits the next block. This is included so that the current validator set gets a chance to sign the next validator sets Merkle root. +Note the validators are sorted by their address before computing the MerkleRoot. 
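As a hedged illustration of the canonical ordering described in this note: the sketch below sorts validators by address before hashing. The `validator` type and the leaf encoding are simplifications assumed for the example; the real implementation builds a Merkle tree over amino-encoded validators.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"sort"
)

// validator is a simplified stand-in for types.Validator.
type validator struct {
	Address []byte
	Power   int64
}

// validatorsHash sorts validators by address (the canonical order) and
// hashes the concatenated leaf hashes. The leaf encoding here is purely
// illustrative; only the sort-before-hash step is the point.
func validatorsHash(vals []validator) []byte {
	sort.Slice(vals, func(i, j int) bool {
		return bytes.Compare(vals[i].Address, vals[j].Address) < 0
	})
	h := sha256.New()
	for _, v := range vals {
		leaf := sha256.Sum256([]byte(fmt.Sprintf("%x/%d", v.Address, v.Power)))
		h.Write(leaf[:])
	}
	return h.Sum(nil)
}

func main() {
	a := []validator{{[]byte{0x02}, 10}, {[]byte{0x01}, 5}}
	b := []validator{{[]byte{0x01}, 5}, {[]byte{0x02}, 10}}
	// The same set in a different input order yields the same hash.
	fmt.Printf("%x\n%x\n", validatorsHash(a), validatorsHash(b))
}
```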
### ConsensusHash diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index b8a91f107..3850999d3 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -10,6 +10,8 @@ import ( // Get the validator set at the given block height. // If no height is provided, it will fetch the current validator set. +// Note the validators are sorted by their address - this is the canonical +// order for the validators in the set as used in computing their Merkle root. // // ```shell // curl 'localhost:26657/validators' diff --git a/types/validator_set.go b/types/validator_set.go index 3d31cf7d0..36ce67f06 100644 --- a/types/validator_set.go +++ b/types/validator_set.go @@ -31,7 +31,8 @@ const ( // ValidatorSet represent a set of *Validator at a given height. // The validators can be fetched by address or index. // The index is in order of .Address, so the indices are fixed -// for all rounds of a given blockchain height. +// for all rounds of a given blockchain height - ie. the validators +// are sorted by their address. // On the other hand, the .ProposerPriority of each validator and // the designated .GetProposer() of a set changes every round, // upon calling .IncrementProposerPriority(). From 660bd4a53e0dd7642a473689c8686c3f83e3a0ca Mon Sep 17 00:00:00 2001 From: tracebundy <745403419@qq.com> Date: Wed, 20 Mar 2019 20:30:49 +0800 Subject: [PATCH 07/25] fix comment (#3454) --- libs/common/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/common/service.go b/libs/common/service.go index 96a5e632a..21fb0df3e 100644 --- a/libs/common/service.go +++ b/libs/common/service.go @@ -209,7 +209,7 @@ func (bs *BaseService) Wait() { <-bs.quit } -// String implements Servce by returning a string representation of the service. +// String implements Service by returning a string representation of the service. func (bs *BaseService) String() string { return bs.name } From 1d4afb179b9660bf13705c67b01e20838a4506eb Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 21 Mar 2019 11:05:39 +0100 Subject: [PATCH 08/25] replace PB2TM.ConsensusParams with a call to params#Update (#3448) Fixes #3444 --- consensus/replay.go | 2 +- types/protobuf.go | 33 --------------------------------- types/protobuf_test.go | 2 +- 3 files changed, 2 insertions(+), 35 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index c8ab8a331..e47d4892a 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -324,7 +324,7 @@ func (h *Handshaker) ReplayBlocks( } if res.ConsensusParams != nil { - state.ConsensusParams = types.PB2TM.ConsensusParams(res.ConsensusParams, state.ConsensusParams.Block.TimeIotaMs) + state.ConsensusParams = state.ConsensusParams.Update(res.ConsensusParams) } sm.SaveState(h.stateDB, state) } diff --git a/types/protobuf.go b/types/protobuf.go index e10b91869..c87e82c0a 100644 --- a/types/protobuf.go +++ b/types/protobuf.go @@ -220,36 +220,3 @@ func (pb2tm) ValidatorUpdates(vals []abci.ValidatorUpdate) ([]*Validator, error) } return tmVals, nil } - -// BlockParams.TimeIotaMs is not exposed to the application. Therefore a caller -// must provide it. 
-func (pb2tm) ConsensusParams(csp *abci.ConsensusParams, blockTimeIotaMs int64) ConsensusParams { - params := ConsensusParams{ - Block: BlockParams{}, - Evidence: EvidenceParams{}, - Validator: ValidatorParams{}, - } - - // we must defensively consider any structs may be nil - if csp.Block != nil { - params.Block = BlockParams{ - MaxBytes: csp.Block.MaxBytes, - MaxGas: csp.Block.MaxGas, - TimeIotaMs: blockTimeIotaMs, - } - } - - if csp.Evidence != nil { - params.Evidence = EvidenceParams{ - MaxAge: csp.Evidence.MaxAge, - } - } - - if csp.Validator != nil { - params.Validator = ValidatorParams{ - PubKeyTypes: csp.Validator.PubKeyTypes, - } - } - - return params -} diff --git a/types/protobuf_test.go b/types/protobuf_test.go index 152c92d12..64caa3f4c 100644 --- a/types/protobuf_test.go +++ b/types/protobuf_test.go @@ -64,7 +64,7 @@ func TestABCIValidators(t *testing.T) { func TestABCIConsensusParams(t *testing.T) { cp := DefaultConsensusParams() abciCP := TM2PB.ConsensusParams(cp) - cp2 := PB2TM.ConsensusParams(abciCP, cp.Block.TimeIotaMs) + cp2 := cp.Update(abciCP) assert.Equal(t, *cp, cp2) } From 85be2a554e7e7752bed0b9409ab153bf04e05e7b Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 22 Mar 2019 09:16:38 -0400 Subject: [PATCH 09/25] tools/tm-signer-harness: update height and round for test harness (#3466) In order to re-enable the test harness for the KMS (see tendermint/kms#227), we need some marginally more realistic proposals and votes. This is because the KMS does some additional sanity checks now to ensure the height and round are increasing over time. --- tools/tm-signer-harness/internal/test_harness.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/tm-signer-harness/internal/test_harness.go b/tools/tm-signer-harness/internal/test_harness.go index 005489133..7fefdfb42 100644 --- a/tools/tm-signer-harness/internal/test_harness.go +++ b/tools/tm-signer-harness/internal/test_harness.go @@ -198,8 +198,8 @@ func (th *TestHarness) TestSignProposal() error { hash := tmhash.Sum([]byte("hash")) prop := &types.Proposal{ Type: types.ProposalType, - Height: 12345, - Round: 23456, + Height: 100, + Round: 0, POLRound: -1, BlockID: types.BlockID{ Hash: hash, @@ -240,8 +240,8 @@ func (th *TestHarness) TestSignVote() error { hash := tmhash.Sum([]byte("hash")) vote := &types.Vote{ Type: voteType, - Height: 12345, - Round: 23456, + Height: 101, + Round: 0, BlockID: types.BlockID{ Hash: hash, PartsHeader: types.PartSetHeader{ From 25a3c8b1724c9611d6edc175b1b0d079f5ee28c1 Mon Sep 17 00:00:00 2001 From: zjubfd Date: Sun, 24 Mar 2019 01:08:15 +0800 Subject: [PATCH 10/25] rpc: support tls rpc (#3469) Refs #3419 --- CHANGELOG_PENDING.md | 1 + config/config.go | 29 +++++++++++++++++++++++++++ config/toml.go | 11 ++++++++++ docs/tendermint-core/configuration.md | 11 ++++++++++ node/node.go | 23 +++++++++++++++------ 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index de16fcc26..7cf3ab4e5 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -16,6 +16,7 @@ * P2P Protocol ### FEATURES: +- [rpc] \#3419 Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha) ### IMPROVEMENTS: diff --git a/config/config.go b/config/config.go index 8342921a6..3ac22adbf 100644 --- a/config/config.go +++ b/config/config.go @@ -339,6 +339,20 @@ type RPCConfig struct { // global HTTP write timeout, which applies to all connections and endpoints. 
// See https://github.com/tendermint/tendermint/issues/3435 TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"` + + // The name of a file containing certificate that is used to create the HTTPS server. + // + // If the certificate is signed by a certificate authority, + // the certFile should be the concatenation of the server's certificate, any intermediates, + // and the CA's certificate. + // + // NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. + TLSCertFile string `mapstructure:"tls_cert_file"` + + // The name of a file containing matching private key that is used to create the HTTPS server. + // + // NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. + TLSKeyFile string `mapstructure:"tls_key_file"` } // DefaultRPCConfig returns a default configuration for the RPC server @@ -357,6 +371,9 @@ func DefaultRPCConfig() *RPCConfig { MaxSubscriptionClients: 100, MaxSubscriptionsPerClient: 5, TimeoutBroadcastTxCommit: 10 * time.Second, + + TLSCertFile: "", + TLSKeyFile: "", } } @@ -395,6 +412,18 @@ func (cfg *RPCConfig) IsCorsEnabled() bool { return len(cfg.CORSAllowedOrigins) != 0 } +func (cfg RPCConfig) KeyFile() string { + return rootify(filepath.Join(defaultConfigDir, cfg.TLSKeyFile), cfg.RootDir) +} + +func (cfg RPCConfig) CertFile() string { + return rootify(filepath.Join(defaultConfigDir, cfg.TLSCertFile), cfg.RootDir) +} + +func (cfg RPCConfig) IsTLSEnabled() bool { + return cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" +} + //----------------------------------------------------------------------------- // P2PConfig diff --git a/config/toml.go b/config/toml.go index a0b651d99..978255aba 100644 --- a/config/toml.go +++ b/config/toml.go @@ -181,6 +181,17 @@ max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} # See https://github.com/tendermint/tendermint/issues/3435 timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}" +# The name of a file containing certificate that is used to create the HTTPS server. +# If the certificate is signed by a certificate authority, +# the certFile should be the concatenation of the server's certificate, any intermediates, +# and the CA's certificate. +# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. +tls_cert_file = "{{ .RPC.TLSCertFile }}" + +# The name of a file containing matching private key that is used to create the HTTPS server. +# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. +tls_key_file = "{{ .RPC.TLSKeyFile }}" + ##### peer to peer configuration options ##### [p2p] diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index aa275c7a1..d19c272fc 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -127,6 +127,17 @@ max_subscriptions_per_client = 5 # See https://github.com/tendermint/tendermint/issues/3435 timeout_broadcast_tx_commit = "10s" +# The name of a file containing certificate that is used to create the HTTPS server. +# If the certificate is signed by a certificate authority, +# the certFile should be the concatenation of the server's certificate, any intermediates, +# and the CA's certificate. 
+# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. +tls_cert_file = "" + +# The name of a file containing matching private key that is used to create the HTTPS server. +# NOTE: both tls_cert_file and tls_key_file must be present for Tendermint to create HTTPS server. Otherwise, HTTP server is run. +tls_key_file = "" + ##### peer to peer configuration options ##### [p2p] diff --git a/node/node.go b/node/node.go index 8f71fa31a..3501b6a7a 100644 --- a/node/node.go +++ b/node/node.go @@ -715,13 +715,24 @@ func (n *Node) startRPC() ([]net.Listener, error) { }) rootHandler = corsMiddleware.Handler(mux) } + if n.config.RPC.IsTLSEnabled() { + go rpcserver.StartHTTPAndTLSServer( + listener, + rootHandler, + n.config.RPC.CertFile(), + n.config.RPC.KeyFile(), + rpcLogger, + config, + ) + } else { + go rpcserver.StartHTTPServer( + listener, + rootHandler, + rpcLogger, + config, + ) + } - go rpcserver.StartHTTPServer( - listener, - rootHandler, - rpcLogger, - config, - ) listeners[i] = listener } From 6de7effb05581f9bea2e8af06e4e74a85c34bc5f Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 26 Mar 2019 01:27:29 -0700 Subject: [PATCH 11/25] mempool no gossip back (#2778) Closes #1798 This is done by making every mempool tx maintain a list of peers who its received the tx from. Instead of using the 20byte peer ID, it instead uses a local map from peerID to uint16 counter, so every peer adds 2 bytes. (Word aligned to probably make it 8 bytes) This also required resetting the callback function on every CheckTx. This likely has performance ramifications for instruction caching. The actual setting operation isn't costly with the removal of defers in this PR. * Make the mempool not gossip txs back to peers its received it from * Fix adversarial memleak * Don't break interface * Update changelog * Forgot to add a mtx * forgot a mutex * Update mempool/reactor.go Co-Authored-By: ValarDragon * Update mempool/mempool.go Co-Authored-By: ValarDragon * Use unknown peer ID Co-Authored-By: ValarDragon * fix compilation * use next wait chan logic when skipping * Minor fixes * Add TxInfo * Add reverse map * Make activeID's auto-reserve 0 * 0 -> UnknownPeerID Co-Authored-By: ValarDragon * Switch to making the normal case set a callback on the reqres object The recheck case is still done via the global callback, and stats are also set via global callback * fix merge conflict * Addres comments * Add cache tests * add cache tests * minor fixes * update metrics in reqResCb and reformat code * goimport -w mempool/reactor.go * mempool: update memTx senders I had to introduce txsMap for quick mempoolTx lookups. 
* change senders type from []uint16 to sync.Map Fixes DATA RACE: ``` Read at 0x00c0013fcd3a by goroutine 183: github.com/tendermint/tendermint/mempool.(*MempoolReactor).broadcastTxRoutine() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:195 +0x3c7 Previous write at 0x00c0013fcd3a by D[2019-02-27|10:10:49.058] Read PacketMsg switch=3 peer=35bc1e3558c182927b31987eeff3feb3d58a0fc5@127.0.0.1 :46552 conn=MConn{pipe} packet="PacketMsg{30:2B06579D0A143EB78F3D3299DE8213A51D4E11FB05ACE4D6A14F T:1}" goroutine 190: github.com/tendermint/tendermint/mempool.(*Mempool).CheckTxWithInfo() /go/src/github.com/tendermint/tendermint/mempool/mempool.go:387 +0xdc1 github.com/tendermint/tendermint/mempool.(*MempoolReactor).Receive() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:134 +0xb04 github.com/tendermint/tendermint/p2p.createMConnection.func1() /go/src/github.com/tendermint/tendermint/p2p/peer.go:374 +0x25b github.com/tendermint/tendermint/p2p/conn.(*MConnection).recvRoutine() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:599 +0xcce Goroutine 183 (running) created at: D[2019-02-27|10:10:49.058] Send switch=2 peer=1efafad5443abeea4b7a8155218e4369525d987e@127.0.0.1:46193 channel=48 conn=MConn{pipe} m sgBytes=2B06579D0A146194480ADAE00C2836ED7125FEE65C1D9DD51049 github.com/tendermint/tendermint/mempool.(*MempoolReactor).AddPeer() /go/src/github.com/tendermint/tendermint/mempool/reactor.go:105 +0x1b1 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:683 +0x13b github.com/tendermint/tendermint/p2p.(*Switch).addPeer() /go/src/github.com/tendermint/tendermint/p2p/switch.go:650 +0x585 github.com/tendermint/tendermint/p2p.(*Switch).addPeerWithConnection() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:145 +0x939 github.com/tendermint/tendermint/p2p.Connect2Switches.func2() /go/src/github.com/tendermint/tendermint/p2p/test_util.go:109 +0x50 I[2019-02-27|10:10:49.058] Added good transaction validator=0 tx=43B4D1F0F03460BD262835C4AA560DB860CFBBE85BD02386D83DAC38C67B3AD7 res="&{CheckTx:gas_w anted:1 }" height=0 total=375 Goroutine 190 (running) created at: github.com/tendermint/tendermint/p2p/conn.(*MConnection).OnStart() /go/src/github.com/tendermint/tendermint/p2p/conn/connection.go:210 +0x313 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).OnStart() /go/src/github.com/tendermint/tendermint/p2p/peer.go:179 +0x56 github.com/tendermint/tendermint/libs/common.(*BaseService).Start() /go/src/github.com/tendermint/tendermint/libs/common/service.go:139 +0x4df github.com/tendermint/tendermint/p2p.(*peer).Start() :1 +0x43 github.com/tendermint/tendermint/p2p.(*Switch).startInitPeer() ``` * explain the choice of a map DS for senders * extract ids pool/mapper to a separate struct * fix literal copies lock value from senders: sync.Map contains sync.Mutex * use sync.Map#LoadOrStore instead of Load * fixes after Ismail's review * rename resCbNormal to resCbFirstTime --- CHANGELOG_PENDING.md | 2 + docs/spec/reactors/mempool/reactor.md | 2 + mempool/bench_test.go | 13 +++ mempool/cache_test.go | 101 +++++++++++++++++ mempool/mempool.go | 153 ++++++++++++++++++++------ mempool/mempool_test.go | 42 ++----- mempool/reactor.go | 88 +++++++++++++-- mempool/reactor_test.go | 30 ++++- state/services.go | 17 ++- 9 files changed, 362 insertions(+), 86 deletions(-) create mode 100644 
mempool/cache_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 7cf3ab4e5..bebc3e6a6 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,6 +20,8 @@ ### IMPROVEMENTS: +- [mempool] \#2778 No longer send txs back to peers who sent it to you + ### BUG FIXES: - [blockchain] \#2699 update the maxHeight when a peer is removed diff --git a/docs/spec/reactors/mempool/reactor.md b/docs/spec/reactors/mempool/reactor.md index fa25eeb3e..d0b19f7ca 100644 --- a/docs/spec/reactors/mempool/reactor.md +++ b/docs/spec/reactors/mempool/reactor.md @@ -12,3 +12,5 @@ for details. Sending incorrectly encoded data or data exceeding `maxMsgSize` will result in stopping the peer. + +The mempool will not send a tx back to any peer which it received it from. \ No newline at end of file diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 8936f8dfb..0cd394cd6 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -26,6 +26,19 @@ func BenchmarkReap(b *testing.B) { } } +func BenchmarkCheckTx(b *testing.B) { + app := kvstore.NewKVStoreApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + for i := 0; i < b.N; i++ { + tx := make([]byte, 8) + binary.BigEndian.PutUint64(tx, uint64(i)) + mempool.CheckTx(tx, nil) + } +} + func BenchmarkCacheInsertTime(b *testing.B) { cache := newMapTxCache(b.N) txs := make([][]byte, b.N) diff --git a/mempool/cache_test.go b/mempool/cache_test.go new file mode 100644 index 000000000..26e560b6e --- /dev/null +++ b/mempool/cache_test.go @@ -0,0 +1,101 @@ +package mempool + +import ( + "crypto/rand" + "crypto/sha256" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +func TestCacheRemove(t *testing.T) { + cache := newMapTxCache(100) + numTxs := 10 + txs := make([][]byte, numTxs) + for i := 0; i < numTxs; i++ { + // probability of collision is 2**-256 + txBytes := make([]byte, 32) + rand.Read(txBytes) + txs[i] = txBytes + cache.Push(txBytes) + // make sure its added to both the linked list and the map + require.Equal(t, i+1, len(cache.map_)) + require.Equal(t, i+1, cache.list.Len()) + } + for i := 0; i < numTxs; i++ { + cache.Remove(txs[i]) + // make sure its removed from both the map and the linked list + require.Equal(t, numTxs-(i+1), len(cache.map_)) + require.Equal(t, numTxs-(i+1), cache.list.Len()) + } +} + +func TestCacheAfterUpdate(t *testing.T) { + app := kvstore.NewKVStoreApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + // reAddIndices & txsInCache can have elements > numTxsToCreate + // also assumes max index is 255 for convenience + // txs in cache also checks order of elements + tests := []struct { + numTxsToCreate int + updateIndices []int + reAddIndices []int + txsInCache []int + }{ + {1, []int{}, []int{1}, []int{1, 0}}, // adding new txs works + {2, []int{1}, []int{}, []int{1, 0}}, // update doesn't remove tx from cache + {2, []int{2}, []int{}, []int{2, 1, 0}}, // update adds new tx to cache + {2, []int{1}, []int{1}, []int{1, 0}}, // re-adding after update doesn't make dupe + } + for tcIndex, tc := range tests { + for i := 0; i < tc.numTxsToCreate; i++ { + tx := types.Tx{byte(i)} + err := mempool.CheckTx(tx, nil) + require.NoError(t, err) + } + + updateTxs := []types.Tx{} + for _, v := range tc.updateIndices { + tx := types.Tx{byte(v)} 
+ updateTxs = append(updateTxs, tx) + } + mempool.Update(int64(tcIndex), updateTxs, nil, nil) + + for _, v := range tc.reAddIndices { + tx := types.Tx{byte(v)} + _ = mempool.CheckTx(tx, nil) + } + + cache := mempool.cache.(*mapTxCache) + node := cache.list.Front() + counter := 0 + for node != nil { + require.NotEqual(t, len(tc.txsInCache), counter, + "cache larger than expected on testcase %d", tcIndex) + + nodeVal := node.Value.([sha256.Size]byte) + expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])}) + // Reference for reading the errors: + // >>> sha256('\x00').hexdigest() + // '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d' + // >>> sha256('\x01').hexdigest() + // '4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a' + // >>> sha256('\x02').hexdigest() + // 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986' + + require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) + counter++ + node = node.Next() + } + require.Equal(t, len(tc.txsInCache), counter, + "cache smaller than expected on testcase %d", tcIndex) + mempool.Flush() + } +} diff --git a/mempool/mempool.go b/mempool/mempool.go index 41ee59cb4..2064b7bce 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -31,6 +31,14 @@ type PreCheckFunc func(types.Tx) error // transaction doesn't require more gas than available for the block. type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error +// TxInfo are parameters that get passed when attempting to add a tx to the +// mempool. +type TxInfo struct { + // We don't use p2p.ID here because it's too big. The gain is to store max 2 + // bytes with each tx to identify the sender rather than 20 bytes. + PeerID uint16 +} + /* The mempool pushes new txs onto the proxyAppConn. @@ -148,9 +156,12 @@ func TxID(tx []byte) string { type Mempool struct { config *cfg.MempoolConfig - proxyMtx sync.Mutex - proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs + proxyMtx sync.Mutex + proxyAppConn proxy.AppConnMempool + txs *clist.CList // concurrent linked-list of good txs + // map for quick access to txs + // Used in CheckTx to record the tx sender. + txsMap map[[sha256.Size]byte]*clist.CElement height int64 // the last block Update()'d to rechecking int32 // for re-checking filtered txs on Update() recheckCursor *clist.CElement // next expected response @@ -161,7 +172,10 @@ type Mempool struct { postCheck PostCheckFunc // Atomic integers - txsBytes int64 // see TxsBytes + + // Used to check if the mempool size is bigger than the allowed limit. + // See TxsBytes + txsBytes int64 // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -189,6 +203,7 @@ func NewMempool( config: config, proxyAppConn: proxyAppConn, txs: clist.New(), + txsMap: make(map[[sha256.Size]byte]*clist.CElement), height: height, rechecking: 0, recheckCursor: nil, @@ -286,8 +301,8 @@ func (mem *Mempool) TxsBytes() int64 { return atomic.LoadInt64(&mem.txsBytes) } -// FlushAppConn flushes the mempool connection to ensure async resCb calls are -// done e.g. from CheckTx. +// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are +// done. E.g. from CheckTx. 
func (mem *Mempool) FlushAppConn() error { return mem.proxyAppConn.FlushSync() } @@ -304,6 +319,7 @@ func (mem *Mempool) Flush() { e.DetachPrev() } + mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement) _ = atomic.SwapInt64(&mem.txsBytes, 0) } @@ -327,6 +343,13 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} { // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { + return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID}) +} + +// CheckTxWithInfo performs the same operation as CheckTx, but with extra metadata about the tx. +// Currently this metadata is the peer who sent it, +// used to prevent the tx from being gossiped back to them. +func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { mem.proxyMtx.Lock() // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() @@ -357,6 +380,17 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { // CACHE if !mem.cache.Push(tx) { + // record the sender + e, ok := mem.txsMap[sha256.Sum256(tx)] + if ok { // tx may be in cache, but not in the mempool + memTx := e.Value.(*mempoolTx) + if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { + // TODO: consider punishing peer for dups, + // it's non-trivial since invalid txs can become valid, + // but they can spam the same tx with little cost to them atm. + } + } + return ErrTxInCache } // END CACHE @@ -381,27 +415,77 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { } reqRes := mem.proxyAppConn.CheckTxAsync(tx) if cb != nil { - reqRes.SetCallback(cb) + composedCallback := func(res *abci.Response) { + mem.reqResCb(tx, txInfo.PeerID)(res) + cb(res) + } + reqRes.SetCallback(composedCallback) + } else { + reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID)) } return nil } -// ABCI callback function +// Global callback, which is called in the absence of the specific callback. +// +// In recheckTxs, because no specific reqResCb callback is set, this callback +// will be called. func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) { if mem.recheckCursor == nil { - mem.resCbNormal(req, res) - } else { - mem.metrics.RecheckTimes.Add(1) - mem.resCbRecheck(req, res) + return } + + mem.metrics.RecheckTimes.Add(1) + mem.resCbRecheck(req, res) + + // update metrics mem.metrics.Size.Set(float64(mem.Size())) } -func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { +// Specific callback, which allows us to incorporate local information, like +// the peer that sent us this tx, so we can avoid sending it back to the same +// peer. +// +// Used in CheckTxWithInfo to record the PeerID who sent us the tx.
+func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) { + return func(res *abci.Response) { + if mem.recheckCursor != nil { + return + } + + mem.resCbFirstTime(tx, peerID, res) + + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) + } +} + +func (mem *Mempool) addTx(memTx *mempoolTx) { + e := mem.txs.PushBack(memTx) + mem.txsMap[sha256.Sum256(memTx.tx)] = e + atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) + mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) +} + +func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { + mem.txs.Remove(elem) + elem.DetachPrev() + delete(mem.txsMap, sha256.Sum256(tx)) + atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) + + if removeFromCache { + mem.cache.Remove(tx) + } +} + +// callback, which is called after the app checked the tx for the first time. +// +// The case where the app checks the tx for the second and subsequent times is +// handled by the resCbRecheck callback. +func (mem *Mempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: - tx := req.GetCheckTx().Tx var postCheckErr error if mem.postCheck != nil { postCheckErr = mem.postCheck(tx, r.CheckTx) @@ -412,15 +496,14 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { gasWanted: r.CheckTx.GasWanted, tx: tx, } - mem.txs.PushBack(memTx) - atomic.AddInt64(&mem.txsBytes, int64(len(tx))) + memTx.senders.Store(peerID, true) + mem.addTx(memTx) mem.logger.Info("Added good transaction", "tx", TxID(tx), "res", r, "height", memTx.height, "total", mem.Size(), ) - mem.metrics.TxSizeBytes.Observe(float64(len(tx))) mem.notifyTxsAvailable() } else { // ignore bad transaction @@ -434,6 +517,10 @@ } } +// callback, which is called after the app rechecked the tx. +// +// The case where the app checks the tx for the first time is handled by the +// resCbFirstTime callback. func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: @@ -454,12 +541,8 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { } else { // Tx became invalidated due to newly committed block. mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr) - mem.txs.Remove(mem.recheckCursor) - atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) - mem.recheckCursor.DetachPrev() - - // remove from cache (it might be good later) - mem.cache.Remove(tx) + // NOTE: we remove tx from the cache because it might be good later + mem.removeTx(tx, mem.recheckCursor, true) } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil @@ -627,12 +710,9 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx { memTx := e.Value.(*mempoolTx) // Remove the tx if it's already in a block. if _, ok := txsMap[string(memTx.tx)]; ok { - // remove from clist - mem.txs.Remove(e) - atomic.AddInt64(&mem.txsBytes, int64(-len(memTx.tx))) - e.DetachPrev() - // NOTE: we don't remove committed txs from the cache. + mem.removeTx(memTx.tx, e, false) + continue } txsLeft = append(txsLeft, memTx.tx) @@ -650,7 +730,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) { mem.recheckEnd = mem.txs.Back() // Push txs to proxyAppConn - // NOTE: resCb() may be called concurrently. + // NOTE: reqResCb may be called concurrently. for _, tx := range txs { mem.proxyAppConn.CheckTxAsync(tx) } @@ -663,6 +743,7 @@ type mempoolTx struct { height int64 // height that this tx had been validated in gasWanted int64 // amount of gas this tx states it will require + senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups) tx types.Tx // } @@ -679,13 +760,13 @@ type txCache interface { Remove(tx types.Tx) } -// mapTxCache maintains a cache of transactions. This only stores -// the hash of the tx, due to memory concerns. +// mapTxCache maintains an LRU cache of transactions. This only stores the hash +// of the tx, due to memory concerns. type mapTxCache struct { mtx sync.Mutex size int map_ map[[sha256.Size]byte]*list.Element - list *list.List // to remove oldest tx when cache gets too big + list *list.List } var _ txCache = (*mapTxCache)(nil) @@ -707,8 +788,8 @@ func (cache *mapTxCache) Reset() { cache.mtx.Unlock() } -// Push adds the given tx to the cache and returns true. It returns false if tx -// is already in the cache. +// Push adds the given tx to the cache and returns true. It returns +// false if tx is already in the cache. func (cache *mapTxCache) Push(tx types.Tx) bool { cache.mtx.Lock() defer cache.mtx.Unlock() @@ -728,8 +809,8 @@ func (cache *mapTxCache) Push(tx types.Tx) bool { cache.list.Remove(popped) } } - cache.list.PushBack(txHash) - cache.map_[txHash] = cache.list.Back() + e := cache.list.PushBack(txHash) + cache.map_[txHash] = e return true }
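The `senders sync.Map` field added to `mempoolTx` in the hunk above is written by concurrent `Receive`/`CheckTx` goroutines while `broadcastTxRoutine` reads it, which is exactly the data race the first commit message shows. A minimal standalone sketch (illustrative only, not part of the patch) of the `LoadOrStore` pattern used to record each sender exactly once:

```
package main

import (
	"fmt"
	"sync"
)

func main() {
	var senders sync.Map // peer ID -> true; safe for concurrent use without an extra mutex

	var wg sync.WaitGroup
	for _, peerID := range []uint16{1, 2, 2, 3} { // peer 2 sends the same tx twice
		wg.Add(1)
		go func(id uint16) {
			defer wg.Done()
			// LoadOrStore is an atomic check-and-insert: `loaded` reports
			// whether this peer was already recorded as a sender.
			if _, loaded := senders.LoadOrStore(id, true); loaded {
				fmt.Println("duplicate sender:", id)
			}
		}(peerID)
	}
	wg.Wait()
}
```

Exactly one of the two goroutines for peer 2 observes `loaded == true`, which mirrors how `CheckTxWithInfo` detects a tx re-sent by the same peer.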
diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 5928fbc56..dc7d595af 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -12,9 +12,10 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + amino "github.com/tendermint/go-amino" + "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" @@ -63,8 +64,9 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { } } -func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { +func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs { txs := make(types.Txs, count) + txInfo := TxInfo{PeerID: peerID} for i := 0; i < count; i++ { txBytes := make([]byte, 20) txs[i] = txBytes @@ -72,7 +74,7 @@ if err != nil { t.Error(err) } - if err := mempool.CheckTx(txBytes, nil); err != nil { + if err := mempool.CheckTxWithInfo(txBytes, nil, txInfo); err != nil { // Skip invalid txs. // TestMempoolFilters will fail otherwise. It asserts a number of txs // returned. @@ -92,7 +94,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { defer cleanup() // Ensure gas calculation behaves as expected - checkTxs(t, mempool, 1) + checkTxs(t, mempool, 1, UnknownPeerID) tx0 := mempool.TxsFront().Value.(*mempoolTx) // assert that kv store has gas wanted = 1.
require.Equal(t, app.CheckTx(tx0.tx).GasWanted, int64(1), "KVStore had a gas value neq to 1") @@ -126,7 +128,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { {20, 20000, 30, 20}, } for tcIndex, tt := range tests { - checkTxs(t, mempool, tt.numTxsToCreate) + checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas) assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d", len(got), tt.expectedNumTxs, tcIndex) @@ -167,7 +169,7 @@ func TestMempoolFilters(t *testing.T) { } for tcIndex, tt := range tests { mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter) - checkTxs(t, mempool, tt.numTxsToCreate) + checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) mempool.Flush() } @@ -198,7 +200,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch of txs, it should only fire once - txs := checkTxs(t, mempool, 100) + txs := checkTxs(t, mempool, 100, UnknownPeerID) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -213,7 +215,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs. we already fired for this height so it shouldnt fire again - moreTxs := checkTxs(t, mempool, 50) + moreTxs := checkTxs(t, mempool, 50, UnknownPeerID) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // now call update with all the txs. it should not fire as there are no txs left @@ -224,7 +226,7 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs, it should only fire once - checkTxs(t, mempool, 100) + checkTxs(t, mempool, 100, UnknownPeerID) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) } @@ -340,28 +342,6 @@ func TestSerialReap(t *testing.T) { reapCheck(600) } -func TestCacheRemove(t *testing.T) { - cache := newMapTxCache(100) - numTxs := 10 - txs := make([][]byte, numTxs) - for i := 0; i < numTxs; i++ { - // probability of collision is 2**-256 - txBytes := make([]byte, 32) - rand.Read(txBytes) - txs[i] = txBytes - cache.Push(txBytes) - // make sure its added to both the linked list and the map - require.Equal(t, i+1, len(cache.map_)) - require.Equal(t, i+1, cache.list.Len()) - } - for i := 0; i < numTxs; i++ { - cache.Remove(txs[i]) - // make sure its removed from both the map and the linked list - require.Equal(t, numTxs-(i+1), len(cache.map_)) - require.Equal(t, numTxs-(i+1), cache.list.Len()) - } -} - func TestMempoolCloseWAL(t *testing.T) { // 1. Create the temporary directory for mempool and WAL testing. 
rootDir, err := ioutil.TempDir("", "mempool-test") diff --git a/mempool/reactor.go b/mempool/reactor.go index ff87f0506..555f38b8b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -3,13 +3,14 @@ package mempool import ( "fmt" "reflect" + "sync" "time" amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/libs/clist" - "github.com/tendermint/tendermint/libs/log" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/clist" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -21,13 +22,70 @@ const ( maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount + + // UnknownPeerID is the peer ID to use when running CheckTx when there is + // no peer (e.g. RPC) + UnknownPeerID uint16 = 0 ) // MempoolReactor handles mempool tx broadcasting amongst peers. +// It maintains a map from peer ID to counter, to prevent gossiping txs back to the +// peers they were received from. type MempoolReactor struct { p2p.BaseReactor config *cfg.MempoolConfig Mempool *Mempool + ids *mempoolIDs +} + +type mempoolIDs struct { + mtx sync.RWMutex + peerMap map[p2p.ID]uint16 + nextID uint16 // assumes that a node will never have over 65536 active peers + activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter +} + +// Reserve searches for the next unused ID and assigns it to the peer. +func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + curID := ids.nextPeerID() + ids.peerMap[peer.ID()] = curID + ids.activeIDs[curID] = struct{}{} +} + +// nextPeerID returns the next unused peer ID to use. +// This assumes that ids's mutex is already locked. +func (ids *mempoolIDs) nextPeerID() uint16 { + _, idExists := ids.activeIDs[ids.nextID] + for idExists { + ids.nextID++ + _, idExists = ids.activeIDs[ids.nextID] + } + curID := ids.nextID + ids.nextID++ + return curID +} + +// Reclaim returns the ID reserved for the peer back to unused pool. +func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + removedID, ok := ids.peerMap[peer.ID()] + if ok { + delete(ids.activeIDs, removedID) + delete(ids.peerMap, peer.ID()) + } +} + +// GetForPeer returns an ID reserved for the peer. +func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + return ids.peerMap[peer.ID()] } // NewMempoolReactor returns a new MempoolReactor with the given config and mempool. @@ -35,6 +93,11 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac { memR := &MempoolReactor{ config: config, Mempool: mempool, + ids: &mempoolIDs{ + peerMap: make(map[p2p.ID]uint16), + activeIDs: map[uint16]struct{}{0: {}}, + nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx + }, } memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) return memR } @@ -68,11 +131,13 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) { + memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(peer) } // RemovePeer implements Reactor.
func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + memR.ids.Reclaim(peer) // broadcast routine checks if peer is gone and returns } @@ -89,7 +154,8 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: - err := memR.Mempool.CheckTx(msg.Tx, nil) + peerID := memR.ids.GetForPeer(src) + err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID}) if err != nil { memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err) } @@ -110,6 +176,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { return } + peerID := memR.ids.GetForPeer(peer) var next *clist.CElement for { // This happens because the CElement we were looking at got garbage @@ -146,12 +213,15 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { continue } - // send memTx - msg := &TxMessage{Tx: memTx.tx} - success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) - if !success { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue + // ensure peer hasn't already sent us this tx + if _, ok := memTx.senders.Load(peerID); !ok { + // send memTx + msg := &TxMessage{Tx: memTx.tx} + success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) + if !success { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } } select { diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 51d130187..f16f84479 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -7,15 +7,13 @@ import ( "time" "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log/term" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/go-kit/kit/log/term" - "github.com/tendermint/tendermint/abci/example/kvstore" - "github.com/tendermint/tendermint/libs/log" - cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" @@ -102,6 +100,12 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int wg.Done() } +// ensure no txs on reactor after some timeout +func ensureNoTxs(t *testing.T, reactor *MempoolReactor, timeout time.Duration) { + time.Sleep(timeout) // wait for the txs in all mempools + assert.Zero(t, reactor.Mempool.Size()) +} + const ( NUM_TXS = 1000 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow @@ -124,10 +128,26 @@ func TestReactorBroadcastTxMessage(t *testing.T) { // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others - txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) + txs := checkTxs(t, reactors[0].Mempool, NUM_TXS, UnknownPeerID) waitForTxs(t, txs, reactors) } +func TestReactorNoBroadcastToSender(t *testing.T) { + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + + // send a bunch of txs to the first reactor's mempool, claiming it came from peer + // ensure peer gets no txs + checkTxs(t, reactors[0].Mempool, NUM_TXS, 1) + ensureNoTxs(t, reactors[1], 100*time.Millisecond) +} + func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") diff --git a/state/services.go b/state/services.go index 02c3aa7d1..07d12c5a1 100644 --- a/state/services.go +++ b/state/services.go @@ -23,6 +23,7 
@@ type Mempool interface { Size() int CheckTx(types.Tx, func(*abci.Response)) error + CheckTxWithInfo(types.Tx, func(*abci.Response), mempool.TxInfo) error ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error Flush() @@ -37,11 +38,17 @@ type MockMempool struct{} var _ Mempool = MockMempool{} -func (MockMempool) Lock() {} -func (MockMempool) Unlock() {} -func (MockMempool) Size() int { return 0 } -func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { return nil } -func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } +func (MockMempool) Lock() {} +func (MockMempool) Unlock() {} +func (MockMempool) Size() int { return 0 } +func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error { + return nil +} +func (MockMempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response), + _ mempool.TxInfo) error { + return nil +} +func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (MockMempool) Update( _ int64, _ types.Txs, From 1bb8e02a962a1ffd7ce4e7355f648b644105f649 Mon Sep 17 00:00:00 2001 From: HaoyangLiu Date: Tue, 26 Mar 2019 16:29:06 +0800 Subject: [PATCH 12/25] mempool: fix broadcastTxRoutine leak (#3478) Refs #3306, irisnet@fdbb676 I ran an irishub validator. After the validator node had run for several days, I dumped the whole goroutine stack and found that there were hundreds of broadcastTxRoutine goroutines, even though fewer than 30 peers were connected. So I believe there must be a broadcastTxRoutine leak. According to my analysis, the root cause of this issue is in the code below: select { case <-next.NextWaitChan(): // see the start of the for loop for nil check next = next.Next() case <-peer.Quit(): return case <-memR.Quit(): return } As we know, if multiple cases are available at the same time, a random one is selected. Suppose that next.NextWaitChan() and peer.Quit() are both available, and next.NextWaitChan() is chosen. // send memTx msg := &TxMessage{Tx: memTx.tx} success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) if !success { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } Then next will be non-nil and the peer send operation won't succeed. As a result, this goroutine will be trapped in an infinite loop and never released. My proposal is to check peer.Quit() and memR.Quit() in every loop iteration, no matter whether next is nil.
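For intuition about the bug described above, here is a small self-contained program (illustrative only, not from the patch) showing that Go's `select` picks pseudo-randomly among ready cases, so a closed quit channel alone does not guarantee the quit path is ever taken:

```
package main

import "fmt"

func main() {
	work := make(chan struct{}, 1)
	quit := make(chan struct{})
	close(quit) // permanently ready, like peer.Quit() after a disconnect

	picked := map[string]int{}
	for i := 0; i < 1000; i++ {
		select {
		case work <- struct{}{}: // keep the work case ready too
		default:
		}
		select {
		case <-work:
			picked["work"]++
		case <-quit:
			picked["quit"]++
		}
	}
	fmt.Println(picked) // both cases get picked roughly half the time
}
```

Hence the fix re-checks `memR.IsRunning()` and `peer.IsRunning()` at the top of every iteration instead of relying on the select alone.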
--- mempool/reactor.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mempool/reactor.go b/mempool/reactor.go index 555f38b8b..23fec2700 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -179,6 +179,10 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { peerID := memR.ids.GetForPeer(peer) var next *clist.CElement for { + // In case both next.NextWaitChan() and peer.Quit() are available at the same time + if !memR.IsRunning() || !peer.IsRunning() { + return + } // This happens because the CElement we were looking at got garbage // collected (removed). That is, .NextWait() returned nil. Go ahead and // start from the beginning. From a4d9539544ba4377f16e797fea01090bc974e1b5 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 26 Mar 2019 09:44:49 +0100 Subject: [PATCH 13/25] rpc/client: include NetworkClient interface into Client interface (#3473) I think it's nice when the Client interface has all the methods. If someone does not need a particular method/set of methods, she can use individual interfaces (e.g. NetworkClient, MempoolClient) or write her own interface. Technically breaking. Fixes #3458 --- CHANGELOG_PENDING.md | 1 + rpc/client/httpclient.go | 6 +----- rpc/client/interface.go | 8 +++----- rpc/client/localclient.go | 6 +----- rpc/client/mock/client.go | 12 ++++++++++++ 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index bebc3e6a6..eaf08928d 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -10,6 +10,7 @@ * Go API - [libs/common] Remove RepeatTimer (also TimerMaker and Ticker interface) +- [rpc/client] \#3458 Include NetworkClient interface into Client interface * Blockchain Protocol diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index e982292e7..55c7b4f17 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -52,11 +52,7 @@ func NewHTTP(remote, wsEndpoint string) *HTTP { } } -var ( - _ Client = (*HTTP)(nil) - _ NetworkClient = (*HTTP)(nil) - _ EventsClient = (*HTTP)(nil) -) +var _ Client = (*HTTP)(nil) func (c *HTTP) Status() (*ctypes.ResultStatus, error) { result := new(ctypes.ResultStatus) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 605d84ba2..8f9ed9372 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -72,17 +72,15 @@ type StatusClient interface { type Client interface { cmn.Service ABCIClient - SignClient + EventsClient HistoryClient + NetworkClient + SignClient StatusClient - EventsClient } // NetworkClient is general info about the network state. May not // be needed usually. -// -// Not included in the Client interface, but generally implemented -// by concrete implementations. type NetworkClient interface { NetInfo() (*ctypes.ResultNetInfo, error) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 976c9892a..d57ced311 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -58,11 +58,7 @@ func NewLocal(node *nm.Node) *Local { } } -var ( - _ Client = (*Local)(nil) - _ NetworkClient = (*Local)(nil) - _ EventsClient = (*Local)(nil) -) +var _ Client = (*Local)(nil) // SetLogger allows to set a logger on the client. func (c *Local) SetLogger(l log.Logger) { diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index 9c0eb75b8..c2e19b6d4 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -108,6 +108,18 @@ func (c Client) NetInfo() (*ctypes.ResultNetInfo, error) { return core.NetInfo(&rpctypes.Context{}) } +func (c Client) ConsensusState() (*ctypes.ResultConsensusState, error) { + return core.ConsensusState(&rpctypes.Context{}) +} + +func (c Client) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { + return core.DumpConsensusState(&rpctypes.Context{}) +} + +func (c Client) Health() (*ctypes.ResultHealth, error) { + return core.Health(&rpctypes.Context{}) +} + func (c Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds) } From 5a25b75b1d586c6d9c3fdfc940057f7b85089a90 Mon Sep 17 00:00:00 2001 From: zjubfd Date: Wed, 27 Mar 2019 00:13:14 +0800 Subject: [PATCH 14/25] p2p: refactor GetSelectionWithBias for addressbook (#3475) Why submit this PR: we have suffered from an infinite-loop bug in the addrbook, which took us a long time to track down after the process became a zombie peer. It has been fixed in #3232, but the ADDRS_LOOP is still there, so the risk of an infinite loop still exists.
The algorithm that randomly picks a bucket is also not stable: a peer may unluckily keep choosing the wrong bucket for a long time, and the time and CPU spent are wasted. A simple improvement: shuffle bucketsNew and bucketsOld, and pick the necessary number of addresses from them. This gives a stable algorithm.
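A standalone sketch of that shuffle-then-take idea (the `pick` helper below is hypothetical, not the patch's actual code); the cost is O(n) per call, independent of how the addresses are spread across buckets:

```
package main

import (
	"fmt"
	"math/rand"
)

// pick returns up to num distinct addresses by shuffling once and
// taking a prefix, instead of retrying randomly chosen buckets.
func pick(addrs []string, num int) []string {
	rand.Shuffle(len(addrs), func(i, j int) {
		addrs[i], addrs[j] = addrs[j], addrs[i]
	})
	if num > len(addrs) {
		num = len(addrs)
	}
	return addrs[:num]
}

func main() {
	addrs := []string{"a:26656", "b:26656", "c:26656", "d:26656", "e:26656"}
	fmt.Println(pick(addrs, 3)) // e.g. [d:26656 a:26656 c:26656]
}
```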
--- p2p/pex/addrbook.go | 125 ++++++++++++------------------- p2p/pex/addrbook_test.go | 35 +++++------ 2 files changed, 58 insertions(+), 102 deletions(-) diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 3cda9ac74..3cb91c380 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -9,6 +9,7 @@ import ( "encoding/binary" "fmt" "math" + "math/rand" "net" "sync" "time" @@ -405,89 +406,11 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre bookSize*getSelectionPercent/100) numAddresses = cmn.MinInt(maxGetSelection, numAddresses) - selection := make([]*p2p.NetAddress, numAddresses) - - oldBucketToAddrsMap := make(map[int]map[string]struct{}) - var oldIndex int - newBucketToAddrsMap := make(map[int]map[string]struct{}) - var newIndex int - - // initialize counters used to count old and new added addresses. - // len(oldBucketToAddrsMap) cannot be used as multiple addresses can endup in the same bucket. - var oldAddressesAdded int - var newAddressesAdded int - // number of new addresses that, if possible, should be in the beginning of the selection - numRequiredNewAdd := percentageOfNum(biasTowardsNewAddrs, numAddresses) - - selectionIndex := 0 -ADDRS_LOOP: - for selectionIndex < numAddresses { - // biasedTowardsOldAddrs indicates if the selection can switch to old addresses - biasedTowardsOldAddrs := selectionIndex >= numRequiredNewAdd - // An old addresses is selected if: - // - the bias is for old and old addressees are still available or, - // - there are no new addresses or all new addresses have been selected. - // numAddresses <= a.nOld + a.nNew therefore it is guaranteed that there are enough - // addresses to fill the selection - pickFromOldBucket := - (biasedTowardsOldAddrs && oldAddressesAdded < a.nOld) || - a.nNew == 0 || newAddressesAdded >= a.nNew - - bucket := make(map[string]*knownAddress) - - // loop until we pick a random non-empty bucket - for len(bucket) == 0 { - if pickFromOldBucket { - oldIndex = a.rand.Intn(len(a.bucketsOld)) - bucket = a.bucketsOld[oldIndex] - } else { - newIndex = a.rand.Intn(len(a.bucketsNew)) - bucket = a.bucketsNew[newIndex] - } - } - - // pick a random index - randIndex := a.rand.Intn(len(bucket)) - - // loop over the map to return that index - var selectedAddr *p2p.NetAddress - for _, ka := range bucket { - if randIndex == 0 { - selectedAddr = ka.Addr - break - } - randIndex-- - } - - // if we have selected the address before, restart the loop - // otherwise, record it and continue - if pickFromOldBucket { - if addrsMap, ok := oldBucketToAddrsMap[oldIndex]; ok { - if _, ok = addrsMap[selectedAddr.String()]; ok { - continue ADDRS_LOOP - } - } else { - oldBucketToAddrsMap[oldIndex] = make(map[string]struct{}) - } - oldBucketToAddrsMap[oldIndex][selectedAddr.String()] = struct{}{} - oldAddressesAdded++ - } else { - if addrsMap, ok := newBucketToAddrsMap[newIndex]; ok { - if _, ok = addrsMap[selectedAddr.String()]; ok { - continue ADDRS_LOOP - } - } else { - newBucketToAddrsMap[newIndex] = make(map[string]struct{}) - } - newBucketToAddrsMap[newIndex][selectedAddr.String()] = struct{}{} - newAddressesAdded++ - } - - selection[selectionIndex] = selectedAddr - selectionIndex++ - } - + // if there are not enough old addrs, choose new addrs instead. + numRequiredNewAdd := cmn.MaxInt(percentageOfNum(biasTowardsNewAddrs, numAddresses), numAddresses-a.nOld) + selection := a.randomPickAddresses(bucketTypeNew, numRequiredNewAdd) + selection = append(selection, a.randomPickAddresses(bucketTypeOld, numAddresses-len(selection))...) return selection } @@ -726,6 +649,44 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error { return nil } +func (a *addrBook) randomPickAddresses(bucketType byte, num int) []*p2p.NetAddress { + var buckets []map[string]*knownAddress + switch bucketType { + case bucketTypeNew: + buckets = a.bucketsNew + case bucketTypeOld: + buckets = a.bucketsOld + default: + panic("unexpected bucketType") + } + total := 0 + for _, bucket := range buckets { + total = total + len(bucket) + } + addresses := make([]*knownAddress, 0, total) + for _, bucket := range buckets { + for _, ka := range bucket { + addresses = append(addresses, ka) + } + } + selection := make([]*p2p.NetAddress, 0, num) + chosenSet := make(map[string]bool, num) + rand.Shuffle(total, func(i, j int) { + addresses[i], addresses[j] = addresses[j], addresses[i] + }) + for _, addr := range addresses { + if chosenSet[addr.Addr.String()] { + continue + } + chosenSet[addr.Addr.String()] = true + selection = append(selection, addr.Addr) + if len(selection) >= num { + return selection + } + } + return selection +} + // Make space in the new buckets by expiring the really bad entries. // If no bad entries are available we remove the oldest.
func (a *addrBook) expireNew(bucketIdx int) { diff --git a/p2p/pex/addrbook_test.go b/p2p/pex/addrbook_test.go index 9effa5d0e..fdcb0c8ad 100644 --- a/p2p/pex/addrbook_test.go +++ b/p2p/pex/addrbook_test.go @@ -435,12 +435,12 @@ func TestPrivatePeers(t *testing.T) { func testAddrBookAddressSelection(t *testing.T, bookSize int) { // generate all combinations of old (m) and new addresses - for nOld := 0; nOld <= bookSize; nOld++ { - nNew := bookSize - nOld - dbgStr := fmt.Sprintf("book of size %d (new %d, old %d)", bookSize, nNew, nOld) + for nBookOld := 0; nBookOld <= bookSize; nBookOld++ { + nBookNew := bookSize - nBookOld + dbgStr := fmt.Sprintf("book of size %d (new %d, old %d)", bookSize, nBookNew, nBookOld) // create book and get selection - book, fname := createAddrBookWithMOldAndNNewAddrs(t, nOld, nNew) + book, fname := createAddrBookWithMOldAndNNewAddrs(t, nBookOld, nBookNew) defer deleteTempFile(fname) addrs := book.GetSelectionWithBias(biasToSelectNewPeers) assert.NotNil(t, addrs, "%s - expected a non-nil selection", dbgStr) @@ -460,27 +460,25 @@ func testAddrBookAddressSelection(t *testing.T, bookSize int) { // Given: // n - num new addrs, m - num old addrs // k - num new addrs expected in the beginning (based on bias %) - // i=min(n, k), aka expFirstNew + // i=min(n, max(k,r-m)), aka expNew // j=min(m, r-i), aka expOld // // We expect this layout: - // indices: 0...i-1 i...i+j-1 i+j...r - // addresses: N0..Ni-1 O0..Oj-1 Ni... + // indices: 0...i-1 i...i+j-1 + // addresses: N0..Ni-1 O0..Oj-1 // // There is at least one partition and at most three. var ( - k = percentageOfNum(biasToSelectNewPeers, nAddrs) - expFirstNew = cmn.MinInt(nNew, k) - expOld = cmn.MinInt(nOld, nAddrs-expFirstNew) - expNew = nAddrs - expOld - expLastNew = expNew - expFirstNew + k = percentageOfNum(biasToSelectNewPeers, nAddrs) + expNew = cmn.MinInt(nNew, cmn.MaxInt(k, nAddrs-nBookOld)) + expOld = cmn.MinInt(nOld, nAddrs-expNew) ) // Verify that the number of old and new addresses are as expected - if nNew < expNew || nNew > expNew { + if nNew != expNew { t.Fatalf("%s - expected new addrs %d, got %d", dbgStr, expNew, nNew) } - if nOld < expOld || nOld > expOld { + if nOld != expOld { t.Fatalf("%s - expected old addrs %d, got %d", dbgStr, expOld, nOld) } @@ -499,15 +497,12 @@ func testAddrBookAddressSelection(t *testing.T, bookSize int) { case expOld == 0: // all new addresses expSeqLens = []int{nAddrs} expSeqTypes = []int{1} - case expFirstNew == 0: // all old addresses + case expNew == 0: // all old addresses expSeqLens = []int{nAddrs} expSeqTypes = []int{2} - case nAddrs-expFirstNew-expOld == 0: // new addresses, old addresses - expSeqLens = []int{expFirstNew, expOld} + case nAddrs-expNew-expOld == 0: // new addresses, old addresses + expSeqLens = []int{expNew, expOld} expSeqTypes = []int{1, 2} - default: // new addresses, old addresses, new addresses - expSeqLens = []int{expFirstNew, expOld, expLastNew} - expSeqTypes = []int{1, 2, 1} } assert.Equal(t, expSeqLens, seqLens, From 55b7118c981e93b920cead4616b91ec58130e316 Mon Sep 17 00:00:00 2001 From: Ismail Khoffi Date: Wed, 27 Mar 2019 15:35:32 +0100 Subject: [PATCH 15/25] Prep changelog: copy from pending & update version --- CHANGELOG.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0ba675ae..4824c4f27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,35 @@ # Changelog +## v0.31.1 + +*March 27th, 2019* + +### BREAKING CHANGES: + +* CLI/RPC/Config + +* Apps + +* Go API +- 
[libs/common] Remove RepeatTimer (also TimerMaker and Ticker interface) +- [rpc/client] \#3458 Include NetworkClient interface into Client interface + +* Blockchain Protocol + +* P2P Protocol + +### FEATURES: +- [rpc] \#3419 Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha) + +### IMPROVEMENTS: + +- [mempool] \#2778 No longer send txs back to peers who sent it to you + +### BUG FIXES: + +- [blockchain] \#2699 update the maxHeight when a peer is removed + + ## v0.31.0 *March 16th, 2019* From ed63e1f378eaf465b714084fa1edd6ac4761782f Mon Sep 17 00:00:00 2001 From: Ismail Khoffi Date: Wed, 27 Mar 2019 16:03:25 +0100 Subject: [PATCH 16/25] Add more entries to the Changelog, fix formatting, linkify --- CHANGELOG.md | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4824c4f27..7eb690e80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ *March 27th, 2019* +Special thanks to external contributors on this release: + ### BREAKING CHANGES: * CLI/RPC/Config @@ -11,23 +13,30 @@ * Apps * Go API -- [libs/common] Remove RepeatTimer (also TimerMaker and Ticker interface) -- [rpc/client] \#3458 Include NetworkClient interface into Client interface + - [crypto] [\#3426](https://github.com/tendermint/tendermint/pull/3426) Remove `Ripemd160` helper method (@needkane) + - [libs/common] Remove `RepeatTimer` (also `TimerMaker` and `Ticker` interface) + - [rpc/client] [\#3458](https://github.com/tendermint/tendermint/issues/3458) Include `NetworkClient` interface into `Client` interface + - [types] [\#3448](https://github.com/tendermint/tendermint/issues/3448) Remove method `PB2TM.ConsensusParams` * Blockchain Protocol * P2P Protocol ### FEATURES: -- [rpc] \#3419 Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha) + + - [rpc] [\#3419](https://github.com/tendermint/tendermint/issues/3419) Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha) ### IMPROVEMENTS: -- [mempool] \#2778 No longer send txs back to peers who sent it to you +- [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties +- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you +- [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook +- [rpc/lib/client] [\#3430](https://github.com/tendermint/tendermint/issues/3430) Disable compression for HTTP client to prevent GZIP-bomb DoS attacks (@guagualvcha) ### BUG FIXES: -- [blockchain] \#2699 update the maxHeight when a peer is removed +- [blockchain] [\#2699](https://github.com/tendermint/tendermint/issues/2699) update the maxHeight when a peer is removed +- [mempool] [\#3478](https://github.com/tendermint/tendermint/issues/3478) Fix memory-leak related to `broadcastTxRoutine` (@HaoyangLiu) ## v0.31.0 From e3f840e6a6f81406c44e760e578e48fcd3fedf12 Mon Sep 17 00:00:00 2001 From: Ismail Khoffi Date: Wed, 27 Mar 2019 16:04:43 +0100 Subject: [PATCH 17/25] reset CHANGELOG_PENDING.md --- CHANGELOG_PENDING.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index eaf08928d..470282aa9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -9,20 +9,14 @@ * Apps * Go API -- [libs/common] Remove RepeatTimer (also TimerMaker 
and Ticker interface) -- [rpc/client] \#3458 Include NetworkClient interface into Client interface * Blockchain Protocol * P2P Protocol ### FEATURES: -- [rpc] \#3419 Start HTTPS server if `rpc.tls_cert_file` and `rpc.tls_key_file` are provided in the config (@guagualvcha) ### IMPROVEMENTS: -- [mempool] \#2778 No longer send txs back to peers who sent it to you - ### BUG FIXES: -- [blockchain] \#2699 update the maxHeight when a peer is removed From 52727863e187e85901e7f2e7285920a701d2a8e6 Mon Sep 17 00:00:00 2001 From: Ismail Khoffi Date: Wed, 27 Mar 2019 16:07:03 +0100 Subject: [PATCH 18/25] add external contributors --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7eb690e80..6e87e2b76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ *March 27th, 2019* Special thanks to external contributors on this release: +@guagualvcha, @HaoyangLiu, @needkane ### BREAKING CHANGES: From 5fa540bdc9eac01cc3df6de01119550d1122114b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 27 Mar 2019 16:45:34 +0100 Subject: [PATCH 19/25] mempool: add a safety check, write tests for mempoolIDs (#3487) * mempool: add a safety check, write tests for mempoolIDs and document 65536 limit in the mempool reactor spec follow-up to https://github.com/tendermint/tendermint/pull/2778 * rename the test * fixes after Ismail's review --- consensus/state_test.go | 6 +- docs/spec/reactors/mempool/reactor.md | 8 ++- mempool/mempool.go | 5 +- mempool/reactor.go | 24 +++++-- mempool/reactor_test.go | 35 +++++++++ p2p/dummy/peer.go | 100 -------------------------- p2p/mock/peer.go | 68 ++++++++++++++++++ p2p/pex/pex_reactor_test.go | 53 ++------------ 8 files changed, 140 insertions(+), 159 deletions(-) delete mode 100644 p2p/dummy/peer.go create mode 100644 p2p/mock/peer.go diff --git a/consensus/state_test.go b/consensus/state_test.go index a4d01e584..fc1e3e949 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -14,7 +14,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - p2pdummy "github.com/tendermint/tendermint/p2p/dummy" + p2pmock "github.com/tendermint/tendermint/p2p/mock" "github.com/tendermint/tendermint/types" ) @@ -1548,7 +1548,7 @@ func TestStateHalt1(t *testing.T) { func TestStateOutputsBlockPartsStats(t *testing.T) { // create dummy peer cs, _ := randConsensusState(1) - peer := p2pdummy.NewPeer() + peer := p2pmock.NewPeer(nil) // 1) new block part parts := types.NewPartSetFromData(cmn.RandBytes(100), 10) @@ -1591,7 +1591,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { func TestStateOutputVoteStats(t *testing.T) { cs, vss := randConsensusState(2) // create dummy peer - peer := p2pdummy.NewPeer() + peer := p2pmock.NewPeer(nil) vote := signVote(vss[1], types.PrecommitType, []byte("test"), types.PartSetHeader{}) diff --git a/docs/spec/reactors/mempool/reactor.md b/docs/spec/reactors/mempool/reactor.md index d0b19f7ca..d349fc7cc 100644 --- a/docs/spec/reactors/mempool/reactor.md +++ b/docs/spec/reactors/mempool/reactor.md @@ -13,4 +13,10 @@ for details. Sending incorrectly encoded data or data exceeding `maxMsgSize` will result in stopping the peer. -The mempool will not send a tx back to any peer which it received it from. \ No newline at end of file +The mempool will not send a tx back to any peer which it received it from. 
+ +The reactor assigns a `uint16` number for each peer and maintains a map from +p2p.ID to `uint16`. Each mempool transaction carries a list of all the senders +(`[]uint16`). The list is updated every time the mempool receives a transaction it +has already seen. `uint16` assumes that a node will never have over 65535 active +peers (0 is reserved for unknown source - e.g. RPC). diff --git a/mempool/mempool.go b/mempool/mempool.go index 2064b7bce..bd3cbf7d9 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -382,7 +382,10 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo if !mem.cache.Push(tx) { // record the sender e, ok := mem.txsMap[sha256.Sum256(tx)] - if ok { // tx may be in cache, but not in the mempool + // The check is needed because tx may be in cache, but not in the mempool. + // E.g. after we've committed a block, txs are removed from the mempool, + // but not from the cache. + if ok { memTx := e.Value.(*mempoolTx) if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { // TODO: consider punishing peer for dups, diff --git a/mempool/reactor.go b/mempool/reactor.go index 23fec2700..e1376b287 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,6 +2,7 @@ package mempool import ( "fmt" + "math" "reflect" "sync" "time" @@ -26,6 +27,8 @@ const ( // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 + + maxActiveIDs = math.MaxUint16 ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -45,7 +48,8 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assigns it to the peer. +// Reserve searches for the next unused ID and assigns it to the +// peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() defer ids.mtx.Unlock() @@ -58,6 +62,10 @@ func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { // nextPeerID returns the next unused peer ID to use. // This assumes that ids's mutex is already locked. func (ids *mempoolIDs) nextPeerID() uint16 { + if len(ids.activeIDs) == maxActiveIDs { + panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) + } + _, idExists := ids.activeIDs[ids.nextID] for idExists { ids.nextID++ @@ -88,16 +96,20 @@ func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { return ids.peerMap[peer.ID()] } +func newMempoolIDs() *mempoolIDs { + return &mempoolIDs{ + peerMap: make(map[p2p.ID]uint16), + activeIDs: map[uint16]struct{}{0: {}}, + nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx + } +} + // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ config: config, Mempool: mempool, - ids: &mempoolIDs{ - peerMap: make(map[p2p.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - }, + ids: newMempoolIDs(), } memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) return memR diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index f16f84479..c9cf49809 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -2,6 +2,7 @@ package mempool import ( "fmt" + "net" "sync" "testing" "time" @@ -15,6 +16,7 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mock" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -189,3 +191,36 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { // i.e. broadcastTxRoutine finishes when reactor is stopped leaktest.CheckTimeout(t, 10*time.Second)() } + +func TestMempoolIDsBasic(t *testing.T) { + ids := newMempoolIDs() + + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 1, ids.GetForPeer(peer)) + ids.Reclaim(peer) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 2, ids.GetForPeer(peer)) + ids.Reclaim(peer) +} + +func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { + if testing.Short() { + return + } + + // 0 is already reserved for UnknownPeerID + ids := newMempoolIDs() + + for i := 0; i < maxActiveIDs-1; i++ { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + } + + assert.Panics(t, func() { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + }) +} diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go deleted file mode 100644 index 57edafc67..000000000 --- a/p2p/dummy/peer.go +++ /dev/null @@ -1,100 +0,0 @@ -package dummy - -import ( - "net" - - cmn "github.com/tendermint/tendermint/libs/common" - p2p "github.com/tendermint/tendermint/p2p" - tmconn "github.com/tendermint/tendermint/p2p/conn" -) - -type peer struct { - cmn.BaseService - kv map[string]interface{} -} - -var _ p2p.Peer = (*peer)(nil) - -// NewPeer creates new dummy peer. -func NewPeer() *peer { - p := &peer{ - kv: make(map[string]interface{}), - } - p.BaseService = *cmn.NewBaseService(nil, "peer", p) - - return p -} - -// FlushStop just calls Stop. -func (p *peer) FlushStop() { - p.Stop() -} - -// ID always returns dummy. -func (p *peer) ID() p2p.ID { - return p2p.ID("dummy") -} - -// IsOutbound always returns false. -func (p *peer) IsOutbound() bool { - return false -} - -// IsPersistent always returns false. -func (p *peer) IsPersistent() bool { - return false -} - -// NodeInfo always returns empty node info. -func (p *peer) NodeInfo() p2p.NodeInfo { - return p2p.DefaultNodeInfo{} -} - -// RemoteIP always returns localhost. -func (p *peer) RemoteIP() net.IP { - return net.ParseIP("127.0.0.1") -} - -// Addr always returns tcp://localhost:8800. -func (p *peer) RemoteAddr() net.Addr { - return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} -} - -// CloseConn always returns nil. -func (p *peer) CloseConn() error { - return nil -} - -// Status always returns empry connection status. -func (p *peer) Status() tmconn.ConnectionStatus { - return tmconn.ConnectionStatus{} -} - -// Send does not do anything and just returns true. 
-func (p *peer) Send(byte, []byte) bool { - return true -} - -// TrySend does not do anything and just returns true. -func (p *peer) TrySend(byte, []byte) bool { - return true -} - -// Set records value under key specified in the map. -func (p *peer) Set(key string, value interface{}) { - p.kv[key] = value -} - -// Get returns a value associated with the key. Nil is returned if no value -// found. -func (p *peer) Get(key string) interface{} { - if value, ok := p.kv[key]; ok { - return value - } - return nil -} - -// OriginalAddr always returns nil. -func (p *peer) OriginalAddr() *p2p.NetAddress { - return nil -} diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go new file mode 100644 index 000000000..5ee81f67e --- /dev/null +++ b/p2p/mock/peer.go @@ -0,0 +1,68 @@ +package mock + +import ( + "net" + + "github.com/tendermint/tendermint/crypto/ed25519" + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" +) + +type Peer struct { + *cmn.BaseService + ip net.IP + id p2p.ID + addr *p2p.NetAddress + kv map[string]interface{} + Outbound, Persistent bool +} + +// NewPeer creates and starts a new mock peer. If the ip +// is nil, random routable address is used. +func NewPeer(ip net.IP) *Peer { + var netAddr *p2p.NetAddress + if ip == nil { + _, netAddr = p2p.CreateRoutableAddr() + } else { + netAddr = p2p.NewNetAddressIPPort(ip, 26656) + } + nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} + netAddr.ID = nodeKey.ID() + mp := &Peer{ + ip: ip, + id: nodeKey.ID(), + addr: netAddr, + kv: make(map[string]interface{}), + } + mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp) + mp.Start() + return mp +} + +func (mp *Peer) FlushStop() { mp.Stop() } +func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true } +func (mp *Peer) NodeInfo() p2p.NodeInfo { + return p2p.DefaultNodeInfo{ + ID_: mp.addr.ID, + ListenAddr: mp.addr.DialString(), + } +} +func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } +func (mp *Peer) ID() p2p.ID { return mp.id } +func (mp *Peer) IsOutbound() bool { return mp.Outbound } +func (mp *Peer) IsPersistent() bool { return mp.Persistent } +func (mp *Peer) Get(key string) interface{} { + if value, ok := mp.kv[key]; ok { + return value + } + return nil +} +func (mp *Peer) Set(key string, value interface{}) { + mp.kv[key] = value +} +func (mp *Peer) RemoteIP() net.IP { return mp.ip } +func (mp *Peer) OriginalAddr() *p2p.NetAddress { return mp.addr } +func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (mp *Peer) CloseConn() error { return nil } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 4f4ccb039..9e23058a5 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -3,7 +3,6 @@ package pex import ( "fmt" "io/ioutil" - "net" "os" "path/filepath" "testing" @@ -12,14 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/conn" + "github.com/tendermint/tendermint/p2p/mock" ) var ( @@ -148,7 +143,7 @@ func 
TestPEXReactorRequestMessageAbuse(t *testing.T) { sw := createSwitchAndAddReactors(r) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) p2p.AddPeerToSwitch(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) @@ -178,7 +173,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { sw := createSwitchAndAddReactors(r) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) p2p.AddPeerToSwitch(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) @@ -418,7 +413,7 @@ func TestPEXReactorDialPeer(t *testing.T) { sw := createSwitchAndAddReactors(pexR) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) addr := peer.NodeInfo().NetAddress() assert.Equal(t, 0, pexR.AttemptsToDial(addr)) @@ -444,44 +439,6 @@ func TestPEXReactorDialPeer(t *testing.T) { } } -type mockPeer struct { - *cmn.BaseService - pubKey crypto.PubKey - addr *p2p.NetAddress - outbound, persistent bool -} - -func newMockPeer() mockPeer { - _, netAddr := p2p.CreateRoutableAddr() - mp := mockPeer{ - addr: netAddr, - pubKey: ed25519.GenPrivKey().PubKey(), - } - mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp) - mp.Start() - return mp -} - -func (mp mockPeer) FlushStop() { mp.Stop() } -func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } -func (mp mockPeer) IsOutbound() bool { return mp.outbound } -func (mp mockPeer) IsPersistent() bool { return mp.persistent } -func (mp mockPeer) NodeInfo() p2p.NodeInfo { - return p2p.DefaultNodeInfo{ - ID_: mp.addr.ID, - ListenAddr: mp.addr.DialString(), - } -} -func (mockPeer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") } -func (mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } -func (mockPeer) Send(byte, []byte) bool { return false } -func (mockPeer) TrySend(byte, []byte) bool { return false } -func (mockPeer) Set(string, interface{}) {} -func (mockPeer) Get(string) interface{} { return nil } -func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil } -func (mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} } -func (mockPeer) CloseConn() error { return nil } - func assertPeersWithTimeout( t *testing.T, switches []*p2p.Switch, From 3c7bb6b571ac47e050a47013e22dbdfd68f02745 Mon Sep 17 00:00:00 2001 From: Ismail Khoffi Date: Wed, 27 Mar 2019 16:48:00 +0100 Subject: [PATCH 20/25] Add some numbers for #2778 --- CHANGELOG.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e87e2b76..4f2a9971f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,15 @@ Special thanks to external contributors on this release: ### IMPROVEMENTS: - [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties -- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you +- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you and limit to 65536 active peers. +This vastly improves the the b bandwidth consumption of nodes. +E.g. 250bytes txs for 120 sec. at 500 txs/sec. 
rate, so total 15MB:
+  - total bytes received from 1st node in 4 node localnet
+    - before: 42793967 (43MB)
+    - after: 30003256 (30MB)
+  - total bytes sent to 1st node in 4 node localnet
+    - before: 30569339 (30MB)
+    - after: 19304964 (19MB)
 - [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook
 - [rpc/lib/client] [\#3430](https://github.com/tendermint/tendermint/issues/3430) Disable compression for HTTP client to prevent GZIP-bomb DoS attacks (@guagualvcha)
 
 ### BUG FIXES:

From 233813483642c2ac370110dd42c3291556c26039 Mon Sep 17 00:00:00 2001
From: Ismail Khoffi
Date: Wed, 27 Mar 2019 16:52:19 +0100
Subject: [PATCH 21/25] bump versions

---
 version/version.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/version/version.go b/version/version.go
index b2202206c..9090fc7e6 100644
--- a/version/version.go
+++ b/version/version.go
@@ -20,7 +20,7 @@ const (
 	// Must be a string because scripts like dist.sh read this file.
 	// XXX: Don't change the name of this variable or you will break
 	// automation :)
-	TMCoreSemVer = "0.31.0"
+	TMCoreSemVer = "0.31.1"
 
 	// ABCISemVer is the semantic version of the ABCI library
 	ABCISemVer = "0.16.0"

From ae88965ff6ddf174607a02eee83cfc5ed11c6cbb Mon Sep 17 00:00:00 2001
From: Ismail Khoffi
Date: Wed, 27 Mar 2019 17:46:29 +0100
Subject: [PATCH 22/25] changelog: add summary & fix link & add external contributor (#3490)

---
 CHANGELOG.md | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f2a9971f..1b3ee16d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@
 
 *March 27th, 2019*
 
+This release contains a major improvement for the mempool that reduces the amount of sent data by about 30%
+(see some numbers below).
+It also fixes a memory leak in the mempool and gives you the ability to run HTTPS RPC servers (over TLS) by providing a certificate and key in the config.
+
 Special thanks to external contributors on this release:
 @guagualvcha, @HaoyangLiu, @needkane
 
@@ -15,7 +19,7 @@ Special thanks to external contributors on this release:
 
 * Go API
   - [crypto] [\#3426](https://github.com/tendermint/tendermint/pull/3426) Remove `Ripemd160` helper method (@needkane)
-  - [libs/common] Remove `RepeatTimer` (also `TimerMaker` and `Ticker` interface)
+  - [libs/common] [\#3429](https://github.com/tendermint/tendermint/pull/3429) Remove `RepeatTimer` (also `TimerMaker` and `Ticker` interface)
   - [rpc/client] [\#3458](https://github.com/tendermint/tendermint/issues/3458) Include `NetworkClient` interface into `Client` interface
   - [types] [\#3448](https://github.com/tendermint/tendermint/issues/3448) Remove method `PB2TM.ConsensusParams`
 
@@ -30,7 +34,8 @@ Special thanks to external contributors on this release:
 ### IMPROVEMENTS:
 
 - [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties
-- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you and limit to 65536 active peers.
+- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you.
+Also, limit to 65536 active peers.
 This vastly improves the the b bandwidth consumption of nodes.
 E.g. 250bytes txs for 120 sec. at 500 txs/sec.
rate, so total 15MB:
   - total bytes received from 1st node in 4 node localnet
     - before: 42793967 (43MB)
     - after: 30003256 (30MB)
   - total bytes sent to 1st node in 4 node localnet
     - before: 30569339 (30MB)
     - after: 19304964 (19MB)
-- [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook
+- [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook (@guagualvcha)
 - [rpc/lib/client] [\#3430](https://github.com/tendermint/tendermint/issues/3430) Disable compression for HTTP client to prevent GZIP-bomb DoS attacks (@guagualvcha)
 
 ### BUG FIXES:

From ccfe75ec4a701ad7dcaf1f26df772b0881ab1184 Mon Sep 17 00:00:00 2001
From: Sean Braithwaite
Date: Wed, 27 Mar 2019 18:51:57 +0100
Subject: [PATCH 23/25] docs: Fix broken links (#3482) (#3488)

* docs: fix broken links (#3482)

A bunch of links were broken in the documentation as they included the
`docs` prefix.

* Update CHANGELOG_PENDING

* docs: switch to relative links for github compatibility (#3482)

---
 docs/networks/docker-compose.md    | 2 +-
 docs/spec/blockchain/blockchain.md | 8 ++++----
 docs/spec/blockchain/encoding.md   | 2 +-
 docs/spec/consensus/abci.md        | 2 +-
 docs/spec/consensus/wal.md         | 2 +-
 5 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/networks/docker-compose.md b/docs/networks/docker-compose.md
index 7e4adde8d..8db49af5e 100644
--- a/docs/networks/docker-compose.md
+++ b/docs/networks/docker-compose.md
@@ -4,7 +4,7 @@ With Docker Compose, you can spin up local testnets with a single command.
 
 ## Requirements
 
-1. [Install tendermint](/docs/introduction/install.md)
+1. [Install tendermint](../introduction/install.md)
 2. [Install docker](https://docs.docker.com/engine/installation/)
 3. [Install docker-compose](https://docs.docker.com/compose/install/)
 
diff --git a/docs/spec/blockchain/blockchain.md b/docs/spec/blockchain/blockchain.md
index 60a07d42a..9f88d6417 100644
--- a/docs/spec/blockchain/blockchain.md
+++ b/docs/spec/blockchain/blockchain.md
@@ -103,7 +103,7 @@ type PartSetHeader struct {
 }
 ```
 
-See [MerkleRoot](/docs/spec/blockchain/encoding.md#MerkleRoot) for details.
+See [MerkleRoot](./encoding.md#MerkleRoot) for details.
 
 ## Time
 
@@ -163,7 +163,7 @@ a _precommit_ has `vote.Type == 2`.
 
 Signatures in Tendermint are raw bytes representing the underlying signature.
 
-See the [signature spec](/docs/spec/blockchain/encoding.md#key-types) for more.
+See the [signature spec](./encoding.md#key-types) for more.
 
 ## EvidenceData
 
@@ -190,7 +190,7 @@ type DuplicateVoteEvidence struct {
 }
 ```
 
-See the [pubkey spec](/docs/spec/blockchain/encoding.md#key-types) for more.
+See the [pubkey spec](./encoding.md#key-types) for more.
 
 ## Validation
 
@@ -209,7 +209,7 @@ the current version of the `state` corresponds to the state after executing
 transactions from the `prevBlock`. Elements of an object are accessed as
 expected, ie. `block.Header`.
 
-See the [definition of `State`](/docs/spec/blockchain/state.md).
+See the [definition of `State`](./state.md).
 
 ### Header
 
diff --git a/docs/spec/blockchain/encoding.md b/docs/spec/blockchain/encoding.md
index e8258e4a9..bde580a14 100644
--- a/docs/spec/blockchain/encoding.md
+++ b/docs/spec/blockchain/encoding.md
@@ -339,6 +339,6 @@ type CanonicalVote struct {
 The field ordering and the fixed sized encoding for the first three fields
 is optimized to ease parsing of SignBytes in HSMs. It creates fixed offsets for
 relevant fields that need to be read in this context.
-For more details, see the [signing spec](/docs/spec/consensus/signing.md).
+For more details, see the [signing spec](../consensus/signing.md).
 
 Also, see the motivating discussion in
 [#1622](https://github.com/tendermint/tendermint/issues/1622).
 
diff --git a/docs/spec/consensus/abci.md b/docs/spec/consensus/abci.md
index 82b88161e..226d22899 100644
--- a/docs/spec/consensus/abci.md
+++ b/docs/spec/consensus/abci.md
@@ -1 +1 @@
-[Moved](/docs/spec/software/abci.md)
+[Moved](../software/abci.md)
diff --git a/docs/spec/consensus/wal.md b/docs/spec/consensus/wal.md
index 589680f99..6146ab9c0 100644
--- a/docs/spec/consensus/wal.md
+++ b/docs/spec/consensus/wal.md
@@ -1 +1 @@
-[Moved](/docs/spec/software/wal.md)
+[Moved](../software/wal.md)

From a49d80b89c9df3cb2497adb8175915c654479708 Mon Sep 17 00:00:00 2001
From: Ismail Khoffi
Date: Wed, 27 Mar 2019 19:12:01 +0100
Subject: [PATCH 24/25] catch up with develop and rebase on current release to
 include #3482

---
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1b3ee16d5..69c4f6071 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@ This release contains a major improvement for the mempool that reduces the amount
 It also fixes a memory leak in the mempool and gives you the ability to run HTTPS RPC servers (over TLS) by providing a certificate and key in the config.
 
 Special thanks to external contributors on this release:
-@guagualvcha, @HaoyangLiu, @needkane
+@brapse, @guagualvcha, @HaoyangLiu, @needkane, @TraceBundy
 
 ### BREAKING CHANGES:
 
@@ -34,6 +34,7 @@ Special thanks to external contributors on this release:
 ### IMPROVEMENTS:
 
 - [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties
+- [docs] [\#3482](https://github.com/tendermint/tendermint/issues/3482) fix broken links (@brapse)
 - [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you.
 Also, limit to 65536 active peers.
 This vastly improves the the b bandwidth consumption of nodes.

From 9390a810ebda0b1aebd90f6e8b8688562bdf7958 Mon Sep 17 00:00:00 2001
From: Ethan Buchman
Date: Wed, 27 Mar 2019 21:03:28 -0400
Subject: [PATCH 25/25] minor changelog updates (#3499)

---
 CHANGELOG.md | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69c4f6071..31ee14c51 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,9 @@
 
 *March 27th, 2019*
 
-This release contains a major improvement for the mempool that reduces the amount of sent data by about 30% 
+This release contains a major improvement for the mempool that reduces the amount of sent data by about 30%
 (see some numbers below).
-It also fixes a memory leak in the mempool and gives you the ability to run HTTPS RPC servers (over TLS) by providing a certificate and key in the config.
+It also fixes a memory leak in the mempool and adds TLS support to the RPC server by providing a certificate and key in the config.
 Special thanks to external contributors on this release:
 @brapse, @guagualvcha, @HaoyangLiu, @needkane, @TraceBundy
 
 ### BREAKING CHANGES:
 
@@ -34,23 +34,23 @@ Special thanks to external contributors on this release:
 ### IMPROVEMENTS:
 
 - [docs] [\#3140](https://github.com/tendermint/tendermint/issues/3140) Formalize proposer election algorithm properties
-- [docs] [\#3482](https://github.com/tendermint/tendermint/issues/3482) fix broken links (@brapse)
-- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you. 
+- [docs] [\#3482](https://github.com/tendermint/tendermint/issues/3482) Fix broken links (@brapse)
+- [mempool] [\#2778](https://github.com/tendermint/tendermint/issues/2778) No longer send txs back to peers who sent it to you.
 Also, limit to 65536 active peers.
-This vastly improves the the b bandwidth consumption of nodes.
-E.g. 250bytes txs for 120 sec. at 500 txs/sec. rate, so total 15MB:
-  - total bytes received from 1st node in 4 node localnet
+This vastly improves the bandwidth consumption of nodes.
+For instance, in a 4-node localnet test sending 250-byte txs for 120 sec at 500 txs/sec (15MB in total):
+  - total bytes received from 1st node:
   - before: 42793967 (43MB)
-  - after: 30003256 (30MB)
-  - total bytes sent to 1st node in 4 node localnet
+  - after: 30003256 (30MB) 
+  - total bytes sent to 1st node:
   - before: 30569339 (30MB)
-  - after: 19304964 (19MB)
+  - after: 19304964 (19MB) 
 - [p2p] [\#3475](https://github.com/tendermint/tendermint/issues/3475) Simplify `GetSelectionWithBias` for addressbook (@guagualvcha)
 - [rpc/lib/client] [\#3430](https://github.com/tendermint/tendermint/issues/3430) Disable compression for HTTP client to prevent GZIP-bomb DoS attacks (@guagualvcha)
 
 ### BUG FIXES:
 
-- [blockchain] [\#2699](https://github.com/tendermint/tendermint/issues/2699) update the maxHeight when a peer is removed
+- [blockchain] [\#2699](https://github.com/tendermint/tendermint/issues/2699) Update the maxHeight when a peer is removed
 - [mempool] [\#3478](https://github.com/tendermint/tendermint/issues/3478) Fix memory-leak related to `broadcastTxRoutine` (@HaoyangLiu)
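
For context on the `newMempoolIDs()` helper and the tests introduced at the top of this series: the reactor hands every connected peer a compact `uint16` ID so the mempool can cheaply record which peers have already seen a transaction and skip them when broadcasting (the \#2778 improvement in the changelog). The sketch below is a minimal reconstruction of that allocator, inferred from the inline literal removed from `NewMempoolReactor` (`peerMap`, `activeIDs`, `nextID`), the behaviour asserted by `TestMempoolIDsBasic` and `TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs`, and the 65536-peer limit stated in the changelog. The method bodies and the panic message are illustrative assumptions, not the exact upstream code.

```go
package mempool

import (
	"fmt"
	"sync"

	"github.com/tendermint/tendermint/p2p"
)

// maxActiveIDs mirrors the 65536-peer limit from the changelog entry:
// the ID space is uint16, with ID 0 reserved.
const maxActiveIDs = 65536

// mempoolIDs is a sketch of the allocator behind newMempoolIDs().
type mempoolIDs struct {
	mtx       sync.RWMutex
	peerMap   map[p2p.ID]uint16
	activeIDs map[uint16]struct{} // used as a set for fast "is this ID taken?" lookups
	nextID    uint16
}

func newMempoolIDs() *mempoolIDs {
	return &mempoolIDs{
		peerMap:   make(map[p2p.ID]uint16),
		activeIDs: map[uint16]struct{}{0: {}}, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
		nextID:    1,
	}
}

// ReserveForPeer hands the peer the next free ID. It panics once all
// maxActiveIDs IDs (including the reserved 0) are in use, which is the
// behaviour TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs checks.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
	ids.mtx.Lock()
	defer ids.mtx.Unlock()

	if len(ids.activeIDs) >= maxActiveIDs {
		panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs))
	}
	curID := ids.nextID
	for {
		if _, taken := ids.activeIDs[curID]; !taken {
			break
		}
		curID++ // uint16 wraps past 65535; the guard above ensures a free ID exists
	}
	ids.activeIDs[curID] = struct{}{}
	ids.peerMap[peer.ID()] = curID
	// Hand out IDs monotonically rather than reusing a just-reclaimed one,
	// which is why TestMempoolIDsBasic sees ID 2 after a reserve/reclaim cycle.
	ids.nextID = curID + 1
}

// Reclaim returns the peer's ID to the pool when the peer disconnects.
func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
	ids.mtx.Lock()
	defer ids.mtx.Unlock()

	if id, ok := ids.peerMap[peer.ID()]; ok {
		delete(ids.activeIDs, id)
		delete(ids.peerMap, peer.ID())
	}
}

// GetForPeer returns the ID reserved for the peer, or 0 (unknown) if none.
func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
	ids.mtx.RLock()
	defer ids.mtx.RUnlock()

	return ids.peerMap[peer.ID()]
}
```

The design choice worth noting is the ID indirection itself: tracking a transaction's senders as small integers rather than full `p2p.ID` strings keeps the per-tx bookkeeping compact, which is presumably why the active-peer count is capped at 65536 — the size of the `uint16` ID space — with ID 0 reserved for transactions whose sender is unknown (e.g. those submitted via the RPC `BroadcastTx`, per the comment removed from `NewMempoolReactor`).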