Browse Source

Review from Anton

pull/1347/head
Jae Kwon 7 years ago
parent
commit
fb64314d1c
16 changed files with 64 additions and 61 deletions
  1. +12
    -8
      blockchain/reactor.go
  2. +4
    -9
      blockchain/store.go
  3. +1
    -1
      blockchain/store_test.go
  4. +4
    -4
      consensus/common_test.go
  5. +19
    -16
      consensus/reactor.go
  6. +2
    -2
      consensus/replay_test.go
  7. +1
    -1
      consensus/state.go
  8. +1
    -1
      consensus/wal.go
  9. +2
    -3
      consensus/wal_generator.go
  10. +1
    -1
      evidence/reactor.go
  11. +5
    -5
      node/node.go
  12. +2
    -2
      p2p/conn/connection.go
  13. +2
    -1
      p2p/conn/connection_test.go
  14. +3
    -2
      p2p/conn/secret_connection_test.go
  15. +1
    -1
      rpc/lib/server/handlers.go
  16. +4
    -4
      rpc/test/helpers.go

+ 12
- 8
blockchain/reactor.go View File

@ -26,6 +26,13 @@ const (
statusUpdateIntervalSeconds = 10 statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor // check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1 switchToConsensusIntervalSeconds = 1
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
bcBlockResponseMessageFieldKeySize = 1
maxMessageSize = types.MaxBlockSizeBytes +
bcBlockResponseMessagePrefixSize +
bcBlockResponseMessageFieldKeySize
) )
type consensusReactor interface { type consensusReactor interface {
@ -122,9 +129,11 @@ func (bcR *BlockchainReactor) OnStop() {
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
{ {
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 1000,
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMessageSize,
}, },
} }
} }
@ -336,11 +345,6 @@ func RegisterBlockchainMessages(cdc *amino.Codec) {
// DecodeMessage decodes BlockchainMessage. // DecodeMessage decodes BlockchainMessage.
// TODO: ensure that bz is completely read. // TODO: ensure that bz is completely read.
func DecodeMessage(bz []byte) (msg BlockchainMessage, err error) { func DecodeMessage(bz []byte) (msg BlockchainMessage, err error) {
const (
prefixSize = 4
fieldKeySize = 1
maxMessageSize = types.MaxBlockSizeBytes + prefixSize + fieldKeySize
)
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
if err != nil { if err != nil {
err = cmn.ErrorWrap(err, "DecodeMessage() had bytes left over") err = cmn.ErrorWrap(err, "DecodeMessage() had bytes left over")


+ 4
- 9
blockchain/store.go View File

@ -53,15 +53,10 @@ func (bs *BlockStore) Height() int64 {
// LoadBlock returns the block with the given height. // LoadBlock returns the block with the given height.
// If no block is found for that height, it returns nil. // If no block is found for that height, it returns nil.
func (bs *BlockStore) LoadBlock(height int64) *types.Block { func (bs *BlockStore) LoadBlock(height int64) *types.Block {
var blockMeta = new(types.BlockMeta)
bz := bs.db.Get(calcBlockMetaKey(height))
if len(bz) == 0 {
var blockMeta = bs.LoadBlockMeta(height)
if blockMeta == nil {
return nil return nil
} }
err := cdc.UnmarshalBinaryBare(bz, blockMeta)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block meta"))
}
var block = new(types.Block) var block = new(types.Block)
buf := []byte{} buf := []byte{}
@ -69,7 +64,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
part := bs.LoadBlockPart(height, i) part := bs.LoadBlockPart(height, i)
buf = append(buf, part.Bytes...) buf = append(buf, part.Bytes...)
} }
err = cdc.UnmarshalBinary(buf, block)
err := cdc.UnmarshalBinary(buf, block)
if err != nil { if err != nil {
// NOTE: The existence of meta should imply the existence of the // NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved. // block. So, make sure meta is only saved after blocks are saved.
@ -137,7 +132,7 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
} }
err := cdc.UnmarshalBinaryBare(bz, commit) err := cdc.UnmarshalBinaryBare(bz, commit)
if err != nil { if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block commit"))
panic(cmn.ErrorWrap(err, "Error reading block seen commit"))
} }
return commit return commit
} }


+ 1
- 1
blockchain/store_test.go View File

@ -179,7 +179,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
seenCommit: seenCommit1, seenCommit: seenCommit1,
corruptSeenCommitInDB: true, corruptSeenCommitInDB: true,
wantPanic: "Error reading block commit",
wantPanic: "Error reading block seen commit",
}, },
{ {


+ 4
- 4
consensus/common_test.go View File

@ -21,7 +21,7 @@ import (
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
pvm "github.com/tendermint/tendermint/types/priv_validator"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -278,10 +278,10 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
return cs return cs
} }
func loadPrivValidator(config *cfg.Config) *privval.FilePV {
func loadPrivValidator(config *cfg.Config) *pvm.FilePV {
privValidatorFile := config.PrivValidatorFile() privValidatorFile := config.PrivValidatorFile()
ensureDir(path.Dir(privValidatorFile), 0700) ensureDir(path.Dir(privValidatorFile), 0700)
privValidator := privval.LoadOrGenFilePV(privValidatorFile)
privValidator := pvm.LoadOrGenFilePV(privValidatorFile)
privValidator.Reset() privValidator.Reset()
return privValidator return privValidator
} }
@ -379,7 +379,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
privVal = privVals[i] privVal = privVals[i]
} else { } else {
_, tempFilePath := cmn.Tempfile("priv_validator_") _, tempFilePath := cmn.Tempfile("priv_validator_")
privVal = privval.GenFilePV(tempFilePath)
privVal = pvm.GenFilePV(tempFilePath)
} }
app := appFunc() app := appFunc()


