You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
4.5 KiB

  1. // Copyright 2017 Tendermint. All rights reserved.
  2. // Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
  3. package trust
  4. import (
  5. "encoding/json"
  6. "sync"
  7. "time"
  8. cmn "github.com/tendermint/tmlibs/common"
  9. dbm "github.com/tendermint/tmlibs/db"
  10. )
  11. const defaultStorePeriodicSaveInterval = 1 * time.Minute
  12. var trustMetricKey = []byte("trustMetricStore")
  13. // TrustMetricStore - Manages all trust metrics for peers
  14. type TrustMetricStore struct {
  15. cmn.BaseService
  16. // Maps a Peer.Key to that peer's TrustMetric
  17. peerMetrics map[string]*TrustMetric
  18. // Mutex that protects the map and history data file
  19. mtx sync.Mutex
  20. // The db where peer trust metric history data will be stored
  21. db dbm.DB
  22. // This configuration will be used when creating new TrustMetrics
  23. config TrustMetricConfig
  24. }
  25. // NewTrustMetricStore returns a store that saves data to the DB
  26. // and uses the config when creating new trust metrics
  27. func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore {
  28. tms := &TrustMetricStore{
  29. peerMetrics: make(map[string]*TrustMetric),
  30. db: db,
  31. config: tmc,
  32. }
  33. tms.BaseService = *cmn.NewBaseService(nil, "TrustMetricStore", tms)
  34. return tms
  35. }
  36. // OnStart implements Service
  37. func (tms *TrustMetricStore) OnStart() error {
  38. if err := tms.BaseService.OnStart(); err != nil {
  39. return err
  40. }
  41. tms.mtx.Lock()
  42. defer tms.mtx.Unlock()
  43. tms.loadFromDB()
  44. go tms.saveRoutine()
  45. return nil
  46. }
  47. // OnStop implements Service
  48. func (tms *TrustMetricStore) OnStop() {
  49. tms.BaseService.OnStop()
  50. tms.mtx.Lock()
  51. defer tms.mtx.Unlock()
  52. // Stop all trust metric go-routines
  53. for _, tm := range tms.peerMetrics {
  54. tm.Stop()
  55. }
  56. // Make the final trust history data save
  57. tms.saveToDB()
  58. }
  59. // Size returns the number of entries in the trust metric store
  60. func (tms *TrustMetricStore) Size() int {
  61. tms.mtx.Lock()
  62. defer tms.mtx.Unlock()
  63. return tms.size()
  64. }
  65. // GetPeerTrustMetric returns a trust metric by peer key
  66. func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric {
  67. tms.mtx.Lock()
  68. defer tms.mtx.Unlock()
  69. tm, ok := tms.peerMetrics[key]
  70. if !ok {
  71. // If the metric is not available, we will create it
  72. tm = NewMetricWithConfig(tms.config)
  73. // The metric needs to be in the map
  74. tms.peerMetrics[key] = tm
  75. }
  76. return tm
  77. }
  78. // PeerDisconnected pauses the trust metric associated with the peer identified by the key
  79. func (tms *TrustMetricStore) PeerDisconnected(key string) {
  80. tms.mtx.Lock()
  81. defer tms.mtx.Unlock()
  82. // If the Peer that disconnected has a metric, pause it
  83. if tm, ok := tms.peerMetrics[key]; ok {
  84. tm.Pause()
  85. }
  86. }
  87. // Saves the history data for all peers to the store DB.
  88. // This public method acquires the trust metric store lock
  89. func (tms *TrustMetricStore) SaveToDB() {
  90. tms.mtx.Lock()
  91. defer tms.mtx.Unlock()
  92. tms.saveToDB()
  93. }
  94. /* Private methods */
  95. // size returns the number of entries in the store without acquiring the mutex
  96. func (tms *TrustMetricStore) size() int {
  97. return len(tms.peerMetrics)
  98. }
  99. /* Loading & Saving */
  100. /* Both loadFromDB and savetoDB assume the mutex has been acquired */
  101. // Loads the history data for all peers from the store DB
  102. // cmn.Panics if file is corrupt
  103. func (tms *TrustMetricStore) loadFromDB() bool {
  104. // Obtain the history data we have so far
  105. bytes := tms.db.Get(trustMetricKey)
  106. if bytes == nil {
  107. return false
  108. }
  109. peers := make(map[string]MetricHistoryJSON)
  110. err := json.Unmarshal(bytes, &peers)
  111. if err != nil {
  112. cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err))
  113. }
  114. // If history data exists in the file,
  115. // load it into trust metric
  116. for key, p := range peers {
  117. tm := NewMetricWithConfig(tms.config)
  118. tm.Init(p)
  119. // Load the peer trust metric into the store
  120. tms.peerMetrics[key] = tm
  121. }
  122. return true
  123. }
  124. // Saves the history data for all peers to the store DB
  125. func (tms *TrustMetricStore) saveToDB() {
  126. tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size())
  127. peers := make(map[string]MetricHistoryJSON)
  128. for key, tm := range tms.peerMetrics {
  129. // Add an entry for the peer identified by key
  130. peers[key] = tm.HistoryJSON()
  131. }
  132. // Write all the data back to the DB
  133. bytes, err := json.Marshal(peers)
  134. if err != nil {
  135. tms.Logger.Error("Failed to encode the TrustHistory", "err", err)
  136. return
  137. }
  138. tms.db.SetSync(trustMetricKey, bytes)
  139. }
  140. // Periodically saves the trust history data to the DB
  141. func (tms *TrustMetricStore) saveRoutine() {
  142. t := time.NewTicker(defaultStorePeriodicSaveInterval)
  143. defer t.Stop()
  144. loop:
  145. for {
  146. select {
  147. case <-t.C:
  148. tms.SaveToDB()
  149. case <-tms.Quit:
  150. break loop
  151. }
  152. }
  153. }