Browse Source

p2p: cleanup transport interface (#7071)

This is another batch of things to cleanup in the legacy P2P system.
pull/7079/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
5bf30bb049
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 52 additions and 699 deletions
  1. +1
    -1
      CHANGELOG_PENDING.md
  2. +2
    -15
      cmd/tendermint/commands/reset_priv_validator.go
  3. +0
    -2
      cmd/tendermint/commands/run_node.go
  4. +0
    -1
      cmd/tendermint/commands/testnet.go
  5. +8
    -97
      config/config.go
  6. +0
    -20
      config/config_test.go
  7. +0
    -50
      config/toml.go
  8. +0
    -133
      internal/p2p/conn/connection.go
  9. +5
    -27
      internal/p2p/conn/connection_test.go
  10. +0
    -82
      internal/p2p/conn_set.go
  11. +4
    -5
      internal/p2p/mock/peer.go
  12. +5
    -64
      internal/p2p/mocks/connection.go
  13. +0
    -15
      internal/p2p/mocks/peer.go
  14. +1
    -2
      internal/p2p/router.go
  15. +0
    -13
      internal/p2p/shim.go
  16. +1
    -26
      internal/p2p/transport.go
  17. +8
    -41
      internal/p2p/transport_mconn.go
  18. +0
    -1
      internal/p2p/transport_mconn_test.go
  19. +4
    -36
      internal/p2p/transport_memory.go
  20. +9
    -42
      internal/p2p/transport_test.go
  21. +0
    -1
      internal/p2p/types.go
  22. +0
    -4
      node/node.go
  23. +3
    -19
      node/setup.go
  24. +1
    -1
      rpc/client/helpers.go
  25. +0
    -1
      test/e2e/runner/setup.go

+ 1
- 1
CHANGELOG_PENDING.md View File

@ -17,7 +17,7 @@ Special thanks to external contributors on this release:
- P2P Protocol - P2P Protocol
- [p2p] \#7035 Remove legacy P2P routing implementation and - [p2p] \#7035 Remove legacy P2P routing implementation and
associated configuration (@tychoish)
associated configuration options (@tychoish)
- Go API - Go API


+ 2
- 15
cmd/tendermint/commands/reset_priv_validator.go View File

@ -37,7 +37,7 @@ var ResetPrivValidatorCmd = &cobra.Command{
// XXX: this is totally unsafe. // XXX: this is totally unsafe.
// it's only suitable for testnets. // it's only suitable for testnets.
func resetAll(cmd *cobra.Command, args []string) error { func resetAll(cmd *cobra.Command, args []string) error {
return ResetAll(config.DBDir(), config.P2P.AddrBookFile(), config.PrivValidator.KeyFile(),
return ResetAll(config.DBDir(), config.PrivValidator.KeyFile(),
config.PrivValidator.StateFile(), logger) config.PrivValidator.StateFile(), logger)
} }
@ -49,12 +49,7 @@ func resetPrivValidator(cmd *cobra.Command, args []string) error {
// ResetAll removes address book files plus all data, and resets the privValdiator data. // ResetAll removes address book files plus all data, and resets the privValdiator data.
// Exported so other CLI tools can use it. // Exported so other CLI tools can use it.
func ResetAll(dbDir, addrBookFile, privValKeyFile, privValStateFile string, logger log.Logger) error {
if keepAddrBook {
logger.Info("The address book remains intact")
} else {
removeAddrBook(addrBookFile, logger)
}
func ResetAll(dbDir, privValKeyFile, privValStateFile string, logger log.Logger) error {
if err := os.RemoveAll(dbDir); err == nil { if err := os.RemoveAll(dbDir); err == nil {
logger.Info("Removed all blockchain history", "dir", dbDir) logger.Info("Removed all blockchain history", "dir", dbDir)
} else { } else {
@ -87,11 +82,3 @@ func resetFilePV(privValKeyFile, privValStateFile string, logger log.Logger) err
} }
return nil return nil
} }
func removeAddrBook(addrBookFile string, logger log.Logger) {
if err := os.Remove(addrBookFile); err == nil {
logger.Info("Removed existing address book", "file", addrBookFile)
} else if !os.IsNotExist(err) {
logger.Info("Error removing address book", "file", addrBookFile, "err", err)
}
}

+ 0
- 2
cmd/tendermint/commands/run_node.go View File

@ -84,8 +84,6 @@ func AddNodeFlags(cmd *cobra.Command) {
"node listen address. (0.0.0.0:0 means any interface, any port)") "node listen address. (0.0.0.0:0 means any interface, any port)")
cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "comma-delimited ID@host:port seed nodes") cmd.Flags().String("p2p.seeds", config.P2P.Seeds, "comma-delimited ID@host:port seed nodes")
cmd.Flags().String("p2p.persistent-peers", config.P2P.PersistentPeers, "comma-delimited ID@host:port persistent peers") cmd.Flags().String("p2p.persistent-peers", config.P2P.PersistentPeers, "comma-delimited ID@host:port persistent peers")
cmd.Flags().String("p2p.unconditional-peer-ids",
config.P2P.UnconditionalPeerIDs, "comma-delimited IDs of unconditional peers")
cmd.Flags().Bool("p2p.upnp", config.P2P.UPNP, "enable/disable UPNP port forwarding") cmd.Flags().Bool("p2p.upnp", config.P2P.UPNP, "enable/disable UPNP port forwarding")
cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "enable/disable Peer-Exchange") cmd.Flags().Bool("p2p.pex", config.P2P.PexReactor, "enable/disable Peer-Exchange")
cmd.Flags().String("p2p.private-peer-ids", config.P2P.PrivatePeerIDs, "comma-delimited private peer IDs") cmd.Flags().String("p2p.private-peer-ids", config.P2P.PrivatePeerIDs, "comma-delimited private peer IDs")


+ 0
- 1
cmd/tendermint/commands/testnet.go View File