+ 19
- 16
consensus/reactor.go View File

@ -107,27 +107,31 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// TODO optimize // TODO optimize
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
{ {
ID: StateChannel,
Priority: 5,
SendQueueCapacity: 100,
ID: StateChannel,
Priority: 5,
SendQueueCapacity: 100,
RecvMessageCapacity: maxConsensusMessageSize,
}, },
{ {
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
Priority: 10, // once we gossip the whole block there's nothing left to send until next height or round
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxConsensusMessageSize,
}, },
{ {
ID: VoteChannel,
Priority: 5,
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
ID: VoteChannel,
Priority: 5,
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
RecvMessageCapacity: maxConsensusMessageSize,
}, },
{ {
ID: VoteSetBitsChannel,
Priority: 1,
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
ID: VoteSetBitsChannel,
Priority: 1,
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
RecvMessageCapacity: maxConsensusMessageSize,
}, },
} }
} }
@ -1278,7 +1282,6 @@ func RegisterConsensusMessages(cdc *amino.Codec) {
} }
// DecodeMessage decodes the given bytes into a ConsensusMessage. // DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msg ConsensusMessage, err error) { func DecodeMessage(bz []byte) (msg ConsensusMessage, err error) {
err = cdc.UnmarshalBinaryBare(bz, &msg) err = cdc.UnmarshalBinaryBare(bz, &msg)
return return


+ 2
- 2
consensus/replay_test.go View File

