@ -1,55 +0,0 @@ | |||
package trust | |||
import "time" | |||
// MetricConfig - Configures the weight functions and time intervals for the metric | |||
type MetricConfig struct { | |||
// Determines the percentage given to current behavior | |||
ProportionalWeight float64 | |||
// Determines the percentage given to prior behavior | |||
IntegralWeight float64 | |||
// The window of time that the trust metric will track events across. | |||
// This can be set to cover many days without issue | |||
TrackingWindow time.Duration | |||
// Each interval should be short for adapability. | |||
// Less than 30 seconds is too sensitive, | |||
// and greater than 5 minutes will make the metric numb | |||
IntervalLength time.Duration | |||
} | |||
// DefaultConfig returns a config with values that have been tested and produce desirable results | |||
func DefaultConfig() MetricConfig { | |||
return MetricConfig{ | |||
ProportionalWeight: 0.4, | |||
IntegralWeight: 0.6, | |||
TrackingWindow: (time.Minute * 60 * 24) * 14, // 14 days. | |||
IntervalLength: 1 * time.Minute, | |||
} | |||
} | |||
// Ensures that all configuration elements have valid values | |||
func customConfig(tmc MetricConfig) MetricConfig { | |||
config := DefaultConfig() | |||
// Check the config for set values, and setup appropriately | |||
if tmc.ProportionalWeight > 0 { | |||
config.ProportionalWeight = tmc.ProportionalWeight | |||
} | |||
if tmc.IntegralWeight > 0 { | |||
config.IntegralWeight = tmc.IntegralWeight | |||
} | |||
if tmc.IntervalLength > time.Duration(0) { | |||
config.IntervalLength = tmc.IntervalLength | |||
} | |||
if tmc.TrackingWindow > time.Duration(0) && | |||
tmc.TrackingWindow >= config.IntervalLength { | |||
config.TrackingWindow = tmc.TrackingWindow | |||
} | |||
return config | |||
} |
@ -1,413 +0,0 @@ | |||
// Copyright 2017 Tendermint. All rights reserved. | |||
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. | |||
package trust | |||
import ( | |||
"context" | |||
"math" | |||
"time" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/libs/service" | |||
) | |||
//--------------------------------------------------------------------------------------- | |||
const ( | |||
// The weight applied to the derivative when current behavior is >= previous behavior | |||
defaultDerivativeGamma1 = 0 | |||
// The weight applied to the derivative when current behavior is less than previous behavior | |||
defaultDerivativeGamma2 = 1.0 | |||
// The weight applied to history data values when calculating the history value | |||
defaultHistoryDataWeight = 0.8 | |||
) | |||
// MetricHistoryJSON - history data necessary to save the trust metric | |||
type MetricHistoryJSON struct { | |||
NumIntervals int `json:"intervals"` | |||
History []float64 `json:"history"` | |||
} | |||
// Metric - keeps track of peer reliability | |||
// See tendermint/docs/architecture/adr-006-trust-metric.md for details | |||
type Metric struct { | |||
service.BaseService | |||
// Mutex that protects the metric from concurrent access | |||
mtx tmsync.Mutex | |||
// Determines the percentage given to current behavior | |||
proportionalWeight float64 | |||
// Determines the percentage given to prior behavior | |||
integralWeight float64 | |||
// Count of how many time intervals this metric has been tracking | |||
numIntervals int | |||
// Size of the time interval window for this trust metric | |||
maxIntervals int | |||
// The time duration for a single time interval | |||
intervalLen time.Duration | |||
// Stores the trust history data for this metric | |||
history []float64 | |||
// Weights applied to the history data when calculating the history value | |||
historyWeights []float64 | |||
// The sum of the history weights used when calculating the history value | |||
historyWeightSum float64 | |||
// The current number of history data elements | |||
historySize int | |||
// The maximum number of history data elements | |||
historyMaxSize int | |||
// The calculated history value for the current time interval | |||
historyValue float64 | |||
// The number of recorded good and bad events for the current time interval | |||
bad, good float64 | |||
// While true, history data is not modified | |||
paused bool | |||
// Used during testing in order to control the passing of time intervals | |||
testTicker MetricTicker | |||
} | |||
// NewMetric returns a trust metric with the default configuration. | |||
// Use Start to begin tracking the quality of peer behavior over time | |||
func NewMetric() *Metric { | |||
return NewMetricWithConfig(DefaultConfig()) | |||
} | |||
// NewMetricWithConfig returns a trust metric with a custom configuration. | |||
// Use Start to begin tracking the quality of peer behavior over time | |||
func NewMetricWithConfig(tmc MetricConfig) *Metric { | |||
tm := new(Metric) | |||
config := customConfig(tmc) | |||
// Setup using the configuration values | |||
tm.proportionalWeight = config.ProportionalWeight | |||
tm.integralWeight = config.IntegralWeight | |||
tm.intervalLen = config.IntervalLength | |||
// The maximum number of time intervals is the tracking window / interval length | |||
tm.maxIntervals = int(config.TrackingWindow / tm.intervalLen) | |||
// The history size will be determined by the maximum number of time intervals | |||
tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 | |||
// This metric has a perfect history so far | |||
tm.historyValue = 1.0 | |||
tm.BaseService = *service.NewBaseService(nil, "Metric", tm) | |||
return tm | |||
} | |||
// OnStart implements Service | |||
func (tm *Metric) OnStart(ctx context.Context) error { | |||
if err := tm.BaseService.OnStart(ctx); err != nil { | |||
return err | |||
} | |||
go tm.processRequests() | |||
return nil | |||
} | |||
// OnStop implements Service | |||
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit() | |||
func (tm *Metric) OnStop() {} | |||
// Returns a snapshot of the trust metric history data | |||
func (tm *Metric) HistoryJSON() MetricHistoryJSON { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
return MetricHistoryJSON{ | |||
NumIntervals: tm.numIntervals, | |||
History: tm.history, | |||
} | |||
} | |||
// Instantiates a trust metric by loading the history data for a single peer. | |||
// This is called only once and only right after creation, which is why the | |||
// lock is not held while accessing the trust metric struct members | |||
func (tm *Metric) Init(hist MetricHistoryJSON) { | |||
// Restore the number of time intervals we have previously tracked | |||
if hist.NumIntervals > tm.maxIntervals { | |||
hist.NumIntervals = tm.maxIntervals | |||
} | |||
tm.numIntervals = hist.NumIntervals | |||
// Restore the history and its current size | |||
if len(hist.History) > tm.historyMaxSize { | |||
// Keep the history no larger than historyMaxSize | |||
last := len(hist.History) - tm.historyMaxSize | |||
hist.History = hist.History[last:] | |||
} | |||
tm.history = hist.History | |||
tm.historySize = len(tm.history) | |||
// Create the history weight values and weight sum | |||
for i := 1; i <= tm.numIntervals; i++ { | |||
x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight | |||
tm.historyWeights = append(tm.historyWeights, x) | |||
} | |||
for _, v := range tm.historyWeights { | |||
tm.historyWeightSum += v | |||
} | |||
// Calculate the history value based on the loaded history data | |||
tm.historyValue = tm.calcHistoryValue() | |||
} | |||
// Pause tells the metric to pause recording data over time intervals. | |||
// All method calls that indicate events will unpause the metric | |||
func (tm *Metric) Pause() { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
// Pause the metric for now | |||
tm.paused = true | |||
} | |||
// BadEvents indicates that an undesirable event(s) took place | |||
func (tm *Metric) BadEvents(num int) { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
tm.unpause() | |||
tm.bad += float64(num) | |||
} | |||
// GoodEvents indicates that a desirable event(s) took place | |||
func (tm *Metric) GoodEvents(num int) { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
tm.unpause() | |||
tm.good += float64(num) | |||
} | |||
// TrustValue gets the dependable trust value; always between 0 and 1 | |||
func (tm *Metric) TrustValue() float64 { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
return tm.calcTrustValue() | |||
} | |||
// TrustScore gets a score based on the trust value always between 0 and 100 | |||
func (tm *Metric) TrustScore() int { | |||
score := tm.TrustValue() * 100 | |||
return int(math.Floor(score)) | |||
} | |||
// NextTimeInterval saves current time interval data and prepares for the following interval | |||
func (tm *Metric) NextTimeInterval() { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
if tm.paused { | |||
// Do not prepare for the next time interval while paused | |||
return | |||
} | |||
// Add the current trust value to the history data | |||
newHist := tm.calcTrustValue() | |||
tm.history = append(tm.history, newHist) | |||
// Update history and interval counters | |||
if tm.historySize < tm.historyMaxSize { | |||
tm.historySize++ | |||
} else { | |||
// Keep the history no larger than historyMaxSize | |||
last := len(tm.history) - tm.historyMaxSize | |||
tm.history = tm.history[last:] | |||
} | |||
if tm.numIntervals < tm.maxIntervals { | |||
tm.numIntervals++ | |||
// Add the optimistic weight for the new time interval | |||
wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) | |||
tm.historyWeights = append(tm.historyWeights, wk) | |||
tm.historyWeightSum += wk | |||
} | |||
// Update the history data using Faded Memories | |||
tm.updateFadedMemory() | |||
// Calculate the history value for the upcoming time interval | |||
tm.historyValue = tm.calcHistoryValue() | |||
tm.good = 0 | |||
tm.bad = 0 | |||
} | |||
// SetTicker allows a TestTicker to be provided that will manually control | |||
// the passing of time from the perspective of the Metric. | |||
// The ticker must be set before Start is called on the metric | |||
func (tm *Metric) SetTicker(ticker MetricTicker) { | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
tm.testTicker = ticker | |||
} | |||
// Copy returns a new trust metric with members containing the same values | |||
func (tm *Metric) Copy() *Metric { | |||
if tm == nil { | |||
return nil | |||
} | |||
tm.mtx.Lock() | |||
defer tm.mtx.Unlock() | |||
return &Metric{ | |||
proportionalWeight: tm.proportionalWeight, | |||
integralWeight: tm.integralWeight, | |||
numIntervals: tm.numIntervals, | |||
maxIntervals: tm.maxIntervals, | |||
intervalLen: tm.intervalLen, | |||
history: tm.history, | |||
historyWeights: tm.historyWeights, | |||
historyWeightSum: tm.historyWeightSum, | |||
historySize: tm.historySize, | |||
historyMaxSize: tm.historyMaxSize, | |||
historyValue: tm.historyValue, | |||
good: tm.good, | |||
bad: tm.bad, | |||
paused: tm.paused, | |||
} | |||
} | |||
/* Private methods */ | |||
// This method is for a goroutine that handles all requests on the metric | |||
func (tm *Metric) processRequests() { | |||
t := tm.testTicker | |||
if t == nil { | |||
// No test ticker was provided, so we create a normal ticker | |||
t = NewTicker(tm.intervalLen) | |||
} | |||
defer t.Stop() | |||
// Obtain the raw channel | |||
tick := t.GetChannel() | |||
loop: | |||
for { | |||
select { | |||
case <-tick: | |||
tm.NextTimeInterval() | |||
case <-tm.Quit(): | |||
// Stop all further tracking for this metric | |||
break loop | |||
} | |||
} | |||
} | |||
// Wakes the trust metric up if it is currently paused | |||
// This method needs to be called with the mutex locked | |||
func (tm *Metric) unpause() { | |||
// Check if this is the first experience with | |||
// what we are tracking since being paused | |||
if tm.paused { | |||
tm.good = 0 | |||
tm.bad = 0 | |||
// New events cause us to unpause the metric | |||
tm.paused = false | |||
} | |||
} | |||
// Calculates the trust value for the request processing | |||
func (tm *Metric) calcTrustValue() float64 { | |||
weightedP := tm.proportionalWeight * tm.proportionalValue() | |||
weightedI := tm.integralWeight * tm.historyValue | |||
weightedD := tm.weightedDerivative() | |||
tv := weightedP + weightedI + weightedD | |||
// Do not return a negative value. | |||
if tv < 0 { | |||
tv = 0 | |||
} | |||
return tv | |||
} | |||
// Calculates the current score for good/bad experiences | |||
func (tm *Metric) proportionalValue() float64 { | |||
value := 1.0 | |||
total := tm.good + tm.bad | |||
if total > 0 { | |||
value = tm.good / total | |||
} | |||
return value | |||
} | |||
// Strengthens the derivative component when the change is negative | |||
func (tm *Metric) weightedDerivative() float64 { | |||
var weight float64 = defaultDerivativeGamma1 | |||
d := tm.derivativeValue() | |||
if d < 0 { | |||
weight = defaultDerivativeGamma2 | |||
} | |||
return weight * d | |||
} | |||
// Calculates the derivative component | |||
func (tm *Metric) derivativeValue() float64 { | |||
return tm.proportionalValue() - tm.historyValue | |||
} | |||
// Calculates the integral (history) component of the trust value | |||
func (tm *Metric) calcHistoryValue() float64 { | |||
var hv float64 | |||
for i := 0; i < tm.numIntervals; i++ { | |||
hv += tm.fadedMemoryValue(i) * tm.historyWeights[i] | |||
} | |||
return hv / tm.historyWeightSum | |||
} | |||
// Retrieves the actual history data value that represents the requested time interval | |||
func (tm *Metric) fadedMemoryValue(interval int) float64 { | |||
first := tm.historySize - 1 | |||
if interval == 0 { | |||
// Base case | |||
return tm.history[first] | |||
} | |||
offset := intervalToHistoryOffset(interval) | |||
return tm.history[first-offset] | |||
} | |||
// Performs the update for our Faded Memories process, which allows the | |||
// trust metric tracking window to be large while maintaining a small | |||
// number of history data values | |||
func (tm *Metric) updateFadedMemory() { | |||
if tm.historySize < 2 { | |||
return | |||
} | |||
end := tm.historySize - 1 | |||
// Keep the most recent history element | |||
for count := 1; count < tm.historySize; count++ { | |||
i := end - count | |||
// The older the data is, the more we spread it out | |||
x := math.Pow(2, float64(count)) | |||
// Two history data values are merged into a single value | |||
tm.history[i] = ((tm.history[i] * (x - 1)) + tm.history[i+1]) / x | |||
} | |||
} | |||
// Map the interval value down to an offset from the beginning of history | |||
func intervalToHistoryOffset(interval int) int { | |||
// The system maintains 2^m interval values in the form of m history | |||
// data values. Therefore, we access the ith interval by obtaining | |||
// the history data index = the floor of log2(i) | |||
return int(math.Floor(math.Log2(float64(interval)))) | |||
} |
@ -1,128 +0,0 @@ | |||
package trust | |||
import ( | |||
"context" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
) | |||
func TestTrustMetricScores(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
tm := NewMetric() | |||
err := tm.Start(ctx) | |||
require.NoError(t, err) | |||
// Perfect score | |||
tm.GoodEvents(1) | |||
score := tm.TrustScore() | |||
assert.Equal(t, 100, score) | |||
// Less than perfect score | |||
tm.BadEvents(10) | |||
score = tm.TrustScore() | |||
assert.NotEqual(t, 100, score) | |||
err = tm.Stop() | |||
require.NoError(t, err) | |||
} | |||
func TestTrustMetricConfig(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
// 7 days | |||
window := time.Minute * 60 * 24 * 7 | |||
config := MetricConfig{ | |||
TrackingWindow: window, | |||
IntervalLength: 2 * time.Minute, | |||
} | |||
tm := NewMetricWithConfig(config) | |||
err := tm.Start(ctx) | |||
require.NoError(t, err) | |||
// The max time intervals should be the TrackingWindow / IntervalLen | |||
assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals) | |||
dc := DefaultConfig() | |||
// These weights should still be the default values | |||
assert.Equal(t, dc.ProportionalWeight, tm.proportionalWeight) | |||
assert.Equal(t, dc.IntegralWeight, tm.integralWeight) | |||
err = tm.Stop() | |||
require.NoError(t, err) | |||
tm.Wait() | |||
config.ProportionalWeight = 0.3 | |||
config.IntegralWeight = 0.7 | |||
tm = NewMetricWithConfig(config) | |||
err = tm.Start(ctx) | |||
require.NoError(t, err) | |||
// These weights should be equal to our custom values | |||
assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight) | |||
assert.Equal(t, config.IntegralWeight, tm.integralWeight) | |||
err = tm.Stop() | |||
require.NoError(t, err) | |||
tm.Wait() | |||
} | |||
func TestTrustMetricCopyNilPointer(t *testing.T) { | |||
var tm *Metric | |||
ctm := tm.Copy() | |||
assert.Nil(t, ctm) | |||
} | |||
// XXX: This test fails non-deterministically | |||
//nolint:unused,deadcode | |||
func _TestTrustMetricStopPause(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
// The TestTicker will provide manual control over | |||
// the passing of time within the metric | |||
tt := NewTestTicker() | |||
tm := NewMetric() | |||
tm.SetTicker(tt) | |||
err := tm.Start(ctx) | |||
require.NoError(t, err) | |||
// Allow some time intervals to pass and pause | |||
tt.NextTick() | |||
tt.NextTick() | |||
tm.Pause() | |||
// could be 1 or 2 because Pause and NextTick race | |||
first := tm.Copy().numIntervals | |||
// Allow more time to pass and check the intervals are unchanged | |||
tt.NextTick() | |||
tt.NextTick() | |||
assert.Equal(t, first, tm.Copy().numIntervals) | |||
// Get the trust metric activated again | |||
tm.GoodEvents(5) | |||
// Allow some time intervals to pass and stop | |||
tt.NextTick() | |||
tt.NextTick() | |||
err = tm.Stop() | |||
require.NoError(t, err) | |||
tm.Wait() | |||
second := tm.Copy().numIntervals | |||
// Allow more intervals to pass while the metric is stopped | |||
// and check that the number of intervals match | |||
tm.NextTimeInterval() | |||
tm.NextTimeInterval() | |||
// XXX: fails non-deterministically: | |||
// expected 5, got 6 | |||
assert.Equal(t, second+2, tm.Copy().numIntervals) | |||
if first > second { | |||
t.Fatalf("numIntervals should always increase or stay the same over time") | |||
} | |||
} |
@ -1,222 +0,0 @@ | |||
// Copyright 2017 Tendermint. All rights reserved. | |||
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. | |||
package trust | |||
import ( | |||
"context" | |||
"encoding/json" | |||
"fmt" | |||
"time" | |||
dbm "github.com/tendermint/tm-db" | |||
tmsync "github.com/tendermint/tendermint/internal/libs/sync" | |||
"github.com/tendermint/tendermint/libs/log" | |||
"github.com/tendermint/tendermint/libs/service" | |||
) | |||
const defaultStorePeriodicSaveInterval = 1 * time.Minute | |||
var trustMetricKey = []byte("trustMetricStore") | |||
// MetricStore - Manages all trust metrics for peers | |||
type MetricStore struct { | |||
service.BaseService | |||
// Maps a Peer.Key to that peer's TrustMetric | |||
peerMetrics map[string]*Metric | |||
// Mutex that protects the map and history data file | |||
mtx tmsync.Mutex | |||
// The db where peer trust metric history data will be stored | |||
db dbm.DB | |||
// This configuration will be used when creating new TrustMetrics | |||
config MetricConfig | |||
} | |||
// NewTrustMetricStore returns a store that saves data to the DB | |||
// and uses the config when creating new trust metrics. | |||
// Use Start to to initialize the trust metric store | |||
func NewTrustMetricStore(db dbm.DB, tmc MetricConfig, logger log.Logger) *MetricStore { | |||
tms := &MetricStore{ | |||
peerMetrics: make(map[string]*Metric), | |||
db: db, | |||
config: tmc, | |||
} | |||
tms.BaseService = *service.NewBaseService(logger, "MetricStore", tms) | |||
return tms | |||
} | |||
// OnStart implements Service | |||
func (tms *MetricStore) OnStart(ctx context.Context) error { | |||
if err := tms.BaseService.OnStart(ctx); err != nil { | |||
return err | |||
} | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
tms.loadFromDB(ctx) | |||
go tms.saveRoutine() | |||
return nil | |||
} | |||
// OnStop implements Service | |||
func (tms *MetricStore) OnStop() { | |||
tms.BaseService.OnStop() | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
// Stop all trust metric go-routines | |||
for _, tm := range tms.peerMetrics { | |||
if err := tm.Stop(); err != nil { | |||
tms.Logger.Error("unable to stop metric store", "error", err) | |||
} | |||
} | |||
// Make the final trust history data save | |||
tms.saveToDB() | |||
} | |||
// Size returns the number of entries in the trust metric store | |||
func (tms *MetricStore) Size() int { | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
return tms.size() | |||
} | |||
// AddPeerTrustMetric takes an existing trust metric and associates it with a peer key. | |||
// The caller is expected to call Start on the TrustMetric being added | |||
func (tms *MetricStore) AddPeerTrustMetric(key string, tm *Metric) { | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
if key == "" || tm == nil { | |||
return | |||
} | |||
tms.peerMetrics[key] = tm | |||
} | |||
// GetPeerTrustMetric returns a trust metric by peer key | |||
func (tms *MetricStore) GetPeerTrustMetric(ctx context.Context, key string) *Metric { | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
tm, ok := tms.peerMetrics[key] | |||
if !ok { | |||
// If the metric is not available, we will create it | |||
tm = NewMetricWithConfig(tms.config) | |||
if err := tm.Start(ctx); err != nil { | |||
tms.Logger.Error("unable to start metric store", "error", err) | |||
} | |||
// The metric needs to be in the map | |||
tms.peerMetrics[key] = tm | |||
} | |||
return tm | |||
} | |||
// PeerDisconnected pauses the trust metric associated with the peer identified by the key | |||
func (tms *MetricStore) PeerDisconnected(key string) { | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
// If the Peer that disconnected has a metric, pause it | |||
if tm, ok := tms.peerMetrics[key]; ok { | |||
tm.Pause() | |||
} | |||
} | |||
// Saves the history data for all peers to the store DB. | |||
// This public method acquires the trust metric store lock | |||
func (tms *MetricStore) SaveToDB() { | |||
tms.mtx.Lock() | |||
defer tms.mtx.Unlock() | |||
tms.saveToDB() | |||
} | |||
/* Private methods */ | |||
// size returns the number of entries in the store without acquiring the mutex | |||
func (tms *MetricStore) size() int { | |||
return len(tms.peerMetrics) | |||
} | |||
/* Loading & Saving */ | |||
/* Both loadFromDB and savetoDB assume the mutex has been acquired */ | |||
// Loads the history data for all peers from the store DB | |||
// cmn.Panics if file is corrupt | |||
func (tms *MetricStore) loadFromDB(ctx context.Context) bool { | |||
// Obtain the history data we have so far | |||
bytes, err := tms.db.Get(trustMetricKey) | |||
if err != nil { | |||
panic(err) | |||
} | |||
if bytes == nil { | |||
return false | |||
} | |||
peers := make(map[string]MetricHistoryJSON) | |||
err = json.Unmarshal(bytes, &peers) | |||
if err != nil { | |||
panic(fmt.Sprintf("Could not unmarshal Trust Metric Store DB data: %v", err)) | |||
} | |||
// If history data exists in the file, | |||
// load it into trust metric | |||
for key, p := range peers { | |||
tm := NewMetricWithConfig(tms.config) | |||
if err := tm.Start(ctx); err != nil { | |||
tms.Logger.Error("unable to start metric", "error", err) | |||
} | |||
tm.Init(p) | |||
// Load the peer trust metric into the store | |||
tms.peerMetrics[key] = tm | |||
} | |||
return true | |||
} | |||
// Saves the history data for all peers to the store DB | |||
func (tms *MetricStore) saveToDB() { | |||
tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) | |||
peers := make(map[string]MetricHistoryJSON) | |||
for key, tm := range tms.peerMetrics { | |||
// Add an entry for the peer identified by key | |||
peers[key] = tm.HistoryJSON() | |||
} | |||
// Write all the data back to the DB | |||
bytes, err := json.Marshal(peers) | |||
if err != nil { | |||
tms.Logger.Error("Failed to encode the TrustHistory", "err", err) | |||
return | |||
} | |||
if err := tms.db.SetSync(trustMetricKey, bytes); err != nil { | |||
tms.Logger.Error("failed to flush data to disk", "error", err) | |||
} | |||
} | |||
// Periodically saves the trust history data to the DB | |||
func (tms *MetricStore) saveRoutine() { | |||
t := time.NewTicker(defaultStorePeriodicSaveInterval) | |||
defer t.Stop() | |||
loop: | |||
for { | |||
select { | |||
case <-t.C: | |||
tms.SaveToDB() | |||
case <-tms.Quit(): | |||
break loop | |||
} | |||
} | |||
} |
@ -1,176 +0,0 @@ | |||
// Copyright 2017 Tendermint. All rights reserved. | |||
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. | |||
package trust | |||
import ( | |||
"context" | |||
"fmt" | |||
"testing" | |||
"github.com/stretchr/testify/assert" | |||
"github.com/stretchr/testify/require" | |||
dbm "github.com/tendermint/tm-db" | |||
"github.com/tendermint/tendermint/libs/log" | |||
) | |||
func TestTrustMetricStoreSaveLoad(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
dir := t.TempDir() | |||
logger := log.TestingLogger() | |||
historyDB, err := dbm.NewDB("trusthistory", "goleveldb", dir) | |||
require.NoError(t, err) | |||
// 0 peers saved | |||
store := NewTrustMetricStore(historyDB, DefaultConfig(), logger) | |||
store.saveToDB() | |||
// Load the data from the file | |||
store = NewTrustMetricStore(historyDB, DefaultConfig(), logger) | |||
err = store.Start(ctx) | |||
require.NoError(t, err) | |||
// Make sure we still have 0 entries | |||
assert.Zero(t, store.Size()) | |||
// 100 TestTickers | |||
var tt []*TestTicker | |||
for i := 0; i < 100; i++ { | |||
// The TestTicker will provide manual control over | |||
// the passing of time within the metric | |||
tt = append(tt, NewTestTicker()) | |||
} | |||
// 100 peers | |||
for i := 0; i < 100; i++ { | |||
key := fmt.Sprintf("peer_%d", i) | |||
tm := NewMetric() | |||
tm.SetTicker(tt[i]) | |||
err = tm.Start(ctx) | |||
require.NoError(t, err) | |||
store.AddPeerTrustMetric(key, tm) | |||
tm.BadEvents(10) | |||
tm.GoodEvents(1) | |||
} | |||
// Check that we have 100 entries and save | |||
assert.Equal(t, 100, store.Size()) | |||
// Give the 100 metrics time to process the history data | |||
for i := 0; i < 100; i++ { | |||
tt[i].NextTick() | |||
tt[i].NextTick() | |||
} | |||
// Stop all the trust metrics and save | |||
err = store.Stop() | |||
require.NoError(t, err) | |||
// Load the data from the DB | |||
store = NewTrustMetricStore(historyDB, DefaultConfig(), logger) | |||
err = store.Start(ctx) | |||
require.NoError(t, err) | |||
// Check that we still have 100 peers with imperfect trust values | |||
assert.Equal(t, 100, store.Size()) | |||
for _, tm := range store.peerMetrics { | |||
assert.NotEqual(t, 1.0, tm.TrustValue()) | |||
} | |||
err = store.Stop() | |||
require.NoError(t, err) | |||
} | |||
func TestTrustMetricStoreConfig(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
historyDB, err := dbm.NewDB("", "memdb", "") | |||
require.NoError(t, err) | |||
config := MetricConfig{ | |||
ProportionalWeight: 0.5, | |||
IntegralWeight: 0.5, | |||
} | |||
logger := log.TestingLogger() | |||
// Create a store with custom config | |||
store := NewTrustMetricStore(historyDB, config, logger) | |||
err = store.Start(ctx) | |||
require.NoError(t, err) | |||
// Have the store make us a metric with the config | |||
tm := store.GetPeerTrustMetric(ctx, "TestKey") | |||
// Check that the options made it to the metric | |||
assert.Equal(t, 0.5, tm.proportionalWeight) | |||
assert.Equal(t, 0.5, tm.integralWeight) | |||
err = store.Stop() | |||
require.NoError(t, err) | |||
} | |||
func TestTrustMetricStoreLookup(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
historyDB, err := dbm.NewDB("", "memdb", "") | |||
require.NoError(t, err) | |||
store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger()) | |||
err = store.Start(ctx) | |||
require.NoError(t, err) | |||
// Create 100 peers in the trust metric store | |||
for i := 0; i < 100; i++ { | |||
key := fmt.Sprintf("peer_%d", i) | |||
store.GetPeerTrustMetric(ctx, key) | |||
// Check that the trust metric was successfully entered | |||
ktm := store.peerMetrics[key] | |||
assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key) | |||
} | |||
err = store.Stop() | |||
require.NoError(t, err) | |||
} | |||
func TestTrustMetricStorePeerScore(t *testing.T) { | |||
ctx, cancel := context.WithCancel(context.Background()) | |||
defer cancel() | |||
historyDB, err := dbm.NewDB("", "memdb", "") | |||
require.NoError(t, err) | |||
store := NewTrustMetricStore(historyDB, DefaultConfig(), log.TestingLogger()) | |||
err = store.Start(ctx) | |||
require.NoError(t, err) | |||
key := "TestKey" | |||
tm := store.GetPeerTrustMetric(ctx, key) | |||
// This peer is innocent so far | |||
first := tm.TrustScore() | |||
assert.Equal(t, 100, first) | |||
// Add some undesirable events and disconnect | |||
tm.BadEvents(1) | |||
first = tm.TrustScore() | |||
assert.NotEqual(t, 100, first) | |||
tm.BadEvents(10) | |||
second := tm.TrustScore() | |||
if second > first { | |||
t.Errorf("a greater number of bad events should lower the trust score") | |||
} | |||
store.PeerDisconnected(key) | |||
// We will remember our experiences with this peer | |||
tm = store.GetPeerTrustMetric(ctx, key) | |||
assert.NotEqual(t, 100, tm.TrustScore()) | |||
err = store.Stop() | |||
require.NoError(t, err) | |||
} |
@ -1,62 +0,0 @@ | |||
// Copyright 2017 Tendermint. All rights reserved. | |||
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. | |||
package trust | |||
import ( | |||
"time" | |||
) | |||
// MetricTicker provides a single ticker interface for the trust metric | |||
type MetricTicker interface { | |||
// GetChannel returns the receive only channel that fires at each time interval | |||
GetChannel() <-chan time.Time | |||
// Stop will halt further activity on the ticker channel | |||
Stop() | |||
} | |||
// The ticker used during testing that provides manual control over time intervals | |||
type TestTicker struct { | |||
C chan time.Time | |||
stopped bool | |||
} | |||
// NewTestTicker returns our ticker used within test routines | |||
func NewTestTicker() *TestTicker { | |||
c := make(chan time.Time) | |||
return &TestTicker{ | |||
C: c, | |||
} | |||
} | |||
func (t *TestTicker) GetChannel() <-chan time.Time { | |||
return t.C | |||
} | |||
func (t *TestTicker) Stop() { | |||
t.stopped = true | |||
} | |||
// NextInterval manually sends Time on the ticker channel | |||
func (t *TestTicker) NextTick() { | |||
if t.stopped { | |||
return | |||
} | |||
t.C <- time.Now() | |||
} | |||
// Ticker is just a wrap around time.Ticker that allows it | |||
// to meet the requirements of our interface | |||
type Ticker struct { | |||
*time.Ticker | |||
} | |||
// NewTicker returns a normal time.Ticker wrapped to meet our interface | |||
func NewTicker(d time.Duration) *Ticker { | |||
return &Ticker{time.NewTicker(d)} | |||
} | |||
func (t *Ticker) GetChannel() <-chan time.Time { | |||
return t.C | |||
} |