Browse Source

working draft of peer

pull/9/head
Jae Kwon 11 years ago
parent
commit
534635aae9
9 changed files with 1203 additions and 68 deletions
  1. +62
    -49
      peer/addrbook.go
  2. +155
    -0
      peer/client.go
  3. +143
    -0
      peer/connection.go
  4. +16
    -0
      peer/connection_test.go
  5. +10
    -10
      peer/knownaddress.go
  6. +60
    -0
      peer/listener.go
  7. +1
    -1
      peer/log.go
  8. +41
    -8
      peer/netaddress.go
  9. +715
    -0
      peer/peer.go

peer/addrmanager.go → peer/addrbook.go View File


+ 155
- 0
peer/client.go View File

@ -0,0 +1,155 @@
package peer
import (
. "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle"
"sync"
"io"
)
/* Client */
type Client struct {
listener *Listener
addrBook AddrBook
strategies map[String]*FilterStrategy
targetNumPeers int
peersMtx sync.Mutex
peers merkle.Tree // addr -> *Peer
filtersMtx sync.Mutex
filters merkle.Tree // channelName -> Filter (objects that I know of)
}
func NewClient(protocol string, laddr string) *Client {
// XXX set the handler
listener := NewListener(protocol, laddr, nil)
c := &Client{
listener: listener,
peers: merkle.NewIAVLTree(nil),
filters: merkle.NewIAVLTree(nil),
}
return c
}
func (c *Client) Start() (<-chan *IncomingMsg) {
return nil
}
func (c *Client) Stop() {
c.listener.Close()
}
func (c *Client) LocalAddress() *NetAddress {
return c.listener.LocalAddress()
}
func (c *Client) ConnectTo(addr *NetAddress) (*Peer, error) {
conn, err := addr.Dial()
if err != nil { return nil, err }
peer := NewPeer(conn)
// lock
c.peersMtx.Lock()
c.peers.Put(addr, peer)
c.peersMtx.Unlock()
// unlock
return peer, nil
}
func (c *Client) Broadcast(channel String, msg Binary) {
for v := range c.peersCopy().Values() {
peer, ok := v.(*Peer)
if !ok { panic("Expected peer but got something else") }
peer.Queue(channel, msg)
}
}
// Updates the client's filter for a channel & broadcasts it.
func (c *Client) UpdateFilter(channel String, filter Filter) {
c.filtersMtx.Lock()
c.filters.Put(channel, filter)
c.filtersMtx.Unlock()
c.Broadcast("", &NewFilterMsg{
Channel: channel,
Filter: filter,
})
}
func (c *Client) peersCopy() merkle.Tree {
c.peersMtx.Lock(); defer c.peersMtx.Unlock()
return c.peers.Copy()
}
/* Channel */
type Channel struct {
Name String
Filter Filter
//Stats Stats
}
/* Peer */
type Peer struct {
Conn *Connection
Channels map[String]*Channel
}
func NewPeer(conn *Connection) *Peer {
return &Peer{
Conn: conn,
Channels: nil,
}
}
// Must be quick and nonblocking.
func (p *Peer) Queue(channel String, msg Binary) {}
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return 0, nil // TODO
}
/* IncomingMsg */
type IncomingMsg struct {
SPeer *Peer
SChan *Channel
Time Time
Msg Binary
}
/* Filter
A Filter could be a bloom filter for lossy filtering, or could be a lossless filter.
Either way, it's used to keep track of what a peer knows of.
*/
type Filter interface {
Binary
Add(ByteSlice)
Has(ByteSlice) bool
}
/* FilterStrategy
Defines how filters are generated per peer, and whether they need to get refreshed occasionally.
*/
type FilterStrategy interface {
LoadFilter(ByteSlice) Filter
}
/* NewFilterMsg */
type NewFilterMsg struct {
Channel String
Filter Filter
}
func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) {
return 0, nil // TODO
}

+ 143
- 0
peer/connection.go View File

