Browse Source

blockchain/v2: integrate with state sync

Integrates the blockchain v2 reactor with state sync, fixes #4765. This mostly involves deferring fast syncing until after state sync completes. I tried a few different approaches, this was the least effort:

* `Reactor.events` is `nil` if no fast sync is in progress, in which case events are not dispatched - most importantly `AddPeer`.

* Accept status messages from unknown peers in the scheduler and register them as ready. On fast sync startup, broadcast status requests to all existing peers.

* When switching from state sync, first send a `bcResetState` message to the processor and scheduler to update their states - most importantly the initial block height.

* When fast sync completes, shut down event loop, scheduler and processor, and set `events` channel to `nil`.
pull/4808/head
Erik Grinaker 5 years ago
committed by GitHub
parent
commit
eb443f4b77
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 203 additions and 83 deletions
  1. +3
    -2
      blockchain/v2/io.go
  2. +4
    -0
      blockchain/v2/processor.go
  3. +9
    -0
      blockchain/v2/processor_context.go
  4. +116
    -40
      blockchain/v2/reactor.go
  5. +4
    -5
      blockchain/v2/reactor_test.go
  6. +19
    -16
      blockchain/v2/scheduler.go
  7. +48
    -16
      blockchain/v2/scheduler_test.go
  8. +0
    -4
      node/node.go

+ 3
- 2
blockchain/v2/io.go View File

