Browse Source

Merge branch 'master' into wb/issue-8124

pull/8145/head
William Banfield 3 years ago
committed by GitHub
parent
commit
cb374f56e9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 27 additions and 77 deletions
  1. +5
    -6
      internal/consensus/common_test.go
  2. +3
    -1
      internal/consensus/reactor.go
  3. +6
    -18
      internal/consensus/replay_test.go
  4. +8
    -9
      internal/consensus/state.go
  5. +5
    -7
      internal/consensus/wal_generator.go
  6. +0
    -15
      internal/libs/autofile/group.go
  7. +0
    -10
      libs/events/events.go
  8. +0
    -11
      libs/events/events_test.go

+ 5
- 6
internal/consensus/common_test.go View File

@ -885,8 +885,11 @@ func randConsensusNetWithPeers(
app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i))) app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i)))
vals := types.TM2PB.ValidatorUpdates(state.Validators) vals := types.TM2PB.ValidatorUpdates(state.Validators)
if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok {
// simulate handshake, receive app version. If don't do this, replay test will fail
switch app.(type) {
// simulate handshake, receive app version. If don't do this, replay test will fail
case *kvstore.PersistentKVStoreApplication:
state.Version.Consensus.App = kvstore.ProtocolVersion
case *kvstore.Application:
state.Version.Consensus.App = kvstore.ProtocolVersion state.Version.Consensus.App = kvstore.ProtocolVersion
} }
app.InitChain(abci.RequestInitChain{Validators: vals}) app.InitChain(abci.RequestInitChain{Validators: vals})
@ -973,10 +976,6 @@ func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application {
return kvstore.NewApplication() return kvstore.NewApplication()
} }
func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application {
return kvstore.NewPersistentKVStoreApplication(logger, dbDir)
}
func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool { func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool {
if v1 == nil || v2 == nil { if v1 == nil || v2 == nil {
return false return false


+ 3
- 1
internal/consensus/reactor.go View File

@ -348,6 +348,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// internal pubsub defined in the consensus state to broadcast them to peers // internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving. // upon receiving.
func (r *Reactor) subscribeToBroadcastEvents() { func (r *Reactor) subscribeToBroadcastEvents() {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent( err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus, listenerIDConsensus,
types.EventNewRoundStepValue, types.EventNewRoundStepValue,
@ -356,7 +358,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
return err return err
} }
select { select {
case r.state.onStopCh <- data.(*cstypes.RoundState):
case onStopCh <- data.(*cstypes.RoundState):
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()


+ 6
- 18
internal/consensus/replay_test.go View File

@ -8,7 +8,6 @@ import (
"io" "io"
"math/rand" "math/rand"
"os" "os"
"path/filepath"
"runtime" "runtime"
"testing" "testing"
"time" "time"
@ -338,13 +337,13 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
nPeers, nPeers,
"replay_test", "replay_test",
newMockTickerFunc(true), newMockTickerFunc(true),
newPersistentKVStore)
newEpehemeralKVStore)
sim.Config = cfg sim.Config = cfg
defer func() { t.Cleanup(cleanup) }()
var err error var err error
sim.GenesisState, err = sm.MakeGenesisState(genDoc) sim.GenesisState, err = sm.MakeGenesisState(genDoc)
require.NoError(t, err) require.NoError(t, err)
sim.CleanupFunc = cleanup
partSize := types.BlockPartSizeBytes partSize := types.BlockPartSizeBytes
@ -584,9 +583,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
} }
if sim.CleanupFunc != nil {
t.Cleanup(sim.CleanupFunc)
}
return sim return sim
} }
@ -743,19 +739,14 @@ func testHandshakeReplay(
) )
latestAppHash := state.AppHash latestAppHash := state.AppHash
// make a new client creator
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
eventBus := eventbus.NewDefault(logger) eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx)) require.NoError(t, eventBus.Start(ctx))
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
if nBlocks > 0 { if nBlocks > 0 {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB() stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1) stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState) err := stateStore.Save(genesisState)
@ -776,7 +767,7 @@ func testHandshakeReplay(
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err) require.NoError(t, err)
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc) handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning()) require.True(t, proxyApp.IsRunning())
require.NotNil(t, proxyApp) require.NotNil(t, proxyApp)
@ -905,10 +896,7 @@ func buildTMStateFromChain(
t.Helper() t.Helper()
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close()
client := abciclient.NewLocalClient(logger, kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(client, logger, proxy.NopMetrics()) proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx)) require.NoError(t, proxyApp.Start(ctx))


+ 8
- 9
internal/consensus/state.go View File

