Browse Source

state sync: reverse sync implementation (#6463)

pull/6560/head
Callum Waters 3 years ago
committed by GitHub
parent
commit
6f6ac5c04e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 2394 additions and 140 deletions
  1. +2
    -1
      CHANGELOG_PENDING.md
  2. +263
    -0
      internal/statesync/block_queue.go
  3. +241
    -0
      internal/statesync/block_queue_test.go
  4. +322
    -0
      internal/statesync/dispatcher.go
  5. +179
    -0
      internal/statesync/dispatcher_test.go
  6. +1
    -1
      internal/statesync/mocks/state_provider.go
  7. +363
    -61
      internal/statesync/reactor.go
  8. +296
    -3
      internal/statesync/reactor_test.go
  9. +36
    -1
      internal/statesync/stateprovider.go
  10. +6
    -0
      internal/test/factory/block.go
  11. +7
    -9
      node/node.go
  12. +1
    -0
      node/setup.go
  13. +1
    -1
      proto/tendermint/abci/types.proto
  14. +0
    -1
      proto/tendermint/p2p/pex.proto
  15. +20
    -0
      proto/tendermint/statesync/message.go
  16. +532
    -30
      proto/tendermint/statesync/types.pb.go
  17. +16
    -4
      proto/tendermint/statesync/types.proto
  18. +1
    -1
      proxy/mocks/app_conn_consensus.go
  19. +1
    -1
      proxy/mocks/app_conn_mempool.go
  20. +1
    -1
      proxy/mocks/app_conn_query.go
  21. +1
    -1
      proxy/mocks/app_conn_snapshot.go
  22. +2
    -2
      rpc/core/blocks.go
  23. +1
    -1
      state/mocks/evidence_pool.go
  24. +15
    -1
      state/mocks/store.go
  25. +21
    -6
      state/store.go
  26. +42
    -0
      store/store.go
  27. +1
    -2
      test/e2e/networks/simple.toml
  28. +1
    -1
      test/e2e/pkg/testnet.go
  29. +1
    -1
      test/e2e/runner/load.go
  30. +12
    -3
      test/e2e/tests/block_test.go
  31. +2
    -2
      types/block_meta_test.go
  32. +4
    -3
      types/block_test.go
  33. +2
    -2
      types/light_test.go

+ 2
- 1
CHANGELOG_PENDING.md View File

@ -125,4 +125,5 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash)
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)
- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters)
- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (@cmwaters)
- [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters)

+ 263
- 0
internal/statesync/block_queue.go View File

@ -0,0 +1,263 @@
package statesync
import (
"container/heap"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/types"
)
type lightBlockResponse struct {
block *types.LightBlock
peer p2p.NodeID
}
// a block queue is used for asynchronously fetching and verifying light blocks
type blockQueue struct {
mtx sync.Mutex
// cursors to keep track of which heights need to be fetched and verified
fetchHeight int64
verifyHeight int64
// termination conditions
stopHeight int64
stopTime time.Time
terminal *types.LightBlock
// track failed heights so we know what blocks to try fetch again
failed *maxIntHeap
// also count retries to know when to give up
retries int
maxRetries int
// store inbound blocks and serve them to a verifying thread via a channel
pending map[int64]lightBlockResponse
verifyCh chan lightBlockResponse
// waiters are workers on idle until a height is required
waiters []chan int64
// this channel is closed once the verification process is complete
doneCh chan struct{}
}
func newBlockQueue(
startHeight, stopHeight int64,
stopTime time.Time,
maxRetries int,
) *blockQueue {
return &blockQueue{
stopHeight: stopHeight,
stopTime: stopTime,
fetchHeight: startHeight,
verifyHeight: startHeight,
pending: make(map[int64]lightBlockResponse),
failed: &maxIntHeap{},
retries: 0,
maxRetries: maxRetries,
waiters: make([]chan int64, 0),
doneCh: make(chan struct{}),
}
}
// Add adds a block to the queue to be verified and stored
// CONTRACT: light blocks should have passed basic validation
func (q *blockQueue) add(l lightBlockResponse) {
q.mtx.Lock()
defer q.mtx.Unlock()
// return early if the process has already finished
select {
case <-q.doneCh:
return
default:
}
// sometimes more blocks are fetched then what is necessary. If we already
// have what we need then ignore this
if q.terminal != nil && l.block.Height < q.terminal.Height {
return
}
// if the block that was returned is at the verify height then the verifier
// is already waiting for this block so we send it directly to them
if l.block.Height == q.verifyHeight && q.verifyCh != nil {
q.verifyCh <- l
close(q.verifyCh)
q.verifyCh = nil
} else {
// else we add it in the pending bucket
q.pending[l.block.Height] = l
}
// Lastly, if the incoming block is past the stop time and stop height then
// we mark it as the terminal block
if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) {
q.terminal = l.block
}
}
// NextHeight returns the next height that needs to be retrieved.
// We assume that for every height allocated that the peer will eventually add
// the block or signal that it needs to be retried
func (q *blockQueue) nextHeight() <-chan int64 {
q.mtx.Lock()
defer q.mtx.Unlock()
ch := make(chan int64, 1)
// if a previous process failed then we pick up this one
if q.failed.Len() > 0 {
failedHeight := heap.Pop(q.failed)
ch <- failedHeight.(int64)
close(ch)
return ch
}
if q.terminal == nil {
// return and decrement the fetch height
ch <- q.fetchHeight
q.fetchHeight--
close(ch)
return ch
}
// at this point there is no height that we know we need so we create a
// waiter to hold out for either an outgoing request to fail or a block to
// fail verification
q.waiters = append(q.waiters, ch)
return ch
}
// Finished returns true when the block queue has has all light blocks retrieved,
// verified and stored. There is no more work left to be done
func (q *blockQueue) done() <-chan struct{} {
return q.doneCh
}
// VerifyNext pulls the next block off the pending queue and adds it to a
// channel if it's already there or creates a waiter to add it to the
// channel once it comes in. NOTE: This is assumed to
// be a single thread as light blocks need to be sequentially verified.
func (q *blockQueue) verifyNext() <-chan lightBlockResponse {
q.mtx.Lock()
defer q.mtx.Unlock()
ch := make(chan lightBlockResponse, 1)
select {
case <-q.doneCh:
return ch
default:
}
if lb, ok := q.pending[q.verifyHeight]; ok {
ch <- lb
close(ch)
delete(q.pending, q.verifyHeight)
} else {
q.verifyCh = ch
}
return ch
}
// Retry is called when a dispatcher failed to fetch a light block or the
// fetched light block failed verification. It signals to the queue to add the
// height back to the request queue
func (q *blockQueue) retry(height int64) {
q.mtx.Lock()
defer q.mtx.Unlock()
select {
case <-q.doneCh:
return
default:
}
// we don't need to retry if this is below the terminal height
if q.terminal != nil && height < q.terminal.Height {
return
}
q.retries++
if q.retries >= q.maxRetries {
q._closeChannels()
return
}
if len(q.waiters) > 0 {
q.waiters[0] <- height
close(q.waiters[0])
q.waiters = q.waiters[1:]
} else {
heap.Push(q.failed, height)
}
}
// Success is called when a light block has been successfully verified and
// processed
func (q *blockQueue) success(height int64) {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.terminal != nil && q.verifyHeight == q.terminal.Height {
q._closeChannels()
}
q.verifyHeight--
}
func (q *blockQueue) error() error {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.retries >= q.maxRetries {
return fmt.Errorf("failed to backfill blocks following reverse sync. Max retries exceeded (%d). "+
"Target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight)
}
return nil
}
// close the queue and respective channels
func (q *blockQueue) close() {
q.mtx.Lock()
defer q.mtx.Unlock()
q._closeChannels()
}
// CONTRACT: must have a write lock. Use close instead
func (q *blockQueue) _closeChannels() {
close(q.doneCh)
// wait for the channel to be drained
select {
case <-q.doneCh:
return
default:
}
for _, ch := range q.waiters {
close(ch)
}
if q.verifyCh != nil {
close(q.verifyCh)
}
}
// A max-heap of ints.
type maxIntHeap []int64
func (h maxIntHeap) Len() int { return len(h) }
func (h maxIntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h maxIntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *maxIntHeap) Push(x interface{}) {
*h = append(*h, x.(int64))
}
func (h *maxIntHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

+ 241
- 0
internal/statesync/block_queue_test.go View File

@ -0,0 +1,241 @@
package statesync
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/test/factory"
)
var (
startHeight int64 = 200
stopHeight int64 = 100
stopTime = time.Date(2019, 1, 1, 1, 0, 0, 0, time.UTC)
endTime = stopTime.Add(-1 * time.Second)
numWorkers = 1
)
func TestBlockQueueBasic(t *testing.T) {
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
wg := &sync.WaitGroup{}
// asynchronously fetch blocks and add it to the queue
for i := 0; i <= numWorkers; i++ {
wg.Add(1)
go func() {
for {
select {
case height := <-queue.nextHeight():
queue.add(mockLBResp(t, peerID, height, endTime))
case <-queue.done():
wg.Done()
return
}
}
}()
}
trackingHeight := startHeight
wg.Add(1)
loop:
for {
select {
case <-queue.done():
wg.Done()
break loop
case resp := <-queue.verifyNext():
// assert that the queue serializes the blocks
require.Equal(t, resp.block.Height, trackingHeight)
trackingHeight--
queue.success(resp.block.Height)
}
}
wg.Wait()
assert.Less(t, trackingHeight, stopHeight)
}
// Test with spurious failures and retries
func TestBlockQueueWithFailures(t *testing.T) {
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 200)
wg := &sync.WaitGroup{}
failureRate := 4
for i := 0; i <= numWorkers; i++ {
wg.Add(1)
go func() {
for {
select {
case height := <-queue.nextHeight():
if rand.Intn(failureRate) == 0 {
queue.retry(height)
} else {
queue.add(mockLBResp(t, peerID, height, endTime))
}
case <-queue.done():
wg.Done()
return
}
}
}()
}
trackingHeight := startHeight
for {
select {
case resp := <-queue.verifyNext():
// assert that the queue serializes the blocks
assert.Equal(t, resp.block.Height, trackingHeight)
if rand.Intn(failureRate) == 0 {
queue.retry(resp.block.Height)
} else {
trackingHeight--
queue.success(resp.block.Height)
}
case <-queue.done():
wg.Wait()
assert.Less(t, trackingHeight, stopHeight)
return
}
}
}
// Test that when all the blocks are retrieved that the queue still holds on to
// it's workers and in the event of failure can still fetch the failed block
func TestBlockQueueBlocks(t *testing.T) {
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 2)
expectedHeight := startHeight
retryHeight := stopHeight + 2
loop:
for {
select {
case height := <-queue.nextHeight():
require.Equal(t, height, expectedHeight)
require.GreaterOrEqual(t, height, stopHeight)
expectedHeight--
queue.add(mockLBResp(t, peerID, height, endTime))
case <-time.After(1 * time.Second):
if expectedHeight >= stopHeight {
t.Fatalf("expected next height %d", expectedHeight)
}
break loop
}
}
// close any waiter channels that the previous worker left hanging
for _, ch := range queue.waiters {
close(ch)
}
queue.waiters = make([]chan int64, 0)
wg := &sync.WaitGroup{}
wg.Add(1)
// so far so good. The worker is waiting. Now we fail a previous
// block and check that the worker fetches them
go func(t *testing.T) {
defer wg.Done()
select {
case height := <-queue.nextHeight():
require.Equal(t, retryHeight, height)
case <-time.After(1 * time.Second):
require.Fail(t, "queue didn't ask worker to fetch failed height")
}
}(t)
queue.retry(retryHeight)
wg.Wait()
}
func TestBlockQueueAcceptsNoMoreBlocks(t *testing.T) {
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
defer queue.close()
loop:
for {
select {
case height := <-queue.nextHeight():
require.GreaterOrEqual(t, height, stopHeight)
queue.add(mockLBResp(t, peerID, height, endTime))
case <-time.After(1 * time.Second):
break loop
}
}
require.Len(t, queue.pending, int(startHeight-stopHeight)+1)
queue.add(mockLBResp(t, peerID, stopHeight-1, endTime))
require.Len(t, queue.pending, int(startHeight-stopHeight)+1)
}
// Test a scenario where more blocks are needed then just the stopheight because
// we haven't found a block with a small enough time.
func TestBlockQueueStopTime(t *testing.T) {
peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899")
require.NoError(t, err)
queue := newBlockQueue(startHeight, stopHeight, stopTime, 1)
wg := &sync.WaitGroup{}
baseTime := stopTime.Add(-50 * time.Second)
// asynchronously fetch blocks and add it to the queue
for i := 0; i <= numWorkers; i++ {
wg.Add(1)
go func() {
for {
select {
case height := <-queue.nextHeight():
blockTime := baseTime.Add(time.Duration(height) * time.Second)
queue.add(mockLBResp(t, peerID, height, blockTime))
case <-queue.done():
wg.Done()
return
}
}
}()
}
trackingHeight := startHeight
for {
select {
case resp := <-queue.verifyNext():
// assert that the queue serializes the blocks
assert.Equal(t, resp.block.Height, trackingHeight)
trackingHeight--
queue.success(resp.block.Height)
case <-queue.done():
wg.Wait()
assert.Less(t, trackingHeight, stopHeight-50)
return
}
}
}
func mockLBResp(t *testing.T, peer p2p.NodeID, height int64, time time.Time) lightBlockResponse {
return lightBlockResponse{
block: mockLB(t, height, time, factory.MakeBlockID()),
peer: peer,
}
}