@ -16,7 +16,7 @@ type iIO interface {
broadcastStatusRequest(base int64, height int64)
trySwitchToConsensus(state state.State, skipWAL bool)
trySwitchToConsensus(state state.State, skipWAL bool) bool
}
type switchIO struct {
@ -97,11 +97,12 @@ func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
return nil
}
func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) {
func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool {
conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, skipWAL)
}
return ok
}
func (sio *switchIO) broadcastStatusRequest(base int64, height int64) {


+ 4
- 0
blockchain/v2/processor.go View File

@ -110,6 +110,10 @@ func (state *pcState) purgePeer(peerID p2p.ID) {
// handle processes FSM events
func (state *pcState) handle(event Event) (Event, error) {
switch event := event.(type) {
case bcResetState:
state.context.setState(event.state)
return noOp, nil
case scFinishedEv:
if state.synced() {
return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil


+ 9
- 0
blockchain/v2/processor_context.go View File

@ -12,6 +12,7 @@ type processorContext interface {
verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error
saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
tmState() state.State
setState(state.State)
}
type pContext struct {
@ -38,6 +39,10 @@ func (pc pContext) tmState() state.State {
return pc.state
}
func (pc *pContext) setState(state state.State) {
pc.state = state
}
func (pc pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error {
return pc.state.Validators.VerifyCommit(chainID, blockID, height, commit)
}
@ -86,6 +91,10 @@ func (mpc *mockPContext) saveBlock(block *types.Block, blockParts *types.PartSet
}
func (mpc *mockPContext) setState(state state.State) {
mpc.state = state
}
func (mpc *mockPContext) tmState() state.State {
return mpc.state
}

+ 116
- 40
blockchain/v2/reactor.go View File

@ -15,6 +15,11 @@ import (
"github.com/tendermint/tendermint/types"
)
const (
// chBufferSize is the buffer size of all event channels.
chBufferSize int = 1000
)
//-------------------------------------
type bcBlockRequestMessage struct {
@ -129,15 +134,16 @@ type blockStore interface {
type BlockchainReactor struct {
p2p.BaseReactor
fastSync bool // if true, enable fast sync on start
events chan Event // XXX: Rename eventsFromPeers
scheduler *Routine
processor *Routine
logger log.Logger
fastSync bool // if true, enable fast sync on start
stateSynced bool // set to true when SwitchToFastSync is called by state sync
scheduler *Routine
processor *Routine
logger log.Logger
mtx sync.RWMutex
maxPeerHeight int64
syncHeight int64
events chan Event // non-nil during a fast sync
reporter behaviour.Reporter
io iIO
@ -154,9 +160,8 @@ type blockApplier interface {
}
// XXX: unify naming in this package around tmState
// XXX: V1 stores a copy of state as initialState, which is never mutated. Is that nessesary?
func newReactor(state state.State, store blockStore, reporter behaviour.Reporter,
blockApplier blockApplier, bufferSize int, fastSync bool) *BlockchainReactor {
blockApplier blockApplier, fastSync bool) *BlockchainReactor {
scheduler := newScheduler(state.LastBlockHeight, time.Now())
pContext := newProcessorContext(store, blockApplier, state)
// TODO: Fix naming to just newProcesssor
@ -164,9 +169,8 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter
processor := newPcState(pContext)
return &BlockchainReactor{
events: make(chan Event, bufferSize),
scheduler: newRoutine("scheduler", scheduler.handle, bufferSize),
processor: newRoutine("processor", processor.handle, bufferSize),
scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize),
processor: newRoutine("processor", processor.handle, chBufferSize),
store: store,
reporter: reporter,
logger: log.NewNopLogger(),
@ -181,7 +185,7 @@ func NewBlockchainReactor(
store blockStore,
fastSync bool) *BlockchainReactor {
reporter := behaviour.NewMockReporter()
return newReactor(state, store, reporter, blockApplier, 1000, fastSync)
return newReactor(state, store, reporter, blockApplier, fastSync)
}
// SetSwitch implements Reactor interface.
@ -226,13 +230,54 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) {
func (r *BlockchainReactor) Start() error {
r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch)
if r.fastSync {
go r.scheduler.start()
go r.processor.start()
go r.demux()
err := r.startSync(nil)
if err != nil {
return fmt.Errorf("failed to start fast sync: %w", err)
}
}
return nil
}
// startSync begins a fast sync, signalled by r.events being non-nil. If state is non-nil,
// the scheduler and processor is updated with this state on startup.
func (r *BlockchainReactor) startSync(state *state.State) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.events != nil {
return errors.New("fast sync already in progress")
}
r.events = make(chan Event, chBufferSize)
go r.scheduler.start()
go r.processor.start()
if state != nil {
<-r.scheduler.ready()
<-r.processor.ready()
r.scheduler.send(bcResetState{state: *state})
r.processor.send(bcResetState{state: *state})
}
go r.demux(r.events)
return nil
}
// endSync ends a fast sync
func (r *BlockchainReactor) endSync() {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.events != nil {
close(r.events)
}
r.events = nil
r.scheduler.stop()
r.processor.stop()
}
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
func (r *BlockchainReactor) SwitchToFastSync(state state.State) error {
r.stateSynced = true
state = state.Copy()
return r.startSync(&state)
}
// reactor generated ticker events:
// ticker for cleaning peers
type rTryPrunePeer struct {
@ -299,7 +344,14 @@ type bcRemovePeer struct {
reason interface{}
}
func (r *BlockchainReactor) demux() {
// resets the scheduler and processor state, e.g. following a switch from state syncing
type bcResetState struct {
priorityHigh
state state.State
}
// Takes the channel as a parameter to avoid race conditions on r.events.
func (r *BlockchainReactor) demux(events <-chan Event) {
var lastRate = 0.0
var lastHundred = time.Now()
@ -330,6 +382,7 @@ func (r *BlockchainReactor) demux() {
doStatusTk = time.NewTicker(statusFreq)
)
defer doStatusTk.Stop()
doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers
// XXX: Extract timers to make testing atemporal
for {
@ -367,7 +420,7 @@ func (r *BlockchainReactor) demux() {
r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight())
// Events from peers. Closing the channel signals event loop termination.
case event, ok := <-r.events:
case event, ok := <-events:
if !ok {
r.logger.Info("Stopping event processing")
return
@ -379,10 +432,10 @@ func (r *BlockchainReactor) demux() {
case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse:
r.scheduler.send(event)
default:
r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event))
r.logger.Error("Received unexpected event", "event", fmt.Sprintf("%T", event))
}
// Incremental events form scheduler
// Incremental events from scheduler
case event := <-r.scheduler.next():
switch event := event.(type) {
case scBlockReceived:
@ -395,9 +448,13 @@ func (r *BlockchainReactor) demux() {
case scFinishedEv:
r.processor.send(event)
r.scheduler.stop()
case scSchedulerFail:
r.logger.Error("Scheduler failure", "err", event.reason.Error())
case scPeersPruned:
r.logger.Debug("Pruned peers", "count", len(event.peers))
case noOpEvent:
default:
r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event))
r.logger.Error("Received unexpected scheduler event", "event", fmt.Sprintf("%T", event))
}
// Incremental events from processor
@ -407,7 +464,7 @@ func (r *BlockchainReactor) demux() {
r.setSyncHeight(event.height)
if r.syncHeight%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
r.logger.Info("Fast Syncc Rate", "height", r.syncHeight,
r.logger.Info("Fast Sync Rate", "height", r.syncHeight,
"max_peer_height", r.maxPeerHeight, "blocks/s", lastRate)
lastHundred = time.Now()
}
@ -415,11 +472,15 @@ func (r *BlockchainReactor) demux() {
case pcBlockVerificationFailure:
r.scheduler.send(event)
case pcFinished:
r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0)
r.processor.stop()
r.logger.Info("Fast sync complete, switching to consensus")
if !r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0 || r.stateSynced) {
r.logger.Error("Failed to switch to consensus reactor")
}
r.endSync()
return
case noOpEvent:
default:
r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event))
r.logger.Error("Received unexpected processor event", "event", fmt.Sprintf("%T", event))
}
// Terminal event from scheduler
@ -446,11 +507,7 @@ func (r *BlockchainReactor) demux() {
// Stop implements cmn.Service interface.
func (r *BlockchainReactor) Stop() error {
r.logger.Info("reactor stopping")
r.scheduler.stop()
r.processor.stop()
close(r.events)
r.endSync()
r.logger.Info("reactor stopped")
return nil
}
@ -523,18 +580,30 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
case *bcStatusResponseMessage:
r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height}
r.mtx.RLock()
if r.events != nil {
r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height}
}
r.mtx.RUnlock()
case *bcBlockResponseMessage:
r.events <- bcBlockResponse{
peerID: src.ID(),
block: msg.Block,
size: int64(len(msgBytes)),
time: time.Now(),
r.mtx.RLock()
if r.events != nil {
r.events <- bcBlockResponse{
peerID: src.ID(),
block: msg.Block,
size: int64(len(msgBytes)),
time: time.Now(),
}
}
r.mtx.RUnlock()
case *bcNoBlockResponseMessage:
r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()}
r.mtx.RLock()
if r.events != nil {
r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()}
}
r.mtx.RUnlock()
}
}
@ -544,16 +613,23 @@ func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
if err != nil {
r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight())
}
r.events <- bcAddNewPeer{peerID: peer.ID()}
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.events != nil {
r.events <- bcAddNewPeer{peerID: peer.ID()}
}
}
// RemovePeer implements Reactor interface.
func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
event := bcRemovePeer{
peerID: peer.ID(),
reason: reason,
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.events != nil {
r.events <- bcRemovePeer{
peerID: peer.ID(),
reason: reason,
}
}
r.events <- event
}
// GetChannels implements Reactor


+ 4
- 5
blockchain/v2/reactor_test.go View File

@ -117,10 +117,11 @@ func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error {
return nil
}
func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) {
func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) bool {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.switchedToConsensus = true
return true
}
func (sio *mockSwitchIo) broadcastStatusRequest(base int64, height int64) {
@ -131,7 +132,6 @@ type testReactorParams struct {
genDoc *types.GenesisDoc
privVals []types.PrivValidator
startHeight int64
bufferSize int
mockA bool
}
@ -156,7 +156,7 @@ func newTestReactor(p testReactorParams) *BlockchainReactor {
sm.SaveState(db, state)
}
r := newReactor(state, store, reporter, appl, p.bufferSize, true)
r := newReactor(state, store, reporter, appl, true)
logger := log.TestingLogger()
r.SetLogger(logger.With("module", "blockchain"))
@ -353,7 +353,6 @@ func TestReactorHelperMode(t *testing.T) {
genDoc: genDoc,
privVals: privVals,
startHeight: 20,
bufferSize: 100,
mockA: true,
}
@ -383,9 +382,9 @@ func TestReactorHelperMode(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
reactor := newTestReactor(params)
reactor.Start()
mockSwitch := &mockSwitchIo{switchedToConsensus: false}
reactor.io = mockSwitch
reactor.Start()
for i := 0; i < len(tt.msgs); i++ {
step := tt.msgs[i]


+ 19
- 16
blockchain/v2/scheduler.go View File

@ -195,13 +195,11 @@ func newScheduler(initHeight int64, startTime time.Time) *scheduler {
return &sc
}
func (sc *scheduler) addPeer(peerID p2p.ID) error {
if _, ok := sc.peers[peerID]; ok {
// In the future we should be able to add a previously removed peer
return fmt.Errorf("cannot add duplicate peer %s", peerID)
func (sc *scheduler) ensurePeer(peerID p2p.ID) *scPeer {
if _, ok := sc.peers[peerID]; !ok {
sc.peers[peerID] = newScPeer(peerID)
}
sc.peers[peerID] = newScPeer(peerID)
return nil
return sc.peers[peerID]
}
func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error {
@ -222,7 +220,7 @@ func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error {
func (sc *scheduler) removePeer(peerID p2p.ID) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("couldn't find peer %s", peerID)
return nil
}
if peer.state == peerStateRemoved {
@ -283,10 +281,7 @@ func (sc *scheduler) addNewBlocks() {
}
func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("cannot find peer %s", peerID)
}
peer := sc.ensurePeer(peerID)
if peer.state == peerStateRemoved {
return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved")
@ -354,7 +349,7 @@ func (sc *scheduler) setStateAtHeight(height int64, state blockState) {
func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("couldn't find peer %s", peerID)
return fmt.Errorf("received block from unknown peer %s", peerID)
}
if peer.state != peerStateReady {
@ -590,10 +585,7 @@ func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (
}
func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) {
err := sc.addPeer(event.peerID)
if err != nil {
return scSchedulerFail{reason: err}, nil
}
sc.ensurePeer(event.peerID)
return noOp, nil
}
@ -643,6 +635,14 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) {
}
func (sc *scheduler) handleResetState(event bcResetState) (Event, error) {
sc.initHeight = event.state.LastBlockHeight + 1
sc.height = event.state.LastBlockHeight + 1
sc.lastAdvance = time.Now()
sc.addNewBlocks()
return noOp, nil
}
func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
if time.Since(sc.lastAdvance) > sc.syncTimeout {
return scFinishedEv{reason: "timeout, no advance"}, nil
@ -674,6 +674,9 @@ func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error)
func (sc *scheduler) handle(event Event) (Event, error) {
switch event := event.(type) {
case bcResetState:
nextEvent, err := sc.handleResetState(event)
return nextEvent, err
case bcStatusResponse:
nextEvent, err := sc.handleStatusResponse(event)
return nextEvent, err


+ 48
- 16
blockchain/v2/scheduler_test.go View File

@ -8,8 +8,10 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@ -178,7 +180,7 @@ func TestScMaxHeights(t *testing.T) {
}
}
func TestScAddPeer(t *testing.T) {
func TestScEnsurePeer(t *testing.T) {
type args struct {
peerID p2p.ID
@ -188,7 +190,6 @@ func TestScAddPeer(t *testing.T) {
fields scTestParams
args args
wantFields scTestParams
wantErr bool
}{
{
name: "add first peer",
@ -205,20 +206,18 @@ func TestScAddPeer(t *testing.T) {
"P2": {base: -1, height: -1, state: peerStateNew}}},
},
{
name: "attempt to add duplicate peer",
name: "add duplicate peer is fine",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
args: args{peerID: "P1"},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
wantErr: true,
},
{
name: "attempt to add duplicate peer with existing peer in Ready state",
name: "add duplicate peer with existing peer in Ready state is noop",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}},
allB: []int64{1, 2, 3},
},
args: args{peerID: "P1"},
wantErr: true,
args: args{peerID: "P1"},
wantFields: scTestParams{
peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}},
allB: []int64{1, 2, 3},
@ -230,9 +229,7 @@ func TestScAddPeer(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
if err := sc.addPeer(tt.args.peerID); (err != nil) != tt.wantErr {
t.Errorf("scAddPeer() wantErr %v, error = %v", tt.wantErr, err)
}
sc.ensurePeer(tt.args.peerID)
wantSc := newTestScheduler(tt.wantFields)
assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers)
})
@ -374,7 +371,6 @@ func TestScRemovePeer(t *testing.T) {
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
args: args{peerID: "P2"},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
wantErr: true,
},
{
name: "remove single New peer",
@ -522,9 +518,11 @@ func TestScSetPeerRange(t *testing.T) {
allB: []int64{1, 2}},
args: args{peerID: "P2", height: 4},
wantFields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}},
allB: []int64{1, 2}},
wantErr: true,
peers: map[string]*scPeer{
"P1": {height: 2, state: peerStateReady},
"P2": {height: 4, state: peerStateReady},
},
allB: []int64{1, 2, 3, 4}},
},
{
name: "increase height of removed peer",
@ -1043,6 +1041,40 @@ func TestScMarkProcessed(t *testing.T) {
}
}
func TestScResetState(t *testing.T) {
tests := []struct {
name string
fields scTestParams
state state.State
wantFields scTestParams
}{
{
name: "updates height and initHeight",
fields: scTestParams{
height: 0,
initHeight: 0,
},
state: state.State{LastBlockHeight: 7},
wantFields: scTestParams{
height: 8,
initHeight: 8,
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
e, err := sc.handleResetState(bcResetState{state: tt.state})
require.NoError(t, err)
assert.Equal(t, e, noOp)
wantSc := newTestScheduler(tt.wantFields)
checkSameScheduler(t, wantSc, sc)
})
}
}
func TestScAllBlocksProcessed(t *testing.T) {
now := time.Now()
@ -1715,7 +1747,7 @@ func TestScHandleAddNewPeer(t *testing.T) {
allB: []int64{6, 7, 8},
},
args: args{event: addP1},
wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")},
wantEvent: noOpEvent{},
},
{
name: "add P1 to non empty scheduler",
@ -1961,7 +1993,7 @@ func TestScHandleStatusResponse(t *testing.T) {
allB: []int64{1, 2},
},
args: args{event: statusRespP1Ev},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantEvent: noOpEvent{},
},
{


+ 0
- 4
node/node.go View File

@ -683,10 +683,6 @@ func NewNode(config *cfg.Config,
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Don't check fastSync == true, since the v2 reactor has a bug where it fast syncs regardless.
if stateSync && config.FastSync.Version == "v2" {
return nil, errors.New("state sync is not supported with blockchain v2 reactor")
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.


Loading…
Cancel
Save