You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
2.6 KiB

11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "github.com/tendermint/tendermint/wire"
  7. . "github.com/tendermint/tendermint/common"
  8. "github.com/tendermint/tendermint/types"
  9. )
  10. type Peer struct {
  11. BaseService
  12. outbound bool
  13. mconn *MConnection
  14. *types.NodeInfo
  15. Key string
  16. Data *CMap // User data.
  17. }
  18. // NOTE: blocking
  19. // Before creating a peer with newPeer(), perform a handshake on connection.
  20. func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, error) {
  21. var peerNodeInfo = new(types.NodeInfo)
  22. var err1 error
  23. var err2 error
  24. Parallel(
  25. func() {
  26. var n int64
  27. wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
  28. },
  29. func() {
  30. var n int64
  31. wire.ReadBinary(peerNodeInfo, conn, &n, &err2)
  32. log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
  33. })
  34. if err1 != nil {
  35. return nil, err1
  36. }
  37. if err2 != nil {
  38. return nil, err2
  39. }
  40. return peerNodeInfo, nil
  41. }
  42. // NOTE: call peerHandshake on conn before calling newPeer().
  43. func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
  44. var p *Peer
  45. onReceive := func(chId byte, msgBytes []byte) {
  46. reactor := reactorsByCh[chId]
  47. if reactor == nil {
  48. PanicSanity(Fmt("Unknown channel %X", chId))
  49. }
  50. reactor.Receive(chId, p, msgBytes)
  51. }
  52. onError := func(r interface{}) {
  53. p.Stop()
  54. onPeerError(p, r)
  55. }
  56. mconn := NewMConnection(conn, chDescs, onReceive, onError)
  57. p = &Peer{
  58. outbound: outbound,
  59. mconn: mconn,
  60. NodeInfo: peerNodeInfo,
  61. Key: peerNodeInfo.PubKey.KeyString(),
  62. Data: NewCMap(),
  63. }
  64. p.BaseService = *NewBaseService(log, "Peer", p)
  65. return p
  66. }
  67. func (p *Peer) OnStart() {
  68. p.BaseService.OnStart()
  69. p.mconn.Start()
  70. }
  71. func (p *Peer) OnStop() {
  72. p.BaseService.OnStop()
  73. p.mconn.Stop()
  74. }
  75. func (p *Peer) Connection() *MConnection {
  76. return p.mconn
  77. }
  78. func (p *Peer) IsOutbound() bool {
  79. return p.outbound
  80. }
  81. func (p *Peer) Send(chId byte, msg interface{}) bool {
  82. if !p.IsRunning() {
  83. return false
  84. }
  85. return p.mconn.Send(chId, msg)
  86. }
  87. func (p *Peer) TrySend(chId byte, msg interface{}) bool {
  88. if !p.IsRunning() {
  89. return false
  90. }
  91. return p.mconn.TrySend(chId, msg)
  92. }
  93. func (p *Peer) CanSend(chId byte) bool {
  94. if !p.IsRunning() {
  95. return false
  96. }
  97. return p.mconn.CanSend(chId)
  98. }
  99. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  100. wire.WriteString(p.Key, w, &n, &err)
  101. return
  102. }
  103. func (p *Peer) String() string {
  104. if p.outbound {
  105. return fmt.Sprintf("Peer{->%v}", p.mconn)
  106. } else {
  107. return fmt.Sprintf("Peer{%v->}", p.mconn)
  108. }
  109. }
  110. func (p *Peer) Equals(other *Peer) bool {
  111. return p.Key == other.Key
  112. }