diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 49d692365..6ba6b66fc 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -28,6 +28,7 @@ const ( statusUpdateIntervalSeconds = 10 // check if we should switch to consensus reactor switchToConsensusIntervalSeconds = 1 + maxBlockchainResponseSize = types.MaxBlockSize + 2 ) type consensusReactor interface { @@ -279,10 +280,10 @@ var _ = wire.RegisterInterface( // TODO: ensure that bz is completely read. func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { msgType = bz[0] - n := int64(0) + n := int(0) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage - if err != nil && n != int64(len(bz)) { + msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage + if err != nil && n != len(bz) { err = errors.New("DecodeMessage() had bytes left over.") } return @@ -300,6 +301,7 @@ func (m *bcBlockRequestMessage) String() string { //------------------------------------- +// NOTE: keep up-to-date with maxBlockchainResponseSize type bcBlockResponseMessage struct { Block *types.Block } diff --git a/blockchain/store.go b/blockchain/store.go index 7f08c5817..491b6955f 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -1,4 +1,3 @@ - package blockchain import ( @@ -7,9 +6,9 @@ import ( "fmt" "io" - "github.com/tendermint/go-wire" . "github.com/tendermint/go-common" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -54,13 +53,13 @@ func (bs *BlockStore) GetReader(key []byte) io.Reader { } func (bs *BlockStore) LoadBlock(height int) *types.Block { - var n int64 + var n int var err error r := bs.GetReader(calcBlockMetaKey(height)) if r == nil { return nil } - meta := wire.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) + meta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) if err != nil { PanicCrisis(Fmt("Error reading block meta: %v", err)) } @@ -69,7 +68,7 @@ func (bs *BlockStore) LoadBlock(height int) *types.Block { part := bs.LoadBlockPart(height, i) bytez = append(bytez, part.Bytes...) } - block := wire.ReadBinary(&types.Block{}, bytes.NewReader(bytez), &n, &err).(*types.Block) + block := wire.ReadBinary(&types.Block{}, bytes.NewReader(bytez), 0, &n, &err).(*types.Block) if err != nil { PanicCrisis(Fmt("Error reading block: %v", err)) } @@ -77,13 +76,13 @@ func (bs *BlockStore) LoadBlock(height int) *types.Block { } func (bs *BlockStore) LoadBlockPart(height int, index int) *types.Part { - var n int64 + var n int var err error r := bs.GetReader(calcBlockPartKey(height, index)) if r == nil { return nil } - part := wire.ReadBinary(&types.Part{}, r, &n, &err).(*types.Part) + part := wire.ReadBinary(&types.Part{}, r, 0, &n, &err).(*types.Part) if err != nil { PanicCrisis(Fmt("Error reading block part: %v", err)) } @@ -91,13 +90,13 @@ func (bs *BlockStore) LoadBlockPart(height int, index int) *types.Part { } func (bs *BlockStore) LoadBlockMeta(height int) *types.BlockMeta { - var n int64 + var n int var err error r := bs.GetReader(calcBlockMetaKey(height)) if r == nil { return nil } - meta := wire.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta) + meta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) if err != nil { PanicCrisis(Fmt("Error reading block meta: %v", err)) } @@ -107,13 +106,13 @@ func (bs *BlockStore) LoadBlockMeta(height int) *types.BlockMeta { // The +2/3 and other Precommit-votes for block at `height`. // This Validation comes from block.LastValidation for `height+1`. func (bs *BlockStore) LoadBlockValidation(height int) *types.Validation { - var n int64 + var n int var err error r := bs.GetReader(calcBlockValidationKey(height)) if r == nil { return nil } - validation := wire.ReadBinary(&types.Validation{}, r, &n, &err).(*types.Validation) + validation := wire.ReadBinary(&types.Validation{}, r, 0, &n, &err).(*types.Validation) if err != nil { PanicCrisis(Fmt("Error reading validation: %v", err)) } @@ -122,13 +121,13 @@ func (bs *BlockStore) LoadBlockValidation(height int) *types.Validation { // NOTE: the Precommit-vote heights are for the block at `height` func (bs *BlockStore) LoadSeenValidation(height int) *types.Validation { - var n int64 + var n int var err error r := bs.GetReader(calcSeenValidationKey(height)) if r == nil { return nil } - validation := wire.ReadBinary(&types.Validation{}, r, &n, &err).(*types.Validation) + validation := wire.ReadBinary(&types.Validation{}, r, 0, &n, &err).(*types.Validation) if err != nil { PanicCrisis(Fmt("Error reading validation: %v", err)) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 2b4497181..5da5ce568 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1,4 +1,3 @@ - package consensus import ( @@ -9,13 +8,13 @@ import ( "sync" "time" - bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-wire" + bc "github.com/tendermint/tendermint/blockchain" + "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - "github.com/tendermint/go-wire" ) const ( @@ -24,6 +23,7 @@ const ( VoteChannel = byte(0x22) peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. + maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. ) //----------------------------------------------------------------------------- @@ -914,9 +914,9 @@ var _ = wire.RegisterInterface( // TODO: check for unnecessary extra bytes at the end. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { msgType = bz[0] - n := new(int64) + n := new(int) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage + msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage return } diff --git a/consensus/state.go b/consensus/state.go index 7bf80c794..7490d39c0 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -902,9 +902,9 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad } if added && cs.ProposalBlockParts.IsComplete() { // Added and completed! - var n int64 + var n int var err error - cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), &n, &err).(*types.Block) + cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step diff --git a/mempool/reactor.go b/mempool/reactor.go index 17f580cdd..1101afa4b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -8,19 +8,20 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - "github.com/tendermint/go-wire" ) var ( MempoolChannel = byte(0x30) - checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer - txsToSendPerCheck = 64 // send up to this many txs from the mempool per check - newBlockChCapacity = 100 // queue to process this many ResetInfos per peer + checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer + txsToSendPerCheck = 64 // send up to this many txs from the mempool per check + newBlockChCapacity = 100 // queue to process this many ResetInfos per peer + maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -228,9 +229,9 @@ var _ = wire.RegisterInterface( func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) { msgType = bz[0] - n := new(int64) + n := new(int) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage + msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage return } diff --git a/node/node.go b/node/node.go index 47a358a90..8b4106773 100644 --- a/node/node.go +++ b/node/node.go @@ -56,7 +56,7 @@ func NewNode() *Node { genDoc, state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) state.Save() // write the gendoc to db - buf, n, err := new(bytes.Buffer), new(int64), new(error) + buf, n, err := new(bytes.Buffer), new(int), new(error) wire.WriteJSON(genDoc, buf, n, err) stateDB.Set(types.GenDocKey, buf.Bytes()) if *err != nil { diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index b764106fc..bdb942ffa 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -1,4 +1,3 @@ - package rpcserver import ( @@ -14,11 +13,11 @@ import ( "github.com/gorilla/websocket" . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/events" ctypes "github.com/tendermint/tendermint/rpc/core/types" . "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" - "github.com/tendermint/go-wire" ) func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { @@ -394,7 +393,7 @@ func (wsc *WSConnection) readRoutine() { // receives on a write channel and writes out on the socket func (wsc *WSConnection) writeRoutine() { defer wsc.baseConn.Close() - var n, err = int64(0), error(nil) + var n, err = int(0), error(nil) for { select { case <-wsc.Quit: diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go index a93e95f7e..906690065 100644 --- a/rpc/server/http_server.go +++ b/rpc/server/http_server.go @@ -10,10 +10,10 @@ import ( "runtime/debug" "time" - "github.com/tendermint/tendermint/alert" . "github.com/tendermint/go-common" - . "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/alert" + . "github.com/tendermint/tendermint/rpc/types" ) func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { @@ -33,7 +33,7 @@ func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, err } func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { - buf, n, err := new(bytes.Buffer), int64(0), error(nil) + buf, n, err := new(bytes.Buffer), int(0), error(nil) wire.WriteJSON(res, buf, &n, &err) if err != nil { log.Error("Failed to write RPC response", "error", err, "res", res) diff --git a/state/state.go b/state/state.go index d77d9a172..c9153da80 100644 --- a/state/state.go +++ b/state/state.go @@ -39,14 +39,14 @@ func LoadState(db dbm.DB) *State { if len(buf) == 0 { return nil } else { - r, n, err := bytes.NewReader(buf), new(int64), new(error) - s.ChainID = wire.ReadString(r, n, err) + r, n, err := bytes.NewReader(buf), new(int), new(error) + s.ChainID = wire.ReadString(r, 0, n, err) s.LastBlockHeight = wire.ReadVarint(r, n, err) - s.LastBlockHash = wire.ReadByteSlice(r, n, err) - s.LastBlockParts = wire.ReadBinary(types.PartSetHeader{}, r, n, err).(types.PartSetHeader) + s.LastBlockHash = wire.ReadByteSlice(r, 0, n, err) + s.LastBlockParts = wire.ReadBinary(types.PartSetHeader{}, r, 0, n, err).(types.PartSetHeader) s.LastBlockTime = wire.ReadTime(r, n, err) - s.Validators = wire.ReadBinary(&types.ValidatorSet{}, r, n, err).(*types.ValidatorSet) - s.LastValidators = wire.ReadBinary(&types.ValidatorSet{}, r, n, err).(*types.ValidatorSet) + s.Validators = wire.ReadBinary(&types.ValidatorSet{}, r, 0, n, err).(*types.ValidatorSet) + s.LastValidators = wire.ReadBinary(&types.ValidatorSet{}, r, 0, n, err).(*types.ValidatorSet) if *err != nil { // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) @@ -57,7 +57,7 @@ func LoadState(db dbm.DB) *State { } func (s *State) Save() { - buf, n, err := new(bytes.Buffer), new(int64), new(error) + buf, n, err := new(bytes.Buffer), new(int), new(error) wire.WriteString(s.ChainID, buf, n, err) wire.WriteVarint(s.LastBlockHeight, buf, n, err) wire.WriteByteSlice(s.LastBlockHash, buf, n, err) diff --git a/types/block.go b/types/block.go index 40b580820..20d139ff5 100644 --- a/types/block.go +++ b/types/block.go @@ -12,6 +12,8 @@ import ( "github.com/tendermint/go-wire" ) +const MaxBlockSize = 22020096 // 21MB TODO make it configurable + type Block struct { *Header `json:"header"` *Data `json:"data"` diff --git a/types/part_set.go b/types/part_set.go index 53e39848c..fc082667f 100644 --- a/types/part_set.go +++ b/types/part_set.go @@ -9,9 +9,9 @@ import ( "code.google.com/p/go.crypto/ripemd160" - "github.com/tendermint/go-wire" . "github.com/tendermint/go-common" "github.com/tendermint/go-merkle" + "github.com/tendermint/go-wire" ) const ( @@ -75,7 +75,7 @@ func (psh PartSetHeader) Equals(other PartSetHeader) bool { return psh.Total == other.Total && bytes.Equal(psh.Hash, other.Hash) } -func (psh PartSetHeader) WriteSignBytes(w io.Writer, n *int64, err *error) { +func (psh PartSetHeader) WriteSignBytes(w io.Writer, n *int, err *error) { wire.WriteTo([]byte(Fmt(`{"hash":"%X","total":%v}`, psh.Hash, psh.Total)), w, n, err) } diff --git a/types/proposal.go b/types/proposal.go index bd40b8502..bb212a47d 100644 --- a/types/proposal.go +++ b/types/proposal.go @@ -37,7 +37,7 @@ func (p *Proposal) String() string { p.BlockPartsHeader, p.POLRound, p.Signature) } -func (p *Proposal) WriteSignBytes(chainID string, w io.Writer, n *int64, err *error) { +func (p *Proposal) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { wire.WriteTo([]byte(Fmt(`{"chain_id":"%s"`, chainID)), w, n, err) wire.WriteTo([]byte(`,"proposal":{"block_parts_header":`), w, n, err) p.BlockPartsHeader.WriteSignBytes(w, n, err) diff --git a/types/signable.go b/types/signable.go index fddf6e438..df94e43b4 100644 --- a/types/signable.go +++ b/types/signable.go @@ -11,12 +11,12 @@ import ( // Signable is an interface for all signable things. // It typically removes signatures before serializing. type Signable interface { - WriteSignBytes(chainID string, w io.Writer, n *int64, err *error) + WriteSignBytes(chainID string, w io.Writer, n *int, err *error) } // SignBytes is a convenience method for getting the bytes to sign of a Signable. func SignBytes(chainID string, o Signable) []byte { - buf, n, err := new(bytes.Buffer), new(int64), new(error) + buf, n, err := new(bytes.Buffer), new(int), new(error) o.WriteSignBytes(chainID, buf, n, err) if *err != nil { PanicCrisis(err) diff --git a/types/validator.go b/types/validator.go index 1565a371e..d7fdb3b9e 100644 --- a/types/validator.go +++ b/types/validator.go @@ -71,12 +71,12 @@ var ValidatorCodec = validatorCodec{} type validatorCodec struct{} -func (vc validatorCodec) Encode(o interface{}, w io.Writer, n *int64, err *error) { +func (vc validatorCodec) Encode(o interface{}, w io.Writer, n *int, err *error) { wire.WriteBinary(o.(*Validator), w, n, err) } -func (vc validatorCodec) Decode(r io.Reader, n *int64, err *error) interface{} { - return wire.ReadBinary(&Validator{}, r, n, err) +func (vc validatorCodec) Decode(r io.Reader, n *int, err *error) interface{} { + return wire.ReadBinary(&Validator{}, r, 0, n, err) } func (vc validatorCodec) Compare(o1 interface{}, o2 interface{}) int { diff --git a/types/vote.go b/types/vote.go index 48ce10023..cd145552c 100644 --- a/types/vote.go +++ b/types/vote.go @@ -5,8 +5,8 @@ import ( "fmt" "io" - "github.com/tendermint/go-crypto" . "github.com/tendermint/go-common" + "github.com/tendermint/go-crypto" "github.com/tendermint/go-wire" ) @@ -28,11 +28,11 @@ func (err *ErrVoteConflictingSignature) Error() string { // Represents a prevote, precommit, or commit vote from validators for consensus. type Vote struct { - Height int `json:"height"` - Round int `json:"round"` - Type byte `json:"type"` - BlockHash []byte `json:"block_hash"` // empty if vote is nil. - BlockPartsHeader PartSetHeader `json:"block_parts_header"` // zero if vote is nil. + Height int `json:"height"` + Round int `json:"round"` + Type byte `json:"type"` + BlockHash []byte `json:"block_hash"` // empty if vote is nil. + BlockPartsHeader PartSetHeader `json:"block_parts_header"` // zero if vote is nil. Signature crypto.SignatureEd25519 `json:"signature"` } @@ -42,7 +42,7 @@ const ( VoteTypePrecommit = byte(0x02) ) -func (vote *Vote) WriteSignBytes(chainID string, w io.Writer, n *int64, err *error) { +func (vote *Vote) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { wire.WriteTo([]byte(Fmt(`{"chain_id":"%s"`, chainID)), w, n, err) wire.WriteTo([]byte(Fmt(`,"vote":{"block_hash":"%X","block_parts_header":%v`, vote.BlockHash, vote.BlockPartsHeader)), w, n, err) wire.WriteTo([]byte(Fmt(`,"height":%v,"round":%v,"type":%v}}`, vote.Height, vote.Round, vote.Type)), w, n, err)