@ -0,0 +1,143 @@
package peer
import (
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary"
"sync"
"net"
"runtime"
"fmt"
"time"
)
const (
OUT_QUEUE_SIZE = 50
IDLE_TIMEOUT_MINUTES = 5
PING_TIMEOUT_MINUTES = 2
)
/* Connnection */
type Connection struct {
ioStats IOStats
mtx sync.Mutex
outQueue chan ByteSlice
conn net.Conn
quit chan struct{}
disconnected bool
pingDebouncer *Debouncer
pong chan struct{}
}
var (
PACKET_TYPE_PING = UInt8(0x00)
PACKET_TYPE_PONG = UInt8(0x01)
PACKET_TYPE_MSG = UInt8(0x10)
)
func NewConnection(conn net.Conn) *Connection {
return &Connection{
outQueue: make(chan ByteSlice, OUT_QUEUE_SIZE),
conn: conn,
quit: make(chan struct{}),
pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute),
pong: make(chan struct{}),
}
}
func (c *Connection) QueueMessage(msg ByteSlice) bool {
c.mtx.Lock(); defer c.mtx.Unlock()
if c.disconnected { return false }
select {
case c.outQueue <- msg:
return true
default: // buffer full
return false
}
}
func (c *Connection) Start() {
go c.outHandler()
go c.inHandler()
}
func (c *Connection) Disconnect() {
c.mtx.Lock(); defer c.mtx.Unlock()
close(c.quit)
c.conn.Close()
c.pingDebouncer.Stop()
// do not close c.pong
c.disconnected = true
}
func (c *Connection) flush() {
// TODO flush? (turn off nagel, turn back on, etc)
}
func (c *Connection) outHandler() {
FOR_LOOP:
for {
select {
case <-c.pingDebouncer.Ch:
PACKET_TYPE_PING.WriteTo(c.conn)
case outMsg := <-c.outQueue:
_, err := outMsg.WriteTo(c.conn)
if err != nil { Panicf("TODO: handle error %v", err) }
case <-c.pong:
PACKET_TYPE_PONG.WriteTo(c.conn)
case <-c.quit:
break FOR_LOOP
}
c.flush()
}
// cleanup
for _ = range c.outQueue {
// do nothing but drain.
}
}
func (c *Connection) inHandler() {
defer func() {
if e := recover(); e != nil {
// Get stack trace
buf := make([]byte, 1<<16)
runtime.Stack(buf, false)
// TODO do proper logging
fmt.Printf("Disconnecting due to error:\n\n%v\n", string(buf))
c.Disconnect()
}
}()
//FOR_LOOP:
for {
msgType := ReadUInt8(c.conn)
switch msgType {
case PACKET_TYPE_PING:
c.pong <- struct{}{}
case PACKET_TYPE_PONG:
// do nothing
case PACKET_TYPE_MSG:
ReadByteSlice(c.conn)
default:
Panicf("Unknown message type %v", msgType)
}
c.pingDebouncer.Reset()
}
}
/* IOStats */
type IOStats struct {
TimeConnected Time
LastSent Time
LastRecv Time
BytesRecv UInt64
BytesSent UInt64
MsgsRecv UInt64
MsgsSent UInt64
}

+ 16
- 0
peer/connection_test.go View File

@ -0,0 +1,16 @@
package peer
import (
"testing"
)
func TestLocalConnection(t *testing.T) {
c1 := NewClient("tcp", ":8080")
c2 := NewClient("tcp", ":8081")
c1.ConnectTo(c2.LocalAddress())
c1.Stop()
c2.Stop()
}

+ 10
- 10
peer/knownaddress.go View File