@ -452,10 +452,6 @@ func (cs *State) OnStart(ctx context.Context) error {
} }
} }
if err := cs.evsw.Start(ctx); err != nil {
return err
}
// Double Signing Risk Reduction // Double Signing Risk Reduction
if err := cs.checkDoubleSigningRisk(cs.Height); err != nil { if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
return err return err
@ -497,12 +493,19 @@ func (cs *State) loadWalFile(ctx context.Context) error {
return nil return nil
} }
func (cs *State) getOnStopCh() chan *cstypes.RoundState {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
return cs.onStopCh
}
// OnStop implements service.Service. // OnStop implements service.Service.
func (cs *State) OnStop() { func (cs *State) OnStop() {
// If the node is committing a new block, wait until it is finished! // If the node is committing a new block, wait until it is finished!
if cs.GetRoundState().Step == cstypes.RoundStepCommit { if cs.GetRoundState().Step == cstypes.RoundStepCommit {
select { select {
case <-cs.onStopCh:
case <-cs.getOnStopCh():
case <-time.After(cs.config.TimeoutCommit): case <-time.After(cs.config.TimeoutCommit):
cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit) cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit)
} }
@ -510,10 +513,6 @@ func (cs *State) OnStop() {
close(cs.onStopCh) close(cs.onStopCh)
if cs.evsw.IsRunning() {
cs.evsw.Stop()
}
if cs.timeoutTicker.IsRunning() { if cs.timeoutTicker.IsRunning() {
cs.timeoutTicker.Stop() cs.timeoutTicker.Stop()
} }


+ 5
- 7
internal/consensus/wal_generator.go View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io" "io"
mrand "math/rand" mrand "math/rand"
"path/filepath"
"testing" "testing"
"time" "time"
@ -26,18 +25,17 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
// stripped down version of node (proxy app, event bus, consensus state) with a
// persistent kvstore application and special consensus wal instance
// (byteBufferWAL) and waits until numBlocks are created.
// WALGenerateNBlocks generates a consensus WAL. It does this by
// spinning up a stripped down version of node (proxy app, event bus,
// consensus state) with a kvstore application and special consensus
// wal instance (byteBufferWAL) and waits until numBlocks are created.
// If the node fails to produce given numBlocks, it fails the test. // If the node fails to produce given numBlocks, it fails the test.
func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) { func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) {
t.Helper() t.Helper()
cfg := getConfig(t) cfg := getConfig(t)
app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
t.Cleanup(func() { require.NoError(t, app.Close()) })
app := kvstore.NewApplication()
logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks) logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)


+ 0
- 15
internal/libs/autofile/group.go View File

@ -69,11 +69,6 @@ type Group struct {
minIndex int // Includes head minIndex int // Includes head
maxIndex int // Includes head, where Head will move to maxIndex int // Includes head, where Head will move to
// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}
// TODO: When we start deleting files, we need to start tracking GroupReaders // TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies. // and their dependencies.
} }
@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
groupCheckDuration: defaultGroupCheckDuration, groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0, minIndex: 0,
maxIndex: 0, maxIndex: 0,
doneProcessTicks: make(chan struct{}),
} }
for _, option := range groupOptions { for _, option := range groupOptions {
@ -154,13 +148,6 @@ func (g *Group) OnStop() {
} }
} }
// Wait blocks until all internal goroutines are finished. Supposed to be
// called after Stop.
func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}
// Close closes the head file. The group must be stopped by this moment. // Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() { func (g *Group) Close() {
if err := g.FlushAndSync(); err != nil { if err := g.FlushAndSync(); err != nil {
@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error {
} }
func (g *Group) processTicks(ctx context.Context) { func (g *Group) processTicks(ctx context.Context) {
defer close(g.doneProcessTicks)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():


+ 0
- 10
libs/events/events.go View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
) )
// ErrListenerWasRemoved is returned by AddEvent if the listener was removed. // ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
@ -45,16 +44,11 @@ type Fireable interface {
// They can be removed by calling either RemoveListenerForEvent or // They can be removed by calling either RemoveListenerForEvent or
// RemoveListener (for all events). // RemoveListener (for all events).
type EventSwitch interface { type EventSwitch interface {
service.Service
Fireable Fireable
Stop()
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
} }
type eventSwitch struct { type eventSwitch struct {
service.BaseService
mtx sync.RWMutex mtx sync.RWMutex
eventCells map[string]*eventCell eventCells map[string]*eventCell
listeners map[string]*eventListener listeners map[string]*eventListener
@ -65,13 +59,9 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
eventCells: make(map[string]*eventCell), eventCells: make(map[string]*eventCell),
listeners: make(map[string]*eventListener), listeners: make(map[string]*eventListener),
} }
evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw)
return evsw return evsw
} }
func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener. // Get/Create eventCell and listener.
evsw.mtx.Lock() evsw.mtx.Lock()


+ 0
- 11
libs/events/events_test.go View File

@ -21,8 +21,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger) evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
messages := make(chan EventData) messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event", require.NoError(t, evsw.AddListenerForEvent("listener", "event",
@ -50,8 +48,6 @@ func TestAddListenerForEventFireMany(t *testing.T) {
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger) evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum := make(chan uint64) doneSum := make(chan uint64)
doneSending := make(chan uint64) doneSending := make(chan uint64)
@ -88,8 +84,6 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger) evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum := make(chan uint64) doneSum := make(chan uint64)
doneSending1 := make(chan uint64) doneSending1 := make(chan uint64)
@ -151,9 +145,6 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger) evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64) doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64) doneSum2 := make(chan uint64)
@ -247,8 +238,6 @@ func TestManageListenersAsync(t *testing.T) {
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger) evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64) doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64) doneSum2 := make(chan uint64)


Loading…
Cancel
Save