@ -226,7 +226,6 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
for i := 0; i < nValidators+nNonValidators; i++ { for i := 0; i < nValidators+nNonValidators; i++ {
nodeDir := filepath.Join(outputDir, fmt.Sprintf("%s%d", nodeDirPrefix, i)) nodeDir := filepath.Join(outputDir, fmt.Sprintf("%s%d", nodeDirPrefix, i))
config.SetRoot(nodeDir) config.SetRoot(nodeDir)
config.P2P.AddrBookStrict = false
config.P2P.AllowDuplicateIP = true config.P2P.AllowDuplicateIP = true
if populatePersistentPeers { if populatePersistentPeers {
persistentPeersWithoutSelf := make([]string, 0) persistentPeersWithoutSelf := make([]string, 0)


+ 8
- 97
config/config.go View File

@ -53,16 +53,14 @@ var (
defaultPrivValKeyName = "priv_validator_key.json" defaultPrivValKeyName = "priv_validator_key.json"
defaultPrivValStateName = "priv_validator_state.json" defaultPrivValStateName = "priv_validator_state.json"
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultNodeKeyName = "node_key.json"
defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName)
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
defaultPrivValKeyPath = filepath.Join(defaultConfigDir, defaultPrivValKeyName) defaultPrivValKeyPath = filepath.Join(defaultConfigDir, defaultPrivValKeyName)
defaultPrivValStatePath = filepath.Join(defaultDataDir, defaultPrivValStateName) defaultPrivValStatePath = filepath.Join(defaultDataDir, defaultPrivValStateName)
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
) )
// Config defines the top level configuration for a Tendermint node // Config defines the top level configuration for a Tendermint node
@ -141,9 +139,6 @@ func (cfg *Config) ValidateBasic() error {
if err := cfg.RPC.ValidateBasic(); err != nil { if err := cfg.RPC.ValidateBasic(); err != nil {
return fmt.Errorf("error in [rpc] section: %w", err) return fmt.Errorf("error in [rpc] section: %w", err)
} }
if err := cfg.P2P.ValidateBasic(); err != nil {
return fmt.Errorf("error in [p2p] section: %w", err)
}
if err := cfg.Mempool.ValidateBasic(); err != nil { if err := cfg.Mempool.ValidateBasic(); err != nil {
return fmt.Errorf("error in [mempool] section: %w", err) return fmt.Errorf("error in [mempool] section: %w", err)
} }
@ -646,25 +641,6 @@ type P2PConfig struct { //nolint: maligned
// UPNP port forwarding // UPNP port forwarding
UPNP bool `mapstructure:"upnp"` UPNP bool `mapstructure:"upnp"`
// Path to address book
AddrBook string `mapstructure:"addr-book-file"`
// Set true for strict address routability rules
// Set false for private or local networks
AddrBookStrict bool `mapstructure:"addr-book-strict"`
// Maximum number of inbound peers
//
// TODO: Remove once p2p refactor is complete in favor of MaxConnections.
// ref: https://github.com/tendermint/tendermint/issues/5670
MaxNumInboundPeers int `mapstructure:"max-num-inbound-peers"`
// Maximum number of outbound peers to connect to, excluding persistent peers.
//
// TODO: Remove once p2p refactor is complete in favor of MaxConnections.
// ref: https://github.com/tendermint/tendermint/issues/5670
MaxNumOutboundPeers int `mapstructure:"max-num-outbound-peers"`
// MaxConnections defines the maximum number of connected peers (inbound and // MaxConnections defines the maximum number of connected peers (inbound and
// outbound). // outbound).
MaxConnections uint16 `mapstructure:"max-connections"` MaxConnections uint16 `mapstructure:"max-connections"`
@ -673,24 +649,6 @@ type P2PConfig struct { //nolint: maligned
// attempts per IP address. // attempts per IP address.
MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"` MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"`
// List of node IDs, to which a connection will be (re)established ignoring any existing limits
UnconditionalPeerIDs string `mapstructure:"unconditional-peer-ids"`
// Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
PersistentPeersMaxDialPeriod time.Duration `mapstructure:"persistent-peers-max-dial-period"`
// Time to wait before flushing messages out on the connection
FlushThrottleTimeout time.Duration `mapstructure:"flush-throttle-timeout"`
// Maximum size of a message packet payload, in bytes
MaxPacketMsgPayloadSize int `mapstructure:"max-packet-msg-payload-size"`
// Rate at which packets can be sent, in bytes/second
SendRate int64 `mapstructure:"send-rate"`
// Rate at which packets can be received, in bytes/second
RecvRate int64 `mapstructure:"recv-rate"`
// Set true to enable the peer-exchange reactor // Set true to enable the peer-exchange reactor
PexReactor bool `mapstructure:"pex"` PexReactor bool `mapstructure:"pex"`
@ -721,28 +679,14 @@ func DefaultP2PConfig() *P2PConfig {
ListenAddress: "tcp://0.0.0.0:26656", ListenAddress: "tcp://0.0.0.0:26656",
ExternalAddress: "", ExternalAddress: "",
UPNP: false, UPNP: false,
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumInboundPeers: 40,
MaxNumOutboundPeers: 10,
MaxConnections: 64, MaxConnections: 64,
MaxIncomingConnectionAttempts: 100, MaxIncomingConnectionAttempts: 100,
PersistentPeersMaxDialPeriod: 0 * time.Second,
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
// The IP header and the TCP header take up 20 bytes each at least (unless
// optional header fields are used) and thus the max for (non-Jumbo frame)
// Ethernet is 1500 - 20 -20 = 1460
// Source: https://stackoverflow.com/a/3074427/820520
MaxPacketMsgPayloadSize: 1400,
SendRate: 5120000, // 5 mB/s
RecvRate: 5120000, // 5 mB/s
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "priority",
PexReactor: true,
AllowDuplicateIP: false,
HandshakeTimeout: 20 * time.Second,
DialTimeout: 3 * time.Second,
TestDialFail: false,
QueueType: "priority",
} }
} }
@ -750,43 +694,10 @@ func DefaultP2PConfig() *P2PConfig {
func TestP2PConfig() *P2PConfig { func TestP2PConfig() *P2PConfig {
cfg := DefaultP2PConfig() cfg := DefaultP2PConfig()
cfg.ListenAddress = "tcp://127.0.0.1:36656" cfg.ListenAddress = "tcp://127.0.0.1:36656"
cfg.FlushThrottleTimeout = 10 * time.Millisecond
cfg.AllowDuplicateIP = true cfg.AllowDuplicateIP = true
return cfg return cfg
} }
// AddrBookFile returns the full path to the address book
func (cfg *P2PConfig) AddrBookFile() string {
return rootify(cfg.AddrBook, cfg.RootDir)
}
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *P2PConfig) ValidateBasic() error {
if cfg.MaxNumInboundPeers < 0 {
return errors.New("max-num-inbound-peers can't be negative")
}
if cfg.MaxNumOutboundPeers < 0 {
return errors.New("max-num-outbound-peers can't be negative")
}
if cfg.FlushThrottleTimeout < 0 {
return errors.New("flush-throttle-timeout can't be negative")
}
if cfg.PersistentPeersMaxDialPeriod < 0 {
return errors.New("persistent-peers-max-dial-period can't be negative")
}
if cfg.MaxPacketMsgPayloadSize < 0 {
return errors.New("max-packet-msg-payload-size can't be negative")
}
if cfg.SendRate < 0 {
return errors.New("send-rate can't be negative")
}
if cfg.RecvRate < 0 {
return errors.New("recv-rate can't be negative")
}
return nil
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// MempoolConfig // MempoolConfig


+ 0
- 20
config/config_test.go View File

@ -82,26 +82,6 @@ func TestRPCConfigValidateBasic(t *testing.T) {
} }
} }
func TestP2PConfigValidateBasic(t *testing.T) {
cfg := TestP2PConfig()
assert.NoError(t, cfg.ValidateBasic())
fieldsToTest := []string{
"MaxNumInboundPeers",
"MaxNumOutboundPeers",
"FlushThrottleTimeout",
"MaxPacketMsgPayloadSize",
"SendRate",
"RecvRate",
}
for _, fieldName := range fieldsToTest {
reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(-1)
assert.Error(t, cfg.ValidateBasic())
reflect.ValueOf(cfg).Elem().FieldByName(fieldName).SetInt(0)
}
}
func TestMempoolConfigValidateBasic(t *testing.T) { func TestMempoolConfigValidateBasic(t *testing.T) {
cfg := TestMempoolConfig() cfg := TestMempoolConfig()
assert.NoError(t, cfg.ValidateBasic()) assert.NoError(t, cfg.ValidateBasic())


+ 0
- 50
config/toml.go View File

@ -296,62 +296,12 @@ persistent-peers = "{{ .P2P.PersistentPeers }}"
# UPNP port forwarding # UPNP port forwarding
upnp = {{ .P2P.UPNP }} upnp = {{ .P2P.UPNP }}
# Path to address book
# TODO: Remove once p2p refactor is complete in favor of peer store.
addr-book-file = "{{ js .P2P.AddrBook }}"
# Set true for strict address routability rules
# Set false for private or local networks
addr-book-strict = {{ .P2P.AddrBookStrict }}
# Maximum number of inbound peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-inbound-peers = {{ .P2P.MaxNumInboundPeers }}
# Maximum number of outbound peers to connect to, excluding persistent peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-outbound-peers = {{ .P2P.MaxNumOutboundPeers }}
# Maximum number of connections (inbound and outbound). # Maximum number of connections (inbound and outbound).
max-connections = {{ .P2P.MaxConnections }} max-connections = {{ .P2P.MaxConnections }}
# Rate limits the number of incoming connection attempts per IP address. # Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }} max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }}
# List of node IDs, to which a connection will be (re)established ignoring any existing limits
# TODO: Remove once p2p refactor is complete.
# ref: https://github.com/tendermint/tendermint/issues/5670
unconditional-peer-ids = "{{ .P2P.UnconditionalPeerIDs }}"
# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
# TODO: Remove once p2p refactor is complete
# ref: https:#github.com/tendermint/tendermint/issues/5670
persistent-peers-max-dial-period = "{{ .P2P.PersistentPeersMaxDialPeriod }}"
# Time to wait before flushing messages out on the connection
# TODO: Remove once p2p refactor is complete
# ref: https:#github.com/tendermint/tendermint/issues/5670
flush-throttle-timeout = "{{ .P2P.FlushThrottleTimeout }}"
# Maximum size of a message packet payload, in bytes
# TODO: Remove once p2p refactor is complete
# ref: https:#github.com/tendermint/tendermint/issues/5670
max-packet-msg-payload-size = {{ .P2P.MaxPacketMsgPayloadSize }}
# Rate at which packets can be sent, in bytes/second
# TODO: Remove once p2p refactor is complete
# ref: https:#github.com/tendermint/tendermint/issues/5670
send-rate = {{ .P2P.SendRate }}
# Rate at which packets can be received, in bytes/second
# TODO: Remove once p2p refactor is complete
# ref: https:#github.com/tendermint/tendermint/issues/5670
recv-rate = {{ .P2P.RecvRate }}
# Set true to enable the peer-exchange reactor # Set true to enable the peer-exchange reactor
pex = {{ .P2P.PexReactor }} pex = {{ .P2P.PexReactor }}


+ 0
- 133
internal/p2p/conn/connection.go View File

@ -64,15 +64,11 @@ initialization of the connection.
There are two methods for sending messages: There are two methods for sending messages:
func (m MConnection) Send(chID byte, msgBytes []byte) bool {} func (m MConnection) Send(chID byte, msgBytes []byte) bool {}
func (m MConnection) TrySend(chID byte, msgBytes []byte}) bool {}
`Send(chID, msgBytes)` is a blocking call that waits until `msg` is `Send(chID, msgBytes)` is a blocking call that waits until `msg` is
successfully queued for the channel with the given id byte `chID`, or until the successfully queued for the channel with the given id byte `chID`, or until the
request times out. The message `msg` is serialized using Protobuf. request times out. The message `msg` is serialized using Protobuf.
`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.
Inbound message bytes are handled with an onReceive callback function. Inbound message bytes are handled with an onReceive callback function.
*/ */
type MConnection struct { type MConnection struct {
@ -265,43 +261,6 @@ func (c *MConnection) stopServices() (alreadyStopped bool) {
return false return false
} }
// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
func (c *MConnection) FlushStop() {
if c.stopServices() {
return
}
// this block is unique to FlushStop
{
// wait until the sendRoutine exits
// so we dont race on calling sendSomePacketMsgs
<-c.doneSendRoutine
// Send and flush all pending msgs.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
for !eof {
eof = c.sendSomePacketMsgs()
}
c.flush()
// Now we can close the connection
}
c.conn.Close()
// We can't close pong safely here because
// recvRoutine may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvRoutine.
// c.Stop()
}
// OnStop implements BaseService // OnStop implements BaseService
func (c *MConnection) OnStop() { func (c *MConnection) OnStop() {
if c.stopServices() { if c.stopServices() {
@ -375,49 +334,6 @@ func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
return success return success
} }
// Queues a message to be sent to channel.
// Nonblocking, returns true if successful.
func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
if !c.IsRunning() {
return false
}
c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msgBytes", msgBytes)
// Send message to channel.
channel, ok := c.channelsIdx[chID]
if !ok {
c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
return false
}
ok = channel.trySendBytes(msgBytes)
if ok {
// Wake up sendRoutine if necessary
select {
case c.send <- struct{}{}:
default:
}
}
return ok
}
// CanSend returns true if you can send more data onto the chID, false
// otherwise. Use only as a heuristic.
func (c *MConnection) CanSend(chID byte) bool {
if !c.IsRunning() {
return false
}
channel, ok := c.channelsIdx[chID]
if !ok {
c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
return false
}
return channel.canSend()
}
// sendRoutine polls for packets to send from channels. // sendRoutine polls for packets to send from channels.
func (c *MConnection) sendRoutine() { func (c *MConnection) sendRoutine() {
defer c._recover() defer c._recover()
@ -682,13 +598,6 @@ func (c *MConnection) maxPacketMsgSize() int {
return len(bz) return len(bz)
} }
type ConnectionStatus struct {
Duration time.Duration
SendMonitor flow.Status
RecvMonitor flow.Status
Channels []ChannelStatus
}
type ChannelStatus struct { type ChannelStatus struct {
ID byte ID byte
SendQueueCapacity int SendQueueCapacity int
@ -697,24 +606,6 @@ type ChannelStatus struct {
RecentlySent int64 RecentlySent int64
} }
func (c *MConnection) Status() ConnectionStatus {
var status ConnectionStatus
status.Duration = time.Since(c.created)
status.SendMonitor = c.sendMonitor.Status()
status.RecvMonitor = c.recvMonitor.Status()
status.Channels = make([]ChannelStatus, len(c.channels))
for i, channel := range c.channels {
status.Channels[i] = ChannelStatus{
ID: channel.desc.ID,
SendQueueCapacity: cap(channel.sendQueue),
SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
Priority: channel.desc.Priority,
RecentlySent: atomic.LoadInt64(&channel.recentlySent),
}
}
return status
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type ChannelDescriptor struct { type ChannelDescriptor struct {
@ -800,30 +691,6 @@ func (ch *Channel) sendBytes(bytes []byte) bool {
} }
} }
// Queues message to send to this channel.
// Nonblocking, returns true if successful.
// Goroutine-safe
func (ch *Channel) trySendBytes(bytes []byte) bool {
select {
case ch.sendQueue <- bytes:
atomic.AddInt32(&ch.sendQueueSize, 1)
return true
default:
return false
}
}
// Goroutine-safe
func (ch *Channel) loadSendQueueSize() (size int) {
return int(atomic.LoadInt32(&ch.sendQueueSize))
}
// Goroutine-safe
// Use only as a heuristic.
func (ch *Channel) canSend() bool {
return ch.loadSendQueueSize() < defaultSendQueueCapacity
}
// Returns true if any PacketMsgs are pending to be sent. // Returns true if any PacketMsgs are pending to be sent.
// Call before calling nextPacketMsg() // Call before calling nextPacketMsg()
// Goroutine-safe // Goroutine-safe


+ 5
- 27
internal/p2p/conn/connection_test.go View File

@ -69,9 +69,6 @@ func TestMConnectionSendFlushStop(t *testing.T) {
errCh <- err errCh <- err
}() }()
// stop the conn - it should flush all conns
clientConn.FlushStop()
timer := time.NewTimer(3 * time.Second) timer := time.NewTimer(3 * time.Second)
select { select {
case <-errCh: case <-errCh:
@ -97,16 +94,14 @@ func TestMConnectionSend(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
assert.True(t, mconn.CanSend(0x01))
msg = []byte("Spider-Man") msg = []byte("Spider-Man")
assert.True(t, mconn.TrySend(0x01, msg))
assert.True(t, mconn.Send(0x01, msg))
_, err = server.Read(make([]byte, len(msg))) _, err = server.Read(make([]byte, len(msg)))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
assert.False(t, mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown") assert.False(t, mconn.Send(0x05, []byte("Absorbing Man")), "Send should return false because channel is unknown")
} }
@ -145,20 +140,6 @@ func TestMConnectionReceive(t *testing.T) {
} }
} }
func TestMConnectionStatus(t *testing.T) {
server, client := NetPipe()
t.Cleanup(closeAll(t, client, server))
mconn := createTestMConnection(client)
err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
status := mconn.Status()
assert.NotNil(t, status)
assert.Zero(t, status.Channels[0].SendQueueSize)
}
func TestMConnectionPongTimeoutResultsInError(t *testing.T) { func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
server, client := net.Pipe() server, client := net.Pipe()
t.Cleanup(closeAll(t, client, server)) t.Cleanup(closeAll(t, client, server))
@ -514,18 +495,15 @@ func TestMConnectionTrySend(t *testing.T) {
msg := []byte("Semicolon-Woman") msg := []byte("Semicolon-Woman")
resultCh := make(chan string, 2) resultCh := make(chan string, 2)
assert.True(t, mconn.TrySend(0x01, msg))
assert.True(t, mconn.Send(0x01, msg))
_, err = server.Read(make([]byte, len(msg))) _, err = server.Read(make([]byte, len(msg)))
require.NoError(t, err) require.NoError(t, err)
assert.True(t, mconn.CanSend(0x01))
assert.True(t, mconn.TrySend(0x01, msg))
assert.False(t, mconn.CanSend(0x01))
assert.True(t, mconn.Send(0x01, msg))
go func() { go func() {
mconn.TrySend(0x01, msg)
mconn.Send(0x01, msg)
resultCh <- "TrySend" resultCh <- "TrySend"
}() }()
assert.False(t, mconn.CanSend(0x01))
assert.False(t, mconn.TrySend(0x01, msg))
assert.False(t, mconn.Send(0x01, msg))
assert.Equal(t, "TrySend", <-resultCh) assert.Equal(t, "TrySend", <-resultCh)
} }


+ 0
- 82
internal/p2p/conn_set.go View File

@ -1,82 +0,0 @@
package p2p
import (
"net"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
)
// ConnSet is a lookup table for connections and all their ips.
type ConnSet interface {
Has(net.Conn) bool
HasIP(net.IP) bool
Set(net.Conn, []net.IP)
Remove(net.Conn)
RemoveAddr(net.Addr)
}
type connSetItem struct {
conn net.Conn
ips []net.IP
}
type connSet struct {
tmsync.RWMutex
conns map[string]connSetItem
}
// NewConnSet returns a ConnSet implementation.
func NewConnSet() ConnSet {
return &connSet{
conns: map[string]connSetItem{},
}
}
func (cs *connSet) Has(c net.Conn) bool {
cs.RLock()
defer cs.RUnlock()
_, ok := cs.conns[c.RemoteAddr().String()]
return ok
}
func (cs *connSet) HasIP(ip net.IP) bool {
cs.RLock()
defer cs.RUnlock()
for _, c := range cs.conns {
for _, known := range c.ips {
if known.Equal(ip) {
return true
}
}
}
return false
}
func (cs *connSet) Remove(c net.Conn) {
cs.Lock()
defer cs.Unlock()
delete(cs.conns, c.RemoteAddr().String())
}
func (cs *connSet) RemoveAddr(addr net.Addr) {
cs.Lock()
defer cs.Unlock()
delete(cs.conns, addr.String())
}
func (cs *connSet) Set(c net.Conn, ips []net.IP) {
cs.Lock()
defer cs.Unlock()
cs.conns[c.RemoteAddr().String()] = connSetItem{
conn: c,
ips: ips,
}
}

+ 4
- 5
internal/p2p/mock/peer.go View File

@ -4,7 +4,6 @@ import (
"net" "net"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -51,10 +50,10 @@ func (mp *Peer) NodeInfo() types.NodeInfo {
ListenAddr: mp.addr.DialString(), ListenAddr: mp.addr.DialString(),
} }
} }
func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp *Peer) ID() types.NodeID { return mp.id }
func (mp *Peer) IsOutbound() bool { return mp.Outbound }
func (mp *Peer) IsPersistent() bool { return mp.Persistent }
func (mp *Peer) ID() types.NodeID { return mp.id }
func (mp *Peer) IsOutbound() bool { return mp.Outbound }
func (mp *Peer) IsPersistent() bool { return mp.Persistent }
func (mp *Peer) Get(key string) interface{} { func (mp *Peer) Get(key string) interface{} {
if value, ok := mp.kv[key]; ok { if value, ok := mp.kv[key]; ok {
return value return value


+ 5
- 64
internal/p2p/mocks/connection.go View File

@ -5,11 +5,8 @@ package mocks
import ( import (
context "context" context "context"
conn "github.com/tendermint/tendermint/internal/p2p/conn"
crypto "github.com/tendermint/tendermint/crypto"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
crypto "github.com/tendermint/tendermint/crypto"
p2p "github.com/tendermint/tendermint/internal/p2p" p2p "github.com/tendermint/tendermint/internal/p2p"
@ -35,20 +32,6 @@ func (_m *Connection) Close() error {
return r0 return r0
} }
// FlushClose provides a mock function with given fields:
func (_m *Connection) FlushClose() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Handshake provides a mock function with given fields: _a0, _a1, _a2 // Handshake provides a mock function with given fields: _a0, _a1, _a2
func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) { func (_m *Connection) Handshake(_a0 context.Context, _a1 types.NodeInfo, _a2 crypto.PrivKey) (types.NodeInfo, crypto.PubKey, error) {
ret := _m.Called(_a0, _a1, _a2) ret := _m.Called(_a0, _a1, _a2)
@ -138,35 +121,14 @@ func (_m *Connection) RemoteEndpoint() p2p.Endpoint {
} }
// SendMessage provides a mock function with given fields: _a0, _a1 // SendMessage provides a mock function with given fields: _a0, _a1
func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) {
func (_m *Connection) SendMessage(_a0 p2p.ChannelID, _a1 []byte) error {
ret := _m.Called(_a0, _a1) ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok {
var r0 error
if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) error); ok {
r0 = rf(_a0, _a1) r0 = rf(_a0, _a1)
} else { } else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Status provides a mock function with given fields:
func (_m *Connection) Status() conn.ConnectionStatus {
ret := _m.Called()
var r0 conn.ConnectionStatus
if rf, ok := ret.Get(0).(func() conn.ConnectionStatus); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(conn.ConnectionStatus)
r0 = ret.Error(0)
} }
return r0 return r0
@ -185,24 +147,3 @@ func (_m *Connection) String() string {
return r0 return r0
} }
// TrySendMessage provides a mock function with given fields: _a0, _a1
func (_m *Connection) TrySendMessage(_a0 p2p.ChannelID, _a1 []byte) (bool, error) {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.ChannelID, []byte) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(p2p.ChannelID, []byte) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

+ 0
- 15
internal/p2p/mocks/peer.go View File

@ -3,7 +3,6 @@
package mocks package mocks
import ( import (
conn "github.com/tendermint/tendermint/internal/p2p/conn"
log "github.com/tendermint/tendermint/libs/log" log "github.com/tendermint/tendermint/libs/log"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
@ -272,20 +271,6 @@ func (_m *Peer) Start() error {
return r0 return r0
} }
// Status provides a mock function with given fields:
func (_m *Peer) Status() conn.ConnectionStatus {
ret := _m.Called()
var r0 conn.ConnectionStatus
if rf, ok := ret.Get(0).(func() conn.ConnectionStatus); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(conn.ConnectionStatus)
}
return r0
}
// Stop provides a mock function with given fields: // Stop provides a mock function with given fields:
func (_m *Peer) Stop() error { func (_m *Peer) Stop() error {
ret := _m.Called() ret := _m.Called()


+ 1
- 2
internal/p2p/router.go View File

@ -960,8 +960,7 @@ func (r *Router) sendPeer(peerID types.NodeID, conn Connection, peerQueue queue)
continue continue
} }
_, err = conn.SendMessage(envelope.channelID, bz)
if err != nil {
if err = conn.SendMessage(envelope.channelID, bz); err != nil {
return err return err
} }


+ 0
- 13
internal/p2p/shim.go View File

@ -4,8 +4,6 @@ import (
"sort" "sort"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
) )
@ -74,17 +72,6 @@ func NewChannelShim(cds *ChannelDescriptorShim, buf uint) *ChannelShim {
} }
} }
// MConnConfig returns an MConnConfig based on the defaults, with fields updated
// from the P2PConfig.
func MConnConfig(cfg *config.P2PConfig) conn.MConnConfig {
mConfig := conn.DefaultMConnConfig()
mConfig.FlushThrottle = cfg.FlushThrottleTimeout
mConfig.SendRate = cfg.SendRate
mConfig.RecvRate = cfg.RecvRate
mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
return mConfig
}
// GetChannels implements the legacy Reactor interface for getting a slice of all // GetChannels implements the legacy Reactor interface for getting a slice of all
// the supported ChannelDescriptors. // the supported ChannelDescriptors.
func (rs *ReactorShim) GetChannels() []*ChannelDescriptor { func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {


+ 1
- 26
internal/p2p/transport.go View File

@ -7,7 +7,6 @@ import (
"net" "net"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -82,19 +81,7 @@ type Connection interface {
ReceiveMessage() (ChannelID, []byte, error) ReceiveMessage() (ChannelID, []byte, error)
// SendMessage sends a message on the connection. Returns io.EOF if closed. // SendMessage sends a message on the connection. Returns io.EOF if closed.
//
// FIXME: For compatibility with the legacy P2P stack, it returns an
// additional boolean false if the message timed out waiting to be accepted
// into the send buffer. This should be removed.
SendMessage(ChannelID, []byte) (bool, error)
// TrySendMessage is a non-blocking version of SendMessage that returns
// immediately if the message buffer is full. It returns true if the message
// was accepted.
//
// FIXME: This method is here for backwards-compatibility with the legacy
// P2P stack and should be removed.
TrySendMessage(ChannelID, []byte) (bool, error)
SendMessage(ChannelID, []byte) error
// LocalEndpoint returns the local endpoint for the connection. // LocalEndpoint returns the local endpoint for the connection.
LocalEndpoint() Endpoint LocalEndpoint() Endpoint
@ -105,18 +92,6 @@ type Connection interface {
// Close closes the connection. // Close closes the connection.
Close() error Close() error
// FlushClose flushes all pending sends and then closes the connection.
//
// FIXME: This only exists for backwards-compatibility with the current
// MConnection implementation. There should really be a separate Flush()
// method, but there is no easy way to synchronously flush pending data with
// the current MConnection code.
FlushClose() error
// Status returns the current connection status.
// FIXME: Only here for compatibility with the current Peer code.
Status() conn.ConnectionStatus
// Stringer is used to display the connection, e.g. in logs. // Stringer is used to display the connection, e.g. in logs.
// //
// Without this, the logger may use reflection to access and display // Without this, the logger may use reflection to access and display


+ 8
- 41
internal/p2p/transport_mconn.go View File

@ -377,32 +377,21 @@ func (c *mConnConnection) String() string {
} }
// SendMessage implements Connection. // SendMessage implements Connection.
func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) (bool, error) {
func (c *mConnConnection) SendMessage(chID ChannelID, msg []byte) error {
if chID > math.MaxUint8 { if chID > math.MaxUint8 {
return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID)
return fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID)
} }
select { select {
case err := <-c.errorCh: case err := <-c.errorCh:
return false, err
return err
case <-c.closeCh: case <-c.closeCh:
return false, io.EOF
return io.EOF
default: default:
return c.mconn.Send(byte(chID), msg), nil
}
}
if ok := c.mconn.Send(byte(chID), msg); !ok {
return errors.New("sending message timed out")
}
// TrySendMessage implements Connection.
func (c *mConnConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error) {
if chID > math.MaxUint8 {
return false, fmt.Errorf("MConnection only supports 1-byte channel IDs (got %v)", chID)
}
select {
case err := <-c.errorCh:
return false, err
case <-c.closeCh:
return false, io.EOF
default:
return c.mconn.TrySend(byte(chID), msg), nil
return nil
} }
} }
@ -442,14 +431,6 @@ func (c *mConnConnection) RemoteEndpoint() Endpoint {
return endpoint return endpoint
} }
// Status implements Connection.
func (c *mConnConnection) Status() conn.ConnectionStatus {
if c.mconn == nil {
return conn.ConnectionStatus{}
}
return c.mconn.Status()
}
// Close implements Connection. // Close implements Connection.
func (c *mConnConnection) Close() error { func (c *mConnConnection) Close() error {
var err error var err error
@ -463,17 +444,3 @@ func (c *mConnConnection) Close() error {
}) })
return err return err
} }
// FlushClose implements Connection.
func (c *mConnConnection) FlushClose() error {
var err error
c.closeOnce.Do(func() {
if c.mconn != nil && c.mconn.IsRunning() {
c.mconn.FlushStop()
} else {
err = c.conn.Close()
}
close(c.closeCh)
})
return err
}