+ 322
- 0
internal/statesync/dispatcher.go View File

@ -0,0 +1,322 @@
package statesync
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/light/provider"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
proto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
var (
errNoConnectedPeers = errors.New("no available peers to dispatch request to")
errUnsolicitedResponse = errors.New("unsolicited light block response")
errNoResponse = errors.New("peer failed to respond within timeout")
errPeerAlreadyBusy = errors.New("peer is already processing a request")
errDisconnected = errors.New("dispatcher has been disconnected")
)
// dispatcher keeps a list of peers and allows concurrent requests for light
// blocks. NOTE: It is not the responsibility of the dispatcher to verify the
// light blocks.
type dispatcher struct {
availablePeers *peerlist
requestCh chan<- p2p.Envelope
timeout time.Duration
mtx sync.Mutex
calls map[p2p.NodeID]chan *types.LightBlock
running bool
}
func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispatcher {
return &dispatcher{
availablePeers: newPeerList(),
timeout: timeout,
requestCh: requestCh,
calls: make(map[p2p.NodeID]chan *types.LightBlock),
running: true,
}
}
func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, p2p.NodeID, error) {
d.mtx.Lock()
outgoingCalls := len(d.calls)
d.mtx.Unlock()
// check to see that the dispatcher is connected to at least one peer
if d.availablePeers.Len() == 0 && outgoingCalls == 0 {
return nil, "", errNoConnectedPeers
}
// fetch the next peer id in the list and request a light block from that
// peer
peer := d.availablePeers.Pop()
lb, err := d.lightBlock(ctx, height, peer)
return lb, peer, err
}
func (d *dispatcher) Providers(chainID string, timeout time.Duration) []provider.Provider {
d.mtx.Lock()
defer d.mtx.Unlock()
providers := make([]provider.Provider, d.availablePeers.Len())
peers := d.availablePeers.Peers()
for index, peer := range peers {
providers[index] = &blockProvider{
peer: peer,
dispatcher: d,
chainID: chainID,
timeout: timeout,
}
}
return providers
}
func (d *dispatcher) stop() {
d.mtx.Lock()
defer d.mtx.Unlock()
d.running = false
for peer, call := range d.calls {
close(call)
delete(d.calls, peer)
}
}
func (d *dispatcher) start() {
d.mtx.Lock()
defer d.mtx.Unlock()
d.running = true
}
func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer p2p.NodeID) (*types.LightBlock, error) {
// dispatch the request to the peer
callCh, err := d.dispatch(peer, height)
if err != nil {
return nil, err
}
// wait for a response, cancel or timeout
select {
case resp := <-callCh:
return resp, nil
case <-ctx.Done():
d.release(peer)
return nil, nil
case <-time.After(d.timeout):
d.release(peer)
return nil, errNoResponse
}
}
// respond allows the underlying process which receives requests on the
// requestCh to respond with the respective light block
func (d *dispatcher) respond(lb *proto.LightBlock, peer p2p.NodeID) error {
d.mtx.Lock()
defer d.mtx.Unlock()
// check that the response came from a request
answerCh, ok := d.calls[peer]
if !ok {
// this can also happen if the response came in after the timeout
return errUnsolicitedResponse
}
// release the peer after returning the response
defer d.availablePeers.Append(peer)
defer close(answerCh)
defer delete(d.calls, peer)
if lb == nil {
answerCh <- nil
return nil
}
block, err := types.LightBlockFromProto(lb)
if err != nil {
fmt.Println("error with converting light block")
return err
}
answerCh <- block
return nil
}
func (d *dispatcher) addPeer(peer p2p.NodeID) {
d.availablePeers.Append(peer)
}
func (d *dispatcher) removePeer(peer p2p.NodeID) {
d.mtx.Lock()
defer d.mtx.Unlock()
if _, ok := d.calls[peer]; ok {
delete(d.calls, peer)
} else {
d.availablePeers.Remove(peer)
}
}
// dispatch takes a peer and allocates it a channel so long as it's not already
// busy and the receiving channel is still running. It then dispatches the message
func (d *dispatcher) dispatch(peer p2p.NodeID, height int64) (chan *types.LightBlock, error) {
d.mtx.Lock()
defer d.mtx.Unlock()
ch := make(chan *types.LightBlock, 1)
// check if the dispatcher is running or not
if !d.running {
close(ch)
return ch, errDisconnected
}
// this should happen only if we add the same peer twice (somehow)
if _, ok := d.calls[peer]; ok {
close(ch)
return ch, errPeerAlreadyBusy
}
d.calls[peer] = ch
// send request
d.requestCh <- p2p.Envelope{
To: peer,
Message: &ssproto.LightBlockRequest{
Height: uint64(height),
},
}
return ch, nil
}
// release appends the peer back to the list and deletes the allocated call so
// that a new call can be made to that peer
func (d *dispatcher) release(peer p2p.NodeID) {
d.mtx.Lock()
defer d.mtx.Unlock()
if call, ok := d.calls[peer]; ok {
close(call)
delete(d.calls, peer)
}
d.availablePeers.Append(peer)
}
//----------------------------------------------------------------
// blockProvider is a p2p based light provider which uses a dispatcher connected
// to the state sync reactor to serve light blocks to the light client
//
// TODO: This should probably be moved over to the light package but as we're
// not yet officially supporting p2p light clients we'll leave this here for now.
type blockProvider struct {
peer p2p.NodeID
chainID string
timeout time.Duration
dispatcher *dispatcher
}
func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
// FIXME: The provider doesn't know if the dispatcher is still connected to
// that peer. If the connection is dropped for whatever reason the
// dispatcher needs to be able to relay this back to the provider so it can
// return ErrConnectionClosed instead of ErrNoResponse
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
lb, _ := p.dispatcher.lightBlock(ctx, height, p.peer)
if lb == nil {
return nil, provider.ErrNoResponse
}
if err := lb.ValidateBasic(p.chainID); err != nil {
return nil, provider.ErrBadLightBlock{Reason: err}
}
return lb, nil
}
// ReportEvidence should allow for the light client to report any light client
// attacks. This is a no op as there currently isn't a way to wire this up to
// the evidence reactor (we should endeavor to do this in the future but for now
// it's not critical for backwards verification)
func (p *blockProvider) ReportEvidence(ctx context.Context, ev types.Evidence) error {
return nil
}
// String implements stringer interface
func (p *blockProvider) String() string { return string(p.peer) }
//----------------------------------------------------------------
// peerList is a rolling list of peers. This is used to distribute the load of
// retrieving blocks over all the peers the reactor is connected to
type peerlist struct {
mtx sync.Mutex
peers []p2p.NodeID
waiting []chan p2p.NodeID
}
func newPeerList() *peerlist {
return &peerlist{
peers: make([]p2p.NodeID, 0),
waiting: make([]chan p2p.NodeID, 0),
}
}
func (l *peerlist) Len() int {
l.mtx.Lock()
defer l.mtx.Unlock()
return len(l.peers)
}
func (l *peerlist) Pop() p2p.NodeID {
l.mtx.Lock()
if len(l.peers) == 0 {
// if we don't have any peers in the list we block until a peer is
// appended
wait := make(chan p2p.NodeID, 1)
l.waiting = append(l.waiting, wait)
// unlock whilst waiting so that the list can be appended to
l.mtx.Unlock()
peer := <-wait
return peer
}
peer := l.peers[0]
l.peers = l.peers[1:]
l.mtx.Unlock()
return peer
}
func (l *peerlist) Append(peer p2p.NodeID) {
l.mtx.Lock()
defer l.mtx.Unlock()
if len(l.waiting) > 0 {
wait := l.waiting[0]
l.waiting = l.waiting[1:]
wait <- peer
close(wait)
} else {
l.peers = append(l.peers, peer)
}
}
func (l *peerlist) Remove(peer p2p.NodeID) {
l.mtx.Lock()
defer l.mtx.Unlock()
for i, p := range l.peers {
if p == peer {
l.peers = append(l.peers[:i], l.peers[i+1:]...)
return
}
}
}
func (l *peerlist) Peers() []p2p.NodeID {
l.mtx.Lock()
defer l.mtx.Unlock()
return l.peers
}