@ -26,7 +26,7 @@ import (
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
pvm "github.com/tendermint/tendermint/types/priv_validator"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
@ -325,7 +325,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
walFile := tempWALWithData(walBody) walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile) config.Consensus.SetWalFile(walFile)
privVal := privval.LoadFilePV(config.PrivValidatorFile())
privVal := pvm.LoadFilePV(config.PrivValidatorFile())
wal, err := NewWAL(walFile, false) wal, err := NewWAL(walFile, false)
if err != nil { if err != nil {


+ 1
- 1
consensus/state.go View File

@ -778,7 +778,7 @@ func (cs *ConsensusState) enterPropose(height int64, round int) {
// if not a validator, we're done // if not a validator, we're done
if !cs.Validators.HasAddress(cs.privValidator.GetAddress()) { if !cs.Validators.HasAddress(cs.privValidator.GetAddress()) {
cs.Logger.Debug("This node is not a validator 2", cs.privValidator.GetAddress(), cs.Validators)
cs.Logger.Debug("This node is not a validator", "addr", cs.privValidator.GetAddress(), "vals", cs.Validators)
return return
} }
cs.Logger.Debug("This node is a validator") cs.Logger.Debug("This node is a validator")


+ 1
- 1
consensus/wal.go View File

@ -42,7 +42,7 @@ func RegisterWALMessages(cdc *amino.Codec) {
cdc.RegisterConcrete(types.EventDataRoundState{}, "tendermint/wal/EventDataRoundState", nil) cdc.RegisterConcrete(types.EventDataRoundState{}, "tendermint/wal/EventDataRoundState", nil)
cdc.RegisterConcrete(msgInfo{}, "tendermint/wal/MsgInfo", nil) cdc.RegisterConcrete(msgInfo{}, "tendermint/wal/MsgInfo", nil)
cdc.RegisterConcrete(timeoutInfo{}, "tendermint/wal/TimeoutInfo", nil) cdc.RegisterConcrete(timeoutInfo{}, "tendermint/wal/TimeoutInfo", nil)
cdc.RegisterConcrete(EndHeightMessage{}, "tendermint/wal/EndHeightMessagE", nil)
cdc.RegisterConcrete(EndHeightMessage{}, "tendermint/wal/EndHeightMessage", nil)
} }
//-------------------------------------------------------- //--------------------------------------------------------


+ 2
- 3
consensus/wal_generator.go View File

@ -17,7 +17,7 @@ import (
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
pvm "github.com/tendermint/tendermint/types/priv_validator"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/db"
@ -41,7 +41,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
// NOTE: we can't import node package because of circular dependency // NOTE: we can't import node package because of circular dependency
privValidatorFile := config.PrivValidatorFile() privValidatorFile := config.PrivValidatorFile()
privValidator := privval.LoadOrGenFilePV(privValidatorFile)
privValidator := pvm.LoadOrGenFilePV(privValidatorFile)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to read genesis file") return nil, errors.Wrap(err, "failed to read genesis file")
@ -72,7 +72,6 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger) consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
fmt.Println(">>privval", privValidator)
if privValidator != nil { if privValidator != nil {
consensusState.SetPrivValidator(privValidator) consensusState.SetPrivValidator(privValidator)
} }


+ 1
- 1
evidence/reactor.go View File

@ -141,7 +141,7 @@ type EvidenceMessage interface{}
func RegisterEvidenceMessages(cdc *amino.Codec) { func RegisterEvidenceMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*EvidenceMessage)(nil), nil) cdc.RegisterInterface((*EvidenceMessage)(nil), nil)
cdc.RegisterConcrete(&EvidenceListMessage{}, cdc.RegisterConcrete(&EvidenceListMessage{},
"tendermint/evidence/EvidenceListMessagE", nil)
"tendermint/evidence/EvidenceListMessage", nil)
} }
// DecodeMessage decodes a byte-array into a EvidenceMessage. // DecodeMessage decodes a byte-array into a EvidenceMessage.


+ 5
- 5
node/node.go View File

