
Merge branch 'v0.35.x' into wb/035-backport-ticker-startup

pull/8082/head
William Banfield 3 years ago
committed by GitHub
parent
commit
ff463b7e4f
19 changed files with 411 additions and 246 deletions
  1. .github/workflows/e2e-manual.yml (+36, -0)
  2. CHANGELOG.md (+21, -0)
  3. CHANGELOG_PENDING.md (+1, -5)
  4. abci/client/socket_client.go (+15, -22)
  5. docs/versions (+1, -0)
  6. internal/p2p/conn/connection.go (+49, -39)
  7. internal/p2p/conn/connection_test.go (+68, -67)
  8. internal/p2p/p2ptest/network.go (+3, -4)
  9. internal/p2p/p2ptest/require.go (+7, -4)
  10. internal/p2p/peermanager.go (+22, -13)
  11. internal/p2p/peermanager_test.go (+40, -18)
  12. internal/p2p/router.go (+12, -7)
  13. internal/p2p/shim.go (+1, -2)
  14. internal/p2p/shim_test.go (+1, -0)
  15. internal/proxy/app_conn.go (+1, -1)
  16. internal/statesync/reactor.go (+12, -1)
  17. internal/statesync/reactor_test.go (+25, -1)
  18. internal/statesync/stateprovider.go (+93, -59)
  19. node/setup.go (+3, -3)

.github/workflows/e2e-manual.yml (+36, -0)

@ -0,0 +1,36 @@
# Manually run randomly generated E2E testnets (as nightly).
name: e2e-manual
on:
  workflow_dispatch:

jobs:
  e2e-nightly-test:
    # Run parallel jobs for the listed testnet groups (must match the
    # ./build/generator -g flag)
    strategy:
      fail-fast: false
      matrix:
        p2p: ['legacy', 'new', 'hybrid']
        group: ['00', '01', '02', '03']

    runs-on: ubuntu-latest
    timeout-minutes: 60
    steps:
      - uses: actions/setup-go@v2
        with:
          go-version: '1.17'
      - uses: actions/checkout@v2.4.0
      - name: Build
        working-directory: test/e2e
        # Run make jobs in parallel, since we can't run steps in parallel.
        run: make -j2 docker generator runner tests
      - name: Generate testnets
        working-directory: test/e2e
        # When changing -g, also change the matrix groups above
        run: ./build/generator -g 4 -d networks/nightly/${{ matrix.p2p }} -p ${{ matrix.p2p }}
      - name: Run ${{ matrix.p2p }} p2p testnets
        working-directory: test/e2e
        run: ./run-multiple.sh networks/nightly/${{ matrix.p2p }}/*-group${{ matrix.group }}-*.toml

CHANGELOG.md (+21, -0)

@ -2,6 +2,27 @@
Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos).
## v0.35.2
February 28, 2022
Special thanks to external contributors on this release: @ashcherbakov, @yihuang, @waelsy123
### IMPROVEMENTS
- [consensus] [\#7875](https://github.com/tendermint/tendermint/pull/7875) additional timing metrics. (@williambanfield)
### BUG FIXES
- [abci] [\#7990](https://github.com/tendermint/tendermint/pull/7990) revert buffer limit change. (@williambanfield)
- [cli] [\#7837](https://github.com/tendermint/tendermint/pull/7837) fix app hash in state rollback. (@yihuang)
- [cli] [\#7869](https://github.com/tendermint/tendermint/pull/7869) Update unsafe-reset-all command to match release v35. (@waelsy123)
- [light] [\#7640](https://github.com/tendermint/tendermint/pull/7640) Light Client: fix absence proof verification (@ashcherbakov)
- [light] [\#7641](https://github.com/tendermint/tendermint/pull/7641) Light Client: fix querying against the latest height (@ashcherbakov)
- [mempool] [\#7718](https://github.com/tendermint/tendermint/pull/7718) return duplicate tx errors more consistently. (@tychoish)
- [rpc] [\#7744](https://github.com/tendermint/tendermint/pull/7744) fix layout of endpoint list. (@creachadair)
- [statesync] [\#7886](https://github.com/tendermint/tendermint/pull/7886) assert app version matches. (@cmwaters)
## v0.35.1
January 26, 2022


CHANGELOG_PENDING.md (+1, -5)

@ -2,7 +2,7 @@
Friendly reminder: We have a [bug bounty program](https://hackerone.com/cosmos).
## vX.X
## v0.35.3
Month, DD, YYYY
@ -25,7 +25,3 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS
### BUG FIXES
- [light] \#7640 Light Client: fix absence proof verification (@ashcherbakov)
- [light] \#7641 Light Client: fix querying against the latest height (@ashcherbakov)
- [cli] [#7837](https://github.com/tendermint/tendermint/pull/7837) fix app hash in state rollback. (@yihuang)

abci/client/socket_client.go (+15, -22)

@ -279,7 +279,7 @@ func (cli *socketClient) ApplySnapshotChunkAsync(
//----------------------------------------
func (cli *socketClient) FlushSync(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
if err != nil {
return queueErr(err)
}
@ -448,29 +448,22 @@ func (cli *socketClient) ApplySnapshotChunkSync(
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine if request should be dropped (if ctx.Err is
// non-nil).
// queueRequest enqueues req onto the queue. The request can break early if
// the context is canceled. If the queue is full, this method blocks until
// the request can be placed onto the queue. This effectively creates an
// unbounded queue of goroutines waiting to write to this queue, which is a bit
// antithetical to the purpose of a queue; however, undoing this behavior has
// dangerous implications because upstream callers rely on it.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
if sync {
select {
case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}:
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
select {
case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}:
default:
return nil, errors.New("buffer is full")
}
select {
case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}:
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, nil
@ -481,7 +474,7 @@ func (cli *socketClient) queueRequestAsync(
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req, false)
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}
@ -494,7 +487,7 @@ func (cli *socketClient) queueRequestAndFlushSync(
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req, true)
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}
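The new `queueRequest` comment above describes an enqueue that blocks until the request is accepted or the caller's context is canceled. A minimal, standalone sketch of that pattern (the `request` type and `enqueue` helper are hypothetical illustrations, not part of the ABCI client API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// request stands in for the client's queued request type; the name is
// hypothetical and used only for illustration.
type request struct{ id int }

// enqueue blocks until the request is placed on the queue or the context
// is canceled, mirroring the select used by the new queueRequest.
func enqueue(ctx context.Context, queue chan<- request, r request) error {
	select {
	case queue <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	queue := make(chan request, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fmt.Println(enqueue(ctx, queue, request{id: 1})) // <nil>: buffer has room
	err := enqueue(ctx, queue, request{id: 2})       // blocks, then times out
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```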


docs/versions (+1, -0)

@ -1,3 +1,4 @@
master master
v0.33.x v0.33
v0.34.x v0.34
v0.35.x v0.35

internal/p2p/conn/connection.go (+49, -39)

@ -9,6 +9,7 @@ import (
"net" "net"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -45,7 +46,7 @@ const (
defaultRecvRate = int64(512000) // 500KB/s defaultRecvRate = int64(512000) // 500KB/s
defaultSendTimeout = 10 * time.Second defaultSendTimeout = 10 * time.Second
defaultPingInterval = 60 * time.Second defaultPingInterval = 60 * time.Second
defaultPongTimeout = 45 * time.Second
defaultPongTimeout = 90 * time.Second
) )
type receiveCbFunc func(chID byte, msgBytes []byte) type receiveCbFunc func(chID byte, msgBytes []byte)
@ -108,8 +109,10 @@ type MConnection struct {
pingTimer *time.Ticker // send pings periodically pingTimer *time.Ticker // send pings periodically
// close conn if pong is not received in pongTimeout // close conn if pong is not received in pongTimeout
pongTimer *time.Timer
pongTimeoutCh chan bool // true - timeout, false - peer sent pong
lastMsgRecv struct {
sync.Mutex
at time.Time
}
chStatsTimer *time.Ticker // update channel stats periodically chStatsTimer *time.Ticker // update channel stats periodically
@ -171,10 +174,6 @@ func NewMConnectionWithConfig(
onError errorCbFunc, onError errorCbFunc,
config MConnConfig, config MConnConfig,
) *MConnection { ) *MConnection {
if config.PongTimeout >= config.PingInterval {
panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
}
mconn := &MConnection{ mconn := &MConnection{
conn: conn, conn: conn,
bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize), bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
@ -223,16 +222,28 @@ func (c *MConnection) OnStart() error {
} }
c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle) c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = time.NewTicker(c.config.PingInterval) c.pingTimer = time.NewTicker(c.config.PingInterval)
c.pongTimeoutCh = make(chan bool, 1)
c.chStatsTimer = time.NewTicker(updateStats) c.chStatsTimer = time.NewTicker(updateStats)
c.quitSendRoutine = make(chan struct{}) c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{}) c.doneSendRoutine = make(chan struct{})
c.quitRecvRoutine = make(chan struct{}) c.quitRecvRoutine = make(chan struct{})
c.setRecvLastMsgAt(time.Now())
go c.sendRoutine() go c.sendRoutine()
go c.recvRoutine() go c.recvRoutine()
return nil return nil
} }
func (c *MConnection) setRecvLastMsgAt(t time.Time) {
c.lastMsgRecv.Lock()
defer c.lastMsgRecv.Unlock()
c.lastMsgRecv.at = t
}
func (c *MConnection) getLastMessageAt() time.Time {
c.lastMsgRecv.Lock()
defer c.lastMsgRecv.Unlock()
return c.lastMsgRecv.at
}
// stopServices stops the BaseService and timers and closes the quitSendRoutine.
// if the quitSendRoutine was already closed, it returns true, otherwise it returns false.
// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
@ -423,6 +434,8 @@ func (c *MConnection) sendRoutine() {
defer c._recover() defer c._recover()
protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter) protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)
pongTimeout := time.NewTicker(c.config.PongTimeout)
defer pongTimeout.Stop()
FOR_LOOP: FOR_LOOP:
for { for {
var _n int var _n int
@ -445,21 +458,7 @@ FOR_LOOP:
break SELECTION break SELECTION
} }
c.sendMonitor.Update(_n) c.sendMonitor.Update(_n)
c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
select {
case c.pongTimeoutCh <- true:
default:
}
})
c.flush() c.flush()
case timeout := <-c.pongTimeoutCh:
if timeout {
c.Logger.Debug("Pong timeout")
err = errors.New("pong timeout")
} else {
c.stopPongTimer()
}
case <-c.pong: case <-c.pong:
c.Logger.Debug("Send Pong") c.Logger.Debug("Send Pong")
_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
@ -471,6 +470,14 @@ FOR_LOOP:
c.flush() c.flush()
case <-c.quitSendRoutine: case <-c.quitSendRoutine:
break FOR_LOOP break FOR_LOOP
case <-pongTimeout.C:
// the pong timeout exists to check whether we've seen a
// message recently; escape this select statement on an
// interval so that we don't hang on to dead connections
// for too long.
break SELECTION
case <-c.send: case <-c.send:
// Send some PacketMsgs // Send some PacketMsgs
eof := c.sendSomePacketMsgs() eof := c.sendSomePacketMsgs()
@ -483,18 +490,21 @@
}
}
if !c.IsRunning() {
break FOR_LOOP
if time.Since(c.getLastMessageAt()) > c.config.PongTimeout {
err = errors.New("pong timeout")
}
if err != nil {
c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
c.stopForError(err)
break FOR_LOOP
}
if !c.IsRunning() {
break FOR_LOOP
}
}
// Cleanup
c.stopPongTimer()
close(c.doneSendRoutine)
}
@ -563,6 +573,14 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
select {
case <-c.quitRecvRoutine:
break FOR_LOOP
case <-c.doneSendRoutine:
break FOR_LOOP
default:
}
// Block until .recvMonitor says we can read. // Block until .recvMonitor says we can read.
c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)
@ -605,6 +623,9 @@ FOR_LOOP:
break FOR_LOOP break FOR_LOOP
} }
// record for pong/heartbeat
c.setRecvLastMsgAt(time.Now())
// Read more depending on packet type. // Read more depending on packet type.
switch pkt := packet.Sum.(type) { switch pkt := packet.Sum.(type) {
case *tmp2p.Packet_PacketPing: case *tmp2p.Packet_PacketPing:
@ -617,12 +638,9 @@ FOR_LOOP:
// never block // never block
} }
case *tmp2p.Packet_PacketPong: case *tmp2p.Packet_PacketPong:
c.Logger.Debug("Receive Pong")
select {
case c.pongTimeoutCh <- false:
default:
// never block
}
// do nothing, we updated the "last message
// received" timestamp above, so we can ignore
// this message
case *tmp2p.Packet_PacketMsg: case *tmp2p.Packet_PacketMsg:
channelID := byte(pkt.PacketMsg.ChannelID) channelID := byte(pkt.PacketMsg.ChannelID)
channel, ok := c.channelsIdx[channelID] channel, ok := c.channelsIdx[channelID]
@ -661,14 +679,6 @@ FOR_LOOP:
} }
} }
// not goroutine-safe
func (c *MConnection) stopPongTimer() {
if c.pongTimer != nil {
_ = c.pongTimer.Stop()
c.pongTimer = nil
}
}
// maxPacketMsgSize returns a maximum size of PacketMsg // maxPacketMsgSize returns a maximum size of PacketMsg
func (c *MConnection) maxPacketMsgSize() int { func (c *MConnection) maxPacketMsgSize() int {
bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{ bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
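The change in this file replaces the pong timer and `pongTimeoutCh` machinery with a mutex-guarded "last message received" timestamp that the send routine checks on a ticker. A small standalone sketch of that pattern, with hypothetical names (`lastMsgClock` is not the MConnection API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lastMsgClock is a hypothetical stand-in for MConnection's lastMsgRecv
// field: a timestamp guarded by a mutex, updated on every received packet.
type lastMsgClock struct {
	mu sync.Mutex
	at time.Time
}

func (c *lastMsgClock) set(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.at = t
}

func (c *lastMsgClock) get() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.at
}

func main() {
	const pongTimeout = 100 * time.Millisecond

	clock := &lastMsgClock{}
	clock.set(time.Now()) // would be updated by the receive loop

	// The send loop wakes on a ticker and treats the connection as dead
	// once no message has been recorded within pongTimeout.
	ticker := time.NewTicker(pongTimeout)
	defer ticker.Stop()

	for now := range ticker.C {
		if now.Sub(clock.get()) > pongTimeout {
			fmt.Println("pong timeout: no message received recently")
			return
		}
	}
}
```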


internal/p2p/conn/connection_test.go (+68, -67)

@ -1,7 +1,9 @@
package conn package conn
import ( import (
"context"
"encoding/hex" "encoding/hex"
"io"
"net" "net"
"testing" "testing"
"time" "time"
@ -35,8 +37,8 @@ func createMConnectionWithCallbacks(
onError func(r interface{}), onError func(r interface{}),
) *MConnection { ) *MConnection {
cfg := DefaultMConnConfig() cfg := DefaultMConnConfig()
cfg.PingInterval = 90 * time.Millisecond
cfg.PongTimeout = 45 * time.Millisecond
cfg.PingInterval = 250 * time.Millisecond
cfg.PongTimeout = 500 * time.Millisecond
chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} chDescs := []*ChannelDescriptor{{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg)
c.SetLogger(log.TestingLogger()) c.SetLogger(log.TestingLogger())
@ -159,41 +161,43 @@ func TestMConnectionStatus(t *testing.T) {
assert.Zero(t, status.Channels[0].SendQueueSize) assert.Zero(t, status.Channels[0].SendQueueSize)
} }
func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
func TestMConnectionWillEventuallyTimeout(t *testing.T) {
server, client := net.Pipe() server, client := net.Pipe()
t.Cleanup(closeAll(t, client, server)) t.Cleanup(closeAll(t, client, server))
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mconn := createMConnectionWithCallbacks(client, nil, nil)
err := mconn.Start() err := mconn.Start()
require.Nil(t, err)
t.Cleanup(stopAll(t, mconn))
require.NoError(t, err)
require.True(t, mconn.IsRunning())
serverGotPing := make(chan struct{})
go func() { go func() {
// read ping
var pkt tmp2p.Packet
_, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
// read the send buffer so that the send routine
// doesn't get blocked.
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
_, _ = io.ReadAll(server)
case <-ctx.Done():
return
}
}
}() }()
<-serverGotPing
pongTimerExpired := mconn.config.PongTimeout + 200*time.Millisecond
// wait for the send routine to die because it doesn't
// receive any messages before the pong timeout elapses
select { select {
case msgBytes := <-receivedCh:
t.Fatalf("Expected error, but got %v", msgBytes)
case err := <-errorsCh:
assert.NotNil(t, err)
case <-time.After(pongTimerExpired):
t.Fatalf("Expected to receive error after %v", pongTimerExpired)
case <-mconn.doneSendRoutine:
require.True(t, time.Since(mconn.getLastMessageAt()) > mconn.config.PongTimeout,
"the connection state reflects that we've passed the pong timeout")
// since we hit the timeout, things should be shutdown
require.False(t, mconn.IsRunning())
case <-time.After(2 * mconn.config.PongTimeout):
t.Fatal("connection did not hit timeout", mconn.config.PongTimeout)
} }
} }
@ -226,19 +230,14 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err) require.NoError(t, err)
serverGotPing := make(chan struct{})
go func() {
// read ping (one byte)
var packet tmp2p.Packet
_, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet)
require.NoError(t, err)
serverGotPing <- struct{}{}
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
}()
<-serverGotPing
// read ping (one byte)
var packet tmp2p.Packet
_, err = protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet)
require.NoError(t, err)
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond
select { select {
@ -299,52 +298,54 @@ func TestMConnectionPingPongs(t *testing.T) {
// check that we are not leaking any go-routines // check that we are not leaking any go-routines
t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second)) t.Cleanup(leaktest.CheckTimeout(t, 10*time.Second))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
server, client := net.Pipe() server, client := net.Pipe()
t.Cleanup(closeAll(t, client, server)) t.Cleanup(closeAll(t, client, server))
receivedCh := make(chan []byte) receivedCh := make(chan []byte)
errorsCh := make(chan interface{}) errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
select {
case <-ctx.Done():
case receivedCh <- msgBytes:
}
} }
onError := func(r interface{}) { onError := func(r interface{}) {
errorsCh <- r
select {
case errorsCh <- r:
case <-ctx.Done():
}
} }
mconn := createMConnectionWithCallbacks(client, onReceive, onError) mconn := createMConnectionWithCallbacks(client, onReceive, onError)
err := mconn.Start() err := mconn.Start()
require.Nil(t, err) require.Nil(t, err)
t.Cleanup(stopAll(t, mconn)) t.Cleanup(stopAll(t, mconn))
serverGotPing := make(chan struct{})
go func() {
protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
protoWriter := protoio.NewDelimitedWriter(server)
var pkt tmp2p.PacketPing
protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
protoWriter := protoio.NewDelimitedWriter(server)
var pkt tmp2p.PacketPing
// read ping
_, err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
// read ping
_, err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
time.Sleep(mconn.config.PingInterval)
time.Sleep(mconn.config.PingInterval)
// read ping
_, err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
// read ping
_, err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
}()
<-serverGotPing
<-serverGotPing
// respond with pong
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 4
select { select {
case msgBytes := <-receivedCh: case msgBytes := <-receivedCh:
t.Fatalf("Expected no data, but got %v", msgBytes) t.Fatalf("Expected no data, but got %v", msgBytes)


internal/p2p/p2ptest/network.go (+3, -4)

@ -95,10 +95,8 @@ func (n *Network) Start(t *testing.T) {
select {
case peerUpdate := <-sourceSub.Updates():
require.Equal(t, p2p.PeerUpdate{
NodeID: targetNode.NodeID,
Status: p2p.PeerStatusUp,
}, peerUpdate)
require.Equal(t, targetNode.NodeID, peerUpdate.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerUpdate.Status)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for peer", "%v dialing %v",
sourceNode.NodeID, targetNode.NodeID)
@ -106,6 +104,7 @@ func (n *Network) Start(t *testing.T) {
select {
case peerUpdate := <-targetSub.Updates():
peerUpdate.Channels = nil
require.Equal(t, p2p.PeerUpdate{
NodeID: sourceNode.NodeID,
Status: p2p.PeerStatusUp,


internal/p2p/p2ptest/require.go (+7, -4)

@ -120,11 +120,10 @@ func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUp
select {
case update := <-peerUpdates.Updates():
require.Equal(t, expect, update, "peer update did not match")
require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
require.Equal(t, expect.Status, update.Status, "statuses did not match")
case <-peerUpdates.Done():
require.Fail(t, "peer updates subscription is closed")
case <-timer.C:
require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
}
@ -142,7 +141,11 @@ func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.Pee
case update := <-peerUpdates.Updates():
actual = append(actual, update)
if len(actual) == len(expect) {
require.Equal(t, expect, actual)
for idx := range expect {
require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
require.Equal(t, expect[idx].Status, actual[idx].Status)
}
return
}


internal/p2p/peermanager.go (+22, -13)

@ -47,8 +47,9 @@ const (
// PeerUpdate is a peer update event sent via PeerUpdates. // PeerUpdate is a peer update event sent via PeerUpdates.
type PeerUpdate struct { type PeerUpdate struct {
NodeID types.NodeID
Status PeerStatus
NodeID types.NodeID
Status PeerStatus
Channels ChannelIDSet
} }
// PeerUpdates is a peer update subscription with notifications about peer // PeerUpdates is a peer update subscription with notifications about peer
@ -697,19 +698,23 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
return nil return nil
} }
// Ready marks a peer as ready, broadcasting status updates to subscribers. The
// peer must already be marked as connected. This is separate from Dialed() and
// Accepted() to allow the router to set up its internal queues before reactors
// start sending messages.
func (m *PeerManager) Ready(peerID types.NodeID) {
// Ready marks a peer as ready, broadcasting status updates to
// subscribers. The peer must already be marked as connected. This is
// separate from Dialed() and Accepted() to allow the router to set up
// its internal queues before reactors start sending messages. The
// channels set here are passed in the peer update broadcast to
// reactors, which can then mediate their own behavior based on the
// capability of the peers.
func (m *PeerManager) Ready(peerID types.NodeID, channels ChannelIDSet) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.connected[peerID] {
m.ready[peerID] = true
m.broadcast(PeerUpdate{
NodeID: peerID,
Status: PeerStatusUp,
NodeID: peerID,
Status: PeerStatusUp,
Channels: channels,
})
}
}
@ -1038,13 +1043,15 @@ func (m *PeerManager) retryDelay(failures uint32, persistent bool) time.Duration
maxDelay = m.options.MaxRetryTimePersistent
}
delay := m.options.MinRetryTime * time.Duration(math.Pow(2, float64(failures-1)))
if maxDelay > 0 && delay > maxDelay {
delay = maxDelay
}
delay := m.options.MinRetryTime * time.Duration(failures)
if m.options.RetryTimeJitter > 0 {
delay += time.Duration(m.rand.Int63n(int64(m.options.RetryTimeJitter)))
}
if maxDelay > 0 && delay > maxDelay {
delay = maxDelay
}
return delay
}
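The retry schedule above changes from exponential backoff (capped before jitter) to a linear multiple of `MinRetryTime`, with jitter added before the cap is applied. A deterministic sketch of the new schedule, with jitter omitted and a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// linearRetryDelay mirrors the new retryDelay formula above, minus the
// random jitter so the output is deterministic. minRetry and maxRetry
// stand in for PeerManagerOptions.MinRetryTime / MaxRetryTime.
func linearRetryDelay(failures uint32, minRetry, maxRetry time.Duration) time.Duration {
	delay := minRetry * time.Duration(failures)
	if maxRetry > 0 && delay > maxRetry {
		delay = maxRetry
	}
	return delay
}

func main() {
	// With the values set in node/setup.go by this commit
	// (MinRetryTime = 250ms, MaxRetryTime = 30m), the first failures
	// back off linearly: 250ms, 500ms, 750ms, 1s, ...
	for failures := uint32(1); failures <= 4; failures++ {
		fmt.Println(failures, linearRetryDelay(failures, 250*time.Millisecond, 30*time.Minute))
	}
}
```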
@ -1240,6 +1247,7 @@ type peerInfo struct {
// These fields are ephemeral, i.e. not persisted to the database. // These fields are ephemeral, i.e. not persisted to the database.
Persistent bool Persistent bool
Seed bool
Height int64 Height int64
FixedScore PeerScore // mainly for tests FixedScore PeerScore // mainly for tests
@ -1262,6 +1270,7 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
return nil, err return nil, err
} }
p.AddressInfo[addressInfo.Address] = addressInfo p.AddressInfo[addressInfo.Address] = addressInfo
} }
return p, p.Validate() return p, p.Validate()
} }


internal/p2p/peermanager_test.go (+40, -18)

@ -3,11 +3,13 @@ package p2p_test
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/fortytw2/leaktest" "github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
@ -314,13 +316,14 @@ func TestPeerManager_DialNext_Retry(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
for i := 0; i <= 5; i++ {
for i := 0; i <= 6; i++ {
start := time.Now() start := time.Now()
dial, err := peerManager.DialNext(ctx) dial, err := peerManager.DialNext(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, a, dial) require.Equal(t, a, dial)
elapsed := time.Since(start).Round(time.Millisecond) elapsed := time.Since(start).Round(time.Millisecond)
fmt.Println(elapsed, options.MinRetryTime)
switch i { switch i {
case 0: case 0:
require.LessOrEqual(t, elapsed, options.MinRetryTime) require.LessOrEqual(t, elapsed, options.MinRetryTime)
@ -329,12 +332,11 @@ func TestPeerManager_DialNext_Retry(t *testing.T) {
case 2: case 2:
require.GreaterOrEqual(t, elapsed, 2*options.MinRetryTime) require.GreaterOrEqual(t, elapsed, 2*options.MinRetryTime)
case 3: case 3:
require.GreaterOrEqual(t, elapsed, 3*options.MinRetryTime)
case 4, 5, 6:
require.GreaterOrEqual(t, elapsed, 4*options.MinRetryTime) require.GreaterOrEqual(t, elapsed, 4*options.MinRetryTime)
case 4, 5:
require.GreaterOrEqual(t, elapsed, options.MaxRetryTime)
require.LessOrEqual(t, elapsed, 8*options.MinRetryTime)
default: default:
require.Fail(t, "unexpected retry")
t.Fatal("unexpected retry")
} }
require.NoError(t, peerManager.DialFailed(a)) require.NoError(t, peerManager.DialFailed(a))
@ -1270,7 +1272,7 @@ func TestPeerManager_Ready(t *testing.T) {
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID)) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
// Marking a as ready should transition it to PeerStatusUp and send an update. // Marking a as ready should transition it to PeerStatusUp and send an update.
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID)) require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.Equal(t, p2p.PeerUpdate{ require.Equal(t, p2p.PeerUpdate{
NodeID: a.NodeID, NodeID: a.NodeID,
@ -1282,11 +1284,31 @@ func TestPeerManager_Ready(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID)) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
peerManager.Ready(b.NodeID)
peerManager.Ready(b.NodeID, nil)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID)) require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.Empty(t, sub.Updates()) require.Empty(t, sub.Updates())
} }
func TestPeerManager_Ready_Channels(t *testing.T) {
pm, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
sub := pm.Subscribe()
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
added, err := pm.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, pm.Accepted(a.NodeID))
pm.Ready(a.NodeID, p2p.ChannelIDSet{42: struct{}{}})
require.NotEmpty(t, sub.Updates())
update := <-sub.Updates()
assert.Equal(t, a.NodeID, update.NodeID)
require.True(t, update.Channels.Contains(42))
require.False(t, update.Channels.Contains(48))
}
// See TryEvictNext for most tests, this just tests blocking behavior. // See TryEvictNext for most tests, this just tests blocking behavior.
func TestPeerManager_EvictNext(t *testing.T) { func TestPeerManager_EvictNext(t *testing.T) {
a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
@ -1298,7 +1320,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
// Since there are no peers to evict, EvictNext should block until timeout. // Since there are no peers to evict, EvictNext should block until timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@ -1331,7 +1353,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
// Spawn a goroutine to error a peer after a delay. // Spawn a goroutine to error a peer after a delay.
go func() { go func() {
@ -1363,7 +1385,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
// Spawn a goroutine to upgrade to b with a delay. // Spawn a goroutine to upgrade to b with a delay.
go func() { go func() {
@ -1401,7 +1423,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
// Spawn a goroutine to upgrade b with a delay. // Spawn a goroutine to upgrade b with a delay.
go func() { go func() {
@ -1433,7 +1455,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
// Connecting to a won't evict anything either. // Connecting to a won't evict anything either.
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
// But if a errors it should be evicted. // But if a errors it should be evicted.
peerManager.Errored(a.NodeID, errors.New("foo")) peerManager.Errored(a.NodeID, errors.New("foo"))
@ -1478,7 +1500,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
_, err = peerManager.Add(a) _, err = peerManager.Add(a)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID)) require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates()) require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{ require.Equal(t, p2p.PeerUpdate{
@ -1530,7 +1552,7 @@ func TestPeerManager_Errored(t *testing.T) {
require.Zero(t, evict) require.Zero(t, evict)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
evict, err = peerManager.TryEvictNext() evict, err = peerManager.TryEvictNext()
require.NoError(t, err) require.NoError(t, err)
require.Zero(t, evict) require.Zero(t, evict)
@ -1561,7 +1583,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates()) require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
require.NotEmpty(t, sub.Updates()) require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1578,7 +1600,7 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Dialed(a)) require.NoError(t, peerManager.Dialed(a))
require.Empty(t, sub.Updates()) require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
require.NotEmpty(t, sub.Updates()) require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1618,7 +1640,7 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates()) require.Empty(t, sub.Updates())
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
require.NotEmpty(t, sub.Updates()) require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates()) require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
@ -1648,7 +1670,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, added) require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID)) require.NoError(t, peerManager.Accepted(a.NodeID))
peerManager.Ready(a.NodeID)
peerManager.Ready(a.NodeID, nil)
expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp} expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
require.NotEmpty(t, s1) require.NotEmpty(t, s1)


internal/p2p/router.go (+12, -7)

@ -260,7 +260,7 @@ type Router struct {
peerMtx sync.RWMutex peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open // the channels that the peer queue has open
peerChannels map[types.NodeID]channelIDs
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue queueFactory func(int) queue
// FIXME: We don't strictly need to use a mutex for this if we seal the // FIXME: We don't strictly need to use a mutex for this if we seal the
@ -306,7 +306,7 @@ func NewRouter(
channelQueues: map[ChannelID]queue{}, channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{}, channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{}, peerQueues: map[types.NodeID]queue{},
peerChannels: make(map[types.NodeID]channelIDs),
peerChannels: make(map[types.NodeID]ChannelIDSet),
} }
router.BaseService = service.NewBaseService(logger, "router", router) router.BaseService = service.NewBaseService(logger, "router", router)
@ -739,7 +739,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
go r.routePeer(address.NodeID, conn, toChannelIDs(peerInfo.Channels)) go r.routePeer(address.NodeID, conn, toChannelIDs(peerInfo.Channels))
} }
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
r.peerMtx.Lock() r.peerMtx.Lock()
defer r.peerMtx.Unlock() defer r.peerMtx.Unlock()
@ -851,9 +851,9 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// routePeer routes inbound and outbound messages between a peer and the reactor // routePeer routes inbound and outbound messages between a peer and the reactor
// channels. It will close the given connection and send queue when done, or if // channels. It will close the given connection and send queue when done, or if
// they are closed elsewhere it will cause this method to shut down and return. // they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels channelIDs) {
func (r *Router) routePeer(peerID types.NodeID, conn Connection, channels ChannelIDSet) {
r.metrics.Peers.Add(1) r.metrics.Peers.Add(1)
r.peerManager.Ready(peerID)
r.peerManager.Ready(peerID, channels)
sendQueue := r.getOrMakeQueue(peerID, channels) sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() { defer func() {
@ -1092,9 +1092,14 @@ func (r *Router) stopCtx() context.Context {
return ctx
}
type channelIDs map[ChannelID]struct{}
type ChannelIDSet map[ChannelID]struct{}
func toChannelIDs(bytes []byte) channelIDs {
func (cs ChannelIDSet) Contains(id ChannelID) bool {
_, ok := cs[id]
return ok
}
func toChannelIDs(bytes []byte) ChannelIDSet {
c := make(map[ChannelID]struct{}, len(bytes))
for _, b := range bytes {
c[ChannelID(b)] = struct{}{}
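`ChannelIDSet` is a plain set keyed by channel ID with a `Contains` helper, built from the raw channel bytes a peer advertises. A standalone sketch (types re-declared locally so the example compiles on its own; in the router these are `p2p.ChannelID` and `p2p.ChannelIDSet`):

```go
package main

import "fmt"

// ChannelID and ChannelIDSet are re-declared here for a standalone example.
type ChannelID uint16

type ChannelIDSet map[ChannelID]struct{}

func (cs ChannelIDSet) Contains(id ChannelID) bool {
	_, ok := cs[id]
	return ok
}

// toChannelIDs converts raw channel bytes, as advertised in a peer's
// NodeInfo, into a set the router and reactors can query.
func toChannelIDs(bytes []byte) ChannelIDSet {
	c := make(ChannelIDSet, len(bytes))
	for _, b := range bytes {
		c[ChannelID(b)] = struct{}{}
	}
	return c
}

func main() {
	channels := toChannelIDs([]byte{0x60, 0x61, 0x62, 0x63})
	fmt.Println(channels.Contains(0x60)) // true
	fmt.Println(channels.Contains(0x42)) // false
}
```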


internal/p2p/shim.go (+1, -2)

@ -233,9 +233,8 @@ func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
// handle adding a peer.
func (rs *ReactorShim) AddPeer(peer Peer) {
select {
case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp, Channels: toChannelIDs(peer.NodeInfo().Channels)}:
rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)
case <-rs.PeerUpdates.Done():
// NOTE: We explicitly DO NOT close the PeerUpdatesCh's updateCh go channel.
// This is because there may be numerous spawned goroutines that are


internal/p2p/shim_test.go (+1, -0)

@ -84,6 +84,7 @@ func simplePeer(t *testing.T, id string) (*p2pmocks.Peer, types.NodeID) {
peerID := types.NodeID(id)
peer := &p2pmocks.Peer{}
peer.On("ID").Return(peerID)
peer.On("NodeInfo").Return(types.NodeInfo{NodeID: peerID}).Maybe()
return peer, peerID
}


internal/proxy/app_conn.go (+1, -1)

@ -105,7 +105,7 @@ func (app *appConnConsensus) EndBlockSync(
ctx context.Context,
req types.RequestEndBlock,
) (*types.ResponseEndBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "deliver_tx", "type", "sync"))()
defer addTimeSample(app.metrics.MethodTiming.With("method", "end_block", "type", "sync"))()
return app.appConn.EndBlockSync(ctx, req)
}


internal/statesync/reactor.go (+12, -1)

@ -885,7 +885,17 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
r.peers.Append(peerUpdate.NodeID)
if peerUpdate.Channels.Contains(SnapshotChannel) &&
peerUpdate.Channels.Contains(ChunkChannel) &&
peerUpdate.Channels.Contains(LightBlockChannel) &&
peerUpdate.Channels.Contains(ParamsChannel) {
r.peers.Append(peerUpdate.NodeID)
} else {
r.Logger.Error("could not use peer for statesync", "peer", peerUpdate.NodeID)
}
case p2p.PeerStatusDown:
r.peers.Remove(peerUpdate.NodeID)
}
@ -898,6 +908,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
err := r.syncer.AddPeer(peerUpdate.NodeID)
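The gate added to `processPeerUpdate` only admits peers that advertise every channel state sync needs. A standalone sketch of that check (types, channel ID values, and the `supportsAll` helper are hypothetical stand-ins for illustration):

```go
package main

import "fmt"

// ChannelID / ChannelIDSet mirror p2p.ChannelID / p2p.ChannelIDSet for a
// standalone example.
type ChannelID uint16
type ChannelIDSet map[ChannelID]struct{}

func (cs ChannelIDSet) Contains(id ChannelID) bool { _, ok := cs[id]; return ok }

// supportsAll reports whether a peer advertised every channel the state
// sync reactor needs, the same kind of check processPeerUpdate now performs.
// The helper name is hypothetical.
func supportsAll(cs ChannelIDSet, required ...ChannelID) bool {
	for _, id := range required {
		if !cs.Contains(id) {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical channel IDs for illustration only.
	const (
		snapshot ChannelID = iota
		chunk
		lightBlock
		params
	)

	peer := ChannelIDSet{snapshot: {}, chunk: {}, lightBlock: {}, params: {}}
	fmt.Println(supportsAll(peer, snapshot, chunk, lightBlock, params)) // true

	legacy := ChannelIDSet{snapshot: {}, chunk: {}}
	fmt.Println(supportsAll(legacy, snapshot, chunk, lightBlock, params)) // false
}
```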


internal/statesync/reactor_test.go (+25, -1)

@ -453,10 +453,22 @@ func TestReactor_BlockProviders(t *testing.T) {
rts.peerUpdateCh <- p2p.PeerUpdate{ rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("aa"), NodeID: types.NodeID("aa"),
Status: p2p.PeerStatusUp, Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
} }
rts.peerUpdateCh <- p2p.PeerUpdate{ rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("bb"), NodeID: types.NodeID("bb"),
Status: p2p.PeerStatusUp, Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
} }
closeCh := make(chan struct{}) closeCh := make(chan struct{})
@ -548,7 +560,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rts.reactor.syncer.stateProvider = rts.reactor.stateProvider rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
actx, cancel := context.WithTimeout(bctx, 10*time.Second)
actx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel() defer cancel()
appHash, err := rts.reactor.stateProvider.AppHash(actx, 5) appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)
@ -591,6 +603,12 @@ func TestReactor_Backfill(t *testing.T) {
rts.peerUpdateCh <- p2p.PeerUpdate{ rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID(peer), NodeID: types.NodeID(peer),
Status: p2p.PeerStatusUp, Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
} }
} }
@ -789,6 +807,12 @@ func graduallyAddPeers(
peerUpdateCh <- p2p.PeerUpdate{ peerUpdateCh <- p2p.PeerUpdate{
NodeID: factory.RandomNodeID(), NodeID: factory.RandomNodeID(),
Status: p2p.PeerStatusUp, Status: p2p.PeerStatusUp,
Channels: p2p.ChannelIDSet{
SnapshotChannel: struct{}{},
ChunkChannel: struct{}{},
LightBlockChannel: struct{}{},
ParamsChannel: struct{}{},
},
} }
case <-closeCh: case <-closeCh:
return return


internal/statesync/stateprovider.go (+93, -59)

@ -3,9 +3,10 @@ package statesync
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"math/rand"
"strings" "strings"
"sync"
"time" "time"
dbm "github.com/tendermint/tm-db" dbm "github.com/tendermint/tm-db"
@ -328,7 +329,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
// We'll also need to fetch consensus params via P2P. // We'll also need to fetch consensus params via P2P.
state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height) state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height)
if err != nil { if err != nil {
return sm.State{}, err
return sm.State{}, fmt.Errorf("fetching consensus params: %w", err)
} }
// validate the consensus params // validate the consensus params
if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) { if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) {
@ -352,73 +353,106 @@ func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
// consensusParams sends out a request for consensus params blocking
// until one is returned.
//
// If it fails to get a valid set of consensus params from any of the
// providers it returns an error; however, it will retry indefinitely
// (with backoff) until the context is canceled.
// It attempts to send requests to all witnesses in parallel, but if
// none responds it will retry them all sometime later until it
// receives some response. This operation will block until it receives
// a response or the context is canceled.
func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
iterCount := 0
for {
params, err := s.tryGetConsensusParamsFromWitnesses(ctx, height)
if err != nil {
return types.ConsensusParams{}, err
}
if params != nil {
return *params, nil
}
iterCount++
ctx, cancel := context.WithCancel(ctx)
defer cancel()
select {
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case <-time.After(time.Duration(iterCount) * consensusParamsResponseTimeout):
out := make(chan types.ConsensusParams)
retryAll := func() (<-chan struct{}, error) {
wg := &sync.WaitGroup{}
for _, provider := range s.lc.Witnesses() {
p, ok := provider.(*BlockProvider)
if !ok {
return nil, fmt.Errorf("witness is not BlockProvider [%T]", provider)
}
peer, err := types.NewNodeID(p.String())
if err != nil {
return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
}
wg.Add(1)
go func(p *BlockProvider, peer types.NodeID, requestCh chan<- p2p.Envelope, responseCh <-chan types.ConsensusParams) {
defer wg.Done()
timer := time.NewTimer(0)
defer timer.Stop()
var iterCount int64
for {
iterCount++
select {
case s.paramsSendCh <- p2p.Envelope{
To: peer,
Message: &ssproto.ParamsRequest{
Height: uint64(height),
},
}:
case <-ctx.Done():
return
}
// jitter+backoff the retry loop
timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
select {
case <-timer.C:
continue
case <-ctx.Done():
return
case params, ok := <-responseCh:
if !ok {
return
}
select {
case <-ctx.Done():
return
case out <- params:
return
}
}
}
}(p, peer, s.paramsSendCh, s.paramsRecvCh)
} }
sig := make(chan struct{})
go func() { wg.Wait(); close(sig) }()
return sig, nil
} }
}
// tryGetConsensusParamsFromWitnesses attempts to get consensus
// parameters from the light clients available witnesses. If both
// return parameters are nil, then it can be retried.
func (s *stateProviderP2P) tryGetConsensusParamsFromWitnesses(
ctx context.Context,
height int64,
) (*types.ConsensusParams, error) {
for _, provider := range s.lc.Witnesses() {
p, ok := provider.(*BlockProvider)
if !ok {
panic("expected p2p state provider to use p2p block providers")
}
timer := time.NewTimer(0)
defer timer.Stop()
// extract the nodeID of the provider
peer, err := types.NewNodeID(p.String())
var iterCount int64
for {
iterCount++
sig, err := retryAll()
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err)
}
select {
case s.paramsSendCh <- p2p.Envelope{
To: peer,
Message: &ssproto.ParamsRequest{
Height: uint64(height),
},
}:
case <-ctx.Done():
return nil, ctx.Err()
return types.ConsensusParams{}, err
} }
select { select {
// if we get no response from this provider we move on to the next one
case <-time.After(consensusParamsResponseTimeout):
continue
case <-ctx.Done():
return nil, ctx.Err()
case params, ok := <-s.paramsRecvCh:
if !ok {
return nil, errors.New("params channel closed")
case <-sig:
// jitter+backoff the retry loop
timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout +
time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec
select {
case param := <-out:
return param, nil
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case <-timer.C:
} }
return &params, nil
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case param := <-out:
return param, nil
} }
} }
// signal to caller to retry.
return nil, nil
} }
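The per-witness retry loop above resets its timer with a linearly growing multiple of the response timeout plus a small random jitter. A sketch of that schedule in isolation (the 1s timeout is an assumed value for illustration; the reactor uses `consensusParamsResponseTimeout`):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff reproduces the timer reset used in the witness retry loop above:
// a linearly growing multiple of the response timeout plus up to
// 100*(iterCount-1) ms of jitter.
func backoff(iterCount int64, responseTimeout time.Duration) time.Duration {
	return time.Duration(iterCount)*responseTimeout +
		time.Duration(100*rand.Int63n(iterCount))*time.Millisecond
}

func main() {
	const responseTimeout = 1 * time.Second // assumed value for illustration

	for i := int64(1); i <= 4; i++ {
		// e.g. 1s, then roughly 2s-2.1s, 3s-3.2s, 4s-4.3s
		fmt.Println(i, backoff(i, responseTimeout))
	}
}
```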

node/setup.go (+3, -3)

@ -461,10 +461,10 @@ func createPeerManager(
MaxConnected: maxConns,
MaxConnectedUpgrade: 4,
MaxPeers: 1000,
MinRetryTime: 100 * time.Millisecond,
MaxRetryTime: 8 * time.Hour,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 3 * time.Second,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
}

