From 9c1795a04da29dcab6c9269d644f9f5f55d05b18 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 11 Sep 2014 22:44:59 -0700 Subject: [PATCH] add mempool to consensus --- blocks/block.go | 43 ++++++++++++++++++++++++++++++------------ blocks/block_test.go | 4 ++-- blocks/codec_test.go | 34 ++++++++++++++++----------------- consensus/consensus.go | 26 +++++++++++++++---------- mempool/agent.go | 37 ++++++++++++++++++++++++++++++++---- mempool/mempool.go | 9 ++------- p2p/README.md | 4 +++- state/state.go | 35 ++++++++++++++++++++++++++++------ 8 files changed, 133 insertions(+), 59 deletions(-) diff --git a/blocks/block.go b/blocks/block.go index c16e7f059..d9f6d4f43 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -1,12 +1,15 @@ package blocks import ( + "bytes" "crypto/sha256" + "errors" "io" "time" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/merkle" ) @@ -14,6 +17,12 @@ const ( defaultBlockPartSizeBytes = 4096 ) +var ( + ErrBlockInvalidNetwork = errors.New("Error block invalid network") + ErrBlockInvalidBlockHeight = errors.New("Error block invalid height") + ErrBlockInvalidLastBlockHash = errors.New("Error block invalid last blockhash") +) + type Block struct { Header Validation @@ -38,8 +47,18 @@ func (b *Block) WriteTo(w io.Writer) (n int64, err error) { return } -func (b *Block) ValidateBasic() error { - // TODO Basic validation that doesn't involve context. +// Basic validation that doesn't involve state data. +func (b *Block) ValidateBasic(lastBlockHeight uint32, lastBlockHash []byte) error { + if b.Header.Network != Config.Network { + return ErrBlockInvalidNetwork + } + if b.Header.Height != lastBlockHeight { + return ErrBlockInvalidBlockHeight + } + if !bytes.Equal(b.Header.LastBlockHash, lastBlockHash) { + return ErrBlockInvalidLastBlockHash + } + // XXX more validation return nil } @@ -83,11 +102,11 @@ func (b *Block) ToBlockPartSet() *BlockPartSet { func (b *Block) MakeNextBlock() *Block { return &Block{ Header: Header{ - Name: b.Header.Name, - Height: b.Header.Height + 1, + Network: b.Header.Network, + Height: b.Header.Height + 1, //Fees: uint64(0), - Time: time.Now(), - PrevHash: b.Hash(), + Time: time.Now(), + LastBlockHash: b.Hash(), //ValidationStateHash: nil, //AccountStateHash: nil, }, @@ -149,11 +168,11 @@ func (bp *BlockPart) Hash() []byte { //----------------------------------------------------------------------------- type Header struct { - Name string + Network string Height uint32 Fees uint64 Time time.Time - PrevHash []byte + LastBlockHash []byte ValidationStateHash []byte AccountStateHash []byte @@ -166,22 +185,22 @@ func ReadHeader(r io.Reader, n *int64, err *error) (h Header) { return Header{} } return Header{ - Name: ReadString(r, n, err), + Network: ReadString(r, n, err), Height: ReadUInt32(r, n, err), Fees: ReadUInt64(r, n, err), Time: ReadTime(r, n, err), - PrevHash: ReadByteSlice(r, n, err), + LastBlockHash: ReadByteSlice(r, n, err), ValidationStateHash: ReadByteSlice(r, n, err), AccountStateHash: ReadByteSlice(r, n, err), } } func (h *Header) WriteTo(w io.Writer) (n int64, err error) { - WriteString(w, h.Name, &n, &err) + WriteString(w, h.Network, &n, &err) WriteUInt32(w, h.Height, &n, &err) WriteUInt64(w, h.Fees, &n, &err) WriteTime(w, h.Time, &n, &err) - WriteByteSlice(w, h.PrevHash, &n, &err) + WriteByteSlice(w, h.LastBlockHash, &n, &err) WriteByteSlice(w, h.ValidationStateHash, &n, &err) WriteByteSlice(w, h.AccountStateHash, &n, &err) return diff --git a/blocks/block_test.go b/blocks/block_test.go index c46631055..6763c8230 100644 --- a/blocks/block_test.go +++ b/blocks/block_test.go @@ -100,11 +100,11 @@ func TestBlock(t *testing.T) { block := &Block{ Header: Header{ - Name: "Tendermint", + Network: "Tendermint", Height: randUInt32(), Fees: randUInt64(), Time: randTime(), - PrevHash: randBytes(32), + LastBlockHash: randBytes(32), ValidationHash: randBytes(32), DataHash: randBytes(32), }, diff --git a/blocks/codec_test.go b/blocks/codec_test.go index 340d4b391..8f5482754 100644 --- a/blocks/codec_test.go +++ b/blocks/codec_test.go @@ -15,11 +15,11 @@ func BenchmarkTestCustom(b *testing.B) { b.StopTimer() h := &Header{ - Name: "Header", + Network: "Header", Height: 123, Fees: 123, Time: time.Unix(123, 0), - PrevHash: []byte("prevhash"), + LastBlockHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), DataHash: []byte("datahash"), } @@ -33,18 +33,18 @@ func BenchmarkTestCustom(b *testing.B) { var n int64 var err error h2 := ReadHeader(buf, &n, &err) - if h2.Name != "Header" { + if h2.Network != "Header" { b.Fatalf("wrong name") } } } type HHeader struct { - Name string `json:"N"` + Network string `json:"N"` Height uint64 `json:"H"` Fees uint64 `json:"F"` Time uint64 `json:"T"` - PrevHash []byte `json:"PH"` + LastBlockHash []byte `json:"PH"` ValidationHash []byte `json:"VH"` DataHash []byte `json:"DH"` } @@ -53,11 +53,11 @@ func BenchmarkTestJSON(b *testing.B) { b.StopTimer() h := &HHeader{ - Name: "Header", + Network: "Header", Height: 123, Fees: 123, Time: 123, - PrevHash: []byte("prevhash"), + LastBlockHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), DataHash: []byte("datahash"), } @@ -72,7 +72,7 @@ func BenchmarkTestJSON(b *testing.B) { buf.Reset() enc.Encode(h) dec.Decode(h2) - if h2.Name != "Header" { + if h2.Network != "Header" { b.Fatalf("wrong name") } } @@ -82,11 +82,11 @@ func BenchmarkTestGob(b *testing.B) { b.StopTimer() h := &Header{ - Name: "Header", + Network: "Header", Height: 123, Fees: 123, Time: time.Unix(123, 0), - PrevHash: []byte("prevhash"), + LastBlockHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), DataHash: []byte("datahash"), } @@ -101,7 +101,7 @@ func BenchmarkTestGob(b *testing.B) { buf.Reset() enc.Encode(h) dec.Decode(h2) - if h2.Name != "Header" { + if h2.Network != "Header" { b.Fatalf("wrong name") } } @@ -111,11 +111,11 @@ func BenchmarkTestMsgPack(b *testing.B) { b.StopTimer() h := &Header{ - Name: "Header", + Network: "Header", Height: 123, Fees: 123, Time: time.Unix(123, 0), - PrevHash: []byte("prevhash"), + LastBlockHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), DataHash: []byte("datahash"), } @@ -130,7 +130,7 @@ func BenchmarkTestMsgPack(b *testing.B) { buf.Reset() enc.Encode(h) dec.Decode(h2) - if h2.Name != "Header" { + if h2.Network != "Header" { b.Fatalf("wrong name") } } @@ -140,11 +140,11 @@ func BenchmarkTestMsgPack2(b *testing.B) { b.StopTimer() h := &Header{ - Name: "Header", + Network: "Header", Height: 123, Fees: 123, Time: time.Unix(123, 0), - PrevHash: []byte("prevhash"), + LastBlockHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), DataHash: []byte("datahash"), } @@ -161,7 +161,7 @@ func BenchmarkTestMsgPack2(b *testing.B) { buf.Reset() enc.Encode(h) dec.Decode(h2) - if h2.Name != "Header" { + if h2.Network != "Header" { b.Fatalf("wrong name") } } diff --git a/consensus/consensus.go b/consensus/consensus.go index 2b5491108..1b9293767 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -14,6 +14,7 @@ import ( . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" . "github.com/tendermint/tendermint/state" ) @@ -99,6 +100,7 @@ type ConsensusAgent struct { conS *ConsensusState blockStore *BlockStore + mempool *Mempool doActionCh chan RoundAction mtx sync.Mutex @@ -109,7 +111,7 @@ type ConsensusAgent struct { stagedState *State } -func NewConsensusAgent(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusAgent { +func NewConsensusAgent(sw *p2p.Switch, blockStore *BlockStore, mempool *Mempool, state *State) *ConsensusAgent { swEvents := make(chan interface{}) sw.AddEventListener("ConsensusAgent.swEvents", swEvents) conS := NewConsensusState(state) @@ -120,6 +122,7 @@ func NewConsensusAgent(sw *p2p.Switch, state *State, blockStore *BlockStore) *Co conS: conS, blockStore: blockStore, + mempool: mempool, doActionCh: make(chan RoundAction, 1), state: state, @@ -337,15 +340,16 @@ func (conA *ConsensusAgent) stageProposal(proposal *BlockPartSet) error { conA.mtx.Unlock() } - // Basic validation if !proposal.IsComplete() { return errors.New("Incomplete proposal BlockPartSet") } block := proposal.Block() - err := block.ValidateBasic() - if err != nil { - return err - } + + // Basic validation is done in state.CommitBlock(). + //err := block.ValidateBasic() + //if err != nil { + // return err + //} // Create a copy of the state for staging conA.mtx.Lock() @@ -353,7 +357,7 @@ func (conA *ConsensusAgent) stageProposal(proposal *BlockPartSet) error { conA.mtx.Unlock() // Commit block onto the copied state. - err = stateCopy.CommitBlock(block) + err := stateCopy.CommitBlock(block) if err != nil { return err } @@ -368,9 +372,10 @@ func (conA *ConsensusAgent) stageProposal(proposal *BlockPartSet) error { // Constructs an unsigned proposal func (conA *ConsensusAgent) constructProposal(rs *RoundState) (*BlockPartSet, error) { - // XXX implement, first implement mempool - // proposal := block.ToBlockPartSet() - return nil, nil + // TODO: make use of state returned from MakeProposal() + proposalBlock, _ := conA.mempool.MakeProposal() + proposal := proposalBlock.ToBlockPartSet() + return proposal, nil } // Vote for (or against) the proposal for this round. @@ -517,6 +522,7 @@ func (conA *ConsensusAgent) commitProposal(proposal *BlockPartSet, commitTime ti conA.conS.Update(conA.state) conA.stagedProposal = nil conA.stagedState = nil + conA.mempool.ResetForBlockAndState(block, conA.state) return nil } diff --git a/mempool/agent.go b/mempool/agent.go index 36733869e..bedf82dbb 100644 --- a/mempool/agent.go +++ b/mempool/agent.go @@ -22,15 +22,18 @@ type MempoolAgent struct { quit chan struct{} started uint32 stopped uint32 + + mempool *Mempool } -func NewMempoolAgent(sw *p2p.Switch) *MempoolAgent { +func NewMempoolAgent(sw *p2p.Switch, mempool *Mempool) *MempoolAgent { swEvents := make(chan interface{}) sw.AddEventListener("MempoolAgent.swEvents", swEvents) memA := &MempoolAgent{ sw: sw, swEvents: swEvents, quit: make(chan struct{}), + mempool: mempool, } return memA } @@ -51,6 +54,16 @@ func (memA *MempoolAgent) Stop() { } } +func (memA *MempoolAgent) BroadcastTx(tx Tx) error { + err := memA.mempool.AddTx(tx) + if err != nil { + return err + } + msg := &TxMessage{Tx: tx} + memA.sw.Broadcast(MempoolCh, msg) + return nil +} + // Handle peer new/done events func (memA *MempoolAgent) switchEventsRoutine() { for { @@ -78,12 +91,28 @@ OUTER_LOOP: break OUTER_LOOP // Client has stopped } _, msg_ := decodeMessage(inMsg.Bytes) - log.Info("gossipMempoolRoutine received %v", msg_) + log.Info("gossipTxRoutine received %v", msg_) switch msg_.(type) { case *TxMessage: - // msg := msg_.(*TxMessage) - // XXX + msg := msg_.(*TxMessage) + err := memA.mempool.AddTx(msg.Tx) + if err != nil { + // Bad, seen, or conflicting tx. + log.Debug("Could not add tx %v", msg.Tx) + continue OUTER_LOOP + } else { + log.Debug("Added valid tx %V", msg.Tx) + } + // Share tx. + // We use a simple shotgun approach for now. + // TODO: improve efficiency + for _, peer := range memA.sw.Peers().List() { + if peer.Key == inMsg.MConn.Peer.Key { + continue + } + peer.TrySend(MempoolCh, msg) + } default: // Ignore unknown message diff --git a/mempool/mempool.go b/mempool/mempool.go index 5bbe9e196..c44c77940 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -1,11 +1,6 @@ /* Mempool receives new transactions and applies them to the latest committed state. -If the transaction is acceptable, then it broadcasts a fingerprint to peers. - -The transaction fingerprint is a short sequence of bytes (shorter than a full hash). -Each peer connection uses a different algorithm for turning the tx hash into a -fingerprint in order to prevent transaction blocking attacks. Upon inspecting a -tx fingerprint, the receiver may query the source for the full tx bytes. +If the transaction is acceptable, then it broadcasts the tx to peers. When this node happens to be the next proposer, it simply takes the recently modified state (and the associated transactions) and use that as the proposal. @@ -60,7 +55,7 @@ func (mem *Mempool) MakeProposal() (*Block, *State) { // Txs that are present in block are discarded from mempool. // Txs that have become invalid in the new state are also discarded. -func (mem *Mempool) ResetForBlockandState(block *Block, state *State) { +func (mem *Mempool) ResetForBlockAndState(block *Block, state *State) { mem.mtx.Lock() defer mem.mtx.Unlock() mem.lastBlock = block diff --git a/p2p/README.md b/p2p/README.md index 411fbb9ad..9b5654806 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -77,8 +77,10 @@ func (memA *MempoolAgent) switchEventsRoutine() { switch swEvent.(type) { case p2p.SwitchEventNewPeer: // event := swEvent.(p2p.SwitchEventNewPeer) + // NOTE: set up peer state case p2p.SwitchEventDonePeer: // event := swEvent.(p2p.SwitchEventDonePeer) + // NOTE: tear down peer state default: log.Warning("Unhandled switch event type") } @@ -99,7 +101,7 @@ OUTER_LOOP: switch msg_.(type) { case *TxMessage: // msg := msg_.(*TxMessage) - // XXX + // handle msg default: // Ignore unknown message diff --git a/state/state.go b/state/state.go index f0e9d453f..3923e7ead 100644 --- a/state/state.go +++ b/state/state.go @@ -21,7 +21,8 @@ var ( type State struct { mtx sync.Mutex db db_.Db - height uint32 + height uint32 // Last known block height + blockHash []byte // Last known block hash commitTime time.Time accounts merkle.Tree validators *ValidatorSet @@ -33,6 +34,7 @@ func LoadState(db db_.Db) *State { if len(buf) == 0 { s.height = uint32(0) s.commitTime = time.Unix(0, 0) // XXX BOOTSTRAP + s.blockHash = nil // XXX BOOTSTRAP s.accounts = merkle.NewIAVLTree(db) // XXX BOOTSTRAP s.validators = NewValidatorSet(nil) // XXX BOOTSTRAP } else { @@ -41,6 +43,7 @@ func LoadState(db db_.Db) *State { var err error s.height = ReadUInt32(reader, &n, &err) s.commitTime = ReadTime(reader, &n, &err) + s.blockHash = ReadByteSlice(reader, &n, &err) accountsMerkleRoot := ReadByteSlice(reader, &n, &err) s.accounts = merkle.NewIAVLTreeFromHash(db, accountsMerkleRoot) s.validators = NewValidatorSet(nil) @@ -68,6 +71,7 @@ func (s *State) Save(commitTime time.Time) { var err error WriteUInt32(&buf, s.height, &n, &err) WriteTime(&buf, commitTime, &n, &err) + WriteByteSlice(&buf, s.blockHash, &n, &err) WriteByteSlice(&buf, s.accounts.Hash(), &n, &err) for _, validator := range s.validators.Map() { WriteBinary(&buf, validator, &n, &err) @@ -85,15 +89,21 @@ func (s *State) Copy() *State { db: s.db, height: s.height, commitTime: s.commitTime, + blockHash: s.blockHash, accounts: s.accounts.Copy(), validators: s.validators.Copy(), } } -// May return ErrStateInvalidSequenceNumber +// If the tx is invalid, an error will be returned. +// Unlike CommitBlock(), state will not be altered. func (s *State) CommitTx(tx Tx) error { s.mtx.Lock() defer s.mtx.Unlock() + return s.commitTx(tx) +} + +func (s *State) commitTx(tx Tx) error { /* // Get the signer's incr signerId := tx.Signature().SignerId @@ -106,13 +116,26 @@ func (s *State) CommitTx(tx Tx) error { return nil } -// This is called during staging. -// The resulting state is cached until it is actually committed. +// NOTE: If an error occurs during block execution, state will be left +// at an invalid state. Copy the state before calling Commit! func (s *State) CommitBlock(b *Block) error { s.mtx.Lock() defer s.mtx.Unlock() - // XXX commit block by mutating state. - panic("Implement CommitBlock()") + + // Basic block validation. + err := b.ValidateBasic(s.height, s.blockHash) + if err != nil { + return err + } + + // Commit each tx + for _, tx := range b.Data.Txs { + err := s.commitTx(tx) + if err != nil { + return err + } + } + // After all state has been mutated, finally increment validators. s.validators.IncrementAccum() return nil