Browse Source

rpc: re-index missing events (#6535)

pull/6622/head
JayT106 4 years ago
committed by GitHub
parent
commit
167fa738a3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 600 additions and 1 deletions
  1. +1
    -1
      docs/nodes/configuration.md
  2. +96
    -0
      rpc/core/dev.go
  3. +88
    -0
      rpc/core/dev_test.go
  4. +1
    -0
      rpc/core/routes.go
  5. +4
    -0
      rpc/core/types/responses.go
  6. +45
    -0
      rpc/openapi/openapi.yaml
  7. +2
    -0
      state/indexer/eventsink.go
  8. +194
    -0
      state/mocks/block_store.go
  9. +167
    -0
      state/mocks/event_sink.go
  10. +2
    -0
      state/services.go

+ 1
- 1
docs/nodes/configuration.md View File

@ -145,7 +145,7 @@ grpc-laddr = ""
# 1024 - 40 - 10 - 50 = 924 = ~900
grpc-max-open-connections = 900
# Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool
# Activate unsafe RPC commands like /dial_seeds, /re_index, and /unsafe_flush_mempool
unsafe = false
# Maximum number of simultaneous connections (including WebSocket).


+ 96
- 0
rpc/core/dev.go View File

@ -1,8 +1,13 @@
package core
import (
"fmt"
abcitypes "github.com/tendermint/tendermint/abci/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
// UnsafeFlushMempool removes all transactions from the mempool.
@ -10,3 +15,94 @@ func (env *Environment) UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.Resul
env.Mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}
// UnsafeReIndex re-index the block/transaction events into the eventsinks.
func (env *Environment) UnsafeReIndex(
ctx *rpctypes.Context,
start int64,
end int64) (*ctypes.ResultUnsafeReIndex, error) {
base := env.BlockStore.Base()
if start <= base {
return nil, fmt.Errorf("%s (requested start height: %d, base height: %d)", ctypes.ErrHeightNotAvailable, start, base)
}
height := env.BlockStore.Height()
if start > height {
return nil, fmt.Errorf(
"%s (requested start height: %d, store height: %d)", ctypes.ErrHeightNotAvailable, start, height)
}
if end <= base {
return nil, fmt.Errorf(
"%s (requested end height: %d, base height: %d)", ctypes.ErrHeightNotAvailable, end, base)
}
if end < start {
return nil, fmt.Errorf(
"%s (requested the end height: %d is less than the start height: %d)", ctypes.ErrInvalidRequest, start, end)
}
if end > height {
end = height
}
if !indexer.IndexingEnabled(env.EventSinks) {
return nil, fmt.Errorf("no event sink has been enabled")
}
for i := start; i <= end; i++ {
select {
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
default:
b := env.BlockStore.LoadBlock(i)
if b == nil {
return nil, fmt.Errorf("not able to load block at height %d from the blockstore", i)
}
r, err := env.StateStore.LoadABCIResponses(i)
if err != nil {
return nil, fmt.Errorf("not able to load ABCI Response at height %d from the statestore", i)
}
e := types.EventDataNewBlockHeader{
Header: b.Header,
NumTxs: int64(len(b.Txs)),
ResultBeginBlock: *r.BeginBlock,
ResultEndBlock: *r.EndBlock,
}
var batch *indexer.Batch
if e.NumTxs > 0 {
batch = indexer.NewBatch(e.NumTxs)
for i, tx := range b.Data.Txs {
tr := abcitypes.TxResult{
Height: b.Height,
Index: uint32(i),
Tx: tx,
Result: *(r.DeliverTxs[i]),
}
_ = batch.Add(&tr)
}
}
for _, sink := range env.EventSinks {
if err := sink.IndexBlockEvents(e); err != nil {
return nil, err
}
if batch != nil {
if err := sink.IndexTxEvents(batch.Ops); err != nil {
return nil, err
}
}
}
}
}
return &ctypes.ResultUnsafeReIndex{Result: "re-index finished"}, nil
}

+ 88
- 0
rpc/core/dev_test.go View File

@ -0,0 +1,88 @@
package core
import (
"errors"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abcitypes "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/mocks"
"github.com/tendermint/tendermint/types"
)
const (
height int64 = 10
base int64 = 1
)
func TestUnsafeReIndex(t *testing.T) {
mockBlockStore := &mocks.BlockStore{}
mockStateStore := &mocks.Store{}
mockEventSink := &mocks.EventSink{}
env := &Environment{
BlockStore: mockBlockStore,
StateStore: mockStateStore,
EventSinks: []indexer.EventSink{mockEventSink},
Logger: log.TestingLogger()}
mockBlockStore.
On("Base").Return(base).
On("Height").Return(height).
On("LoadBlock", base+1).Return(nil).Once().
On("LoadBlock", base+1).Return(&types.Block{Data: types.Data{Txs: types.Txs{make(types.Tx, 1)}}})
mockEventSink.
On("Type").Return(indexer.NULL).Once().
On("Type").Return(indexer.KV).
On("IndexBlockEvents", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(errors.New("")).Once().
On("IndexBlockEvents", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(nil).
On("IndexTxEvents", mock.AnythingOfType("[]*types.TxResult")).Return(errors.New("")).Once().
On("IndexTxEvents", mock.AnythingOfType("[]*types.TxResult")).Return(nil)
dtx := abcitypes.ResponseDeliverTx{}
abciResp := &tmstate.ABCIResponses{
DeliverTxs: []*abcitypes.ResponseDeliverTx{&dtx},
EndBlock: &abcitypes.ResponseEndBlock{},
BeginBlock: &abcitypes.ResponseBeginBlock{},
}
mockStateStore.
On("LoadABCIResponses", base+1).Return(nil, errors.New("")).Once().
On("LoadABCIResponses", base+1).Return(abciResp, nil)
testCases := []struct {
startHeight int64
endHeight int64
enableSink bool
isErr bool
}{
{base, 0, false, true}, // the start height less equal than the base height
{height + 1, 0, false, true}, // the start height greater than the store height
{base + 1, base, false, true}, // the end height less equal than the base height
{height, height - 1, false, true}, // the start height greater than the end height
{base + 1, height + 1, false, true}, // the end height will be the same as the store height and no eventsink error
{base + 1, base + 1, true, true}, // LoadBlock error
{base + 1, base + 1, true, true}, // LoadABCIResponses error
{base + 1, base + 1, true, true}, // index block event error
{base + 1, base + 1, true, true}, // index tx event error
{base + 1, base + 1, true, false},
}
for _, tc := range testCases {
res, err := env.UnsafeReIndex(&rpctypes.Context{}, tc.startHeight, tc.endHeight)
if tc.isErr {
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, res)
}
}
}

+ 1
- 0
rpc/core/routes.go View File

@ -58,4 +58,5 @@ func (env *Environment) AddUnsafe(routes RoutesMap) {
routes["dial_seeds"] = rpc.NewRPCFunc(env.UnsafeDialSeeds, "seeds", false)
routes["dial_peers"] = rpc.NewRPCFunc(env.UnsafeDialPeers, "peers,persistent,unconditional,private", false)
routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(env.UnsafeFlushMempool, "", false)
routes["re_index"] = rpc.NewRPCFunc(env.UnsafeReIndex, "start_height,end_height", false)
}

+ 4
- 0
rpc/core/types/responses.go View File

@ -266,3 +266,7 @@ type ResultEvent struct {
Data types.TMEventData `json:"data"`
Events map[string][]string `json:"events"`
}
type ResultUnsafeReIndex struct {
Result string `json:"result"`
}

+ 45
- 0
rpc/openapi/openapi.yaml View File

@ -1214,6 +1214,44 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/re_index:
post:
summary: Re-index the block/tx event data (Unsafe)
operationId: re_index
tags:
- Unsafe
description: |
re-index the block/tx event data to the indexer database, this route in under unsafe, and has to manually enabled to use
**Example:** curl -X POST "localhost:26657/re_index?start_height=1&end_height=2"
parameters:
- in: query
name: start_height
description: the block height would like to start for re-index.
required: true
schema:
type: integer
example: 1
- in: query
name: end_height
description: the block height would like to finish for re-index.
required: true
schema:
type: integer
example: 2
responses:
"200":
description: Re-index finished
content:
application/json:
schema:
$ref: "#/components/schemas/ReIndexResponse"
"500":
description: Request error
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
components:
schemas:
@ -2751,6 +2789,13 @@ components:
type: string
example: "Dialing seeds in progress. See /net_info for details"
ReIndexResponse:
type: object
properties:
Log:
type: string
example: "Finished re-indexing the block/tx events."
BlockSearchResponse:
type: object
required:


+ 2
- 0
state/indexer/eventsink.go View File

@ -16,6 +16,8 @@ const (
PSQL EventSinkType = "psql"
)
//go:generate mockery --case underscore --name EventSink
// EventSink interface is defined the APIs for the IndexerService to interact with the data store,
// including the block/transaction indexing and the search functions.
//


+ 194
- 0
state/mocks/block_store.go View File

@ -0,0 +1,194 @@
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks
import (
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/types"
)
// BlockStore is an autogenerated mock type for the BlockStore type
type BlockStore struct {
mock.Mock
}
// Base provides a mock function with given fields:
func (_m *BlockStore) Base() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// Height provides a mock function with given fields:
func (_m *BlockStore) Height() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// LoadBaseMeta provides a mock function with given fields:
func (_m *BlockStore) LoadBaseMeta() *types.BlockMeta {
ret := _m.Called()
var r0 *types.BlockMeta
if rf, ok := ret.Get(0).(func() *types.BlockMeta); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockMeta)
}
}
return r0
}
// LoadBlock provides a mock function with given fields: height
func (_m *BlockStore) LoadBlock(height int64) *types.Block {
ret := _m.Called(height)
var r0 *types.Block
if rf, ok := ret.Get(0).(func(int64) *types.Block); ok {
r0 = rf(height)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Block)
}
}
return r0
}
// LoadBlockByHash provides a mock function with given fields: hash
func (_m *BlockStore) LoadBlockByHash(hash []byte) *types.Block {
ret := _m.Called(hash)
var r0 *types.Block
if rf, ok := ret.Get(0).(func([]byte) *types.Block); ok {
r0 = rf(hash)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Block)
}
}
return r0
}
// LoadBlockCommit provides a mock function with given fields: height
func (_m *BlockStore) LoadBlockCommit(height int64) *types.Commit {
ret := _m.Called(height)
var r0 *types.Commit
if rf, ok := ret.Get(0).(func(int64) *types.Commit); ok {
r0 = rf(height)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Commit)
}
}
return r0
}
// LoadBlockMeta provides a mock function with given fields: height
func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
ret := _m.Called(height)
var r0 *types.BlockMeta
if rf, ok := ret.Get(0).(func(int64) *types.BlockMeta); ok {
r0 = rf(height)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.BlockMeta)
}
}
return r0
}
// LoadBlockPart provides a mock function with given fields: height, index
func (_m *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
ret := _m.Called(height, index)
var r0 *types.Part
if rf, ok := ret.Get(0).(func(int64, int) *types.Part); ok {
r0 = rf(height, index)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Part)
}
}
return r0
}
// LoadSeenCommit provides a mock function with given fields: height
func (_m *BlockStore) LoadSeenCommit(height int64) *types.Commit {
ret := _m.Called(height)
var r0 *types.Commit
if rf, ok := ret.Get(0).(func(int64) *types.Commit); ok {
r0 = rf(height)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Commit)
}
}
return r0
}
// PruneBlocks provides a mock function with given fields: height
func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) {
ret := _m.Called(height)
var r0 uint64
if rf, ok := ret.Get(0).(func(int64) uint64); ok {
r0 = rf(height)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(height)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SaveBlock provides a mock function with given fields: block, blockParts, seenCommit
func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
_m.Called(block, blockParts, seenCommit)
}
// Size provides a mock function with given fields:
func (_m *BlockStore) Size() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}

