You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

137 lines
2.8 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. . "github.com/tendermint/go-common"
  7. cfg "github.com/tendermint/go-config"
  8. "github.com/tendermint/go-wire"
  9. )
  10. type Peer struct {
  11. BaseService
  12. outbound bool
  13. mconn *MConnection
  14. *NodeInfo
  15. Key string
  16. Data *CMap // User data.
  17. }
  18. // NOTE: blocking
  19. // Before creating a peer with newPeer(), perform a handshake on connection.
  20. func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
  21. var peerNodeInfo = new(NodeInfo)
  22. var err1 error
  23. var err2 error
  24. Parallel(
  25. func() {
  26. var n int
  27. wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
  28. },
  29. func() {
  30. var n int
  31. wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
  32. log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
  33. })
  34. if err1 != nil {
  35. return nil, err1
  36. }
  37. if err2 != nil {
  38. return nil, err2
  39. }
  40. peerNodeInfo.RemoteAddr = conn.RemoteAddr().String()
  41. return peerNodeInfo, nil
  42. }
  43. // NOTE: call peerHandshake on conn before calling newPeer().
  44. func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
  45. var p *Peer
  46. onReceive := func(chID byte, msgBytes []byte) {
  47. reactor := reactorsByCh[chID]
  48. if reactor == nil {
  49. PanicSanity(Fmt("Unknown channel %X", chID))
  50. }
  51. reactor.Receive(chID, p, msgBytes)
  52. }
  53. onError := func(r interface{}) {
  54. p.Stop()
  55. onPeerError(p, r)
  56. }
  57. mconn := NewMConnection(config, conn, chDescs, onReceive, onError)
  58. p = &Peer{
  59. outbound: outbound,
  60. mconn: mconn,
  61. NodeInfo: peerNodeInfo,
  62. Key: peerNodeInfo.PubKey.KeyString(),
  63. Data: NewCMap(),
  64. }
  65. p.BaseService = *NewBaseService(log, "Peer", p)
  66. return p
  67. }
  68. func (p *Peer) OnStart() error {
  69. p.BaseService.OnStart()
  70. _, err := p.mconn.Start()
  71. return err
  72. }
  73. func (p *Peer) OnStop() {
  74. p.BaseService.OnStop()
  75. p.mconn.Stop()
  76. }
  77. func (p *Peer) Connection() *MConnection {
  78. return p.mconn
  79. }
  80. func (p *Peer) IsOutbound() bool {
  81. return p.outbound
  82. }
  83. func (p *Peer) Send(chID byte, msg interface{}) bool {
  84. if !p.IsRunning() {
  85. return false
  86. }
  87. return p.mconn.Send(chID, msg)
  88. }
  89. func (p *Peer) TrySend(chID byte, msg interface{}) bool {
  90. if !p.IsRunning() {
  91. return false
  92. }
  93. return p.mconn.TrySend(chID, msg)
  94. }
  95. func (p *Peer) CanSend(chID byte) bool {
  96. if !p.IsRunning() {
  97. return false
  98. }
  99. return p.mconn.CanSend(chID)
  100. }
  101. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  102. var n_ int
  103. wire.WriteString(p.Key, w, &n_, &err)
  104. n += int64(n_)
  105. return
  106. }
  107. func (p *Peer) String() string {
  108. if p.outbound {
  109. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.Key[:12])
  110. } else {
  111. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.Key[:12])
  112. }
  113. }
  114. func (p *Peer) Equals(other *Peer) bool {
  115. return p.Key == other.Key
  116. }
  117. func (p *Peer) Get(key string) interface{} {
  118. return p.Data.Get(key)
  119. }