Browse Source

Merge pull request #882 from caffix/develop

fixed race condition reported in issue #881
pull/898/merge
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
7dd249f754
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 167 additions and 127 deletions
  1. +165
    -125
      p2p/trust/trustmetric.go
  2. +2
    -2
      p2p/trust/trustmetric_test.go

+ 165
- 125
p2p/trust/trustmetric.go View File

@ -107,6 +107,15 @@ func (tms *TrustMetricStore) PeerDisconnected(key string) {
} }
} }
// Saves the history data for all peers to the store DB.
// This public method acquires the trust metric store lock
func (tms *TrustMetricStore) SaveToDB() {
tms.mtx.Lock()
defer tms.mtx.Unlock()
tms.saveToDB()
}
/* Private methods */ /* Private methods */
// size returns the number of entries in the store without acquiring the mutex // size returns the number of entries in the store without acquiring the mutex
@ -115,15 +124,10 @@ func (tms *TrustMetricStore) size() int {
} }
/* Loading & Saving */ /* Loading & Saving */
/* Both of these methods assume the mutex has been acquired, since they write to the map */
/* Both loadFromDB and savetoDB assume the mutex has been acquired */
var trustMetricKey = []byte("trustMetricStore") var trustMetricKey = []byte("trustMetricStore")
type peerHistoryJSON struct {
NumIntervals int `json:"intervals"`
History []float64 `json:"history"`
}
// Loads the history data for all peers from the store DB // Loads the history data for all peers from the store DB
// cmn.Panics if file is corrupt // cmn.Panics if file is corrupt
func (tms *TrustMetricStore) loadFromDB() bool { func (tms *TrustMetricStore) loadFromDB() bool {
@ -133,41 +137,18 @@ func (tms *TrustMetricStore) loadFromDB() bool {
return false return false
} }
peers := make(map[string]peerHistoryJSON, 0)
peers := make(map[string]MetricHistoryJSON, 0)
err := json.Unmarshal(bytes, &peers) err := json.Unmarshal(bytes, &peers)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err))
} }
// If history data exists in the file, // If history data exists in the file,
// load it into trust metrics and recalc
// load it into trust metric
for key, p := range peers { for key, p := range peers {
tm := NewMetricWithConfig(tms.config) tm := NewMetricWithConfig(tms.config)
// Restore the number of time intervals we have previously tracked
if p.NumIntervals > tm.maxIntervals {
p.NumIntervals = tm.maxIntervals
}
tm.numIntervals = p.NumIntervals
// Restore the history and its current size
if len(p.History) > tm.historyMaxSize {
// Keep the history no larger than historyMaxSize
last := len(p.History) - tm.historyMaxSize
p.History = p.History[last:]
}
tm.history = p.History
tm.historySize = len(tm.history)
// Create the history weight values and weight sum
for i := 1; i <= tm.numIntervals; i++ {
x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight
tm.historyWeights = append(tm.historyWeights, x)
}
for _, v := range tm.historyWeights {
tm.historyWeightSum += v
}
// Calculate the history value based on the loaded history data
tm.historyValue = tm.calcHistoryValue()
tm.Init(p)
// Load the peer trust metric into the store // Load the peer trust metric into the store
tms.peerMetrics[key] = tm tms.peerMetrics[key] = tm
} }
@ -178,14 +159,11 @@ func (tms *TrustMetricStore) loadFromDB() bool {
func (tms *TrustMetricStore) saveToDB() { func (tms *TrustMetricStore) saveToDB() {
tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size())
peers := make(map[string]peerHistoryJSON, 0)
peers := make(map[string]MetricHistoryJSON, 0)
for key, tm := range tms.peerMetrics { for key, tm := range tms.peerMetrics {
// Add an entry for the peer identified by key // Add an entry for the peer identified by key
peers[key] = peerHistoryJSON{
NumIntervals: tm.numIntervals,
History: tm.history,
}
peers[key] = tm.HistoryJSON()
} }
// Write all the data back to the DB // Write all the data back to the DB
@ -205,9 +183,7 @@ loop:
for { for {
select { select {
case <-t.C: case <-t.C:
tms.mtx.Lock()
tms.saveToDB()
tms.mtx.Unlock()
tms.SaveToDB()
case <-tms.Quit: case <-tms.Quit:
break loop break loop
} }
@ -217,12 +193,6 @@ loop:
//--------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------
const ( const (
// The number of event updates that can be sent on a single metric before blocking
defaultUpdateChanCapacity = 10
// The number of trust value requests that can be made simultaneously before blocking
defaultRequestChanCapacity = 10
// The weight applied to the derivative when current behavior is >= previous behavior // The weight applied to the derivative when current behavior is >= previous behavior
defaultDerivativeGamma1 = 0 defaultDerivativeGamma1 = 0
@ -236,6 +206,9 @@ const (
// TrustMetric - keeps track of peer reliability // TrustMetric - keeps track of peer reliability
// See tendermint/docs/architecture/adr-006-trust-metric.md for details // See tendermint/docs/architecture/adr-006-trust-metric.md for details
type TrustMetric struct { type TrustMetric struct {
// Mutex that protects the metric from concurrent access
mtx sync.Mutex
// Determines the percentage given to current behavior // Determines the percentage given to current behavior
proportionalWeight float64 proportionalWeight float64
@ -275,55 +248,96 @@ type TrustMetric struct {
// While true, history data is not modified // While true, history data is not modified
paused bool paused bool
// Sending true on this channel stops tracking, while false pauses tracking
stop chan bool
// For sending information about new good/bad events to be recorded
update chan *updateBadGood
// Signal channel for stopping the trust metric go-routine
stop chan struct{}
}
// The channel to request a newly calculated trust value
trustValue chan *reqTrustValue
// MetricHistoryJSON - history data necessary to save the trust metric
type MetricHistoryJSON struct {
NumIntervals int `json:"intervals"`
History []float64 `json:"history"`
} }
// For the TrustMetric update channel
type updateBadGood struct {
IsBad bool
Add int
// Returns a snapshot of the trust metric history data
func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON {
tm.mtx.Lock()
defer tm.mtx.Unlock()
return MetricHistoryJSON{
NumIntervals: tm.numIntervals,
History: tm.history,
}
} }
// For the TrustMetric trustValue channel
type reqTrustValue struct {
// The requested trust value is sent back on this channel
Resp chan float64
// Instantiates a trust metric by loading the history data for a single peer.
// This is called only once and only right after creation, which is why the
// lock is not held while accessing the trust metric struct members
func (tm *TrustMetric) Init(hist MetricHistoryJSON) {
// Restore the number of time intervals we have previously tracked
if hist.NumIntervals > tm.maxIntervals {
hist.NumIntervals = tm.maxIntervals
}
tm.numIntervals = hist.NumIntervals
// Restore the history and its current size
if len(hist.History) > tm.historyMaxSize {
// Keep the history no larger than historyMaxSize
last := len(hist.History) - tm.historyMaxSize
hist.History = hist.History[last:]
}
tm.history = hist.History
tm.historySize = len(tm.history)
// Create the history weight values and weight sum
for i := 1; i <= tm.numIntervals; i++ {
x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight
tm.historyWeights = append(tm.historyWeights, x)
}
for _, v := range tm.historyWeights {
tm.historyWeightSum += v
}
// Calculate the history value based on the loaded history data
tm.historyValue = tm.calcHistoryValue()
} }
// Pause tells the metric to pause recording data over time intervals. // Pause tells the metric to pause recording data over time intervals.
// All method calls that indicate events will unpause the metric // All method calls that indicate events will unpause the metric
func (tm *TrustMetric) Pause() { func (tm *TrustMetric) Pause() {
tm.stop <- false
tm.mtx.Lock()
defer tm.mtx.Unlock()
// Pause the metric for now
tm.paused = true
} }
// Stop tells the metric to stop recording data over time intervals // Stop tells the metric to stop recording data over time intervals
func (tm *TrustMetric) Stop() { func (tm *TrustMetric) Stop() {
tm.stop <- true
tm.stop <- struct{}{}
} }
// BadEvents indicates that an undesirable event(s) took place // BadEvents indicates that an undesirable event(s) took place
func (tm *TrustMetric) BadEvents(num int) { func (tm *TrustMetric) BadEvents(num int) {
tm.update <- &updateBadGood{IsBad: true, Add: num}
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.unpause()
tm.bad += float64(num)
} }
// GoodEvents indicates that a desirable event(s) took place // GoodEvents indicates that a desirable event(s) took place
func (tm *TrustMetric) GoodEvents(num int) { func (tm *TrustMetric) GoodEvents(num int) {
tm.update <- &updateBadGood{IsBad: false, Add: num}
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.unpause()
tm.good += float64(num)
} }
// TrustValue gets the dependable trust value; always between 0 and 1 // TrustValue gets the dependable trust value; always between 0 and 1
func (tm *TrustMetric) TrustValue() float64 { func (tm *TrustMetric) TrustValue() float64 {
resp := make(chan float64, 1)
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.trustValue <- &reqTrustValue{Resp: resp}
return <-resp
return tm.calcTrustValue()
} }
// TrustScore gets a score based on the trust value always between 0 and 100 // TrustScore gets a score based on the trust value always between 0 and 100
@ -333,6 +347,70 @@ func (tm *TrustMetric) TrustScore() int {
return int(math.Floor(score)) return int(math.Floor(score))
} }
// NextTimeInterval saves current time interval data and prepares for the following interval
func (tm *TrustMetric) NextTimeInterval() {
tm.mtx.Lock()
defer tm.mtx.Unlock()
if tm.paused {
// Do not prepare for the next time interval while paused
return
}
// Add the current trust value to the history data
newHist := tm.calcTrustValue()
tm.history = append(tm.history, newHist)
// Update history and interval counters
if tm.historySize < tm.historyMaxSize {
tm.historySize++
} else {
// Keep the history no larger than historyMaxSize
last := len(tm.history) - tm.historyMaxSize
tm.history = tm.history[last:]
}
if tm.numIntervals < tm.maxIntervals {
tm.numIntervals++
// Add the optimistic weight for the new time interval
wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals))
tm.historyWeights = append(tm.historyWeights, wk)
tm.historyWeightSum += wk
}
// Update the history data using Faded Memories
tm.updateFadedMemory()
// Calculate the history value for the upcoming time interval
tm.historyValue = tm.calcHistoryValue()
tm.good = 0
tm.bad = 0
}
// Copy returns a new trust metric with members containing the same values
func (tm *TrustMetric) Copy() *TrustMetric {
if tm == nil {
return nil
}
return &TrustMetric{
proportionalWeight: tm.proportionalWeight,
integralWeight: tm.integralWeight,
numIntervals: tm.numIntervals,
maxIntervals: tm.maxIntervals,
intervalLen: tm.intervalLen,
history: tm.history,
historyWeights: tm.historyWeights,
historyWeightSum: tm.historyWeightSum,
historySize: tm.historySize,
historyMaxSize: tm.historyMaxSize,
historyValue: tm.historyValue,
good: tm.good,
bad: tm.bad,
paused: tm.paused,
stop: make(chan struct{}),
}
}
// TrustMetricConfig - Configures the weight functions and time intervals for the metric // TrustMetricConfig - Configures the weight functions and time intervals for the metric
type TrustMetricConfig struct { type TrustMetricConfig struct {
// Determines the percentage given to current behavior // Determines the percentage given to current behavior
@ -381,10 +459,8 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric {
tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1
// This metric has a perfect history so far // This metric has a perfect history so far
tm.historyValue = 1.0 tm.historyValue = 1.0
// Setup the channels
tm.update = make(chan *updateBadGood, defaultUpdateChanCapacity)
tm.trustValue = make(chan *reqTrustValue, defaultRequestChanCapacity)
tm.stop = make(chan bool, 1)
// Setup the stop channel
tm.stop = make(chan struct{})
go tm.processRequests() go tm.processRequests()
return tm return tm
@ -417,6 +493,19 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig {
return config return config
} }
// Wakes the trust metric up if it is currently paused
// This method needs to be called with the mutex locked
func (tm *TrustMetric) unpause() {
// Check if this is the first experience with
// what we are tracking since being paused
if tm.paused {
tm.good = 0
tm.bad = 0
// New events cause us to unpause the metric
tm.paused = false
}
}
// Calculates the derivative component // Calculates the derivative component
func (tm *TrustMetric) derivativeValue() float64 { func (tm *TrustMetric) derivativeValue() float64 {
return tm.proportionalValue() - tm.historyValue return tm.proportionalValue() - tm.historyValue
@ -516,60 +605,11 @@ func (tm *TrustMetric) processRequests() {
loop: loop:
for { for {
select { select {
case bg := <-tm.update:
// Check if this is the first experience with
// what we are tracking since being paused
if tm.paused {
tm.good = 0
tm.bad = 0
// New events cause us to unpause the metric
tm.paused = false
}
if bg.IsBad {
tm.bad += float64(bg.Add)
} else {
tm.good += float64(bg.Add)
}
case rtv := <-tm.trustValue:
rtv.Resp <- tm.calcTrustValue()
case <-t.C: case <-t.C:
if !tm.paused {
// Add the current trust value to the history data
newHist := tm.calcTrustValue()
tm.history = append(tm.history, newHist)
// Update history and interval counters
if tm.historySize < tm.historyMaxSize {
tm.historySize++
} else {
// Keep the history no larger than historyMaxSize
last := len(tm.history) - tm.historyMaxSize
tm.history = tm.history[last:]
}
if tm.numIntervals < tm.maxIntervals {
tm.numIntervals++
// Add the optimistic weight for the new time interval
wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals))
tm.historyWeights = append(tm.historyWeights, wk)
tm.historyWeightSum += wk
}
// Update the history data using Faded Memories
tm.updateFadedMemory()
// Calculate the history value for the upcoming time interval
tm.historyValue = tm.calcHistoryValue()
tm.good = 0
tm.bad = 0
}
case stop := <-tm.stop:
if stop {
// Stop all further tracking for this metric
break loop
}
// Pause the metric for now
tm.paused = true
tm.NextTimeInterval()
case <-tm.stop:
// Stop all further tracking for this metric
break loop
} }
} }
} }

p2p/trust/trustmetric_test.go_ → p2p/trust/trustmetric_test.go View File


Loading…
Cancel
Save