From 4087326f45a55bd95fec83c65877cbfd3bca8c80 Mon Sep 17 00:00:00 2001 From: caffix Date: Mon, 20 Nov 2017 16:45:59 -0500 Subject: [PATCH] fixed race condition reported in issue #881 --- p2p/trust/trustmetric.go | 86 +++++++++++++++++------------------ p2p/trust/trustmetric_test.go | 6 +++ 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index eaed78e3a..4216a0b5b 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -144,6 +144,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { for key, p := range peers { tm := NewMetricWithConfig(tms.config) + tm.mtx.Lock() // Restore the number of time intervals we have previously tracked if p.NumIntervals > tm.maxIntervals { p.NumIntervals = tm.maxIntervals @@ -168,6 +169,8 @@ func (tms *TrustMetricStore) loadFromDB() bool { } // Calculate the history value based on the loaded history data tm.historyValue = tm.calcHistoryValue() + tm.mtx.Unlock() + // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -181,11 +184,13 @@ func (tms *TrustMetricStore) saveToDB() { peers := make(map[string]peerHistoryJSON, 0) for key, tm := range tms.peerMetrics { + tm.mtx.Lock() // Add an entry for the peer identified by key peers[key] = peerHistoryJSON{ NumIntervals: tm.numIntervals, History: tm.history, } + tm.mtx.Unlock() } // Write all the data back to the DB @@ -236,6 +241,9 @@ const ( // TrustMetric - keeps track of peer reliability // See tendermint/docs/architecture/adr-006-trust-metric.md for details type TrustMetric struct { + // Mutex that protects the metric from concurrent access + mtx sync.Mutex + // Determines the percentage given to current behavior proportionalWeight float64 @@ -277,24 +285,6 @@ type TrustMetric struct { // Sending true on this channel stops tracking, while false pauses tracking stop chan bool - - // For sending information about new good/bad events to be recorded - update chan *updateBadGood - - // The channel to request a newly calculated trust value - trustValue chan *reqTrustValue -} - -// For the TrustMetric update channel -type updateBadGood struct { - IsBad bool - Add int -} - -// For the TrustMetric trustValue channel -type reqTrustValue struct { - // The requested trust value is sent back on this channel - Resp chan float64 } // Pause tells the metric to pause recording data over time intervals. @@ -310,20 +300,44 @@ func (tm *TrustMetric) Stop() { // BadEvents indicates that an undesirable event(s) took place func (tm *TrustMetric) BadEvents(num int) { - tm.update <- &updateBadGood{IsBad: true, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } + + tm.bad += float64(num) } // GoodEvents indicates that a desirable event(s) took place func (tm *TrustMetric) GoodEvents(num int) { - tm.update <- &updateBadGood{IsBad: false, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } + + tm.good += float64(num) } // TrustValue gets the dependable trust value; always between 0 and 1 func (tm *TrustMetric) TrustValue() float64 { - resp := make(chan float64, 1) + tm.mtx.Lock() + defer tm.mtx.Unlock() - tm.trustValue <- &reqTrustValue{Resp: resp} - return <-resp + return tm.calcTrustValue() } // TrustScore gets a score based on the trust value always between 0 and 100 @@ -381,9 +395,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 - // Setup the channels - tm.update = make(chan *updateBadGood, defaultUpdateChanCapacity) - tm.trustValue = make(chan *reqTrustValue, defaultRequestChanCapacity) + // Setup the stop channel tm.stop = make(chan bool, 1) go tm.processRequests() @@ -516,24 +528,8 @@ func (tm *TrustMetric) processRequests() { loop: for { select { - case bg := <-tm.update: - // Check if this is the first experience with - // what we are tracking since being paused - if tm.paused { - tm.good = 0 - tm.bad = 0 - // New events cause us to unpause the metric - tm.paused = false - } - - if bg.IsBad { - tm.bad += float64(bg.Add) - } else { - tm.good += float64(bg.Add) - } - case rtv := <-tm.trustValue: - rtv.Resp <- tm.calcTrustValue() case <-t.C: + tm.mtx.Lock() if !tm.paused { // Add the current trust value to the history data newHist := tm.calcTrustValue() @@ -563,13 +559,17 @@ loop: tm.good = 0 tm.bad = 0 } + tm.mtx.Unlock() case stop := <-tm.stop: + tm.mtx.Lock() if stop { // Stop all further tracking for this metric + tm.mtx.Unlock() break loop } // Pause the metric for now tm.paused = true + tm.mtx.Unlock() } } } diff --git a/p2p/trust/trustmetric_test.go b/p2p/trust/trustmetric_test.go index 56441c721..af4a945db 100644 --- a/p2p/trust/trustmetric_test.go +++ b/p2p/trust/trustmetric_test.go @@ -210,7 +210,10 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the pause some time to take place time.Sleep(10 * time.Millisecond) + tm.mtx.Lock() first := tm.numIntervals + tm.mtx.Unlock() + // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, first, tm.numIntervals) @@ -223,7 +226,10 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the stop some time to take place time.Sleep(10 * time.Millisecond) + tm.mtx.Lock() second := tm.numIntervals + tm.mtx.Unlock() + // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, second, tm.numIntervals)