Browse Source

Cleanup of code and code docs

This cleans up some of the code in the state package
pull/750/head
Adrian Brink 7 years ago
committed by Ethan Buchman
parent
commit
782a836db0
8 changed files with 127 additions and 84 deletions
  1. +7
    -7
      mempool/mempool.go
  2. +3
    -1
      rpc/lib/client/http_client.go
  3. +70
    -47
      state/state.go
  4. +28
    -11
      state/state_test.go
  5. +9
    -10
      state/txindex/indexer.go
  6. +5
    -3
      state/txindex/kv/kv.go
  7. +2
    -2
      state/txindex/null/null.go
  8. +3
    -3
      types/block.go

+ 7
- 7
mempool/mempool.go View File

@ -50,9 +50,10 @@ TODO: Better handle abci client errors. (make it automatically handle connection
const cacheSize = 100000
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus round.
// Transaction validity is checked using the CheckTx abci message before the transaction is added to the pool.
// The Mempool uses a concurrent list structure for storing transactions that can be efficiently accessed by multiple concurrent readers.
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
// added to the pool. The Mempool uses a concurrent list structure for storing transactions that
// can be efficiently accessed by multiple concurrent readers.
type Mempool struct {
config *cfg.MempoolConfig
@ -78,6 +79,7 @@ type Mempool struct {
}
// NewMempool returns a new Mempool with the given configuration and connection to an application.
// TODO: Extract logger into arguments.
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int) *Mempool {
mempool := &Mempool{
config: config,
@ -162,7 +164,7 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement {
// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
// NOTE: Either cb will get called, or err returned.
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@ -287,9 +289,7 @@ func (mem *Mempool) notifyTxsAvailable() {
if mem.Size() == 0 {
panic("notified txs available but mempool is empty!")
}
if mem.txsAvailable != nil &&
!mem.notifiedTxsAvailable {
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
mem.notifiedTxsAvailable = true
mem.txsAvailable <- mem.height + 1
}


+ 3
- 1
rpc/lib/client/http_client.go View File

@ -12,6 +12,7 @@ import (
"strings"
"github.com/pkg/errors"
types "github.com/tendermint/tendermint/rpc/lib/types"
)
@ -60,12 +61,13 @@ func makeHTTPClient(remoteAddr string) (string, *http.Client) {
//------------------------------------------------------------------------------------
// JSON rpc takes params as a slice
// JSONRPCClient takes params as a slice
type JSONRPCClient struct {
address string
client *http.Client
}
// NewJSONRPCClient takes an address and returns a pointer to an instance of JSONRPCClient
func NewJSONRPCClient(remote string) *JSONRPCClient {
address, client := makeHTTPClient(remote)
return &JSONRPCClient{


+ 70
- 47
state/state.go View File

@ -8,11 +8,13 @@ import (
"time"
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
@ -50,18 +52,18 @@ type State struct {
LastBlockTime time.Time
Validators *types.ValidatorSet
LastValidators *types.ValidatorSet
// AppHash is updated after Commit
AppHash []byte
TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer
// When a block returns a validator set change via EndBlock,
// the change only applies to the next block.
// So, if s.LastBlockHeight causes a valset change,
// we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1
LastHeightValidatorsChanged int
// AppHash is updated after Commit
AppHash []byte
// TxIndexer indexes transactions
TxIndexer txindex.TxIndexer `json:"-"`
logger log.Logger
}
@ -88,19 +90,21 @@ func LoadState(db dbm.DB) *State {
}
func loadState(db dbm.DB, key []byte) *State {
s := &State{db: db, TxIndexer: &null.TxIndex{}}
buf := db.Get(key)
if len(buf) == 0 {
return nil
} else {
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt("LoadState: Data has been corrupted or its spec has changed: %v\n", *err))
}
// TODO: ensure that buf is completely read.
}
s := &State{db: db, TxIndexer: &null.TxIndex{}}
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return s
}
@ -110,6 +114,8 @@ func (s *State) SetLogger(l log.Logger) {
}
// Copy makes a copy of the State for mutating.
// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same
// underlying TxIndexer.
func (s *State) Copy() *State {
return &State{
db: s.db,
@ -119,7 +125,7 @@ func (s *State) Copy() *State {
Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(),
AppHash: s.AppHash,
TxIndexer: s.TxIndexer, // pointer here, not value
TxIndexer: s.TxIndexer,
LastHeightValidatorsChanged: s.LastHeightValidatorsChanged,
logger: s.logger,
ChainID: s.ChainID,
@ -131,6 +137,7 @@ func (s *State) Copy() *State {
func (s *State) Save() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.saveValidatorsInfo()
s.db.SetSync(stateKey, s.Bytes())
}
@ -142,38 +149,43 @@ func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) {
}
// LoadABCIResponses loads the ABCIResponses from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save()
func (s *State) LoadABCIResponses() *ABCIResponses {
abciResponses := new(ABCIResponses)
buf := s.db.Get(abciResponsesKey)
if len(buf) != 0 {
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt("LoadABCIResponses: Data has been corrupted or its spec has changed: %v\n", *err))
}
// TODO: ensure that buf is completely read.
if len(buf) == 0 {
return nil
}
abciResponses := new(ABCIResponses)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has
changed: %v\n`, *err))
}
// TODO: ensure that buf is completely read.
return abciResponses
}
// LoadValidators loads the ValidatorSet for a given height.
func (s *State) LoadValidators(height int) (*types.ValidatorSet, error) {
v := s.loadValidators(height)
if v == nil {
valInfo := s.loadValidators(height)
if valInfo == nil {
return nil, ErrNoValSetForHeight{height}
}
if v.ValidatorSet == nil {
v = s.loadValidators(v.LastHeightChanged)
if v == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at
height %d as last changed from height %d`, v.LastHeightChanged, height))
if valInfo.ValidatorSet == nil {
valInfo = s.loadValidators(valInfo.LastHeightChanged)
if valInfo == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as
last changed from height %d`, valInfo.LastHeightChanged, height))
}
}
return v.ValidatorSet, nil
return valInfo.ValidatorSet, nil
}
func (s *State) loadValidators(height int) *ValidatorsInfo {
@ -187,9 +199,11 @@ func (s *State) loadValidators(height int) *ValidatorsInfo {
wire.ReadBinaryPtr(v, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt("LoadValidators: Data has been corrupted or its spec has changed: %v\n", *err))
cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return v
}
@ -200,13 +214,13 @@ func (s *State) loadValidators(height int) *ValidatorsInfo {
func (s *State) saveValidatorsInfo() {
changeHeight := s.LastHeightValidatorsChanged
nextHeight := s.LastBlockHeight + 1
vi := &ValidatorsInfo{
valInfo := &ValidatorsInfo{
LastHeightChanged: changeHeight,
}
if changeHeight == nextHeight {
vi.ValidatorSet = s.Validators
valInfo.ValidatorSet = s.Validators
}
s.db.SetSync(calcValidatorsKey(nextHeight), vi.Bytes())
s.db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes())
}
// Equals returns true if the States are identical.
@ -219,8 +233,10 @@ func (s *State) Bytes() []byte {
return wire.BinaryBytes(s)
}
// SetBlockAndValidators mutates State variables to update block and validators after running EndBlock.
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, abciResponses *ABCIResponses) {
// SetBlockAndValidators mutates State variables to update block and validators after running
// EndBlock.
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader,
abciResponses *ABCIResponses) {
// copy the valset so we can apply changes from EndBlock
// and update s.LastValidators and s.Validators
@ -248,8 +264,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ
}
func (s *State) setBlockAndValidators(
height int, blockID types.BlockID, blockTime time.Time,
func (s *State) setBlockAndValidators(height int, blockID types.BlockID, blockTime time.Time,
prevValSet, nextValSet *types.ValidatorSet) {
s.LastBlockHeight = height
@ -260,10 +275,17 @@ func (s *State) setBlockAndValidators(
}
// GetValidators returns the last and current validator sets.
func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {
func (s *State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) {
return s.LastValidators, s.Validators
}
// Params returns the consensus parameters used for validating blocks
func (s *State) Params() types.ConsensusParams {
// TODO: this should move into the State proper
// when we allow the app to change it
return *s.GenesisDoc.ConsensusParams
}
//------------------------------------------------------------------------
// ABCIResponses retains the responses of the various ABCI calls during block processing.
@ -293,15 +315,15 @@ func (a *ABCIResponses) Bytes() []byte {
//-----------------------------------------------------------------------------
// ValidatorsInfo represents the latest validator set, or the last time it changed
// ValidatorsInfo represents the latest validator set, or the last height it changed
type ValidatorsInfo struct {
ValidatorSet *types.ValidatorSet
LastHeightChanged int
}
// Bytes serializes the ValidatorsInfo using go-wire
func (vi *ValidatorsInfo) Bytes() []byte {
return wire.BinaryBytes(*vi)
func (valInfo *ValidatorsInfo) Bytes() []byte {
return wire.BinaryBytes(*valInfo)
}
//------------------------------------------------------------------------
@ -353,6 +375,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
}
}
// we do not need indexer during replay and in tests
return &State{
db: db,
@ -365,7 +388,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash,
TxIndexer: &null.TxIndex{}, // we do not need indexer during replay and in tests
TxIndexer: &null.TxIndex{},
LastHeightValidatorsChanged: 1,
}, nil
}

+ 28
- 11
state/state_test.go View File

@ -7,15 +7,16 @@ import (
"github.com/stretchr/testify/assert"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types"
abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types"
)
// setupTestCase does setup common to all test cases
@ -31,22 +32,29 @@ func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, *State) {
return tearDown, stateDB, state
}
// TestStateCopy tests the correct copying behaviour of State.
func TestStateCopy(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
stateCopy := state.Copy()
assert.True(state.Equals(stateCopy),
cmn.Fmt("expected state and its copy to be identical. got %v\n expected %v\n", stateCopy, state))
cmn.Fmt(`expected state and its copy to be identical. got %v\n expected %v\n`,
stateCopy, state))
stateCopy.LastBlockHeight++
assert.False(state.Equals(stateCopy), cmn.Fmt("expected states to be different. got same %v", state))
assert.False(state.Equals(stateCopy), cmn.Fmt(`expected states to be different. got same
%v`, state))
}
// TestStateSaveLoad tests saving and loading State from a db.
func TestStateSaveLoad(t *testing.T) {
tearDown, stateDB, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
state.LastBlockHeight++
@ -54,12 +62,15 @@ func TestStateSaveLoad(t *testing.T) {
loadedState := LoadState(stateDB)
assert.True(state.Equals(loadedState),
cmn.Fmt("expected state and its copy to be identical. got %v\n expected %v\n", loadedState, state))
cmn.Fmt(`expected state and its copy to be identical. got %v\n expected %v\n`,
loadedState, state))
}
// TestABCIResponsesSaveLoad tests saving and loading ABCIResponses.
func TestABCIResponsesSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
state.LastBlockHeight++
@ -78,17 +89,20 @@ func TestABCIResponsesSaveLoad(t *testing.T) {
abciResponses.txs = nil
state.SaveABCIResponses(abciResponses)
abciResponses2 := state.LoadABCIResponses()
assert.Equal(abciResponses, abciResponses2,
cmn.Fmt("ABCIResponses don't match: Got %v, Expected %v", abciResponses2, abciResponses))
loadedAbciResponses := state.LoadABCIResponses()
assert.Equal(abciResponses, loadedAbciResponses,
cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses,
abciResponses))
}
// TestValidatorSimpleSaveLoad tests saving and loading validators.
func TestValidatorSimpleSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
// cant load anything for height 0
// can't load anything for height 0
v, err := state.LoadValidators(0)
assert.IsType(ErrNoValSetForHeight{}, err, "expected err at height 0")
@ -116,9 +130,11 @@ func TestValidatorSimpleSaveLoad(t *testing.T) {
assert.IsType(ErrNoValSetForHeight{}, err, "expected err at unknown height")
}
// TestValidatorChangesSaveLoad tests saving and loading a validator set with changes.
func TestValidatorChangesSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
// change vals at these heights
@ -171,7 +187,8 @@ func TestValidatorChangesSaveLoad(t *testing.T) {
assert.Equal(v.Size(), 1, "validator set size is greater than 1: %d", v.Size())
addr, _ := v.GetByIndex(0)
assert.Equal(addr, testCase.vals.Address(), fmt.Sprintf("unexpected pubkey at height %d", testCase.height))
assert.Equal(addr, testCase.vals.Address(), fmt.Sprintf(`unexpected pubkey at
height %d`, testCase.height))
}
}


+ 9
- 10
state/txindex/indexer.go View File

@ -6,17 +6,17 @@ import (
"github.com/tendermint/tendermint/types"
)
// Indexer interface defines methods to index and search transactions.
// TxIndexer interface defines methods to index and search transactions.
// TODO: Add the ability to create an independent TxIndexer.
type TxIndexer interface {
// Batch analyzes, indexes or stores a batch of transactions.
//
// NOTE We do not specify Index method for analyzing a single transaction
// AddBatch analyzes, indexes or stores a batch of transactions.
// NOTE: We do not specify Index method for analyzing a single transaction
// here because it bears heavy perfomance loses. Almost all advanced indexers
// support batching.
AddBatch(b *Batch) error
// Tx returns specified transaction or nil if the transaction is not indexed
// Get return the transaction specified by hash or nil if the transaction is not indexed
// or stored.
Get(hash []byte) (*types.TxResult, error)
}
@ -24,10 +24,9 @@ type TxIndexer interface {
//----------------------------------------------------
// Txs are written as a batch
// A Batch groups together multiple Index operations you would like performed
// at the same time. The Batch structure is NOT thread-safe. You should only
// perform operations on a batch from a single thread at a time. Once batch
// execution has started, you may not modify it.
// Batch groups together multiple Index operations you would like performed
// at the same time.
// NOTE: Bach is NOT thread-safe and should not be modified after starting its execution.
type Batch struct {
Ops []types.TxResult
}
@ -39,7 +38,7 @@ func NewBatch(n int) *Batch {
}
}
// Index adds or updates entry for the given result.Index.
// Add or update an entry for the given result.Index.
func (b *Batch) Add(result types.TxResult) error {
b.Ops[result.Index] = result
return nil


+ 5
- 3
state/txindex/kv/kv.go View File

@ -4,14 +4,16 @@ import (
"bytes"
"fmt"
db "github.com/tendermint/tmlibs/db"
"github.com/tendermint/go-wire"
db "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB).
// It could only index transaction by its identifier.
// It can only index transaction by its identifier.
type TxIndex struct {
store db.DB
}
@ -44,7 +46,7 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return txResult, nil
}
// Batch writes a batch of transactions into the TxIndex storage.
// AddBatch writes a batch of transactions into the TxIndex storage.
func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch := txi.store.NewBatch()
for _, result := range b.Ops {


+ 2
- 2
state/txindex/null/null.go View File

@ -10,12 +10,12 @@ import (
// TxIndex acts as a /dev/null.
type TxIndex struct{}
// Tx panics.
// Get with a hash panics.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)
}
// Batch returns nil.
// AddBatch returns nil.
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
return nil
}

+ 3
- 3
types/block.go View File

@ -14,15 +14,15 @@ import (
"github.com/tendermint/tmlibs/merkle"
)
// Block defines the atomic unit of a Tendermint blockchain
// Block defines the atomic unit of a Tendermint blockchain.
type Block struct {
*Header `json:"header"`
*Data `json:"data"`
LastCommit *Commit `json:"last_commit"`
}
// MakeBlock returns a new block and corresponding part set from the given information
// TODO: version
// MakeBlock returns a new block and corresponding part set from the given information.
// TODO: Add version information to the Block struct.
func MakeBlock(height int, chainID string, txs []Tx, commit *Commit,
prevBlockID BlockID, valHash, appHash []byte, partSize int) (*Block, *PartSet) {
block := &Block{


Loading…
Cancel
Save