+ 0
- 1
internal/p2p/transport_mconn_test.go View File

@ -195,7 +195,6 @@ func TestMConnTransport_Listen(t *testing.T) {
_ = conn.Close() _ = conn.Close()
<-dialedChan <-dialedChan
time.Sleep(time.Minute)
// closing the connection should not error // closing the connection should not error
require.NoError(t, peerConn.Close()) require.NoError(t, peerConn.Close())


+ 4
- 36
internal/p2p/transport_memory.go View File

@ -10,7 +10,6 @@ import (
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -262,11 +261,6 @@ func (c *MemoryConnection) RemoteEndpoint() Endpoint {
} }
} }
// Status implements Connection.
func (c *MemoryConnection) Status() conn.ConnectionStatus {
return conn.ConnectionStatus{}
}
// Handshake implements Connection. // Handshake implements Connection.
func (c *MemoryConnection) Handshake( func (c *MemoryConnection) Handshake(
ctx context.Context, ctx context.Context,
@ -316,42 +310,21 @@ func (c *MemoryConnection) ReceiveMessage() (ChannelID, []byte, error) {
} }
// SendMessage implements Connection. // SendMessage implements Connection.
func (c *MemoryConnection) SendMessage(chID ChannelID, msg []byte) (bool, error) {
// Check close first, since channels are buffered. Otherwise, below select
// may non-deterministically return non-error even when closed.
select {
case <-c.closer.Done():
return false, io.EOF
default:
}
select {
case c.sendCh <- memoryMessage{channelID: chID, message: msg}:
c.logger.Debug("sent message", "chID", chID, "msg", msg)
return true, nil
case <-c.closer.Done():
return false, io.EOF
}
}
// TrySendMessage implements Connection.
func (c *MemoryConnection) TrySendMessage(chID ChannelID, msg []byte) (bool, error) {
func (c *MemoryConnection) SendMessage(chID ChannelID, msg []byte) error {
// Check close first, since channels are buffered. Otherwise, below select // Check close first, since channels are buffered. Otherwise, below select
// may non-deterministically return non-error even when closed. // may non-deterministically return non-error even when closed.
select { select {
case <-c.closer.Done(): case <-c.closer.Done():
return false, io.EOF
return io.EOF
default: default:
} }
select { select {
case c.sendCh <- memoryMessage{channelID: chID, message: msg}: case c.sendCh <- memoryMessage{channelID: chID, message: msg}:
c.logger.Debug("sent message", "chID", chID, "msg", msg) c.logger.Debug("sent message", "chID", chID, "msg", msg)
return true, nil
return nil
case <-c.closer.Done(): case <-c.closer.Done():
return false, io.EOF
default:
return false, nil
return io.EOF
} }
} }
@ -366,8 +339,3 @@ func (c *MemoryConnection) Close() error {
} }
return nil return nil
} }
// FlushClose implements Connection.
func (c *MemoryConnection) FlushClose() error {
return c.Close()
}

