Browse Source

move PartSetSize out of the config, into ConsensusParams

pull/650/head
Ethan Buchman 7 years ago
parent
commit
1f3e4d2d9a
8 changed files with 77 additions and 43 deletions
  1. +10
    -5
      blockchain/reactor.go
  2. +0
    -7
      config/config.go
  3. +27
    -0
      config/consensus.go
  4. +8
    -9
      consensus/replay_test.go
  5. +4
    -2
      consensus/state.go
  6. +7
    -7
      consensus/state_test.go
  7. +0
    -5
      types/block.go
  8. +21
    -8
      types/genesis.go

+ 10
- 5
blockchain/reactor.go View File

@ -28,7 +28,6 @@ const (
statusUpdateIntervalSeconds = 10 statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor // check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1 switchToConsensusIntervalSeconds = 1
maxBlockchainResponseSize = types.MaxBlockSize + 2 // TODO
) )
type consensusReactor interface { type consensusReactor interface {
@ -124,7 +123,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor by handling 4 types of messages (look below). // Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes)
_, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize())
if err != nil { if err != nil {
bcR.Logger.Error("Error decoding message", "err", err) bcR.Logger.Error("Error decoding message", "err", err)
return return
@ -163,6 +162,12 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
} }
} }
// maxMsgSize returns the maximum allowable size of a
// message on the blockchain reactor.
func (bcR *BlockchainReactor) maxMsgSize() int {
return bcR.state.GenesisDoc.ConsensusParams.MaxBlockSizeBytes + 2
}
// Handle messages from the poolReactor telling the reactor what to do. // Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
@ -221,7 +226,7 @@ FOR_LOOP:
// We need both to sync the first block. // We need both to sync the first block.
break SYNC_LOOP break SYNC_LOOP
} }
firstParts := first.MakePartSet(types.DefaultBlockPartSize)
firstParts := first.MakePartSet(bcR.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header() firstPartsHeader := firstParts.Header()
// Finally, verify the first block using the second's commit // Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling // NOTE: we can probably make this more efficient, but note that calling
@ -290,11 +295,11 @@ var _ = wire.RegisterInterface(
// DecodeMessage decodes BlockchainMessage. // DecodeMessage decodes BlockchainMessage.
// TODO: ensure that bz is completely read. // TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
func DecodeMessage(bz []byte, maxSize int) (msgType byte, msg BlockchainMessage, err error) {
msgType = bz[0] msgType = bz[0]
n := int(0) n := int(0)
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
if err != nil && n != len(bz) { if err != nil && n != len(bz) {
err = errors.New("DecodeMessage() had bytes left over") err = errors.New("DecodeMessage() had bytes left over")
} }


+ 0
- 7
config/config.go View File

@ -4,8 +4,6 @@ import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"time" "time"
"github.com/tendermint/tendermint/types" // TODO: remove
) )
// Config defines the top level configuration for a Tendermint node // Config defines the top level configuration for a Tendermint node
@ -320,10 +318,6 @@ type ConsensusConfig struct {
CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"` CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"`
CreateEmptyBlocksInterval int `mapstructure:"create_empty_blocks_interval"` CreateEmptyBlocksInterval int `mapstructure:"create_empty_blocks_interval"`
// TODO: This probably shouldn't be exposed but it makes it
// easy to write tests for the wal/replay
BlockPartSize int `mapstructure:"block_part_size"`
// Reactor sleep duration parameters are in ms // Reactor sleep duration parameters are in ms
PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"` PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"` PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"`
@ -386,7 +380,6 @@ func DefaultConsensusConfig() *ConsensusConfig {
MaxBlockSizeBytes: 1, // TODO MaxBlockSizeBytes: 1, // TODO
CreateEmptyBlocks: true, CreateEmptyBlocks: true,
CreateEmptyBlocksInterval: 0, CreateEmptyBlocksInterval: 0,
BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types
PeerGossipSleepDuration: 100, PeerGossipSleepDuration: 100,
PeerQueryMaj23SleepDuration: 2000, PeerQueryMaj23SleepDuration: 2000,
} }


+ 27
- 0
config/consensus.go View File

@ -0,0 +1,27 @@
package config
import (
"fmt"
)
type ConsensusParams struct {
MaxBlockSizeBytes int `json:"max_block_size_bytes"`
BlockPartSizeBytes int `json:"block_part_size_bytes"`
}
func DefaultConsensusParams() *ConsensusParams {
return &ConsensusParams{
MaxBlockSizeBytes: 22020096, // 21MB
BlockPartSizeBytes: 65536, // 64kB,
}
}
func (params *ConsensusParams) Validate() error {
if params.MaxBlockSizeBytes <= 0 {
return fmt.Errorf("MaxBlockSizeBytes must be greater than 0. Got %d", params.MaxBlockSizeBytes)
}
if params.BlockPartSizeBytes <= 0 {
return fmt.Errorf("BlockPartSizeBytes must be greater than 0. Got %d", params.BlockPartSizeBytes)
}
return nil
}

+ 8
- 9
consensus/replay_test.go View File

@ -267,8 +267,6 @@ func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum in
var ( var (
NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal
mempool = types.MockMempool{} mempool = types.MockMempool{}
testPartSize int
) )
//--------------------------------------- //---------------------------------------
@ -320,7 +318,6 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
config.Consensus.SetWalFile(walFile) config.Consensus.SetWalFile(walFile)
privVal := types.LoadPrivValidator(config.PrivValidatorFile()) privVal := types.LoadPrivValidator(config.PrivValidatorFile())
testPartSize = config.Consensus.BlockPartSize
wal, err := NewWAL(walFile, false) wal, err := NewWAL(walFile, false)
if err != nil { if err != nil {
@ -384,6 +381,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
} }
func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
testPartSize := st.GenesisDoc.ConsensusParams.BlockPartSizeBytes
err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool)
if err != nil { if err != nil {
panic(err) panic(err)
@ -503,7 +501,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
// if its not the first one, we have a full block // if its not the first one, we have a full block
if blockParts != nil { if blockParts != nil {
var n int var n int
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
blocks = append(blocks, block) blocks = append(blocks, block)
} }
blockParts = types.NewPartSetFromHeader(*p) blockParts = types.NewPartSetFromHeader(*p)
@ -524,7 +522,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
} }
// grab the last block too // grab the last block too
var n int var n int
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block)
blocks = append(blocks, block) blocks = append(blocks, block)
return blocks, commits, nil return blocks, commits, nil
} }
@ -563,7 +561,7 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBl
state := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) state := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
state.SetLogger(log.TestingLogger().With("module", "state")) state.SetLogger(log.TestingLogger().With("module", "state"))
store := NewMockBlockStore(config)
store := NewMockBlockStore(config, state.GenesisDoc.ConsensusParams)
return state, store return state, store
} }
@ -572,13 +570,14 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBl
type mockBlockStore struct { type mockBlockStore struct {
config *cfg.Config config *cfg.Config
params *cfg.ConsensusParams
chain []*types.Block chain []*types.Block
commits []*types.Commit commits []*types.Commit
} }
// TODO: NewBlockStore(db.NewMemDB) ... // TODO: NewBlockStore(db.NewMemDB) ...
func NewMockBlockStore(config *cfg.Config) *mockBlockStore {
return &mockBlockStore{config, nil, nil}
func NewMockBlockStore(config *cfg.Config, params *cfg.ConsensusParams) *mockBlockStore {
return &mockBlockStore{config, params, nil, nil}
} }
func (bs *mockBlockStore) Height() int { return len(bs.chain) } func (bs *mockBlockStore) Height() int { return len(bs.chain) }
@ -586,7 +585,7 @@ func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[h
func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta { func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta {
block := bs.chain[height-1] block := bs.chain[height-1]
return &types.BlockMeta{ return &types.BlockMeta{
BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.config.Consensus.BlockPartSize).Header()},
BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.params.BlockPartSizeBytes).Header()},
Header: block.Header, Header: block.Header,
} }
} }


+ 4
- 2
consensus/state.go View File

@ -983,7 +983,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs) txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs)
return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit, return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit,
cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.BlockPartSize)
cs.state.LastBlockID, cs.state.Validators.Hash(),
cs.state.AppHash, cs.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes)
} }
// Enter: `timeoutPropose` after entering Propose. // Enter: `timeoutPropose` after entering Propose.
@ -1417,7 +1418,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part, ver
// Added and completed! // Added and completed!
var n int var n int
var err error var err error
cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(),
cs.state.GenesisDoc.ConsensusParams.MaxBlockSizeBytes, &n, &err).(*types.Block)
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() { if cs.Step == RoundStepPropose && cs.isProposalComplete() {


+ 7
- 7
consensus/state_test.go View File

@ -180,7 +180,7 @@ func TestBadProposal(t *testing.T) {
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
vs2 := vss[1] vs2 := vss[1]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
@ -327,7 +327,7 @@ func TestLockNoPOL(t *testing.T) {
vs2 := vss[1] vs2 := vss[1]
height := cs1.Height height := cs1.Height
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
@ -493,7 +493,7 @@ func TestLockPOLRelock(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3] vs2, vs3, vs4 := vss[1], vss[2], vss[3]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
@ -608,7 +608,7 @@ func TestLockPOLUnlock(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3] vs2, vs3, vs4 := vss[1], vss[2], vss[3]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
@ -703,7 +703,7 @@ func TestLockPOLSafety1(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3] vs2, vs3, vs4 := vss[1], vss[2], vss[3]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
@ -824,7 +824,7 @@ func TestLockPOLSafety2(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3] vs2, vs3, vs4 := vss[1], vss[2], vss[3]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
@ -999,7 +999,7 @@ func TestHalt1(t *testing.T) {
cs1, vss := randConsensusState(4) cs1, vss := randConsensusState(4)
vs2, vs3, vs4 := vss[1], vss[2], vss[3] vs2, vs3, vs4 := vss[1], vss[2], vss[3]
partSize := config.Consensus.BlockPartSize
partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)


+ 0
- 5
types/block.go View File

@ -14,11 +14,6 @@ import (
"github.com/tendermint/tmlibs/merkle" "github.com/tendermint/tmlibs/merkle"
) )
const (
MaxBlockSize = 22020096 // 21MB TODO make it configurable
DefaultBlockPartSize = 65536 // 64kB TODO: put part size in parts header?
)
// Block defines the atomic unit of a Tendermint blockchain // Block defines the atomic unit of a Tendermint blockchain
type Block struct { type Block struct {
*Header `json:"header"` *Header `json:"header"`


+ 21
- 8
types/genesis.go View File

@ -10,6 +10,8 @@ import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
cfg "github.com/tendermint/tendermint/config"
) )
//------------------------------------------------------------ //------------------------------------------------------------
@ -24,10 +26,11 @@ type GenesisValidator struct {
// GenesisDoc defines the initial conditions for a tendermint blockchain, in particular its validator set. // GenesisDoc defines the initial conditions for a tendermint blockchain, in particular its validator set.
type GenesisDoc struct { type GenesisDoc struct {
GenesisTime time.Time `json:"genesis_time"`
ChainID string `json:"chain_id"`
Validators []GenesisValidator `json:"validators"`
AppHash data.Bytes `json:"app_hash"`
GenesisTime time.Time `json:"genesis_time"`
ChainID string `json:"chain_id"`
ConsensusParams *cfg.ConsensusParams `json:"consensus_params"`
Validators []GenesisValidator `json:"validators"`
AppHash data.Bytes `json:"app_hash"`
} }
// SaveAs is a utility method for saving GenensisDoc as a JSON file. // SaveAs is a utility method for saving GenensisDoc as a JSON file.
@ -56,6 +59,19 @@ func (genDoc *GenesisDoc) ValidatorHash() []byte {
func GenesisDocFromJSON(jsonBlob []byte) (*GenesisDoc, error) { func GenesisDocFromJSON(jsonBlob []byte) (*GenesisDoc, error) {
genDoc := GenesisDoc{} genDoc := GenesisDoc{}
err := json.Unmarshal(jsonBlob, &genDoc) err := json.Unmarshal(jsonBlob, &genDoc)
// validate genesis
if genDoc.ChainID == "" {
return nil, errors.Errorf("Genesis doc must include non-empty chain_id")
}
if genDoc.ConsensusParams == nil {
genDoc.ConsensusParams = cfg.DefaultConsensusParams()
} else {
if err := genDoc.ConsensusParams.Validate(); err != nil {
return nil, err
}
}
return &genDoc, err return &genDoc, err
} }
@ -67,10 +83,7 @@ func GenesisDocFromFile(genDocFile string) (*GenesisDoc, error) {
} }
genDoc, err := GenesisDocFromJSON(jsonBlob) genDoc, err := GenesisDocFromJSON(jsonBlob)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error reading GenesisDoc")
}
if genDoc.ChainID == "" {
return nil, errors.Errorf("Genesis doc %v must include non-empty chain_id", genDocFile)
return nil, errors.Wrap(err, cmn.Fmt("Error reading GenesisDoc at %v", genDocFile))
} }
return genDoc, nil return genDoc, nil
} }

Loading…
Cancel
Save