You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
2.7 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. . "github.com/tendermint/go-common"
  7. "github.com/tendermint/go-wire"
  8. )
  9. type Peer struct {
  10. BaseService
  11. outbound bool
  12. mconn *MConnection
  13. *NodeInfo
  14. Key string
  15. Data *CMap // User data.
  16. }
  17. // NOTE: blocking
  18. // Before creating a peer with newPeer(), perform a handshake on connection.
  19. func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
  20. var peerNodeInfo = new(NodeInfo)
  21. var err1 error
  22. var err2 error
  23. Parallel(
  24. func() {
  25. var n int
  26. wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
  27. },
  28. func() {
  29. var n int
  30. wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
  31. log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
  32. })
  33. if err1 != nil {
  34. return nil, err1
  35. }
  36. if err2 != nil {
  37. return nil, err2
  38. }
  39. peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
  40. return peerNodeInfo, nil
  41. }
  42. // NOTE: call peerHandshake on conn before calling newPeer().
  43. func newPeer(conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
  44. var p *Peer
  45. onReceive := func(chID byte, msgBytes []byte) {
  46. reactor := reactorsByCh[chID]
  47. if reactor == nil {
  48. PanicSanity(Fmt("Unknown channel %X", chID))
  49. }
  50. reactor.Receive(chID, p, msgBytes)
  51. }
  52. onError := func(r interface{}) {
  53. p.Stop()
  54. onPeerError(p, r)
  55. }
  56. mconn := NewMConnection(conn, chDescs, onReceive, onError)
  57. p = &Peer{
  58. outbound: outbound,
  59. mconn: mconn,
  60. NodeInfo: peerNodeInfo,
  61. Key: peerNodeInfo.PubKey.KeyString(),
  62. Data: NewCMap(),
  63. }
  64. p.BaseService = *NewBaseService(log, "Peer", p)
  65. return p
  66. }
  67. func (p *Peer) OnStart() error {
  68. p.BaseService.OnStart()
  69. _, err := p.mconn.Start()
  70. return err
  71. }
  72. func (p *Peer) OnStop() {
  73. p.BaseService.OnStop()
  74. p.mconn.Stop()
  75. }
  76. func (p *Peer) Connection() *MConnection {
  77. return p.mconn
  78. }
  79. func (p *Peer) IsOutbound() bool {
  80. return p.outbound
  81. }
  82. func (p *Peer) Send(chID byte, msg interface{}) bool {
  83. if !p.IsRunning() {
  84. return false
  85. }
  86. return p.mconn.Send(chID, msg)
  87. }
  88. func (p *Peer) TrySend(chID byte, msg interface{}) bool {
  89. if !p.IsRunning() {
  90. return false
  91. }
  92. return p.mconn.TrySend(chID, msg)
  93. }
  94. func (p *Peer) CanSend(chID byte) bool {
  95. if !p.IsRunning() {
  96. return false
  97. }
  98. return p.mconn.CanSend(chID)
  99. }
  100. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  101. var n_ int
  102. wire.WriteString(p.Key, w, &n_, &err)
  103. n += int64(n_)
  104. return
  105. }
  106. func (p *Peer) String() string {
  107. if p.outbound {
  108. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
  109. } else {
  110. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
  111. }
  112. }
  113. func (p *Peer) Equals(other *Peer) bool {
  114. return p.Key == other.Key
  115. }
  116. func (p *Peer) Get(key string) interface{} {
  117. return p.Data.Get(key)
  118. }