This commit should be one of the first to land as part of the v0.36 cycle *after* cutting the 0.35 branch. The blocksync/v2 reactor was originally implemented as an experiment to produce an implementation of the block sync protocol that would be easier to test and validate, but it was never appropriately operationalized and the implementation was never fully debugged. When the p2p layer was refactored as part of the 0.35 cycle, the v2 implementation was not ported to the new stack and was left, unused, in the codebase. This commit removes all references to it.
@@ -1,42 +0,0 @@
/*
Package behavior provides a mechanism for reactors to report the behavior of peers.
Instead of a reactor calling the switch directly, it will call the behavior module,
which will handle stopping the peer or marking it as good on behalf of the reactor.

There are four different behaviors a reactor can report.

1. bad message

	type badMessage struct {
		explanation string
	}

This message will request the peer be stopped for an error.

2. message out of order

	type messageOutOfOrder struct {
		explanation string
	}

This message will request the peer be stopped for an error.

3. consensus vote

	type consensusVote struct {
		explanation string
	}

This message will request the peer be marked as good.

4. block part

	type blockPart struct {
		explanation string
	}

This message will request the peer be marked as good.
*/
package behavior
@@ -1,47 +0,0 @@
package behavior | |||||
import "github.com/tendermint/tendermint/types" | |||||
// PeerBehavior is a struct describing a behavior a peer performed. | |||||
// `peerID` identifies the peer and reason characterizes the specific | |||||
// behavior performed by the peer. | |||||
type PeerBehavior struct { | |||||
peerID types.NodeID | |||||
reason interface{} | |||||
} | |||||
type badMessage struct { | |||||
explanation string | |||||
} | |||||
// BadMessage returns a badMessage PeerBehavior. | |||||
func BadMessage(peerID types.NodeID, explanation string) PeerBehavior { | |||||
return PeerBehavior{peerID: peerID, reason: badMessage{explanation}} | |||||
} | |||||
type messageOutOfOrder struct { | |||||
explanation string | |||||
} | |||||
// MessageOutOfOrder returns a messageOutOfOrder PeerBehavior.
func MessageOutOfOrder(peerID types.NodeID, explanation string) PeerBehavior { | |||||
return PeerBehavior{peerID: peerID, reason: messageOutOfOrder{explanation}} | |||||
} | |||||
type consensusVote struct { | |||||
explanation string | |||||
} | |||||
// ConsensusVote returns a consensusVote PeerBehavior. | |||||
func ConsensusVote(peerID types.NodeID, explanation string) PeerBehavior { | |||||
return PeerBehavior{peerID: peerID, reason: consensusVote{explanation}} | |||||
} | |||||
type blockPart struct { | |||||
explanation string | |||||
} | |||||
// BlockPart returns a blockPart PeerBehavior.
func BlockPart(peerID types.NodeID, explanation string) PeerBehavior { | |||||
return PeerBehavior{peerID: peerID, reason: blockPart{explanation}} | |||||
} |
@@ -1,87 +0,0 @@
package behavior | |||||
import ( | |||||
"errors" | |||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||||
"github.com/tendermint/tendermint/internal/p2p" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
// Reporter provides an interface for reactors to report the behavior | |||||
// of peers synchronously to other components. | |||||
type Reporter interface { | |||||
Report(behavior PeerBehavior) error | |||||
} | |||||
// SwitchReporter reports peer behavior to an internal Switch. | |||||
type SwitchReporter struct { | |||||
sw *p2p.Switch | |||||
} | |||||
// NewSwitchReporter returns a new SwitchReporter instance which wraps the Switch.
func NewSwitchReporter(sw *p2p.Switch) *SwitchReporter { | |||||
return &SwitchReporter{ | |||||
sw: sw, | |||||
} | |||||
} | |||||
// Report reports the behavior of a peer to the Switch. | |||||
func (spbr *SwitchReporter) Report(behavior PeerBehavior) error { | |||||
peer := spbr.sw.Peers().Get(behavior.peerID) | |||||
if peer == nil { | |||||
return errors.New("peer not found") | |||||
} | |||||
switch reason := behavior.reason.(type) { | |||||
case consensusVote, blockPart: | |||||
spbr.sw.MarkPeerAsGood(peer) | |||||
case badMessage: | |||||
spbr.sw.StopPeerForError(peer, reason.explanation) | |||||
case messageOutOfOrder: | |||||
spbr.sw.StopPeerForError(peer, reason.explanation) | |||||
default: | |||||
return errors.New("unknown reason reported") | |||||
} | |||||
return nil | |||||
} | |||||
// MockReporter is a concrete implementation of the Reporter | |||||
// interface used in reactor tests to ensure reactors report the correct | |||||
// behavior in manufactured scenarios. | |||||
type MockReporter struct { | |||||
mtx tmsync.RWMutex | |||||
pb map[types.NodeID][]PeerBehavior | |||||
} | |||||
// NewMockReporter returns a Reporter which records all reported | |||||
// behaviors in memory. | |||||
func NewMockReporter() *MockReporter { | |||||
return &MockReporter{ | |||||
pb: map[types.NodeID][]PeerBehavior{}, | |||||
} | |||||
} | |||||
// Report stores the PeerBehavior produced by the peer identified by peerID. | |||||
func (mpbr *MockReporter) Report(behavior PeerBehavior) error { | |||||
mpbr.mtx.Lock() | |||||
defer mpbr.mtx.Unlock() | |||||
mpbr.pb[behavior.peerID] = append(mpbr.pb[behavior.peerID], behavior) | |||||
return nil | |||||
} | |||||
// GetBehaviors returns all behaviors reported on the peer identified by peerID. | |||||
func (mpbr *MockReporter) GetBehaviors(peerID types.NodeID) []PeerBehavior { | |||||
mpbr.mtx.RLock() | |||||
defer mpbr.mtx.RUnlock() | |||||
if items, ok := mpbr.pb[peerID]; ok { | |||||
result := make([]PeerBehavior, len(items)) | |||||
copy(result, items) | |||||
return result | |||||
} | |||||
return []PeerBehavior{} | |||||
} |
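For reference, here is a minimal usage sketch of the reporter API described in the package doc above, written as if it sat next to the package's own tests. The switch and peer ID are assumed to be supplied by the calling reactor; the import alias mirrors the one used in the test file below.

package behavior_test

import (
	bh "github.com/tendermint/tendermint/internal/blocksync/v2/internal/behavior"
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/types"
)

// reportExamples demonstrates the four report calls listed in the package doc.
func reportExamples(sw *p2p.Switch, peerID types.NodeID) {
	reporter := bh.NewSwitchReporter(sw)

	// These two ask the switch to stop the peer for an error.
	_ = reporter.Report(bh.BadMessage(peerID, "failed to decode message"))
	_ = reporter.Report(bh.MessageOutOfOrder(peerID, "status response received before request"))

	// These two ask the switch to mark the peer as good.
	_ = reporter.Report(bh.ConsensusVote(peerID, "useful consensus vote"))
	_ = reporter.Report(bh.BlockPart(peerID, "useful block part"))
}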
@@ -1,205 +0,0 @@
package behavior_test | |||||
import ( | |||||
"sync" | |||||
"testing" | |||||
bh "github.com/tendermint/tendermint/internal/blocksync/v2/internal/behavior" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
// TestMockReporter tests the MockReporter's ability to store reported | |||||
// peer behavior in memory indexed by the peerID. | |||||
func TestMockReporter(t *testing.T) { | |||||
var peerID types.NodeID = "MockPeer" | |||||
pr := bh.NewMockReporter() | |||||
behaviors := pr.GetBehaviors(peerID) | |||||
if len(behaviors) != 0 { | |||||
t.Error("Expected to have no behaviors reported") | |||||
} | |||||
badMessage := bh.BadMessage(peerID, "bad message") | |||||
if err := pr.Report(badMessage); err != nil { | |||||
t.Error(err) | |||||
} | |||||
behaviors = pr.GetBehaviors(peerID) | |||||
if len(behaviors) != 1 { | |||||
t.Error("Expected the peer have one reported behavior") | |||||
} | |||||
if behaviors[0] != badMessage { | |||||
t.Error("Expected Bad Message to have been reported") | |||||
} | |||||
} | |||||
type scriptItem struct { | |||||
peerID types.NodeID | |||||
behavior bh.PeerBehavior | |||||
} | |||||
// equalBehaviors returns true if a and b contain the same PeerBehaviors with
// the same frequencies, and false otherwise.
func equalBehaviors(a []bh.PeerBehavior, b []bh.PeerBehavior) bool { | |||||
aHistogram := map[bh.PeerBehavior]int{} | |||||
bHistogram := map[bh.PeerBehavior]int{} | |||||
for _, behavior := range a { | |||||
aHistogram[behavior]++ | |||||
} | |||||
for _, behavior := range b { | |||||
bHistogram[behavior]++ | |||||
} | |||||
if len(aHistogram) != len(bHistogram) { | |||||
return false | |||||
} | |||||
for _, behavior := range a { | |||||
if aHistogram[behavior] != bHistogram[behavior] { | |||||
return false | |||||
} | |||||
} | |||||
for _, behavior := range b { | |||||
if bHistogram[behavior] != aHistogram[behavior] { | |||||
return false | |||||
} | |||||
} | |||||
return true | |||||
} | |||||
// TestEqualPeerBehaviors tests that equalBehaviors compares two slices of peer
// behaviors by the behaviors they contain and the frequencies with which those
// behaviors occur.
func TestEqualPeerBehaviors(t *testing.T) { | |||||
var ( | |||||
peerID types.NodeID = "MockPeer" | |||||
consensusVote = bh.ConsensusVote(peerID, "voted") | |||||
blockPart = bh.BlockPart(peerID, "blocked") | |||||
equals = []struct { | |||||
left []bh.PeerBehavior | |||||
right []bh.PeerBehavior | |||||
}{ | |||||
// Empty sets | |||||
{[]bh.PeerBehavior{}, []bh.PeerBehavior{}}, | |||||
// Single behaviors | |||||
{[]bh.PeerBehavior{consensusVote}, []bh.PeerBehavior{consensusVote}}, | |||||
// Equal Frequencies | |||||
{[]bh.PeerBehavior{consensusVote, consensusVote}, | |||||
[]bh.PeerBehavior{consensusVote, consensusVote}}, | |||||
// Equal frequencies different orders | |||||
{[]bh.PeerBehavior{consensusVote, blockPart}, | |||||
[]bh.PeerBehavior{blockPart, consensusVote}}, | |||||
} | |||||
unequals = []struct { | |||||
left []bh.PeerBehavior | |||||
right []bh.PeerBehavior | |||||
}{ | |||||
// Comparing empty sets to non empty sets | |||||
{[]bh.PeerBehavior{}, []bh.PeerBehavior{consensusVote}}, | |||||
// Different behaviors | |||||
{[]bh.PeerBehavior{consensusVote}, []bh.PeerBehavior{blockPart}}, | |||||
// Same behavior with different frequencies | |||||
{[]bh.PeerBehavior{consensusVote}, | |||||
[]bh.PeerBehavior{consensusVote, consensusVote}}, | |||||
} | |||||
) | |||||
for _, test := range equals { | |||||
if !equalBehaviors(test.left, test.right) { | |||||
t.Errorf("expected %#v and %#v to be equal", test.left, test.right) | |||||
} | |||||
} | |||||
for _, test := range unequals { | |||||
if equalBehaviors(test.left, test.right) { | |||||
t.Errorf("expected %#v and %#v to be unequal", test.left, test.right) | |||||
} | |||||
} | |||||
} | |||||
// TestMockPeerBehaviorReporterConcurrency constructs a scenario in which
// multiple goroutines use the same MockReporter instance. This test
// reproduces the conditions under which MockReporter will be used within
// Reactor `Receive` method tests, to ensure thread safety.
func TestMockPeerBehaviorReporterConcurrency(t *testing.T) { | |||||
var ( | |||||
behaviorScript = []struct { | |||||
peerID types.NodeID | |||||
behaviors []bh.PeerBehavior | |||||
}{ | |||||
{"1", []bh.PeerBehavior{bh.ConsensusVote("1", "")}}, | |||||
{"2", []bh.PeerBehavior{bh.ConsensusVote("2", ""), bh.ConsensusVote("2", ""), bh.ConsensusVote("2", "")}}, | |||||
{ | |||||
"3", | |||||
[]bh.PeerBehavior{bh.BlockPart("3", ""), | |||||
bh.ConsensusVote("3", ""), | |||||
bh.BlockPart("3", ""), | |||||
bh.ConsensusVote("3", "")}}, | |||||
{ | |||||
"4", | |||||
[]bh.PeerBehavior{bh.ConsensusVote("4", ""), | |||||
bh.ConsensusVote("4", ""), | |||||
bh.ConsensusVote("4", ""), | |||||
bh.ConsensusVote("4", "")}}, | |||||
{ | |||||
"5", | |||||
[]bh.PeerBehavior{bh.BlockPart("5", ""), | |||||
bh.ConsensusVote("5", ""), | |||||
bh.BlockPart("5", ""), | |||||
bh.ConsensusVote("5", "")}}, | |||||
} | |||||
) | |||||
var receiveWg sync.WaitGroup | |||||
pr := bh.NewMockReporter() | |||||
scriptItems := make(chan scriptItem) | |||||
done := make(chan int) | |||||
numConsumers := 3 | |||||
for i := 0; i < numConsumers; i++ { | |||||
receiveWg.Add(1) | |||||
go func() { | |||||
defer receiveWg.Done() | |||||
for { | |||||
select { | |||||
case pb := <-scriptItems: | |||||
if err := pr.Report(pb.behavior); err != nil { | |||||
t.Error(err) | |||||
} | |||||
case <-done: | |||||
return | |||||
} | |||||
} | |||||
}() | |||||
} | |||||
var sendingWg sync.WaitGroup | |||||
sendingWg.Add(1) | |||||
go func() { | |||||
defer sendingWg.Done() | |||||
for _, item := range behaviorScript { | |||||
for _, reason := range item.behaviors { | |||||
scriptItems <- scriptItem{item.peerID, reason} | |||||
} | |||||
} | |||||
}() | |||||
sendingWg.Wait() | |||||
for i := 0; i < numConsumers; i++ { | |||||
done <- 1 | |||||
} | |||||
receiveWg.Wait() | |||||
for _, items := range behaviorScript { | |||||
reported := pr.GetBehaviors(items.peerID) | |||||
if !equalBehaviors(reported, items.behaviors) { | |||||
t.Errorf("expected peer %s to have behaved \nExpected: %#v \nGot %#v \n", | |||||
items.peerID, items.behaviors, reported) | |||||
} | |||||
} | |||||
} |
@@ -1,187 +0,0 @@
package v2 | |||||
import ( | |||||
"errors" | |||||
"github.com/gogo/protobuf/proto" | |||||
"github.com/tendermint/tendermint/internal/p2p" | |||||
"github.com/tendermint/tendermint/internal/state" | |||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
var ( | |||||
errPeerQueueFull = errors.New("peer queue full") | |||||
) | |||||
type iIO interface { | |||||
sendBlockRequest(peer p2p.Peer, height int64) error | |||||
sendBlockToPeer(block *types.Block, peer p2p.Peer) error | |||||
sendBlockNotFound(height int64, peer p2p.Peer) error | |||||
sendStatusResponse(base, height int64, peer p2p.Peer) error | |||||
sendStatusRequest(peer p2p.Peer) error | |||||
broadcastStatusRequest() error | |||||
trySwitchToConsensus(state state.State, skipWAL bool) bool | |||||
} | |||||
type switchIO struct { | |||||
sw *p2p.Switch | |||||
} | |||||
func newSwitchIo(sw *p2p.Switch) *switchIO { | |||||
return &switchIO{ | |||||
sw: sw, | |||||
} | |||||
} | |||||
const ( | |||||
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) | |||||
BlockchainChannel = byte(0x40) | |||||
) | |||||
type consensusReactor interface { | |||||
// for when we switch from blockchain reactor and block sync to | |||||
// the consensus machine | |||||
SwitchToConsensus(state state.State, skipWAL bool) | |||||
} | |||||
func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_BlockRequest{ | |||||
BlockRequest: &bcproto.BlockRequest{ | |||||
Height: height, | |||||
}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
queued := peer.TrySend(BlockchainChannel, msgBytes) | |||||
if !queued { | |||||
return errPeerQueueFull | |||||
} | |||||
return nil | |||||
} | |||||
func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error { | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_StatusResponse{ | |||||
StatusResponse: &bcproto.StatusResponse{ | |||||
Height: height, | |||||
Base: base, | |||||
}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { | |||||
return errPeerQueueFull | |||||
} | |||||
return nil | |||||
} | |||||
func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { | |||||
if block == nil { | |||||
panic("trying to send nil block") | |||||
} | |||||
bpb, err := block.ToProto() | |||||
if err != nil { | |||||
return err | |||||
} | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_BlockResponse{ | |||||
BlockResponse: &bcproto.BlockResponse{ | |||||
Block: bpb, | |||||
}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { | |||||
return errPeerQueueFull | |||||
} | |||||
return nil | |||||
} | |||||
func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error { | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_NoBlockResponse{ | |||||
NoBlockResponse: &bcproto.NoBlockResponse{ | |||||
Height: height, | |||||
}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { | |||||
return errPeerQueueFull | |||||
} | |||||
return nil | |||||
} | |||||
func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool { | |||||
conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor) | |||||
if ok { | |||||
conR.SwitchToConsensus(state, skipWAL) | |||||
} | |||||
return ok | |||||
} | |||||
func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error { | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_StatusRequest{ | |||||
StatusRequest: &bcproto.StatusRequest{}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { | |||||
return errPeerQueueFull | |||||
} | |||||
return nil | |||||
} | |||||
func (sio *switchIO) broadcastStatusRequest() error { | |||||
msgProto := &bcproto.Message{ | |||||
Sum: &bcproto.Message_StatusRequest{ | |||||
StatusRequest: &bcproto.StatusRequest{}, | |||||
}, | |||||
} | |||||
msgBytes, err := proto.Marshal(msgProto) | |||||
if err != nil { | |||||
return err | |||||
} | |||||
// XXX: maybe we should use an io specific peer list here | |||||
sio.sw.Broadcast(BlockchainChannel, msgBytes) | |||||
return nil | |||||
} |
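Each send method above repeats the same marshal-then-TrySend sequence. A hypothetical helper (not something this file defines) that factors out the shared pattern would look roughly like this, using only identifiers already present in the file:

// trySendProto captures the pattern shared by the send* methods above: marshal
// the bcproto envelope and translate a full peer send queue into errPeerQueueFull.
func trySendProto(peer p2p.Peer, msg *bcproto.Message) error {
	msgBytes, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
		return errPeerQueueFull
	}
	return nil
}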
@@ -1,125 +0,0 @@
package v2 | |||||
import ( | |||||
"github.com/go-kit/kit/metrics" | |||||
"github.com/go-kit/kit/metrics/discard" | |||||
"github.com/go-kit/kit/metrics/prometheus" | |||||
stdprometheus "github.com/prometheus/client_golang/prometheus" | |||||
) | |||||
const ( | |||||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this | |||||
// package. | |||||
MetricsSubsystem = "blockchain" | |||||
) | |||||
// Metrics contains metrics exposed by this package. | |||||
type Metrics struct { | |||||
// events_in | |||||
EventsIn metrics.Counter | |||||
// events_handled
EventsHandled metrics.Counter | |||||
// events_out | |||||
EventsOut metrics.Counter | |||||
// errors_in | |||||
ErrorsIn metrics.Counter | |||||
// errors_handled | |||||
ErrorsHandled metrics.Counter | |||||
// errors_out | |||||
ErrorsOut metrics.Counter | |||||
// events_shed | |||||
EventsShed metrics.Counter | |||||
// events_sent | |||||
EventsSent metrics.Counter | |||||
// errors_sent | |||||
ErrorsSent metrics.Counter | |||||
// errors_shed | |||||
ErrorsShed metrics.Counter | |||||
} | |||||
// PrometheusMetrics returns metrics for in and out events, errors, etc. handled by routines. | |||||
// Can we burn in the routine name here? | |||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { | |||||
labels := []string{} | |||||
for i := 0; i < len(labelsAndValues); i += 2 { | |||||
labels = append(labels, labelsAndValues[i]) | |||||
} | |||||
return &Metrics{ | |||||
EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_in", | |||||
Help: "Events read from the channel.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_handled", | |||||
Help: "Events handled", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_out", | |||||
Help: "Events output from routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_in", | |||||
Help: "Errors read from the channel.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_handled", | |||||
Help: "Errors handled.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_out", | |||||
Help: "Errors output from routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_sent", | |||||
Help: "Errors sent to routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "errors_shed", | |||||
Help: "Errors dropped from sending.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_sent", | |||||
Help: "Events sent to routine.", | |||||
}, labels).With(labelsAndValues...), | |||||
EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |||||
Namespace: namespace, | |||||
Subsystem: MetricsSubsystem, | |||||
Name: "events_shed", | |||||
Help: "Events dropped from sending.", | |||||
}, labels).With(labelsAndValues...), | |||||
} | |||||
} | |||||
// NopMetrics returns no-op Metrics. | |||||
func NopMetrics() *Metrics { | |||||
return &Metrics{ | |||||
EventsIn: discard.NewCounter(), | |||||
EventsHandled: discard.NewCounter(), | |||||
EventsOut: discard.NewCounter(), | |||||
ErrorsIn: discard.NewCounter(), | |||||
ErrorsHandled: discard.NewCounter(), | |||||
ErrorsOut: discard.NewCounter(), | |||||
EventsShed: discard.NewCounter(), | |||||
EventsSent: discard.NewCounter(), | |||||
ErrorsSent: discard.NewCounter(), | |||||
ErrorsShed: discard.NewCounter(), | |||||
} | |||||
} |
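As a usage note, the variadic labelsAndValues are alternating label keys and values. A hypothetical call site (the namespace and chain ID below are placeholders, not values from this diff):

func examplePrometheusMetrics() {
	// Counters inherit the namespace, the "blockchain" subsystem, and the label pair given here.
	m := PrometheusMetrics("tendermint", "chain_id", "test-chain-1")
	m.EventsIn.Add(1)   // one event read from a routine's input channel
	m.ErrorsShed.Add(1) // one error dropped rather than sent
}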
@@ -1,193 +0,0 @@
package v2 | |||||
import ( | |||||
"fmt" | |||||
tmstate "github.com/tendermint/tendermint/internal/state" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
// Events generated by the processor: | |||||
// block execution failure; the event indicates the peer(s) that caused the error
type pcBlockVerificationFailure struct { | |||||
priorityNormal | |||||
height int64 | |||||
firstPeerID types.NodeID | |||||
secondPeerID types.NodeID | |||||
} | |||||
func (e pcBlockVerificationFailure) String() string { | |||||
return fmt.Sprintf("pcBlockVerificationFailure{%d 1st peer: %v, 2nd peer: %v}", | |||||
e.height, e.firstPeerID, e.secondPeerID) | |||||
} | |||||
// successful block execution | |||||
type pcBlockProcessed struct { | |||||
priorityNormal | |||||
height int64 | |||||
peerID types.NodeID | |||||
} | |||||
func (e pcBlockProcessed) String() string { | |||||
return fmt.Sprintf("pcBlockProcessed{%d peer: %v}", e.height, e.peerID) | |||||
} | |||||
// processor has finished | |||||
type pcFinished struct { | |||||
priorityNormal | |||||
blocksSynced int | |||||
tmState tmstate.State | |||||
} | |||||
func (p pcFinished) Error() string { | |||||
return "finished" | |||||
} | |||||
type queueItem struct { | |||||
block *types.Block | |||||
peerID types.NodeID | |||||
} | |||||
type blockQueue map[int64]queueItem | |||||
type pcState struct { | |||||
// blocks waiting to be processed | |||||
queue blockQueue | |||||
// draining indicates that the next rProcessBlock event with a queue miss constitutes completion | |||||
draining bool | |||||
// the number of blocks successfully synced by the processor | |||||
blocksSynced int | |||||
// the processorContext which contains the processor dependencies | |||||
context processorContext | |||||
} | |||||
func (state *pcState) String() string { | |||||
return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d", | |||||
state.height(), len(state.queue), state.draining, state.blocksSynced) | |||||
} | |||||
// newPcState returns a pcState with an empty block queue, backed by the given processorContext
func newPcState(context processorContext) *pcState { | |||||
return &pcState{ | |||||
queue: blockQueue{}, | |||||
draining: false, | |||||
blocksSynced: 0, | |||||
context: context, | |||||
} | |||||
} | |||||
// nextTwo returns the next two unverified blocks | |||||
func (state *pcState) nextTwo() (queueItem, queueItem, error) { | |||||
if first, ok := state.queue[state.height()+1]; ok { | |||||
if second, ok := state.queue[state.height()+2]; ok { | |||||
return first, second, nil | |||||
} | |||||
} | |||||
return queueItem{}, queueItem{}, fmt.Errorf("not found") | |||||
} | |||||
// synced returns true when at most the last verified block remains in the queue | |||||
func (state *pcState) synced() bool { | |||||
return len(state.queue) <= 1 | |||||
} | |||||
func (state *pcState) enqueue(peerID types.NodeID, block *types.Block, height int64) { | |||||
if item, ok := state.queue[height]; ok { | |||||
panic(fmt.Sprintf( | |||||
"duplicate block %d (%X) enqueued by processor (sent by %v; existing block %X from %v)", | |||||
height, block.Hash(), peerID, item.block.Hash(), item.peerID)) | |||||
} | |||||
state.queue[height] = queueItem{block: block, peerID: peerID} | |||||
} | |||||
func (state *pcState) height() int64 { | |||||
return state.context.tmState().LastBlockHeight | |||||
} | |||||
// purgePeer removes all blocks sent by peerID from the queue
func (state *pcState) purgePeer(peerID types.NodeID) { | |||||
// what if height is less than state.height? | |||||
for height, item := range state.queue { | |||||
if item.peerID == peerID { | |||||
delete(state.queue, height) | |||||
} | |||||
} | |||||
} | |||||
// handle processes FSM events | |||||
func (state *pcState) handle(event Event) (Event, error) { | |||||
switch event := event.(type) { | |||||
case bcResetState: | |||||
state.context.setState(event.state) | |||||
return noOp, nil | |||||
case scFinishedEv: | |||||
if state.synced() { | |||||
return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil | |||||
} | |||||
state.draining = true | |||||
return noOp, nil | |||||
case scPeerError: | |||||
state.purgePeer(event.peerID) | |||||
return noOp, nil | |||||
case scBlockReceived: | |||||
if event.block == nil { | |||||
return noOp, nil | |||||
} | |||||
// enqueue block if height is higher than state height, else ignore it | |||||
if event.block.Height > state.height() { | |||||
state.enqueue(event.peerID, event.block, event.block.Height) | |||||
} | |||||
return noOp, nil | |||||
case rProcessBlock: | |||||
tmstate := state.context.tmState() | |||||
firstItem, secondItem, err := state.nextTwo() | |||||
if err != nil { | |||||
if state.draining { | |||||
return pcFinished{tmState: tmstate, blocksSynced: state.blocksSynced}, nil | |||||
} | |||||
return noOp, nil | |||||
} | |||||
var ( | |||||
first, second = firstItem.block, secondItem.block | |||||
firstParts = first.MakePartSet(types.BlockPartSizeBytes) | |||||
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstParts.Header()} | |||||
) | |||||
// verify if +second+ last commit "confirms" +first+ block | |||||
err = state.context.verifyCommit(tmstate.ChainID, firstID, first.Height, second.LastCommit) | |||||
if err != nil { | |||||
state.purgePeer(firstItem.peerID) | |||||
if firstItem.peerID != secondItem.peerID { | |||||
state.purgePeer(secondItem.peerID) | |||||
} | |||||
return pcBlockVerificationFailure{ | |||||
height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID}, | |||||
nil | |||||
} | |||||
state.context.saveBlock(first, firstParts, second.LastCommit) | |||||
if err := state.context.applyBlock(firstID, first); err != nil { | |||||
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) | |||||
} | |||||
state.context.recordConsMetrics(first) | |||||
delete(state.queue, first.Height) | |||||
state.blocksSynced++ | |||||
return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil | |||||
} | |||||
return noOp, nil | |||||
} |
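The rProcessBlock branch above is the heart of the processor: block H+1 can only be verified once block H+2 has arrived, because the LastCommit carried by H+2 contains the signatures for H+1. The following is an illustrative sketch of that rule (not part of the removed files), written inside package v2 against the mock context and test helpers that appear later in this diff:

func exampleProcessTwoBlocks() {
	// newMockProcessorContext and makePcBlock are the test helpers shown below.
	state := newPcState(newMockProcessorContext(tmstate.State{LastBlockHeight: 0}, nil, nil))
	state.enqueue("P1", makePcBlock(1), 1) // block H+1
	state.enqueue("P2", makePcBlock(2), 2) // block H+2; its LastCommit verifies H+1

	event, err := state.handle(rProcessBlock{})
	// err is nil and event is pcBlockProcessed{height: 1, peerID: "P1"}: block 1
	// has been saved and applied, while block 2 stays queued until block 3 arrives.
	_, _ = event, err
}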
@@ -1,112 +0,0 @@
package v2 | |||||
import ( | |||||
"fmt" | |||||
"github.com/tendermint/tendermint/internal/consensus" | |||||
"github.com/tendermint/tendermint/internal/state" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
type processorContext interface { | |||||
applyBlock(blockID types.BlockID, block *types.Block) error | |||||
verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error | |||||
saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) | |||||
tmState() state.State | |||||
setState(state.State) | |||||
recordConsMetrics(block *types.Block) | |||||
} | |||||
type pContext struct { | |||||
store blockStore | |||||
applier blockApplier | |||||
state state.State | |||||
metrics *consensus.Metrics | |||||
} | |||||
func newProcessorContext(st blockStore, ex blockApplier, s state.State, m *consensus.Metrics) *pContext { | |||||
return &pContext{ | |||||
store: st, | |||||
applier: ex, | |||||
state: s, | |||||
metrics: m, | |||||
} | |||||
} | |||||
func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error { | |||||
newState, err := pc.applier.ApplyBlock(pc.state, blockID, block) | |||||
pc.state = newState | |||||
return err | |||||
} | |||||
func (pc pContext) tmState() state.State { | |||||
return pc.state | |||||
} | |||||
func (pc *pContext) setState(state state.State) { | |||||
pc.state = state | |||||
} | |||||
func (pc pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { | |||||
return pc.state.Validators.VerifyCommitLight(chainID, blockID, height, commit) | |||||
} | |||||
func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { | |||||
pc.store.SaveBlock(block, blockParts, seenCommit) | |||||
} | |||||
func (pc *pContext) recordConsMetrics(block *types.Block) { | |||||
pc.metrics.RecordConsMetrics(block) | |||||
} | |||||
type mockPContext struct { | |||||
applicationBL []int64 | |||||
verificationBL []int64 | |||||
state state.State | |||||
} | |||||
func newMockProcessorContext( | |||||
state state.State, | |||||
verificationBlackList []int64, | |||||
applicationBlackList []int64) *mockPContext { | |||||
return &mockPContext{ | |||||
applicationBL: applicationBlackList, | |||||
verificationBL: verificationBlackList, | |||||
state: state, | |||||
} | |||||
} | |||||
func (mpc *mockPContext) applyBlock(blockID types.BlockID, block *types.Block) error { | |||||
for _, h := range mpc.applicationBL { | |||||
if h == block.Height { | |||||
return fmt.Errorf("generic application error") | |||||
} | |||||
} | |||||
mpc.state.LastBlockHeight = block.Height | |||||
return nil | |||||
} | |||||
func (mpc *mockPContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { | |||||
for _, h := range mpc.verificationBL { | |||||
if h == height { | |||||
return fmt.Errorf("generic verification error") | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
func (mpc *mockPContext) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { | |||||
} | |||||
func (mpc *mockPContext) setState(state state.State) { | |||||
mpc.state = state | |||||
} | |||||
func (mpc *mockPContext) tmState() state.State { | |||||
return mpc.state | |||||
} | |||||
func (mpc *mockPContext) recordConsMetrics(block *types.Block) { | |||||
} |
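For clarity: the two blacklists passed to newMockProcessorContext are the heights at which verifyCommit and applyBlock, respectively, return errors, which is how the processor tests below simulate verification and application failures. A short construction sketch (not part of the removed files):

func exampleMockContext() {
	// Verification fails at height 3, application fails at height 5.
	ctx := newMockProcessorContext(
		state.State{LastBlockHeight: 1},
		[]int64{3}, // verificationBlackList
		[]int64{5}, // applicationBlackList
	)
	_ = ctx
}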
@@ -1,305 +0,0 @@
package v2 | |||||
import ( | |||||
"testing" | |||||
"github.com/stretchr/testify/assert" | |||||
tmstate "github.com/tendermint/tendermint/internal/state" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
// pcBlock is a test helper structure with simple types. Its purpose is to help with test readability. | |||||
type pcBlock struct { | |||||
pid string | |||||
height int64 | |||||
} | |||||
// params is a test structure used to create processor state. | |||||
type params struct { | |||||
height int64 | |||||
items []pcBlock | |||||
blocksSynced int | |||||
verBL []int64 | |||||
appBL []int64 | |||||
draining bool | |||||
} | |||||
// makePcBlock makes an empty block. | |||||
func makePcBlock(height int64) *types.Block { | |||||
return &types.Block{Header: types.Header{Height: height}} | |||||
} | |||||
// makeState takes test parameters and creates a specific processor state. | |||||
func makeState(p *params) *pcState { | |||||
var ( | |||||
tmState = tmstate.State{LastBlockHeight: p.height} | |||||
context = newMockProcessorContext(tmState, p.verBL, p.appBL) | |||||
) | |||||
state := newPcState(context) | |||||
for _, item := range p.items { | |||||
state.enqueue(types.NodeID(item.pid), makePcBlock(item.height), item.height) | |||||
} | |||||
state.blocksSynced = p.blocksSynced | |||||
state.draining = p.draining | |||||
return state | |||||
} | |||||
func mBlockResponse(peerID types.NodeID, height int64) scBlockReceived { | |||||
return scBlockReceived{ | |||||
peerID: peerID, | |||||
block: makePcBlock(height), | |||||
} | |||||
} | |||||
type pcFsmMakeStateValues struct { | |||||
currentState *params | |||||
event Event | |||||
wantState *params | |||||
wantNextEvent Event | |||||
wantErr error | |||||
wantPanic bool | |||||
} | |||||
type testFields struct { | |||||
name string | |||||
steps []pcFsmMakeStateValues | |||||
} | |||||
func executeProcessorTests(t *testing.T, tests []testFields) { | |||||
for _, tt := range tests { | |||||
tt := tt | |||||
t.Run(tt.name, func(t *testing.T) { | |||||
var state *pcState | |||||
for _, step := range tt.steps { | |||||
defer func() { | |||||
r := recover() | |||||
if (r != nil) != step.wantPanic { | |||||
t.Errorf("recover = %v, wantPanic = %v", r, step.wantPanic) | |||||
} | |||||
}() | |||||
// First step must always initialize the currentState as state. | |||||
if step.currentState != nil { | |||||
state = makeState(step.currentState) | |||||
} | |||||
if state == nil { | |||||
panic("Bad (initial?) step") | |||||
} | |||||
nextEvent, err := state.handle(step.event) | |||||
t.Log(state) | |||||
assert.Equal(t, step.wantErr, err) | |||||
assert.Equal(t, makeState(step.wantState), state) | |||||
assert.Equal(t, step.wantNextEvent, nextEvent) | |||||
// Next step may use the wantedState as their currentState. | |||||
state = makeState(step.wantState) | |||||
} | |||||
}) | |||||
} | |||||
} | |||||
func TestRProcessPeerError(t *testing.T) { | |||||
tests := []testFields{ | |||||
{ | |||||
name: "error for existing peer", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, | |||||
event: scPeerError{peerID: "P2"}, | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 1}}}, | |||||
wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "error for unknown peer", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, | |||||
event: scPeerError{peerID: "P3"}, | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, | |||||
wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
executeProcessorTests(t, tests) | |||||
} | |||||
func TestPcBlockResponse(t *testing.T) { | |||||
tests := []testFields{ | |||||
{ | |||||
name: "add one block", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{}, event: mBlockResponse("P1", 1), | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 1}}}, wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "add two blocks", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{}, event: mBlockResponse("P1", 3), | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp, | |||||
}, | |||||
{ // use previous wantState as currentState, | |||||
event: mBlockResponse("P1", 4), | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 3}, {"P1", 4}}}, wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
executeProcessorTests(t, tests) | |||||
} | |||||
func TestRProcessBlockSuccess(t *testing.T) { | |||||
tests := []testFields{ | |||||
{ | |||||
name: "noop - no blocks over current height", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{}, event: rProcessBlock{}, | |||||
wantState: ¶ms{}, wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "noop - high new blocks", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, event: rProcessBlock{}, | |||||
wantState: ¶ms{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, wantNextEvent: noOp, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "blocks H+1 and H+2 present", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: rProcessBlock{}, | |||||
wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}}, blocksSynced: 1}, | |||||
wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"}, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "blocks H+1 and H+2 present after draining", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ // some contiguous blocks - on stop check draining is set | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}}, | |||||
event: scFinishedEv{}, | |||||
wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}, draining: true}, | |||||
wantNextEvent: noOp, | |||||
}, | |||||
{ | |||||
event: rProcessBlock{}, | |||||
wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true}, | |||||
wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"}, | |||||
}, | |||||
{ // finish when H+1 or/and H+2 are missing | |||||
event: rProcessBlock{}, | |||||
wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true}, | |||||
wantNextEvent: pcFinished{tmState: tmstate.State{LastBlockHeight: 1}, blocksSynced: 1}, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
executeProcessorTests(t, tests) | |||||
} | |||||
func TestRProcessBlockFailures(t *testing.T) { | |||||
tests := []testFields{ | |||||
{ | |||||
name: "blocks H+1 and H+2 present from different peers - H+1 verification fails ", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: rProcessBlock{}, | |||||
wantState: ¶ms{items: []pcBlock{}, verBL: []int64{1}}, | |||||
wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P2"}, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "blocks H+1 and H+2 present from same peer - H+1 applyBlock fails ", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: rProcessBlock{}, | |||||
wantState: ¶ms{items: []pcBlock{}, appBL: []int64{1}}, wantPanic: true, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "blocks H+1 and H+2 present from same peers - H+1 verification fails ", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{height: 0, items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, | |||||
verBL: []int64{1}}, event: rProcessBlock{}, | |||||
wantState: ¶ms{height: 0, items: []pcBlock{{"P2", 3}}, verBL: []int64{1}}, | |||||
wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "blocks H+1 and H+2 present from different peers - H+1 applyBlock fails ", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P2", 3}}, appBL: []int64{1}}, | |||||
event: rProcessBlock{}, | |||||
wantState: ¶ms{items: []pcBlock{{"P2", 3}}, appBL: []int64{1}}, wantPanic: true, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
executeProcessorTests(t, tests) | |||||
} | |||||
func TestScFinishedEv(t *testing.T) { | |||||
tests := []testFields{ | |||||
{ | |||||
name: "no blocks", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{height: 100, items: []pcBlock{}, blocksSynced: 100}, event: scFinishedEv{}, | |||||
wantState: ¶ms{height: 100, items: []pcBlock{}, blocksSynced: 100}, | |||||
wantNextEvent: pcFinished{tmState: tmstate.State{LastBlockHeight: 100}, blocksSynced: 100}, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "maxHeight+1 block present", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{height: 100, items: []pcBlock{ | |||||
{"P1", 101}}, blocksSynced: 100}, event: scFinishedEv{}, | |||||
wantState: ¶ms{height: 100, items: []pcBlock{{"P1", 101}}, blocksSynced: 100}, | |||||
wantNextEvent: pcFinished{tmState: tmstate.State{LastBlockHeight: 100}, blocksSynced: 100}, | |||||
}, | |||||
}, | |||||
}, | |||||
{ | |||||
name: "more blocks present", | |||||
steps: []pcFsmMakeStateValues{ | |||||
{ | |||||
currentState: ¶ms{height: 100, items: []pcBlock{ | |||||
{"P1", 101}, {"P1", 102}}, blocksSynced: 100}, event: scFinishedEv{}, | |||||
wantState: ¶ms{height: 100, items: []pcBlock{ | |||||
{"P1", 101}, {"P1", 102}}, blocksSynced: 100, draining: true}, | |||||
wantNextEvent: noOp, | |||||
wantErr: nil, | |||||
}, | |||||
}, | |||||
}, | |||||
} | |||||
executeProcessorTests(t, tests) | |||||
} |
@@ -1,643 +0,0 @@
package v2 | |||||
import ( | |||||
"errors" | |||||
"fmt" | |||||
"time" | |||||
"github.com/gogo/protobuf/proto" | |||||
"github.com/tendermint/tendermint/internal/blocksync" | |||||
"github.com/tendermint/tendermint/internal/blocksync/v2/internal/behavior" | |||||
"github.com/tendermint/tendermint/internal/consensus" | |||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||||
"github.com/tendermint/tendermint/internal/p2p" | |||||
"github.com/tendermint/tendermint/internal/state" | |||||
"github.com/tendermint/tendermint/libs/log" | |||||
"github.com/tendermint/tendermint/libs/sync" | |||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
const ( | |||||
// chBufferSize is the buffer size of all event channels. | |||||
chBufferSize int = 1000 | |||||
) | |||||
type blockStore interface { | |||||
LoadBlock(height int64) *types.Block | |||||
SaveBlock(*types.Block, *types.PartSet, *types.Commit) | |||||
Base() int64 | |||||
Height() int64 | |||||
} | |||||
// BlockchainReactor handles the block sync protocol.
type BlockchainReactor struct { | |||||
p2p.BaseReactor | |||||
blockSync *sync.AtomicBool // enable block sync on start when it's been Set | |||||
stateSynced bool // set to true when SwitchToBlockSync is called by state sync | |||||
scheduler *Routine | |||||
processor *Routine | |||||
logger log.Logger | |||||
mtx tmsync.RWMutex | |||||
maxPeerHeight int64 | |||||
syncHeight int64 | |||||
events chan Event // non-nil during a block sync | |||||
reporter behavior.Reporter | |||||
io iIO | |||||
store blockStore | |||||
syncStartTime time.Time | |||||
syncStartHeight int64 | |||||
lastSyncRate float64 // blocks synced per second, based on the last 100 blocks
} | |||||
type blockApplier interface { | |||||
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) | |||||
} | |||||
// XXX: unify naming in this package around tmState | |||||
func newReactor(state state.State, store blockStore, reporter behavior.Reporter, | |||||
blockApplier blockApplier, blockSync bool, metrics *consensus.Metrics) *BlockchainReactor { | |||||
initHeight := state.LastBlockHeight + 1 | |||||
if initHeight == 1 { | |||||
initHeight = state.InitialHeight | |||||
} | |||||
scheduler := newScheduler(initHeight, time.Now()) | |||||
pContext := newProcessorContext(store, blockApplier, state, metrics) | |||||
// TODO: Fix naming to just newProcessor
// newPcState requires a processorContext | |||||
processor := newPcState(pContext) | |||||
return &BlockchainReactor{ | |||||
scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize), | |||||
processor: newRoutine("processor", processor.handle, chBufferSize), | |||||
store: store, | |||||
reporter: reporter, | |||||
logger: log.NewNopLogger(), | |||||
blockSync: sync.NewBool(blockSync), | |||||
syncStartHeight: initHeight, | |||||
syncStartTime: time.Time{}, | |||||
lastSyncRate: 0, | |||||
} | |||||
} | |||||
// NewBlockchainReactor creates a new reactor instance. | |||||
func NewBlockchainReactor( | |||||
state state.State, | |||||
blockApplier blockApplier, | |||||
store blockStore, | |||||
blockSync bool, | |||||
metrics *consensus.Metrics) *BlockchainReactor { | |||||
reporter := behavior.NewMockReporter() | |||||
return newReactor(state, store, reporter, blockApplier, blockSync, metrics) | |||||
} | |||||
// SetSwitch implements Reactor interface. | |||||
func (r *BlockchainReactor) SetSwitch(sw *p2p.Switch) { | |||||
r.Switch = sw | |||||
if sw != nil { | |||||
r.io = newSwitchIo(sw) | |||||
} else { | |||||
r.io = nil | |||||
} | |||||
} | |||||
func (r *BlockchainReactor) setMaxPeerHeight(height int64) { | |||||
r.mtx.Lock() | |||||
defer r.mtx.Unlock() | |||||
if height > r.maxPeerHeight { | |||||
r.maxPeerHeight = height | |||||
} | |||||
} | |||||
func (r *BlockchainReactor) setSyncHeight(height int64) { | |||||
r.mtx.Lock() | |||||
defer r.mtx.Unlock() | |||||
r.syncHeight = height | |||||
} | |||||
// SyncHeight returns the height to which the BlockchainReactor has synced. | |||||
func (r *BlockchainReactor) SyncHeight() int64 { | |||||
r.mtx.RLock() | |||||
defer r.mtx.RUnlock() | |||||
return r.syncHeight | |||||
} | |||||
// SetLogger sets the logger of the reactor. | |||||
func (r *BlockchainReactor) SetLogger(logger log.Logger) { | |||||
r.logger = logger | |||||
r.scheduler.setLogger(logger) | |||||
r.processor.setLogger(logger) | |||||
} | |||||
// Start implements cmn.Service interface | |||||
func (r *BlockchainReactor) Start() error { | |||||
r.reporter = behavior.NewSwitchReporter(r.BaseReactor.Switch) | |||||
if r.blockSync.IsSet() { | |||||
err := r.startSync(nil) | |||||
if err != nil { | |||||
return fmt.Errorf("failed to start block sync: %w", err) | |||||
} | |||||
} | |||||
return nil | |||||
} | |||||
// startSync begins a block sync, signaled by r.events being non-nil. If state is non-nil,
// the scheduler and processor are updated with this state on startup.
func (r *BlockchainReactor) startSync(state *state.State) error { | |||||
r.mtx.Lock() | |||||
defer r.mtx.Unlock() | |||||
if r.events != nil { | |||||
return errors.New("block sync already in progress") | |||||
} | |||||
r.events = make(chan Event, chBufferSize) | |||||
go r.scheduler.start() | |||||
go r.processor.start() | |||||
if state != nil { | |||||
<-r.scheduler.ready() | |||||
<-r.processor.ready() | |||||
r.scheduler.send(bcResetState{state: *state}) | |||||
r.processor.send(bcResetState{state: *state}) | |||||
} | |||||
go r.demux(r.events) | |||||
return nil | |||||
} | |||||
// endSync ends a block sync | |||||
func (r *BlockchainReactor) endSync() { | |||||
r.mtx.Lock() | |||||
defer r.mtx.Unlock() | |||||
if r.events != nil { | |||||
close(r.events) | |||||
} | |||||
r.events = nil | |||||
r.scheduler.stop() | |||||
r.processor.stop() | |||||
} | |||||
// SwitchToBlockSync is called by the state sync reactor when switching to block sync. | |||||
func (r *BlockchainReactor) SwitchToBlockSync(state state.State) error { | |||||
r.stateSynced = true | |||||
state = state.Copy() | |||||
err := r.startSync(&state) | |||||
if err == nil { | |||||
r.syncStartTime = time.Now() | |||||
} | |||||
return err | |||||
} | |||||
// reactor generated ticker events: | |||||
// ticker for cleaning peers | |||||
type rTryPrunePeer struct { | |||||
priorityHigh | |||||
time time.Time | |||||
} | |||||
func (e rTryPrunePeer) String() string { | |||||
return fmt.Sprintf("rTryPrunePeer{%v}", e.time) | |||||
} | |||||
// ticker event for scheduling block requests | |||||
type rTrySchedule struct { | |||||
priorityHigh | |||||
time time.Time | |||||
} | |||||
func (e rTrySchedule) String() string { | |||||
return fmt.Sprintf("rTrySchedule{%v}", e.time) | |||||
} | |||||
// ticker for block processing | |||||
type rProcessBlock struct { | |||||
priorityNormal | |||||
} | |||||
func (e rProcessBlock) String() string { | |||||
return "rProcessBlock" | |||||
} | |||||
// reactor generated events based on blockchain related messages from peers: | |||||
// blockResponse message received from a peer | |||||
type bcBlockResponse struct { | |||||
priorityNormal | |||||
time time.Time | |||||
peerID types.NodeID | |||||
size int64 | |||||
block *types.Block | |||||
} | |||||
func (resp bcBlockResponse) String() string { | |||||
return fmt.Sprintf("bcBlockResponse{%d#%X (size: %d bytes) from %v at %v}", | |||||
resp.block.Height, resp.block.Hash(), resp.size, resp.peerID, resp.time) | |||||
} | |||||
// noBlockResponse message received from a peer
type bcNoBlockResponse struct { | |||||
priorityNormal | |||||
time time.Time | |||||
peerID types.NodeID | |||||
height int64 | |||||
} | |||||
func (resp bcNoBlockResponse) String() string { | |||||
return fmt.Sprintf("bcNoBlockResponse{%v has no block at height %d at %v}", | |||||
resp.peerID, resp.height, resp.time) | |||||
} | |||||
// statusResponse message received from a peer | |||||
type bcStatusResponse struct { | |||||
priorityNormal | |||||
time time.Time | |||||
peerID types.NodeID | |||||
base int64 | |||||
height int64 | |||||
} | |||||
func (resp bcStatusResponse) String() string { | |||||
return fmt.Sprintf("bcStatusResponse{%v is at height %d (base: %d) at %v}", | |||||
resp.peerID, resp.height, resp.base, resp.time) | |||||
} | |||||
// new peer is connected | |||||
type bcAddNewPeer struct { | |||||
priorityNormal | |||||
peerID types.NodeID | |||||
} | |||||
func (resp bcAddNewPeer) String() string { | |||||
return fmt.Sprintf("bcAddNewPeer{%v}", resp.peerID) | |||||
} | |||||
// existing peer is removed | |||||
type bcRemovePeer struct { | |||||
priorityHigh | |||||
peerID types.NodeID | |||||
reason interface{} | |||||
} | |||||
func (resp bcRemovePeer) String() string { | |||||
return fmt.Sprintf("bcRemovePeer{%v due to %v}", resp.peerID, resp.reason) | |||||
} | |||||
// resets the scheduler and processor state, e.g. following a switch from state syncing | |||||
type bcResetState struct { | |||||
priorityHigh | |||||
state state.State | |||||
} | |||||
func (e bcResetState) String() string { | |||||
return fmt.Sprintf("bcResetState{%v}", e.state) | |||||
} | |||||
// Takes the channel as a parameter to avoid race conditions on r.events. | |||||
func (r *BlockchainReactor) demux(events <-chan Event) { | |||||
var lastHundred = time.Now() | |||||
var ( | |||||
processBlockFreq = 20 * time.Millisecond | |||||
doProcessBlockCh = make(chan struct{}, 1) | |||||
doProcessBlockTk = time.NewTicker(processBlockFreq) | |||||
) | |||||
defer doProcessBlockTk.Stop() | |||||
var ( | |||||
prunePeerFreq = 1 * time.Second | |||||
doPrunePeerCh = make(chan struct{}, 1) | |||||
doPrunePeerTk = time.NewTicker(prunePeerFreq) | |||||
) | |||||
defer doPrunePeerTk.Stop() | |||||
var ( | |||||
scheduleFreq = 20 * time.Millisecond | |||||
doScheduleCh = make(chan struct{}, 1) | |||||
doScheduleTk = time.NewTicker(scheduleFreq) | |||||
) | |||||
defer doScheduleTk.Stop() | |||||
var ( | |||||
statusFreq = 10 * time.Second | |||||
doStatusCh = make(chan struct{}, 1) | |||||
doStatusTk = time.NewTicker(statusFreq) | |||||
) | |||||
defer doStatusTk.Stop() | |||||
doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers | |||||
// Memoize the scSchedulerFail error to avoid printing it every scheduleFreq. | |||||
var scSchedulerFailErr error | |||||
// XXX: Extract timers to make testing atemporal | |||||
for { | |||||
select { | |||||
// Pacers: send at most per frequency but don't saturate | |||||
case <-doProcessBlockTk.C: | |||||
select { | |||||
case doProcessBlockCh <- struct{}{}: | |||||
default: | |||||
} | |||||
case <-doPrunePeerTk.C: | |||||
select { | |||||
case doPrunePeerCh <- struct{}{}: | |||||
default: | |||||
} | |||||
case <-doScheduleTk.C: | |||||
select { | |||||
case doScheduleCh <- struct{}{}: | |||||
default: | |||||
} | |||||
case <-doStatusTk.C: | |||||
select { | |||||
case doStatusCh <- struct{}{}: | |||||
default: | |||||
} | |||||
// Tickers: perform tasks periodically | |||||
case <-doScheduleCh: | |||||
r.scheduler.send(rTrySchedule{time: time.Now()}) | |||||
case <-doPrunePeerCh: | |||||
r.scheduler.send(rTryPrunePeer{time: time.Now()}) | |||||
case <-doProcessBlockCh: | |||||
r.processor.send(rProcessBlock{}) | |||||
case <-doStatusCh: | |||||
if err := r.io.broadcastStatusRequest(); err != nil { | |||||
r.logger.Error("Error broadcasting status request", "err", err) | |||||
} | |||||
// Events from peers. Closing the channel signals event loop termination. | |||||
case event, ok := <-events: | |||||
if !ok { | |||||
r.logger.Info("Stopping event processing") | |||||
return | |||||
} | |||||
switch event := event.(type) { | |||||
case bcStatusResponse: | |||||
r.setMaxPeerHeight(event.height) | |||||
r.scheduler.send(event) | |||||
case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: | |||||
r.scheduler.send(event) | |||||
default: | |||||
r.logger.Error("Received unexpected event", "event", fmt.Sprintf("%T", event)) | |||||
} | |||||
// Incremental events from scheduler | |||||
case event := <-r.scheduler.next(): | |||||
switch event := event.(type) { | |||||
case scBlockReceived: | |||||
r.processor.send(event) | |||||
case scPeerError: | |||||
r.processor.send(event) | |||||
if err := r.reporter.Report(behavior.BadMessage(event.peerID, "scPeerError")); err != nil { | |||||
r.logger.Error("Error reporting peer", "err", err) | |||||
} | |||||
case scBlockRequest: | |||||
peer := r.Switch.Peers().Get(event.peerID) | |||||
if peer == nil { | |||||
r.logger.Error("Wanted to send block request, but no such peer", "peerID", event.peerID) | |||||
continue | |||||
} | |||||
if err := r.io.sendBlockRequest(peer, event.height); err != nil { | |||||
r.logger.Error("Error sending block request", "err", err) | |||||
} | |||||
case scFinishedEv: | |||||
r.processor.send(event) | |||||
r.scheduler.stop() | |||||
case scSchedulerFail: | |||||
if scSchedulerFailErr != event.reason { | |||||
r.logger.Error("Scheduler failure", "err", event.reason.Error()) | |||||
scSchedulerFailErr = event.reason | |||||
} | |||||
case scPeersPruned: | |||||
// Remove peers from the processor. | |||||
for _, peerID := range event.peers { | |||||
r.processor.send(scPeerError{peerID: peerID, reason: errors.New("peer was pruned")}) | |||||
} | |||||
r.logger.Debug("Pruned peers", "count", len(event.peers)) | |||||
case noOpEvent: | |||||
default: | |||||
r.logger.Error("Received unexpected scheduler event", "event", fmt.Sprintf("%T", event)) | |||||
} | |||||
// Incremental events from processor | |||||
case event := <-r.processor.next(): | |||||
switch event := event.(type) { | |||||
case pcBlockProcessed: | |||||
r.setSyncHeight(event.height) | |||||
if (r.syncHeight-r.syncStartHeight)%100 == 0 { | |||||
newSyncRate := 100 / time.Since(lastHundred).Seconds() | |||||
if r.lastSyncRate == 0 { | |||||
r.lastSyncRate = newSyncRate | |||||
} else { | |||||
r.lastSyncRate = 0.9*r.lastSyncRate + 0.1*newSyncRate | |||||
} | |||||
r.logger.Info("block sync Rate", "height", r.syncHeight, | |||||
"max_peer_height", r.maxPeerHeight, "blocks/s", r.lastSyncRate) | |||||
lastHundred = time.Now() | |||||
} | |||||
r.scheduler.send(event) | |||||
case pcBlockVerificationFailure: | |||||
r.scheduler.send(event) | |||||
case pcFinished: | |||||
r.logger.Info("block sync complete, switching to consensus") | |||||
if !r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0 || r.stateSynced) { | |||||
r.logger.Error("Failed to switch to consensus reactor") | |||||
} | |||||
r.endSync() | |||||
r.blockSync.UnSet() | |||||
return | |||||
case noOpEvent: | |||||
default: | |||||
r.logger.Error("Received unexpected processor event", "event", fmt.Sprintf("%T", event)) | |||||
} | |||||
// Terminal event from scheduler | |||||
case err := <-r.scheduler.final(): | |||||
switch err { | |||||
case nil: | |||||
r.logger.Info("Scheduler stopped") | |||||
default: | |||||
r.logger.Error("Scheduler aborted with error", "err", err) | |||||
} | |||||
// Terminal event from processor | |||||
case err := <-r.processor.final(): | |||||
switch err { | |||||
case nil: | |||||
r.logger.Info("Processor stopped") | |||||
default: | |||||
r.logger.Error("Processor aborted with error", "err", err) | |||||
} | |||||
} | |||||
} | |||||
} | |||||
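The loop above is a single select that demultiplexes three event sources: peer messages feed the scheduler, scheduler output feeds the processor or the network, and processor output feeds back into the scheduler. A minimal, standalone sketch of that shape, using plain strings and made-up event names instead of the reactor's real bc*/sc*/pc* types:

```go
package main

import "fmt"

// demux mirrors the shape of the reactor's event loop: one select statement
// routing events between three sources until the peer channel closes.
func demux(peers <-chan string, scheduler, processor <-chan string) {
	for {
		select {
		case ev, ok := <-peers:
			if !ok {
				fmt.Println("stopping event processing")
				return
			}
			fmt.Println("peer event -> scheduler:", ev)
		case ev := <-scheduler:
			fmt.Println("scheduler event -> processor:", ev)
		case ev := <-processor:
			fmt.Println("processor event -> scheduler:", ev)
		}
	}
}

func main() {
	peers := make(chan string, 1)
	scheduler := make(chan string, 1)
	processor := make(chan string, 1)

	peers <- "bcStatusResponse"
	scheduler <- "scBlockRequest"
	processor <- "pcBlockProcessed"
	close(peers)

	demux(peers, scheduler, processor)
}
```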
// Stop implements cmn.Service interface. | |||||
func (r *BlockchainReactor) Stop() error { | |||||
r.logger.Info("reactor stopping") | |||||
r.endSync() | |||||
r.logger.Info("reactor stopped") | |||||
return nil | |||||
} | |||||
// Receive implements Reactor by handling different message types. | |||||
// XXX: do not call any methods that can block or incur heavy processing. | |||||
// https://github.com/tendermint/tendermint/issues/2888 | |||||
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { | |||||
logger := r.logger.With("src", src.ID(), "chID", chID) | |||||
msgProto := new(bcproto.Message) | |||||
if err := proto.Unmarshal(msgBytes, msgProto); err != nil { | |||||
logger.Error("error decoding message", "err", err) | |||||
_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error())) | |||||
return | |||||
} | |||||
if err := msgProto.Validate(); err != nil { | |||||
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err) | |||||
_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error())) | |||||
return | |||||
} | |||||
r.logger.Debug("received", "msg", msgProto) | |||||
switch msg := msgProto.Sum.(type) { | |||||
case *bcproto.Message_StatusRequest: | |||||
if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil { | |||||
logger.Error("Could not send status message to src peer") | |||||
} | |||||
case *bcproto.Message_BlockRequest: | |||||
block := r.store.LoadBlock(msg.BlockRequest.Height) | |||||
if block != nil { | |||||
if err := r.io.sendBlockToPeer(block, src); err != nil { | |||||
logger.Error("Could not send block message to src peer", "err", err) | |||||
} | |||||
} else { | |||||
logger.Info("peer asking for a block we don't have", "height", msg.BlockRequest.Height) | |||||
if err := r.io.sendBlockNotFound(msg.BlockRequest.Height, src); err != nil { | |||||
logger.Error("Couldn't send block not found msg", "err", err) | |||||
} | |||||
} | |||||
case *bcproto.Message_StatusResponse: | |||||
r.mtx.RLock() | |||||
if r.events != nil { | |||||
r.events <- bcStatusResponse{ | |||||
peerID: src.ID(), | |||||
base: msg.StatusResponse.Base, | |||||
height: msg.StatusResponse.Height, | |||||
} | |||||
} | |||||
r.mtx.RUnlock() | |||||
case *bcproto.Message_BlockResponse: | |||||
bi, err := types.BlockFromProto(msg.BlockResponse.Block) | |||||
if err != nil { | |||||
logger.Error("error transitioning block from protobuf", "err", err) | |||||
_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error())) | |||||
return | |||||
} | |||||
r.mtx.RLock() | |||||
if r.events != nil { | |||||
r.events <- bcBlockResponse{ | |||||
peerID: src.ID(), | |||||
block: bi, | |||||
size: int64(len(msgBytes)), | |||||
time: time.Now(), | |||||
} | |||||
} | |||||
r.mtx.RUnlock() | |||||
case *bcproto.Message_NoBlockResponse: | |||||
r.mtx.RLock() | |||||
if r.events != nil { | |||||
r.events <- bcNoBlockResponse{ | |||||
peerID: src.ID(), | |||||
height: msg.NoBlockResponse.Height, | |||||
time: time.Now(), | |||||
} | |||||
} | |||||
r.mtx.RUnlock() | |||||
} | |||||
} | |||||
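Receive above always decodes into the bcproto.Message envelope, validates it, and switches on the Sum variant; the tests later in this diff build payloads the same way with Wrap before calling Receive. A small sketch of that round trip, assuming only the gogo proto package and the blocksync proto package already imported by this code:

```go
package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"

	bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
)

func main() {
	// Sender side: wrap a typed request in the bcproto.Message envelope and
	// marshal it, as the reactor tests do before calling Receive.
	msg := new(bcproto.Message)
	if err := msg.Wrap(&bcproto.StatusRequest{}); err != nil {
		panic(err)
	}
	bz, err := proto.Marshal(msg)
	if err != nil {
		panic(err)
	}

	// Receiver side: unmarshal, validate, and switch on the Sum variant,
	// mirroring the shape of BlockchainReactor.Receive.
	decoded := new(bcproto.Message)
	if err := proto.Unmarshal(bz, decoded); err != nil {
		panic(err)
	}
	if err := decoded.Validate(); err != nil {
		panic(err)
	}
	switch decoded.Sum.(type) {
	case *bcproto.Message_StatusRequest:
		fmt.Println("got a status request")
	case *bcproto.Message_BlockRequest:
		fmt.Println("got a block request")
	default:
		fmt.Printf("unexpected message %T\n", decoded.Sum)
	}
}
```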
// AddPeer implements Reactor interface | |||||
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) { | |||||
err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer) | |||||
if err != nil { | |||||
r.logger.Error("could not send our status to the new peer", "peer", peer.ID, "err", err) | |||||
} | |||||
err = r.io.sendStatusRequest(peer) | |||||
if err != nil { | |||||
r.logger.Error("could not send status request to the new peer", "peer", peer.ID, "err", err) | |||||
} | |||||
r.mtx.RLock() | |||||
defer r.mtx.RUnlock() | |||||
if r.events != nil { | |||||
r.events <- bcAddNewPeer{peerID: peer.ID()} | |||||
} | |||||
} | |||||
// RemovePeer implements Reactor interface. | |||||
func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { | |||||
r.mtx.RLock() | |||||
defer r.mtx.RUnlock() | |||||
if r.events != nil { | |||||
r.events <- bcRemovePeer{ | |||||
peerID: peer.ID(), | |||||
reason: reason, | |||||
} | |||||
} | |||||
} | |||||
// GetChannels implements Reactor | |||||
func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { | |||||
return []*p2p.ChannelDescriptor{ | |||||
{ | |||||
ID: BlockchainChannel, | |||||
Priority: 5, | |||||
SendQueueCapacity: 2000, | |||||
RecvBufferCapacity: 1024, | |||||
RecvMessageCapacity: blocksync.MaxMsgSize, | |||||
}, | |||||
} | |||||
} | |||||
func (r *BlockchainReactor) GetMaxPeerBlockHeight() int64 { | |||||
r.mtx.RLock() | |||||
defer r.mtx.RUnlock() | |||||
return r.maxPeerHeight | |||||
} | |||||
func (r *BlockchainReactor) GetTotalSyncedTime() time.Duration { | |||||
if !r.blockSync.IsSet() || r.syncStartTime.IsZero() { | |||||
return time.Duration(0) | |||||
} | |||||
return time.Since(r.syncStartTime) | |||||
} | |||||
func (r *BlockchainReactor) GetRemainingSyncTime() time.Duration { | |||||
if !r.blockSync.IsSet() { | |||||
return time.Duration(0) | |||||
} | |||||
r.mtx.RLock() | |||||
defer r.mtx.RUnlock() | |||||
targetSyncs := r.maxPeerHeight - r.syncStartHeight | |||||
currentSyncs := r.syncHeight - r.syncStartHeight + 1 | |||||
if currentSyncs < 0 || r.lastSyncRate < 0.001 { | |||||
return time.Duration(0) | |||||
} | |||||
remain := float64(targetSyncs-currentSyncs) / r.lastSyncRate | |||||
return time.Duration(int64(remain * float64(time.Second))) | |||||
} |
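The sync rate used here is the exponentially weighted moving average maintained by the event loop above (0.9 of the previous rate plus 0.1 of the rate observed over the last 100 blocks), and the remaining time is the outstanding height range divided by that rate. A standalone sketch of the arithmetic, with made-up numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values, not taken from a real sync.
	var (
		lastSyncRate    = 80.0  // blocks/s, previous EMA value
		newSyncRate     = 120.0 // blocks/s observed over the last 100 blocks
		maxPeerHeight   = int64(50_000)
		syncStartHeight = int64(10_000)
		syncHeight      = int64(30_000)
	)

	// Same smoothing as the event loop above: 0.9 * old + 0.1 * new.
	lastSyncRate = 0.9*lastSyncRate + 0.1*newSyncRate // 84 blocks/s

	// Same estimate as GetRemainingSyncTime: remaining heights / rate.
	targetSyncs := maxPeerHeight - syncStartHeight
	currentSyncs := syncHeight - syncStartHeight + 1
	remain := float64(targetSyncs-currentSyncs) / lastSyncRate

	fmt.Println("smoothed rate:", lastSyncRate, "blocks/s")
	fmt.Println("estimated time remaining:", time.Duration(int64(remain*float64(time.Second))))
}
```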
@ -1,533 +0,0 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"net" | |||||
"os" | |||||
"sync" | |||||
"testing" | |||||
"time" | |||||
"github.com/gogo/protobuf/proto" | |||||
"github.com/stretchr/testify/assert" | |||||
"github.com/stretchr/testify/require" | |||||
dbm "github.com/tendermint/tm-db" | |||||
abciclient "github.com/tendermint/tendermint/abci/client" | |||||
abci "github.com/tendermint/tendermint/abci/types" | |||||
"github.com/tendermint/tendermint/config" | |||||
"github.com/tendermint/tendermint/internal/blocksync/v2/internal/behavior" | |||||
"github.com/tendermint/tendermint/internal/consensus" | |||||
"github.com/tendermint/tendermint/internal/mempool/mock" | |||||
"github.com/tendermint/tendermint/internal/p2p" | |||||
"github.com/tendermint/tendermint/internal/p2p/conn" | |||||
"github.com/tendermint/tendermint/internal/proxy" | |||||
sm "github.com/tendermint/tendermint/internal/state" | |||||
sf "github.com/tendermint/tendermint/internal/state/test/factory" | |||||
tmstore "github.com/tendermint/tendermint/internal/store" | |||||
"github.com/tendermint/tendermint/internal/test/factory" | |||||
"github.com/tendermint/tendermint/libs/log" | |||||
"github.com/tendermint/tendermint/libs/service" | |||||
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
type mockPeer struct { | |||||
service.Service | |||||
id types.NodeID | |||||
} | |||||
func (mp mockPeer) FlushStop() {} | |||||
func (mp mockPeer) ID() types.NodeID { return mp.id } | |||||
func (mp mockPeer) RemoteIP() net.IP { return net.IP{} } | |||||
func (mp mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.RemoteIP(), Port: 8800} } | |||||
func (mp mockPeer) IsOutbound() bool { return true } | |||||
func (mp mockPeer) IsPersistent() bool { return true } | |||||
func (mp mockPeer) CloseConn() error { return nil } | |||||
func (mp mockPeer) NodeInfo() types.NodeInfo { | |||||
return types.NodeInfo{ | |||||
NodeID: "", | |||||
ListenAddr: "", | |||||
} | |||||
} | |||||
func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } | |||||
func (mp mockPeer) SocketAddr() *p2p.NetAddress { return &p2p.NetAddress{} } | |||||
func (mp mockPeer) Send(byte, []byte) bool { return true } | |||||
func (mp mockPeer) TrySend(byte, []byte) bool { return true } | |||||
func (mp mockPeer) Set(string, interface{}) {} | |||||
func (mp mockPeer) Get(string) interface{} { return struct{}{} } | |||||
//nolint:unused | |||||
type mockBlockStore struct { | |||||
blocks map[int64]*types.Block | |||||
} | |||||
//nolint:unused | |||||
func (ml *mockBlockStore) Height() int64 { | |||||
return int64(len(ml.blocks)) | |||||
} | |||||
//nolint:unused | |||||
func (ml *mockBlockStore) LoadBlock(height int64) *types.Block { | |||||
return ml.blocks[height] | |||||
} | |||||
//nolint:unused | |||||
func (ml *mockBlockStore) SaveBlock(block *types.Block, part *types.PartSet, commit *types.Commit) { | |||||
ml.blocks[block.Height] = block | |||||
} | |||||
type mockBlockApplier struct { | |||||
} | |||||
// XXX: Add whitelist/blacklist? | |||||
func (mba *mockBlockApplier) ApplyBlock( | |||||
state sm.State, blockID types.BlockID, block *types.Block, | |||||
) (sm.State, error) { | |||||
state.LastBlockHeight++ | |||||
return state, nil | |||||
} | |||||
type mockSwitchIo struct { | |||||
mtx sync.Mutex | |||||
switchedToConsensus bool | |||||
numStatusResponse int | |||||
numBlockResponse int | |||||
numNoBlockResponse int | |||||
numStatusRequest int | |||||
} | |||||
var _ iIO = (*mockSwitchIo)(nil) | |||||
func (sio *mockSwitchIo) sendBlockRequest(_ p2p.Peer, _ int64) error { | |||||
return nil | |||||
} | |||||
func (sio *mockSwitchIo) sendStatusResponse(_, _ int64, _ p2p.Peer) error { | |||||
sio.mtx.Lock() | |||||
defer sio.mtx.Unlock() | |||||
sio.numStatusResponse++ | |||||
return nil | |||||
} | |||||
func (sio *mockSwitchIo) sendBlockToPeer(_ *types.Block, _ p2p.Peer) error { | |||||
sio.mtx.Lock() | |||||
defer sio.mtx.Unlock() | |||||
sio.numBlockResponse++ | |||||
return nil | |||||
} | |||||
func (sio *mockSwitchIo) sendBlockNotFound(_ int64, _ p2p.Peer) error { | |||||
sio.mtx.Lock() | |||||
defer sio.mtx.Unlock() | |||||
sio.numNoBlockResponse++ | |||||
return nil | |||||
} | |||||
func (sio *mockSwitchIo) trySwitchToConsensus(_ sm.State, _ bool) bool { | |||||
sio.mtx.Lock() | |||||
defer sio.mtx.Unlock() | |||||
sio.switchedToConsensus = true | |||||
return true | |||||
} | |||||
func (sio *mockSwitchIo) broadcastStatusRequest() error { | |||||
return nil | |||||
} | |||||
func (sio *mockSwitchIo) sendStatusRequest(_ p2p.Peer) error { | |||||
sio.mtx.Lock() | |||||
defer sio.mtx.Unlock() | |||||
sio.numStatusRequest++ | |||||
return nil | |||||
} | |||||
type testReactorParams struct { | |||||
logger log.Logger | |||||
genDoc *types.GenesisDoc | |||||
privVals []types.PrivValidator | |||||
startHeight int64 | |||||
mockA bool | |||||
} | |||||
func newTestReactor(t *testing.T, p testReactorParams) *BlockchainReactor { | |||||
store, state, _ := newReactorStore(t, p.genDoc, p.privVals, p.startHeight) | |||||
reporter := behavior.NewMockReporter() | |||||
var appl blockApplier | |||||
if p.mockA { | |||||
appl = &mockBlockApplier{} | |||||
} else { | |||||
app := &testApp{} | |||||
cc := abciclient.NewLocalCreator(app) | |||||
proxyApp := proxy.NewAppConns(cc) | |||||
err := proxyApp.Start() | |||||
require.NoError(t, err) | |||||
db := dbm.NewMemDB() | |||||
stateStore := sm.NewStore(db) | |||||
blockStore := tmstore.NewBlockStore(dbm.NewMemDB()) | |||||
appl = sm.NewBlockExecutor( | |||||
stateStore, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) | |||||
err = stateStore.Save(state) | |||||
require.NoError(t, err) | |||||
} | |||||
r := newReactor(state, store, reporter, appl, true, consensus.NopMetrics()) | |||||
logger := log.TestingLogger() | |||||
r.SetLogger(logger.With("module", "blockchain")) | |||||
return r | |||||
} | |||||
// This test is left here and not deleted to retain the termination cases for | |||||
// future improvement in [#4482](https://github.com/tendermint/tendermint/issues/4482). | |||||
// func TestReactorTerminationScenarios(t *testing.T) { | |||||
// config := cfg.ResetTestRoot("blockchain_reactor_v2_test") | |||||
// defer os.RemoveAll(config.RootDir) | |||||
// genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30) | |||||
// refStore, _, _ := newReactorStore(genDoc, privVals, 20) | |||||
// params := testReactorParams{ | |||||
// logger: log.TestingLogger(), | |||||
// genDoc: genDoc, | |||||
// privVals: privVals, | |||||
// startHeight: 10, | |||||
// bufferSize: 100, | |||||
// mockA: true, | |||||
// } | |||||
// type testEvent struct { | |||||
// evType string | |||||
// peer string | |||||
// height int64 | |||||
// } | |||||
// tests := []struct { | |||||
// name string | |||||
// params testReactorParams | |||||
// msgs []testEvent | |||||
// }{ | |||||
// { | |||||
// name: "simple termination on max peer height - one peer", | |||||
// params: params, | |||||
// msgs: []testEvent{ | |||||
// {evType: "AddPeer", peer: "P1"}, | |||||
// {evType: "ReceiveS", peer: "P1", height: 13}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P1", height: 11}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P1", height: 12}, | |||||
// {evType: "Process"}, | |||||
// {evType: "ReceiveB", peer: "P1", height: 13}, | |||||
// {evType: "Process"}, | |||||
// }, | |||||
// }, | |||||
// { | |||||
// name: "simple termination on max peer height - two peers", | |||||
// params: params, | |||||
// msgs: []testEvent{ | |||||
// {evType: "AddPeer", peer: "P1"}, | |||||
// {evType: "AddPeer", peer: "P2"}, | |||||
// {evType: "ReceiveS", peer: "P1", height: 13}, | |||||
// {evType: "ReceiveS", peer: "P2", height: 15}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P1", height: 11}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 12}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P1", height: 13}, | |||||
// {evType: "Process"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 14}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 15}, | |||||
// {evType: "Process"}, | |||||
// }, | |||||
// }, | |||||
// { | |||||
// name: "termination on max peer height - two peers, noBlock error", | |||||
// params: params, | |||||
// msgs: []testEvent{ | |||||
// {evType: "AddPeer", peer: "P1"}, | |||||
// {evType: "AddPeer", peer: "P2"}, | |||||
// {evType: "ReceiveS", peer: "P1", height: 13}, | |||||
// {evType: "ReceiveS", peer: "P2", height: 15}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveNB", peer: "P1", height: 11}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 12}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 11}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 13}, | |||||
// {evType: "Process"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 14}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 15}, | |||||
// {evType: "Process"}, | |||||
// }, | |||||
// }, | |||||
// { | |||||
// name: "termination on max peer height - two peers, remove one peer", | |||||
// params: params, | |||||
// msgs: []testEvent{ | |||||
// {evType: "AddPeer", peer: "P1"}, | |||||
// {evType: "AddPeer", peer: "P2"}, | |||||
// {evType: "ReceiveS", peer: "P1", height: 13}, | |||||
// {evType: "ReceiveS", peer: "P2", height: 15}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "RemovePeer", peer: "P1"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 12}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 11}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 13}, | |||||
// {evType: "Process"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 14}, | |||||
// {evType: "Process"}, | |||||
// {evType: "BlockReq"}, | |||||
// {evType: "ReceiveB", peer: "P2", height: 15}, | |||||
// {evType: "Process"}, | |||||
// }, | |||||
// }, | |||||
// } | |||||
// for _, tt := range tests { | |||||
// tt := tt | |||||
// t.Run(tt.name, func(t *testing.T) { | |||||
// reactor := newTestReactor(params) | |||||
// reactor.Start() | |||||
// reactor.reporter = behavior.NewMockReporter() | |||||
// mockSwitch := &mockSwitchIo{switchedToConsensus: false} | |||||
// reactor.io = mockSwitch | |||||
// // time for go routines to start | |||||
// time.Sleep(time.Millisecond) | |||||
// for _, step := range tt.msgs { | |||||
// switch step.evType { | |||||
// case "AddPeer": | |||||
// reactor.scheduler.send(bcAddNewPeer{peerID: p2p.ID(step.peer)}) | |||||
// case "RemovePeer": | |||||
// reactor.scheduler.send(bcRemovePeer{peerID: p2p.ID(step.peer)}) | |||||
// case "ReceiveS": | |||||
// reactor.scheduler.send(bcStatusResponse{ | |||||
// peerID: p2p.ID(step.peer), | |||||
// height: step.height, | |||||
// time: time.Now(), | |||||
// }) | |||||
// case "ReceiveB": | |||||
// reactor.scheduler.send(bcBlockResponse{ | |||||
// peerID: p2p.ID(step.peer), | |||||
// block: refStore.LoadBlock(step.height), | |||||
// size: 10, | |||||
// time: time.Now(), | |||||
// }) | |||||
// case "ReceiveNB": | |||||
// reactor.scheduler.send(bcNoBlockResponse{ | |||||
// peerID: p2p.ID(step.peer), | |||||
// height: step.height, | |||||
// time: time.Now(), | |||||
// }) | |||||
// case "BlockReq": | |||||
// reactor.scheduler.send(rTrySchedule{time: time.Now()}) | |||||
// case "Process": | |||||
// reactor.processor.send(rProcessBlock{}) | |||||
// } | |||||
// // give time for messages to propagate between routines | |||||
// time.Sleep(time.Millisecond) | |||||
// } | |||||
// // time for processor to finish and reactor to switch to consensus | |||||
// time.Sleep(20 * time.Millisecond) | |||||
// assert.True(t, mockSwitch.hasSwitchedToConsensus()) | |||||
// reactor.Stop() | |||||
// }) | |||||
// } | |||||
// } | |||||
func TestReactorHelperMode(t *testing.T) { | |||||
var ( | |||||
channelID = byte(0x40) | |||||
) | |||||
cfg := config.ResetTestRoot("blockchain_reactor_v2_test") | |||||
defer os.RemoveAll(cfg.RootDir) | |||||
genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30) | |||||
params := testReactorParams{ | |||||
logger: log.TestingLogger(), | |||||
genDoc: genDoc, | |||||
privVals: privVals, | |||||
startHeight: 20, | |||||
mockA: true, | |||||
} | |||||
type testEvent struct { | |||||
peer string | |||||
event interface{} | |||||
} | |||||
tests := []struct { | |||||
name string | |||||
params testReactorParams | |||||
msgs []testEvent | |||||
}{ | |||||
{ | |||||
name: "status request", | |||||
params: params, | |||||
msgs: []testEvent{ | |||||
{"P1", bcproto.StatusRequest{}}, | |||||
{"P1", bcproto.BlockRequest{Height: 13}}, | |||||
{"P1", bcproto.BlockRequest{Height: 20}}, | |||||
{"P1", bcproto.BlockRequest{Height: 22}}, | |||||
}, | |||||
}, | |||||
} | |||||
for _, tt := range tests { | |||||
tt := tt | |||||
t.Run(tt.name, func(t *testing.T) { | |||||
reactor := newTestReactor(t, params) | |||||
mockSwitch := &mockSwitchIo{switchedToConsensus: false} | |||||
reactor.io = mockSwitch | |||||
err := reactor.Start() | |||||
require.NoError(t, err) | |||||
for i := 0; i < len(tt.msgs); i++ { | |||||
step := tt.msgs[i] | |||||
switch ev := step.event.(type) { | |||||
case bcproto.StatusRequest: | |||||
old := mockSwitch.numStatusResponse | |||||
msgProto := new(bcproto.Message) | |||||
require.NoError(t, msgProto.Wrap(&ev)) | |||||
msgBz, err := proto.Marshal(msgProto) | |||||
require.NoError(t, err) | |||||
reactor.Receive(channelID, mockPeer{id: types.NodeID(step.peer)}, msgBz) | |||||
assert.Equal(t, old+1, mockSwitch.numStatusResponse) | |||||
case bcproto.BlockRequest: | |||||
if ev.Height > params.startHeight { | |||||
old := mockSwitch.numNoBlockResponse | |||||
msgProto := new(bcproto.Message) | |||||
require.NoError(t, msgProto.Wrap(&ev)) | |||||
msgBz, err := proto.Marshal(msgProto) | |||||
require.NoError(t, err) | |||||
reactor.Receive(channelID, mockPeer{id: types.NodeID(step.peer)}, msgBz) | |||||
assert.Equal(t, old+1, mockSwitch.numNoBlockResponse) | |||||
} else { | |||||
old := mockSwitch.numBlockResponse | |||||
msgProto := new(bcproto.Message) | |||||
require.NoError(t, msgProto.Wrap(&ev)) | |||||
msgBz, err := proto.Marshal(msgProto) | |||||
require.NoError(t, err) | |||||
reactor.Receive(channelID, mockPeer{id: types.NodeID(step.peer)}, msgBz) | |||||
assert.Equal(t, old+1, mockSwitch.numBlockResponse) | |||||
} | |||||
} | |||||
} | |||||
err = reactor.Stop() | |||||
require.NoError(t, err) | |||||
}) | |||||
} | |||||
} | |||||
func TestReactorSetSwitchNil(t *testing.T) { | |||||
cfg := config.ResetTestRoot("blockchain_reactor_v2_test") | |||||
defer os.RemoveAll(cfg.RootDir) | |||||
genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30) | |||||
reactor := newTestReactor(t, testReactorParams{ | |||||
logger: log.TestingLogger(), | |||||
genDoc: genDoc, | |||||
privVals: privVals, | |||||
}) | |||||
reactor.SetSwitch(nil) | |||||
assert.Nil(t, reactor.Switch) | |||||
assert.Nil(t, reactor.io) | |||||
} | |||||
type testApp struct { | |||||
abci.BaseApplication | |||||
} | |||||
func newReactorStore( | |||||
t *testing.T, | |||||
genDoc *types.GenesisDoc, | |||||
privVals []types.PrivValidator, | |||||
maxBlockHeight int64) (*tmstore.BlockStore, sm.State, *sm.BlockExecutor) { | |||||
t.Helper() | |||||
require.Len(t, privVals, 1) | |||||
app := &testApp{} | |||||
cc := abciclient.NewLocalCreator(app) | |||||
proxyApp := proxy.NewAppConns(cc) | |||||
err := proxyApp.Start() | |||||
if err != nil { | |||||
panic(fmt.Errorf("error start app: %w", err)) | |||||
} | |||||
stateDB := dbm.NewMemDB() | |||||
blockStore := tmstore.NewBlockStore(dbm.NewMemDB()) | |||||
stateStore := sm.NewStore(stateDB) | |||||
state, err := sm.MakeGenesisState(genDoc) | |||||
require.NoError(t, err) | |||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), | |||||
mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) | |||||
err = stateStore.Save(state) | |||||
require.NoError(t, err) | |||||
// add blocks in | |||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { | |||||
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) | |||||
if blockHeight > 1 { | |||||
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) | |||||
lastBlock := blockStore.LoadBlock(blockHeight - 1) | |||||
vote, err := factory.MakeVote( | |||||
privVals[0], | |||||
lastBlock.Header.ChainID, 0, | |||||
lastBlock.Header.Height, 0, 2, | |||||
lastBlockMeta.BlockID, | |||||
time.Now(), | |||||
) | |||||
require.NoError(t, err) | |||||
lastCommit = types.NewCommit(vote.Height, vote.Round, | |||||
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()}) | |||||
} | |||||
thisBlock := sf.MakeBlock(state, blockHeight, lastCommit) | |||||
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) | |||||
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} | |||||
state, err = blockExec.ApplyBlock(state, blockID, thisBlock) | |||||
require.NoError(t, err) | |||||
blockStore.SaveBlock(thisBlock, thisParts, lastCommit) | |||||
} | |||||
return blockStore, state, blockExec | |||||
} |
@ -1,166 +0,0 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"strings" | |||||
"sync/atomic" | |||||
"github.com/Workiva/go-datastructures/queue" | |||||
"github.com/tendermint/tendermint/libs/log" | |||||
) | |||||
type handleFunc = func(event Event) (Event, error) | |||||
const historySize = 25 | |||||
// Routine is a structure that models a finite state machine as a serialized | |||||
// stream of events processed by a handle function. This Routine structure | |||||
// handles the concurrency and messaging guarantees. Events sent via `send` | |||||
// are handled by the `handle` function and the results are emitted on the | |||||
// `next()` channel. Calling `stop()` on a routine concludes processing of all | |||||
// sent events and produces a `final()` event representing the terminal state. | |||||
type Routine struct { | |||||
name string | |||||
handle handleFunc | |||||
queue *queue.PriorityQueue | |||||
history []Event | |||||
out chan Event | |||||
fin chan error | |||||
rdy chan struct{} | |||||
running *uint32 | |||||
logger log.Logger | |||||
metrics *Metrics | |||||
} | |||||
func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { | |||||
return &Routine{ | |||||
name: name, | |||||
handle: handleFunc, | |||||
queue: queue.NewPriorityQueue(bufferSize, true), | |||||
history: make([]Event, 0, historySize), | |||||
out: make(chan Event, bufferSize), | |||||
rdy: make(chan struct{}, 1), | |||||
fin: make(chan error, 1), | |||||
running: new(uint32), | |||||
logger: log.NewNopLogger(), | |||||
metrics: NopMetrics(), | |||||
} | |||||
} | |||||
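The routine tests later in this diff exercise this lifecycle in full; the sketch below condenses it: start the routine in a goroutine, wait for ready(), push events with send(), read produced events from next(), and read the terminal error from final(). The example event types and the runRoutineExample name are invented, and the code assumes it sits in the same package as routine.go, since Routine and Event are unexported:

```go
package v2

import "errors"

// Hypothetical event types; they embed priorityNormal so they satisfy the
// queue.Item interface used by the routine's priority queue (see events.go
// later in this diff).
type exampleEvent struct{ priorityNormal }
type exampleStop struct{ priorityNormal }

var errExampleDone = errors.New("example done")

// runRoutineExample sketches the intended lifecycle of a Routine.
func runRoutineExample() error {
	handler := func(event Event) (Event, error) {
		if _, ok := event.(exampleStop); ok {
			// Returning an error terminates the routine; final() yields it.
			return noOp, errExampleDone
		}
		return noOp, nil
	}

	rt := newRoutine("exampleRoutine", handler, 10)
	go rt.start()
	<-rt.ready()

	rt.send(exampleEvent{}) // handled normally, emits a noOp on next()
	<-rt.next()

	rt.send(exampleStop{}) // handler returns an error, routine terminates
	return <-rt.final()    // errExampleDone
}
```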
func (rt *Routine) setLogger(logger log.Logger) { | |||||
rt.logger = logger | |||||
} | |||||
// nolint:unused | |||||
func (rt *Routine) setMetrics(metrics *Metrics) { | |||||
rt.metrics = metrics | |||||
} | |||||
func (rt *Routine) start() { | |||||
rt.logger.Info(fmt.Sprintf("%s: run", rt.name)) | |||||
running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) | |||||
if !running { | |||||
panic(fmt.Sprintf("%s is already running", rt.name)) | |||||
} | |||||
close(rt.rdy) | |||||
defer func() { | |||||
if r := recover(); r != nil { | |||||
var ( | |||||
b strings.Builder | |||||
j int | |||||
) | |||||
for i := len(rt.history) - 1; i >= 0; i-- { | |||||
fmt.Fprintf(&b, "%d: %+v\n", j, rt.history[i]) | |||||
j++ | |||||
} | |||||
panic(fmt.Sprintf("%v\nlast events:\n%v", r, b.String())) | |||||
} | |||||
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0)) | |||||
if !stopped { | |||||
panic(fmt.Sprintf("%s is failed to stop", rt.name)) | |||||
} | |||||
}() | |||||
for { | |||||
events, err := rt.queue.Get(1) | |||||
if err == queue.ErrDisposed { | |||||
rt.terminate(nil) | |||||
return | |||||
} else if err != nil { | |||||
rt.terminate(err) | |||||
return | |||||
} | |||||
oEvent, err := rt.handle(events[0].(Event)) | |||||
rt.metrics.EventsHandled.With("routine", rt.name).Add(1) | |||||
if err != nil { | |||||
rt.terminate(err) | |||||
return | |||||
} | |||||
rt.metrics.EventsOut.With("routine", rt.name).Add(1) | |||||
rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v", rt.name, oEvent, oEvent)) | |||||
// Skip rTrySchedule and rProcessBlock events as they clutter the history | |||||
// due to their frequency. | |||||
switch events[0].(type) { | |||||
case rTrySchedule: | |||||
case rProcessBlock: | |||||
default: | |||||
rt.history = append(rt.history, events[0].(Event)) | |||||
if len(rt.history) > historySize { | |||||
rt.history = rt.history[1:] | |||||
} | |||||
} | |||||
rt.out <- oEvent | |||||
} | |||||
} | |||||
// XXX: look into returning OpError in the net package | |||||
func (rt *Routine) send(event Event) bool { | |||||
rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event)) | |||||
if !rt.isRunning() { | |||||
return false | |||||
} | |||||
err := rt.queue.Put(event) | |||||
if err != nil { | |||||
rt.metrics.EventsShed.With("routine", rt.name).Add(1) | |||||
rt.logger.Error(fmt.Sprintf("%s: send failed, queue was full/stopped", rt.name)) | |||||
return false | |||||
} | |||||
rt.metrics.EventsSent.With("routine", rt.name).Add(1) | |||||
return true | |||||
} | |||||
func (rt *Routine) isRunning() bool { | |||||
return atomic.LoadUint32(rt.running) == 1 | |||||
} | |||||
func (rt *Routine) next() chan Event { | |||||
return rt.out | |||||
} | |||||
func (rt *Routine) ready() chan struct{} { | |||||
return rt.rdy | |||||
} | |||||
func (rt *Routine) stop() { | |||||
if !rt.isRunning() { // XXX: this should check rt.queue.Disposed() | |||||
return | |||||
} | |||||
rt.logger.Info(fmt.Sprintf("%s: stop", rt.name)) | |||||
rt.queue.Dispose() // this should block until all queue items are free? | |||||
} | |||||
func (rt *Routine) final() chan error { | |||||
return rt.fin | |||||
} | |||||
// XXX: Maybe get rid of this | |||||
func (rt *Routine) terminate(reason error) { | |||||
// We don't close the rt.out channel here, to avoid spinning on the closed channel | |||||
// in the event loop. | |||||
rt.fin <- reason | |||||
} |
@ -1,163 +0,0 @@ | |||||
package v2 | |||||
import ( | |||||
"fmt" | |||||
"testing" | |||||
"time" | |||||
"github.com/stretchr/testify/assert" | |||||
) | |||||
type eventA struct { | |||||
priorityNormal | |||||
} | |||||
var errDone = fmt.Errorf("done") | |||||
func simpleHandler(event Event) (Event, error) { | |||||
if _, ok := event.(eventA); ok { | |||||
return noOp, errDone | |||||
} | |||||
return noOp, nil | |||||
} | |||||
func TestRoutineFinal(t *testing.T) { | |||||
var ( | |||||
bufferSize = 10 | |||||
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) | |||||
) | |||||
assert.False(t, routine.isRunning(), | |||||
"expected an initialized routine to not be running") | |||||
go routine.start() | |||||
<-routine.ready() | |||||
assert.True(t, routine.isRunning(), | |||||
"expected an started routine") | |||||
assert.True(t, routine.send(eventA{}), | |||||
"expected sending to a ready routine to succeed") | |||||
assert.Equal(t, errDone, <-routine.final(), | |||||
"expected the final event to be done") | |||||
assert.False(t, routine.isRunning(), | |||||
"expected an completed routine to no longer be running") | |||||
} | |||||
func TestRoutineStop(t *testing.T) { | |||||
var ( | |||||
bufferSize = 10 | |||||
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) | |||||
) | |||||
assert.False(t, routine.send(eventA{}), | |||||
"expected sending to an unstarted routine to fail") | |||||
go routine.start() | |||||
<-routine.ready() | |||||
assert.True(t, routine.send(eventA{}), | |||||
"expected sending to a running routine to succeed") | |||||
routine.stop() | |||||
assert.False(t, routine.send(eventA{}), | |||||
"expected sending to a stopped routine to fail") | |||||
} | |||||
type finalCount struct { | |||||
count int | |||||
} | |||||
func (f finalCount) Error() string { | |||||
return "end" | |||||
} | |||||
func genStatefulHandler(maxCount int) handleFunc { | |||||
counter := 0 | |||||
return func(event Event) (Event, error) { | |||||
if _, ok := event.(eventA); ok { | |||||
counter++ | |||||
if counter >= maxCount { | |||||
return noOp, finalCount{counter} | |||||
} | |||||
return eventA{}, nil | |||||
} | |||||
return noOp, nil | |||||
} | |||||
} | |||||
func feedback(r *Routine) { | |||||
for event := range r.next() { | |||||
r.send(event) | |||||
} | |||||
} | |||||
func TestStatefulRoutine(t *testing.T) { | |||||
var ( | |||||
count = 10 | |||||
handler = genStatefulHandler(count) | |||||
bufferSize = 20 | |||||
routine = newRoutine("statefulRoutine", handler, bufferSize) | |||||
) | |||||
go routine.start() | |||||
go feedback(routine) | |||||
<-routine.ready() | |||||
assert.True(t, routine.send(eventA{}), | |||||
"expected sending to a started routine to succeed") | |||||
final := <-routine.final() | |||||
if fnl, ok := final.(finalCount); ok { | |||||
assert.Equal(t, count, fnl.count, | |||||
"expected the routine to count to 10") | |||||
} else { | |||||
t.Fail() | |||||
} | |||||
} | |||||
type lowPriorityEvent struct { | |||||
priorityLow | |||||
} | |||||
type highPriorityEvent struct { | |||||
priorityHigh | |||||
} | |||||
func handleWithPriority(event Event) (Event, error) { | |||||
switch event.(type) { | |||||
case lowPriorityEvent: | |||||
return noOp, nil | |||||
case highPriorityEvent: | |||||
return noOp, errDone | |||||
} | |||||
return noOp, nil | |||||
} | |||||
func TestPriority(t *testing.T) { | |||||
var ( | |||||
bufferSize = 20 | |||||
routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize) | |||||
) | |||||
go routine.start() | |||||
<-routine.ready() | |||||
go func() { | |||||
for { | |||||
routine.send(lowPriorityEvent{}) | |||||
time.Sleep(1 * time.Millisecond) | |||||
} | |||||
}() | |||||
time.Sleep(10 * time.Millisecond) | |||||
assert.True(t, routine.isRunning(), | |||||
"expected an started routine") | |||||
assert.True(t, routine.send(highPriorityEvent{}), | |||||
"expected send to succeed even when saturated") | |||||
assert.Equal(t, errDone, <-routine.final()) | |||||
assert.False(t, routine.isRunning(), | |||||
"expected an started routine") | |||||
} |
@ -1,711 +0,0 @@ | |||||
package v2 | |||||
import ( | |||||
"bytes" | |||||
"errors" | |||||
"fmt" | |||||
"math" | |||||
"sort" | |||||
"time" | |||||
"github.com/tendermint/tendermint/types" | |||||
) | |||||
// Events generated by the scheduler: | |||||
// all blocks have been processed | |||||
type scFinishedEv struct { | |||||
priorityNormal | |||||
reason string | |||||
} | |||||
func (e scFinishedEv) String() string { | |||||
return fmt.Sprintf("scFinishedEv{%v}", e.reason) | |||||
} | |||||
// send a blockRequest message | |||||
type scBlockRequest struct { | |||||
priorityNormal | |||||
peerID types.NodeID | |||||
height int64 | |||||
} | |||||
func (e scBlockRequest) String() string { | |||||
return fmt.Sprintf("scBlockRequest{%d from %v}", e.height, e.peerID) | |||||
} | |||||
// a block has been received and validated by the scheduler | |||||
type scBlockReceived struct { | |||||
priorityNormal | |||||
peerID types.NodeID | |||||
block *types.Block | |||||
} | |||||
func (e scBlockReceived) String() string { | |||||
return fmt.Sprintf("scBlockReceived{%d#%X from %v}", e.block.Height, e.block.Hash(), e.peerID) | |||||
} | |||||
// scheduler detected a peer error | |||||
type scPeerError struct { | |||||
priorityHigh | |||||
peerID types.NodeID | |||||
reason error | |||||
} | |||||
func (e scPeerError) String() string { | |||||
return fmt.Sprintf("scPeerError{%v errored with %v}", e.peerID, e.reason) | |||||
} | |||||
// scheduler removed a set of peers (timed out or slow peer) | |||||
type scPeersPruned struct { | |||||
priorityHigh | |||||
peers []types.NodeID | |||||
} | |||||
func (e scPeersPruned) String() string { | |||||
return fmt.Sprintf("scPeersPruned{%v}", e.peers) | |||||
} | |||||
// XXX: make this fatal? | |||||
// scheduler encountered a fatal error | |||||
type scSchedulerFail struct { | |||||
priorityHigh | |||||
reason error | |||||
} | |||||
func (e scSchedulerFail) String() string { | |||||
return fmt.Sprintf("scSchedulerFail{%v}", e.reason) | |||||
} | |||||
type blockState int | |||||
const ( | |||||
blockStateUnknown blockState = iota + 1 // no known peer has this block | |||||
blockStateNew // indicates that a peer has reported having this block | |||||
blockStatePending // indicates that this block has been requested from a peer | |||||
blockStateReceived // indicates that this block has been received from a peer | |||||
blockStateProcessed // indicates that this block has been applied | |||||
) | |||||
func (e blockState) String() string { | |||||
switch e { | |||||
case blockStateUnknown: | |||||
return "Unknown" | |||||
case blockStateNew: | |||||
return "New" | |||||
case blockStatePending: | |||||
return "Pending" | |||||
case blockStateReceived: | |||||
return "Received" | |||||
case blockStateProcessed: | |||||
return "Processed" | |||||
default: | |||||
return fmt.Sprintf("invalid blockState: %d", e) | |||||
} | |||||
} | |||||
type peerState int | |||||
const ( | |||||
peerStateNew = iota + 1 | |||||
peerStateReady | |||||
peerStateRemoved | |||||
) | |||||
func (e peerState) String() string { | |||||
switch e { | |||||
case peerStateNew: | |||||
return "New" | |||||
case peerStateReady: | |||||
return "Ready" | |||||
case peerStateRemoved: | |||||
return "Removed" | |||||
default: | |||||
panic(fmt.Sprintf("unknown peerState: %d", e)) | |||||
} | |||||
} | |||||
type scPeer struct { | |||||
peerID types.NodeID | |||||
// initialized as New when peer is added, updated to Ready when statusUpdate is received, | |||||
// updated to Removed when peer is removed | |||||
state peerState | |||||
base int64 // updated when statusResponse is received | |||||
height int64 // updated when statusResponse is received | |||||
lastTouched time.Time | |||||
lastRate int64 // last receive rate in bytes | |||||
} | |||||
func (p scPeer) String() string { | |||||
return fmt.Sprintf("{state %v, base %d, height %d, lastTouched %v, lastRate %d, id %v}", | |||||
p.state, p.base, p.height, p.lastTouched, p.lastRate, p.peerID) | |||||
} | |||||
func newScPeer(peerID types.NodeID) *scPeer { | |||||
return &scPeer{ | |||||
peerID: peerID, | |||||
state: peerStateNew, | |||||
base: -1, | |||||
height: -1, | |||||
lastTouched: time.Time{}, | |||||
} | |||||
} | |||||
// The scheduler keeps track of the state of each block and each peer. The | |||||
// scheduler will attempt to schedule new block requests with `trySchedule` | |||||
// events and remove slow peers with `tryPrune` events. | |||||
type scheduler struct { | |||||
initHeight int64 | |||||
// next block that needs to be processed. All blocks with smaller height are | |||||
// in Processed state. | |||||
height int64 | |||||
// lastAdvance tracks the last time a block execution happened. | |||||
// syncTimeout is the maximum time the scheduler waits to advance in the block sync process before finishing. | |||||
// This covers the cases where there are no peers or all peers have a lower height. | |||||
lastAdvance time.Time | |||||
syncTimeout time.Duration | |||||
// a map of peerID to scheduler specific peer struct `scPeer` used to keep | |||||
// track of peer specific state | |||||
peers map[types.NodeID]*scPeer | |||||
peerTimeout time.Duration // maximum response time from a peer otherwise prune | |||||
minRecvRate int64 // minimum receive rate from peer otherwise prune | |||||
// the maximum number of blocks that should be New, Received or Pending at any point | |||||
// in time. This is used to enforce a limit on the blockStates map. | |||||
targetPending int | |||||
// a list of blocks to be scheduled (New), Pending or Received. Its length should be | |||||
// smaller than targetPending. | |||||
blockStates map[int64]blockState | |||||
// a map of heights to the peer we are waiting a response from | |||||
pendingBlocks map[int64]types.NodeID | |||||
// the time at which a block was put in blockStatePending | |||||
pendingTime map[int64]time.Time | |||||
// a map of heights to the peers that put the block in blockStateReceived | |||||
receivedBlocks map[int64]types.NodeID | |||||
} | |||||
func (sc scheduler) String() string { | |||||
return fmt.Sprintf("ih: %d, bst: %v, peers: %v, pblks: %v, ptm %v, rblks: %v", | |||||
sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks) | |||||
} | |||||
func newScheduler(initHeight int64, startTime time.Time) *scheduler { | |||||
sc := scheduler{ | |||||
initHeight: initHeight, | |||||
lastAdvance: startTime, | |||||
syncTimeout: 60 * time.Second, | |||||
height: initHeight, | |||||
blockStates: make(map[int64]blockState), | |||||
peers: make(map[types.NodeID]*scPeer), | |||||
pendingBlocks: make(map[int64]types.NodeID), | |||||
pendingTime: make(map[int64]time.Time), | |||||
receivedBlocks: make(map[int64]types.NodeID), | |||||
targetPending: 10, // TODO - pass as param | |||||
peerTimeout: 15 * time.Second, // TODO - pass as param | |||||
minRecvRate: 0, // int64(7680), TODO - pass as param | |||||
} | |||||
return &sc | |||||
} | |||||
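A same-package sketch of the handler-level flow the reactor drives through this scheduler: a status response registers the peer's range, a trySchedule tick emits a block request, the block response is recorded, and a processed event advances the height. The peer ID, heights, and sizes below are invented for illustration:

```go
package v2

import (
	"time"

	"github.com/tendermint/tendermint/types"
)

// schedulerFlowExample walks one request/response/process round trip through
// the scheduler handlers defined in this file.
func schedulerFlowExample() (Event, Event, Event) {
	now := time.Now()
	sc := newScheduler(5, now)

	// The peer reports blocks 1..8; heights 5..8 become New.
	sc.handleStatusResponse(bcStatusResponse{peerID: "peer-1", base: 1, height: 8})

	// trySchedule picks the lowest New height from the least-loaded peer.
	reqEv, _ := sc.handleTrySchedule(rTrySchedule{time: now.Add(time.Second)})
	// reqEv is scBlockRequest{5 from peer-1}

	// The requested block arrives; the scheduler marks it Received and hands
	// it to the processor as scBlockReceived.
	recvEv, _ := sc.handleBlockResponse(bcBlockResponse{
		peerID: "peer-1",
		block:  &types.Block{Header: types.Header{Height: 5}},
		size:   1000,
		time:   now.Add(2 * time.Second),
	})

	// Once the processor applies the block, the scheduler advances to height 6.
	doneEv, _ := sc.handleBlockProcessed(pcBlockProcessed{height: 5})
	return reqEv, recvEv, doneEv
}
```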
func (sc *scheduler) ensurePeer(peerID types.NodeID) *scPeer { | |||||
if _, ok := sc.peers[peerID]; !ok { | |||||
sc.peers[peerID] = newScPeer(peerID) | |||||
} | |||||
return sc.peers[peerID] | |||||
} | |||||
func (sc *scheduler) touchPeer(peerID types.NodeID, time time.Time) error { | |||||
peer, ok := sc.peers[peerID] | |||||
if !ok { | |||||
return fmt.Errorf("couldn't find peer %s", peerID) | |||||
} | |||||
if peer.state != peerStateReady { | |||||
return fmt.Errorf("tried to touch peer in state %s, must be Ready", peer.state) | |||||
} | |||||
peer.lastTouched = time | |||||
return nil | |||||
} | |||||
func (sc *scheduler) removePeer(peerID types.NodeID) { | |||||
peer, ok := sc.peers[peerID] | |||||
if !ok { | |||||
return | |||||
} | |||||
if peer.state == peerStateRemoved { | |||||
return | |||||
} | |||||
for height, pendingPeerID := range sc.pendingBlocks { | |||||
if pendingPeerID == peerID { | |||||
sc.setStateAtHeight(height, blockStateNew) | |||||
delete(sc.pendingTime, height) | |||||
delete(sc.pendingBlocks, height) | |||||
} | |||||
} | |||||
for height, rcvPeerID := range sc.receivedBlocks { | |||||
if rcvPeerID == peerID { | |||||
sc.setStateAtHeight(height, blockStateNew) | |||||
delete(sc.receivedBlocks, height) | |||||
} | |||||
} | |||||
// remove the blocks from blockStates if the peer removal causes the max peer height to be lower. | |||||
peer.state = peerStateRemoved | |||||
maxPeerHeight := int64(0) | |||||
for _, otherPeer := range sc.peers { | |||||
if otherPeer.state != peerStateReady { | |||||
continue | |||||
} | |||||
if otherPeer.peerID != peer.peerID && otherPeer.height > maxPeerHeight { | |||||
maxPeerHeight = otherPeer.height | |||||
} | |||||
} | |||||
for h := range sc.blockStates { | |||||
if h > maxPeerHeight { | |||||
delete(sc.blockStates, h) | |||||
} | |||||
} | |||||
} | |||||
// check if the blockPool is running low and add new blocks in New state to be requested. | |||||
// This function is called when there is an increase in the maximum peer height or when | |||||
// blocks are processed. | |||||
func (sc *scheduler) addNewBlocks() { | |||||
if len(sc.blockStates) >= sc.targetPending { | |||||
return | |||||
} | |||||
for i := sc.height; i < int64(sc.targetPending)+sc.height; i++ { | |||||
if i > sc.maxHeight() { | |||||
break | |||||
} | |||||
if sc.getStateAtHeight(i) == blockStateUnknown { | |||||
sc.setStateAtHeight(i, blockStateNew) | |||||
} | |||||
} | |||||
} | |||||
func (sc *scheduler) setPeerRange(peerID types.NodeID, base int64, height int64) error { | |||||
peer := sc.ensurePeer(peerID) | |||||
if peer.state == peerStateRemoved { | |||||
return nil // noop | |||||
} | |||||
if height < peer.height { | |||||
sc.removePeer(peerID) | |||||
return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height) | |||||
} | |||||
if base > height { | |||||
sc.removePeer(peerID) | |||||
return fmt.Errorf("cannot set peer base higher than its height") | |||||
} | |||||
peer.base = base | |||||
peer.height = height | |||||
peer.state = peerStateReady | |||||
sc.addNewBlocks() | |||||
return nil | |||||
} | |||||
func (sc *scheduler) getStateAtHeight(height int64) blockState { | |||||
if height < sc.height { | |||||
return blockStateProcessed | |||||
} else if state, ok := sc.blockStates[height]; ok { | |||||
return state | |||||
} else { | |||||
return blockStateUnknown | |||||
} | |||||
} | |||||
func (sc *scheduler) getPeersWithHeight(height int64) []types.NodeID { | |||||
peers := make([]types.NodeID, 0) | |||||
for _, peer := range sc.peers { | |||||
if peer.state != peerStateReady { | |||||
continue | |||||
} | |||||
if peer.base <= height && peer.height >= height { | |||||
peers = append(peers, peer.peerID) | |||||
} | |||||
} | |||||
return peers | |||||
} | |||||
func (sc *scheduler) prunablePeers(peerTimeout time.Duration, minRecvRate int64, now time.Time) []types.NodeID { | |||||
prunable := make([]types.NodeID, 0) | |||||
for peerID, peer := range sc.peers { | |||||
if peer.state != peerStateReady { | |||||
continue | |||||
} | |||||
if now.Sub(peer.lastTouched) > peerTimeout || peer.lastRate < minRecvRate { | |||||
prunable = append(prunable, peerID) | |||||
} | |||||
} | |||||
// Tests for handleTryPrunePeer() may fail without sort due to range non-determinism | |||||
sort.Sort(PeerByID(prunable)) | |||||
return prunable | |||||
} | |||||
func (sc *scheduler) setStateAtHeight(height int64, state blockState) { | |||||
sc.blockStates[height] = state | |||||
} | |||||
// CONTRACT: peer exists and in Ready state. | |||||
func (sc *scheduler) markReceived(peerID types.NodeID, height int64, size int64, now time.Time) error { | |||||
peer := sc.peers[peerID] | |||||
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { | |||||
return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) | |||||
} | |||||
pendingTime, ok := sc.pendingTime[height] | |||||
if !ok || now.Sub(pendingTime) <= 0 { | |||||
return fmt.Errorf("clock error: block %d received at %s but requested at %s", | |||||
height, now, pendingTime) | |||||
} | |||||
peer.lastRate = size / now.Sub(pendingTime).Nanoseconds() | |||||
sc.setStateAtHeight(height, blockStateReceived) | |||||
delete(sc.pendingBlocks, height) | |||||
delete(sc.pendingTime, height) | |||||
sc.receivedBlocks[height] = peerID | |||||
return nil | |||||
} | |||||
func (sc *scheduler) markPending(peerID types.NodeID, height int64, time time.Time) error { | |||||
state := sc.getStateAtHeight(height) | |||||
if state != blockStateNew { | |||||
return fmt.Errorf("block %d should be in blockStateNew but is %s", height, state) | |||||
} | |||||
peer, ok := sc.peers[peerID] | |||||
if !ok { | |||||
return fmt.Errorf("cannot find peer %s", peerID) | |||||
} | |||||
if peer.state != peerStateReady { | |||||
return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state) | |||||
} | |||||
if height > peer.height { | |||||
return fmt.Errorf("cannot request height %d from peer %s that is at height %d", | |||||
height, peerID, peer.height) | |||||
} | |||||
if height < peer.base { | |||||
return fmt.Errorf("cannot request height %d for peer %s with base %d", | |||||
height, peerID, peer.base) | |||||
} | |||||
sc.setStateAtHeight(height, blockStatePending) | |||||
sc.pendingBlocks[height] = peerID | |||||
sc.pendingTime[height] = time | |||||
return nil | |||||
} | |||||
func (sc *scheduler) markProcessed(height int64) error { | |||||
// It is possible that a peer error or timeout is handled after the processor | |||||
// has processed the block but before the scheduler received this event, so | |||||
// when pcBlockProcessed event is received, the block had been requested | |||||
// again => don't check the block state. | |||||
sc.lastAdvance = time.Now() | |||||
sc.height = height + 1 | |||||
delete(sc.pendingBlocks, height) | |||||
delete(sc.pendingTime, height) | |||||
delete(sc.receivedBlocks, height) | |||||
delete(sc.blockStates, height) | |||||
sc.addNewBlocks() | |||||
return nil | |||||
} | |||||
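The mark* mutators above walk a single height through the block states enumerated earlier in this file. A same-package sketch, with an invented peer and heights, showing the state reported by getStateAtHeight after each step:

```go
package v2

import "time"

// blockStateExample advances one height through New, Pending, Received and
// Processed using the scheduler's own mutators.
func blockStateExample() []blockState {
	now := time.Now()
	sc := newScheduler(10, now)

	// A peer reporting range 0..12 makes heights 10..12 New.
	_ = sc.setPeerRange("peer-1", 0, 12)
	out := []blockState{sc.getStateAtHeight(10)} // New

	_ = sc.markPending("peer-1", 10, now)
	out = append(out, sc.getStateAtHeight(10)) // Pending

	_ = sc.markReceived("peer-1", 10, 500, now.Add(time.Second))
	out = append(out, sc.getStateAtHeight(10)) // Received

	_ = sc.markProcessed(10)
	out = append(out, sc.getStateAtHeight(10)) // Processed (below sc.height)
	return out
}
```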
func (sc *scheduler) allBlocksProcessed() bool { | |||||
if len(sc.peers) == 0 { | |||||
return false | |||||
} | |||||
return sc.height >= sc.maxHeight() | |||||
} | |||||
// returns the max peer height, or sc.height-1 (the last processed block) if no ready peer is higher | |||||
func (sc *scheduler) maxHeight() int64 { | |||||
max := sc.height - 1 | |||||
for _, peer := range sc.peers { | |||||
if peer.state != peerStateReady { | |||||
continue | |||||
} | |||||
if max < peer.height { | |||||
max = peer.height | |||||
} | |||||
} | |||||
return max | |||||
} | |||||
// lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks | |||||
func (sc *scheduler) nextHeightToSchedule() int64 { | |||||
var min int64 = math.MaxInt64 | |||||
for height, state := range sc.blockStates { | |||||
if state == blockStateNew && height < min { | |||||
min = height | |||||
} | |||||
} | |||||
if min == math.MaxInt64 { | |||||
min = -1 | |||||
} | |||||
return min | |||||
} | |||||
func (sc *scheduler) pendingFrom(peerID types.NodeID) []int64 { | |||||
var heights []int64 | |||||
for height, pendingPeerID := range sc.pendingBlocks { | |||||
if pendingPeerID == peerID { | |||||
heights = append(heights, height) | |||||
} | |||||
} | |||||
return heights | |||||
} | |||||
func (sc *scheduler) selectPeer(height int64) (types.NodeID, error) { | |||||
peers := sc.getPeersWithHeight(height) | |||||
if len(peers) == 0 { | |||||
return "", fmt.Errorf("cannot find peer for height %d", height) | |||||
} | |||||
// create a map from number of pending requests to a list | |||||
// of peers having that number of pending requests. | |||||
pendingFrom := make(map[int][]types.NodeID) | |||||
for _, peerID := range peers { | |||||
numPending := len(sc.pendingFrom(peerID)) | |||||
pendingFrom[numPending] = append(pendingFrom[numPending], peerID) | |||||
} | |||||
// find the set of peers with minimum number of pending requests. | |||||
var minPending int64 = math.MaxInt64 | |||||
for mp := range pendingFrom { | |||||
if int64(mp) < minPending { | |||||
minPending = int64(mp) | |||||
} | |||||
} | |||||
sort.Sort(PeerByID(pendingFrom[int(minPending)])) | |||||
return pendingFrom[int(minPending)][0], nil | |||||
} | |||||
// PeerByID is a list of peers sorted by peerID. | |||||
type PeerByID []types.NodeID | |||||
func (peers PeerByID) Len() int { | |||||
return len(peers) | |||||
} | |||||
func (peers PeerByID) Less(i, j int) bool { | |||||
return bytes.Compare([]byte(peers[i]), []byte(peers[j])) == -1 | |||||
} | |||||
func (peers PeerByID) Swap(i, j int) { | |||||
peers[i], peers[j] = peers[j], peers[i] | |||||
} | |||||
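sort.Sort over PeerByID is what gives prunablePeers and selectPeer a deterministic order. A tiny same-package sketch with arbitrary IDs:

```go
package v2

import (
	"sort"

	"github.com/tendermint/tendermint/types"
)

// sortedPeersExample returns the IDs in the deterministic byte order that the
// scheduler relies on when pruning and selecting peers.
func sortedPeersExample() []types.NodeID {
	peers := []types.NodeID{"c3", "a1", "b2"}
	sort.Sort(PeerByID(peers))
	return peers // [a1 b2 c3]
}
```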
// Handlers | |||||
// This handler gets the block, performs some validation and then passes it on to the processor. | |||||
func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { | |||||
err := sc.touchPeer(event.peerID, event.time) | |||||
if err != nil { | |||||
// peer does not exist OR not ready | |||||
return noOp, nil | |||||
} | |||||
err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time) | |||||
if err != nil { | |||||
sc.removePeer(event.peerID) | |||||
return scPeerError{peerID: event.peerID, reason: err}, nil | |||||
} | |||||
return scBlockReceived{peerID: event.peerID, block: event.block}, nil | |||||
} | |||||
func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, error) { | |||||
// No such peer or peer was removed. | |||||
peer, ok := sc.peers[event.peerID] | |||||
if !ok || peer.state == peerStateRemoved { | |||||
return noOp, nil | |||||
} | |||||
// The peer may have been just removed due to errors, low speed or timeouts. | |||||
sc.removePeer(event.peerID) | |||||
return scPeerError{peerID: event.peerID, | |||||
reason: fmt.Errorf("peer %v with base %d height %d claims no block for %d", | |||||
event.peerID, peer.base, peer.height, event.height)}, nil | |||||
} | |||||
func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) { | |||||
if event.height != sc.height { | |||||
panic(fmt.Sprintf("processed height %d, but expected height %d", event.height, sc.height)) | |||||
} | |||||
err := sc.markProcessed(event.height) | |||||
if err != nil { | |||||
return scSchedulerFail{reason: err}, nil | |||||
} | |||||
if sc.allBlocksProcessed() { | |||||
return scFinishedEv{reason: "processed all blocks"}, nil | |||||
} | |||||
return noOp, nil | |||||
} | |||||
// Handles an error from the processor. The processor has already cleaned the blocks from | |||||
// the peers included in this event. Just attempt to remove the peers. | |||||
func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) { | |||||
// The peers may have been just removed due to errors, low speed or timeouts. | |||||
sc.removePeer(event.firstPeerID) | |||||
if event.firstPeerID != event.secondPeerID { | |||||
sc.removePeer(event.secondPeerID) | |||||
} | |||||
if sc.allBlocksProcessed() { | |||||
return scFinishedEv{reason: "error on last block"}, nil | |||||
} | |||||
return noOp, nil | |||||
} | |||||
func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) { | |||||
sc.ensurePeer(event.peerID) | |||||
return noOp, nil | |||||
} | |||||
func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) { | |||||
sc.removePeer(event.peerID) | |||||
if sc.allBlocksProcessed() { | |||||
return scFinishedEv{reason: "removed peer"}, nil | |||||
} | |||||
// Return scPeerError so the peer (and all associated blocks) is removed from | |||||
// the processor. | |||||
return scPeerError{peerID: event.peerID, reason: errors.New("peer was stopped")}, nil | |||||
} | |||||
func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { | |||||
// Check the behavior of the peer responsible for delivering the block at sc.height. | |||||
timeHeightAsked, ok := sc.pendingTime[sc.height] | |||||
if ok && time.Since(timeHeightAsked) > sc.peerTimeout { | |||||
// A request was sent to a peer for block at sc.height but a response was not received | |||||
// from that peer within sc.peerTimeout. Remove the peer. This is to ensure that a peer | |||||
// will be timed out even if it sends blocks at higher heights but prevents progress by | |||||
// not sending the block at current height. | |||||
sc.removePeer(sc.pendingBlocks[sc.height]) | |||||
} | |||||
prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time) | |||||
if len(prunablePeers) == 0 { | |||||
return noOp, nil | |||||
} | |||||
for _, peerID := range prunablePeers { | |||||
sc.removePeer(peerID) | |||||
} | |||||
// If all blocks are processed we should finish. | |||||
if sc.allBlocksProcessed() { | |||||
return scFinishedEv{reason: "after try prune"}, nil | |||||
} | |||||
return scPeersPruned{peers: prunablePeers}, nil | |||||
} | |||||
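Pruning hinges on the two thresholds configured in newScheduler: a peer becomes prunable once it has not been touched within peerTimeout or its last receive rate drops below minRecvRate. A same-package sketch with two invented peers, one responsive and one stale:

```go
package v2

import (
	"time"

	"github.com/tendermint/tendermint/types"
)

// pruneExample shows which peers prunablePeers reports under the default
// 15-second peerTimeout set in newScheduler.
func pruneExample() []types.NodeID {
	start := time.Now()
	sc := newScheduler(1, start)

	_ = sc.setPeerRange("fresh-peer", 0, 10)
	_ = sc.setPeerRange("stale-peer", 0, 10)

	// Only the fresh peer has been heard from recently.
	_ = sc.touchPeer("fresh-peer", start.Add(50*time.Second))
	_ = sc.touchPeer("stale-peer", start.Add(10*time.Second))

	// At start+60s the stale peer is well past the 15s timeout.
	return sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, start.Add(60*time.Second))
	// -> ["stale-peer"]
}
```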
func (sc *scheduler) handleResetState(event bcResetState) (Event, error) { | |||||
initHeight := event.state.LastBlockHeight + 1 | |||||
if initHeight == 1 { | |||||
initHeight = event.state.InitialHeight | |||||
} | |||||
sc.initHeight = initHeight | |||||
sc.height = initHeight | |||||
sc.lastAdvance = time.Now() | |||||
sc.addNewBlocks() | |||||
return noOp, nil | |||||
} | |||||
func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) { | |||||
if time.Since(sc.lastAdvance) > sc.syncTimeout { | |||||
return scFinishedEv{reason: "timeout, no advance"}, nil | |||||
} | |||||
nextHeight := sc.nextHeightToSchedule() | |||||
if nextHeight == -1 { | |||||
return noOp, nil | |||||
} | |||||
bestPeerID, err := sc.selectPeer(nextHeight) | |||||
if err != nil { | |||||
return scSchedulerFail{reason: err}, nil | |||||
} | |||||
if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil { | |||||
return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate | |||||
} | |||||
return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil | |||||
} | |||||
func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) { | |||||
err := sc.setPeerRange(event.peerID, event.base, event.height) | |||||
if err != nil { | |||||
return scPeerError{peerID: event.peerID, reason: err}, nil | |||||
} | |||||
return noOp, nil | |||||
} | |||||
func (sc *scheduler) handle(event Event) (Event, error) { | |||||
switch event := event.(type) { | |||||
case bcResetState: | |||||
nextEvent, err := sc.handleResetState(event) | |||||
return nextEvent, err | |||||
case bcStatusResponse: | |||||
nextEvent, err := sc.handleStatusResponse(event) | |||||
return nextEvent, err | |||||
case bcBlockResponse: | |||||
nextEvent, err := sc.handleBlockResponse(event) | |||||
return nextEvent, err | |||||
case bcNoBlockResponse: | |||||
nextEvent, err := sc.handleNoBlockResponse(event) | |||||
return nextEvent, err | |||||
case rTrySchedule: | |||||
nextEvent, err := sc.handleTrySchedule(event) | |||||
return nextEvent, err | |||||
case bcAddNewPeer: | |||||
nextEvent, err := sc.handleAddNewPeer(event) | |||||
return nextEvent, err | |||||
case bcRemovePeer: | |||||
nextEvent, err := sc.handleRemovePeer(event) | |||||
return nextEvent, err | |||||
case rTryPrunePeer: | |||||
nextEvent, err := sc.handleTryPrunePeer(event) | |||||
return nextEvent, err | |||||
case pcBlockProcessed: | |||||
nextEvent, err := sc.handleBlockProcessed(event) | |||||
return nextEvent, err | |||||
case pcBlockVerificationFailure: | |||||
nextEvent, err := sc.handleBlockProcessError(event) | |||||
return nextEvent, err | |||||
default: | |||||
return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil | |||||
} | |||||
} |
@ -1,65 +0,0 @@ | |||||
package v2 | |||||
import ( | |||||
"github.com/Workiva/go-datastructures/queue" | |||||
) | |||||
// Event is the type that can be added to the priority queue. | |||||
type Event queue.Item | |||||
type priority interface { | |||||
Compare(other queue.Item) int | |||||
Priority() int | |||||
} | |||||
type priorityLow struct{} | |||||
type priorityNormal struct{} | |||||
type priorityHigh struct{} | |||||
func (p priorityLow) Priority() int { | |||||
return 1 | |||||
} | |||||
func (p priorityNormal) Priority() int { | |||||
return 2 | |||||
} | |||||
func (p priorityHigh) Priority() int { | |||||
return 3 | |||||
} | |||||
func (p priorityLow) Compare(other queue.Item) int { | |||||
op := other.(priority) | |||||
if p.Priority() > op.Priority() { | |||||
return 1 | |||||
} else if p.Priority() == op.Priority() { | |||||
return 0 | |||||
} | |||||
return -1 | |||||
} | |||||
func (p priorityNormal) Compare(other queue.Item) int { | |||||
op := other.(priority) | |||||
if p.Priority() > op.Priority() { | |||||
return 1 | |||||
} else if p.Priority() == op.Priority() { | |||||
return 0 | |||||
} | |||||
return -1 | |||||
} | |||||
func (p priorityHigh) Compare(other queue.Item) int { | |||||
op := other.(priority) | |||||
if p.Priority() > op.Priority() { | |||||
return 1 | |||||
} else if p.Priority() == op.Priority() { | |||||
return 0 | |||||
} | |||||
return -1 | |||||
} | |||||
type noOpEvent struct { | |||||
priorityLow | |||||
} | |||||
var noOp = noOpEvent{} |
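The embedded markers only matter through Compare, which the routine's priority queue uses to order pending events. A same-package sketch with two invented event types showing the comparison results:

```go
package v2

// Hypothetical event types: one normal, one high priority.
type examplePing struct{ priorityNormal }
type exampleAbort struct{ priorityHigh }

// compareExample shows the ordering the embedded markers give an event: a
// positive result means the receiver outranks the argument.
func compareExample() (int, int, int) {
	abort, ping := exampleAbort{}, examplePing{}
	return abort.Compare(ping), // 1: high outranks normal
		ping.Compare(abort), // -1: normal yields to high
		ping.Compare(examplePing{}) // 0: equal priorities
}
```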