diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 22416fc3f..33fc24eaa 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,4 +26,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES +- [blockchain/v2] \#5499 Fix "duplicate block enqueued by processor" panic (@melekes) - [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker) diff --git a/blockchain/v2/processor.go b/blockchain/v2/processor.go index d4b2c637b..f9036f3b9 100644 --- a/blockchain/v2/processor.go +++ b/blockchain/v2/processor.go @@ -17,6 +17,11 @@ type pcBlockVerificationFailure struct { secondPeerID p2p.ID } +func (e pcBlockVerificationFailure) String() string { + return fmt.Sprintf("pcBlockVerificationFailure{%d 1st peer: %v, 2nd peer: %v}", + e.height, e.firstPeerID, e.secondPeerID) +} + // successful block execution type pcBlockProcessed struct { priorityNormal @@ -24,6 +29,10 @@ type pcBlockProcessed struct { peerID p2p.ID } +func (e pcBlockProcessed) String() string { + return fmt.Sprintf("pcBlockProcessed{%d peer: %v}", e.height, e.peerID) +} + // processor has finished type pcFinished struct { priorityNormal @@ -87,9 +96,12 @@ func (state *pcState) synced() bool { } func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) { - if _, ok := state.queue[height]; ok { - panic("duplicate block enqueued by processor") + if item, ok := state.queue[height]; ok { + panic(fmt.Sprintf( + "duplicate block %d (%X) enqueued by processor (sent by %v; existing block %X from %v)", + height, block.Hash(), peerID, item.block.Hash(), item.peerID)) } + state.queue[height] = queueItem{block: block, peerID: peerID} } @@ -145,16 +157,20 @@ func (state *pcState) handle(event Event) (Event, error) { } return noOp, nil } - first, second := firstItem.block, secondItem.block - firstParts := first.MakePartSet(types.BlockPartSizeBytes) - firstPartSetHeader := firstParts.Header() - firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} + var ( + first, second = firstItem.block, secondItem.block + firstParts = first.MakePartSet(types.BlockPartSizeBytes) + firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstParts.Header()} + ) + // verify if +second+ last commit "confirms" +first+ block err = state.context.verifyCommit(tmState.ChainID, firstID, first.Height, second.LastCommit) if err != nil { state.purgePeer(firstItem.peerID) - state.purgePeer(secondItem.peerID) + if firstItem.peerID != secondItem.peerID { + state.purgePeer(secondItem.peerID) + } return pcBlockVerificationFailure{ height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID}, nil @@ -170,7 +186,6 @@ func (state *pcState) handle(event Event) (Event, error) { state.blocksSynced++ return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil - } return noOp, nil diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 6aee5078e..8cb33f542 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -187,7 +187,7 @@ type rTryPrunePeer struct { } func (e rTryPrunePeer) String() string { - return fmt.Sprintf(": %v", e.time) + return fmt.Sprintf("rTryPrunePeer{%v}", e.time) } // ticker event for scheduling block requests @@ -197,7 +197,7 @@ type rTrySchedule struct { } func (e rTrySchedule) String() string { - return fmt.Sprintf(": %v", e.time) + return fmt.Sprintf("rTrySchedule{%v}", e.time) } // ticker for block processing @@ -205,6 +205,10 @@ type rProcessBlock struct { priorityNormal } +func (e rProcessBlock) String() string { + return "rProcessBlock" +} + // reactor generated events based on blockchain related messages from peers: // blockResponse message received from a peer type bcBlockResponse struct { @@ -215,6 +219,11 @@ type bcBlockResponse struct { block *types.Block } +func (resp bcBlockResponse) String() string { + return fmt.Sprintf("bcBlockResponse{%d#%X (size: %d bytes) from %v at %v}", + resp.block.Height, resp.block.Hash(), resp.size, resp.peerID, resp.time) +} + // blockNoResponse message received from a peer type bcNoBlockResponse struct { priorityNormal @@ -223,6 +232,11 @@ type bcNoBlockResponse struct { height int64 } +func (resp bcNoBlockResponse) String() string { + return fmt.Sprintf("bcNoBlockResponse{%v has no block at height %d at %v}", + resp.peerID, resp.height, resp.time) +} + // statusResponse message received from a peer type bcStatusResponse struct { priorityNormal @@ -232,12 +246,21 @@ type bcStatusResponse struct { height int64 } +func (resp bcStatusResponse) String() string { + return fmt.Sprintf("bcStatusResponse{%v is at height %d (base: %d) at %v}", + resp.peerID, resp.height, resp.base, resp.time) +} + // new peer is connected type bcAddNewPeer struct { priorityNormal peerID p2p.ID } +func (resp bcAddNewPeer) String() string { + return fmt.Sprintf("bcAddNewPeer{%v}", resp.peerID) +} + // existing peer is removed type bcRemovePeer struct { priorityHigh @@ -245,12 +268,20 @@ type bcRemovePeer struct { reason interface{} } +func (resp bcRemovePeer) String() string { + return fmt.Sprintf("bcRemovePeer{%v due to %v}", resp.peerID, resp.reason) +} + // resets the scheduler and processor state, e.g. following a switch from state syncing type bcResetState struct { priorityHigh state state.State } +func (e bcResetState) String() string { + return fmt.Sprintf("bcResetState{%v}", e.state) +} + // Takes the channel as a parameter to avoid race conditions on r.events. func (r *BlockchainReactor) demux(events <-chan Event) { var lastRate = 0.0 diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 40e1971fe..e4ca52add 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -2,6 +2,7 @@ package v2 import ( "fmt" + "strings" "sync/atomic" "github.com/Workiva/go-datastructures/queue" @@ -11,6 +12,8 @@ import ( type handleFunc = func(event Event) (Event, error) +const historySize = 25 + // Routine is a structure that models a finite state machine as serialized // stream of events processed by a handle function. This Routine structure // handles the concurrency and messaging guarantees. Events are sent via @@ -21,6 +24,7 @@ type Routine struct { name string handle handleFunc queue *queue.PriorityQueue + history []Event out chan Event fin chan error rdy chan struct{} @@ -34,6 +38,7 @@ func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { name: name, handle: handleFunc, queue: queue.NewPriorityQueue(bufferSize, true), + history: make([]Event, 0, historySize), out: make(chan Event, bufferSize), rdy: make(chan struct{}, 1), fin: make(chan error, 1), @@ -53,13 +58,24 @@ func (rt *Routine) setMetrics(metrics *Metrics) { } func (rt *Routine) start() { - rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) + rt.logger.Info(fmt.Sprintf("%s: run", rt.name)) running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) if !running { panic(fmt.Sprintf("%s is already running", rt.name)) } close(rt.rdy) defer func() { + if r := recover(); r != nil { + var ( + b strings.Builder + j int + ) + for i := len(rt.history) - 1; i >= 0; i-- { + fmt.Fprintf(&b, "%d: %+v\n", j, rt.history[i]) + j++ + } + panic(fmt.Sprintf("%v\nlast events:\n%v", r, b.String())) + } stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) if !stopped { panic(fmt.Sprintf("%s is failed to stop", rt.name)) @@ -82,7 +98,19 @@ func (rt *Routine) start() { return } rt.metrics.EventsOut.With("routine", rt.name).Add(1) - rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v\n", rt.name, oEvent, oEvent)) + rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v", rt.name, oEvent, oEvent)) + + // Skip rTrySchedule and rProcessBlock events as they clutter the history + // due to their frequency. + switch events[0].(type) { + case rTrySchedule: + case rProcessBlock: + default: + rt.history = append(rt.history, events[0].(Event)) + if len(rt.history) > historySize { + rt.history = rt.history[1:] + } + } rt.out <- oEvent } @@ -97,7 +125,7 @@ func (rt *Routine) send(event Event) bool { err := rt.queue.Put(event) if err != nil { rt.metrics.EventsShed.With("routine", rt.name).Add(1) - rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name)) + rt.logger.Error(fmt.Sprintf("%s: send failed, queue was full/stopped", rt.name)) return false } @@ -122,7 +150,7 @@ func (rt *Routine) stop() { return } - rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name)) + rt.logger.Info(fmt.Sprintf("%s: stop", rt.name)) rt.queue.Dispose() // this should block until all queue items are free? } diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index 84dee7a53..a5ca33cd0 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -2,6 +2,7 @@ package v2 import ( "bytes" + "errors" "fmt" "math" "sort" @@ -18,6 +19,10 @@ type scFinishedEv struct { reason string } +func (e scFinishedEv) String() string { + return fmt.Sprintf("scFinishedEv{%v}", e.reason) +} + // send a blockRequest message type scBlockRequest struct { priorityNormal @@ -25,6 +30,10 @@ type scBlockRequest struct { height int64 } +func (e scBlockRequest) String() string { + return fmt.Sprintf("scBlockRequest{%d from %v}", e.height, e.peerID) +} + // a block has been received and validated by the scheduler type scBlockReceived struct { priorityNormal @@ -32,6 +41,10 @@ type scBlockReceived struct { block *types.Block } +func (e scBlockReceived) String() string { + return fmt.Sprintf("scBlockReceived{%d#%X from %v}", e.block.Height, e.block.Hash(), e.peerID) +} + // scheduler detected a peer error type scPeerError struct { priorityHigh @@ -40,7 +53,7 @@ type scPeerError struct { } func (e scPeerError) String() string { - return fmt.Sprintf("scPeerError - peerID %s, err %s", e.peerID, e.reason) + return fmt.Sprintf("scPeerError{%v errored with %v}", e.peerID, e.reason) } // scheduler removed a set of peers (timed out or slow peer) @@ -49,6 +62,10 @@ type scPeersPruned struct { peers []p2p.ID } +func (e scPeersPruned) String() string { + return fmt.Sprintf("scPeersPruned{%v}", e.peers) +} + // XXX: make this fatal? // scheduler encountered a fatal error type scSchedulerFail struct { @@ -56,6 +73,10 @@ type scSchedulerFail struct { reason error } +func (e scSchedulerFail) String() string { + return fmt.Sprintf("scSchedulerFail{%v}", e.reason) +} + type blockState int const ( @@ -295,6 +316,9 @@ func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error } if base > height { + if err := sc.removePeer(peerID); err != nil { + return err + } return fmt.Errorf("cannot set peer base higher than its height") } @@ -418,7 +442,7 @@ func (sc *scheduler) markProcessed(height int64) error { return fmt.Errorf("cannot mark height %d received from block state %s", height, state) } - sc.height++ + sc.height = height + 1 delete(sc.receivedBlocks, height) delete(sc.blockStates, height) sc.addNewBlocks() @@ -532,14 +556,12 @@ func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { } func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, error) { - if len(sc.peers) == 0 { - return noOp, nil - } - + // No such peer or peer was removed. peer, ok := sc.peers[event.peerID] if !ok || peer.state == peerStateRemoved { return noOp, nil } + // The peer may have been just removed due to errors, low speed or timeouts. _ = sc.removePeer(event.peerID) @@ -550,8 +572,9 @@ func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, erro func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) { if event.height != sc.height { - panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height)) + panic(fmt.Sprintf("processed height %d, but expected height %d", event.height, sc.height)) } + err := sc.markProcessed(event.height) if err != nil { // It is possible that a peer error or timeout is handled after the processor @@ -601,11 +624,13 @@ func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) { if sc.allBlocksProcessed() { return scFinishedEv{reason: "removed peer"}, nil } - return noOp, nil + + // Return scPeerError so the peer (and all associated blocks) is removed from + // the processor. + return scPeerError{peerID: event.peerID, reason: errors.New("peer was stopped")}, nil } func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { - // Check behavior of peer responsible to deliver block at sc.height. timeHeightAsked, ok := sc.pendingTime[sc.height] if ok && time.Since(timeHeightAsked) > sc.peerTimeout { diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go index a4636d954..fce0c0563 100644 --- a/blockchain/v2/scheduler_test.go +++ b/blockchain/v2/scheduler_test.go @@ -586,8 +586,7 @@ func TestScSetPeerRange(t *testing.T) { allB: []int64{1, 2, 3, 4}}, args: args{peerID: "P1", base: 6, height: 5}, wantFields: scTestParams{ - peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, - allB: []int64{1, 2, 3, 4}}, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, wantErr: true, }, {