You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

109 lines
2.1 KiB

10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync/atomic"
  7. "github.com/tendermint/tendermint/binary"
  8. . "github.com/tendermint/tendermint/common"
  9. )
  10. type Peer struct {
  11. outbound bool
  12. mconn *MConnection
  13. started uint32
  14. stopped uint32
  15. Key string
  16. Data *CMap // User data.
  17. }
  18. func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
  19. var p *Peer
  20. onReceive := func(chId byte, msgBytes []byte) {
  21. reactor := reactorsByCh[chId]
  22. if reactor == nil {
  23. panic(Fmt("Unknown channel %X", chId))
  24. }
  25. reactor.Receive(chId, p, msgBytes)
  26. }
  27. onError := func(r interface{}) {
  28. p.stop()
  29. onPeerError(p, r)
  30. }
  31. mconn := NewMConnection(conn, chDescs, onReceive, onError)
  32. p = &Peer{
  33. outbound: outbound,
  34. mconn: mconn,
  35. stopped: 0,
  36. Key: mconn.RemoteAddress.String(),
  37. Data: NewCMap(),
  38. }
  39. return p
  40. }
  41. func (p *Peer) start() {
  42. if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
  43. log.Debug("Starting Peer", "peer", p)
  44. p.mconn.Start()
  45. }
  46. }
  47. func (p *Peer) stop() {
  48. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  49. log.Debug("Stopping Peer", "peer", p)
  50. p.mconn.Stop()
  51. }
  52. }
  53. func (p *Peer) IsStopped() bool {
  54. return atomic.LoadUint32(&p.stopped) == 1
  55. }
  56. func (p *Peer) Connection() *MConnection {
  57. return p.mconn
  58. }
  59. func (p *Peer) IsOutbound() bool {
  60. return p.outbound
  61. }
  62. func (p *Peer) Send(chId byte, msg interface{}) bool {
  63. if atomic.LoadUint32(&p.stopped) == 1 {
  64. return false
  65. }
  66. return p.mconn.Send(chId, msg)
  67. }
  68. func (p *Peer) TrySend(chId byte, msg interface{}) bool {
  69. if atomic.LoadUint32(&p.stopped) == 1 {
  70. return false
  71. }
  72. return p.mconn.TrySend(chId, msg)
  73. }
  74. func (p *Peer) CanSend(chId byte) bool {
  75. if atomic.LoadUint32(&p.stopped) == 1 {
  76. return false
  77. }
  78. return p.mconn.CanSend(chId)
  79. }
  80. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  81. binary.WriteString(p.Key, w, &n, &err)
  82. return
  83. }
  84. func (p *Peer) String() string {
  85. if p.outbound {
  86. return fmt.Sprintf("Peer{->%v}", p.mconn)
  87. } else {
  88. return fmt.Sprintf("Peer{%v->}", p.mconn)
  89. }
  90. }
  91. func (p *Peer) Equals(other *Peer) bool {
  92. return p.Key == other.Key
  93. }