+ 179
- 0
internal/statesync/dispatcher_test.go View File

@ -0,0 +1,179 @@
package statesync
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/internal/p2p"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
)
func TestDispatcherBasic(t *testing.T) {
ch := make(chan p2p.Envelope, 100)
closeCh := make(chan struct{})
defer close(closeCh)
d := newDispatcher(ch, 1*time.Second)
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
for _, peer := range peers {
d.addPeer(peer)
}
wg := sync.WaitGroup{}
// make a bunch of async requests and require that the correct responses are
// given
for i := 1; i < 10; i++ {
wg.Add(1)
go func(height int64) {
defer wg.Done()
lb, peer, err := d.LightBlock(context.Background(), height)
require.NoError(t, err)
require.NotNil(t, lb)
require.Equal(t, lb.Height, height)
require.Contains(t, peers, peer)
}(int64(i))
}
wg.Wait()
}
func TestDispatcherProviders(t *testing.T) {
ch := make(chan p2p.Envelope, 100)
chainID := "state-sync-test"
closeCh := make(chan struct{})
defer close(closeCh)
d := newDispatcher(ch, 1*time.Second)
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
for _, peer := range peers {
d.addPeer(peer)
}
providers := d.Providers(chainID, 5*time.Second)
require.Len(t, providers, 5)
for i, p := range providers {
bp, ok := p.(*blockProvider)
require.True(t, ok)
assert.Equal(t, bp.String(), string(peers[i]))
lb, err := p.LightBlock(context.Background(), 10)
assert.Error(t, err)
assert.Nil(t, lb)
}
}
func TestPeerListBasic(t *testing.T) {
peerList := newPeerList()
assert.Zero(t, peerList.Len())
numPeers := 10
peerSet := createPeerSet(numPeers)
for _, peer := range peerSet {
peerList.Append(peer)
}
for idx, peer := range peerList.Peers() {
assert.Equal(t, peer, peerSet[idx])
}
assert.Equal(t, numPeers, peerList.Len())
half := numPeers / 2
for i := 0; i < half; i++ {
assert.Equal(t, peerSet[i], peerList.Pop())
}
assert.Equal(t, half, peerList.Len())
peerList.Remove(p2p.NodeID("lp"))
assert.Equal(t, half, peerList.Len())
peerList.Remove(peerSet[half])
half++
assert.Equal(t, peerSet[half], peerList.Pop())
}
func TestPeerListConcurrent(t *testing.T) {
peerList := newPeerList()
numPeers := 10
wg := sync.WaitGroup{}
// we run a set of goroutines requesting the next peer in the list. As the
// peer list hasn't been populated each these go routines should block
for i := 0; i < numPeers/2; i++ {
go func() {
_ = peerList.Pop()
wg.Done()
}()
}
// now we add the peers to the list, this should allow the previously
// blocked go routines to unblock
for _, peer := range createPeerSet(numPeers) {
wg.Add(1)
peerList.Append(peer)
}
// we request the second half of the peer set
for i := 0; i < numPeers/2; i++ {
go func() {
_ = peerList.Pop()
wg.Done()
}()
}
// we use a context with cancel and a separate go routine to wait for all
// the other goroutines to close.
ctx, cancel := context.WithCancel(context.Background())
go func() { wg.Wait(); cancel() }()
select {
case <-time.After(time.Second):
// not all of the blocked go routines waiting on peers have closed after
// one second. This likely means the list got blocked.
t.Failed()
case <-ctx.Done():
// there should be no peers remaining
require.Equal(t, 0, peerList.Len())
}
}
// handleRequests is a helper function usually run in a separate go routine to
// imitate the expected responses of the reactor wired to the dispatcher
func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
t.Helper()
for {
select {
case request := <-ch:
height := request.Message.(*ssproto.LightBlockRequest).Height
peer := request.To
resp := mockLBResp(t, peer, int64(height), time.Now())
block, _ := resp.block.ToProto()
require.NoError(t, d.respond(block, resp.peer))
case <-closeCh:
return
}
}
}
func createPeerSet(num int) []p2p.NodeID {
peers := make([]p2p.NodeID, num)
for i := 0; i < num; i++ {
peers[i], _ = p2p.NewNodeID(strings.Repeat(fmt.Sprintf("%d", i), 2*p2p.NodeIDByteLength))
}
return peers
}

+ 1
- 1
internal/statesync/mocks/state_provider.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 363
- 61
internal/statesync/reactor.go View File

