Browse Source

Merge branch 'master' into develop

pull/1780/merge
Ethan Buchman 7 years ago
parent
commit
9cd9f3338b
7 changed files with 76 additions and 25 deletions
  1. +2
    -0
      .github/ISSUE_TEMPLATE
  2. +19
    -3
      CHANGELOG.md
  3. +6
    -2
      consensus/state.go
  4. +34
    -13
      mempool/mempool.go
  5. +11
    -2
      mempool/reactor.go
  6. +1
    -2
      types/keys.go
  7. +3
    -3
      version/version.go

+ 2
- 0
.github/ISSUE_TEMPLATE View File

@ -35,6 +35,8 @@ in a case of bug.
**Logs (you can paste a part showing an error or attach the whole file)**: **Logs (you can paste a part showing an error or attach the whole file)**:
**Config (you can paste only the changes you've made)**:
**`/dump_consensus_state` output for consensus bugs** **`/dump_consensus_state` output for consensus bugs**
**Anything else do we need to know**: **Anything else do we need to know**:

+ 19
- 3
CHANGELOG.md View File

@ -5,11 +5,27 @@
FEATURES: FEATURES:
- [node] added metrics (served under /metrics using a Prometheus client; disabled by default) - [node] added metrics (served under /metrics using a Prometheus client; disabled by default)
## 0.20.1
## 0.21.0
BUG FIXES:
*June 21th, 2018*
BREAKING CHANGES
- [config] Change default ports from 4665X to 2665X. Ports over 32768 are
ephemeral and reserved for use by the kernel.
- [cmd] `unsafe_reset_all` removes the addrbook.json
IMPROVEMENT
- [pubsub] Set default capacity to 0
- [docs] Various improvements
BUG FIXES
- [rpc] fix memory leak in Websocket (when using `/subscribe` method)
- [consensus] Fix an issue where we don't make blocks after `fast_sync` when `create_empty_blocks=false`
- [mempool] Fix #1761 where we don't process txs if `cache_size=0`
- [rpc] Fix memory leak in Websocket (when using `/subscribe` method)
- [config] Escape paths in config - fixes config paths on Windows
## 0.20.0 ## 0.20.0


+ 6
- 2
consensus/state.go View File

@ -484,9 +484,12 @@ func (cs *ConsensusState) updateToState(state sm.State) {
// If state isn't further out than cs.state, just ignore. // If state isn't further out than cs.state, just ignore.
// This happens when SwitchToConsensus() is called in the reactor. // This happens when SwitchToConsensus() is called in the reactor.
// We don't want to reset e.g. the Votes.
// We don't want to reset e.g. the Votes, but we still want to
// signal the new round step, because other services (eg. mempool)
// depend on having an up-to-date peer state!
if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) { if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1)
cs.newStep()
return return
} }
@ -516,6 +519,7 @@ func (cs *ConsensusState) updateToState(state sm.State) {
} else { } else {
cs.StartTime = cs.config.Commit(cs.CommitTime) cs.StartTime = cs.config.Commit(cs.CommitTime)
} }
cs.Validators = validators cs.Validators = validators
cs.Proposal = nil cs.Proposal = nil
cs.ProposalBlock = nil cs.ProposalBlock = nil
@ -541,7 +545,7 @@ func (cs *ConsensusState) newStep() {
rs := cs.RoundStateEvent() rs := cs.RoundStateEvent()
cs.wal.Write(rs) cs.wal.Write(rs)
cs.nSteps++ cs.nSteps++
// newStep is called by updateToStep in NewConsensusState before the eventBus is set!
// newStep is called by updateToState in NewConsensusState before the eventBus is set!
if cs.eventBus != nil { if cs.eventBus != nil {
cs.eventBus.PublishEventNewRoundStep(rs) cs.eventBus.PublishEventNewRoundStep(rs)
cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState) cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState)


+ 34
- 13
mempool/mempool.go View File

@ -77,7 +77,7 @@ type Mempool struct {
// Keep a cache of already-seen txs. // Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp. // This reduces the pressure on the proxyApp.
cache *txCache
cache txCache
// A log of mempool txs // A log of mempool txs
wal *auto.AutoFile wal *auto.AutoFile
@ -107,9 +107,13 @@ func NewMempool(
recheckCursor: nil, recheckCursor: nil,
recheckEnd: nil, recheckEnd: nil,
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
cache: newTxCache(config.CacheSize),
metrics: NopMetrics(), metrics: NopMetrics(),
} }
if config.CacheSize > 0 {
mempool.cache = newMapTxCache(config.CacheSize)
} else {
mempool.cache = nopTxCache{}
}
proxyAppConn.SetResponseCallback(mempool.resCb) proxyAppConn.SetResponseCallback(mempool.resCb)
for _, option := range options { for _, option := range options {
option(mempool) option(mempool)
@ -348,11 +352,11 @@ func (mem *Mempool) notifyTxsAvailable() {
panic("notified txs available but mempool is empty!") panic("notified txs available but mempool is empty!")
} }
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
// channel cap is 1, so this will send once
select { select {
case mem.txsAvailable <- mem.height + 1: case mem.txsAvailable <- mem.height + 1:
default: default:
} }
mem.notifiedTxsAvailable = true mem.notifiedTxsAvailable = true
} }
} }
@ -469,33 +473,42 @@ func (memTx *mempoolTx) Height() int64 {
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// txCache maintains a cache of transactions.
type txCache struct {
type txCache interface {
Reset()
Push(tx types.Tx) bool
Remove(tx types.Tx)
}
// mapTxCache maintains a cache of transactions.
type mapTxCache struct {
mtx sync.Mutex mtx sync.Mutex
size int size int
map_ map[string]struct{} map_ map[string]struct{}
list *list.List // to remove oldest tx when cache gets too big list *list.List // to remove oldest tx when cache gets too big
} }
// newTxCache returns a new txCache.
func newTxCache(cacheSize int) *txCache {
return &txCache{
var _ txCache = (*mapTxCache)(nil)
// newMapTxCache returns a new mapTxCache.
func newMapTxCache(cacheSize int) *mapTxCache {
return &mapTxCache{
size: cacheSize, size: cacheSize,
map_: make(map[string]struct{}, cacheSize), map_: make(map[string]struct{}, cacheSize),
list: list.New(), list: list.New(),
} }
} }
// Reset resets the txCache to empty.
func (cache *txCache) Reset() {
// Reset resets the cache to an empty state.
func (cache *mapTxCache) Reset() {
cache.mtx.Lock() cache.mtx.Lock()
cache.map_ = make(map[string]struct{}, cache.size) cache.map_ = make(map[string]struct{}, cache.size)
cache.list.Init() cache.list.Init()
cache.mtx.Unlock() cache.mtx.Unlock()
} }
// Push adds the given tx to the txCache. It returns false if tx is already in the cache.
func (cache *txCache) Push(tx types.Tx) bool {
// Push adds the given tx to the cache and returns true. It returns false if tx
// is already in the cache.
func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.mtx.Lock() cache.mtx.Lock()
defer cache.mtx.Unlock() defer cache.mtx.Unlock()
@ -517,8 +530,16 @@ func (cache *txCache) Push(tx types.Tx) bool {
} }
// Remove removes the given tx from the cache. // Remove removes the given tx from the cache.
func (cache *txCache) Remove(tx types.Tx) {
func (cache *mapTxCache) Remove(tx types.Tx) {
cache.mtx.Lock() cache.mtx.Lock()
delete(cache.map_, string(tx)) delete(cache.map_, string(tx))
cache.mtx.Unlock() cache.mtx.Unlock()
} }
type nopTxCache struct{}
var _ txCache = (*nopTxCache)(nil)
func (nopTxCache) Reset() {}
func (nopTxCache) Push(types.Tx) bool { return true }
func (nopTxCache) Remove(types.Tx) {}

+ 11
- 2
mempool/reactor.go View File

@ -6,7 +6,7 @@ import (
"time" "time"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tmlibs/clist" "github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -45,6 +45,14 @@ func (memR *MempoolReactor) SetLogger(l log.Logger) {
memR.Mempool.SetLogger(l) memR.Mempool.SetLogger(l)
} }
// OnStart implements p2p.BaseReactor.
func (memR *MempoolReactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
}
return nil
}
// GetChannels implements Reactor. // GetChannels implements Reactor.
// It returns the list of channels for this reactor. // It returns the list of channels for this reactor.
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
@ -129,7 +137,8 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
height := memTx.Height() height := memTx.Height()
if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
peerState := peerState_i.(PeerState) peerState := peerState_i.(PeerState)
if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
peerHeight := peerState.GetHeight()
if peerHeight < height-1 { // Allow for a lag of 1 block
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue continue
} }


+ 1
- 2
types/keys.go View File

@ -2,6 +2,5 @@ package types
// UNSTABLE // UNSTABLE
var ( var (
PeerStateKey = "ConsensusReactor.peerState"
PeerMempoolChKey = "MempoolReactor.peerMempoolCh"
PeerStateKey = "ConsensusReactor.peerState"
) )

+ 3
- 3
version/version.go View File

@ -3,14 +3,14 @@ package version
// Version components // Version components
const ( const (
Maj = "0" Maj = "0"
Min = "20"
Fix = "1"
Min = "21"
Fix = "0"
) )
var ( var (
// Version is the current version of Tendermint // Version is the current version of Tendermint
// Must be a string because scripts like dist.sh read this file. // Must be a string because scripts like dist.sh read this file.
Version = "0.20.1-dev"
Version = "0.21.0"
// GitCommit is the current HEAD set using ldflags. // GitCommit is the current HEAD set using ldflags.
GitCommit string GitCommit string


Loading…
Cancel
Save