@ -35,7 +35,7 @@ import (
"github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
pvm "github.com/tendermint/tendermint/types/priv_validator"
"github.com/tendermint/tendermint/version" "github.com/tendermint/tendermint/version"
_ "net/http/pprof" _ "net/http/pprof"
@ -80,7 +80,7 @@ type NodeProvider func(*cfg.Config, log.Logger) (*Node, error)
// It implements NodeProvider. // It implements NodeProvider.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
return NewNode(config, return NewNode(config,
privval.LoadOrGenFilePV(config.PrivValidatorFile()),
pvm.LoadOrGenFilePV(config.PrivValidatorFile()),
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config), DefaultGenesisDocProviderFunc(config),
DefaultDBProvider, DefaultDBProvider,
@ -181,8 +181,8 @@ func NewNode(config *cfg.Config,
// TODO: persist this key so external signer // TODO: persist this key so external signer
// can actually authenticate us // can actually authenticate us
privKey = crypto.GenPrivKeyEd25519() privKey = crypto.GenPrivKeyEd25519()
pvsc = privval.NewSocketPV(
logger.With("module", "privval"),
pvsc = pvm.NewSocketPV(
logger.With("module", "pvm"),
config.PrivValidatorListenAddr, config.PrivValidatorListenAddr,
privKey, privKey,
) )
@ -446,7 +446,7 @@ func (n *Node) OnStop() {
n.eventBus.Stop() n.eventBus.Stop()
n.indexerService.Stop() n.indexerService.Stop()
if pvsc, ok := n.privValidator.(*privval.SocketPV); ok {
if pvsc, ok := n.privValidator.(*pvm.SocketPV); ok {
if err := pvsc.Stop(); err != nil { if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error stopping priv validator socket client", "err", err) n.Logger.Error("Error stopping priv validator socket client", "err", err)
} }


+ 2
- 2
p2p/conn/connection.go View File

@ -18,8 +18,8 @@ import (
) )
const ( const (
maxPacketMsgPayloadSizeDefault = 1024
maxPacketMsgOverheadSize = 14
maxPacketMsgPayloadSizeDefault = 1024 // NOTE: Must be below 16,384 bytes for 14 below.
maxPacketMsgOverheadSize = 14 // NOTE: See connection_test for derivation.
numBatchPacketMsgs = 10 numBatchPacketMsgs = 10
minReadBufferSize = 1024 minReadBufferSize = 1024


+ 2
- 1
p2p/conn/connection_test.go View File

@ -136,7 +136,8 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
go func() { go func() {
// read ping // read ping
var pkt PacketPing var pkt PacketPing
_, err = cdc.UnmarshalBinaryReader(server, &pkt, 1024)
const maxPacketPingSize = 1024
_, err = cdc.UnmarshalBinaryReader(server, &pkt, maxPacketPingSize)
assert.Nil(t, err) assert.Nil(t, err)
serverGotPing <- struct{}{} serverGotPing <- struct{}{}
}() }()


+ 3
- 2
p2p/conn/secret_connection_test.go View File

@ -240,9 +240,10 @@ func BenchmarkSecretConnection(b *testing.B) {
} }
func fingerprint(bz []byte) []byte { func fingerprint(bz []byte) []byte {
if len(bz) < 40 {
const fbsize = 40
if len(bz) < fbsize {
return bz return bz
} else { } else {
return bz[:40]
return bz[:fbsize]
} }
} }

+ 1
- 1
rpc/lib/server/handlers.go View File

@ -208,7 +208,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset
return mapParamsToArgs(rpcFunc, cdc, m, argsOffset) return mapParamsToArgs(rpcFunc, cdc, m, argsOffset)
} }
// Ttherwise, try an array.
// Otherwise, try an array.
var a []json.RawMessage var a []json.RawMessage
err = json.Unmarshal(raw, &a) err = json.Unmarshal(raw, &a)
if err == nil { if err == nil {


+ 4
- 4
rpc/test/helpers.go View File

@ -19,7 +19,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
core_grpc "github.com/tendermint/tendermint/rpc/grpc" core_grpc "github.com/tendermint/tendermint/rpc/grpc"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client" rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
privval "github.com/tendermint/tendermint/types/priv_validator"
pvm "github.com/tendermint/tendermint/types/priv_validator"
) )
var globalConfig *cfg.Config var globalConfig *cfg.Config
@ -117,10 +117,10 @@ func NewTendermint(app abci.Application) *nm.Node {
config := GetConfig() config := GetConfig()
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
logger = log.NewFilter(logger, log.AllowError()) logger = log.NewFilter(logger, log.AllowError())
privValidatorFile := config.PrivValidatorFile()
privValidator := privval.LoadOrGenFilePV(privValidatorFile)
pvFile := config.PrivValidatorFile()
pv := pvm.LoadOrGenFilePV(pvFile)
papp := proxy.NewLocalClientCreator(app) papp := proxy.NewLocalClientCreator(app)
node, err := nm.NewNode(config, privValidator, papp,
node, err := nm.NewNode(config, pv, papp,
nm.DefaultGenesisDocProviderFunc(config), nm.DefaultGenesisDocProviderFunc(config),
nm.DefaultDBProvider, logger) nm.DefaultDBProvider, logger)
if err != nil { if err != nil {


Loading…
Cancel
Save