@ -1,9 +1,11 @@
package statesync
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"sort"
"time"
@ -15,6 +17,7 @@ import (
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
@ -49,6 +52,17 @@ var (
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
MaxSendBytes: 400,
},
},
LightBlockChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(LightBlockChannel),
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
MaxSendBytes: 400,
},
},
@ -62,6 +76,9 @@ const (
// ChunkChannel exchanges chunk contents
ChunkChannel = p2p.ChannelID(0x61)
// LightBlockChannel exchanges light blocks
LightBlockChannel = p2p.ChannelID(0x62)
// recentSnapshots is the number of recent snapshots to send and receive per peer.
recentSnapshots = 10
@ -70,6 +87,17 @@ const (
// chunkMsgSize is the maximum size of a chunkResponseMessage
chunkMsgSize = int(16e6)
// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
lightBlockMsgSize = int(1e7)
// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
// return a light block
lightBlockResponseTimeout = 10 * time.Second
// maxLightBlockRequestRetries is the amount of retries acceptable before
// the backfill process aborts
maxLightBlockRequestRetries = 20
)
// Reactor handles state sync, both restoring snapshots for the local node and
@ -77,14 +105,20 @@ const (
type Reactor struct {
service.BaseService
stateStore sm.Store
blockStore *store.BlockStore
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
tempDir string
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
blockCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
dispatcher *dispatcher
// This will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the sync.
mtx tmsync.RWMutex
@ -99,8 +133,10 @@ func NewReactor(
logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
snapshotCh, chunkCh *p2p.Channel,
snapshotCh, chunkCh, blockCh *p2p.Channel,
peerUpdates *p2p.PeerUpdates,
stateStore sm.Store,
blockStore *store.BlockStore,
tempDir string,
) *Reactor {
r := &Reactor{
@ -108,9 +144,13 @@ func NewReactor(
connQuery: connQuery,
snapshotCh: snapshotCh,
chunkCh: chunkCh,
blockCh: blockCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
tempDir: tempDir,
dispatcher: newDispatcher(blockCh.Out, lightBlockResponseTimeout),
stateStore: stateStore,
blockStore: blockStore,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@ -134,14 +174,21 @@ func (r *Reactor) OnStart() error {
// have to deal with bounding workers or pools.
go r.processChunkCh()
go r.processBlockCh()
go r.processPeerUpdates()
r.dispatcher.start()
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
// tell the dispatcher to stop sending any more requests
r.dispatcher.stop()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
@ -151,9 +198,205 @@ func (r *Reactor) OnStop() {
// panics will occur.
<-r.snapshotCh.Done()
<-r.chunkCh.Done()
<-r.blockCh.Done()
<-r.peerUpdates.Done()
}
// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. It also saves tendermint state and runs a backfill process to
// retrieve the necessary amount of headers, commits and validators sets to be
// able to process evidence and participate in consensus.
func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) error {
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
return errors.New("a state sync is already in progress")
}
r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
r.mtx.Unlock()
hook := func() {
// request snapshots from all currently connected peers
r.Logger.Debug("requesting snapshots from known peers")
r.snapshotCh.Out <- p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
}
}
hook()
state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
if err != nil {
return err
}
r.mtx.Lock()
r.syncer = nil
r.mtx.Unlock()
err = r.stateStore.Bootstrap(state)
if err != nil {
return fmt.Errorf("failed to bootstrap node with new state: %w", err)
}
err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
if err != nil {
return fmt.Errorf("failed to store last seen commit: %w", err)
}
// start backfill process to retrieve the necessary headers, commits and
// validator sets
return r.backfill(state)
}
// Backfill sequentially fetches, verifies and stores light blocks in reverse
// order. It does not stop verifying blocks until reaching a block with a height
// and time that is less or equal to the stopHeight and stopTime. The
// trustedBlockID should be of the header at startHeight.
func (r *Reactor) Backfill(
ctx context.Context,
chainID string,
startHeight, stopHeight int64,
trustedBlockID types.BlockID,
stopTime time.Time,
) error {
r.Logger.Info("starting backfill process...", "startHeight", startHeight,
"stopHeight", stopHeight, "trustedBlockID", trustedBlockID)
var (
lastValidatorSet *types.ValidatorSet
lastChangeHeight int64 = startHeight
)
queue := newBlockQueue(startHeight, stopHeight, stopTime, maxLightBlockRequestRetries)
// fetch light blocks across four workers. The aim with deploying concurrent
// workers is to equate the network messaging time with the verification
// time. Ideally we want the verification process to never have to be
// waiting on blocks. If it takes 4s to retrieve a block and 1s to verify
// it, then steady state involves four workers.
for i := 0; i < 4; i++ {
go func() {
for {
select {
case height := <-queue.nextHeight():
r.Logger.Debug("fetching next block", "height", height)
lb, peer, err := r.dispatcher.LightBlock(ctx, height)
if err != nil {
// we don't punish the peer as it might just not have the block
// at that height
r.Logger.Info("error with fetching light block",
"height", height, "err", err)
queue.retry(height)
continue
}
if lb == nil {
r.Logger.Info("peer didn't have block, fetching from another peer", "height", height)
queue.retry(height)
continue
}
if lb.Height != height {
r.Logger.Info("peer provided wrong height, retrying...", "height", height)
queue.retry(height)
continue
}
// run a validate basic. This checks the validator set and commit
// hashes line up
err = lb.ValidateBasic(chainID)
if err != nil {
r.Logger.Info("fetched light block failed validate basic, removing peer...", "err", err)
queue.retry(height)
r.blockCh.Error <- p2p.PeerError{
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}
continue
}
// add block to queue to be verified
queue.add(lightBlockResponse{
block: lb,
peer: peer,
})
r.Logger.Debug("added light block to processing queue", "height", height)
case <-queue.done():
return
}
}
}()
}
// verify all light blocks
for {
select {
case <-r.closeCh:
queue.close()
return nil
case <-ctx.Done():
queue.close()
return nil
case resp := <-queue.verifyNext():
// validate the header hash. We take the last block id of the
// previous header (i.e. one height above) as the trusted hash which
// we equate to. ValidatorsHash and CommitHash have already been
// checked in the `ValidateBasic`
if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
r.Logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
r.blockCh.Error <- p2p.PeerError{
NodeID: resp.peer,
Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
}
queue.retry(resp.block.Height)
continue
}
// save the signed headers
err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID)
if err != nil {
return err
}
// check if there has been a change in the validator set
if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) {
// save all the heights that the last validator set was the same
err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet)
if err != nil {
return err
}
// update the lastChangeHeight
lastChangeHeight = resp.block.Height
}
trustedBlockID = resp.block.LastBlockID
queue.success(resp.block.Height)
r.Logger.Info("verified and stored light block", "height", resp.block.Height)
lastValidatorSet = resp.block.ValidatorSet
case <-queue.done():
if err := queue.error(); err != nil {
return err
}
// save the final batch of validators
return r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet)
}
}
}
// Dispatcher exposes the dispatcher so that a state provider can use it for
// light client verification
func (r *Reactor) Dispatcher() *dispatcher { //nolint:golint
return r.dispatcher
}
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
@ -311,6 +554,44 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error {
return nil
}
func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest:
r.Logger.Info("received light block request", "height", msg.Height)
lb, err := r.fetchLightBlock(msg.Height)
if err != nil {
r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
return err
}
lbproto, err := lb.ToProto()
if err != nil {
r.Logger.Error("marshaling light block to proto", "err", err)
return nil
}
// NOTE: If we don't have the light block we will send a nil light block
// back to the requested node, indicating that we don't have it.
r.blockCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: lbproto,
},
}
case *ssproto.LightBlockResponse:
if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
return err
}
default:
return fmt.Errorf("received unknown message: %T", msg)
}
return nil
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
@ -321,7 +602,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
}
}()
r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
r.Logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From)
switch chID {
case SnapshotChannel:
@ -330,6 +611,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
case ChunkChannel:
err = r.handleChunkMessage(envelope)
case LightBlockChannel:
err = r.handleLightBlockMessage(envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
@ -338,52 +622,44 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
}
// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel. Any error encountered during message
// execution will result in a PeerError being sent on the SnapshotChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
// envelopes on the SnapshotChannel.
func (r *Reactor) processSnapshotCh() {
defer r.snapshotCh.Close()
for {
select {
case envelope := <-r.snapshotCh.In:
if err := r.handleMessage(r.snapshotCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID, "envelope", envelope, "err", err)
r.snapshotCh.Error <- p2p.PeerError{
NodeID: envelope.From,
Err: err,
}
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on snapshot channel; closing...")
return
}
}
r.processCh(r.snapshotCh, "snapshot")
}
// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel. Any error encountered during message
// execution will result in a PeerError being sent on the ChunkChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
// envelopes on the ChunkChannel.
func (r *Reactor) processChunkCh() {
defer r.chunkCh.Close()
r.processCh(r.chunkCh, "chunk")
}
// processBlockCh initiates a blocking process where we listen for and handle
// envelopes on the LightBlockChannel.
func (r *Reactor) processBlockCh() {
r.processCh(r.blockCh, "light block")
}
// processCh routes state sync messages to their respective handlers. Any error
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
defer ch.Close()
for {
select {
case envelope := <-r.chunkCh.In:
if err := r.handleMessage(r.chunkCh.ID, envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID, "envelope", envelope, "err", err)
r.chunkCh.Error <- p2p.PeerError{
case envelope := <-ch.In:
if err := r.handleMessage(ch.ID, envelope); err != nil {
r.Logger.Error(fmt.Sprintf("failed to process %s message", chName),
"ch_id", ch.ID, "envelope", envelope, "err", err)
ch.Error <- p2p.PeerError{
NodeID: envelope.From,
Err: err,
}
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on chunk channel; closing...")
r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName))
return
}
}
@ -397,14 +673,18 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.syncer != nil {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
switch peerUpdate.Status {
case p2p.PeerStatusUp:
if r.syncer != nil {
r.syncer.AddPeer(peerUpdate.NodeID)
}
r.dispatcher.addPeer(peerUpdate.NodeID)
case p2p.PeerStatusDown:
case p2p.PeerStatusDown:
if r.syncer != nil {
r.syncer.RemovePeer(peerUpdate.NodeID)
}
r.dispatcher.removePeer(peerUpdate.NodeID)
}
}
@ -465,34 +745,56 @@ func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
return snapshots, nil
}
// Sync runs a state sync, returning the new state and last commit at the snapshot height.
// The caller must store the state and commit in the state database and block store.
func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
}
// fetchLightBlock works out whether the node has a light block at a particular
// height and if so returns it so it can be gossiped to peers
func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) {
h := int64(height)
r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir)
r.mtx.Unlock()
blockMeta := r.blockStore.LoadBlockMeta(h)
if blockMeta == nil {
return nil, nil
}
hook := func() {
// request snapshots from all currently connected peers
r.Logger.Debug("requesting snapshots from known peers")
r.snapshotCh.Out <- p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
}
commit := r.blockStore.LoadBlockCommit(h)
if commit == nil {
return nil, nil
}
hook()
vals, err := r.stateStore.LoadValidators(h)
if err != nil {
return nil, err
}
if vals == nil {
return nil, nil
}
state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
return &types.LightBlock{
SignedHeader: &types.SignedHeader{
Header: &blockMeta.Header,
Commit: commit,
},
ValidatorSet: vals,
}, nil
r.mtx.Lock()
r.syncer = nil
r.mtx.Unlock()
}
return state, commit, err
// backfill is a convenience wrapper around the backfill function. It takes
// state to work out how many prior blocks need to be verified
func (r *Reactor) backfill(state sm.State) error {
params := state.ConsensusParams.Evidence
stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)
// ensure that stop height doesn't go below the initial height
if stopHeight < state.InitialHeight {
stopHeight = state.InitialHeight
// this essentially makes stop time a void criteria for termination
stopTime = state.LastBlockTime
}
return r.Backfill(
context.Background(),
state.ChainID,
state.LastBlockHeight, stopHeight,
state.LastBlockID,
stopTime,
)
}

+ 296
- 3
internal/statesync/reactor_test.go View File

