diff --git a/p2p/trust/config.go b/p2p/trust/config.go index 6fb0e681b..b20a8b2cb 100644 --- a/p2p/trust/config.go +++ b/p2p/trust/config.go @@ -51,6 +51,5 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig { tmc.TrackingWindow >= config.IntervalLength { config.TrackingWindow = tmc.TrackingWindow } - return config } diff --git a/p2p/trust/metric.go b/p2p/trust/metric.go index 3a0bf4809..d40ffa54f 100644 --- a/p2p/trust/metric.go +++ b/p2p/trust/metric.go @@ -7,6 +7,8 @@ import ( "math" "sync" "time" + + cmn "github.com/tendermint/tmlibs/common" ) //--------------------------------------------------------------------------------------- @@ -31,6 +33,8 @@ type MetricHistoryJSON struct { // TrustMetric - keeps track of peer reliability // See tendermint/docs/architecture/adr-006-trust-metric.md for details type TrustMetric struct { + cmn.BaseService + // Mutex that protects the metric from concurrent access mtx sync.Mutex @@ -73,25 +77,24 @@ type TrustMetric struct { // While true, history data is not modified paused bool - // Set to true once the metric has been stopped - stopped bool - // Signal channel for stopping the trust metric go-routine stop chan struct{} - // Slice of signal channels fired when the metric is stopped - waitForStop []chan struct{} + // Signal channel fired when the metric go-routine has stopped + done chan struct{} - // Slice of signal channels fired when the metric time interval ticker is fired - waitForTimeInterval []chan struct{} + // Used during testing in order to control the passing of time intervals + testTicker MetricTicker } -// NewMetric returns a trust metric with the default configuration +// NewMetric returns a trust metric with the default configuration. +// Use Start to begin tracking the quality of peer behavior over time func NewMetric() *TrustMetric { return NewMetricWithConfig(DefaultConfig()) } -// NewMetricWithConfig returns a trust metric with a custom configuration +// NewMetricWithConfig returns a trust metric with a custom configuration. +// Use Start to begin tracking the quality of peer behavior over time func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm := new(TrustMetric) config := customConfig(tmc) @@ -106,13 +109,34 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 - // Setup the stop channel - tm.stop = make(chan struct{}) + // Setup the go-routine stop channel + tm.stop = make(chan struct{}, 1) + // Setup the go-routine done channel + tm.done = make(chan struct{}, 1) - go tm.processRequests() + tm.BaseService = *cmn.NewBaseService(nil, "TrustMetric", tm) return tm } +// OnStart implements Service +func (tm *TrustMetric) OnStart() error { + if err := tm.BaseService.OnStart(); err != nil { + return err + } + go tm.processRequests() + return nil +} + +// OnStop implements Service +// Stop tells the metric to stop recording data over time intervals +// This method also blocks until the metric has completely stopped +func (tm *TrustMetric) OnStop() { + tm.BaseService.OnStop() + + tm.stop <- struct{}{} + <-tm.done +} + // Returns a snapshot of the trust metric history data func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { tm.mtx.Lock() @@ -164,18 +188,6 @@ func (tm *TrustMetric) Pause() { tm.paused = true } -// Stop tells the metric to stop recording data over time intervals -// This method also blocks until the metric has completely stopped -func (tm *TrustMetric) Stop() { - tm.stop <- struct{}{} - - wait := make(chan struct{}) - - if tm.AddStopWaitChannel(wait) { - <-wait - } -} - // BadEvents indicates that an undesirable event(s) took place func (tm *TrustMetric) BadEvents(num int) { tm.mtx.Lock() @@ -248,6 +260,16 @@ func (tm *TrustMetric) NextTimeInterval() { tm.bad = 0 } +// SetTicker allows a TestTicker to be provided that will manually control +// the passing of time from the perspective of the TrustMetric. +// The ticker must be set before Start is called on the metric +func (tm *TrustMetric) SetTicker(ticker MetricTicker) { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + tm.testTicker = ticker +} + // Copy returns a new trust metric with members containing the same values func (tm *TrustMetric) Copy() *TrustMetric { tm.mtx.Lock() @@ -271,98 +293,35 @@ func (tm *TrustMetric) Copy() *TrustMetric { good: tm.good, bad: tm.bad, paused: tm.paused, - stop: make(chan struct{}), - } -} - -// WaitForTimeIntervalToPass blocks until the metric -// go-routine ticker fire again -func (tm *TrustMetric) WaitForTimeIntervalToPass() { - interval := make(chan struct{}) - - if tm.AddTimeIntervalWaitChannel(interval) { - <-interval + stop: make(chan struct{}, 1), + done: make(chan struct{}, 1), } } -// SignalTimeIntervalPassed fires all the maintained signal channels -// and clears the group of signal channels -func (tm *TrustMetric) SignalTimeIntervalPassed() { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - for _, interval := range tm.waitForTimeInterval { - interval <- struct{}{} - } - - tm.waitForTimeInterval = []chan struct{}{} -} - -// AddTimeIntervalWaitChannel adds a signal channel to a group of -// waiters for this metric. This method returns true if the channel -// was added to the group before the metric was stopped -func (tm *TrustMetric) AddTimeIntervalWaitChannel(interval chan struct{}) bool { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - var added bool - - if !tm.stopped { - tm.waitForTimeInterval = append(tm.waitForTimeInterval, interval) - added = true - } - return added -} - -// SignalStopped fires all the maintained signal channels and sets -// metric stopped status to true -func (tm *TrustMetric) SignalStopped() { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - for _, stop := range tm.waitForStop { - stop <- struct{}{} - } - - tm.stopped = true -} - -// AddStopWaitChannel adds a signal channel to a group of waiters -// for this metric. This method returns true if the channel was -// added to the group before the metric was stopped -func (tm *TrustMetric) AddStopWaitChannel(stop chan struct{}) bool { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - var added bool - - if !tm.stopped { - tm.waitForStop = append(tm.waitForStop, stop) - added = true - } - return added -} - /* Private methods */ // This method is for a goroutine that handles all requests on the metric func (tm *TrustMetric) processRequests() { - t := time.NewTicker(tm.intervalLen) + t := tm.testTicker + if t == nil { + // No test ticker was provided, so we create a normal ticker + t = NewTicker(tm.intervalLen) + } defer t.Stop() + // Obtain the raw channel + tick := t.GetChannel() loop: for { select { - case <-t.C: + case <-tick: tm.NextTimeInterval() - tm.SignalTimeIntervalPassed() case <-tm.stop: // Stop all further tracking for this metric break loop } } - - // Change the status to stopped and signal the waiters - tm.SignalStopped() + // Send the done signal + tm.done <- struct{}{} } // Wakes the trust metric up if it is currently paused diff --git a/p2p/trust/metric_test.go b/p2p/trust/metric_test.go index a83848ced..1fae7917f 100644 --- a/p2p/trust/metric_test.go +++ b/p2p/trust/metric_test.go @@ -9,6 +9,7 @@ import ( func TestTrustMetricScores(t *testing.T) { tm := NewMetric() + tm.Start() // Perfect score tm.GoodEvents(1) @@ -31,6 +32,7 @@ func TestTrustMetricConfig(t *testing.T) { } tm := NewMetricWithConfig(config) + tm.Start() // The max time intervals should be the TrackingWindow / IntervalLen assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals) @@ -44,6 +46,7 @@ func TestTrustMetricConfig(t *testing.T) { config.ProportionalWeight = 0.3 config.IntegralWeight = 0.7 tm = NewMetricWithConfig(config) + tm.Start() // These weights should be equal to our custom values assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight) @@ -52,14 +55,12 @@ func TestTrustMetricConfig(t *testing.T) { } func TestTrustMetricStopPause(t *testing.T) { - // Cause time intervals to pass quickly - config := TrustMetricConfig{ - TrackingWindow: 5 * time.Minute, - IntervalLength: 10 * time.Millisecond, - } - - tm := NewMetricWithConfig(config) - + // The TestTicker will provide manual control over + // the passing of time within the metric + tt := NewTestTicker() + tm := NewMetric() + tm.SetTicker(tt) + tm.Start() // Allow some time intervals to pass and pause tm.NextTimeInterval() tm.NextTimeInterval() @@ -67,8 +68,8 @@ func TestTrustMetricStopPause(t *testing.T) { first := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged - tm.WaitForTimeIntervalToPass() - tm.WaitForTimeIntervalToPass() + tt.NextTick() + tt.NextTick() assert.Equal(t, first, tm.Copy().numIntervals) // Get the trust metric activated again diff --git a/p2p/trust/store.go b/p2p/trust/store.go index e86aecd2c..c070f33d9 100644 --- a/p2p/trust/store.go +++ b/p2p/trust/store.go @@ -34,7 +34,8 @@ type TrustMetricStore struct { } // NewTrustMetricStore returns a store that saves data to the DB -// and uses the config when creating new trust metrics +// and uses the config when creating new trust metrics. +// Use Start to to initialize the trust metric store func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore { tms := &TrustMetricStore{ peerMetrics: make(map[string]*TrustMetric), @@ -84,6 +85,18 @@ func (tms *TrustMetricStore) Size() int { return tms.size() } +// AddPeerTrustMetric takes an existing trust metric and associates it with a peer key. +// The caller is expected to call Start on the TrustMetric being added +func (tms *TrustMetricStore) AddPeerTrustMetric(key string, tm *TrustMetric) { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + if key == "" || tm == nil { + return + } + tms.peerMetrics[key] = tm +} + // GetPeerTrustMetric returns a trust metric by peer key func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { tms.mtx.Lock() @@ -93,6 +106,7 @@ func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { if !ok { // If the metric is not available, we will create it tm = NewMetricWithConfig(tms.config) + tm.Start() // The metric needs to be in the map tms.peerMetrics[key] = tm } @@ -149,6 +163,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { for key, p := range peers { tm := NewMetricWithConfig(tms.config) + tm.Start() tm.Init(p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm diff --git a/p2p/trust/store_test.go b/p2p/trust/store_test.go index c0306bba8..4e5553961 100644 --- a/p2p/trust/store_test.go +++ b/p2p/trust/store_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "testing" - "time" "github.com/stretchr/testify/assert" dbm "github.com/tendermint/tmlibs/db" @@ -24,46 +23,50 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) { historyDB := dbm.NewDB("trusthistory", "goleveldb", dir) - config := TrustMetricConfig{ - TrackingWindow: 5 * time.Minute, - IntervalLength: 50 * time.Millisecond, - } - // 0 peers saved - store := NewTrustMetricStore(historyDB, config) + store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) store.saveToDB() // Load the data from the file - store = NewTrustMetricStore(historyDB, config) + store = NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) - store.loadFromDB() + store.Start() // Make sure we still have 0 entries assert.Zero(t, store.Size()) + // 100 TestTickers + var tt []*TestTicker + for i := 0; i < 100; i++ { + // The TestTicker will provide manual control over + // the passing of time within the metric + tt = append(tt, NewTestTicker()) + } // 100 peers for i := 0; i < 100; i++ { key := fmt.Sprintf("peer_%d", i) - tm := store.GetPeerTrustMetric(key) + tm := NewMetric() + + tm.SetTicker(tt[i]) + tm.Start() + store.AddPeerTrustMetric(key, tm) tm.BadEvents(10) tm.GoodEvents(1) } - // Check that we have 100 entries and save assert.Equal(t, 100, store.Size()) - // Give the metrics time to process the history data - time.Sleep(1 * time.Second) - - // Stop all the trust metrics and save - for _, tm := range store.peerMetrics { - tm.Stop() + // Give the 100 metrics time to process the history data + for i := 0; i < 100; i++ { + tt[i].NextTick() + tt[i].NextTick() } - store.saveToDB() + // Stop all the trust metrics and save + store.Stop() // Load the data from the DB - store = NewTrustMetricStore(historyDB, config) + store = NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) - store.loadFromDB() + store.Start() // Check that we still have 100 peers with imperfect trust values assert.Equal(t, 100, store.Size()) @@ -71,10 +74,7 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) { assert.NotEqual(t, 1.0, tm.TrustValue()) } - // Stop all the trust metrics - for _, tm := range store.peerMetrics { - tm.Stop() - } + store.Stop() } func TestTrustMetricStoreConfig(t *testing.T) { @@ -88,6 +88,7 @@ func TestTrustMetricStoreConfig(t *testing.T) { // Create a store with custom config store := NewTrustMetricStore(historyDB, config) store.SetLogger(log.TestingLogger()) + store.Start() // Have the store make us a metric with the config tm := store.GetPeerTrustMetric("TestKey") @@ -95,7 +96,7 @@ func TestTrustMetricStoreConfig(t *testing.T) { // Check that the options made it to the metric assert.Equal(t, 0.5, tm.proportionalWeight) assert.Equal(t, 0.5, tm.integralWeight) - tm.Stop() + store.Stop() } func TestTrustMetricStoreLookup(t *testing.T) { @@ -103,6 +104,7 @@ func TestTrustMetricStoreLookup(t *testing.T) { store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) + store.Start() // Create 100 peers in the trust metric store for i := 0; i < 100; i++ { @@ -114,10 +116,7 @@ func TestTrustMetricStoreLookup(t *testing.T) { assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key) } - // Stop all the trust metrics - for _, tm := range store.peerMetrics { - tm.Stop() - } + store.Stop() } func TestTrustMetricStorePeerScore(t *testing.T) { @@ -125,6 +124,7 @@ func TestTrustMetricStorePeerScore(t *testing.T) { store := NewTrustMetricStore(historyDB, DefaultConfig()) store.SetLogger(log.TestingLogger()) + store.Start() key := "TestKey" tm := store.GetPeerTrustMetric(key) @@ -148,5 +148,5 @@ func TestTrustMetricStorePeerScore(t *testing.T) { // We will remember our experiences with this peer tm = store.GetPeerTrustMetric(key) assert.NotEqual(t, 100, tm.TrustScore()) - tm.Stop() + store.Stop() } diff --git a/p2p/trust/ticker.go b/p2p/trust/ticker.go new file mode 100644 index 000000000..bce9fcc24 --- /dev/null +++ b/p2p/trust/ticker.go @@ -0,0 +1,62 @@ +// Copyright 2017 Tendermint. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package trust + +import ( + "time" +) + +// MetricTicker provides a single ticker interface for the trust metric +type MetricTicker interface { + // GetChannel returns the receive only channel that fires at each time interval + GetChannel() <-chan time.Time + + // Stop will halt further activity on the ticker channel + Stop() +} + +// The ticker used during testing that provides manual control over time intervals +type TestTicker struct { + C chan time.Time + stopped bool +} + +// NewTestTicker returns our ticker used within test routines +func NewTestTicker() *TestTicker { + c := make(chan time.Time, 1) + return &TestTicker{ + C: c, + } +} + +func (t *TestTicker) GetChannel() <-chan time.Time { + return t.C +} + +func (t *TestTicker) Stop() { + t.stopped = true +} + +// NextInterval manually sends Time on the ticker channel +func (t *TestTicker) NextTick() { + if t.stopped { + return + } + t.C <- time.Now() +} + +// Ticker is just a wrap around time.Ticker that allows it +// to meet the requirements of our interface +type Ticker struct { + *time.Ticker +} + +// NewTicker returns a normal time.Ticker wrapped to meet our interface +func NewTicker(d time.Duration) *Ticker { + return &Ticker{time.NewTicker(d)} +} + +func (t *Ticker) GetChannel() <-chan time.Time { + return t.C +}