Browse Source

Merge branch '916-remove-sleeps-from-tests' into develop

pull/968/merge
Ethan Buchman 7 years ago
parent
commit
64f056b57d
6 changed files with 177 additions and 68 deletions
  1. +0
    -1
      p2p/trust/config.go
  2. +44
    -16
      p2p/trust/metric.go
  3. +25
    -20
      p2p/trust/metric_test.go
  4. +16
    -1
      p2p/trust/store.go
  5. +30
    -30
      p2p/trust/store_test.go
  6. +62
    -0
      p2p/trust/ticker.go

+ 0
- 1
p2p/trust/config.go View File

@ -51,6 +51,5 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig {
tmc.TrackingWindow >= config.IntervalLength { tmc.TrackingWindow >= config.IntervalLength {
config.TrackingWindow = tmc.TrackingWindow config.TrackingWindow = tmc.TrackingWindow
} }
return config return config
} }

+ 44
- 16
p2p/trust/metric.go View File

@ -7,6 +7,8 @@ import (
"math" "math"
"sync" "sync"
"time" "time"
cmn "github.com/tendermint/tmlibs/common"
) )
//--------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------
@ -31,6 +33,8 @@ type MetricHistoryJSON struct {
// TrustMetric - keeps track of peer reliability // TrustMetric - keeps track of peer reliability
// See tendermint/docs/architecture/adr-006-trust-metric.md for details // See tendermint/docs/architecture/adr-006-trust-metric.md for details
type TrustMetric struct { type TrustMetric struct {
cmn.BaseService
// Mutex that protects the metric from concurrent access // Mutex that protects the metric from concurrent access
mtx sync.Mutex mtx sync.Mutex
@ -73,16 +77,18 @@ type TrustMetric struct {
// While true, history data is not modified // While true, history data is not modified
paused bool paused bool
// Signal channel for stopping the trust metric go-routine
stop chan struct{}
// Used during testing in order to control the passing of time intervals
testTicker MetricTicker
} }
// NewMetric returns a trust metric with the default configuration
// NewMetric returns a trust metric with the default configuration.
// Use Start to begin tracking the quality of peer behavior over time
func NewMetric() *TrustMetric { func NewMetric() *TrustMetric {
return NewMetricWithConfig(DefaultConfig()) return NewMetricWithConfig(DefaultConfig())
} }
// NewMetricWithConfig returns a trust metric with a custom configuration
// NewMetricWithConfig returns a trust metric with a custom configuration.
// Use Start to begin tracking the quality of peer behavior over time
func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric {
tm := new(TrustMetric) tm := new(TrustMetric)
config := customConfig(tmc) config := customConfig(tmc)
@ -97,13 +103,24 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric {
tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1
// This metric has a perfect history so far // This metric has a perfect history so far
tm.historyValue = 1.0 tm.historyValue = 1.0
// Setup the stop channel
tm.stop = make(chan struct{})
go tm.processRequests()
tm.BaseService = *cmn.NewBaseService(nil, "TrustMetric", tm)
return tm return tm
} }
// OnStart implements Service
func (tm *TrustMetric) OnStart() error {
if err := tm.BaseService.OnStart(); err != nil {
return err
}
go tm.processRequests()
return nil
}
// OnStop implements Service
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit
func (tm *TrustMetric) OnStop() {}
// Returns a snapshot of the trust metric history data // Returns a snapshot of the trust metric history data
func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON {
tm.mtx.Lock() tm.mtx.Lock()
@ -155,11 +172,6 @@ func (tm *TrustMetric) Pause() {
tm.paused = true tm.paused = true
} }
// Stop tells the metric to stop recording data over time intervals
func (tm *TrustMetric) Stop() {
tm.stop <- struct{}{}
}
// BadEvents indicates that an undesirable event(s) took place // BadEvents indicates that an undesirable event(s) took place
func (tm *TrustMetric) BadEvents(num int) { func (tm *TrustMetric) BadEvents(num int) {
tm.mtx.Lock() tm.mtx.Lock()
@ -232,6 +244,16 @@ func (tm *TrustMetric) NextTimeInterval() {
tm.bad = 0 tm.bad = 0
} }
// SetTicker allows a TestTicker to be provided that will manually control
// the passing of time from the perspective of the TrustMetric.
// The ticker must be set before Start is called on the metric
func (tm *TrustMetric) SetTicker(ticker MetricTicker) {
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.testTicker = ticker
}
// Copy returns a new trust metric with members containing the same values // Copy returns a new trust metric with members containing the same values
func (tm *TrustMetric) Copy() *TrustMetric { func (tm *TrustMetric) Copy() *TrustMetric {
tm.mtx.Lock() tm.mtx.Lock()
@ -255,22 +277,28 @@ func (tm *TrustMetric) Copy() *TrustMetric {
good: tm.good, good: tm.good,
bad: tm.bad, bad: tm.bad,
paused: tm.paused, paused: tm.paused,
stop: make(chan struct{}),
} }
} }
/* Private methods */ /* Private methods */
// This method is for a goroutine that handles all requests on the metric // This method is for a goroutine that handles all requests on the metric
func (tm *TrustMetric) processRequests() { func (tm *TrustMetric) processRequests() {
t := time.NewTicker(tm.intervalLen)
t := tm.testTicker
if t == nil {
// No test ticker was provided, so we create a normal ticker
t = NewTicker(tm.intervalLen)
}
defer t.Stop() defer t.Stop()
// Obtain the raw channel
tick := t.GetChannel()
loop: loop:
for { for {
select { select {
case <-t.C:
case <-tick:
tm.NextTimeInterval() tm.NextTimeInterval()
case <-tm.stop:
case <-tm.Quit:
// Stop all further tracking for this metric // Stop all further tracking for this metric
break loop break loop
} }


+ 25
- 20
p2p/trust/metric_test.go View File

@ -9,6 +9,7 @@ import (
func TestTrustMetricScores(t *testing.T) { func TestTrustMetricScores(t *testing.T) {
tm := NewMetric() tm := NewMetric()
tm.Start()
// Perfect score // Perfect score
tm.GoodEvents(1) tm.GoodEvents(1)
@ -31,6 +32,7 @@ func TestTrustMetricConfig(t *testing.T) {
} }
tm := NewMetricWithConfig(config) tm := NewMetricWithConfig(config)
tm.Start()
// The max time intervals should be the TrackingWindow / IntervalLen // The max time intervals should be the TrackingWindow / IntervalLen
assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals) assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals)
@ -40,51 +42,54 @@ func TestTrustMetricConfig(t *testing.T) {
assert.Equal(t, dc.ProportionalWeight, tm.proportionalWeight) assert.Equal(t, dc.ProportionalWeight, tm.proportionalWeight)
assert.Equal(t, dc.IntegralWeight, tm.integralWeight) assert.Equal(t, dc.IntegralWeight, tm.integralWeight)
tm.Stop() tm.Stop()
tm.Wait()
config.ProportionalWeight = 0.3 config.ProportionalWeight = 0.3
config.IntegralWeight = 0.7 config.IntegralWeight = 0.7
tm = NewMetricWithConfig(config) tm = NewMetricWithConfig(config)
tm.Start()
// These weights should be equal to our custom values // These weights should be equal to our custom values
assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight) assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight)
assert.Equal(t, config.IntegralWeight, tm.integralWeight) assert.Equal(t, config.IntegralWeight, tm.integralWeight)
tm.Stop() tm.Stop()
tm.Wait()
} }
func TestTrustMetricStopPause(t *testing.T) { func TestTrustMetricStopPause(t *testing.T) {
// Cause time intervals to pass quickly
config := TrustMetricConfig{
TrackingWindow: 5 * time.Minute,
IntervalLength: 10 * time.Millisecond,
}
tm := NewMetricWithConfig(config)
// The TestTicker will provide manual control over
// the passing of time within the metric
tt := NewTestTicker()
tm := NewMetric()
tm.SetTicker(tt)
tm.Start()
// Allow some time intervals to pass and pause // Allow some time intervals to pass and pause
time.Sleep(50 * time.Millisecond)
tt.NextTick()
tt.NextTick()
tm.Pause() tm.Pause()
// Give the pause some time to take place
time.Sleep(10 * time.Millisecond)
first := tm.Copy().numIntervals first := tm.Copy().numIntervals
// Allow more time to pass and check the intervals are unchanged // Allow more time to pass and check the intervals are unchanged
time.Sleep(50 * time.Millisecond)
assert.Equal(t, first, tm.numIntervals)
tt.NextTick()
tt.NextTick()
assert.Equal(t, first, tm.Copy().numIntervals)
// Get the trust metric activated again // Get the trust metric activated again
tm.GoodEvents(5) tm.GoodEvents(5)
// Allow some time intervals to pass and stop // Allow some time intervals to pass and stop
time.Sleep(50 * time.Millisecond)
tt.NextTick()
tt.NextTick()
tm.Stop() tm.Stop()
// Give the stop some time to take place
time.Sleep(10 * time.Millisecond)
tm.Wait()
second := tm.Copy().numIntervals second := tm.Copy().numIntervals
// Allow more time to pass and check the intervals are unchanged
time.Sleep(50 * time.Millisecond)
assert.Equal(t, second, tm.numIntervals)
// Allow more intervals to pass while the metric is stopped
// and check that the number of intervals match
tm.NextTimeInterval()
tm.NextTimeInterval()
assert.Equal(t, second+2, tm.Copy().numIntervals)
if first >= second {
if first > second {
t.Fatalf("numIntervals should always increase or stay the same over time") t.Fatalf("numIntervals should always increase or stay the same over time")
} }
} }

+ 16
- 1
p2p/trust/store.go View File

@ -34,7 +34,8 @@ type TrustMetricStore struct {
} }
// NewTrustMetricStore returns a store that saves data to the DB // NewTrustMetricStore returns a store that saves data to the DB
// and uses the config when creating new trust metrics
// and uses the config when creating new trust metrics.
// Use Start to to initialize the trust metric store
func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore { func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore {
tms := &TrustMetricStore{ tms := &TrustMetricStore{
peerMetrics: make(map[string]*TrustMetric), peerMetrics: make(map[string]*TrustMetric),
@ -84,6 +85,18 @@ func (tms *TrustMetricStore) Size() int {
return tms.size() return tms.size()
} }
// AddPeerTrustMetric takes an existing trust metric and associates it with a peer key.
// The caller is expected to call Start on the TrustMetric being added
func (tms *TrustMetricStore) AddPeerTrustMetric(key string, tm *TrustMetric) {
tms.mtx.Lock()
defer tms.mtx.Unlock()
if key == "" || tm == nil {
return
}
tms.peerMetrics[key] = tm
}
// GetPeerTrustMetric returns a trust metric by peer key // GetPeerTrustMetric returns a trust metric by peer key
func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric { func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric {
tms.mtx.Lock() tms.mtx.Lock()
@ -93,6 +106,7 @@ func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric {
if !ok { if !ok {
// If the metric is not available, we will create it // If the metric is not available, we will create it
tm = NewMetricWithConfig(tms.config) tm = NewMetricWithConfig(tms.config)
tm.Start()
// The metric needs to be in the map // The metric needs to be in the map
tms.peerMetrics[key] = tm tms.peerMetrics[key] = tm
} }
@ -149,6 +163,7 @@ func (tms *TrustMetricStore) loadFromDB() bool {
for key, p := range peers { for key, p := range peers {
tm := NewMetricWithConfig(tms.config) tm := NewMetricWithConfig(tms.config)
tm.Start()
tm.Init(p) tm.Init(p)
// Load the peer trust metric into the store // Load the peer trust metric into the store
tms.peerMetrics[key] = tm tms.peerMetrics[key] = tm


+ 30
- 30
p2p/trust/store_test.go View File

@ -8,7 +8,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
@ -24,46 +23,50 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) {
historyDB := dbm.NewDB("trusthistory", "goleveldb", dir) historyDB := dbm.NewDB("trusthistory", "goleveldb", dir)
config := TrustMetricConfig{
TrackingWindow: 5 * time.Minute,
IntervalLength: 50 * time.Millisecond,
}
// 0 peers saved // 0 peers saved
store := NewTrustMetricStore(historyDB, config)
store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.saveToDB() store.saveToDB()
// Load the data from the file // Load the data from the file
store = NewTrustMetricStore(historyDB, config)
store = NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.loadFromDB()
store.Start()
// Make sure we still have 0 entries // Make sure we still have 0 entries
assert.Zero(t, store.Size()) assert.Zero(t, store.Size())
// 100 TestTickers
var tt []*TestTicker
for i := 0; i < 100; i++ {
// The TestTicker will provide manual control over
// the passing of time within the metric
tt = append(tt, NewTestTicker())
}
// 100 peers // 100 peers
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer_%d", i) key := fmt.Sprintf("peer_%d", i)
tm := store.GetPeerTrustMetric(key)
tm := NewMetric()
tm.SetTicker(tt[i])
tm.Start()
store.AddPeerTrustMetric(key, tm)
tm.BadEvents(10) tm.BadEvents(10)
tm.GoodEvents(1) tm.GoodEvents(1)
} }
// Check that we have 100 entries and save // Check that we have 100 entries and save
assert.Equal(t, 100, store.Size()) assert.Equal(t, 100, store.Size())
// Give the metrics time to process the history data
time.Sleep(1 * time.Second)
// Stop all the trust metrics and save
for _, tm := range store.peerMetrics {
tm.Stop()
// Give the 100 metrics time to process the history data
for i := 0; i < 100; i++ {
tt[i].NextTick()
tt[i].NextTick()
} }
store.saveToDB()
// Stop all the trust metrics and save
store.Stop()
// Load the data from the DB // Load the data from the DB
store = NewTrustMetricStore(historyDB, config)
store = NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.loadFromDB()
store.Start()
// Check that we still have 100 peers with imperfect trust values // Check that we still have 100 peers with imperfect trust values
assert.Equal(t, 100, store.Size()) assert.Equal(t, 100, store.Size())
@ -71,10 +74,7 @@ func TestTrustMetricStoreSaveLoad(t *testing.T) {
assert.NotEqual(t, 1.0, tm.TrustValue()) assert.NotEqual(t, 1.0, tm.TrustValue())
} }
// Stop all the trust metrics
for _, tm := range store.peerMetrics {
tm.Stop()
}
store.Stop()
} }
func TestTrustMetricStoreConfig(t *testing.T) { func TestTrustMetricStoreConfig(t *testing.T) {
@ -88,6 +88,7 @@ func TestTrustMetricStoreConfig(t *testing.T) {
// Create a store with custom config // Create a store with custom config
store := NewTrustMetricStore(historyDB, config) store := NewTrustMetricStore(historyDB, config)
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.Start()
// Have the store make us a metric with the config // Have the store make us a metric with the config
tm := store.GetPeerTrustMetric("TestKey") tm := store.GetPeerTrustMetric("TestKey")
@ -95,7 +96,7 @@ func TestTrustMetricStoreConfig(t *testing.T) {
// Check that the options made it to the metric // Check that the options made it to the metric
assert.Equal(t, 0.5, tm.proportionalWeight) assert.Equal(t, 0.5, tm.proportionalWeight)
assert.Equal(t, 0.5, tm.integralWeight) assert.Equal(t, 0.5, tm.integralWeight)
tm.Stop()
store.Stop()
} }
func TestTrustMetricStoreLookup(t *testing.T) { func TestTrustMetricStoreLookup(t *testing.T) {
@ -103,6 +104,7 @@ func TestTrustMetricStoreLookup(t *testing.T) {
store := NewTrustMetricStore(historyDB, DefaultConfig()) store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.Start()
// Create 100 peers in the trust metric store // Create 100 peers in the trust metric store
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -114,10 +116,7 @@ func TestTrustMetricStoreLookup(t *testing.T) {
assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key) assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key)
} }
// Stop all the trust metrics
for _, tm := range store.peerMetrics {
tm.Stop()
}
store.Stop()
} }
func TestTrustMetricStorePeerScore(t *testing.T) { func TestTrustMetricStorePeerScore(t *testing.T) {
@ -125,6 +124,7 @@ func TestTrustMetricStorePeerScore(t *testing.T) {
store := NewTrustMetricStore(historyDB, DefaultConfig()) store := NewTrustMetricStore(historyDB, DefaultConfig())
store.SetLogger(log.TestingLogger()) store.SetLogger(log.TestingLogger())
store.Start()
key := "TestKey" key := "TestKey"
tm := store.GetPeerTrustMetric(key) tm := store.GetPeerTrustMetric(key)
@ -148,5 +148,5 @@ func TestTrustMetricStorePeerScore(t *testing.T) {
// We will remember our experiences with this peer // We will remember our experiences with this peer
tm = store.GetPeerTrustMetric(key) tm = store.GetPeerTrustMetric(key)
assert.NotEqual(t, 100, tm.TrustScore()) assert.NotEqual(t, 100, tm.TrustScore())
tm.Stop()
store.Stop()
} }

+ 62
- 0
p2p/trust/ticker.go View File

@ -0,0 +1,62 @@
// Copyright 2017 Tendermint. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package trust
import (
"time"
)
// MetricTicker provides a single ticker interface for the trust metric
type MetricTicker interface {
// GetChannel returns the receive only channel that fires at each time interval
GetChannel() <-chan time.Time
// Stop will halt further activity on the ticker channel
Stop()
}
// The ticker used during testing that provides manual control over time intervals
type TestTicker struct {
C chan time.Time
stopped bool
}
// NewTestTicker returns our ticker used within test routines
func NewTestTicker() *TestTicker {
c := make(chan time.Time, 1)
return &TestTicker{
C: c,
}
}
func (t *TestTicker) GetChannel() <-chan time.Time {
return t.C
}
func (t *TestTicker) Stop() {
t.stopped = true
}
// NextInterval manually sends Time on the ticker channel
func (t *TestTicker) NextTick() {
if t.stopped {
return
}
t.C <- time.Now()
}
// Ticker is just a wrap around time.Ticker that allows it
// to meet the requirements of our interface
type Ticker struct {
*time.Ticker
}
// NewTicker returns a normal time.Ticker wrapped to meet our interface
func NewTicker(d time.Duration) *Ticker {
return &Ticker{time.NewTicker(d)}
}
func (t *Ticker) GetChannel() <-chan time.Time {
return t.C
}

Loading…
Cancel
Save