You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

159 lines
3.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package peer
  2. import (
  3. . "github.com/tendermint/tendermint/binary"
  4. "sync/atomic"
  5. "sync"
  6. "io"
  7. "time"
  8. )
  9. /* Peer */
  10. type Peer struct {
  11. outgoing bool
  12. conn *Connection
  13. channels map[String]*Channel
  14. mtx sync.Mutex
  15. quit chan struct{}
  16. stopped uint32
  17. }
  18. func NewPeer(conn *Connection) *Peer {
  19. return &Peer{
  20. conn: conn,
  21. quit: make(chan struct{}),
  22. stopped: 0,
  23. }
  24. }
  25. func (p *Peer) Start(peerInQueues map[String]chan *InboundMsg ) {
  26. for chName, _ := range p.channels {
  27. go p.inHandler(chName, peerInQueues[chName])
  28. go p.outHandler(chName)
  29. }
  30. }
  31. func (p *Peer) Stop() {
  32. // lock
  33. p.mtx.Lock()
  34. if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
  35. close(p.quit)
  36. p.conn.Stop()
  37. }
  38. p.mtx.Unlock()
  39. // unlock
  40. }
  41. func (p *Peer) LocalAddress() *NetAddress {
  42. return p.conn.LocalAddress()
  43. }
  44. func (p *Peer) RemoteAddress() *NetAddress {
  45. return p.conn.RemoteAddress()
  46. }
  47. func (p *Peer) Channel(chName String) *Channel {
  48. return p.channels[chName]
  49. }
  50. // Queue the msg for output.
  51. // If the queue is full, just return false.
  52. func (p *Peer) TryQueueOut(chName String, msg Msg) bool {
  53. channel := p.Channel(chName)
  54. outQueue := channel.OutQueue()
  55. // lock & defer
  56. p.mtx.Lock(); defer p.mtx.Unlock()
  57. if p.stopped == 1 { return false }
  58. select {
  59. case outQueue <- msg:
  60. return true
  61. default: // buffer full
  62. return false
  63. }
  64. // unlock deferred
  65. }
  66. func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
  67. return p.RemoteAddress().WriteTo(w)
  68. }
  69. func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
  70. channel := p.channels[chName]
  71. inQueue := channel.InQueue()
  72. FOR_LOOP:
  73. for {
  74. select {
  75. case <-p.quit:
  76. break FOR_LOOP
  77. case msg := <-inQueue:
  78. // send to inboundMsgQueue
  79. inboundMsg := &InboundMsg{
  80. Peer: p,
  81. Channel: channel,
  82. Time: Time{time.Now()},
  83. Msg: msg,
  84. }
  85. select {
  86. case <-p.quit:
  87. break FOR_LOOP
  88. case inboundMsgQueue <- inboundMsg:
  89. continue
  90. }
  91. }
  92. }
  93. // cleanup
  94. // (none)
  95. }
  96. func (p *Peer) outHandler(chName String) {
  97. outQueue := p.channels[chName].outQueue
  98. FOR_LOOP:
  99. for {
  100. select {
  101. case <-p.quit:
  102. break FOR_LOOP
  103. case msg := <-outQueue:
  104. // blocks until the connection is Stop'd,
  105. // which happens when this peer is Stop'd.
  106. p.conn.QueueOut(msg.Bytes)
  107. }
  108. }
  109. // cleanup
  110. // (none)
  111. }
  112. /* Channel */
  113. type Channel struct {
  114. name String
  115. inQueue chan Msg
  116. outQueue chan Msg
  117. //stats Stats
  118. }
  119. func NewChannel(name string, bufferSize int) *Channel {
  120. return &Channel{
  121. name: String(name),
  122. inQueue: make(chan Msg, bufferSize),
  123. outQueue: make(chan Msg, bufferSize),
  124. }
  125. }
  126. func (c *Channel) Name() String {
  127. return c.name
  128. }
  129. func (c *Channel) InQueue() <-chan Msg {
  130. return c.inQueue
  131. }
  132. func (c *Channel) OutQueue() chan<- Msg {
  133. return c.outQueue
  134. }