diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go index 29850d3e5..70240aed6 100644 --- a/blockchain/v2/io.go +++ b/blockchain/v2/io.go @@ -16,7 +16,7 @@ type iIO interface { broadcastStatusRequest(base int64, height int64) - trySwitchToConsensus(state state.State, skipWAL bool) + trySwitchToConsensus(state state.State, skipWAL bool) bool } type switchIO struct { @@ -97,11 +97,12 @@ func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error { return nil } -func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) { +func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool { conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor) if ok { conR.SwitchToConsensus(state, skipWAL) } + return ok } func (sio *switchIO) broadcastStatusRequest(base int64, height int64) { diff --git a/blockchain/v2/processor.go b/blockchain/v2/processor.go index d6a2fe1e8..539c6c88e 100644 --- a/blockchain/v2/processor.go +++ b/blockchain/v2/processor.go @@ -110,6 +110,10 @@ func (state *pcState) purgePeer(peerID p2p.ID) { // handle processes FSM events func (state *pcState) handle(event Event) (Event, error) { switch event := event.(type) { + case bcResetState: + state.context.setState(event.state) + return noOp, nil + case scFinishedEv: if state.synced() { return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil diff --git a/blockchain/v2/processor_context.go b/blockchain/v2/processor_context.go index 2e8142adc..a429411da 100644 --- a/blockchain/v2/processor_context.go +++ b/blockchain/v2/processor_context.go @@ -12,6 +12,7 @@ type processorContext interface { verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) tmState() state.State + setState(state.State) } type pContext struct { @@ -38,6 +39,10 @@ func (pc pContext) tmState() state.State { return pc.state } +func (pc *pContext) setState(state state.State) { + pc.state = state +} + func (pc pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { return pc.state.Validators.VerifyCommit(chainID, blockID, height, commit) } @@ -86,6 +91,10 @@ func (mpc *mockPContext) saveBlock(block *types.Block, blockParts *types.PartSet } +func (mpc *mockPContext) setState(state state.State) { + mpc.state = state +} + func (mpc *mockPContext) tmState() state.State { return mpc.state } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 00d919335..98ea58306 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -15,6 +15,11 @@ import ( "github.com/tendermint/tendermint/types" ) +const ( + // chBufferSize is the buffer size of all event channels. + chBufferSize int = 1000 +) + //------------------------------------- type bcBlockRequestMessage struct { @@ -129,15 +134,16 @@ type blockStore interface { type BlockchainReactor struct { p2p.BaseReactor - fastSync bool // if true, enable fast sync on start - events chan Event // XXX: Rename eventsFromPeers - scheduler *Routine - processor *Routine - logger log.Logger + fastSync bool // if true, enable fast sync on start + stateSynced bool // set to true when SwitchToFastSync is called by state sync + scheduler *Routine + processor *Routine + logger log.Logger mtx sync.RWMutex maxPeerHeight int64 syncHeight int64 + events chan Event // non-nil during a fast sync reporter behaviour.Reporter io iIO @@ -154,9 +160,8 @@ type blockApplier interface { } // XXX: unify naming in this package around tmState -// XXX: V1 stores a copy of state as initialState, which is never mutated. Is that nessesary? func newReactor(state state.State, store blockStore, reporter behaviour.Reporter, - blockApplier blockApplier, bufferSize int, fastSync bool) *BlockchainReactor { + blockApplier blockApplier, fastSync bool) *BlockchainReactor { scheduler := newScheduler(state.LastBlockHeight, time.Now()) pContext := newProcessorContext(store, blockApplier, state) // TODO: Fix naming to just newProcesssor @@ -164,9 +169,8 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter processor := newPcState(pContext) return &BlockchainReactor{ - events: make(chan Event, bufferSize), - scheduler: newRoutine("scheduler", scheduler.handle, bufferSize), - processor: newRoutine("processor", processor.handle, bufferSize), + scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), + processor: newRoutine("processor", processor.handle, chBufferSize), store: store, reporter: reporter, logger: log.NewNopLogger(), @@ -181,7 +185,7 @@ func NewBlockchainReactor( store blockStore, fastSync bool) *BlockchainReactor { reporter := behaviour.NewMockReporter() - return newReactor(state, store, reporter, blockApplier, 1000, fastSync) + return newReactor(state, store, reporter, blockApplier, fastSync) } // SetSwitch implements Reactor interface. @@ -226,13 +230,54 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) { func (r *BlockchainReactor) Start() error { r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch) if r.fastSync { - go r.scheduler.start() - go r.processor.start() - go r.demux() + err := r.startSync(nil) + if err != nil { + return fmt.Errorf("failed to start fast sync: %w", err) + } } return nil } +// startSync begins a fast sync, signalled by r.events being non-nil. If state is non-nil, +// the scheduler and processor is updated with this state on startup. +func (r *BlockchainReactor) startSync(state *state.State) error { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.events != nil { + return errors.New("fast sync already in progress") + } + r.events = make(chan Event, chBufferSize) + go r.scheduler.start() + go r.processor.start() + if state != nil { + <-r.scheduler.ready() + <-r.processor.ready() + r.scheduler.send(bcResetState{state: *state}) + r.processor.send(bcResetState{state: *state}) + } + go r.demux(r.events) + return nil +} + +// endSync ends a fast sync +func (r *BlockchainReactor) endSync() { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.events != nil { + close(r.events) + } + r.events = nil + r.scheduler.stop() + r.processor.stop() +} + +// SwitchToFastSync is called by the state sync reactor when switching to fast sync. +func (r *BlockchainReactor) SwitchToFastSync(state state.State) error { + r.stateSynced = true + state = state.Copy() + return r.startSync(&state) +} + // reactor generated ticker events: // ticker for cleaning peers type rTryPrunePeer struct { @@ -299,7 +344,14 @@ type bcRemovePeer struct { reason interface{} } -func (r *BlockchainReactor) demux() { +// resets the scheduler and processor state, e.g. following a switch from state syncing +type bcResetState struct { + priorityHigh + state state.State +} + +// Takes the channel as a parameter to avoid race conditions on r.events. +func (r *BlockchainReactor) demux(events <-chan Event) { var lastRate = 0.0 var lastHundred = time.Now() @@ -330,6 +382,7 @@ func (r *BlockchainReactor) demux() { doStatusTk = time.NewTicker(statusFreq) ) defer doStatusTk.Stop() + doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers // XXX: Extract timers to make testing atemporal for { @@ -367,7 +420,7 @@ func (r *BlockchainReactor) demux() { r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight()) // Events from peers. Closing the channel signals event loop termination. - case event, ok := <-r.events: + case event, ok := <-events: if !ok { r.logger.Info("Stopping event processing") return @@ -379,10 +432,10 @@ func (r *BlockchainReactor) demux() { case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: r.scheduler.send(event) default: - r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event)) + r.logger.Error("Received unexpected event", "event", fmt.Sprintf("%T", event)) } - // Incremental events form scheduler + // Incremental events from scheduler case event := <-r.scheduler.next(): switch event := event.(type) { case scBlockReceived: @@ -395,9 +448,13 @@ func (r *BlockchainReactor) demux() { case scFinishedEv: r.processor.send(event) r.scheduler.stop() + case scSchedulerFail: + r.logger.Error("Scheduler failure", "err", event.reason.Error()) + case scPeersPruned: + r.logger.Debug("Pruned peers", "count", len(event.peers)) case noOpEvent: default: - r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event)) + r.logger.Error("Received unexpected scheduler event", "event", fmt.Sprintf("%T", event)) } // Incremental events from processor @@ -407,7 +464,7 @@ func (r *BlockchainReactor) demux() { r.setSyncHeight(event.height) if r.syncHeight%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - r.logger.Info("Fast Syncc Rate", "height", r.syncHeight, + r.logger.Info("Fast Sync Rate", "height", r.syncHeight, "max_peer_height", r.maxPeerHeight, "blocks/s", lastRate) lastHundred = time.Now() } @@ -415,11 +472,15 @@ func (r *BlockchainReactor) demux() { case pcBlockVerificationFailure: r.scheduler.send(event) case pcFinished: - r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0) - r.processor.stop() + r.logger.Info("Fast sync complete, switching to consensus") + if !r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0 || r.stateSynced) { + r.logger.Error("Failed to switch to consensus reactor") + } + r.endSync() + return case noOpEvent: default: - r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event)) + r.logger.Error("Received unexpected processor event", "event", fmt.Sprintf("%T", event)) } // Terminal event from scheduler @@ -446,11 +507,7 @@ func (r *BlockchainReactor) demux() { // Stop implements cmn.Service interface. func (r *BlockchainReactor) Stop() error { r.logger.Info("reactor stopping") - - r.scheduler.stop() - r.processor.stop() - close(r.events) - + r.endSync() r.logger.Info("reactor stopped") return nil } @@ -523,18 +580,30 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } case *bcStatusResponseMessage: - r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height} + r.mtx.RLock() + if r.events != nil { + r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height} + } + r.mtx.RUnlock() case *bcBlockResponseMessage: - r.events <- bcBlockResponse{ - peerID: src.ID(), - block: msg.Block, - size: int64(len(msgBytes)), - time: time.Now(), + r.mtx.RLock() + if r.events != nil { + r.events <- bcBlockResponse{ + peerID: src.ID(), + block: msg.Block, + size: int64(len(msgBytes)), + time: time.Now(), + } } + r.mtx.RUnlock() case *bcNoBlockResponseMessage: - r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()} + r.mtx.RLock() + if r.events != nil { + r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()} + } + r.mtx.RUnlock() } } @@ -544,16 +613,23 @@ func (r *BlockchainReactor) AddPeer(peer p2p.Peer) { if err != nil { r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight()) } - r.events <- bcAddNewPeer{peerID: peer.ID()} + r.mtx.RLock() + defer r.mtx.RUnlock() + if r.events != nil { + r.events <- bcAddNewPeer{peerID: peer.ID()} + } } // RemovePeer implements Reactor interface. func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - event := bcRemovePeer{ - peerID: peer.ID(), - reason: reason, + r.mtx.RLock() + defer r.mtx.RUnlock() + if r.events != nil { + r.events <- bcRemovePeer{ + peerID: peer.ID(), + reason: reason, + } } - r.events <- event } // GetChannels implements Reactor diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index cdcd795ef..9230d7714 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -117,10 +117,11 @@ func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error { return nil } -func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) { +func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, skipWAL bool) bool { sio.mtx.Lock() defer sio.mtx.Unlock() sio.switchedToConsensus = true + return true } func (sio *mockSwitchIo) broadcastStatusRequest(base int64, height int64) { @@ -131,7 +132,6 @@ type testReactorParams struct { genDoc *types.GenesisDoc privVals []types.PrivValidator startHeight int64 - bufferSize int mockA bool } @@ -156,7 +156,7 @@ func newTestReactor(p testReactorParams) *BlockchainReactor { sm.SaveState(db, state) } - r := newReactor(state, store, reporter, appl, p.bufferSize, true) + r := newReactor(state, store, reporter, appl, true) logger := log.TestingLogger() r.SetLogger(logger.With("module", "blockchain")) @@ -353,7 +353,6 @@ func TestReactorHelperMode(t *testing.T) { genDoc: genDoc, privVals: privVals, startHeight: 20, - bufferSize: 100, mockA: true, } @@ -383,9 +382,9 @@ func TestReactorHelperMode(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { reactor := newTestReactor(params) - reactor.Start() mockSwitch := &mockSwitchIo{switchedToConsensus: false} reactor.io = mockSwitch + reactor.Start() for i := 0; i < len(tt.msgs); i++ { step := tt.msgs[i] diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index 803955b22..b769a195e 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -195,13 +195,11 @@ func newScheduler(initHeight int64, startTime time.Time) *scheduler { return &sc } -func (sc *scheduler) addPeer(peerID p2p.ID) error { - if _, ok := sc.peers[peerID]; ok { - // In the future we should be able to add a previously removed peer - return fmt.Errorf("cannot add duplicate peer %s", peerID) +func (sc *scheduler) ensurePeer(peerID p2p.ID) *scPeer { + if _, ok := sc.peers[peerID]; !ok { + sc.peers[peerID] = newScPeer(peerID) } - sc.peers[peerID] = newScPeer(peerID) - return nil + return sc.peers[peerID] } func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error { @@ -222,7 +220,7 @@ func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error { func (sc *scheduler) removePeer(peerID p2p.ID) error { peer, ok := sc.peers[peerID] if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) + return nil } if peer.state == peerStateRemoved { @@ -283,10 +281,7 @@ func (sc *scheduler) addNewBlocks() { } func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("cannot find peer %s", peerID) - } + peer := sc.ensurePeer(peerID) if peer.state == peerStateRemoved { return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved") @@ -354,7 +349,7 @@ func (sc *scheduler) setStateAtHeight(height int64, state blockState) { func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { peer, ok := sc.peers[peerID] if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) + return fmt.Errorf("received block from unknown peer %s", peerID) } if peer.state != peerStateReady { @@ -590,10 +585,7 @@ func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) ( } func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) { - err := sc.addPeer(event.peerID) - if err != nil { - return scSchedulerFail{reason: err}, nil - } + sc.ensurePeer(event.peerID) return noOp, nil } @@ -643,6 +635,14 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { } +func (sc *scheduler) handleResetState(event bcResetState) (Event, error) { + sc.initHeight = event.state.LastBlockHeight + 1 + sc.height = event.state.LastBlockHeight + 1 + sc.lastAdvance = time.Now() + sc.addNewBlocks() + return noOp, nil +} + func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) { if time.Since(sc.lastAdvance) > sc.syncTimeout { return scFinishedEv{reason: "timeout, no advance"}, nil @@ -674,6 +674,9 @@ func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) func (sc *scheduler) handle(event Event) (Event, error) { switch event := event.(type) { + case bcResetState: + nextEvent, err := sc.handleResetState(event) + return nextEvent, err case bcStatusResponse: nextEvent, err := sc.handleStatusResponse(event) return nextEvent, err diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go index 4ec81e123..0f0f4cc07 100644 --- a/blockchain/v2/scheduler_test.go +++ b/blockchain/v2/scheduler_test.go @@ -8,8 +8,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -178,7 +180,7 @@ func TestScMaxHeights(t *testing.T) { } } -func TestScAddPeer(t *testing.T) { +func TestScEnsurePeer(t *testing.T) { type args struct { peerID p2p.ID @@ -188,7 +190,6 @@ func TestScAddPeer(t *testing.T) { fields scTestParams args args wantFields scTestParams - wantErr bool }{ { name: "add first peer", @@ -205,20 +206,18 @@ func TestScAddPeer(t *testing.T) { "P2": {base: -1, height: -1, state: peerStateNew}}}, }, { - name: "attempt to add duplicate peer", + name: "add duplicate peer is fine", fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, args: args{peerID: "P1"}, wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, - wantErr: true, }, { - name: "attempt to add duplicate peer with existing peer in Ready state", + name: "add duplicate peer with existing peer in Ready state is noop", fields: scTestParams{ peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, allB: []int64{1, 2, 3}, }, - args: args{peerID: "P1"}, - wantErr: true, + args: args{peerID: "P1"}, wantFields: scTestParams{ peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, allB: []int64{1, 2, 3}, @@ -230,9 +229,7 @@ func TestScAddPeer(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { sc := newTestScheduler(tt.fields) - if err := sc.addPeer(tt.args.peerID); (err != nil) != tt.wantErr { - t.Errorf("scAddPeer() wantErr %v, error = %v", tt.wantErr, err) - } + sc.ensurePeer(tt.args.peerID) wantSc := newTestScheduler(tt.wantFields) assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) }) @@ -374,7 +371,6 @@ func TestScRemovePeer(t *testing.T) { fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, args: args{peerID: "P2"}, wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, - wantErr: true, }, { name: "remove single New peer", @@ -522,9 +518,11 @@ func TestScSetPeerRange(t *testing.T) { allB: []int64{1, 2}}, args: args{peerID: "P2", height: 4}, wantFields: scTestParams{ - peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, - allB: []int64{1, 2}}, - wantErr: true, + peers: map[string]*scPeer{ + "P1": {height: 2, state: peerStateReady}, + "P2": {height: 4, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}}, }, { name: "increase height of removed peer", @@ -1043,6 +1041,40 @@ func TestScMarkProcessed(t *testing.T) { } } +func TestScResetState(t *testing.T) { + tests := []struct { + name string + fields scTestParams + state state.State + wantFields scTestParams + }{ + { + name: "updates height and initHeight", + fields: scTestParams{ + height: 0, + initHeight: 0, + }, + state: state.State{LastBlockHeight: 7}, + wantFields: scTestParams{ + height: 8, + initHeight: 8, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + e, err := sc.handleResetState(bcResetState{state: tt.state}) + require.NoError(t, err) + assert.Equal(t, e, noOp) + wantSc := newTestScheduler(tt.wantFields) + checkSameScheduler(t, wantSc, sc) + }) + } +} + func TestScAllBlocksProcessed(t *testing.T) { now := time.Now() @@ -1715,7 +1747,7 @@ func TestScHandleAddNewPeer(t *testing.T) { allB: []int64{6, 7, 8}, }, args: args{event: addP1}, - wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + wantEvent: noOpEvent{}, }, { name: "add P1 to non empty scheduler", @@ -1961,7 +1993,7 @@ func TestScHandleStatusResponse(t *testing.T) { allB: []int64{1, 2}, }, args: args{event: statusRespP1Ev}, - wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + wantEvent: noOpEvent{}, }, { diff --git a/node/node.go b/node/node.go index 7623563d2..b53095eff 100644 --- a/node/node.go +++ b/node/node.go @@ -683,10 +683,6 @@ func NewNode(config *cfg.Config, logger.Info("Found local state with non-zero height, skipping state sync") stateSync = false } - // Don't check fastSync == true, since the v2 reactor has a bug where it fast syncs regardless. - if stateSync && config.FastSync.Version == "v2" { - return nil, errors.New("state sync is not supported with blockchain v2 reactor") - } // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app.