+ 9
- 42
internal/p2p/transport_test.go View File

@ -315,22 +315,16 @@ func TestConnection_FlushClose(t *testing.T) {
b := makeTransport(t) b := makeTransport(t)
ab, _ := dialAcceptHandshake(t, a, b) ab, _ := dialAcceptHandshake(t, a, b)
// FIXME: FlushClose should be removed (and replaced by separate Flush
// and Close calls if necessary). We can't reliably test it, so we just
// make sure it closes both ends and that it's idempotent.
err := ab.FlushClose()
err := ab.Close()
require.NoError(t, err) require.NoError(t, err)
_, _, err = ab.ReceiveMessage() _, _, err = ab.ReceiveMessage()
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
_, err = ab.SendMessage(chID, []byte("closed"))
err = ab.SendMessage(chID, []byte("closed"))
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
err = ab.FlushClose()
require.NoError(t, err)
}) })
} }
@ -355,9 +349,8 @@ func TestConnection_SendReceive(t *testing.T) {
ab, ba := dialAcceptHandshake(t, a, b) ab, ba := dialAcceptHandshake(t, a, b)
// Can send and receive a to b. // Can send and receive a to b.
ok, err := ab.SendMessage(chID, []byte("foo"))
err := ab.SendMessage(chID, []byte("foo"))
require.NoError(t, err) require.NoError(t, err)
require.True(t, ok)
ch, msg, err := ba.ReceiveMessage() ch, msg, err := ba.ReceiveMessage()
require.NoError(t, err) require.NoError(t, err)
@ -365,30 +358,20 @@ func TestConnection_SendReceive(t *testing.T) {
require.Equal(t, chID, ch) require.Equal(t, chID, ch)
// Can send and receive b to a. // Can send and receive b to a.
_, err = ba.SendMessage(chID, []byte("bar"))
err = ba.SendMessage(chID, []byte("bar"))
require.NoError(t, err) require.NoError(t, err)
_, msg, err = ab.ReceiveMessage() _, msg, err = ab.ReceiveMessage()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []byte("bar"), msg) require.Equal(t, []byte("bar"), msg)
// TrySendMessage also works.
ok, err = ba.TrySendMessage(chID, []byte("try"))
require.NoError(t, err)
require.True(t, ok)
ch, msg, err = ab.ReceiveMessage()
require.NoError(t, err)
require.Equal(t, []byte("try"), msg)
require.Equal(t, chID, ch)
// Connections should still be active after closing the transports. // Connections should still be active after closing the transports.
err = a.Close() err = a.Close()
require.NoError(t, err) require.NoError(t, err)
err = b.Close() err = b.Close()
require.NoError(t, err) require.NoError(t, err)
_, err = ab.SendMessage(chID, []byte("still here"))
err = ab.SendMessage(chID, []byte("still here"))
require.NoError(t, err) require.NoError(t, err)
ch, msg, err = ba.ReceiveMessage() ch, msg, err = ba.ReceiveMessage()
require.NoError(t, err) require.NoError(t, err)
@ -403,37 +386,21 @@ func TestConnection_SendReceive(t *testing.T) {
_, _, err = ab.ReceiveMessage() _, _, err = ab.ReceiveMessage()
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
_, err = ab.TrySendMessage(chID, []byte("closed try"))
require.Error(t, err)
require.Equal(t, io.EOF, err)
_, err = ab.SendMessage(chID, []byte("closed"))
err = ab.SendMessage(chID, []byte("closed"))
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
_, _, err = ba.ReceiveMessage() _, _, err = ba.ReceiveMessage()
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
_, err = ba.TrySendMessage(chID, []byte("closed try"))
require.Error(t, err)
require.Equal(t, io.EOF, err)
_, err = ba.SendMessage(chID, []byte("closed"))
err = ba.SendMessage(chID, []byte("closed"))
require.Error(t, err) require.Error(t, err)
require.Equal(t, io.EOF, err) require.Equal(t, io.EOF, err)
}) })
} }
func TestConnection_Status(t *testing.T) {
withTransports(t, func(t *testing.T, makeTransport transportFactory) {
a := makeTransport(t)
b := makeTransport(t)
ab, _ := dialAcceptHandshake(t, a, b)
// FIXME: This isn't implemented in all transports, so for now we just
// check that it doesn't panic, which isn't really much of a test.
ab.Status()
})
}
func TestConnection_String(t *testing.T) { func TestConnection_String(t *testing.T) {
withTransports(t, func(t *testing.T, makeTransport transportFactory) { withTransports(t, func(t *testing.T, makeTransport transportFactory) {
a := makeTransport(t) a := makeTransport(t)


+ 0
- 1
internal/p2p/types.go View File

@ -5,4 +5,3 @@ import (
) )
type ChannelDescriptor = conn.ChannelDescriptor type ChannelDescriptor = conn.ChannelDescriptor
type ConnectionStatus = conn.ConnectionStatus

+ 0
- 4
node/node.go View File

@ -1043,10 +1043,6 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
QueueType: conf.P2P.QueueType, QueueType: conf.P2P.QueueType,
} }
if conf.P2P.MaxNumInboundPeers > 0 {
opts.MaxIncomingConnectionAttempts = conf.P2P.MaxIncomingConnectionAttempts
}
if conf.FilterPeers && proxyApp != nil { if conf.FilterPeers && proxyApp != nil {
opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{


+ 3
- 19
node/setup.go View File

@ -3,7 +3,6 @@ package node
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"math"
"time" "time"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
@ -18,6 +17,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0" mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1" mempoolv1 "github.com/tendermint/tendermint/internal/mempool/v1"
"github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/internal/p2p/pex" "github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state" sm "github.com/tendermint/tendermint/internal/state"
@ -344,11 +344,9 @@ func createConsensusReactor(
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport { func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
return p2p.NewMConnTransport( return p2p.NewMConnTransport(
logger, p2p.MConnConfig(cfg.P2P), []*p2p.ChannelDescriptor{},
logger, conn.DefaultMConnConfig(), []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{ p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxNumInboundPeers +
len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")),
),
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
}, },
) )
} }
@ -365,20 +363,6 @@ func createPeerManager(
switch { switch {
case cfg.P2P.MaxConnections > 0: case cfg.P2P.MaxConnections > 0:
maxConns = cfg.P2P.MaxConnections maxConns = cfg.P2P.MaxConnections
case cfg.P2P.MaxNumInboundPeers > 0 && cfg.P2P.MaxNumOutboundPeers > 0:
x := cfg.P2P.MaxNumInboundPeers + cfg.P2P.MaxNumOutboundPeers
if x > math.MaxUint16 {
return nil, fmt.Errorf(
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
cfg.P2P.MaxNumInboundPeers,
cfg.P2P.MaxNumOutboundPeers,
math.MaxUint16,
)
}
maxConns = uint16(x)
default: default:
maxConns = 64 maxConns = 64
} }


+ 1
- 1
rpc/client/helpers.go View File

@ -79,7 +79,7 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (
select { select {
case event := <-eventCh: case event := <-eventCh:
return event.Data.(types.TMEventData), nil
return event.Data, nil
case <-ctx.Done(): case <-ctx.Done():
return nil, errors.New("timed out waiting for event") return nil, errors.New("timed out waiting for event")
} }


+ 0
- 1
test/e2e/runner/setup.go View File

@ -237,7 +237,6 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657" cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657"
cfg.RPC.PprofListenAddress = ":6060" cfg.RPC.PprofListenAddress = ":6060"
cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
cfg.P2P.AddrBookStrict = false
cfg.P2P.QueueType = node.QueueType cfg.P2P.QueueType = node.QueueType
cfg.DBBackend = node.Database cfg.DBBackend = node.Database
cfg.StateSync.DiscoveryTime = 5 * time.Second cfg.StateSync.DiscoveryTime = 5 * time.Second


Loading…
Cancel
Save