You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

305 lines
8.2 KiB

cleanup: Reduce and normalize import path aliasing. (#6975) The code in the Tendermint repository makes heavy use of import aliasing. This is made necessary by our extensive reuse of common base package names, and by repetition of similar names across different subdirectories. Unfortunately we have not been very consistent about which packages we alias in various circumstances, and the aliases we use vary. In the spirit of the advice in the style guide and https://github.com/golang/go/wiki/CodeReviewComments#imports, this change makes an effort to clean up and normalize import aliasing. This change makes no API or behavioral changes. It is a pure cleanup intended to help make the code more readable to developers (including myself) trying to understand what is being imported where. Only unexported names have been modified, and the changes were generated and applied mechanically with gofmt -r and comby, respecting the lexical and syntactic rules of Go. Even so, I did not fix every inconsistency. Where the changes would be too disruptive, I left it alone. The principles I followed in this cleanup are: - Remove aliases that restate the package name. - Remove aliases where the base package name is unambiguous. - Move overly-terse abbreviations from the import to the usage site. - Fix lexical issues (remove underscores, remove capitalization). - Fix import groupings to more closely match the style guide. - Group blank (side-effecting) imports and ensure they are commented. - Add aliases to multiple imports with the same base package name.
3 years ago
cleanup: Reduce and normalize import path aliasing. (#6975) The code in the Tendermint repository makes heavy use of import aliasing. This is made necessary by our extensive reuse of common base package names, and by repetition of similar names across different subdirectories. Unfortunately we have not been very consistent about which packages we alias in various circumstances, and the aliases we use vary. In the spirit of the advice in the style guide and https://github.com/golang/go/wiki/CodeReviewComments#imports, this change makes an effort to clean up and normalize import aliasing. This change makes no API or behavioral changes. It is a pure cleanup intended to help make the code more readable to developers (including myself) trying to understand what is being imported where. Only unexported names have been modified, and the changes were generated and applied mechanically with gofmt -r and comby, respecting the lexical and syntactic rules of Go. Even so, I did not fix every inconsistency. Where the changes would be too disruptive, I left it alone. The principles I followed in this cleanup are: - Remove aliases that restate the package name. - Remove aliases where the base package name is unambiguous. - Move overly-terse abbreviations from the import to the usage site. - Fix lexical issues (remove underscores, remove capitalization). - Fix import groupings to more closely match the style guide. - Group blank (side-effecting) imports and ensure they are commented. - Add aliases to multiple imports with the same base package name.
3 years ago
  1. package statesync
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "github.com/tendermint/tendermint/internal/p2p"
  8. "github.com/tendermint/tendermint/light/provider"
  9. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  10. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. var (
  14. errNoConnectedPeers = errors.New("no available peers to dispatch request to")
  15. errUnsolicitedResponse = errors.New("unsolicited light block response")
  16. errPeerAlreadyBusy = errors.New("peer is already processing a request")
  17. errDisconnected = errors.New("dispatcher disconnected")
  18. )
  19. // A Dispatcher multiplexes concurrent requests by multiple peers for light blocks.
  20. // Only one request per peer can be sent at a time. Subsequent concurrent requests will
  21. // report an error from the LightBlock method.
  22. // NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
  23. type Dispatcher struct {
  24. // the channel with which to send light block requests on
  25. requestCh *p2p.Channel
  26. mtx sync.Mutex
  27. // all pending calls that have been dispatched and are awaiting an answer
  28. calls map[types.NodeID]chan *types.LightBlock
  29. }
  30. func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher {
  31. return &Dispatcher{
  32. requestCh: requestChannel,
  33. calls: make(map[types.NodeID]chan *types.LightBlock),
  34. }
  35. }
  36. // LightBlock uses the request channel to fetch a light block from a given peer
  37. // tracking, the call and waiting for the reactor to pass back the response. A nil
  38. // LightBlock response is used to signal that the peer doesn't have the requested LightBlock.
  39. func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
  40. // dispatch the request to the peer
  41. callCh, err := d.dispatch(ctx, peer, height)
  42. if err != nil {
  43. return nil, err
  44. }
  45. // clean up the call after a response is returned
  46. defer func() {
  47. d.mtx.Lock()
  48. defer d.mtx.Unlock()
  49. if call, ok := d.calls[peer]; ok {
  50. delete(d.calls, peer)
  51. close(call)
  52. }
  53. }()
  54. // wait for a response, cancel or timeout
  55. select {
  56. case resp := <-callCh:
  57. return resp, nil
  58. case <-ctx.Done():
  59. return nil, ctx.Err()
  60. }
  61. }
  62. // dispatch takes a peer and allocates it a channel so long as it's not already
  63. // busy and the receiving channel is still running. It then dispatches the message
  64. func (d *Dispatcher) dispatch(ctx context.Context, peer types.NodeID, height int64) (chan *types.LightBlock, error) {
  65. d.mtx.Lock()
  66. defer d.mtx.Unlock()
  67. select {
  68. case <-ctx.Done():
  69. return nil, errDisconnected
  70. default:
  71. }
  72. ch := make(chan *types.LightBlock, 1)
  73. // check if a request for the same peer has already been made
  74. if _, ok := d.calls[peer]; ok {
  75. close(ch)
  76. return ch, errPeerAlreadyBusy
  77. }
  78. d.calls[peer] = ch
  79. // send request
  80. if err := d.requestCh.Send(ctx, p2p.Envelope{
  81. To: peer,
  82. Message: &ssproto.LightBlockRequest{
  83. Height: uint64(height),
  84. },
  85. }); err != nil {
  86. close(ch)
  87. return ch, err
  88. }
  89. return ch, nil
  90. }
  91. // Respond allows the underlying process which receives requests on the
  92. // requestCh to respond with the respective light block. A nil response is used to
  93. // represent that the receiver of the request does not have a light block at that height.
  94. func (d *Dispatcher) Respond(lb *tmproto.LightBlock, peer types.NodeID) error {
  95. d.mtx.Lock()
  96. defer d.mtx.Unlock()
  97. // check that the response came from a request
  98. answerCh, ok := d.calls[peer]
  99. if !ok {
  100. // this can also happen if the response came in after the timeout
  101. return errUnsolicitedResponse
  102. }
  103. // If lb is nil we take that to mean that the peer didn't have the requested light
  104. // block and thus pass on the nil to the caller.
  105. if lb == nil {
  106. answerCh <- nil
  107. return nil
  108. }
  109. block, err := types.LightBlockFromProto(lb)
  110. if err != nil {
  111. return err
  112. }
  113. answerCh <- block
  114. return nil
  115. }
  116. // Close shuts down the dispatcher and cancels any pending calls awaiting responses.
  117. // Peers awaiting responses that have not arrived are delivered a nil block.
  118. func (d *Dispatcher) Close() {
  119. d.mtx.Lock()
  120. defer d.mtx.Unlock()
  121. for peer, call := range d.calls {
  122. delete(d.calls, peer)
  123. close(call)
  124. }
  125. }
  126. //----------------------------------------------------------------
  127. // BlockProvider is a p2p based light provider which uses a dispatcher connected
  128. // to the state sync reactor to serve light blocks to the light client
  129. //
  130. // TODO: This should probably be moved over to the light package but as we're
  131. // not yet officially supporting p2p light clients we'll leave this here for now.
  132. //
  133. // NOTE: BlockProvider will return an error with concurrent calls. However, we don't
  134. // need a mutex because a light client (and the backfill process) will never call a
  135. // method more than once at the same time
  136. type BlockProvider struct {
  137. peer types.NodeID
  138. chainID string
  139. dispatcher *Dispatcher
  140. }
  141. // Creates a block provider which implements the light client Provider interface.
  142. func NewBlockProvider(peer types.NodeID, chainID string, dispatcher *Dispatcher) *BlockProvider {
  143. return &BlockProvider{
  144. peer: peer,
  145. chainID: chainID,
  146. dispatcher: dispatcher,
  147. }
  148. }
  149. // LightBlock fetches a light block from the peer at a specified height returning either a
  150. // light block or an appropriate error.
  151. func (p *BlockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
  152. lb, err := p.dispatcher.LightBlock(ctx, height, p.peer)
  153. switch err {
  154. case nil:
  155. if lb == nil {
  156. return nil, provider.ErrLightBlockNotFound
  157. }
  158. case context.DeadlineExceeded, context.Canceled:
  159. return nil, err
  160. case errPeerAlreadyBusy:
  161. return nil, provider.ErrLightBlockNotFound
  162. default:
  163. return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
  164. }
  165. // check that the height requested is the same one returned
  166. if lb.Height != height {
  167. return nil, provider.ErrBadLightBlock{
  168. Reason: fmt.Errorf("expected height %d, got height %d", height, lb.Height),
  169. }
  170. }
  171. // perform basic validation
  172. if err := lb.ValidateBasic(p.chainID); err != nil {
  173. return nil, provider.ErrBadLightBlock{Reason: err}
  174. }
  175. return lb, nil
  176. }
  177. // ReportEvidence should allow for the light client to report any light client
  178. // attacks. This is a no op as there currently isn't a way to wire this up to
  179. // the evidence reactor (we should endeavor to do this in the future but for now
  180. // it's not critical for backwards verification)
  181. func (p *BlockProvider) ReportEvidence(ctx context.Context, ev types.Evidence) error {
  182. return nil
  183. }
  184. // String implements stringer interface
  185. func (p *BlockProvider) String() string { return string(p.peer) }
  186. //----------------------------------------------------------------
  187. // peerList is a rolling list of peers. This is used to distribute the load of
  188. // retrieving blocks over all the peers the reactor is connected to
  189. type peerList struct {
  190. mtx sync.Mutex
  191. peers []types.NodeID
  192. waiting []chan types.NodeID
  193. }
  194. func newPeerList() *peerList {
  195. return &peerList{
  196. peers: make([]types.NodeID, 0),
  197. waiting: make([]chan types.NodeID, 0),
  198. }
  199. }
  200. func (l *peerList) Len() int {
  201. l.mtx.Lock()
  202. defer l.mtx.Unlock()
  203. return len(l.peers)
  204. }
  205. func (l *peerList) Pop(ctx context.Context) types.NodeID {
  206. l.mtx.Lock()
  207. if len(l.peers) == 0 {
  208. // if we don't have any peers in the list we block until a peer is
  209. // appended
  210. wait := make(chan types.NodeID, 1)
  211. l.waiting = append(l.waiting, wait)
  212. // unlock whilst waiting so that the list can be appended to
  213. l.mtx.Unlock()
  214. select {
  215. case peer := <-wait:
  216. return peer
  217. case <-ctx.Done():
  218. return ""
  219. }
  220. }
  221. peer := l.peers[0]
  222. l.peers = l.peers[1:]
  223. l.mtx.Unlock()
  224. return peer
  225. }
  226. func (l *peerList) Append(peer types.NodeID) {
  227. l.mtx.Lock()
  228. defer l.mtx.Unlock()
  229. if len(l.waiting) > 0 {
  230. wait := l.waiting[0]
  231. l.waiting = l.waiting[1:]
  232. wait <- peer
  233. close(wait)
  234. } else {
  235. l.peers = append(l.peers, peer)
  236. }
  237. }
  238. func (l *peerList) Remove(peer types.NodeID) {
  239. l.mtx.Lock()
  240. defer l.mtx.Unlock()
  241. for i, p := range l.peers {
  242. if p == peer {
  243. l.peers = append(l.peers[:i], l.peers[i+1:]...)
  244. return
  245. }
  246. }
  247. }
  248. func (l *peerList) All() []types.NodeID {
  249. l.mtx.Lock()
  250. defer l.mtx.Unlock()
  251. return l.peers
  252. }
  253. func (l *peerList) Contains(id types.NodeID) bool {
  254. l.mtx.Lock()
  255. defer l.mtx.Unlock()
  256. for _, p := range l.peers {
  257. if id == p {
  258. return true
  259. }
  260. }
  261. return false
  262. }