@ -2,17 +2,29 @@ package statesync
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
// "github.com/fortytw2/leaktest"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/light/provider"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
proxymocks "github.com/tendermint/tendermint/proxy/mocks"
smmocks "github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
type reactorTestSuite struct {
@ -33,7 +45,16 @@ type reactorTestSuite struct {
chunkOutCh chan p2p.Envelope
chunkPeerErrCh chan p2p.PeerError
peerUpdates *p2p.PeerUpdates
blockChannel *p2p.Channel
blockInCh chan p2p.Envelope
blockOutCh chan p2p.Envelope
blockPeerErrCh chan p2p.PeerError
peerUpdateCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdates
stateStore *smmocks.Store
blockStore *store.BlockStore
}
func setup(
@ -62,12 +83,17 @@ func setup(
chunkInCh: make(chan p2p.Envelope, chBuf),
chunkOutCh: make(chan p2p.Envelope, chBuf),
chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate), int(chBuf)),
blockInCh: make(chan p2p.Envelope, chBuf),
blockOutCh: make(chan p2p.Envelope, chBuf),
blockPeerErrCh: make(chan p2p.PeerError, chBuf),
conn: conn,
connQuery: connQuery,
stateProvider: stateProvider,
}
rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
rts.snapshotChannel = p2p.NewChannel(
SnapshotChannel,
new(ssproto.Message),
@ -84,16 +110,33 @@ func setup(
rts.chunkPeerErrCh,
)
rts.blockChannel = p2p.NewChannel(
LightBlockChannel,
new(ssproto.Message),
rts.blockInCh,
rts.blockOutCh,
rts.blockPeerErrCh,
)
rts.stateStore = &smmocks.Store{}
rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
rts.reactor = NewReactor(
log.NewNopLogger(),
log.TestingLogger(),
conn,
connQuery,
rts.snapshotChannel,
rts.chunkChannel,
rts.blockChannel,
rts.peerUpdates,
rts.stateStore,
rts.blockStore,
"",
)
// override the dispatcher with one with a shorter timeout
rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
rts.syncer = newSyncer(
log.NewNopLogger(),
conn,
@ -270,6 +313,172 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
}
}
func TestReactor_LightBlockResponse(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
var height int64 = 10
h := factory.MakeRandomHeader()
h.Height = height
blockID := factory.MakeBlockIDWithHash(h.Hash())
vals, pv := factory.RandValidatorSet(1, 10)
vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2,
blockID, factory.DefaultTestTime)
require.NoError(t, err)
sh := &types.SignedHeader{
Header: h,
Commit: &types.Commit{
Height: h.Height,
BlockID: blockID,
Signatures: []types.CommitSig{
vote.CommitSig(),
},
},
}
lb := &types.LightBlock{
SignedHeader: sh,
ValidatorSet: vals,
}
require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
rts.stateStore.On("LoadValidators", height).Return(vals, nil)
rts.blockInCh <- p2p.Envelope{
From: p2p.NodeID("aa"),
Message: &ssproto.LightBlockRequest{
Height: 10,
},
}
require.Empty(t, rts.blockPeerErrCh)
select {
case response := <-rts.blockOutCh:
require.Equal(t, p2p.NodeID("aa"), response.To)
res, ok := response.Message.(*ssproto.LightBlockResponse)
require.True(t, ok)
receivedLB, err := types.LightBlockFromProto(res.LightBlock)
require.NoError(t, err)
require.Equal(t, lb, receivedLB)
case <-time.After(1 * time.Second):
t.Fatal("expected light block response")
}
}
func TestReactor_Dispatcher(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: p2p.NodeID("aa"),
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: p2p.NodeID("bb"),
Status: p2p.PeerStatusUp,
}
closeCh := make(chan struct{})
defer close(closeCh)
chain := buildLightBlockChain(t, 1, 10, time.Now())
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
dispatcher := rts.reactor.Dispatcher()
providers := dispatcher.Providers(factory.DefaultTestChainID, 5*time.Second)
require.Len(t, providers, 2)
wg := sync.WaitGroup{}
for _, p := range providers {
wg.Add(1)
go func(t *testing.T, p provider.Provider) {
defer wg.Done()
for height := 2; height < 10; height++ {
lb, err := p.LightBlock(context.Background(), int64(height))
require.NoError(t, err)
require.NotNil(t, lb)
require.Equal(t, height, int(lb.Height))
}
}(t, p)
}
ctx, cancel := context.WithCancel(context.Background())
go func() { wg.Wait(); cancel() }()
select {
case <-time.After(time.Second):
// not all of the requests to the dispatcher were responded to
// within the timeout
t.Fail()
case <-ctx.Done():
}
}
func TestReactor_Backfill(t *testing.T) {
// test backfill algorithm with varying failure rates [0, 10]
failureRates := []int{0, 3, 9}
for _, failureRate := range failureRates {
failureRate := failureRate
t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
// t.Cleanup(leaktest.Check(t))
rts := setup(t, nil, nil, nil, 21)
var (
startHeight int64 = 20
stopHeight int64 = 10
stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
)
peers := []string{"a", "b", "c", "d"}
for _, peer := range peers {
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: p2p.NodeID(peer),
Status: p2p.PeerStatusUp,
}
}
trackingHeight := startHeight
rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
require.Equal(t, trackingHeight, lh)
require.Equal(t, lh, uh)
require.GreaterOrEqual(t, lh, stopHeight)
trackingHeight--
return nil
})
chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime)
closeCh := make(chan struct{})
defer close(closeCh)
go handleLightBlockRequests(t, chain, rts.blockOutCh,
rts.blockInCh, closeCh, failureRate)
err := rts.reactor.Backfill(
context.Background(),
factory.DefaultTestChainID,
startHeight,
stopHeight,
factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
stopTime,
)
if failureRate > 5 {
require.Error(t, err)
} else {
require.NoError(t, err)
for height := startHeight; height <= stopHeight; height++ {
blockMeta := rts.blockStore.LoadBlockMeta(height)
require.NotNil(t, blockMeta)
}
require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
}
})
}
}
// retryUntil will continue to evaluate fn and will return successfully when true
// or fail when the timeout is reached.
func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
@ -284,3 +493,87 @@ func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
require.NoError(t, ctx.Err())
}
}
func handleLightBlockRequests(t *testing.T,
chain map[int64]*types.LightBlock,
receiving chan p2p.Envelope,
sending chan p2p.Envelope,
close chan struct{},
failureRate int) {
requests := 0
for {
select {
case envelope := <-receiving:
if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
if requests%10 >= failureRate {
lb, err := chain[int64(msg.Height)].ToProto()
require.NoError(t, err)
sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: lb,
},
}
} else {
switch rand.Intn(3) {
case 0: // send a different block
differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
require.NoError(t, err)
sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: differntLB,
},
}
case 1: // send nil block i.e. pretend we don't have it
sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
}
case 2: // don't do anything
}
}
}
case <-close:
return
}
requests++
}
}
func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
lastBlockID := factory.MakeBlockID()
blockTime := startTime.Add(-5 * time.Minute)
for height := fromHeight; height < toHeight; height++ {
chain[height] = mockLB(t, height, blockTime, lastBlockID)
lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
blockTime = blockTime.Add(1 * time.Minute)
}
return chain
}
func mockLB(t *testing.T, height int64, time time.Time,
lastBlockID types.BlockID) *types.LightBlock {
header, err := factory.MakeHeader(&types.Header{
Height: height,
LastBlockID: lastBlockID,
Time: time,
})
require.NoError(t, err)
vals, pv := factory.RandValidatorSet(3, 10)
header.ValidatorsHash = vals.Hash()
lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals)
commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time)
require.NoError(t, err)
return &types.LightBlock{
SignedHeader: &types.SignedHeader{
Header: header,
Commit: commit,
},
ValidatorSet: vals,
}
}

+ 36
- 1
internal/statesync/stateprovider.go View File

