|
|
@ -7,6 +7,8 @@ import ( |
|
|
|
"math" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
cmn "github.com/tendermint/tmlibs/common" |
|
|
|
) |
|
|
|
|
|
|
|
//---------------------------------------------------------------------------------------
|
|
|
@ -31,6 +33,8 @@ type MetricHistoryJSON struct { |
|
|
|
// TrustMetric - keeps track of peer reliability
|
|
|
|
// See tendermint/docs/architecture/adr-006-trust-metric.md for details
|
|
|
|
type TrustMetric struct { |
|
|
|
cmn.BaseService |
|
|
|
|
|
|
|
// Mutex that protects the metric from concurrent access
|
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
@ -73,25 +77,24 @@ type TrustMetric struct { |
|
|
|
// While true, history data is not modified
|
|
|
|
paused bool |
|
|
|
|
|
|
|
// Set to true once the metric has been stopped
|
|
|
|
stopped bool |
|
|
|
|
|
|
|
// Signal channel for stopping the trust metric go-routine
|
|
|
|
stop chan struct{} |
|
|
|
|
|
|
|
// Slice of signal channels fired when the metric is stopped
|
|
|
|
waitForStop []chan struct{} |
|
|
|
// Signal channel fired when the metric go-routine has stopped
|
|
|
|
done chan struct{} |
|
|
|
|
|
|
|
// Slice of signal channels fired when the metric time interval ticker is fired
|
|
|
|
waitForTimeInterval []chan struct{} |
|
|
|
// Used during testing in order to control the passing of time intervals
|
|
|
|
testTicker MetricTicker |
|
|
|
} |
|
|
|
|
|
|
|
// NewMetric returns a trust metric with the default configuration
|
|
|
|
// NewMetric returns a trust metric with the default configuration.
|
|
|
|
// Use Start to begin tracking the quality of peer behavior over time
|
|
|
|
func NewMetric() *TrustMetric { |
|
|
|
return NewMetricWithConfig(DefaultConfig()) |
|
|
|
} |
|
|
|
|
|
|
|
// NewMetricWithConfig returns a trust metric with a custom configuration
|
|
|
|
// NewMetricWithConfig returns a trust metric with a custom configuration.
|
|
|
|
// Use Start to begin tracking the quality of peer behavior over time
|
|
|
|
func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { |
|
|
|
tm := new(TrustMetric) |
|
|
|
config := customConfig(tmc) |
|
|
@ -106,13 +109,34 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { |
|
|
|
tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 |
|
|
|
// This metric has a perfect history so far
|
|
|
|
tm.historyValue = 1.0 |
|
|
|
// Setup the stop channel
|
|
|
|
tm.stop = make(chan struct{}) |
|
|
|
// Setup the go-routine stop channel
|
|
|
|
tm.stop = make(chan struct{}, 1) |
|
|
|
// Setup the go-routine done channel
|
|
|
|
tm.done = make(chan struct{}, 1) |
|
|
|
|
|
|
|
go tm.processRequests() |
|
|
|
tm.BaseService = *cmn.NewBaseService(nil, "TrustMetric", tm) |
|
|
|
return tm |
|
|
|
} |
|
|
|
|
|
|
|
// OnStart implements Service
|
|
|
|
func (tm *TrustMetric) OnStart() error { |
|
|
|
if err := tm.BaseService.OnStart(); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
go tm.processRequests() |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// OnStop implements Service
|
|
|
|
// Stop tells the metric to stop recording data over time intervals
|
|
|
|
// This method also blocks until the metric has completely stopped
|
|
|
|
func (tm *TrustMetric) OnStop() { |
|
|
|
tm.BaseService.OnStop() |
|
|
|
|
|
|
|
tm.stop <- struct{}{} |
|
|
|
<-tm.done |
|
|
|
} |
|
|
|
|
|
|
|
// Returns a snapshot of the trust metric history data
|
|
|
|
func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { |
|
|
|
tm.mtx.Lock() |
|
|
@ -164,18 +188,6 @@ func (tm *TrustMetric) Pause() { |
|
|
|
tm.paused = true |
|
|
|
} |
|
|
|
|
|
|
|
// Stop tells the metric to stop recording data over time intervals
|
|
|
|
// This method also blocks until the metric has completely stopped
|
|
|
|
func (tm *TrustMetric) Stop() { |
|
|
|
tm.stop <- struct{}{} |
|
|
|
|
|
|
|
wait := make(chan struct{}) |
|
|
|
|
|
|
|
if tm.AddStopWaitChannel(wait) { |
|
|
|
<-wait |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// BadEvents indicates that an undesirable event(s) took place
|
|
|
|
func (tm *TrustMetric) BadEvents(num int) { |
|
|
|
tm.mtx.Lock() |
|
|
@ -248,6 +260,16 @@ func (tm *TrustMetric) NextTimeInterval() { |
|
|
|
tm.bad = 0 |
|
|
|
} |
|
|
|
|
|
|
|
// SetTicker allows a TestTicker to be provided that will manually control
|
|
|
|
// the passing of time from the perspective of the TrustMetric.
|
|
|
|
// The ticker must be set before Start is called on the metric
|
|
|
|
func (tm *TrustMetric) SetTicker(ticker MetricTicker) { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
tm.testTicker = ticker |
|
|
|
} |
|
|
|
|
|
|
|
// Copy returns a new trust metric with members containing the same values
|
|
|
|
func (tm *TrustMetric) Copy() *TrustMetric { |
|
|
|
tm.mtx.Lock() |
|
|
@ -271,98 +293,35 @@ func (tm *TrustMetric) Copy() *TrustMetric { |
|
|
|
good: tm.good, |
|
|
|
bad: tm.bad, |
|
|
|
paused: tm.paused, |
|
|
|
stop: make(chan struct{}), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// WaitForTimeIntervalToPass blocks until the metric
|
|
|
|
// go-routine ticker fire again
|
|
|
|
func (tm *TrustMetric) WaitForTimeIntervalToPass() { |
|
|
|
interval := make(chan struct{}) |
|
|
|
|
|
|
|
if tm.AddTimeIntervalWaitChannel(interval) { |
|
|
|
<-interval |
|
|
|
stop: make(chan struct{}, 1), |
|
|
|
done: make(chan struct{}, 1), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// SignalTimeIntervalPassed fires all the maintained signal channels
|
|
|
|
// and clears the group of signal channels
|
|
|
|
func (tm *TrustMetric) SignalTimeIntervalPassed() { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
for _, interval := range tm.waitForTimeInterval { |
|
|
|
interval <- struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
tm.waitForTimeInterval = []chan struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
// AddTimeIntervalWaitChannel adds a signal channel to a group of
|
|
|
|
// waiters for this metric. This method returns true if the channel
|
|
|
|
// was added to the group before the metric was stopped
|
|
|
|
func (tm *TrustMetric) AddTimeIntervalWaitChannel(interval chan struct{}) bool { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
var added bool |
|
|
|
|
|
|
|
if !tm.stopped { |
|
|
|
tm.waitForTimeInterval = append(tm.waitForTimeInterval, interval) |
|
|
|
added = true |
|
|
|
} |
|
|
|
return added |
|
|
|
} |
|
|
|
|
|
|
|
// SignalStopped fires all the maintained signal channels and sets
|
|
|
|
// metric stopped status to true
|
|
|
|
func (tm *TrustMetric) SignalStopped() { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
for _, stop := range tm.waitForStop { |
|
|
|
stop <- struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
tm.stopped = true |
|
|
|
} |
|
|
|
|
|
|
|
// AddStopWaitChannel adds a signal channel to a group of waiters
|
|
|
|
// for this metric. This method returns true if the channel was
|
|
|
|
// added to the group before the metric was stopped
|
|
|
|
func (tm *TrustMetric) AddStopWaitChannel(stop chan struct{}) bool { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
var added bool |
|
|
|
|
|
|
|
if !tm.stopped { |
|
|
|
tm.waitForStop = append(tm.waitForStop, stop) |
|
|
|
added = true |
|
|
|
} |
|
|
|
return added |
|
|
|
} |
|
|
|
|
|
|
|
/* Private methods */ |
|
|
|
|
|
|
|
// This method is for a goroutine that handles all requests on the metric
|
|
|
|
func (tm *TrustMetric) processRequests() { |
|
|
|
t := time.NewTicker(tm.intervalLen) |
|
|
|
t := tm.testTicker |
|
|
|
if t == nil { |
|
|
|
// No test ticker was provided, so we create a normal ticker
|
|
|
|
t = NewTicker(tm.intervalLen) |
|
|
|
} |
|
|
|
defer t.Stop() |
|
|
|
// Obtain the raw channel
|
|
|
|
tick := t.GetChannel() |
|
|
|
loop: |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-t.C: |
|
|
|
case <-tick: |
|
|
|
tm.NextTimeInterval() |
|
|
|
tm.SignalTimeIntervalPassed() |
|
|
|
case <-tm.stop: |
|
|
|
// Stop all further tracking for this metric
|
|
|
|
break loop |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Change the status to stopped and signal the waiters
|
|
|
|
tm.SignalStopped() |
|
|
|
// Send the done signal
|
|
|
|
tm.done <- struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
// Wakes the trust metric up if it is currently paused
|
|
|
|