|
|
@ -107,6 +107,15 @@ func (tms *TrustMetricStore) PeerDisconnected(key string) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Saves the history data for all peers to the store DB.
|
|
|
|
// This public method acquires the trust metric store lock
|
|
|
|
func (tms *TrustMetricStore) SaveToDB() { |
|
|
|
tms.mtx.Lock() |
|
|
|
defer tms.mtx.Unlock() |
|
|
|
|
|
|
|
tms.saveToDB() |
|
|
|
} |
|
|
|
|
|
|
|
/* Private methods */ |
|
|
|
|
|
|
|
// size returns the number of entries in the store without acquiring the mutex
|
|
|
@ -115,46 +124,10 @@ func (tms *TrustMetricStore) size() int { |
|
|
|
} |
|
|
|
|
|
|
|
/* Loading & Saving */ |
|
|
|
/* Both of these methods assume the mutex has been acquired, since they write to the map */ |
|
|
|
/* Both loadFromDB and savetoDB assume the mutex has been acquired */ |
|
|
|
|
|
|
|
var trustMetricKey = []byte("trustMetricStore") |
|
|
|
|
|
|
|
type peerHistoryJSON struct { |
|
|
|
NumIntervals int `json:"intervals"` |
|
|
|
History []float64 `json:"history"` |
|
|
|
} |
|
|
|
|
|
|
|
// Loads the history data for a single peer and takes care of trust metric locking
|
|
|
|
func reinstantiateMetric(tm *TrustMetric, ph peerHistoryJSON) { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
// Restore the number of time intervals we have previously tracked
|
|
|
|
if ph.NumIntervals > tm.maxIntervals { |
|
|
|
ph.NumIntervals = tm.maxIntervals |
|
|
|
} |
|
|
|
tm.numIntervals = ph.NumIntervals |
|
|
|
// Restore the history and its current size
|
|
|
|
if len(ph.History) > tm.historyMaxSize { |
|
|
|
// Keep the history no larger than historyMaxSize
|
|
|
|
last := len(ph.History) - tm.historyMaxSize |
|
|
|
ph.History = ph.History[last:] |
|
|
|
} |
|
|
|
tm.history = ph.History |
|
|
|
tm.historySize = len(tm.history) |
|
|
|
// Create the history weight values and weight sum
|
|
|
|
for i := 1; i <= tm.numIntervals; i++ { |
|
|
|
x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight
|
|
|
|
tm.historyWeights = append(tm.historyWeights, x) |
|
|
|
} |
|
|
|
|
|
|
|
for _, v := range tm.historyWeights { |
|
|
|
tm.historyWeightSum += v |
|
|
|
} |
|
|
|
// Calculate the history value based on the loaded history data
|
|
|
|
tm.historyValue = tm.calcHistoryValue() |
|
|
|
} |
|
|
|
|
|
|
|
// Loads the history data for all peers from the store DB
|
|
|
|
// cmn.Panics if file is corrupt
|
|
|
|
func (tms *TrustMetricStore) loadFromDB() bool { |
|
|
@ -164,18 +137,18 @@ func (tms *TrustMetricStore) loadFromDB() bool { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
peers := make(map[string]peerHistoryJSON, 0) |
|
|
|
peers := make(map[string]MetricHistoryJSON, 0) |
|
|
|
err := json.Unmarshal(bytes, &peers) |
|
|
|
if err != nil { |
|
|
|
cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) |
|
|
|
} |
|
|
|
|
|
|
|
// If history data exists in the file,
|
|
|
|
// load it into trust metrics and recalc
|
|
|
|
// load it into trust metric
|
|
|
|
for key, p := range peers { |
|
|
|
tm := NewMetricWithConfig(tms.config) |
|
|
|
|
|
|
|
reinstantiateMetric(tm, p) |
|
|
|
tm.Init(p) |
|
|
|
// Load the peer trust metric into the store
|
|
|
|
tms.peerMetrics[key] = tm |
|
|
|
} |
|
|
@ -186,16 +159,11 @@ func (tms *TrustMetricStore) loadFromDB() bool { |
|
|
|
func (tms *TrustMetricStore) saveToDB() { |
|
|
|
tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) |
|
|
|
|
|
|
|
peers := make(map[string]peerHistoryJSON, 0) |
|
|
|
peers := make(map[string]MetricHistoryJSON, 0) |
|
|
|
|
|
|
|
for key, tm := range tms.peerMetrics { |
|
|
|
tm.mtx.Lock() |
|
|
|
// Add an entry for the peer identified by key
|
|
|
|
peers[key] = peerHistoryJSON{ |
|
|
|
NumIntervals: tm.numIntervals, |
|
|
|
History: tm.history, |
|
|
|
} |
|
|
|
tm.mtx.Unlock() |
|
|
|
peers[key] = tm.HistoryJSON() |
|
|
|
} |
|
|
|
|
|
|
|
// Write all the data back to the DB
|
|
|
@ -215,9 +183,7 @@ loop: |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-t.C: |
|
|
|
tms.mtx.Lock() |
|
|
|
tms.saveToDB() |
|
|
|
tms.mtx.Unlock() |
|
|
|
tms.SaveToDB() |
|
|
|
case <-tms.Quit: |
|
|
|
break loop |
|
|
|
} |
|
|
@ -227,12 +193,6 @@ loop: |
|
|
|
//---------------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
const ( |
|
|
|
// The number of event updates that can be sent on a single metric before blocking
|
|
|
|
defaultUpdateChanCapacity = 10 |
|
|
|
|
|
|
|
// The number of trust value requests that can be made simultaneously before blocking
|
|
|
|
defaultRequestChanCapacity = 10 |
|
|
|
|
|
|
|
// The weight applied to the derivative when current behavior is >= previous behavior
|
|
|
|
defaultDerivativeGamma1 = 0 |
|
|
|
|
|
|
@ -288,19 +248,70 @@ type TrustMetric struct { |
|
|
|
// While true, history data is not modified
|
|
|
|
paused bool |
|
|
|
|
|
|
|
// Sending true on this channel stops tracking, while false pauses tracking
|
|
|
|
stop chan bool |
|
|
|
// Signal channel for stopping the trust metric go-routine
|
|
|
|
stop chan struct{} |
|
|
|
} |
|
|
|
|
|
|
|
// MetricHistoryJSON - history data necessary to save the trust metric
|
|
|
|
type MetricHistoryJSON struct { |
|
|
|
NumIntervals int `json:"intervals"` |
|
|
|
History []float64 `json:"history"` |
|
|
|
} |
|
|
|
|
|
|
|
// Returns a snapshot of the trust metric history data
|
|
|
|
func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
return MetricHistoryJSON{ |
|
|
|
NumIntervals: tm.numIntervals, |
|
|
|
History: tm.history, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Instantiates a trust metric by loading the history data for a single peer.
|
|
|
|
// This is called only once and only right after creation, which is why the
|
|
|
|
// lock is not held while accessing the trust metric struct members
|
|
|
|
func (tm *TrustMetric) Init(hist MetricHistoryJSON) { |
|
|
|
// Restore the number of time intervals we have previously tracked
|
|
|
|
if hist.NumIntervals > tm.maxIntervals { |
|
|
|
hist.NumIntervals = tm.maxIntervals |
|
|
|
} |
|
|
|
tm.numIntervals = hist.NumIntervals |
|
|
|
// Restore the history and its current size
|
|
|
|
if len(hist.History) > tm.historyMaxSize { |
|
|
|
// Keep the history no larger than historyMaxSize
|
|
|
|
last := len(hist.History) - tm.historyMaxSize |
|
|
|
hist.History = hist.History[last:] |
|
|
|
} |
|
|
|
tm.history = hist.History |
|
|
|
tm.historySize = len(tm.history) |
|
|
|
// Create the history weight values and weight sum
|
|
|
|
for i := 1; i <= tm.numIntervals; i++ { |
|
|
|
x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight
|
|
|
|
tm.historyWeights = append(tm.historyWeights, x) |
|
|
|
} |
|
|
|
|
|
|
|
for _, v := range tm.historyWeights { |
|
|
|
tm.historyWeightSum += v |
|
|
|
} |
|
|
|
// Calculate the history value based on the loaded history data
|
|
|
|
tm.historyValue = tm.calcHistoryValue() |
|
|
|
} |
|
|
|
|
|
|
|
// Pause tells the metric to pause recording data over time intervals.
|
|
|
|
// All method calls that indicate events will unpause the metric
|
|
|
|
func (tm *TrustMetric) Pause() { |
|
|
|
tm.stop <- false |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
// Pause the metric for now
|
|
|
|
tm.paused = true |
|
|
|
} |
|
|
|
|
|
|
|
// Stop tells the metric to stop recording data over time intervals
|
|
|
|
func (tm *TrustMetric) Stop() { |
|
|
|
tm.stop <- true |
|
|
|
tm.stop <- struct{}{} |
|
|
|
} |
|
|
|
|
|
|
|
// BadEvents indicates that an undesirable event(s) took place
|
|
|
@ -336,6 +347,70 @@ func (tm *TrustMetric) TrustScore() int { |
|
|
|
return int(math.Floor(score)) |
|
|
|
} |
|
|
|
|
|
|
|
// NextTimeInterval saves current time interval data and prepares for the following interval
|
|
|
|
func (tm *TrustMetric) NextTimeInterval() { |
|
|
|
tm.mtx.Lock() |
|
|
|
defer tm.mtx.Unlock() |
|
|
|
|
|
|
|
if tm.paused { |
|
|
|
// Do not prepare for the next time interval while paused
|
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// Add the current trust value to the history data
|
|
|
|
newHist := tm.calcTrustValue() |
|
|
|
tm.history = append(tm.history, newHist) |
|
|
|
|
|
|
|
// Update history and interval counters
|
|
|
|
if tm.historySize < tm.historyMaxSize { |
|
|
|
tm.historySize++ |
|
|
|
} else { |
|
|
|
// Keep the history no larger than historyMaxSize
|
|
|
|
last := len(tm.history) - tm.historyMaxSize |
|
|
|
tm.history = tm.history[last:] |
|
|
|
} |
|
|
|
|
|
|
|
if tm.numIntervals < tm.maxIntervals { |
|
|
|
tm.numIntervals++ |
|
|
|
// Add the optimistic weight for the new time interval
|
|
|
|
wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) |
|
|
|
tm.historyWeights = append(tm.historyWeights, wk) |
|
|
|
tm.historyWeightSum += wk |
|
|
|
} |
|
|
|
|
|
|
|
// Update the history data using Faded Memories
|
|
|
|
tm.updateFadedMemory() |
|
|
|
// Calculate the history value for the upcoming time interval
|
|
|
|
tm.historyValue = tm.calcHistoryValue() |
|
|
|
tm.good = 0 |
|
|
|
tm.bad = 0 |
|
|
|
} |
|
|
|
|
|
|
|
// Copy returns a new trust metric with members containing the same values
|
|
|
|
func (tm *TrustMetric) Copy() *TrustMetric { |
|
|
|
if tm == nil { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
return &TrustMetric{ |
|
|
|
proportionalWeight: tm.proportionalWeight, |
|
|
|
integralWeight: tm.integralWeight, |
|
|
|
numIntervals: tm.numIntervals, |
|
|
|
maxIntervals: tm.maxIntervals, |
|
|
|
intervalLen: tm.intervalLen, |
|
|
|
history: tm.history, |
|
|
|
historyWeights: tm.historyWeights, |
|
|
|
historyWeightSum: tm.historyWeightSum, |
|
|
|
historySize: tm.historySize, |
|
|
|
historyMaxSize: tm.historyMaxSize, |
|
|
|
historyValue: tm.historyValue, |
|
|
|
good: tm.good, |
|
|
|
bad: tm.bad, |
|
|
|
paused: tm.paused, |
|
|
|
stop: make(chan struct{}), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// TrustMetricConfig - Configures the weight functions and time intervals for the metric
|
|
|
|
type TrustMetricConfig struct { |
|
|
|
// Determines the percentage given to current behavior
|
|
|
@ -385,7 +460,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { |
|
|
|
// This metric has a perfect history so far
|
|
|
|
tm.historyValue = 1.0 |
|
|
|
// Setup the stop channel
|
|
|
|
tm.stop = make(chan bool, 1) |
|
|
|
tm.stop = make(chan struct{}) |
|
|
|
|
|
|
|
go tm.processRequests() |
|
|
|
return tm |
|
|
@ -531,47 +606,10 @@ loop: |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-t.C: |
|
|
|
tm.mtx.Lock() |
|
|
|
if !tm.paused { |
|
|
|
// Add the current trust value to the history data
|
|
|
|
newHist := tm.calcTrustValue() |
|
|
|
tm.history = append(tm.history, newHist) |
|
|
|
|
|
|
|
// Update history and interval counters
|
|
|
|
if tm.historySize < tm.historyMaxSize { |
|
|
|
tm.historySize++ |
|
|
|
} else { |
|
|
|
// Keep the history no larger than historyMaxSize
|
|
|
|
last := len(tm.history) - tm.historyMaxSize |
|
|
|
tm.history = tm.history[last:] |
|
|
|
} |
|
|
|
|
|
|
|
if tm.numIntervals < tm.maxIntervals { |
|
|
|
tm.numIntervals++ |
|
|
|
// Add the optimistic weight for the new time interval
|
|
|
|
wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) |
|
|
|
tm.historyWeights = append(tm.historyWeights, wk) |
|
|
|
tm.historyWeightSum += wk |
|
|
|
} |
|
|
|
|
|
|
|
// Update the history data using Faded Memories
|
|
|
|
tm.updateFadedMemory() |
|
|
|
// Calculate the history value for the upcoming time interval
|
|
|
|
tm.historyValue = tm.calcHistoryValue() |
|
|
|
tm.good = 0 |
|
|
|
tm.bad = 0 |
|
|
|
} |
|
|
|
tm.mtx.Unlock() |
|
|
|
case stop := <-tm.stop: |
|
|
|
tm.mtx.Lock() |
|
|
|
if stop { |
|
|
|
// Stop all further tracking for this metric
|
|
|
|
tm.mtx.Unlock() |
|
|
|
break loop |
|
|
|
} |
|
|
|
// Pause the metric for now
|
|
|
|
tm.paused = true |
|
|
|
tm.mtx.Unlock() |
|
|
|
tm.NextTimeInterval() |
|
|
|
case <-tm.stop: |
|
|
|
// Stop all further tracking for this metric
|
|
|
|
break loop |
|
|
|
} |
|
|
|
} |
|
|
|
} |