diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index e68903ff5..cbc2db7d5 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -107,6 +107,15 @@ func (tms *TrustMetricStore) PeerDisconnected(key string) { } } +// Saves the history data for all peers to the store DB. +// This public method acquires the trust metric store lock +func (tms *TrustMetricStore) SaveToDB() { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + tms.saveToDB() +} + /* Private methods */ // size returns the number of entries in the store without acquiring the mutex @@ -115,46 +124,10 @@ func (tms *TrustMetricStore) size() int { } /* Loading & Saving */ -/* Both of these methods assume the mutex has been acquired, since they write to the map */ +/* Both loadFromDB and savetoDB assume the mutex has been acquired */ var trustMetricKey = []byte("trustMetricStore") -type peerHistoryJSON struct { - NumIntervals int `json:"intervals"` - History []float64 `json:"history"` -} - -// Loads the history data for a single peer and takes care of trust metric locking -func reinstantiateMetric(tm *TrustMetric, ph peerHistoryJSON) { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - // Restore the number of time intervals we have previously tracked - if ph.NumIntervals > tm.maxIntervals { - ph.NumIntervals = tm.maxIntervals - } - tm.numIntervals = ph.NumIntervals - // Restore the history and its current size - if len(ph.History) > tm.historyMaxSize { - // Keep the history no larger than historyMaxSize - last := len(ph.History) - tm.historyMaxSize - ph.History = ph.History[last:] - } - tm.history = ph.History - tm.historySize = len(tm.history) - // Create the history weight values and weight sum - for i := 1; i <= tm.numIntervals; i++ { - x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight - tm.historyWeights = append(tm.historyWeights, x) - } - - for _, v := range tm.historyWeights { - tm.historyWeightSum += v - } - // Calculate the history value based on the loaded history data - tm.historyValue = tm.calcHistoryValue() -} - // Loads the history data for all peers from the store DB // cmn.Panics if file is corrupt func (tms *TrustMetricStore) loadFromDB() bool { @@ -164,18 +137,18 @@ func (tms *TrustMetricStore) loadFromDB() bool { return false } - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) err := json.Unmarshal(bytes, &peers) if err != nil { cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) } // If history data exists in the file, - // load it into trust metrics and recalc + // load it into trust metric for key, p := range peers { tm := NewMetricWithConfig(tms.config) - reinstantiateMetric(tm, p) + tm.Init(p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -186,16 +159,11 @@ func (tms *TrustMetricStore) loadFromDB() bool { func (tms *TrustMetricStore) saveToDB() { tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) for key, tm := range tms.peerMetrics { - tm.mtx.Lock() // Add an entry for the peer identified by key - peers[key] = peerHistoryJSON{ - NumIntervals: tm.numIntervals, - History: tm.history, - } - tm.mtx.Unlock() + peers[key] = tm.HistoryJSON() } // Write all the data back to the DB @@ -215,9 +183,7 @@ loop: for { select { case <-t.C: - tms.mtx.Lock() - tms.saveToDB() - tms.mtx.Unlock() + tms.SaveToDB() case <-tms.Quit: break loop } @@ -227,12 +193,6 @@ loop: //--------------------------------------------------------------------------------------- const ( - // The number of event updates that can be sent on a single metric before blocking - defaultUpdateChanCapacity = 10 - - // The number of trust value requests that can be made simultaneously before blocking - defaultRequestChanCapacity = 10 - // The weight applied to the derivative when current behavior is >= previous behavior defaultDerivativeGamma1 = 0 @@ -288,19 +248,70 @@ type TrustMetric struct { // While true, history data is not modified paused bool - // Sending true on this channel stops tracking, while false pauses tracking - stop chan bool + // Signal channel for stopping the trust metric go-routine + stop chan struct{} +} + +// MetricHistoryJSON - history data necessary to save the trust metric +type MetricHistoryJSON struct { + NumIntervals int `json:"intervals"` + History []float64 `json:"history"` +} + +// Returns a snapshot of the trust metric history data +func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + return MetricHistoryJSON{ + NumIntervals: tm.numIntervals, + History: tm.history, + } +} + +// Instantiates a trust metric by loading the history data for a single peer. +// This is called only once and only right after creation, which is why the +// lock is not held while accessing the trust metric struct members +func (tm *TrustMetric) Init(hist MetricHistoryJSON) { + // Restore the number of time intervals we have previously tracked + if hist.NumIntervals > tm.maxIntervals { + hist.NumIntervals = tm.maxIntervals + } + tm.numIntervals = hist.NumIntervals + // Restore the history and its current size + if len(hist.History) > tm.historyMaxSize { + // Keep the history no larger than historyMaxSize + last := len(hist.History) - tm.historyMaxSize + hist.History = hist.History[last:] + } + tm.history = hist.History + tm.historySize = len(tm.history) + // Create the history weight values and weight sum + for i := 1; i <= tm.numIntervals; i++ { + x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight + tm.historyWeights = append(tm.historyWeights, x) + } + + for _, v := range tm.historyWeights { + tm.historyWeightSum += v + } + // Calculate the history value based on the loaded history data + tm.historyValue = tm.calcHistoryValue() } // Pause tells the metric to pause recording data over time intervals. // All method calls that indicate events will unpause the metric func (tm *TrustMetric) Pause() { - tm.stop <- false + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Pause the metric for now + tm.paused = true } // Stop tells the metric to stop recording data over time intervals func (tm *TrustMetric) Stop() { - tm.stop <- true + tm.stop <- struct{}{} } // BadEvents indicates that an undesirable event(s) took place @@ -336,6 +347,70 @@ func (tm *TrustMetric) TrustScore() int { return int(math.Floor(score)) } +// NextTimeInterval saves current time interval data and prepares for the following interval +func (tm *TrustMetric) NextTimeInterval() { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + if tm.paused { + // Do not prepare for the next time interval while paused + return + } + + // Add the current trust value to the history data + newHist := tm.calcTrustValue() + tm.history = append(tm.history, newHist) + + // Update history and interval counters + if tm.historySize < tm.historyMaxSize { + tm.historySize++ + } else { + // Keep the history no larger than historyMaxSize + last := len(tm.history) - tm.historyMaxSize + tm.history = tm.history[last:] + } + + if tm.numIntervals < tm.maxIntervals { + tm.numIntervals++ + // Add the optimistic weight for the new time interval + wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) + tm.historyWeights = append(tm.historyWeights, wk) + tm.historyWeightSum += wk + } + + // Update the history data using Faded Memories + tm.updateFadedMemory() + // Calculate the history value for the upcoming time interval + tm.historyValue = tm.calcHistoryValue() + tm.good = 0 + tm.bad = 0 +} + +// Copy returns a new trust metric with members containing the same values +func (tm *TrustMetric) Copy() *TrustMetric { + if tm == nil { + return nil + } + + return &TrustMetric{ + proportionalWeight: tm.proportionalWeight, + integralWeight: tm.integralWeight, + numIntervals: tm.numIntervals, + maxIntervals: tm.maxIntervals, + intervalLen: tm.intervalLen, + history: tm.history, + historyWeights: tm.historyWeights, + historyWeightSum: tm.historyWeightSum, + historySize: tm.historySize, + historyMaxSize: tm.historyMaxSize, + historyValue: tm.historyValue, + good: tm.good, + bad: tm.bad, + paused: tm.paused, + stop: make(chan struct{}), + } +} + // TrustMetricConfig - Configures the weight functions and time intervals for the metric type TrustMetricConfig struct { // Determines the percentage given to current behavior @@ -385,7 +460,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { // This metric has a perfect history so far tm.historyValue = 1.0 // Setup the stop channel - tm.stop = make(chan bool, 1) + tm.stop = make(chan struct{}) go tm.processRequests() return tm @@ -531,47 +606,10 @@ loop: for { select { case <-t.C: - tm.mtx.Lock() - if !tm.paused { - // Add the current trust value to the history data - newHist := tm.calcTrustValue() - tm.history = append(tm.history, newHist) - - // Update history and interval counters - if tm.historySize < tm.historyMaxSize { - tm.historySize++ - } else { - // Keep the history no larger than historyMaxSize - last := len(tm.history) - tm.historyMaxSize - tm.history = tm.history[last:] - } - - if tm.numIntervals < tm.maxIntervals { - tm.numIntervals++ - // Add the optimistic weight for the new time interval - wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) - tm.historyWeights = append(tm.historyWeights, wk) - tm.historyWeightSum += wk - } - - // Update the history data using Faded Memories - tm.updateFadedMemory() - // Calculate the history value for the upcoming time interval - tm.historyValue = tm.calcHistoryValue() - tm.good = 0 - tm.bad = 0 - } - tm.mtx.Unlock() - case stop := <-tm.stop: - tm.mtx.Lock() - if stop { - // Stop all further tracking for this metric - tm.mtx.Unlock() - break loop - } - // Pause the metric for now - tm.paused = true - tm.mtx.Unlock() + tm.NextTimeInterval() + case <-tm.stop: + // Stop all further tracking for this metric + break loop } } } diff --git a/p2p/trust/trustmetric_test.go_ b/p2p/trust/trustmetric_test.go similarity index 98% rename from p2p/trust/trustmetric_test.go_ rename to p2p/trust/trustmetric_test.go index af4a945db..6c6137538 100644 --- a/p2p/trust/trustmetric_test.go_ +++ b/p2p/trust/trustmetric_test.go @@ -210,10 +210,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the pause some time to take place time.Sleep(10 * time.Millisecond) - tm.mtx.Lock() - first := tm.numIntervals - tm.mtx.Unlock() - + first := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, first, tm.numIntervals) @@ -226,10 +223,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the stop some time to take place time.Sleep(10 * time.Millisecond) - tm.mtx.Lock() - second := tm.numIntervals - tm.mtx.Unlock() - + second := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, second, tm.numIntervals)