diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index eaed78e3a..cbc2db7d5 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -107,6 +107,15 @@ func (tms *TrustMetricStore) PeerDisconnected(key string) { } } +// Saves the history data for all peers to the store DB. +// This public method acquires the trust metric store lock +func (tms *TrustMetricStore) SaveToDB() { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + tms.saveToDB() +} + /* Private methods */ // size returns the number of entries in the store without acquiring the mutex @@ -115,15 +124,10 @@ func (tms *TrustMetricStore) size() int { } /* Loading & Saving */ -/* Both of these methods assume the mutex has been acquired, since they write to the map */ +/* Both loadFromDB and savetoDB assume the mutex has been acquired */ var trustMetricKey = []byte("trustMetricStore") -type peerHistoryJSON struct { - NumIntervals int `json:"intervals"` - History []float64 `json:"history"` -} - // Loads the history data for all peers from the store DB // cmn.Panics if file is corrupt func (tms *TrustMetricStore) loadFromDB() bool { @@ -133,41 +137,18 @@ func (tms *TrustMetricStore) loadFromDB() bool { return false } - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) err := json.Unmarshal(bytes, &peers) if err != nil { cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) } // If history data exists in the file, - // load it into trust metrics and recalc + // load it into trust metric for key, p := range peers { tm := NewMetricWithConfig(tms.config) - // Restore the number of time intervals we have previously tracked - if p.NumIntervals > tm.maxIntervals { - p.NumIntervals = tm.maxIntervals - } - tm.numIntervals = p.NumIntervals - // Restore the history and its current size - if len(p.History) > tm.historyMaxSize { - // Keep the history no larger than historyMaxSize - last := len(p.History) - tm.historyMaxSize - p.History = p.History[last:] - } - tm.history = p.History - tm.historySize = len(tm.history) - // Create the history weight values and weight sum - for i := 1; i <= tm.numIntervals; i++ { - x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight - tm.historyWeights = append(tm.historyWeights, x) - } - - for _, v := range tm.historyWeights { - tm.historyWeightSum += v - } - // Calculate the history value based on the loaded history data - tm.historyValue = tm.calcHistoryValue() + tm.Init(p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -178,14 +159,11 @@ func (tms *TrustMetricStore) loadFromDB() bool { func (tms *TrustMetricStore) saveToDB() { tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) for key, tm := range tms.peerMetrics { // Add an entry for the peer identified by key - peers[key] = peerHistoryJSON{ - NumIntervals: tm.numIntervals, - History: tm.history, - } + peers[key] = tm.HistoryJSON() } // Write all the data back to the DB @@ -205,9 +183,7 @@ loop: for { select { case <-t.C: - tms.mtx.Lock() - tms.saveToDB() - tms.mtx.Unlock() + tms.SaveToDB() case <-tms.Quit: break loop } @@ -217,12 +193,6 @@ loop: //--------------------------------------------------------------------------------------- const ( - // The number of event updates that can be sent on a single metric before blocking - defaultUpdateChanCapacity = 10 - - // The number of trust value requests that can be made simultaneously before blocking - defaultRequestChanCapacity = 10 - // The weight applied to the derivative when current behavior is >= previous behavior defaultDerivativeGamma1 = 0 @@ -236,6 +206,9 @@ const ( // TrustMetric - keeps track of peer reliability // See tendermint/docs/architecture/adr-006-trust-metric.md for details type TrustMetric struct { + // Mutex that protects the metric from concurrent access + mtx sync.Mutex + // Determines the percentage given to current behavior proportionalWeight float64 @@ -275,55 +248,96 @@ type TrustMetric struct { // While true, history data is not modified paused bool - // Sending true on this channel stops tracking, while false pauses tracking - stop chan bool - - // For sending information about new good/bad events to be recorded - update chan *updateBadGood + // Signal channel for stopping the trust metric go-routine + stop chan struct{} +} - // The channel to request a newly calculated trust value - trustValue chan *reqTrustValue +// MetricHistoryJSON - history data necessary to save the trust metric +type MetricHistoryJSON struct { + NumIntervals int `json:"intervals"` + History []float64 `json:"history"` } -// For the TrustMetric update channel -type updateBadGood struct { - IsBad bool - Add int +// Returns a snapshot of the trust metric history data +func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + return MetricHistoryJSON{ + NumIntervals: tm.numIntervals, + History: tm.history, + } } -// For the TrustMetric trustValue channel -type reqTrustValue struct { - // The requested trust value is sent back on this channel - Resp chan float64 +// Instantiates a trust metric by loading the history data for a single peer. +// This is called only once and only right after creation, which is why the +// lock is not held while accessing the trust metric struct members +func (tm *TrustMetric) Init(hist MetricHistoryJSON) { + // Restore the number of time intervals we have previously tracked + if hist.NumIntervals > tm.maxIntervals { + hist.NumIntervals = tm.maxIntervals + } + tm.numIntervals = hist.NumIntervals + // Restore the history and its current size + if len(hist.History) > tm.historyMaxSize { + // Keep the history no larger than historyMaxSize + last := len(hist.History) - tm.historyMaxSize + hist.History = hist.History[last:] + } + tm.history = hist.History + tm.historySize = len(tm.history) + // Create the history weight values and weight sum + for i := 1; i <= tm.numIntervals; i++ { + x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight + tm.historyWeights = append(tm.historyWeights, x) + } + + for _, v := range tm.historyWeights { + tm.historyWeightSum += v + } + // Calculate the history value based on the loaded history data + tm.historyValue = tm.calcHistoryValue() } // Pause tells the metric to pause recording data over time intervals. // All method calls that indicate events will unpause the metric func (tm *TrustMetric) Pause() { - tm.stop <- false + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Pause the metric for now + tm.paused = true } // Stop tells the metric to stop recording data over time intervals func (tm *TrustMetric) Stop() { - tm.stop <- true + tm.stop <- struct{}{} } // BadEvents indicates that an undesirable event(s) took place func (tm *TrustMetric) BadEvents(num int) { - tm.update <- &updateBadGood{IsBad: true, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + tm.unpause() + tm.bad += float64(num) } // GoodEvents indicates that a desirable event(s) took place func (tm *TrustMetric) GoodEvents(num int) { - tm.update <- &updateBadGood{IsBad: false, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + tm.unpause() + tm.good += float64(num) } // TrustValue gets the dependable trust value; always between 0 and 1 func (tm *TrustMetric) TrustValue() float64 { - resp := make(chan float64, 1) + tm.mtx.Lock() + defer tm.mtx.Unlock() - tm.trustValue <- &reqTrustValue{Resp: resp} - return <-resp + return tm.calcTrustValue() } // TrustScore gets a score based on the trust value always between 0 and 100 @@ -333,6 +347,70 @@ func (tm *TrustMetric) TrustScore() int { return int(math.Floor(score)) } +// NextTimeInterval saves current time interval data and prepares for the following interval +func (tm *TrustMetric) NextTimeInterval() { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + if tm.paused { + // Do not prepare for the next time interval while paused + return + } + + // Add the current trust value to the history data + newHist := tm.calcTrustValue() + tm.history = append(tm.history, newHist) + + // Update history and interval counters + if tm.historySize < tm.historyMaxSize { + tm.historySize++ + } else { + // Keep the history no larger than historyMaxSize + last := len(tm.history) - tm.historyMaxSize + tm.history = tm.history[last:] + } + + if tm.numIntervals < tm.maxIntervals { + tm.numIntervals++ + // Add the optimistic weight for the new time interval + wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) + tm.historyWeights = append(tm.historyWeights, wk) + tm.historyWeightSum += wk + } + + // Update the history data using Faded Memories + tm.updateFadedMemory() + // Calculate the history value for the upcoming time interval + tm.historyValue = tm.calcHistoryValue() + tm.good = 0 + tm.bad = 0 +} + +// Copy returns a new trust metric with members containing the same values +func (tm *TrustMetric) Copy() *TrustMetric { + if tm == nil { + return nil + } + + return &TrustMetric{ + proportionalWeight: tm.proportionalWeight, + integralWeight: tm.integralWeight, + numIntervals: tm.numIntervals, + maxIntervals: tm.maxIntervals, + intervalLen: tm.intervalLen, + history: tm.history, + historyWeights: tm.historyWeights, + historyWeightSum: tm.historyWeightSum, + historySize: tm.historySize, + historyMaxSize: tm.historyMaxSize, + historyValue: tm.historyValue, + good: tm.good, + bad: tm.bad, + paused: tm.paused, + stop: make(chan struct{}), + } +} + // TrustMetricConfig - Configures the weight functions and time intervals for the metric type TrustMetricConfig struct { // Determines the percentage given to current behavior @@ -381,10 +459,8 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 - // Setup the channels - tm.update = make(chan *updateBadGood, defaultUpdateChanCapacity) - tm.trustValue = make(chan *reqTrustValue, defaultRequestChanCapacity) - tm.stop = make(chan bool, 1) + // Setup the stop channel + tm.stop = make(chan struct{}) go tm.processRequests() return tm @@ -417,6 +493,19 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig { return config } +// Wakes the trust metric up if it is currently paused +// This method needs to be called with the mutex locked +func (tm *TrustMetric) unpause() { + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } +} + // Calculates the derivative component func (tm *TrustMetric) derivativeValue() float64 { return tm.proportionalValue() - tm.historyValue @@ -516,60 +605,11 @@ func (tm *TrustMetric) processRequests() { loop: for { select { - case bg := <-tm.update: - // Check if this is the first experience with - // what we are tracking since being paused - if tm.paused { - tm.good = 0 - tm.bad = 0 - // New events cause us to unpause the metric - tm.paused = false - } - - if bg.IsBad { - tm.bad += float64(bg.Add) - } else { - tm.good += float64(bg.Add) - } - case rtv := <-tm.trustValue: - rtv.Resp <- tm.calcTrustValue() case <-t.C: - if !tm.paused { - // Add the current trust value to the history data - newHist := tm.calcTrustValue() - tm.history = append(tm.history, newHist) - - // Update history and interval counters - if tm.historySize < tm.historyMaxSize { - tm.historySize++ - } else { - // Keep the history no larger than historyMaxSize - last := len(tm.history) - tm.historyMaxSize - tm.history = tm.history[last:] - } - - if tm.numIntervals < tm.maxIntervals { - tm.numIntervals++ - // Add the optimistic weight for the new time interval - wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) - tm.historyWeights = append(tm.historyWeights, wk) - tm.historyWeightSum += wk - } - - // Update the history data using Faded Memories - tm.updateFadedMemory() - // Calculate the history value for the upcoming time interval - tm.historyValue = tm.calcHistoryValue() - tm.good = 0 - tm.bad = 0 - } - case stop := <-tm.stop: - if stop { - // Stop all further tracking for this metric - break loop - } - // Pause the metric for now - tm.paused = true + tm.NextTimeInterval() + case <-tm.stop: + // Stop all further tracking for this metric + break loop } } } diff --git a/p2p/trust/trustmetric_test.go_ b/p2p/trust/trustmetric_test.go similarity index 98% rename from p2p/trust/trustmetric_test.go_ rename to p2p/trust/trustmetric_test.go index 56441c721..6c6137538 100644 --- a/p2p/trust/trustmetric_test.go_ +++ b/p2p/trust/trustmetric_test.go @@ -210,7 +210,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the pause some time to take place time.Sleep(10 * time.Millisecond) - first := tm.numIntervals + first := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, first, tm.numIntervals) @@ -223,7 +223,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the stop some time to take place time.Sleep(10 * time.Millisecond) - second := tm.numIntervals + second := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, second, tm.numIntervals)