From 167fa738a379147f333b86f539bc03f597b978e5 Mon Sep 17 00:00:00 2001 From: JayT106 Date: Fri, 25 Jun 2021 09:14:37 -0400 Subject: [PATCH] rpc: re-index missing events (#6535) --- docs/nodes/configuration.md | 2 +- rpc/core/dev.go | 96 ++++++++++++++++++ rpc/core/dev_test.go | 88 ++++++++++++++++ rpc/core/routes.go | 1 + rpc/core/types/responses.go | 4 + rpc/openapi/openapi.yaml | 45 +++++++++ state/indexer/eventsink.go | 2 + state/mocks/block_store.go | 194 ++++++++++++++++++++++++++++++++++++ state/mocks/event_sink.go | 167 +++++++++++++++++++++++++++++++ state/services.go | 2 + 10 files changed, 600 insertions(+), 1 deletion(-) create mode 100644 rpc/core/dev_test.go create mode 100644 state/mocks/block_store.go create mode 100644 state/mocks/event_sink.go diff --git a/docs/nodes/configuration.md b/docs/nodes/configuration.md index 203863990..c91701848 100644 --- a/docs/nodes/configuration.md +++ b/docs/nodes/configuration.md @@ -145,7 +145,7 @@ grpc-laddr = "" # 1024 - 40 - 10 - 50 = 924 = ~900 grpc-max-open-connections = 900 -# Activate unsafe RPC commands like /dial_seeds and /unsafe_flush_mempool +# Activate unsafe RPC commands like /dial_seeds, /re_index, and /unsafe_flush_mempool unsafe = false # Maximum number of simultaneous connections (including WebSocket). diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 0e365cdcc..51a55157c 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -1,8 +1,13 @@ package core import ( + "fmt" + + abcitypes "github.com/tendermint/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" ) // UnsafeFlushMempool removes all transactions from the mempool. @@ -10,3 +15,94 @@ func (env *Environment) UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.Resul env.Mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } + +// UnsafeReIndex re-index the block/transaction events into the eventsinks. +func (env *Environment) UnsafeReIndex( + ctx *rpctypes.Context, + start int64, + end int64) (*ctypes.ResultUnsafeReIndex, error) { + + base := env.BlockStore.Base() + + if start <= base { + return nil, fmt.Errorf("%s (requested start height: %d, base height: %d)", ctypes.ErrHeightNotAvailable, start, base) + } + + height := env.BlockStore.Height() + if start > height { + return nil, fmt.Errorf( + "%s (requested start height: %d, store height: %d)", ctypes.ErrHeightNotAvailable, start, height) + } + + if end <= base { + return nil, fmt.Errorf( + "%s (requested end height: %d, base height: %d)", ctypes.ErrHeightNotAvailable, end, base) + } + + if end < start { + return nil, fmt.Errorf( + "%s (requested the end height: %d is less than the start height: %d)", ctypes.ErrInvalidRequest, start, end) + } + + if end > height { + end = height + } + + if !indexer.IndexingEnabled(env.EventSinks) { + return nil, fmt.Errorf("no event sink has been enabled") + } + + for i := start; i <= end; i++ { + select { + case <-ctx.Context().Done(): + return nil, ctx.Context().Err() + default: + b := env.BlockStore.LoadBlock(i) + if b == nil { + return nil, fmt.Errorf("not able to load block at height %d from the blockstore", i) + } + + r, err := env.StateStore.LoadABCIResponses(i) + if err != nil { + return nil, fmt.Errorf("not able to load ABCI Response at height %d from the statestore", i) + } + + e := types.EventDataNewBlockHeader{ + Header: b.Header, + NumTxs: int64(len(b.Txs)), + ResultBeginBlock: *r.BeginBlock, + ResultEndBlock: *r.EndBlock, + } + + var batch *indexer.Batch + if e.NumTxs > 0 { + batch = indexer.NewBatch(e.NumTxs) + + for i, tx := range b.Data.Txs { + tr := abcitypes.TxResult{ + Height: b.Height, + Index: uint32(i), + Tx: tx, + Result: *(r.DeliverTxs[i]), + } + + _ = batch.Add(&tr) + } + } + + for _, sink := range env.EventSinks { + if err := sink.IndexBlockEvents(e); err != nil { + return nil, err + } + + if batch != nil { + if err := sink.IndexTxEvents(batch.Ops); err != nil { + return nil, err + } + } + } + } + } + + return &ctypes.ResultUnsafeReIndex{Result: "re-index finished"}, nil +} diff --git a/rpc/core/dev_test.go b/rpc/core/dev_test.go new file mode 100644 index 000000000..3660e7f81 --- /dev/null +++ b/rpc/core/dev_test.go @@ -0,0 +1,88 @@ +package core + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + abcitypes "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" + tmstate "github.com/tendermint/tendermint/proto/tendermint/state" + rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/mocks" + "github.com/tendermint/tendermint/types" +) + +const ( + height int64 = 10 + base int64 = 1 +) + +func TestUnsafeReIndex(t *testing.T) { + + mockBlockStore := &mocks.BlockStore{} + mockStateStore := &mocks.Store{} + mockEventSink := &mocks.EventSink{} + + env := &Environment{ + BlockStore: mockBlockStore, + StateStore: mockStateStore, + EventSinks: []indexer.EventSink{mockEventSink}, + Logger: log.TestingLogger()} + + mockBlockStore. + On("Base").Return(base). + On("Height").Return(height). + On("LoadBlock", base+1).Return(nil).Once(). + On("LoadBlock", base+1).Return(&types.Block{Data: types.Data{Txs: types.Txs{make(types.Tx, 1)}}}) + + mockEventSink. + On("Type").Return(indexer.NULL).Once(). + On("Type").Return(indexer.KV). + On("IndexBlockEvents", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(errors.New("")).Once(). + On("IndexBlockEvents", mock.AnythingOfType("types.EventDataNewBlockHeader")).Return(nil). + On("IndexTxEvents", mock.AnythingOfType("[]*types.TxResult")).Return(errors.New("")).Once(). + On("IndexTxEvents", mock.AnythingOfType("[]*types.TxResult")).Return(nil) + + dtx := abcitypes.ResponseDeliverTx{} + abciResp := &tmstate.ABCIResponses{ + DeliverTxs: []*abcitypes.ResponseDeliverTx{&dtx}, + EndBlock: &abcitypes.ResponseEndBlock{}, + BeginBlock: &abcitypes.ResponseBeginBlock{}, + } + + mockStateStore. + On("LoadABCIResponses", base+1).Return(nil, errors.New("")).Once(). + On("LoadABCIResponses", base+1).Return(abciResp, nil) + + testCases := []struct { + startHeight int64 + endHeight int64 + enableSink bool + isErr bool + }{ + {base, 0, false, true}, // the start height less equal than the base height + {height + 1, 0, false, true}, // the start height greater than the store height + {base + 1, base, false, true}, // the end height less equal than the base height + {height, height - 1, false, true}, // the start height greater than the end height + {base + 1, height + 1, false, true}, // the end height will be the same as the store height and no eventsink error + {base + 1, base + 1, true, true}, // LoadBlock error + {base + 1, base + 1, true, true}, // LoadABCIResponses error + {base + 1, base + 1, true, true}, // index block event error + {base + 1, base + 1, true, true}, // index tx event error + {base + 1, base + 1, true, false}, + } + + for _, tc := range testCases { + res, err := env.UnsafeReIndex(&rpctypes.Context{}, tc.startHeight, tc.endHeight) + if tc.isErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.NotNil(t, res) + } + } +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 1eb50fe4e..52f5cbc81 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -58,4 +58,5 @@ func (env *Environment) AddUnsafe(routes RoutesMap) { routes["dial_seeds"] = rpc.NewRPCFunc(env.UnsafeDialSeeds, "seeds", false) routes["dial_peers"] = rpc.NewRPCFunc(env.UnsafeDialPeers, "peers,persistent,unconditional,private", false) routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(env.UnsafeFlushMempool, "", false) + routes["re_index"] = rpc.NewRPCFunc(env.UnsafeReIndex, "start_height,end_height", false) } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index d6d4b0983..93178cc03 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -266,3 +266,7 @@ type ResultEvent struct { Data types.TMEventData `json:"data"` Events map[string][]string `json:"events"` } + +type ResultUnsafeReIndex struct { + Result string `json:"result"` +} diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index 6a2c16d19..4cdb60c96 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1214,6 +1214,44 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + /re_index: + post: + summary: Re-index the block/tx event data (Unsafe) + operationId: re_index + tags: + - Unsafe + description: | + re-index the block/tx event data to the indexer database, this route in under unsafe, and has to manually enabled to use + + **Example:** curl -X POST "localhost:26657/re_index?start_height=1&end_height=2" + parameters: + - in: query + name: start_height + description: the block height would like to start for re-index. + required: true + schema: + type: integer + example: 1 + - in: query + name: end_height + description: the block height would like to finish for re-index. + required: true + schema: + type: integer + example: 2 + responses: + "200": + description: Re-index finished + content: + application/json: + schema: + $ref: "#/components/schemas/ReIndexResponse" + "500": + description: Request error + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" components: schemas: @@ -2751,6 +2789,13 @@ components: type: string example: "Dialing seeds in progress. See /net_info for details" + ReIndexResponse: + type: object + properties: + Log: + type: string + example: "Finished re-indexing the block/tx events." + BlockSearchResponse: type: object required: diff --git a/state/indexer/eventsink.go b/state/indexer/eventsink.go index 8c2529103..19d93e4e5 100644 --- a/state/indexer/eventsink.go +++ b/state/indexer/eventsink.go @@ -16,6 +16,8 @@ const ( PSQL EventSinkType = "psql" ) +//go:generate mockery --case underscore --name EventSink + // EventSink interface is defined the APIs for the IndexerService to interact with the data store, // including the block/transaction indexing and the search functions. // diff --git a/state/mocks/block_store.go b/state/mocks/block_store.go new file mode 100644 index 000000000..b9488547d --- /dev/null +++ b/state/mocks/block_store.go @@ -0,0 +1,194 @@ +// Code generated by mockery 2.7.5. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + types "github.com/tendermint/tendermint/types" +) + +// BlockStore is an autogenerated mock type for the BlockStore type +type BlockStore struct { + mock.Mock +} + +// Base provides a mock function with given fields: +func (_m *BlockStore) Base() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// Height provides a mock function with given fields: +func (_m *BlockStore) Height() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// LoadBaseMeta provides a mock function with given fields: +func (_m *BlockStore) LoadBaseMeta() *types.BlockMeta { + ret := _m.Called() + + var r0 *types.BlockMeta + if rf, ok := ret.Get(0).(func() *types.BlockMeta); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.BlockMeta) + } + } + + return r0 +} + +// LoadBlock provides a mock function with given fields: height +func (_m *BlockStore) LoadBlock(height int64) *types.Block { + ret := _m.Called(height) + + var r0 *types.Block + if rf, ok := ret.Get(0).(func(int64) *types.Block); ok { + r0 = rf(height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + return r0 +} + +// LoadBlockByHash provides a mock function with given fields: hash +func (_m *BlockStore) LoadBlockByHash(hash []byte) *types.Block { + ret := _m.Called(hash) + + var r0 *types.Block + if rf, ok := ret.Get(0).(func([]byte) *types.Block); ok { + r0 = rf(hash) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Block) + } + } + + return r0 +} + +// LoadBlockCommit provides a mock function with given fields: height +func (_m *BlockStore) LoadBlockCommit(height int64) *types.Commit { + ret := _m.Called(height) + + var r0 *types.Commit + if rf, ok := ret.Get(0).(func(int64) *types.Commit); ok { + r0 = rf(height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Commit) + } + } + + return r0 +} + +// LoadBlockMeta provides a mock function with given fields: height +func (_m *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { + ret := _m.Called(height) + + var r0 *types.BlockMeta + if rf, ok := ret.Get(0).(func(int64) *types.BlockMeta); ok { + r0 = rf(height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.BlockMeta) + } + } + + return r0 +} + +// LoadBlockPart provides a mock function with given fields: height, index +func (_m *BlockStore) LoadBlockPart(height int64, index int) *types.Part { + ret := _m.Called(height, index) + + var r0 *types.Part + if rf, ok := ret.Get(0).(func(int64, int) *types.Part); ok { + r0 = rf(height, index) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Part) + } + } + + return r0 +} + +// LoadSeenCommit provides a mock function with given fields: height +func (_m *BlockStore) LoadSeenCommit(height int64) *types.Commit { + ret := _m.Called(height) + + var r0 *types.Commit + if rf, ok := ret.Get(0).(func(int64) *types.Commit); ok { + r0 = rf(height) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Commit) + } + } + + return r0 +} + +// PruneBlocks provides a mock function with given fields: height +func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) { + ret := _m.Called(height) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(int64) uint64); ok { + r0 = rf(height) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(int64) error); ok { + r1 = rf(height) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SaveBlock provides a mock function with given fields: block, blockParts, seenCommit +func (_m *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + _m.Called(block, blockParts, seenCommit) +} + +// Size provides a mock function with given fields: +func (_m *BlockStore) Size() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} diff --git a/state/mocks/event_sink.go b/state/mocks/event_sink.go new file mode 100644 index 000000000..749515ccf --- /dev/null +++ b/state/mocks/event_sink.go @@ -0,0 +1,167 @@ +// Code generated by mockery 2.7.5. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + indexer "github.com/tendermint/tendermint/state/indexer" + + query "github.com/tendermint/tendermint/libs/pubsub/query" + + tenderminttypes "github.com/tendermint/tendermint/types" + + types "github.com/tendermint/tendermint/abci/types" +) + +// EventSink is an autogenerated mock type for the EventSink type +type EventSink struct { + mock.Mock +} + +// GetTxByHash provides a mock function with given fields: _a0 +func (_m *EventSink) GetTxByHash(_a0 []byte) (*types.TxResult, error) { + ret := _m.Called(_a0) + + var r0 *types.TxResult + if rf, ok := ret.Get(0).(func([]byte) *types.TxResult); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.TxResult) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HasBlock provides a mock function with given fields: _a0 +func (_m *EventSink) HasBlock(_a0 int64) (bool, error) { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(int64) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IndexBlockEvents provides a mock function with given fields: _a0 +func (_m *EventSink) IndexBlockEvents(_a0 tenderminttypes.EventDataNewBlockHeader) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(tenderminttypes.EventDataNewBlockHeader) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IndexTxEvents provides a mock function with given fields: _a0 +func (_m *EventSink) IndexTxEvents(_a0 []*types.TxResult) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func([]*types.TxResult) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SearchBlockEvents provides a mock function with given fields: _a0, _a1 +func (_m *EventSink) SearchBlockEvents(_a0 context.Context, _a1 *query.Query) ([]int64, error) { + ret := _m.Called(_a0, _a1) + + var r0 []int64 + if rf, ok := ret.Get(0).(func(context.Context, *query.Query) []int64); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SearchTxEvents provides a mock function with given fields: _a0, _a1 +func (_m *EventSink) SearchTxEvents(_a0 context.Context, _a1 *query.Query) ([]*types.TxResult, error) { + ret := _m.Called(_a0, _a1) + + var r0 []*types.TxResult + if rf, ok := ret.Get(0).(func(context.Context, *query.Query) []*types.TxResult); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*types.TxResult) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *query.Query) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Stop provides a mock function with given fields: +func (_m *EventSink) Stop() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Type provides a mock function with given fields: +func (_m *EventSink) Type() indexer.EventSinkType { + ret := _m.Called() + + var r0 indexer.EventSinkType + if rf, ok := ret.Get(0).(func() indexer.EventSinkType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(indexer.EventSinkType) + } + + return r0 +} diff --git a/state/services.go b/state/services.go index a46863904..0ab6b78d0 100644 --- a/state/services.go +++ b/state/services.go @@ -9,6 +9,8 @@ import ( // NOTE: Interfaces used by RPC must be thread safe! //------------------------------------------------------ +//go:generate mockery --case underscore --name BlockStore + //------------------------------------------------------ // blockstore