+ 167
- 0
state/mocks/event_sink.go View File

@ -0,0 +1,167 @@
// Code generated by mockery 2.7.5. DO NOT EDIT.
package mocks
import (
context "context"
mock "github.com/stretchr/testify/mock"
indexer "github.com/tendermint/tendermint/state/indexer"
query "github.com/tendermint/tendermint/libs/pubsub/query"
tenderminttypes "github.com/tendermint/tendermint/types"
types "github.com/tendermint/tendermint/abci/types"
)
// EventSink is an autogenerated mock type for the EventSink type
type EventSink struct {
mock.Mock
}
// GetTxByHash provides a mock function with given fields: _a0
func (_m *EventSink) GetTxByHash(_a0 []byte) (*types.TxResult, error) {
ret := _m.Called(_a0)
var r0 *types.TxResult
if rf, ok := ret.Get(0).(func([]byte) *types.TxResult); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.TxResult)
}
}
var r1 error
if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// HasBlock provides a mock function with given fields: _a0
func (_m *EventSink) HasBlock(_a0 int64) (bool, error) {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
var r1 error
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// IndexBlockEvents provides a mock function with given fields: _a0
func (_m *EventSink) IndexBlockEvents(_a0 tenderminttypes.EventDataNewBlockHeader) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(tenderminttypes.EventDataNewBlockHeader) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// IndexTxEvents provides a mock function with given fields: _a0
func (_m *EventSink) IndexTxEvents(_a0 []*types.TxResult) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func([]*types.TxResult) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// SearchBlockEvents provides a mock function with given fields: _a0, _a1
func (_m *EventSink) SearchBlockEvents(_a0 context.Context, _a1 *query.Query) ([]int64, error) {
ret := _m.Called(_a0, _a1)
var r0 []int64
if rf, ok := ret.Get(0).(func(context.Context, *query.Query) []int64); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// SearchTxEvents provides a mock function with given fields: _a0, _a1
func (_m *EventSink) SearchTxEvents(_a0 context.Context, _a1 *query.Query) ([]*types.TxResult, error) {
ret := _m.Called(_a0, _a1)
var r0 []*types.TxResult
if rf, ok := ret.Get(0).(func(context.Context, *query.Query) []*types.TxResult); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*types.TxResult)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Stop provides a mock function with given fields:
func (_m *EventSink) Stop() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Type provides a mock function with given fields:
func (_m *EventSink) Type() indexer.EventSinkType {
ret := _m.Called()
var r0 indexer.EventSinkType
if rf, ok := ret.Get(0).(func() indexer.EventSinkType); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(indexer.EventSinkType)
}
return r0
}

+ 2
- 0
state/services.go View File

@ -9,6 +9,8 @@ import (
// NOTE: Interfaces used by RPC must be thread safe!
//------------------------------------------------------
//go:generate mockery --case underscore --name BlockStore
//------------------------------------------------------
// blockstore


Loading…
Cancel
Save