@ -53,7 +53,7 @@ func NewLightClientStateProvider(
logger log.Logger,
) (StateProvider, error) {
if len(servers) < 2 {
return nil, fmt.Errorf("at least 2 RPC servers are required, got %v", len(servers))
return nil, fmt.Errorf("at least 2 RPC servers are required, got %d", len(servers))
}
providers := make([]lightprovider.Provider, 0, len(servers))
@ -83,6 +83,41 @@ func NewLightClientStateProvider(
}, nil
}
// NewLightClientStateProviderFromDispatcher creates a light client state
// provider but uses a p2p connected dispatched instead of RPC endpoints
func NewLightClientStateProviderFromDispatcher(
ctx context.Context,
chainID string,
version sm.Version,
initialHeight int64,
dispatcher *dispatcher,
trustOptions light.TrustOptions,
logger log.Logger,
) (StateProvider, error) {
providers := dispatcher.Providers(chainID, 10*time.Second)
if len(providers) < 2 {
return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers))
}
providersMap := make(map[lightprovider.Provider]string)
for _, p := range providers {
providersMap[p] = p.(*blockProvider).String()
}
lc, err := light.NewClient(ctx, chainID, trustOptions, providers[0], providers[1:],
lightdb.New(dbm.NewMemDB()), light.Logger(logger))
if err != nil {
return nil, err
}
return &lightClientStateProvider{
lc: lc,
version: version,
initialHeight: initialHeight,
providers: providersMap,
}, nil
}
// AppHash implements StateProvider.
func (s *lightClientStateProvider) AppHash(ctx context.Context, height uint64) ([]byte, error) {
s.Lock()


+ 6
- 0
internal/test/factory/block.go View File

@ -1,6 +1,8 @@
package factory
import (
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/types"
@ -11,6 +13,10 @@ const (
DefaultTestChainID = "test-chain"
)
var (
DefaultTestTime = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
)
func MakeVersion() version.Consensus {
return version.Consensus{
Block: version.BlockProtocol,


+ 7
- 9
node/node.go View File

@ -333,7 +333,10 @@ func makeNode(config *cfg.Config,
proxyApp.Query(),
channels[statesync.SnapshotChannel],
channels[statesync.ChunkChannel],
channels[statesync.LightBlockChannel],
peerUpdates,
stateStore,
blockStore,
config.StateSync.TempDir,
)
@ -1038,20 +1041,15 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
}
go func() {
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
err := ssR.Sync(stateProvider, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("State sync failed", "err", err)
return
}
err = stateStore.Bootstrap(state)
if err != nil {
ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
return
}
err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
state, err := stateStore.Load()
if err != nil {
ssR.Logger.Error("Failed to store last seen commit", "err", err)
return
ssR.Logger.Error("failed to load state", "err", err)
}
if fastSync {


+ 1
- 0
node/setup.go View File

@ -749,6 +749,7 @@ func makeNodeInfo(
byte(evidence.EvidenceChannel),
byte(statesync.SnapshotChannel),
byte(statesync.ChunkChannel),
byte(statesync.LightBlockChannel),
},
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{


+ 1
- 1
proto/tendermint/abci/types.proto View File

@ -213,7 +213,7 @@ message ResponseDeliverTx {
message ResponseEndBlock {
repeated ValidatorUpdate validator_updates = 1 [(gogoproto.nullable) = false];
tendermint.types.ConsensusParams consensus_param_updates = 2;
repeated Event events = 3 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"];
repeated Event events = 3 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"];
}
message ResponseCommit {


+ 0
- 1
proto/tendermint/p2p/pex.proto View File

@ -27,7 +27,6 @@ message PexResponseV2 {
repeated PexAddressV2 addresses = 1 [(gogoproto.nullable) = false];
}
message PexMessage {
oneof sum {
PexRequest pex_request = 1;


+ 20
- 0
proto/tendermint/statesync/message.go View File

@ -22,6 +22,12 @@ func (m *Message) Wrap(pb proto.Message) error {
case *SnapshotsResponse:
m.Sum = &Message_SnapshotsResponse{SnapshotsResponse: msg}
case *LightBlockRequest:
m.Sum = &Message_LightBlockRequest{LightBlockRequest: msg}
case *LightBlockResponse:
m.Sum = &Message_LightBlockResponse{LightBlockResponse: msg}
default:
return fmt.Errorf("unknown message: %T", msg)
}
@ -45,6 +51,12 @@ func (m *Message) Unwrap() (proto.Message, error) {
case *Message_SnapshotsResponse:
return m.GetSnapshotsResponse(), nil
case *Message_LightBlockRequest:
return m.GetLightBlockRequest(), nil
case *Message_LightBlockResponse:
return m.GetLightBlockResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
@ -86,6 +98,14 @@ func (m *Message) Validate() error {
return errors.New("snapshot has no chunks")
}
case *Message_LightBlockRequest:
if m.GetLightBlockRequest().Height == 0 {
return errors.New("height cannot be 0")
}
// light block validation handled by the backfill process
case *Message_LightBlockResponse:
default:
return fmt.Errorf("unknown message type: %T", msg)
}


+ 532
- 30
proto/tendermint/statesync/types.pb.go View File

@ -6,6 +6,7 @@ package statesync
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
types "github.com/tendermint/tendermint/proto/tendermint/types"
io "io"
math "math"
math_bits "math/bits"
@ -28,6 +29,8 @@ type Message struct {
// *Message_SnapshotsResponse
// *Message_ChunkRequest
// *Message_ChunkResponse
// *Message_LightBlockRequest
// *Message_LightBlockResponse
Sum isMessage_Sum `protobuf_oneof:"sum"`
}
@ -82,11 +85,19 @@ type Message_ChunkRequest struct {
type Message_ChunkResponse struct {
ChunkResponse *ChunkResponse `protobuf:"bytes,4,opt,name=chunk_response,json=chunkResponse,proto3,oneof" json:"chunk_response,omitempty"`
}
type Message_LightBlockRequest struct {
LightBlockRequest *LightBlockRequest `protobuf:"bytes,5,opt,name=light_block_request,json=lightBlockRequest,proto3,oneof" json:"light_block_request,omitempty"`
}
type Message_LightBlockResponse struct {
LightBlockResponse *LightBlockResponse `protobuf:"bytes,6,opt,name=light_block_response,json=lightBlockResponse,proto3,oneof" json:"light_block_response,omitempty"`
}
func (*Message_SnapshotsRequest) isMessage_Sum() {}
func (*Message_SnapshotsResponse) isMessage_Sum() {}
func (*Message_ChunkRequest) isMessage_Sum() {}
func (*Message_ChunkResponse) isMessage_Sum() {}
func (*Message_SnapshotsRequest) isMessage_Sum() {}
func (*Message_SnapshotsResponse) isMessage_Sum() {}
func (*Message_ChunkRequest) isMessage_Sum() {}
func (*Message_ChunkResponse) isMessage_Sum() {}
func (*Message_LightBlockRequest) isMessage_Sum() {}
func (*Message_LightBlockResponse) isMessage_Sum() {}
func (m *Message) GetSum() isMessage_Sum {
if m != nil {
@ -123,6 +134,20 @@ func (m *Message) GetChunkResponse() *ChunkResponse {
return nil
}
func (m *Message) GetLightBlockRequest() *LightBlockRequest {
if x, ok := m.GetSum().(*Message_LightBlockRequest); ok {
return x.LightBlockRequest
}
return nil
}
func (m *Message) GetLightBlockResponse() *LightBlockResponse {
if x, ok := m.GetSum().(*Message_LightBlockResponse); ok {
return x.LightBlockResponse
}
return nil
}
// XXX_OneofWrappers is for the internal use of the proto package.
func (*Message) XXX_OneofWrappers() []interface{} {
return []interface{}{
@ -130,6 +155,8 @@ func (*Message) XXX_OneofWrappers() []interface{} {
(*Message_SnapshotsResponse)(nil),
(*Message_ChunkRequest)(nil),
(*Message_ChunkResponse)(nil),
(*Message_LightBlockRequest)(nil),
(*Message_LightBlockResponse)(nil),
}
}
@ -381,43 +408,139 @@ func (m *ChunkResponse) GetMissing() bool {
return false
}
type LightBlockRequest struct {
Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"`
}
func (m *LightBlockRequest) Reset() { *m = LightBlockRequest{} }
func (m *LightBlockRequest) String() string { return proto.CompactTextString(m) }
func (*LightBlockRequest) ProtoMessage() {}
func (*LightBlockRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_a1c2869546ca7914, []int{5}
}
func (m *LightBlockRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LightBlockRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LightBlockRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LightBlockRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LightBlockRequest.Merge(m, src)
}
func (m *LightBlockRequest) XXX_Size() int {
return m.Size()
}
func (m *LightBlockRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LightBlockRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LightBlockRequest proto.InternalMessageInfo
func (m *LightBlockRequest) GetHeight() uint64 {
if m != nil {
return m.Height
}
return 0
}
type LightBlockResponse struct {
LightBlock *types.LightBlock `protobuf:"bytes,1,opt,name=light_block,json=lightBlock,proto3" json:"light_block,omitempty"`
}
func (m *LightBlockResponse) Reset() { *m = LightBlockResponse{} }
func (m *LightBlockResponse) String() string { return proto.CompactTextString(m) }
func (*LightBlockResponse) ProtoMessage() {}
func (*LightBlockResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_a1c2869546ca7914, []int{6}
}
func (m *LightBlockResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LightBlockResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LightBlockResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LightBlockResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_LightBlockResponse.Merge(m, src)
}
func (m *LightBlockResponse) XXX_Size() int {
return m.Size()
}
func (m *LightBlockResponse) XXX_DiscardUnknown() {
xxx_messageInfo_LightBlockResponse.DiscardUnknown(m)
}
var xxx_messageInfo_LightBlockResponse proto.InternalMessageInfo
func (m *LightBlockResponse) GetLightBlock() *types.LightBlock {
if m != nil {
return m.LightBlock
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "tendermint.statesync.Message")
proto.RegisterType((*SnapshotsRequest)(nil), "tendermint.statesync.SnapshotsRequest")
proto.RegisterType((*SnapshotsResponse)(nil), "tendermint.statesync.SnapshotsResponse")
proto.RegisterType((*ChunkRequest)(nil), "tendermint.statesync.ChunkRequest")
proto.RegisterType((*ChunkResponse)(nil), "tendermint.statesync.ChunkResponse")
proto.RegisterType((*LightBlockRequest)(nil), "tendermint.statesync.LightBlockRequest")
proto.RegisterType((*LightBlockResponse)(nil), "tendermint.statesync.LightBlockResponse")
}
func init() { proto.RegisterFile("tendermint/statesync/types.proto", fileDescriptor_a1c2869546ca7914) }
var fileDescriptor_a1c2869546ca7914 = []byte{
// 393 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x53, 0xcd, 0x6a, 0xdb, 0x40,
0x18, 0x94, 0xfc, 0xcf, 0x57, 0xab, 0xd8, 0x8b, 0x29, 0xa2, 0x07, 0x61, 0x54, 0x68, 0x7b, 0x92,
0xa0, 0x3d, 0xf6, 0xe6, 0x5e, 0x5c, 0x68, 0x2f, 0xdb, 0x18, 0x42, 0x2e, 0x61, 0x2d, 0x6f, 0x24,
0x11, 0xb4, 0x52, 0xf4, 0xad, 0x20, 0x7e, 0x80, 0x9c, 0x72, 0xc9, 0x63, 0xe5, 0xe8, 0x63, 0xc8,
0x29, 0xd8, 0x2f, 0x12, 0xb4, 0x92, 0x65, 0xc5, 0x31, 0x09, 0x81, 0xdc, 0x76, 0xc6, 0xe3, 0xd1,
0xcc, 0xc0, 0x07, 0x63, 0xc9, 0xc5, 0x82, 0xa7, 0x51, 0x28, 0xa4, 0x8b, 0x92, 0x49, 0x8e, 0x4b,
0xe1, 0xb9, 0x72, 0x99, 0x70, 0x74, 0x92, 0x34, 0x96, 0x31, 0x19, 0xed, 0x14, 0x4e, 0xa5, 0xb0,
0xef, 0x1b, 0xd0, 0xfd, 0xc7, 0x11, 0x99, 0xcf, 0xc9, 0x0c, 0x86, 0x28, 0x58, 0x82, 0x41, 0x2c,
0xf1, 0x34, 0xe5, 0x17, 0x19, 0x47, 0x69, 0xea, 0x63, 0xfd, 0xfb, 0x87, 0x1f, 0x5f, 0x9d, 0x43,
0xff, 0x76, 0xfe, 0x6f, 0xe5, 0xb4, 0x50, 0x4f, 0x35, 0x3a, 0xc0, 0x3d, 0x8e, 0x1c, 0x03, 0xa9,
0xdb, 0x62, 0x12, 0x0b, 0xe4, 0x66, 0x43, 0xf9, 0x7e, 0x7b, 0xd5, 0xb7, 0x90, 0x4f, 0x35, 0x3a,
0xc4, 0x7d, 0x92, 0xfc, 0x01, 0xc3, 0x0b, 0x32, 0x71, 0x5e, 0x85, 0x6d, 0x2a, 0x53, 0xfb, 0xb0,
0xe9, 0xef, 0x5c, 0xba, 0x0b, 0xda, 0xf7, 0x6a, 0x98, 0xfc, 0x85, 0x8f, 0x5b, 0xab, 0x32, 0x60,
0x4b, 0x79, 0x7d, 0x79, 0xd1, 0xab, 0x0a, 0x67, 0x78, 0x75, 0x62, 0xd2, 0x86, 0x26, 0x66, 0x91,
0x4d, 0x60, 0xb0, 0xbf, 0x90, 0x7d, 0xad, 0xc3, 0xf0, 0x59, 0x3d, 0xf2, 0x09, 0x3a, 0x01, 0x0f,
0xfd, 0xa0, 0xd8, 0xbb, 0x45, 0x4b, 0x94, 0xf3, 0x67, 0x71, 0x1a, 0x31, 0xa9, 0xf6, 0x32, 0x68,
0x89, 0x72, 0x5e, 0x7d, 0x11, 0x55, 0x65, 0x83, 0x96, 0x88, 0x10, 0x68, 0x05, 0x0c, 0x03, 0x15,
0xbe, 0x4f, 0xd5, 0x9b, 0x7c, 0x86, 0x5e, 0xc4, 0x25, 0x5b, 0x30, 0xc9, 0xcc, 0xb6, 0xe2, 0x2b,
0x6c, 0x1f, 0x41, 0xbf, 0x3e, 0xcb, 0x9b, 0x73, 0x8c, 0xa0, 0x1d, 0x8a, 0x05, 0xbf, 0x2c, 0x63,
0x14, 0xc0, 0xbe, 0xd2, 0xc1, 0x78, 0xb2, 0xd0, 0xfb, 0xf8, 0xe6, 0xac, 0xea, 0x59, 0xd6, 0x2b,
0x00, 0x31, 0xa1, 0x1b, 0x85, 0x88, 0xa1, 0xf0, 0x55, 0xbd, 0x1e, 0xdd, 0xc2, 0xc9, 0xec, 0x76,
0x6d, 0xe9, 0xab, 0xb5, 0xa5, 0x3f, 0xac, 0x2d, 0xfd, 0x66, 0x63, 0x69, 0xab, 0x8d, 0xa5, 0xdd,
0x6d, 0x2c, 0xed, 0xe4, 0x97, 0x1f, 0xca, 0x20, 0x9b, 0x3b, 0x5e, 0x1c, 0xb9, 0xb5, 0xcb, 0xa9,
0x3d, 0xd5, 0xd1, 0xb8, 0x87, 0xae, 0x6a, 0xde, 0x51, 0xbf, 0xfd, 0x7c, 0x0c, 0x00, 0x00, 0xff,
0xff, 0xcc, 0x16, 0xc2, 0x8b, 0x74, 0x03, 0x00, 0x00,
// 485 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x6b, 0xd3, 0x50,
0x14, 0x4e, 0x5c, 0xdb, 0x8d, 0xb3, 0x46, 0x96, 0x63, 0x91, 0x32, 0x46, 0x18, 0x11, 0x74, 0x20,
0xa4, 0xa0, 0x8f, 0xe2, 0x4b, 0x7d, 0x99, 0x30, 0x5f, 0xee, 0x1c, 0xa8, 0x08, 0x23, 0x4d, 0xaf,
0x4d, 0xb0, 0x49, 0x6a, 0xcf, 0x2d, 0xb8, 0x1f, 0xe0, 0x93, 0x2f, 0x82, 0x7f, 0xca, 0xc7, 0x3d,
0xfa, 0x28, 0xed, 0x1f, 0x91, 0x9c, 0xdc, 0x26, 0x77, 0x6d, 0x5d, 0x11, 0xf6, 0x96, 0xef, 0xeb,
0x77, 0x3e, 0xbe, 0x73, 0xcf, 0xe9, 0x81, 0x63, 0x25, 0xb3, 0xa1, 0x9c, 0xa6, 0x49, 0xa6, 0x7a,
0xa4, 0x42, 0x25, 0xe9, 0x2a, 0x8b, 0x7a, 0xea, 0x6a, 0x22, 0x29, 0x98, 0x4c, 0x73, 0x95, 0x63,
0xa7, 0x56, 0x04, 0x95, 0xe2, 0xf0, 0xc8, 0xa8, 0x63, 0xb5, 0x59, 0xe3, 0xff, 0x6c, 0xc0, 0xee,
0x1b, 0x49, 0x14, 0x8e, 0x24, 0x5e, 0x80, 0x4b, 0x59, 0x38, 0xa1, 0x38, 0x57, 0x74, 0x39, 0x95,
0x5f, 0x66, 0x92, 0x54, 0xd7, 0x3e, 0xb6, 0x4f, 0xf6, 0x9f, 0x3d, 0x0e, 0x36, 0x79, 0x07, 0xe7,
0x4b, 0xb9, 0x28, 0xd5, 0xa7, 0x96, 0x38, 0xa0, 0x15, 0x0e, 0xdf, 0x01, 0x9a, 0xb6, 0x34, 0xc9,
0x33, 0x92, 0xdd, 0x7b, 0xec, 0xfb, 0x64, 0xab, 0x6f, 0x29, 0x3f, 0xb5, 0x84, 0x4b, 0xab, 0x24,
0xbe, 0x06, 0x27, 0x8a, 0x67, 0xd9, 0xe7, 0x2a, 0xec, 0x0e, 0x9b, 0xfa, 0x9b, 0x4d, 0x5f, 0x15,
0xd2, 0x3a, 0x68, 0x3b, 0x32, 0x30, 0x9e, 0xc1, 0xfd, 0xa5, 0x95, 0x0e, 0xd8, 0x60, 0xaf, 0x47,
0xb7, 0x7a, 0x55, 0xe1, 0x9c, 0xc8, 0x24, 0xf0, 0x3d, 0x3c, 0x18, 0x27, 0xa3, 0x58, 0x5d, 0x0e,
0xc6, 0x79, 0x54, 0xc7, 0x6b, 0xde, 0xd6, 0xf3, 0x59, 0x51, 0xd0, 0x2f, 0xf4, 0x75, 0x46, 0x77,
0xbc, 0x4a, 0xe2, 0x47, 0xe8, 0xdc, 0xb4, 0xd6, 0x71, 0x5b, 0xec, 0x7d, 0xb2, 0xdd, 0xbb, 0xca,
0x8c, 0xe3, 0x35, 0xb6, 0xdf, 0x84, 0x1d, 0x9a, 0xa5, 0x3e, 0xc2, 0xc1, 0xea, 0x68, 0xfd, 0xef,
0x36, 0xb8, 0x6b, 0x73, 0xc1, 0x87, 0xd0, 0x8a, 0x65, 0xe1, 0xc3, 0x8b, 0xd2, 0x10, 0x1a, 0x15,
0xfc, 0xa7, 0x7c, 0x9a, 0x86, 0x8a, 0x07, 0xed, 0x08, 0x8d, 0x0a, 0x9e, 0x9f, 0x8a, 0x78, 0x56,
0x8e, 0xd0, 0x08, 0x11, 0x1a, 0x71, 0x48, 0x31, 0xbf, 0x7a, 0x5b, 0xf0, 0x37, 0x1e, 0xc2, 0x5e,
0x2a, 0x55, 0x38, 0x0c, 0x55, 0xc8, 0x4f, 0xd7, 0x16, 0x15, 0xf6, 0xdf, 0x42, 0xdb, 0x9c, 0xe7,
0x7f, 0xe7, 0xe8, 0x40, 0x33, 0xc9, 0x86, 0xf2, 0xab, 0x8e, 0x51, 0x02, 0xff, 0x9b, 0x0d, 0xce,
0x8d, 0xd1, 0xde, 0x8d, 0x6f, 0xc1, 0x72, 0x9f, 0xba, 0xbd, 0x12, 0x60, 0x17, 0x76, 0xd3, 0x84,
0x28, 0xc9, 0x46, 0xdc, 0xde, 0x9e, 0x58, 0x42, 0xff, 0x29, 0xb8, 0x6b, 0xeb, 0xf0, 0xaf, 0x28,
0xfe, 0x39, 0xe0, 0xfa, 0x7c, 0xf1, 0x25, 0xec, 0x1b, 0x7b, 0xa2, 0xff, 0xc6, 0x47, 0xe6, 0x7a,
0x94, 0x67, 0xc0, 0x28, 0x85, 0x7a, 0x21, 0xfa, 0x17, 0xbf, 0xe6, 0x9e, 0x7d, 0x3d, 0xf7, 0xec,
0x3f, 0x73, 0xcf, 0xfe, 0xb1, 0xf0, 0xac, 0xeb, 0x85, 0x67, 0xfd, 0x5e, 0x78, 0xd6, 0x87, 0x17,
0xa3, 0x44, 0xc5, 0xb3, 0x41, 0x10, 0xe5, 0x69, 0xcf, 0x3c, 0x2d, 0xf5, 0x27, 0x5f, 0x96, 0xde,
0xa6, 0x73, 0x35, 0x68, 0xf1, 0x6f, 0xcf, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xc1, 0x45, 0x35,
0xee, 0xcd, 0x04, 0x00, 0x00,
}
func (m *Message) Marshal() (dAtA []byte, err error) {
@ -536,6 +659,48 @@ func (m *Message_ChunkResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
}
return len(dAtA) - i, nil
}
func (m *Message_LightBlockRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Message_LightBlockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.LightBlockRequest != nil {
{
size, err := m.LightBlockRequest.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x2a
}
return len(dAtA) - i, nil
}
func (m *Message_LightBlockResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Message_LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.LightBlockResponse != nil {
{
size, err := m.LightBlockResponse.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x32
}
return len(dAtA) - i, nil
}
func (m *SnapshotsRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -704,6 +869,69 @@ func (m *ChunkResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *LightBlockRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LightBlockRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LightBlockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Height != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Height))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *LightBlockResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LightBlockResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.LightBlock != nil {
{
size, err := m.LightBlock.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
@ -775,6 +1003,30 @@ func (m *Message_ChunkResponse) Size() (n int) {
}
return n
}
func (m *Message_LightBlockRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.LightBlockRequest != nil {
l = m.LightBlockRequest.Size()
n += 1 + l + sovTypes(uint64(l))
}
return n
}
func (m *Message_LightBlockResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.LightBlockResponse != nil {
l = m.LightBlockResponse.Size()
n += 1 + l + sovTypes(uint64(l))
}
return n
}
func (m *SnapshotsRequest) Size() (n int) {
if m == nil {
return 0
@ -853,6 +1105,31 @@ func (m *ChunkResponse) Size() (n int) {
return n
}
func (m *LightBlockRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Height != 0 {
n += 1 + sovTypes(uint64(m.Height))
}
return n
}
func (m *LightBlockResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.LightBlock != nil {
l = m.LightBlock.Size()
n += 1 + l + sovTypes(uint64(l))
}
return n
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@ -1028,6 +1305,76 @@ func (m *Message) Unmarshal(dAtA []byte) error {
}
m.Sum = &Message_ChunkResponse{v}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LightBlockRequest", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &LightBlockRequest{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Sum = &Message_LightBlockRequest{v}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LightBlockResponse", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &LightBlockResponse{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Sum = &Message_LightBlockResponse{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
@ -1542,6 +1889,161 @@ func (m *ChunkResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *LightBlockRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LightBlockRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LightBlockRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
}
m.Height = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Height |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *LightBlockResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LightBlockResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LightBlockResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LightBlock", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.LightBlock == nil {
m.LightBlock = &types.LightBlock{}
}
if err := m.LightBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipTypes(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0


+ 16
- 4
proto/tendermint/statesync/types.proto View File

@ -1,14 +1,18 @@
syntax = "proto3";
package tendermint.statesync;
import "tendermint/types/types.proto";
option go_package = "github.com/tendermint/tendermint/proto/tendermint/statesync";
message Message {
oneof sum {
SnapshotsRequest snapshots_request = 1;
SnapshotsResponse snapshots_response = 2;
ChunkRequest chunk_request = 3;
ChunkResponse chunk_response = 4;
SnapshotsRequest snapshots_request = 1;
SnapshotsResponse snapshots_response = 2;
ChunkRequest chunk_request = 3;
ChunkResponse chunk_response = 4;
LightBlockRequest light_block_request = 5;
LightBlockResponse light_block_response = 6;
}
}
@ -35,3 +39,11 @@ message ChunkResponse {
bytes chunk = 4;
bool missing = 5;
}
message LightBlockRequest {
uint64 height = 1;
}
message LightBlockResponse {
tendermint.types.LightBlock light_block = 1;
}

+ 1
- 1
proxy/mocks/app_conn_consensus.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 1
- 1
proxy/mocks/app_conn_mempool.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 1
- 1
proxy/mocks/app_conn_query.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 1
- 1
proxy/mocks/app_conn_snapshot.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 2
- 2
rpc/core/blocks.go View File

@ -98,8 +98,8 @@ func (env *Environment) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.
block := env.BlockStore.LoadBlock(height)
blockMeta := env.BlockStore.LoadBlockMeta(height)
if blockMeta == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil
if blockMeta == nil || block == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: &types.Block{}}, nil
}
return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil
}


+ 1
- 1
state/mocks/evidence_pool.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks


+ 15
- 1
state/mocks/store.go View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.5.1. DO NOT EDIT.
// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
package mocks
@ -201,3 +201,17 @@ func (_m *Store) SaveABCIResponses(_a0 int64, _a1 *tendermintstate.ABCIResponses
return r0
}
// SaveValidatorSets provides a mock function with given fields: _a0, _a1, _a2
func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet) error {
ret := _m.Called(_a0, _a1, _a2)
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64, *types.ValidatorSet) error); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Error(0)
}
return r0
}

+ 21
- 6
state/store.go View File

@ -93,6 +93,8 @@ type Store interface {
Save(State) error
// SaveABCIResponses saves ABCIResponses for a given height
SaveABCIResponses(int64, *tmstate.ABCIResponses) error
// SaveValidatorSet saves the validator set at a given height
SaveValidatorSets(int64, int64, *types.ValidatorSet) error
// Bootstrap is used for bootstrapping state when not starting from a initial height.
Bootstrap(State) error
// PruneStates takes the height from which to prune up to (exclusive)
@ -502,6 +504,24 @@ func (store dbStore) saveABCIResponses(height int64, abciResponses *tmstate.ABCI
return store.db.SetSync(abciResponsesKey(height), bz)
}
// SaveValidatorSets is used to save the validator set over multiple heights.
// It is exposed so that a backfill operation during state sync can populate
// the store with the necessary amount of validator sets to verify any evidence
// it may encounter.
func (store dbStore) SaveValidatorSets(lowerHeight, upperHeight int64, vals *types.ValidatorSet) error {
batch := store.db.NewBatch()
defer batch.Close()
// batch together all the validator sets from lowerHeight to upperHeight
for height := lowerHeight; height <= upperHeight; height++ {
if err := store.saveValidatorsInfo(height, lowerHeight, vals, batch); err != nil {
return err
}
}
return batch.WriteSync()
}
//-----------------------------------------------------------------------------
// LoadValidators loads the ValidatorSet for a given height.
@ -606,12 +626,7 @@ func (store dbStore) saveValidatorsInfo(
return err
}
err = batch.Set(validatorsKey(height), bz)
if err != nil {
return err
}
return nil
return batch.Set(validatorsKey(height), bz)
}
//-----------------------------------------------------------------------------


+ 42
- 0
store/store.go View File

@ -519,6 +519,48 @@ func (bs *BlockStore) SaveSeenCommit(height int64, seenCommit *types.Commit) err
return bs.db.Set(seenCommitKey(height), seenCommitBytes)
}
func (bs *BlockStore) SaveSignedHeader(sh *types.SignedHeader, blockID types.BlockID) error {
// first check that the block store doesn't already have the block
bz, err := bs.db.Get(blockMetaKey(sh.Height))
if err != nil {
return err
}
if bz != nil {
return fmt.Errorf("block at height %d already saved", sh.Height)
}
// FIXME: saving signed headers although necessary for proving evidence,
// doesn't have complete parity with block meta's thus block size and num
// txs are filled with negative numbers. We should aim to find a solution to
// this.
blockMeta := &types.BlockMeta{
BlockID: blockID,
BlockSize: -1,
Header: *sh.Header,
NumTxs: -1,
}
batch := bs.db.NewBatch()
pbm := blockMeta.ToProto()
metaBytes := mustEncode(pbm)
if err := batch.Set(blockMetaKey(sh.Height), metaBytes); err != nil {
return fmt.Errorf("unable to save block meta: %w", err)
}
pbc := sh.Commit.ToProto()
blockCommitBytes := mustEncode(pbc)
if err := batch.Set(blockCommitKey(sh.Height), blockCommitBytes); err != nil {
return fmt.Errorf("unable to save commit: %w", err)
}
if err := batch.WriteSync(); err != nil {
return err
}
return batch.Close()
}
//---------------------------------- KEY ENCODING -----------------------------------------
// key prefixes


+ 1
- 2
test/e2e/networks/simple.toml View File

@ -1,5 +1,4 @@
[node.validator01]
[node.validator02]
[node.validator03]
[node.validator04]
[node.validator04]

+ 1
- 1
test/e2e/pkg/testnet.go View File

@ -49,7 +49,7 @@ const (
PerturbationRestart Perturbation = "restart"
EvidenceAgeHeight int64 = 5
EvidenceAgeTime time.Duration = 10 * time.Second
EvidenceAgeTime time.Duration = 500 * time.Millisecond
)
// Testnet represents a single testnet.


+ 1
- 1
test/e2e/runner/load.go View File

@ -81,7 +81,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
select {
case chTx <- tx:
time.Sleep(time.Duration(100/multiplier) * time.Millisecond)
time.Sleep(time.Second / time.Duration(multiplier))
case <-ctx.Done():
close(chTx)


+ 12
- 3
test/e2e/tests/block_test.go View File

@ -32,6 +32,11 @@ func TestBlock_Header(t *testing.T) {
if block.Header.Height < first {
continue
}
// the first blocks after state sync come from the backfill process
// and are therefore not complete
if node.StateSync && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 {
continue
}
if block.Header.Height > last {
break
}
@ -63,10 +68,10 @@ func TestBlock_Range(t *testing.T) {
last := status.SyncInfo.LatestBlockHeight
switch {
// if the node state synced we ignore any assertions because it's hard to know how far back
// the node ran reverse sync for
case node.StateSync:
assert.Greater(t, first, node.Testnet.InitialHeight,
"state synced nodes should not contain network's initial height")
break
case node.RetainBlocks > 0 && int64(node.RetainBlocks) < (last-node.Testnet.InitialHeight+1):
// Delta handles race conditions in reading first/last heights.
assert.InDelta(t, node.RetainBlocks, last-first+1, 1,
@ -78,12 +83,16 @@ func TestBlock_Range(t *testing.T) {
}
for h := first; h <= last; h++ {
if node.StateSync && h <= first+e2e.EvidenceAgeHeight+1 {
continue
}
resp, err := client.Block(ctx, &(h))
if err != nil && node.RetainBlocks > 0 && h == first {
// Ignore errors in first block if node is pruning blocks due to race conditions.
continue
}
require.NoError(t, err)
require.NotNil(t, resp.Block)
assert.Equal(t, h, resp.Block.Height)
}


+ 2
- 2
types/block_meta_test.go View File

@ -10,7 +10,7 @@ import (
)
func TestBlockMeta_ToProto(t *testing.T) {
h := makeRandHeader()
h := MakeRandHeader()
bi := BlockID{Hash: h.Hash(), PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}}
bm := &BlockMeta{
@ -47,7 +47,7 @@ func TestBlockMeta_ToProto(t *testing.T) {
}
func TestBlockMeta_ValidateBasic(t *testing.T) {
h := makeRandHeader()
h := MakeRandHeader()
bi := BlockID{Hash: h.Hash(), PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}}
bi2 := BlockID{Hash: tmrand.Bytes(tmhash.Size),
PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}}


+ 4
- 3
types/block_test.go View File

@ -749,7 +749,8 @@ func TestEvidenceDataProtoBuf(t *testing.T) {
}
}
func makeRandHeader() Header {
// exposed for testing
func MakeRandHeader() Header {
chainID := "test"
t := time.Now()
height := mrand.Int63()
@ -778,7 +779,7 @@ func makeRandHeader() Header {
}
func TestHeaderProto(t *testing.T) {
h1 := makeRandHeader()
h1 := MakeRandHeader()
tc := []struct {
msg string
h1 *Header
@ -830,7 +831,7 @@ func TestBlockIDProtoBuf(t *testing.T) {
func TestSignedHeaderProtoBuf(t *testing.T) {
commit := randCommit(time.Now())
h := makeRandHeader()
h := MakeRandHeader()
sh := SignedHeader{Header: &h, Commit: commit}


+ 2
- 2
types/light_test.go View File

@ -12,7 +12,7 @@ import (
)
func TestLightBlockValidateBasic(t *testing.T) {
header := makeRandHeader()
header := MakeRandHeader()
commit := randCommit(time.Now())
vals, _ := randValidatorPrivValSet(5, 1)
header.Height = commit.Height
@ -57,7 +57,7 @@ func TestLightBlockValidateBasic(t *testing.T) {
}
func TestLightBlockProtobuf(t *testing.T) {
header := makeRandHeader()
header := MakeRandHeader()
commit := randCommit(time.Now())
vals, _ := randValidatorPrivValSet(5, 1)
header.Height = commit.Height


Loading…
Cancel
Save