diff --git a/peer/client.go b/peer/client.go index 7619c8f1d..029bd703e 100644 --- a/peer/client.go +++ b/peer/client.go @@ -34,7 +34,6 @@ type Client struct { peersMtx sync.Mutex peers merkle.Tree // addr -> *Peer quit chan struct{} - erroredPeers chan peerError stopped uint32 } @@ -64,7 +63,6 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { pktRecvQueues: pktRecvQueues, peers: merkle.NewIAVLTree(nil), quit: make(chan struct{}), - erroredPeers: make(chan peerError), stopped: 0, } @@ -75,8 +73,9 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { } func (c *Client) start() { - // Handle peer disconnects & errors - go c.peerErrorHandler() + // Handle PEX messages + // TODO: hmm + // go peerExchangeHandler(c) } func (c *Client) Stop() { @@ -110,7 +109,7 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, return nil, err } - go peer.start(c.pktRecvQueues, c.erroredPeers) + go peer.start(c.pktRecvQueues, c.StopPeerForError) return peer, nil } @@ -123,7 +122,7 @@ func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) { log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) for v := range c.peers.Values() { peer := v.(*Peer) - success := peer.TrySend(pkt) + success := peer.TryQueue(pkt) log.Tracef("Broadcast for peer %v success: %v", peer, success) if success { numSuccess += 1 @@ -165,7 +164,17 @@ func (c *Client) Peers() merkle.Tree { // unlock deferred } -func (c *Client) StopPeer(peer *Peer) { +// Disconnect from a peer due to external error. +// TODO: make record depending on reason. +func (c *Client) StopPeerForError(peer *Peer, reason interface{}) { + log.Infof("%v errored: %v", peer, reason) + c.StopPeer(peer, false) +} + +// Disconnect from a peer. +// If graceful is true, last message sent is a disconnect message. +// TODO: handle graceful disconnects. +func (c *Client) StopPeer(peer *Peer, graceful bool) { // lock c.peersMtx.Lock() peerValue, _ := c.peers.Remove(peer.RemoteAddress()) @@ -198,17 +207,3 @@ func (c *Client) addPeer(peer *Peer) error { } // unlock deferred } - -func (c *Client) peerErrorHandler() { - for { - select { - case <-c.quit: - return - case errPeer := <-c.erroredPeers: - log.Infof("%v errored: %v", errPeer.peer, errPeer.err) - // TODO: do more - c.StopPeer(errPeer.peer) - return - } - } -} diff --git a/peer/peer.go b/peer/peer.go index 7f8e41373..f5c71626f 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -29,14 +29,14 @@ func NewPeer(conn *Connection, channels map[String]*Channel) *Peer { } } -func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, erroredPeers chan peerError) { +func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, onPeerError func(*Peer, interface{})) { log.Debugf("Starting %v", p) if atomic.CompareAndSwapUint32(&p.started, 0, 1) { // on connection error onError := func(r interface{}) { p.stop() - erroredPeers <- peerError{p, r} + onPeerError(p, r) } p.conn.Start(p.channels, onError) for chName, _ := range p.channels { @@ -67,9 +67,9 @@ func (p *Peer) Channel(chName String) *Channel { return p.channels[chName] } -// TrySend returns true if the packet was successfully queued. +// TryQueue returns true if the packet was successfully queued. // Returning true does not imply that the packet will be sent. -func (p *Peer) TrySend(pkt Packet) bool { +func (p *Peer) TryQueue(pkt Packet) bool { channel := p.Channel(pkt.Channel) sendQueue := channel.sendQueue @@ -227,10 +227,3 @@ type InboundPacket struct { Time Time Packet } - -/* Misc */ - -type peerError struct { - peer *Peer - err interface{} -} diff --git a/peer/pex.go b/peer/pex.go index dc0a0319d..a391f470d 100644 --- a/peer/pex.go +++ b/peer/pex.go @@ -1,22 +1,52 @@ package peer import ( - . "github.com/tendermint/tendermint/binary" + "bytes" + "errors" "io" + + . "github.com/tendermint/tendermint/binary" ) +var pexErrInvalidMessage = errors.New("Invalid PEX message") + const pexCh = "PEX" func peerExchangeHandler(c *Client) { for { - // inPkt := c.Receive(pexCh) // {Peer, Time, Packet} + inPkt := c.Receive(pexCh) // {Peer, Time, Packet} + if inPkt == nil { + // Client has stopped + break + } // decode message - - // if message is a peer request - - // if message is + msg := decodeMessage(inPkt.Bytes) + + switch msg.(type) { + case *pexRequestMessage: + // inPkt.Peer requested some peers. + // TODO: prevent abuse. + addrs := c.addrBook.GetSelection() + response := &pexResponseMessage{Addrs: addrs} + pkt := NewPacket(pexCh, BinaryBytes(response)) + queued := inPkt.Peer.TryQueue(pkt) + if !queued { + // ignore + } + case *pexResponseMessage: + // We received some peer addresses from inPkt.Peer. + // TODO: prevent abuse. + // (We don't want to get spammed with bad peers) + srcAddr := inPkt.Peer.RemoteAddress() + for _, addr := range msg.(*pexResponseMessage).Addrs { + c.addrBook.AddAddress(addr, srcAddr) + } + default: + // Bad peer. + c.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) + } } // cleanup @@ -26,42 +56,58 @@ func peerExchangeHandler(c *Client) { /* Messages */ const ( - pexTypeRequest = Byte(0x00) - pexTypeResponse = Byte(0x01) + pexTypeUnknown = Byte(0x00) + pexTypeRequest = Byte(0x01) + pexTypeResponse = Byte(0x02) ) -func decodeMsg(bytes ByteSlice) (t Byte, msg Message) { - //return pexTypeRequest, nil - return pexTypeResponse, nil +// TODO: check for unnecessary extra bytes at the end. +func decodeMessage(bz ByteSlice) (msg Message) { + switch Byte(bz[0]) { + case pexTypeRequest: + return &pexRequestMessage{} + case pexTypeResponse: + return readPexResponseMessage(bytes.NewReader(bz[1:])) + default: + return nil + } } /* A response with peer addresses */ -type pexResponseMsg struct { +type pexRequestMessage struct { +} + +func (m *pexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(pexTypeRequest, w, n, err) + return +} + +/* +A response with peer addresses +*/ +type pexResponseMessage struct { Addrs []*NetAddress } -func readPexResponseMsg(r io.Reader) *pexResponseMsg { +func readPexResponseMessage(r io.Reader) *pexResponseMessage { numAddrs := int(ReadUInt32(r)) addrs := []*NetAddress{} for i := 0; i < numAddrs; i++ { addr := ReadNetAddress(r) addrs = append(addrs, addr) } - return &pexResponseMsg{ + return &pexResponseMessage{ Addrs: addrs, } } -func (m *pexResponseMsg) WriteTo(w io.Writer) (n int64, err error) { +func (m *pexResponseMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(pexTypeResponse, w, n, err) n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err) for _, addr := range m.Addrs { n, err = WriteOnto(addr, w, n, err) } return } - -func (m *pexResponseMsg) Type() string { - return "pexTypeResponse" -}