diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index 6733996b0..39c24c247 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -13,6 +13,8 @@ import ( dbm "github.com/tendermint/tmlibs/db" ) +const defaultStorePeriodicSaveInterval = 1 * time.Minute + // TrustMetricStore - Manages all trust metrics for peers type TrustMetricStore struct { cmn.BaseService @@ -28,6 +30,9 @@ type TrustMetricStore struct { // This configuration will be used when creating new TrustMetrics config TrustMetricConfig + + // This channel is used to stop the store go-routine + stop chan int } // NewTrustMetricStore returns a store that saves data to the DB @@ -37,6 +42,7 @@ func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore { peerMetrics: make(map[string]*TrustMetric), db: db, config: tmc, + stop: make(chan int, 2), } tms.BaseService = *cmn.NewBaseService(nil, "TrustMetricStore", tms) @@ -51,19 +57,24 @@ func (tms *TrustMetricStore) OnStart() error { defer tms.mtx.Unlock() tms.loadFromDB() + go tms.periodicSave() return nil } // OnStop implements Service func (tms *TrustMetricStore) OnStop() { + // Stop the store periodic save go-routine + tms.stop <- 1 + tms.mtx.Lock() defer tms.mtx.Unlock() - // Stop all trust metric goroutines + // Stop all trust metric go-routines for _, tm := range tms.peerMetrics { tm.Stop() } + // Make the final trust history data save tms.saveToDB() tms.BaseService.OnStop() } @@ -85,10 +96,8 @@ func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { if !ok { // If the metric is not available, we will create it tm = NewMetricWithConfig(tms.config) - if tm != nil { - // The metric needs to be in the map - tms.peerMetrics[key] = tm - } + // The metric needs to be in the map + tms.peerMetrics[key] = tm } return tm } @@ -133,7 +142,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { peers := make(map[string]peerHistoryJSON, 0) err := json.Unmarshal(bytes, &peers) if err != nil { - cmn.PanicCrisis(cmn.Fmt("Could not unmarchal Trust Metric Store DB data: %v", err)) + cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) } // If history data exists in the file, @@ -183,6 +192,23 @@ func (tms *TrustMetricStore) saveToDB() { tms.db.SetSync(trustMetricKey, bytes) } +// Periodically saves the trust history data to the DB +func (tms *TrustMetricStore) periodicSave() { + t := time.NewTicker(defaultStorePeriodicSaveInterval) + defer t.Stop() +loop: + for { + select { + case <-t.C: + tms.mtx.Lock() + tms.saveToDB() + tms.mtx.Unlock() + case <-tms.stop: + break loop + } + } +} + //--------------------------------------------------------------------------------------- // The number of event updates that can be sent on a single metric before blocking @@ -341,7 +367,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { // The maximum number of time intervals is the tracking window / interval length tm.maxIntervals = int(config.TrackingWindow / tm.intervalLen) // The history size will be determined by the maximum number of time intervals - tm.historyMaxSize = intervalToHistoryIndex(tm.maxIntervals) + 1 + tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 // Setup the channels @@ -397,20 +423,6 @@ func (tm *TrustMetric) weightedDerivative() float64 { return weight * d } -// Map the interval value down to an actual history index -func intervalToHistoryIndex(interval int) int { - return int(math.Floor(math.Log(float64(interval)) / math.Log(2))) -} - -// Retrieves the actual history data value that represents the requested time interval -func (tm *TrustMetric) fadedMemoryValue(interval int) float64 { - if interval == 0 { - // Base case - return tm.history[0] - } - return tm.history[intervalToHistoryIndex(interval)] -} - // Performs the update for our Faded Memories process, which allows the // trust metric tracking window to be large while maintaining a small // number of history data values @@ -419,18 +431,32 @@ func (tm *TrustMetric) updateFadedMemory() { return } + first := tm.historySize - 1 // Keep the most recent history element - faded := tm.history[:1] - - for i := 1; i < tm.historySize; i++ { + for count, i := 1, first-1; count < tm.historySize; count, i = count+1, i-1 { // The older the data is, the more we spread it out - x := math.Pow(2, float64(i)) + x := math.Pow(2, float64(count)) // Two history data values are merged into a single value - ftv := ((tm.history[i] * (x - 1)) + tm.history[i-1]) / x - faded = append(faded, ftv) + tm.history[i] = ((tm.history[i] * (x - 1)) + tm.history[i+1]) / x + } +} + +// Map the interval value down to an offset from the beginning of history +func intervalToHistoryOffset(interval int) int { + return int(math.Floor(math.Log(float64(interval)) / math.Log(2))) +} + +// Retrieves the actual history data value that represents the requested time interval +func (tm *TrustMetric) fadedMemoryValue(interval int) float64 { + first := tm.historySize - 1 + + if interval == 0 { + // Base case + return tm.history[first] } - tm.history = faded + offset := intervalToHistoryOffset(interval) + return tm.history[first-offset] } // Calculates the integral (history) component of the trust value @@ -513,13 +539,16 @@ loop: if !tm.paused { // Add the current trust value to the history data newHist := tm.calcTrustValue() - tm.history = append([]float64{newHist}, tm.history...) + tm.history = append(tm.history, newHist) // Update history and interval counters if tm.historySize < tm.historyMaxSize { tm.historySize++ } else { - tm.history = tm.history[:tm.historyMaxSize] + last := len(tm.history) - tm.historyMaxSize + + // Keep the history no larger than historyMaxSize + tm.history = tm.history[last:] } if tm.numIntervals < tm.maxIntervals { diff --git a/p2p/trust/trustmetric_test.go b/p2p/trust/trustmetric_test.go index 9c61bec96..626ca3bd5 100644 --- a/p2p/trust/trustmetric_test.go +++ b/p2p/trust/trustmetric_test.go @@ -15,16 +15,11 @@ import ( "github.com/tendermint/tmlibs/log" ) -func getTempDir(prefix string) string { - dir, err := ioutil.TempDir("", prefix) +func TestTrustMetricStoreSaveLoad(t *testing.T) { + dir, err := ioutil.TempDir("", "trust_test") if err != nil { panic(err) } - return dir -} - -func TestTrustMetricStoreSaveLoad(t *testing.T) { - dir := getTempDir("trustMetricStoreTest") defer os.Remove(dir) historyDB := dbm.NewDB("trusthistory", "goleveldb", dir)