You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
2.1 KiB

10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync/atomic"
  7. "github.com/tendermint/tendermint/binary"
  8. . "github.com/tendermint/tendermint/common"
  9. )
  10. type Peer struct {
  11. outbound bool
  12. mconn *MConnection
  13. running uint32
  14. Key string
  15. Data *CMap // User data.
  16. }
  17. func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
  18. var p *Peer
  19. onReceive := func(chId byte, msgBytes []byte) {
  20. reactor := reactorsByCh[chId]
  21. if reactor == nil {
  22. panic(Fmt("Unknown channel %X", chId))
  23. }
  24. reactor.Receive(chId, p, msgBytes)
  25. }
  26. onError := func(r interface{}) {
  27. p.stop()
  28. onPeerError(p, r)
  29. }
  30. mconn := NewMConnection(conn, chDescs, onReceive, onError)
  31. p = &Peer{
  32. outbound: outbound,
  33. mconn: mconn,
  34. running: 0,
  35. Key: mconn.RemoteAddress.String(),
  36. Data: NewCMap(),
  37. }
  38. return p
  39. }
  40. func (p *Peer) start() {
  41. if atomic.CompareAndSwapUint32(&p.running, 0, 1) {
  42. log.Debug("Starting Peer", "peer", p)
  43. p.mconn.Start()
  44. }
  45. }
  46. func (p *Peer) stop() {
  47. if atomic.CompareAndSwapUint32(&p.running, 1, 0) {
  48. log.Debug("Stopping Peer", "peer", p)
  49. p.mconn.Stop()
  50. }
  51. }
  52. func (p *Peer) IsRunning() bool {
  53. return atomic.LoadUint32(&p.running) == 1
  54. }
  55. func (p *Peer) Connection() *MConnection {
  56. return p.mconn
  57. }
  58. func (p *Peer) IsOutbound() bool {
  59. return p.outbound
  60. }
  61. func (p *Peer) Send(chId byte, msg interface{}) bool {
  62. if atomic.LoadUint32(&p.running) == 0 {
  63. return false
  64. }
  65. return p.mconn.Send(chId, msg)
  66. }
  67. func (p *Peer) TrySend(chId byte, msg interface{}) bool {
  68. if atomic.LoadUint32(&p.running) == 0 {
  69. return false
  70. }
  71. return p.mconn.TrySend(chId, msg)
  72. }
  73. func (p *Peer) CanSend(chId byte) bool {
  74. if atomic.LoadUint32(&p.running) == 0 {
  75. return false
  76. }
  77. return p.mconn.CanSend(chId)
  78. }
  79. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  80. binary.WriteString(p.Key, w, &n, &err)
  81. return
  82. }
  83. func (p *Peer) String() string {
  84. if p.outbound {
  85. return fmt.Sprintf("Peer{->%v}", p.mconn)
  86. } else {
  87. return fmt.Sprintf("Peer{%v->}", p.mconn)
  88. }
  89. }
  90. func (p *Peer) Equals(other *Peer) bool {
  91. return p.Key == other.Key
  92. }