Ethan Buchman 40e93a5f9e | 7 years ago | |
---|---|---|
.. | ||
upnp | 7 years ago | |
CHANGELOG.md | 8 years ago | |
Dockerfile | 8 years ago | |
README.md | 7 years ago | |
addrbook.go | 7 years ago | |
addrbook_test.go | 7 years ago | |
conn_go110.go | 7 years ago | |
conn_notgo110.go | 7 years ago | |
connection.go | 7 years ago | |
connection_test.go | 7 years ago | |
fuzz.go | 8 years ago | |
listener.go | 7 years ago | |
listener_test.go | 8 years ago | |
netaddress.go | 8 years ago | |
netaddress_test.go | 7 years ago | |
peer.go | 7 years ago | |
peer_set.go | 7 years ago | |
peer_set_test.go | 7 years ago | |
peer_test.go | 7 years ago | |
pex_reactor.go | 7 years ago | |
pex_reactor_test.go | 7 years ago | |
secret_connection.go | 8 years ago | |
secret_connection_test.go | 8 years ago | |
switch.go | 7 years ago | |
switch_test.go | 7 years ago | |
types.go | 8 years ago | |
util.go | 8 years ago | |
version.go | 8 years ago |
tendermint/tendermint/p2p
tendermint/tendermint/p2p
provides an abstraction around peer-to-peer communication.
MConnection
is a multiplex connection:
multiplex noun a system or signal involving simultaneous transmission of several messages along a single channel of communication.
Each MConnection
handles message transmission on multiple abstract communication
Channel
s. Each channel has a globally unique byte id.
The byte id and the relative priorities of each Channel
are configured upon
initialization of the connection.
The MConnection
supports three packet types: Ping, Pong, and Msg.
The ping and pong messages consist of writing a single byte to the connection; 0x1 and 0x2, respectively
When we haven't received any messages on an MConnection
in a time pingTimeout
, we send a ping message.
When a ping is received on the MConnection
, a pong is sent in response.
If a pong is not received in sufficient time, the peer's score should be decremented (TODO).
Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
ChannelID byte
EOF byte // 1 means message ends here.
Bytes []byte
}
The msgPacket is serialized using go-wire, and prefixed with a 0x3.
The received Bytes
of a sequential set of packets are appended together
until a packet with EOF=1
is received, at which point the complete serialized message
is returned for processing by the corresponding channels onReceive
function.
Messages are sent from a single sendRoutine
, which loops over a select statement that results in the sending
of a ping, a pong, or a batch of data messages. The batch of data messages may include messages from multiple channels.
Message bytes are queued for sending in their respective channel, with each channel holding one unsent message at a time.
Messages are chosen for a batch one a time from the channel with the lowest ratio of recently sent bytes to channel priority.
There are two methods for sending messages:
func (m MConnection) Send(chID byte, msg interface{}) bool {}
func (m MConnection) TrySend(chID byte, msg interface{}) bool {}
Send(chID, msg)
is a blocking call that waits until msg
is successfully queued
for the channel with the given id byte chID
. The message msg
is serialized
using the tendermint/wire
submodule's WriteBinary()
reflection routine.
TrySend(chID, msg)
is a nonblocking call that returns false if the channel's
queue is full.
Send()
and TrySend()
are also exposed for each Peer
.
Each peer has one MConnection
instance, and includes other information such as whether the connection
was outbound, whether the connection should be recreated if it closes, various identity information about the node,
and other higher level thread-safe data used by the reactors.
The Switch
handles peer connections and exposes an API to receive incoming messages
on Reactors
. Each Reactor
is responsible for handling incoming messages of one
or more Channels
. So while sending outgoing messages is typically performed on the peer,
incoming messages are received on the reactor.
// Declare a MyReactor reactor that handles messages on MyChannelID.
type MyReactor struct{}
func (reactor MyReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{ChannelDescriptor{ID:MyChannelID, Priority: 1}}
}
func (reactor MyReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
r, n, err := bytes.NewBuffer(msgBytes), new(int64), new(error)
msgString := ReadString(r, n, err)
fmt.Println(msgString)
}
// Other Reactor methods omitted for brevity
...
switch := NewSwitch([]Reactor{MyReactor{}})
...
// Send a random message to all outbound connections
for _, peer := range switch.Peers().List() {
if peer.IsOutbound() {
peer.Send(MyChannelID, "Here's a random message")
}
}
A PEXReactor
reactor implementation is provided to automate peer discovery.
book := p2p.NewAddrBook(addrBookFilePath)
pexReactor := p2p.NewPEXReactor(book)
...
switch := NewSwitch([]Reactor{pexReactor, myReactor, ...})