Browse Source

integrated trust metric store as per PR comments

pull/787/head
caffix 7 years ago
parent
commit
54c25ccbf5
4 changed files with 444 additions and 229 deletions
  1. +10
    -1
      config/config.go
  2. +66
    -14
      docs/architecture/adr-006-trust-metric.md
  3. +9
    -3
      node/node.go
  4. +359
    -211
      p2p/trust/trustmetric.go

+ 10
- 1
config/config.go View File

@ -214,6 +214,9 @@ type P2PConfig struct {
// Set true for strict address routability rules // Set true for strict address routability rules
AddrBookStrict bool `mapstructure:"addr_book_strict"` AddrBookStrict bool `mapstructure:"addr_book_strict"`
// Path to the trust history file
TrustHistory string `mapstructure:"trust_history_file"`
// Set true to enable the peer-exchange reactor // Set true to enable the peer-exchange reactor
PexReactor bool `mapstructure:"pex"` PexReactor bool `mapstructure:"pex"`
@ -239,6 +242,7 @@ func DefaultP2PConfig() *P2PConfig {
ListenAddress: "tcp://0.0.0.0:46656", ListenAddress: "tcp://0.0.0.0:46656",
AddrBook: "addrbook.json", AddrBook: "addrbook.json",
AddrBookStrict: true, AddrBookStrict: true,
TrustHistory: "trusthistory.json",
MaxNumPeers: 50, MaxNumPeers: 50,
FlushThrottleTimeout: 100, FlushThrottleTimeout: 100,
MaxMsgPacketPayloadSize: 1024, // 1 kB MaxMsgPacketPayloadSize: 1024, // 1 kB
@ -255,11 +259,16 @@ func TestP2PConfig() *P2PConfig {
return conf return conf
} }
// AddrBookFile returns the full path to the address bool
// AddrBookFile returns the full path to the address book
func (p *P2PConfig) AddrBookFile() string { func (p *P2PConfig) AddrBookFile() string {
return rootify(p.AddrBook, p.RootDir) return rootify(p.AddrBook, p.RootDir)
} }
// TrustHistoryFile returns the full path to the trust metric store history
func (p *P2PConfig) TrustHistoryFile() string {
return rootify(p.TrustHistory, p.RootDir)
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// MempoolConfig // MempoolConfig


+ 66
- 14
docs/architecture/adr-006-trust-metric.md View File

@ -76,39 +76,91 @@ R[0] = raw data for current time interval
This section will cover the Go programming language API designed for the previously proposed process. Below is the interface for a TrustMetric: This section will cover the Go programming language API designed for the previously proposed process. Below is the interface for a TrustMetric:
```go ```go
package trust package trust
type TrustMetric struct {
// TrustMetricStore - Manages all trust metrics for peers
type TrustMetricStore struct {
cmn.BaseService
// Private elements
} }
type TrustMetricConfig struct {
ProportionalWeight float64
IntegralWeight float64
HistoryMaxSize int
IntervalLen time.Duration
// OnStart implements Service
func (tms *TrustMetricStore) OnStart() error
/ OnStop implements Service
func (tms *TrustMetricStore) OnStop()
// NewTrustMetricStore returns a store that optionally saves data to
// the file path and uses the optional config when creating new trust metrics
func NewTrustMetricStore(filePath string, tmc *TrustMetricConfig) *TrustMetricStore
// GetPeerTrustMetric returns a trust metric by peer key
func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric
// PeerDisconnected pauses the trust metric associated with the peer identified by the key
func (tms *TrustMetricStore) PeerDisconnected(key string)
//----------------------------------------------------------------------------------------
// TrustMetric - keeps track of peer reliability
type TrustMetric struct {
// Private elements.
} }
// Pause tells the metric to pause recording data over time intervals
func (tm *TrustMetric) Pause()
// Stop tells the metric to stop recording data over time intervals
func (tm *TrustMetric) Stop() func (tm *TrustMetric) Stop()
func (tm *TrustMetric) IncBad()
// BadEvent indicates that an undesirable event took place
func (tm *TrustMetric) BadEvent()
func (tm *TrustMetric) AddBad(num int)
// AddBadEvents acknowledges multiple undesirable events
func (tm *TrustMetric) AddBadEvents(num int)
func (tm *TrustMetric) IncGood()
// GoodEvent indicates that a desirable event took place
func (tm *TrustMetric) GoodEvent()
func (tm *TrustMetric) AddGood(num int)
// AddGoodEvents acknowledges multiple desirable events
func (tm *TrustMetric) AddGoodEvents(num int)
// get the dependable trust value
// TrustValue gets the dependable trust value; always between 0 and 1
func (tm *TrustMetric) TrustValue() float64 func (tm *TrustMetric) TrustValue() float64
// TrustScore gets a score based on the trust value always between 0 and 100
func (tm *TrustMetric) TrustScore() int
// NewMetric returns a trust metric with the default configuration
func NewMetric() *TrustMetric func NewMetric() *TrustMetric
func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric
func GetPeerTrustMetric(key string) *TrustMetric
// TrustMetricConfig - Configures the weight functions and time intervals for the metric
type TrustMetricConfig struct {
// Determines the percentage given to current behavior
ProportionalWeight float64
// Determines the percentage given to prior behavior
IntegralWeight float64
func PeerDisconnected(key string)
// The window of time that the trust metric will track events across.
// This can be set to cover many days without issue
TrackingWindow time.Duration
// Each interval should be short for adapability.
// Less than 30 seconds is too sensitive,
// and greater than 5 minutes will make the metric numb
IntervalLen time.Duration
}
// DefaultConfig returns a config with values that have been tested and produce desirable results
func DefaultConfig() *TrustMetricConfig
// NewMetricWithConfig returns a trust metric with a custom configuration
func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric
``` ```


+ 9
- 3
node/node.go View File

@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/trust"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
rpccore "github.com/tendermint/tendermint/rpc/core" rpccore "github.com/tendermint/tendermint/rpc/core"
grpccore "github.com/tendermint/tendermint/rpc/grpc" grpccore "github.com/tendermint/tendermint/rpc/grpc"
@ -95,9 +96,10 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key privValidator types.PrivValidator // local node's validator key
// network // network
privKey crypto.PrivKeyEd25519 // local node's p2p key
sw *p2p.Switch // p2p connections
addrBook *p2p.AddrBook // known peers
privKey crypto.PrivKeyEd25519 // local node's p2p key
sw *p2p.Switch // p2p connections
addrBook *p2p.AddrBook // known peers
tmStore *trust.TrustMetricStore // trust metrics for all peers
// services // services
eventBus *types.EventBus // pub/sub for services eventBus *types.EventBus // pub/sub for services
@ -239,9 +241,12 @@ func NewNode(config *cfg.Config,
// Optionally, start the pex reactor // Optionally, start the pex reactor
var addrBook *p2p.AddrBook var addrBook *p2p.AddrBook
var tmStore *trust.TrustMetricStore
if config.P2P.PexReactor { if config.P2P.PexReactor {
addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
tmStore = trust.NewTrustMetricStore(config.P2P.TrustHistoryFile(), nil)
tmStore.SetLogger(p2pLogger.With("trust", config.P2P.TrustHistoryFile()))
pexReactor := p2p.NewPEXReactor(addrBook) pexReactor := p2p.NewPEXReactor(addrBook)
pexReactor.SetLogger(p2pLogger) pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
@ -297,6 +302,7 @@ func NewNode(config *cfg.Config,
privKey: privKey, privKey: privKey,
sw: sw, sw: sw,
addrBook: addrBook, addrBook: addrBook,
tmStore: tmStore,
blockStore: blockStore, blockStore: blockStore,
bcReactor: bcReactor, bcReactor: bcReactor,


+ 359
- 211
p2p/trust/trustmetric.go View File

@ -1,253 +1,450 @@
// Copyright 2017 Tendermint. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package trust package trust
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"math" "math"
"os"
"path/filepath"
"sync"
"time" "time"
)
var (
store *trustMetricStore
cmn "github.com/tendermint/tmlibs/common"
) )
type peerMetricRequest struct {
Key string
Resp chan *TrustMetric
// TrustMetricStore - Manages all trust metrics for peers
type TrustMetricStore struct {
cmn.BaseService
// Maps a Peer.Key to that peer's TrustMetric
peerMetrics map[string]*TrustMetric
// Mutex that protects the map and history data file
mtx sync.Mutex
// The file path where peer trust metric history data will be stored
filePath string
// This configuration will be used when creating new TrustMetrics
config *TrustMetricConfig
}
// NewTrustMetricStore returns a store that optionally saves data to
// the file path and uses the optional config when creating new trust metrics
func NewTrustMetricStore(filePath string, tmc *TrustMetricConfig) *TrustMetricStore {
tms := &TrustMetricStore{
peerMetrics: make(map[string]*TrustMetric),
filePath: filePath,
config: tmc,
}
tms.BaseService = *cmn.NewBaseService(nil, "TrustMetricStore", tms)
return tms
}
// OnStart implements Service
func (tms *TrustMetricStore) OnStart() error {
tms.BaseService.OnStart()
tms.mtx.Lock()
defer tms.mtx.Unlock()
tms.loadFromFile()
return nil
} }
type trustMetricStore struct {
PeerMetrics map[string]*TrustMetric
Requests chan *peerMetricRequest
Disconn chan string
// OnStop implements Service
func (tms *TrustMetricStore) OnStop() {
tms.mtx.Lock()
defer tms.mtx.Unlock()
// Stop all trust metric goroutines
for _, tm := range tms.peerMetrics {
tm.Stop()
}
tms.saveToFile()
tms.BaseService.OnStop()
} }
func init() {
store = &trustMetricStore{
PeerMetrics: make(map[string]*TrustMetric),
Requests: make(chan *peerMetricRequest, 10),
Disconn: make(chan string, 10),
// GetPeerTrustMetric returns a trust metric by peer key
func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric {
tms.mtx.Lock()
defer tms.mtx.Unlock()
tm, ok := tms.peerMetrics[key]
if !ok {
// If the metric is not available, we will create it
tm = NewMetricWithConfig(tms.config)
if tm != nil {
// The metric needs to be in the map
tms.peerMetrics[key] = tm
}
} }
return tm
}
go store.processRequests()
// PeerDisconnected pauses the trust metric associated with the peer identified by the key
func (tms *TrustMetricStore) PeerDisconnected(key string) {
tms.mtx.Lock()
defer tms.mtx.Unlock()
// If the Peer that disconnected has a metric, pause it
if tm, ok := tms.peerMetrics[key]; ok {
tm.Pause()
}
} }
type peerHistory struct {
/* Loading & Saving */
type peerHistoryJSON struct {
NumIntervals int `json:"intervals"` NumIntervals int `json:"intervals"`
History []float64 `json:"history"` History []float64 `json:"history"`
} }
func loadSaveFromFile(key string, isLoad bool, data *peerHistory) *peerHistory {
tmhome, ok := os.LookupEnv("TMHOME")
if !ok {
return nil
// Loads the history data for the Peer identified by key from the store file.
// cmn.Panics if file is corrupt
func (tms *TrustMetricStore) loadFromFile() bool {
// Check that a file has been configured for use
if tms.filePath == "" {
// The trust metric store can operate without the file
return false
} }
filename := filepath.Join(tmhome, "trust_history.json")
peers := make(map[string]peerHistory, 0)
// read in previously written history data
content, err := ioutil.ReadFile(filename)
if err == nil {
err = json.Unmarshal(content, &peers)
// Obtain the history data we have so far
content, err := ioutil.ReadFile(tms.filePath)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", tms.filePath, err))
} }
var result *peerHistory
peers := make(map[string]peerHistoryJSON, 0)
err = json.Unmarshal(content, &peers)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error decoding file %s: %v", tms.filePath, err))
}
if isLoad {
if p, ok := peers[key]; ok {
result = &p
// If history data exists in the file,
// load it into trust metrics and recalc
for key, p := range peers {
tm := NewMetricWithConfig(tms.config)
if tm == nil {
continue
} }
} else {
peers[key] = *data
b, err := json.Marshal(peers)
if err == nil {
err = ioutil.WriteFile(filename, b, 0644)
// Restore the number of time intervals we have previously tracked
if p.NumIntervals > tm.maxIntervals {
p.NumIntervals = tm.maxIntervals
}
tm.numIntervals = p.NumIntervals
// Restore the history and its current size
if len(p.History) > tm.historyMaxSize {
p.History = p.History[:tm.historyMaxSize]
} }
tm.history = p.History
tm.historySize = len(tm.history)
// Calculate the history value based on the loaded history data
tm.historyValue = tm.calcHistoryValue()
// Load the peer trust metric into the store
tms.peerMetrics[key] = tm
} }
return result
return true
} }
func createLoadPeerMetric(key string) *TrustMetric {
tm := NewMetric()
if tm == nil {
return tm
// Saves the history data for all peers to the store file
func (tms *TrustMetricStore) saveToFile() {
// Check that a file has been configured for use
if tms.filePath == "" {
// The trust metric store can operate without the file
return
} }
// attempt to load the peer's trust history data
if ph := loadSaveFromFile(key, true, nil); ph != nil {
tm.historySize = len(ph.History)
tms.Logger.Info("Saving TrustHistory to file", "size", len(tms.peerMetrics))
if tm.historySize > 0 {
tm.numIntervals = ph.NumIntervals
tm.history = ph.History
peers := make(map[string]peerHistoryJSON, 0)
tm.historyValue = tm.calcHistoryValue()
for key, tm := range tms.peerMetrics {
// Add an entry for the peer identified by key
peers[key] = peerHistoryJSON{
NumIntervals: tm.numIntervals,
History: tm.history,
} }
} }
return tm
// Write all the data back to the file
b, err := json.Marshal(peers)
if err != nil {
tms.Logger.Error("Failed to encode the TrustHistory", "err", err)
return
}
err = ioutil.WriteFile(tms.filePath, b, 0644)
if err != nil {
tms.Logger.Error("Failed to save TrustHistory to file", "err", err)
}
} }
func (tms *trustMetricStore) processRequests() {
for {
select {
case req := <-tms.Requests:
tm, ok := tms.PeerMetrics[req.Key]
//---------------------------------------------------------------------------------------
// TrustMetric - keeps track of peer reliability
// See tendermint/docs/architecture/adr-006-trust-metric.md for details
type TrustMetric struct {
// Determines the percentage given to current behavior
proportionalWeight float64
if !ok {
tm = createLoadPeerMetric(req.Key)
// Determines the percentage given to prior behavior
integralWeight float64
if tm != nil {
tms.PeerMetrics[req.Key] = tm
}
}
// Count of how many time intervals this metric has been tracking
numIntervals int
req.Resp <- tm
case key := <-tms.Disconn:
if tm, ok := tms.PeerMetrics[key]; ok {
ph := peerHistory{
NumIntervals: tm.numIntervals,
History: tm.history,
}
tm.Stop()
delete(tms.PeerMetrics, key)
loadSaveFromFile(key, false, &ph)
}
}
}
// Size of the time interval window for this trust metric
maxIntervals int
// The time duration for a single time interval
intervalLen time.Duration
// Stores the trust history data for this metric
history []float64
// The current number of history data elements
historySize int
// The maximum number of history data elements
historyMaxSize int
// The calculated history value for the current time interval
historyValue float64
// The number of recorded good and bad events for the current time interval
bad, good float64
// Sending true on this channel stops tracking, while false pauses tracking
stop chan bool
// For sending information about new good/bad events to be recorded
update chan *updateBadGood
// The channel to request a newly calculated trust value
trustValue chan *reqTrustValue
}
// For the TrustMetric update channel
type updateBadGood struct {
IsBad bool
Add int
} }
// request a TrustMetric by Peer Key
func GetPeerTrustMetric(key string) *TrustMetric {
resp := make(chan *TrustMetric, 1)
// For the TrustMetric trustValue channel
type reqTrustValue struct {
// The requested trust value is sent back on this channel
Resp chan float64
}
store.Requests <- &peerMetricRequest{Key: key, Resp: resp}
return <-resp
// Pause tells the metric to pause recording data over time intervals
func (tm *TrustMetric) Pause() {
tm.stop <- false
} }
// the trust metric store should know when a Peer disconnects
func PeerDisconnected(key string) {
store.Disconn <- key
// Stop tells the metric to stop recording data over time intervals
func (tm *TrustMetric) Stop() {
tm.stop <- true
} }
// keep track of Peer reliability
type TrustMetric struct {
proportionalWeight float64
integralWeight float64
numIntervals int
maxIntervals int
intervalLen time.Duration
history []float64
historySize int
historyMaxSize int
historyValue float64
bad, good float64
stop chan int
update chan *updateBadGood
trustValue chan *reqTrustValue
// BadEvent indicates that an undesirable event took place
func (tm *TrustMetric) BadEvent() {
tm.update <- &updateBadGood{IsBad: true, Add: 1}
} }
// AddBadEvents acknowledges multiple undesirable events
func (tm *TrustMetric) AddBadEvents(num int) {
tm.update <- &updateBadGood{IsBad: true, Add: num}
}
// GoodEvent indicates that a desirable event took place
func (tm *TrustMetric) GoodEvent() {
tm.update <- &updateBadGood{IsBad: false, Add: 1}
}
// AddGoodEvents acknowledges multiple desirable events
func (tm *TrustMetric) AddGoodEvents(num int) {
tm.update <- &updateBadGood{IsBad: false, Add: num}
}
// TrustValue gets the dependable trust value; always between 0 and 1
func (tm *TrustMetric) TrustValue() float64 {
resp := make(chan float64, 1)
tm.trustValue <- &reqTrustValue{Resp: resp}
return <-resp
}
// TrustScore gets a score based on the trust value always between 0 and 100
func (tm *TrustMetric) TrustScore() int {
resp := make(chan float64, 1)
tm.trustValue <- &reqTrustValue{Resp: resp}
return int(math.Floor(<-resp * 100))
}
// TrustMetricConfig - Configures the weight functions and time intervals for the metric
type TrustMetricConfig struct { type TrustMetricConfig struct {
// be careful changing these weights
// Determines the percentage given to current behavior
ProportionalWeight float64 ProportionalWeight float64
IntegralWeight float64
// don't allow 2^HistoryMaxSize to be greater than int max value
HistoryMaxSize int
// each interval should be short for adapability
// less than 30 seconds is too sensitive,
// Determines the percentage given to prior behavior
IntegralWeight float64
// The window of time that the trust metric will track events across.
// This can be set to cover many days without issue
TrackingWindow time.Duration
// Each interval should be short for adapability.
// Less than 30 seconds is too sensitive,
// and greater than 5 minutes will make the metric numb // and greater than 5 minutes will make the metric numb
IntervalLen time.Duration IntervalLen time.Duration
} }
func defaultConfig() *TrustMetricConfig {
// DefaultConfig returns a config with values that have been tested and produce desirable results
func DefaultConfig() *TrustMetricConfig {
return &TrustMetricConfig{ return &TrustMetricConfig{
ProportionalWeight: 0.4, ProportionalWeight: 0.4,
IntegralWeight: 0.6, IntegralWeight: 0.6,
HistoryMaxSize: 16,
TrackingWindow: (time.Minute * 60 * 24) * 14, // 14 days.
IntervalLen: 1 * time.Minute, IntervalLen: 1 * time.Minute,
} }
} }
type updateBadGood struct {
IsBad bool
Add int
// NewMetric returns a trust metric with the default configuration
func NewMetric() *TrustMetric {
return NewMetricWithConfig(nil)
} }
type reqTrustValue struct {
Resp chan float64
// NewMetricWithConfig returns a trust metric with a custom configuration
func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric {
var config *TrustMetricConfig
if tmc == nil {
config = DefaultConfig()
} else {
config = customConfig(tmc)
}
tm := new(TrustMetric)
// Setup using the configuration values
tm.proportionalWeight = config.ProportionalWeight
tm.integralWeight = config.IntegralWeight
tm.intervalLen = config.IntervalLen
// The maximum number of time intervals is the tracking window / interval length
tm.maxIntervals = int(config.TrackingWindow / tm.intervalLen)
// The history size will be determined by the maximum number of time intervals
tm.historyMaxSize = intervalToHistoryIndex(tm.maxIntervals) + 1
// This metric has a perfect history so far
tm.historyValue = 1.0
// Setup the channels
tm.update = make(chan *updateBadGood, 10)
tm.trustValue = make(chan *reqTrustValue, 10)
tm.stop = make(chan bool, 2)
go tm.processRequests()
return tm
} }
// calculates the derivative component
/* Private methods */
// Ensures that all configuration elements have valid values
func customConfig(tmc *TrustMetricConfig) *TrustMetricConfig {
config := DefaultConfig()
// Check the config for set values, and setup appropriately
if tmc.ProportionalWeight != 0 {
config.ProportionalWeight = tmc.ProportionalWeight
}
if tmc.IntegralWeight != 0 {
config.IntegralWeight = tmc.IntegralWeight
}
if tmc.TrackingWindow != time.Duration(0) {
config.TrackingWindow = tmc.TrackingWindow
}
if tmc.IntervalLen != time.Duration(0) {
config.IntervalLen = tmc.IntervalLen
}
return config
}
// Calculates the derivative component
func (tm *TrustMetric) derivativeValue() float64 { func (tm *TrustMetric) derivativeValue() float64 {
return tm.proportionalValue() - tm.historyValue return tm.proportionalValue() - tm.historyValue
} }
// strengthens the derivative component
// Strengthens the derivative component when the change is negative
func (tm *TrustMetric) weightedDerivative() float64 { func (tm *TrustMetric) weightedDerivative() float64 {
var weight float64 var weight float64
d := tm.derivativeValue() d := tm.derivativeValue()
if d < 0 { if d < 0 {
weight = 1.0 weight = 1.0
} }
return weight * d return weight * d
} }
// Map the interval value down to an actual history index
func intervalToHistoryIndex(interval int) int {
return int(math.Floor(math.Log(float64(interval)) / math.Log(2)))
}
// Retrieves the actual history data value that represents the requested time interval
func (tm *TrustMetric) fadedMemoryValue(interval int) float64 { func (tm *TrustMetric) fadedMemoryValue(interval int) float64 {
if interval == 0 { if interval == 0 {
// base case
// Base case
return tm.history[0] return tm.history[0]
} }
index := int(math.Floor(math.Log(float64(interval)) / math.Log(2)))
// map the interval value down to an actual history index
return tm.history[index]
return tm.history[intervalToHistoryIndex(interval)]
} }
// Performs the update for our Faded Memories process, which allows the
// trust metric tracking window to be large while maintaining a small
// number of history data values
func (tm *TrustMetric) updateFadedMemory() { func (tm *TrustMetric) updateFadedMemory() {
if tm.historySize < 2 { if tm.historySize < 2 {
return return
} }
// keep the last history element
// Keep the most recent history element
faded := tm.history[:1] faded := tm.history[:1]
for i := 1; i < tm.historySize; i++ { for i := 1; i < tm.historySize; i++ {
// The older the data is, the more we spread it out
x := math.Pow(2, float64(i)) x := math.Pow(2, float64(i))
// Two history data values are merged into a single value
ftv := ((tm.history[i] * (x - 1)) + tm.history[i-1]) / x ftv := ((tm.history[i] * (x - 1)) + tm.history[i-1]) / x
faded = append(faded, ftv) faded = append(faded, ftv)
} }
tm.history = faded tm.history = faded
} }
// calculates the integral (history) component of the trust value
// Calculates the integral (history) component of the trust value
func (tm *TrustMetric) calcHistoryValue() float64 { func (tm *TrustMetric) calcHistoryValue() float64 {
var wk []float64 var wk []float64
// create the weights
// Create the weights.
hlen := tm.numIntervals hlen := tm.numIntervals
for i := 0; i < hlen; i++ { for i := 0; i < hlen; i++ {
x := math.Pow(.8, float64(i+1)) // optimistic wk
x := math.Pow(.8, float64(i+1)) // Optimistic weight
wk = append(wk, x) wk = append(wk, x)
} }
var wsum float64 var wsum float64
// calculate the sum of the weights
// Calculate the sum of the weights
for _, v := range wk { for _, v := range wk {
wsum += v wsum += v
} }
var hv float64 var hv float64
// calculate the history value
// Calculate the history value
for i := 0; i < hlen; i++ { for i := 0; i < hlen; i++ {
weight := wk[i] / wsum weight := wk[i] / wsum
hv += tm.fadedMemoryValue(i) * weight hv += tm.fadedMemoryValue(i) * weight
@ -255,10 +452,10 @@ func (tm *TrustMetric) calcHistoryValue() float64 {
return hv return hv
} }
// calculates the current score for good experiences
// Calculates the current score for good/bad experiences
func (tm *TrustMetric) proportionalValue() float64 { func (tm *TrustMetric) proportionalValue() float64 {
value := 1.0 value := 1.0
// bad events are worth more
// Bad events are worth more in the calculation of our score
total := tm.good + math.Pow(tm.bad, 2) total := tm.good + math.Pow(tm.bad, 2)
if tm.bad > 0 || tm.good > 0 { if tm.bad > 0 || tm.good > 0 {
@ -267,37 +464,49 @@ func (tm *TrustMetric) proportionalValue() float64 {
return value return value
} }
// Calculates the trust value for the request processing
func (tm *TrustMetric) calcTrustValue() float64 { func (tm *TrustMetric) calcTrustValue() float64 {
weightedP := tm.proportionalWeight * tm.proportionalValue() weightedP := tm.proportionalWeight * tm.proportionalValue()
weightedI := tm.integralWeight * tm.historyValue weightedI := tm.integralWeight * tm.historyValue
weightedD := tm.weightedDerivative() weightedD := tm.weightedDerivative()
tv := weightedP + weightedI + weightedD tv := weightedP + weightedI + weightedD
// Do not return a negative value.
if tv < 0 { if tv < 0 {
tv = 0 tv = 0
} }
return tv return tv
} }
// This method is for a goroutine that handles all requests on the metric
func (tm *TrustMetric) processRequests() { func (tm *TrustMetric) processRequests() {
t := time.NewTicker(tm.intervalLen)
defer t.Stop()
var t *time.Ticker
loop: loop:
for { for {
select { select {
case bg := <-tm.update: case bg := <-tm.update:
// Check if this is the first experience with
// what we are tracking since being started or paused
if t == nil {
t = time.NewTicker(tm.intervalLen)
tm.good = 0
tm.bad = 0
}
if bg.IsBad { if bg.IsBad {
tm.bad += float64(bg.Add) tm.bad += float64(bg.Add)
} else { } else {
tm.good += float64(bg.Add) tm.good += float64(bg.Add)
} }
case rtv := <-tm.trustValue: case rtv := <-tm.trustValue:
// send the calculated trust value back
rtv.Resp <- tm.calcTrustValue() rtv.Resp <- tm.calcTrustValue()
case <-t.C: case <-t.C:
// Add the current trust value to the history data
newHist := tm.calcTrustValue() newHist := tm.calcTrustValue()
tm.history = append([]float64{newHist}, tm.history...) tm.history = append([]float64{newHist}, tm.history...)
// Update history and interval counters
if tm.historySize < tm.historyMaxSize { if tm.historySize < tm.historyMaxSize {
tm.historySize++ tm.historySize++
} else { } else {
@ -308,87 +517,26 @@ loop:
tm.numIntervals++ tm.numIntervals++
} }
// Update the history data using Faded Memories
tm.updateFadedMemory() tm.updateFadedMemory()
// Calculate the history value for the upcoming time interval
tm.historyValue = tm.calcHistoryValue() tm.historyValue = tm.calcHistoryValue()
tm.good = 0 tm.good = 0
tm.bad = 0 tm.bad = 0
case <-tm.stop:
break loop
case stop := <-tm.stop:
if stop {
// Stop all further tracking for this metric
break loop
}
// Pause the metric for now by stopping the ticker
if t != nil {
t.Stop()
t = nil
}
} }
} }
}
func (tm *TrustMetric) Stop() {
tm.stop <- 1
}
// indicate that an undesirable event took place
func (tm *TrustMetric) IncBad() {
tm.update <- &updateBadGood{IsBad: true, Add: 1}
}
// multiple undesirable events need to be acknowledged
func (tm *TrustMetric) AddBad(num int) {
tm.update <- &updateBadGood{IsBad: true, Add: num}
}
// positive events need to be recorded as well
func (tm *TrustMetric) IncGood() {
tm.update <- &updateBadGood{IsBad: false, Add: 1}
}
// multiple positive can be indicated in a single call
func (tm *TrustMetric) AddGood(num int) {
tm.update <- &updateBadGood{IsBad: false, Add: num}
}
// get the dependable trust value; a score that takes a long history into account
func (tm *TrustMetric) TrustValue() float64 {
resp := make(chan float64, 1)
tm.trustValue <- &reqTrustValue{Resp: resp}
return <-resp
}
func NewMetric() *TrustMetric {
return NewMetricWithConfig(defaultConfig())
}
func NewMetricWithConfig(tmc *TrustMetricConfig) *TrustMetric {
tm := new(TrustMetric)
dc := defaultConfig()
if tmc.ProportionalWeight != 0 {
tm.proportionalWeight = tmc.ProportionalWeight
} else {
tm.proportionalWeight = dc.ProportionalWeight
}
if tmc.IntegralWeight != 0 {
tm.integralWeight = tmc.IntegralWeight
} else {
tm.integralWeight = dc.IntegralWeight
}
if tmc.HistoryMaxSize != 0 {
tm.historyMaxSize = tmc.HistoryMaxSize
} else {
tm.historyMaxSize = dc.HistoryMaxSize
if t != nil {
t.Stop()
} }
if tmc.IntervalLen != time.Duration(0) {
tm.intervalLen = tmc.IntervalLen
} else {
tm.intervalLen = dc.IntervalLen
}
// this gives our metric a tracking window of days
tm.maxIntervals = int(math.Pow(2, float64(tm.historyMaxSize)))
tm.historyValue = 1.0
tm.update = make(chan *updateBadGood, 10)
tm.trustValue = make(chan *reqTrustValue, 10)
tm.stop = make(chan int, 1)
go tm.processRequests()
return tm
} }

Loading…
Cancel
Save