@ -16,8 +16,8 @@ type KnownAddress struct {
Addr *NetAddress
Src *NetAddress
Attempts UInt32
LastAttempt UInt64
LastSuccess UInt64
LastAttempt Time
LastSuccess Time
NewRefs UInt16
OldBucket Int16 // TODO init to -1
}
@ -27,7 +27,7 @@ func NewKnownAddress(addr *NetAddress, src *NetAddress) *KnownAddress {
Addr: addr,
Src: src,
OldBucket: -1,
LastAttempt: UInt64(time.Now().Unix()),
LastAttempt: Time{time.Now()},
Attempts: 0,
}
}
@ -37,8 +37,8 @@ func ReadKnownAddress(r io.Reader) *KnownAddress {
Addr: ReadNetAddress(r),
Src: ReadNetAddress(r),
Attempts: ReadUInt32(r),
LastAttempt: ReadUInt64(r),
LastSuccess: ReadUInt64(r),
LastAttempt: ReadTime(r),
LastSuccess: ReadTime(r),
NewRefs: ReadUInt16(r),
OldBucket: ReadInt16(r),
}
@ -56,7 +56,7 @@ func (ka *KnownAddress) WriteTo(w io.Writer) (n int64, err error) {
}
func (ka *KnownAddress) MarkAttempt(success bool) {
now := UInt64(time.Now().Unix())
now := Time{time.Now()}
ka.LastAttempt = now
if success {
ka.LastSuccess = now
@ -80,22 +80,22 @@ func (ka *KnownAddress) MarkAttempt(success bool) {
*/
func (ka *KnownAddress) Bad() bool {
// Has been attempted in the last minute --> good
if ka.LastAttempt < UInt64(time.Now().Add(-1 * time.Minute).Unix()) {
if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) {
return false
}
// Over a month old?
if ka.LastAttempt > UInt64(time.Now().Add(-1 * numMissingDays * time.Hour * 24).Unix()) {
if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
return true
}
// Never succeeded?
if ka.LastSuccess == 0 && ka.Attempts >= numRetries {
if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
return true
}
// Hasn't succeeded in too long?
if ka.LastSuccess < UInt64(time.Now().Add(-1*minBadDays*time.Hour*24).Unix()) &&
if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) &&
ka.Attempts >= maxFailures {
return true
}


+ 60
- 0
peer/listener.go View File

@ -0,0 +1,60 @@
package peer
import (
"sync"
"net"
)
/* Listener */
type Listener struct {
listener net.Listener
handler func(net.Conn)
mtx sync.Mutex
closed bool
}
func NewListener(protocol string, laddr string, handler func(net.Conn)) *Listener {
ln, err := net.Listen(protocol, laddr)
if err != nil { panic(err) }
s := &Listener{
listener: ln,
handler: handler,
}
go s.listen()
return s
}
func (s *Listener) listen() {
for {
conn, err := s.listener.Accept()
if err != nil {
// lock & defer
s.mtx.Lock(); defer s.mtx.Unlock()
if s.closed {
return
} else {
panic(err)
}
// unlock (deferred)
}
go s.handler(conn)
}
}
func (s *Listener) LocalAddress() *NetAddress {
return NewNetAddress(s.listener.Addr())
}
func (s *Listener) Close() {
// lock
s.mtx.Lock()
s.closed = true
s.mtx.Unlock()
// unlock
s.listener.Close()
}

+ 1
- 1
peer/log.go View File

@ -4,4 +4,4 @@ import (
"github.com/tendermint/btclog"
)
var amgrLog = btclog.Disabled
var log = btclog.Disabled

+ 41
- 8
peer/netaddress.go View File

@ -18,6 +18,25 @@ type NetAddress struct {
Port UInt16
}
// TODO: socks proxies?
func NewNetAddress(addr net.Addr) *NetAddress {
tcpAddr, ok := addr.(*net.TCPAddr)
if !ok { panic("Only TCPAddrs are supported") }
ip := tcpAddr.IP
port := UInt16(tcpAddr.Port)
return NewNetAddressIPPort(ip, port)
}
func NewNetAddressString(addr string) *NetAddress {
host, portStr, err := net.SplitHostPort(addr)
if err != nil { panic(err) }
ip := net.ParseIP(host)
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil { panic(err) }
na := NewNetAddressIPPort(ip, UInt16(port))
return na
}
func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress {
na := NetAddress{
IP: ip,
@ -26,14 +45,6 @@ func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress {
return &na
}
func NewNetAddress(addr net.Addr) *NetAddress {
tcpAddr, ok := addr.(*net.TCPAddr)
if !ok { panic("addr is not a net.TCPAddr") }
na := NewNetAddressIPPort(tcpAddr.IP, UInt16(tcpAddr.Port))
return na
}
func ReadNetAddress(r io.Reader) *NetAddress {
return &NetAddress{
IP: net.IP(ReadByteSlice(r)),
@ -47,12 +58,34 @@ func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) {
return
}
func (na *NetAddress) Equals(other Binary) bool {
if o, ok := other.(*NetAddress); ok {
return na.String() == o.String()
} else {
return false
}
}
func (na *NetAddress) Less(other Binary) bool {
if o, ok := other.(*NetAddress); ok {
return na.String() < o.String()
} else {
panic("Cannot compare unequal types")
}
}
func (na *NetAddress) String() string {
port := strconv.FormatUint(uint64(na.Port), 10)
addr := net.JoinHostPort(na.IP.String(), port)
return addr
}
func (na *NetAddress) Dial() (*Connection, error) {
conn, err := net.Dial("tcp", na.String())
if err != nil { return nil, err }
return NewConnection(conn), nil
}
func (na *NetAddress) Routable() bool {
// TODO(oga) bitcoind doesn't include RFC3849 here, but should we?
return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() ||


+ 715
- 0
peer/peer.go View File

@ -0,0 +1,715 @@
package peer
import (
"bytes"
"container/list"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/tendermint/btcwire"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
)
const (
// max protocol version the peer supports.
maxProtocolVersion = 70001
// number of elements the output channels use.
outputBufferSize = 50
// number of seconds of inactivity before we timeout a peer
// that hasn't completed the initial version negotiation.
negotiateTimeoutSeconds = 30
// number of minutes of inactivity before we time out a peer.
idleTimeoutMinutes = 5
// number of minutes since we last sent a message
// requiring a reply before we will ping a host.
pingTimeoutMinutes = 2
)
var (
userAgentName = "tendermintd"
userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
)
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash btcwire.ShaHash
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
// TODO(davec): Rename and comment this
type outMsg struct {
msg btcwire.Message
doneChan chan bool
}
/*
The overall data flow is split into 2 goroutines.
Inbound messages are read via the inHandler goroutine and generally
dispatched to their own handler.
Outbound messages are queued via QueueMessage.
*/
type peer struct {
server *server
addr *NetAddress
inbound bool
persistent bool
started bool // atomic
quit chan bool
conn net.Conn
connMtx sync.Mutex
disconnected bool // atomic && protected by connMtx
knownAddresses map[string]bool
outputQueue chan outMsg
statMtx sync.Mutex // protects all below here.
protocolVersion uint32
timeConnected time.Time
lastSend time.Time
lastRecv time.Time
bytesReceived uint64
bytesSent uint64
userAgent string
lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return.
}
// String returns the peer's address and directionality as a human-readable
// string.
func (p *peer) String() string {
return fmt.Sprintf("%s (%s)", p.addr.String(), directionString(p.inbound))
}
// VersionKnown returns the whether or not the version of a peer is known locally.
// It is safe for concurrent access.
func (p *peer) VersionKnown() bool {
p.statMtx.Lock(); defer p.statMtx.Unlock()
return p.protocolVersion != 0
}
// ProtocolVersion returns the peer protocol version in a manner that is safe
// for concurrent access.
func (p *peer) ProtocolVersion() uint32 {
p.statMtx.Lock(); defer p.statMtx.Unlock()
return p.protocolVersion
}
// pushVersionMsg sends a version message to the connected peer using the
// current state.
func (p *peer) pushVersionMsg() {
_, blockNum, err := p.server.db.NewestSha()
if err != nil { panic(err) }
// Version message.
// TODO: DisableListen -> send zero address
msg := btcwire.NewMsgVersion(
p.server.addrManager.getBestLocalAddress(p.addr), p.addr,
p.server.nonce, int32(blockNum))
msg.AddUserAgent(userAgentName, userAgentVersion)
// Advertise our max supported protocol version.
msg.ProtocolVersion = maxProtocolVersion
p.QueueMessage(msg, nil)
}
// handleVersionMsg is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Detect self connections.
if msg.Nonce == p.server.nonce {
peerLog.Debugf("Disconnecting peer connected to self %s", p)
p.Disconnect()
return
}
p.statMtx.Lock() // Updating a bunch of stats.
// Limit to one version message per peer.
if p.protocolVersion != 0 {
p.logError("Only one version message per peer is allowed %s.", p)
p.statMtx.Unlock()
p.Disconnect()
return
}
// Negotiate the protocol version.
p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion))
peerLog.Debugf("Negotiated protocol version %d for peer %s", p.protocolVersion, p)
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
p.statMtx.Unlock()
// Inbound connections.
if p.inbound {
// Send version.
p.pushVersionMsg()
}
// Send verack.
p.QueueMessage(btcwire.NewMsgVerAck(), nil)
if p.inbound {
// A peer might not be advertising the same address that it
// actually connected from. One example of why this can happen
// is with NAT. Only add the address to the address manager if
// the addresses agree.
if msg.AddrMe.String() == p.addr.String() {
p.server.addrManager.AddAddress(p.addr, p.addr)
}
} else {
// Request known addresses from the remote peer.
if !cfg.SimNet && p.server.addrManager.NeedMoreAddresses() {
p.QueueMessage(btcwire.NewMsgGetAddr(), nil)
}
}
// Mark the address as a known good address.
p.server.addrManager.MarkGood(p.addr)
// Signal the block manager this peer is a new sync candidate.
p.server.blockManager.NewPeer(p)
// TODO: Relay alerts.
}
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
// and is used to provide the peer with known addresses from the address
// manager.
func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) {
// Don't return any addresses when running on the simulation test
// network. This helps prevent the network from becoming another
// public test network since it will not be able to learn about other
// peers that have not specifically been provided.
if cfg.SimNet {
return
}
// Get the current known addresses from the address manager.
addrCache := p.server.addrManager.AddressCache()
// Push the addresses.
p.pushAddrMsg(addrCache)
}
// pushAddrMsg sends one, or more, addr message(s) to the connected peer using
// the provided addresses.
func (p *peer) pushAddrMsg(addresses []*NetAddress) {
// Nothing to send.
if len(addresses) == 0 { return }
numAdded := 0
msg := btcwire.NewMsgAddr()
for _, addr := range addresses {
// Filter addresses the peer already knows about.
if p.knownAddresses[addr.String()] {
continue
}
// Add the address to the message.
err := msg.AddAddress(addr)
if err != nil { panic(err) } // XXX remove error condition
numAdded++
// Split into multiple messages as needed.
if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 {
p.QueueMessage(msg, nil)
// NOTE: This needs to be a new address message and not
// simply call ClearAddresses since the message is a
// pointer and queueing it does not make a copy.
msg = btcwire.NewMsgAddr()
}
}
// Send message with remaining addresses if needed.
if numAdded%btcwire.MaxAddrPerMsg != 0 {
p.QueueMessage(msg, nil)
}
}
// handleAddrMsg is invoked when a peer receives an addr bitcoin message and
// is used to notify the server about advertised addresses.
func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
// Ignore addresses when running on the simulation test network. This
// helps prevent the network from becoming another public test network
// since it will not be able to learn about other peers that have not
// specifically been provided.
if cfg.SimNet {
return
}
// A message that has no addresses is invalid.
if len(msg.AddrList) == 0 {
p.logError("Command [%s] from %s does not contain any addresses", msg.Command(), p)
p.Disconnect()
return
}
for _, addr := range msg.AddrList {
// Set the timestamp to 5 days ago if it's more than 24 hours
// in the future so this address is one of the first to be
// removed when space is needed.
now := time.Now()
if addr.Timestamp.After(now.Add(time.Minute * 10)) {
addr.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
}
// Add address to known addresses for this peer.
p.knownAddresses[addr.String()] = true
}
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
// XXX bitcoind gives a 2 hour time penalty here, do we want to do the
// same?
p.server.addrManager.AddAddresses(msg.AddrList, p.addr)
}
func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
// Include nonce from ping so pong can be identified.
p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil)
}
func (p *peer) handlePongMsg(msg *btcwire.MsgPong) {
p.statMtx.Lock(); defer p.statMtx.Unlock()
// Arguably we could use a buffered channel here sending data
// in a fifo manner whenever we send a ping, or a list keeping track of
// the times of each ping. For now we just make a best effort and
// only record stats if it was for the last ping sent. Any preceding
// and overlapping pings will be ignored. It is unlikely to occur
// without large usage of the ping rpc call since we ping
// infrequently enough that if they overlap we would have timed out
// the peer.
if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
p.lastPingMicros = time.Now().Sub(p.lastPingTime).Nanoseconds()
p.lastPingMicros /= 1000 // convert to usec.
p.lastPingNonce = 0
}
}
// readMessage reads the next bitcoin message from the peer with logging.
func (p *peer) readMessage() (btcwire.Message, []byte, error) {
n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion())
p.statMtx.Lock()
p.bytesReceived += uint64(n)
p.statMtx.Unlock()
p.server.AddBytesReceived(uint64(n))
if err != nil {
return nil, nil, err
}
// Use closures to log expensive operations so they are only run when
// the logging level requires it.
peerLog.Debugf("%v", newLogClosure(func() string {
// Debug summary of message.
summary := messageSummary(msg)
if len(summary) > 0 {
summary = " (" + summary + ")"
}
return fmt.Sprintf("Received %v%s from %s", msg.Command(), summary, p)
}))
peerLog.Tracef("%v", newLogClosure(func() string {
return spew.Sdump(msg)
}))
peerLog.Tracef("%v", newLogClosure(func() string {
return spew.Sdump(buf)
}))
return msg, buf, nil
}
// writeMessage sends a bitcoin Message to the peer with logging.
func (p *peer) writeMessage(msg btcwire.Message) {
if p.Disconnected() { return }
if !p.VersionKnown() {
switch msg.(type) {
case *btcwire.MsgVersion:
// This is OK.
default:
// We drop all messages other than version if we
// haven't done the handshake already.
return
}
}
// Use closures to log expensive operations so they are only run when
// the logging level requires it.
peerLog.Debugf("%v", newLogClosure(func() string {
// Debug summary of message.
summary := messageSummary(msg)
if len(summary) > 0 {
summary = " (" + summary + ")"
}
return fmt.Sprintf("Sending %v%s to %s", msg.Command(), summary, p)
}))
peerLog.Tracef("%v", newLogClosure(func() string {
return spew.Sdump(msg)
}))
peerLog.Tracef("%v", newLogClosure(func() string {
var buf bytes.Buffer
err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion())
if err != nil {
return err.Error()
}
return spew.Sdump(buf.Bytes())
}))
// Write the message to the peer.
n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion())
p.statMtx.Lock()
p.bytesSent += uint64(n)
p.statMtx.Unlock()
p.server.AddBytesSent(uint64(n))
if err != nil {
p.Disconnect()
p.logError("Can't send message to %s: %v", p, err)
return
}
}
// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *peer) inHandler() {
// Peers must complete the initial version negotiation within a shorter
// timeframe than a general idle timeout. The timer is then reset below
// to idleTimeoutMinutes for all future messages.
idleTimer := time.AfterFunc(negotiateTimeoutSeconds*time.Second, func() {
if p.VersionKnown() {
peerLog.Warnf("Peer %s no answer for %d minutes, disconnecting", p, idleTimeoutMinutes)
}
p.Disconnect()
})
out:
for !p.Disconnected() {
rmsg, buf, err := p.readMessage()
// Stop the timer now, if we go around again we will reset it.
idleTimer.Stop()
if err != nil {
if !p.Disconnected() {
p.logError("Can't read message from %s: %v", p, err)
}
break out
}
p.statMtx.Lock()
p.lastRecv = time.Now()
p.statMtx.Unlock()
// Ensure version message comes first.
if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() {
p.logError("A version message must precede all others")
break out
}
// Handle each supported message type.
markGood := false
switch msg := rmsg.(type) {
case *btcwire.MsgVersion:
p.handleVersionMsg(msg)
case *btcwire.MsgVerAck:
// Do nothing.
case *btcwire.MsgGetAddr:
p.handleGetAddrMsg(msg)
case *btcwire.MsgAddr:
p.handleAddrMsg(msg)
markGood = true
case *btcwire.MsgPing:
p.handlePingMsg(msg)
markGood = true
case *btcwire.MsgPong:
p.handlePongMsg(msg)
case *btcwire.MsgAlert:
p.server.BroadcastMessage(msg, p)
case *btcwire.MsgNotFound:
// TODO(davec): Ignore this for now, but ultimately
// it should probably be used to detect when something
// we requested needs to be re-requested from another
// peer.
default:
peerLog.Debugf("Received unhandled message of type %v: Fix Me", rmsg.Command())
}
// Mark the address as currently connected and working as of
// now if one of the messages that trigger it was processed.
if markGood && !p.Disconnected() {
if p.addr == nil {
peerLog.Warnf("we're getting stuff before we got a version message. that's bad")
continue
}
p.server.addrManager.MarkGood(p.addr)
}
// ok we got a message, reset the timer.
// timer just calls p.Disconnect() after logging.
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
}
idleTimer.Stop()
// Ensure connection is closed and notify the server that the peer is done.
p.Disconnect()
p.server.donePeers <- p
// Only tell block manager we are gone if we ever told it we existed.
if p.VersionKnown() {
p.server.blockManager.DonePeer(p)
}
peerLog.Tracef("Peer input handler done for %s", p)
}
// outHandler handles all outgoing messages for the peer. It must be run as a
// goroutine. It uses a buffered channel to serialize output messages while
// allowing the sender to continue running asynchronously.
func (p *peer) outHandler() {
pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() {
nonce, err := btcwire.RandomUint64()
if err != nil {
peerLog.Errorf("Not sending ping on timeout to %s: %v",
p, err)
return
}
p.QueueMessage(btcwire.NewMsgPing(nonce), nil)
})
out:
for {
select {
case msg := <-p.outputQueue:
// If the message is one we should get a reply for
// then reset the timer, we only want to send pings
// when otherwise we would not receive a reply from
// the peer.
peerLog.Tracef("%s: received from outputQueue", p)
reset := true
switch m := msg.msg.(type) {
case *btcwire.MsgVersion:
// should get an ack
case *btcwire.MsgGetAddr:
// should get addresses
case *btcwire.MsgPing:
// expects pong
// Also set up statistics.
p.statMtx.Lock()
p.lastPingNonce = m.Nonce
p.lastPingTime = time.Now()
p.statMtx.Unlock()
default:
// Not one of the above, no sure reply.
// We want to ping if nothing else
// interesting happens.
reset = false
}
if reset {
pingTimer.Reset(pingTimeoutMinutes * time.Minute)
}
p.writeMessage(msg.msg)
p.statMtx.Lock()
p.lastSend = time.Now()
p.statMtx.Unlock()
if msg.doneChan != nil {
msg.doneChan <- true
}
case <-p.quit:
break out
}
}
pingTimer.Stop()
// Drain outputQueue
for msg := range p.outputQueue {
if msg.doneChan != nil {
msg.doneChan <- false
}
}
peerLog.Tracef("Peer output handler done for %s", p)
}
// QueueMessage adds the passed bitcoin message to the peer outputQueue. It
// uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) {
// Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected. *then* it drains the channels.
if p.Disconnected() {
// avoid deadlock...
if doneChan != nil {
go func() {
doneChan <- false
}()
}
return
}
p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
}
// True if is (or will become) disconnected.
func (p *peer) Disconnected() bool {
return atomic.LoadInt32(&p.disconnected) == 1
}
// Disconnects the peer by closing the connection. It also sets
// a flag so the impending shutdown can be detected.
func (p *peer) Disconnect() {
p.connMtx.Lock(); defer p.connMtx.Unlock()
// did we win the race?
if atomic.AddInt32(&p.disconnected, 1) != 1 {
return
}
peerLog.Tracef("disconnecting %s", p)
close(p.quit)
if p.conn != nil {
p.conn.Close()
}
}
// Sets the connection & starts
func (p *peer) StartWithConnection(conn *net.Conn) {
p.connMtx.Lock(); defer p.connMtx.Unlock()
if p.conn != nil { panic("Conn already set") }
if atomic.LoadInt32(&p.disconnected) == 1 { return }
peerLog.Debugf("Connected to %s", conn.RemoteAddr())
p.timeConnected = time.Now()
p.conn = conn
p.Start()
}
// Start begins processing input and output messages. It also sends the initial
// version message for outbound connections to start the negotiation process.
func (p *peer) Start() error {
// Already started?
if atomic.AddInt32(&p.started, 1) != 1 {
return nil
}
peerLog.Tracef("Starting peer %s", p)
// Send an initial version message if this is an outbound connection.
if !p.inbound {
p.pushVersionMsg()
}
// Start processing input and output.
go p.inHandler()
go p.outHandler()
return nil
}
// Shutdown gracefully shuts down the peer by disconnecting it.
func (p *peer) Shutdown() {
peerLog.Tracef("Shutdown peer %s", p)
p.Disconnect()
}
// newPeerBase returns a new base peer for the provided server and inbound flag.
// This is used by the newInboundPeer and newOutboundPeer functions to perform
// base setup needed by both types of peers.
func newPeerBase(s *server, inbound bool) *peer {
p := peer{
server: s,
protocolVersion: maxProtocolVersion,
inbound: inbound,
knownAddresses: make(map[string]bool),
outputQueue: make(chan outMsg, outputBufferSize),
quit: make(chan bool),
}
return &p
}
// newPeer returns a new inbound bitcoin peer for the provided server and
// connection. Use Start to begin processing incoming and outgoing messages.
func newInboundPeer(s *server, conn net.Conn) *peer {
addr := NewNetAddress(conn.RemoteAddr())
// XXX What if p.addr doesn't match (to be) reported addr due to NAT?
s.addrManager.MarkAttempt(addr)
p := newPeerBase(s, true)
p.conn = conn
p.addr = addr
p.timeConnected = time.Now()
return p
}
// newOutbountPeer returns a new outbound bitcoin peer for the provided server and
// address and connects to it asynchronously. If the connection is successful
// then the peer will also be started.
func newOutboundPeer(s *server, addr *NetAddress, persistent bool) *peer {
p := newPeerBase(s, false)
p.addr = addr
p.persistent = persistent
go func() {
// Mark this as one attempt, regardless of # of reconnects.
s.addrManager.MarkAttempt(p.addr)
retryCount := 0
// Attempt to connect to the peer. If the connection fails and
// this is a persistent connection, retry after the retry
// interval.
for {
peerLog.Debugf("Attempting to connect to %s", addr)
conn, err := addr.Dial()
if err == nil {
p.StartWithConnection(conn)
return
} else {
retryCount++
peerLog.Debugf("Failed to connect to %s: %v", addr, err)
if !persistent {
p.server.donePeers <- p
return
}
scaledInterval := connectionRetryInterval.Nanoseconds() * retryCount / 2
scaledDuration := time.Duration(scaledInterval)
peerLog.Debugf("Retrying connection to %s in %s", addr, scaledDuration)
time.Sleep(scaledDuration)
continue
}
}
}()
return p
}
// logError makes sure that we only log errors loudly on user peers.
func (p *peer) logError(fmt string, args ...interface{}) {
if p.persistent {
peerLog.Errorf(fmt, args...)
} else {
peerLog.Debugf(fmt, args...)
}
}

Loading…
Cancel
Save